# READ DATA FROM KINESIS INTO DATABRICKS

### Get AWS Access key & Secret key

In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *


# Location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Read both credentials from the SAME row with a single Spark action.
# Two independent collect()[0] calls run two jobs, and because the table has
# no defined row order they are not guaranteed to pick the same row.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message.
    raise ValueError(f"No AWS credentials found in Delta table: {delta_table_path}")

ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']

# URL-encode the secret key (safe="" also encodes "/" characters)
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Sets the Spark conf spark.databricks.delta.formatCheck.enabled to false.
-- NOTE(review): this appears to switch off Delta's table-format safety check;
-- confirm why it is needed here and whether it can be removed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


### Create DataFrames (Pin, Geo, User)

In [0]:
def get_kinesis_stream(kinesis_stream_name, region='us-east-1', initial_position='earliest'):
    """Open a Kinesis stream as a Spark Structured Streaming DataFrame.

    Authenticates with the notebook-level ACCESS_KEY and SECRET_KEY.

    Args:
        kinesis_stream_name: Name of the Kinesis stream to read.
        region: AWS region passed as the reader's 'region' option.
            Defaults to 'us-east-1', the value that used to be hard-coded.
        initial_position: Value passed as the reader's 'initialPosition'
            option. Defaults to 'earliest', the value that used to be
            hard-coded.

    Returns:
        The streaming DataFrame produced by spark.readStream...load().
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        # The raw (not URL-encoded) secret key is passed to the reader.
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

# Open one streaming DataFrame per Kinesis stream, in pin / geo / user order.
df_k_pin, df_k_geo, df_k_user = map(
    get_kinesis_stream,
    (
        "streaming-0e2a0bfcc015-pin",
        "streaming-0e2a0bfcc015-geo",
        "streaming-0e2a0bfcc015-user",
    ),
)

# Preview the raw pin stream; the other two previews are left switched off.
display(df_k_pin)
#display(df_k_geo)
#display(df_k_user)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421907107470712786860128600098,2024-02-04T06:37:35.250+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421908316396532401626742259746,2024-02-04T06:37:35.912+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421909525322352016255916965922,2024-02-04T06:37:36.586+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421910734248171631022530625570,2024-02-04T06:37:38.575+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421911943173991245651705331746,2024-02-04T06:37:39.153+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421913152099810860349599514658,2024-02-04T06:37:39.743+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421914361025630475184932651042,2024-02-04T06:37:41.836+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421915569951450089814107357218,2024-02-04T06:37:42.414+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421916778877269704443282063394,2024-02-04T06:37:42.976+0000
desired-name,IiI=,streaming-0e2a0bfcc015-pin,shardId-000000000002,49648943687442502996269417421917987803089319209895723042,2024-02-04T06:37:45.048+0000


### Schemas
The Kinesis records arrive serialised: each record's payload sits in the `data` column and has to be unpacked. To do this we need a **schema** describing the intended structure of each DataFrame.

##### Pinterest Post Schema

In [0]:
# Fields expected in the JSON payload of each pin-stream record.
# follower_count is kept as a string here; it is converted during cleaning.
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

##### Geolocation Schema

In [0]:
# Fields expected in the JSON payload of each geo-stream record.
geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)

##### User Schema

In [0]:
# Fields expected in the JSON payload of each user-stream record.
# NOTE(review): age is read as a string and never cast later — confirm intended.
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

### Deserialise DataFrames

In [0]:
def deserialize_stream(kinesis_stream_name, schema):
    """Unpack the JSON payload of a raw Kinesis DataFrame into typed columns.

    Args:
        kinesis_stream_name: Despite the name, this is the raw streaming
            DataFrame (it must have a `data` column), not a stream name.
        schema: Schema used by from_json to parse the payload.

    Returns:
        A DataFrame with one column per field of the parsed `data` struct.
    """
    # Keep only the payload, cast to a string.
    payload_text = kinesis_stream_name.selectExpr("CAST(data as STRING)")
    # Parse the JSON text into a struct using the supplied schema.
    payload_struct = payload_text.withColumn("data", from_json(col("data"), schema))
    # Promote the struct's fields to top-level columns.
    return payload_struct.select(col("data.*"))


# Deserialise each raw stream with its matching schema (pin, geo, user).
df_ds_pin, df_ds_geo, df_ds_user = (
    deserialize_stream(raw_stream, stream_schema)
    for raw_stream, stream_schema in (
        (df_k_pin, pin_schema),
        (df_k_geo, geo_schema),
        (df_k_user, user_schema),
    )
)


# CLEANING DATAFRAMES

### Cleaning Pinterest Post

In [0]:
# Placeholder strings that stand in for missing data, keyed by column name.
null_info_dict = {  'title':            'No Title Data Available',
                    'description':      'No description available Story format',
                    'poster_name':      'User Info Error',
                    'follower_count':   'User Info Error',
                    'tag_list':         'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
                    'image_src':        'Image src error.',
                    }

# Start from the deserialised pin stream
cleaned_dfp = df_ds_pin

# Replace each column's placeholder with a real null.
# The dict above is the single source of truth for the placeholder values.
for column_name, placeholder in null_info_dict.items():
    cleaned_dfp = cleaned_dfp.replace({placeholder: None}, subset=[column_name])

# follower_count uses the suffixes k (thousand) and M (million), e.g. "12k", "1.5M".
# Textually swapping the suffix for zeros is wrong for decimals: "1.5k" becomes
# "1.5000", which casts to 1 instead of 1500. Parse the numeric part and
# multiply by the suffix's scale instead.
follower_text = trim(col("follower_count"))
follower_digits = regexp_extract(follower_text, r"^([0-9]+(?:\.[0-9]+)?)[kM]?$", 1)
follower_scale = (
    when(follower_text.endswith("k"), lit(1000))
    .when(follower_text.endswith("M"), lit(1000000))
    .otherwise(lit(1))
)
cleaned_dfp = cleaned_dfp.withColumn(
    "follower_count",
    # Values that are null or do not look like a number (optionally followed
    # by k/M) become null. Decimal arithmetic keeps the multiplication exact
    # before the final integer cast.
    when(
        follower_digits != "",
        (follower_digits.cast("decimal(20,4)") * follower_scale).cast("integer"),
    ),
)

# Make sure the remaining numeric columns are integers
cleaned_dfp = cleaned_dfp.withColumn("downloaded", col("downloaded").cast("integer"))
cleaned_dfp = cleaned_dfp.withColumn("index", col("index").cast("integer"))

# Strip the "Local save in " prefix so save_location holds only the path
cleaned_dfp = cleaned_dfp.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Rename index to ind
cleaned_dfp = cleaned_dfp.withColumnRenamed("index", "ind")

# Re-order the columns (note: "downloaded" is not kept in the output)
cleaned_dfp = cleaned_dfp.select(
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
)

# Final cleaned pin DataFrame
df_pin = cleaned_dfp

display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,
,,,,,,,,,,


### Cleaning Geolocation

In [0]:
# Clean the deserialised geo stream in a single chain:
#   1. pack latitude and longitude into a "coordinates" array column
#   2. drop the two source columns
#   3. run to_timestamp over the timestamp column
#   4. put the columns in their final order
cleaned_dfg = (
    df_ds_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

# Final cleaned geo DataFrame
df_geo = cleaned_dfg

display(df_geo)
# Show the resulting schema
df_geo.printSchema()

ind,country,coordinates,timestamp
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",
,,"List(null, null)",


### Cleaning User Data

In [0]:
# Clean the deserialised user stream in a single chain:
#   1. build user_name from first_name + last_name
#   2. drop the two source columns
#   3. run to_timestamp over date_joined
#   4. put the columns in their final order
# NOTE(review): concat joins the two names with no separator (and yields null
# if either part is null) — confirm that is the intended user_name format.
cleaned_dfu = (
    df_ds_user
    .withColumn("user_name", concat("first_name", "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select("ind", "user_name", "age", "date_joined")
)

# Final cleaned user DataFrame
df_user = cleaned_dfu

display(df_user)
# Show the resulting schema
df_user.printSchema()

ind,user_name,age,date_joined
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,


# Write Data to Delta Tables

In [0]:
# Table names, one per stream.
# Reference note only: the bare string below is not assigned to anything.
'''
0e2a0bfcc015_pin_table
0e2a0bfcc015_geo_table
0e2a0bfcc015_user_table
'''

# Checkpoint locations, one per table (these are the paths passed as the
# checkpoint location in the write calls, not the table locations).
# Reference note only: the bare string below is not assigned to anything.
'''
"/tmp/kinesis/0e2a0bfcc015_pin_table_checkpoints/"
"/tmp/kinesis/0e2a0bfcc015_geo_table_checkpoints/"
"/tmp/kinesis/0e2a0bfcc015_user_table_checkpoints/"
'''

def write_df_deltatable(dataframe, table_name, checkpoint_loc):
    """Start appending a streaming DataFrame to a Delta table.

    Args:
        dataframe: Streaming DataFrame to write.
        table_name: Name of the target table.
        checkpoint_loc: Path used as the stream's checkpointLocation.

    Returns:
        Whatever the writer's .table() call returns, so the caller can keep
        a handle on the running stream instead of losing it.
        NOTE(review): presumably a StreamingQuery on Databricks — confirm.
    """
    return (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_loc)
        .table(table_name)
    )


# Start one Delta write per cleaned stream: (DataFrame, table name, checkpoint path).
for stream_df, target_table, checkpoint_dir in (
    (df_pin,  "0e2a0bfcc015_pin_table",  "/tmp/kinesis/0e2a0bfcc015_pin_table_checkpoints/"),
    (df_geo,  "0e2a0bfcc015_geo_table",  "/tmp/kinesis/0e2a0bfcc015_geo_table_checkpoints/"),
    (df_user, "0e2a0bfcc015_user_table", "/tmp/kinesis/0e2a0bfcc015_user_table_checkpoints/"),
):
    write_df_deltatable(stream_df, target_table, checkpoint_dir)