# Imports

In [None]:
# imports
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StringType, StructType, TimestampType, IntegerType

import urllib

# Reading the Kinesis Streams

First, we read the credentials file and extract the AWS keys we need.

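> Note that we also URL-encode the SECRET_KEY so that it can be embedded safely in a URL. This is encoding, not encryption, so it does not make the key any more private.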

In [None]:
# `import urllib` on its own does not guarantee that the `urllib.parse`
# submodule is loaded, so import it explicitly before calling quote().
import urllib.parse

# specifying CSV file format with headers and comma as the delimiter.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Read the credentials CSV file into a Spark DataFrame.
aws_keys_df = (spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv"))

# Fetch both keys for the 'databricks-user' row with a single collect(),
# instead of running one Spark job per key.
credential_rows = (aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .collect())

# Fail with a clear message rather than a bare IndexError when the row is missing.
if not credential_rows:
    raise ValueError(
        "No row with 'User name' == 'databricks-user' found in "
        "/FileStore/tables/authentication_credentials.csv"
    )

ACCESS_KEY = credential_rows[0]['Access key ID']
SECRET_KEY = credential_rows[0]['Secret access key']

# URL-encode the secret key so it can be embedded in a URL. safe="" means every
# reserved character, including '/', is percent-escaped. This is encoding, not
# encryption.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

We will now create a function that reads the Kinesis data and transforms it into a usable DataFrame.

In [None]:
def read_kinesis_data(stream_name, json_schema, desired_alias,
                      region='us-east-1', initial_position='earliest'):
    ''' Read a Kinesis stream and parse each record's JSON payload into a struct column.

    The stream is authenticated with the module-level ACCESS_KEY / SECRET_KEY.

    Args:
        stream_name (`string`): The name of the Kinesis stream to read from.
        json_schema (`StructType`): Schema used to parse the JSON payload of each record.
        desired_alias (`string`): Name given to the resulting struct column.
        region (`string`): AWS region of the stream. Defaults to 'us-east-1'.
        initial_position (`string`): Value passed to the Kinesis source's
            'initialPosition' option. Defaults to 'earliest'.

    Returns:
        df (`pyspark.sql.DataFrame`): A streaming DataFrame with a single struct
        column named `desired_alias`, whose fields follow `json_schema`.
    '''

    # Streaming source over the Kinesis stream.
    kinesis_stream = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

    # Cast the raw `data` payload column to a string so it can be parsed as JSON.
    data = kinesis_stream.selectExpr("CAST(data as STRING)")

    # Parse the JSON text with the supplied schema into one struct column
    # named `desired_alias`.
    df = data.select(
        from_json('data', json_schema).alias(desired_alias)
    )
    return df

# Writing to Delta Tables

We will now create a function that writes the transformed data to the appropriate Delta tables.

In [None]:
def write_kinesis_data(table_name, df, checkpoint_location=None):
    ''' Start a streaming append of `df` into the Delta table `table_name`.

    Args:
        table_name (`string`): Name of the Delta table to append to.
        df (`pyspark.sql.DataFrame`): The streaming DataFrame to write.
        checkpoint_location (`string`, optional): Directory for this query's
            checkpoint. Defaults to a per-table directory under
            '/tmp/kinesis/_checkpoints/'.

    Returns:
        The streaming query handle returned by `DataStreamWriter.table`.
    '''

    # The checkpoint lets a streaming query recover its previous progress after
    # a restart. Each query needs its own directory; the pin, geo and user
    # queries previously all shared '/tmp/kinesis/_checkpoints/', so default to
    # one directory per table instead.
    if checkpoint_location is None:
        checkpoint_location = f"/tmp/kinesis/_checkpoints/{table_name}/"

    # Parentheses rather than backslash continuations: a comment after a
    # trailing backslash is a SyntaxError.
    return (df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(table_name))

# Cleaning

Now we read the pin streaming data, clean it, and then upload it to the appropriate Delta table. I have also redacted sensitive values, replacing them with placeholders such as:

```
stream_name = '<name_of_kinesis_stream>'
```
and also:

```
table_name = '<desired_delta_table_name>'
```

In [None]:
# defining the stream_name (real value redacted)
stream_name = '<name_of_kinesis_stream>'

# Schema for the JSON payload of each pin record.
# `follower_count` is declared as a string, not an integer. The cleaning below
# compares it with 'User Info Error' and expands 'k'/'M' suffixes, and none of
# those values would survive parsing as an integer. It is cast to int only
# after cleaning.
json_schema = StructType([
    StructField('index', IntegerType()),
    StructField('unique_id', StringType()),
    StructField('title', StringType()),
    StructField('description', StringType()),
    StructField('follower_count', StringType()),
    StructField('poster_name', StringType()),
    StructField('tag_list', StringType()),
    StructField('is_image_or_video', StringType()),
    StructField('image_src', StringType()),
    StructField('save_location', StringType()),
    StructField('category', StringType())
])

desired_alias = 'pin'

df_pin = read_kinesis_data(stream_name, json_schema, desired_alias)

# Flatten the 'pin' struct into top-level columns in the desired order
# ('index' is renamed to 'ind').
df_pin = df_pin.selectExpr(
    'pin.index as ind',
    'pin.unique_id as unique_id',
    'pin.title as title',
    'pin.description as description',
    'pin.follower_count as follower_count',
    'pin.poster_name as poster_name',
    'pin.tag_list as tag_list',
    'pin.is_image_or_video as is_image_or_video',
    'pin.image_src as image_src',
    'pin.save_location as save_location',
    'pin.category as category'
)

# Placeholder values that mean "no data" in each column; they are replaced
# with NULL. Keeping them in a dictionary makes it easy to add more columns.
col_and_entries_to_replace = {
    'description' : 'No description available Story format',
    'follower_count' : 'User Info Error',
    'image_src' : 'Image src error.',
    'poster_name' : 'User Info Error',
    'tag_list' : 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
    'title' : 'No Title Data Available'
}

for column, value in col_and_entries_to_replace.items():
    df_pin = df_pin.withColumn(column, when(col(column) == value, None).otherwise(col(column)))

# Expand abbreviated follower counts: a trailing 'k' becomes '000' and a
# trailing 'M' becomes '000000' (e.g. '10k' -> '10000').
# NOTE(review): decimal abbreviations such as '1.5k' would not be expanded
# correctly; confirm the feed never produces them.
df_pin = df_pin.withColumn(
    'follower_count',
    when(col('follower_count').endswith('k'), regexp_replace('follower_count', 'k$', '000'))
    .when(col('follower_count').endswith('M'), regexp_replace('follower_count', 'M$', '000000'))
    .otherwise(col('follower_count'))
)

# casting follower_count to integers now that the suffixes are expanded
df_pin = df_pin.withColumn('follower_count', col('follower_count').cast('int'))

# strip the 'Local save in ' prefix so 'save_location' holds just the path
df_pin = df_pin.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))

# defining the delta table name (real value redacted).
table_name = '<desired_delta_table_name>'

# writing the data to the delta table.
write_kinesis_data(table_name, df_pin)

Now we read the geo streaming data, clean it, and then upload it to the appropriate Delta table. I have also redacted sensitive values, replacing them with placeholders such as:

```
stream_name = '<name_of_kinesis_stream>'
```
and also:

```
table_name = '<desired_delta_table_name>'
```

In [None]:
# Kinesis stream carrying the geolocation records (real value redacted).
stream_name = '<name_of_kinesis_stream>'

# Expected shape of the JSON payload in each geo record.
json_schema = StructType([
    StructField('ind', IntegerType()),
    StructField('country', StringType()),
    StructField('latitude', StringType()),
    StructField('longitude', StringType()),
    StructField('timestamp', TimestampType()),
])

# Name of the struct column that read_kinesis_data produces.
desired_alias = 'geo'

df_geo = read_kinesis_data(stream_name, json_schema, desired_alias)

# Flatten the 'geo' struct in a single projection:
#   - 'latitude' and 'longitude' are combined into a two-element 'coordinates' array,
#   - 'timestamp' is passed through to_timestamp,
# which yields the final column order: ind, country, timestamp, coordinates.
df_geo = df_geo.select(
    col('geo.ind').alias('ind'),
    col('geo.country').alias('country'),
    to_timestamp(col('geo.timestamp')).alias('timestamp'),
    array(col('geo.latitude'), col('geo.longitude')).alias('coordinates'),
)

# Target Delta table (real value redacted).
table_name = '<desired_delta_table_name>'

# Start the streaming write into the Delta table.
write_kinesis_data(table_name, df_geo)

Now we read the user streaming data, clean it, and then upload it to the appropriate Delta table. I have also redacted sensitive values, replacing them with placeholders such as:

```
stream_name = '<name_of_kinesis_stream>'
```
and also:

```
table_name = '<desired_delta_table_name>'
```

In [None]:
# Kinesis stream carrying the user records (real value redacted).
stream_name = '<name_of_kinesis_stream>'

# Expected shape of the JSON payload in each user record.
json_schema = StructType([
    StructField('ind', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('age', IntegerType()),
    StructField('date_joined', TimestampType()),
])

# Name of the struct column that read_kinesis_data produces.
desired_alias = 'user'

df_user = read_kinesis_data(stream_name, json_schema, desired_alias)

# Flatten the 'user' struct in a single projection, producing the final
# column order ind, user_name, age, date_joined:
#   - 'user_name' is first_name + ' ' + last_name,
#   - 'date_joined' is passed through to_timestamp.
# NOTE(review): concat yields NULL when either name part is NULL; consider
# concat_ws if a partial name should be kept.
df_user = df_user.select(
    col('user.ind').alias('ind'),
    concat(col('user.first_name'), lit(' '), col('user.last_name')).alias('user_name'),
    col('user.age').alias('age'),
    to_timestamp(col('user.date_joined')).alias('date_joined'),
)

# Target Delta table (real value redacted).
table_name = '<desired_delta_table_name>'

# Start the streaming write into the Delta table.
write_kinesis_data(table_name, df_user)