# AWS Kinesis data streaming

## Preparation for streaming

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Location of the Delta table that holds the AWS credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# `import urllib` on its own does not guarantee that the `parse` submodule is
# loaded, so import it explicitly before calling `urllib.parse.quote`.
import urllib.parse

# Fetch both credentials from the same row with a single Spark job, so the
# access key and the secret key can never come from two different rows.
# `collect()[0]` still raises IndexError when the table is empty.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').collect()[0]

# AWS access key and secret key.
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# URL-encoded form of the secret key (every reserved character is escaped).
# NOTE(review): not referenced by the cells visible in this notebook.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Names of the three Kinesis streams to read from.
pin_stream = 'streaming-12a740a19697-pin'
geo_stream = 'streaming-12a740a19697-geo'
user_stream = 'streaming-12a740a19697-user'

# Names of the three Delta tables the streams are written to.
pin_table = '12a740a19697_pin_table'
geo_table = '12a740a19697_geo_table'
user_table = '12a740a19697_user_table'


In [0]:
%sql
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


## Function definitions

In [0]:
def read_kinesis_stream(stream_name, stream_schema):
    '''
    Read an AWS Kinesis stream and expand its JSON payload into columns.

    The `data` column of each record is cast to a string, parsed as JSON
    with the supplied schema, and the parsed fields are returned as the
    top-level columns of the resulting streaming dataframe.

    Args:
        stream_name (string): name of the Kinesis stream to read from.
        stream_schema (StructType): schema used to parse the JSON payload.
    Returns:
        A streaming dataframe with one column per field of stream_schema.
    '''

    # Reader configured for the requested stream; credentials come from the
    # notebook-level ACCESS_KEY / SECRET_KEY values.
    reader = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .option('inferSchema', 'True')
    )
    raw_records = reader.load()

    # Keep only the payload as text, parse it, then flatten the parsed struct.
    payload_text = raw_records.selectExpr("CAST(data as STRING)")
    parsed = payload_text.withColumn("data", from_json(col("data"), stream_schema))
    return parsed.select(col("data.*"))

def write_stream_to_delta_table(df, table_name):
    '''
    Start an append-mode streaming write of a dataframe into a Delta table.

    Args:
        df (dataframe): the streaming dataframe to persist.
        table_name (string): name of the Delta table to write to; it is also
                             used to build the checkpoint directory.

    Returns:
        None.
    '''

    # Each table gets its own checkpoint directory under /tmp/kinesis/.
    checkpoint_dir = f"/tmp/kinesis/{table_name}_checkpoints/"

    stream_writer = (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
    )
    stream_writer.table(table_name)

## Data processing

#### Pinterest data ingestion, cleaning and saving

In [0]:
# Schema of the JSON payload carried by the pinterest stream.
# follower_count is deliberately read as a string: the cleaning steps below
# replace the 'User Info Error' placeholder and expand 'k' / 'M' suffixes,
# which is only possible while the value is still text. Declared as
# IntegerType, such values could not be parsed by from_json and the cleaning
# would never see them. The column is cast to int once it has been cleaned.
# NOTE(review): the duplicate-dropping step runs before that cast, so a
# checkpoint written with the previous integer schema may need to be
# cleared before restarting this query - confirm.
pin_stream_schema = StructType([
        StructField('index', IntegerType()),
        StructField('unique_id', StringType()),
        StructField('title', StringType()),
        StructField('description', StringType()),
        StructField('poster_name', StringType()),
        StructField('follower_count', StringType()),
        StructField('tag_list', StringType()),
        StructField('is_image_or_video', StringType()),
        StructField('image_src', StringType()),
        StructField('save_location', StringType()),
        StructField('category', StringType())
        ])

# Ingesting the pinterest stream data
df_pin = read_kinesis_stream(pin_stream, pin_stream_schema)

# Placeholder values to blank out, mapped to the column they appear in.
# The mapping is value -> column (reversed) because several placeholders
# belong to the same column and dictionary keys must be unique.
to_replace = {
    'No description available': 'description',
    'Image src error.': 'image_src',
    'User Info Error': 'poster_name',
    'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': 'tag_list', 
    'No Title Data Available': 'title',
    'No description available Story format': 'description',
    'Untitled': 'description'
}

# Replace every placeholder with null in its column.
for value, column in to_replace.items():
    df_pin = df_pin.replace({value: None}, subset=[column])

# 'User Info Error' also appears in follower_count; it is handled separately
# because it would be a duplicate key in the dictionary above.
df_pin = df_pin.replace({'User Info Error': None}, subset=['follower_count'])

# Dropping duplicate rows.
df_pin = df_pin.dropDuplicates(df_pin.columns)

# Rewriting the follower_count column to be numeric only
# ('k' -> three zeros, 'M' -> six zeros).
df_pin = df_pin.withColumn('follower_count', regexp_replace(df_pin['follower_count'], 'k','000'))
df_pin = df_pin.withColumn('follower_count', regexp_replace(df_pin['follower_count'], 'M','000000'))

# Transforming the columns to integer format.
df_pin = df_pin.withColumn('follower_count', df_pin['follower_count'].cast('int'))
df_pin = df_pin.withColumn('index', df_pin['index'].cast('int'))

# Cleaning the save_location column by removing the 'Local save in ' prefix text.
df_pin = df_pin.withColumn('save_location', regexp_replace(df_pin['save_location'], 'Local save in ',''))

# Renaming the index column to 'ind'.
df_pin = df_pin.withColumnRenamed('index', 'ind')

# Rearranging the columns
df_pin = df_pin.select('ind',
                    'unique_id',
                    'title',
                    'description',
                    'follower_count',
                    'poster_name',
                    'tag_list',
                    'is_image_or_video',
                    'image_src',
                    'save_location',
                    'category'
)

# Function call to write data into delta table
write_stream_to_delta_table(df_pin, pin_table)

#### Geolocation data ingestion, cleaning and saving

In [0]:
# Defining the schema for the geolocation stream
geo_stream_schema = StructType([
        StructField('ind', IntegerType()),
        StructField('country', StringType()),
        StructField('latitude', StringType()),
        StructField('longitude', StringType()),
        StructField('timestamp', TimestampType())
    ])

# Ingesting the geolocation stream data
df_geo = read_kinesis_stream(geo_stream, geo_stream_schema)

# Creating a new column 'coordinates' holding [latitude, longitude] as an array.
df_geo = df_geo.withColumn('coordinates', array(df_geo['latitude'], df_geo['longitude']))

# Dropping the latitude and longitude columns as they are no longer needed.
df_geo = df_geo.drop('latitude','longitude')

# Normalising the timestamp column: parse it as a timestamp, then render it
# as a 'yyyy-MM-dd HH:mm:ss' string (date_format returns a string column).
df_geo = df_geo.withColumn('timestamp', to_timestamp('timestamp'))
df_geo = df_geo.withColumn('timestamp', date_format('timestamp', 'yyyy-MM-dd HH:mm:ss'))

# Dropping duplicate entries from the geolocation data. This previously
# deduplicated df_pin by mistake, leaving df_geo untouched.
# NOTE(review): this adds a stateful step to the query; a checkpoint written
# by the earlier version of this query may need to be cleared - confirm.
df_geo = df_geo.dropDuplicates(df_geo.columns)

# Reordering the columns
df_geo = df_geo.select('ind',
                       'country',
                       'coordinates',
                       'timestamp')

# Function call to write data into delta table
write_stream_to_delta_table(df_geo, geo_table)

#### Users data ingestion, cleaning and saving

In [0]:
# Defining the schema for the user stream
user_stream_schema = StructType([
        StructField('ind', IntegerType()),
        StructField('first_name', StringType()),
        StructField('last_name', StringType()),
        StructField('age', IntegerType()),
        StructField('date_joined', TimestampType())
    ])

# Ingesting the user stream data
df_user = read_kinesis_stream(user_stream, user_stream_schema)

# Creating a new column containing the first name and last name of the user,
# separated by a single space.
df_user = df_user.withColumn('user_name', concat(df_user['first_name'], lit(' '), df_user['last_name']))

# Dropping the first_name and last_name columns as they are no longer needed.
df_user = df_user.drop('first_name','last_name')

# Normalising the date_joined column: parse it as a timestamp, then render it
# as a 'yyyy-MM-dd HH:mm:ss' string (date_format returns a string column).
df_user = df_user.withColumn('date_joined', to_timestamp('date_joined'))
df_user = df_user.withColumn('date_joined', date_format('date_joined', 'yyyy-MM-dd HH:mm:ss'))

# Dropping duplicate entries from the user data. This previously
# deduplicated df_pin by mistake, leaving df_user untouched.
# NOTE(review): this adds a stateful step to the query; a checkpoint written
# by the earlier version of this query may need to be cleared - confirm.
df_user = df_user.dropDuplicates(df_user.columns)

# Reordering the columns
df_user = df_user.select('ind',
                         'user_name',
                         'age',
                         'date_joined')

# Function call to write data into delta table
write_stream_to_delta_table(df_user, user_table)