### Kinesis Streams Data Processing

This notebook demonstrates the process of reading data from AWS Kinesis streams using PySpark. The code performs the following operations:

1. Reads the AWS access key and secret key from a CSV file.
2. Initialises stream readers for three Kinesis streams: "streaming-0a48d8473ced-pin", "streaming-0a48d8473ced-geo", and "streaming-0a48d8473ced-user".
3. Retrieves and displays DataFrames from the respective Kinesis streams.
4. Cleans the data in each DataFrame.
5. Writes the cleaned data to Delta tables.


#### Configuration for AWS
The following code reads the AWS access key and secret key from a CSV file. These credentials allow the notebook to connect to the Kinesis streams and retrieve data from them.

In [0]:
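# List the uploaded files to confirm that the credentials file is present
dbutils.fs.ls("/FileStore/tables")

# Wildcard imports: later cells rely on col, regexp_replace, array, concat and lit
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Import the submodule explicitly so that urllib.parse.quote is available
import urllib.parse

file_type = "csv"
first_row_is_header = "true"
delimiter = ","
# Read the CSV file into a Spark DataFrame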
aws_keys_df = spark.read.format(file_type)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load("/FileStore/tables/authentication_credentials.csv")

# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.where(col('User name')=='databricks-user').select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.where(col('User name')=='databricks-user').select('Secret access key').collect()[0]['Secret access key']
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")
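The last line URL-encodes the secret key with safe="", so every reserved character, including "/", is percent-encoded. This matters when a key is embedded in a URL or a path. The stream readers in this notebook pass SECRET_KEY directly, so ENCODED_SECRET_KEY is not used further here. A minimal illustration follows, using a made-up value (`fake_secret` is hypothetical, not a real credential):

```python
from urllib.parse import quote, unquote

# Hypothetical secret containing characters that are reserved in URLs
fake_secret = "abc/def+ghi="

# safe="" forces "/" to be encoded too; by default quote() leaves "/" untouched
encoded = quote(fake_secret, safe="")
default_encoded = quote(fake_secret)

print(encoded)
print(default_encoded)

# Decoding restores the original value
assert unquote(encoded) == fake_secret
```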

#### Initialise Stream Readers
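Next, initialise stream readers for the three Kinesis streams: "streaming-0a48d8473ced-pin", "streaming-0a48d8473ced-geo", and "streaming-0a48d8473ced-user". Each reader starts from the earliest available record in the us-east-1 region and authenticates with the keys loaded above.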

In [0]:
df_pin = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a48d8473ced-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

display(df_pin.limit(5))

df_geo = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a48d8473ced-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

display(df_geo.limit(5))

df_user = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a48d8473ced-user') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

display(df_user.limit(5))

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
5730,eyJpbmRleCI6NTczMCwidW5pcXVlX2lkIjoiMWUxZjBjOGItOWZjZi00NjBiLTkxNTQtYzc3NTgyNzIwNmViIiwidGl0bGUiOiJJc2xhbmQgT2FzaXMgQ291cG9uIE9yZ2FuaXplciIsImRlc2NyaXB0aW9uIjoiRGVzY3JpcHQ= (truncated),streaming-0a48d8473ced-pin,shardId-000000000000,49645583445008519613343965539482364925642092958332223490,2023-10-20T11:47:37.285+0000
8304,eyJpbmRleCI6ODMwNCwidW5pcXVlX2lkIjoiNWI2ZDA5MTMtMjVlNC00M2FiLTgzOWQtODVkNTUxNmY3OGE0IiwidGl0bGUiOiJUaGUgIzEgUmVhc29uIFlvdeKAmXJlIE5vdCBIaXMgUHJpb3JpdHkgQW55bW9yZSAtIE1hdHQ= (truncated),streaming-0a48d8473ced-pin,shardId-000000000000,49645583445008519613343965539640734208011609517657686018,2023-10-20T11:47:40.199+0000
7554,eyJpbmRleCI6NzU1NCwidW5pcXVlX2lkIjoiYzZmYTEyZjQtMGQ0YS00YjA3LWEzMzUtNWJmOWYzN2Y4MjgxIiwidGl0bGUiOiJDcmFpZyBTdHlsZSIsImRlc2NyaXB0aW9uIjoiaW1nZW50bGVib3NzOiDigJwgLSBNb3JlIGE= (truncated),streaming-0a48d8473ced-pin,shardId-000000000000,49645583445008519613343965541987259223883606532468768770,2023-10-20T11:48:05.457+0000
3156,eyJpbmRleCI6MzE1NiwidW5pcXVlX2lkIjoiZmE2ZTMxYTQtMThjMi00ZWNhLWE2ZDgtZTkwM2VlZTJjMmE0IiwidGl0bGUiOiJIYW5kcHJpbnQgUmVpbmRlZXIgT3JuYW1lbnRzIC0gQ3JhZnR5IE1vcm5pbmciLCJkZXNjcmk= (truncated),streaming-0a48d8473ced-pin,shardId-000000000000,49645583445008519613343965543272347370133958238534631426,2023-10-20T11:48:19.076+0000
2074,eyJpbmRleCI6MjA3NCwidW5pcXVlX2lkIjoiODZlZDA5YTctODQyZC00OTZkLTk1MDEtMDEwYzY1NGViMzQwIiwidGl0bGUiOiIzNSBDaHJpc3RtYXMgRGVjb3JhdGluZyBJZGVhcyBXZSBCZXQgWW91IEhhdmVuJ3QgVGhvdWc= (truncated),streaming-0a48d8473ced-pin,shardId-000000000000,49645583445008519613343965543916704831988556069689360386,2023-10-20T11:48:25.675+0000
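
The three readers differ only in the stream name, so the shared options could be built once and reused. The sketch below is one possible way to do that in plain Python. The helper name `kinesis_options` and the placeholder credentials are assumptions for illustration and are not part of the original notebook.

```python
def kinesis_options(stream_name, access_key, secret_key, region="us-east-1"):
    """Return the reader options used in this notebook for one Kinesis stream."""
    return {
        "streamName": stream_name,
        "initialPosition": "earliest",
        "region": region,
        "awsAccessKey": access_key,
        "awsSecretKey": secret_key,
    }


STREAM_SUFFIXES = ["pin", "geo", "user"]

# Placeholder strings stand in for ACCESS_KEY and SECRET_KEY
options_by_stream = {
    suffix: kinesis_options(f"streaming-0a48d8473ced-{suffix}", "<ACCESS_KEY>", "<SECRET_KEY>")
    for suffix in STREAM_SUFFIXES
}

for suffix, opts in options_by_stream.items():
    print(suffix, opts["streamName"])
```

In the notebook, such a dictionary could be passed to a reader with spark.readStream.format('kinesis').options(**options_by_stream['pin']).load().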


#### Display Data Frames
This code converts the payload of each stream into readable text and displays the resulting DataFrames.

Kinesis delivers each record's payload in the binary data column, which the display above renders as an encoded string. The selectExpr("CAST(data as STRING)") operation converts that payload into a JSON string, which is easier to parse and clean.
The display calls then show the first rows of df_pin, df_geo, and df_user.
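
The relationship between the encoded payload and the JSON string can be reproduced outside Spark. The sketch below is plain Python, not the notebook's Spark code, and it assumes the table view renders binary values as base64, which is consistent with the values starting with "eyJ" (the base64 form of a JSON object's opening characters). The sample record is taken from the user stream output in this notebook.

```python
import base64
import json

# Sample record from the user stream output in this notebook
record = {
    "ind": 5730,
    "first_name": "Rachel",
    "last_name": "Davis",
    "age": 36,
    "date_joined": "2015-12-08 20:02:43",
}

# The stream carries the payload as bytes; a table view may render those bytes as base64
payload_bytes = json.dumps(record).encode("utf-8")
as_displayed = base64.b64encode(payload_bytes).decode("ascii")

# Reading the bytes as UTF-8 text recovers the JSON string, which is what the cast achieves
json_string = base64.b64decode(as_displayed).decode("utf-8")
parsed = json.loads(json_string)

print(as_displayed[:12])
print(json_string)
```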

In [0]:
df_pin = df_pin.selectExpr("CAST(data as STRING)")
display(df_pin.limit(5))
df_geo = df_geo.selectExpr("CAST(data as STRING)")
display(df_geo.limit(5))
df_user = df_user.selectExpr("CAST(data as STRING)")
display(df_user.limit(5))

data
"{""ind"":5730,""first_name"":""Rachel"",""last_name"":""Davis"",""age"":36,""date_joined"":""2015-12-08 20:02:43""}"
"{""ind"":8304,""first_name"":""Charles"",""last_name"":""Berry"",""age"":25,""date_joined"":""2015-12-28 04:21:39""}"
"{""ind"":7554,""first_name"":""Cheryl"",""last_name"":""Huerta"",""age"":20,""date_joined"":""2017-04-11 16:35:33""}"
"{""ind"":3156,""first_name"":""Andrew"",""last_name"":""Baker"",""age"":22,""date_joined"":""2015-12-21 08:06:54""}"
"{""ind"":2074,""first_name"":""Annette"",""last_name"":""Forbes"",""age"":21,""date_joined"":""2016-01-03 15:42:12""}"


### Data Cleaning

The following code snippets clean each of the three DataFrames. They parse the JSON strings into typed columns, normalise missing values, and convert fields to appropriate data types, so that the data is consistent before further analysis and processing.

#### Data Cleaning for df_pin

In [0]:
from pyspark.sql.functions import array, col, concat, from_json, lit, regexp_replace
from pyspark.sql.types import StringType, StructField, StructType

# Take the JSON string and convert into a data frame with the corresponding schema
# Define the schema based on the JSON structure
pin_schema = StructType([
    StructField("index", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
    ])
# Extract fields from the JSON string and create separate columns
df_pin = df_pin.select(from_json(col("data"), pin_schema).alias("data")).select("data.*")

# Replace empty or placeholder entries with None
df_pin = df_pin.replace('', None)
df_pin = df_pin.replace(' ', None)
df_pin = df_pin.replace('nan', None)
# Convert follower_count to an integer by expanding the 'k' (thousand) and 'M' (million) suffixes
df_pin = df_pin.withColumn('follower_count', regexp_replace('follower_count', 'k', '000'))
df_pin = df_pin.withColumn('follower_count', regexp_replace('follower_count', 'M', '000000'))
df_pin = df_pin.withColumn('follower_count', df_pin['follower_count'].cast('int'))
# Strip the 'Local save in ' prefix from save_location, leaving only the path
df_pin = df_pin.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))
# Rename the index column to 'ind'
df_pin = df_pin.withColumnRenamed('index', 'ind')
# Reorder the columns; 'downloaded' is not selected, so it is dropped here
df_pin = df_pin.select('ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category')
display(df_pin.limit(5))

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,/data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,/data/quotes,quotes
7554,c6fa12f4-0d4a-4b07-a335-5bf9f37f8281,Craig Style,imgentleboss: “ - More about men’s fashion at @Gentleboss - GB’s Facebook - ”,940,iElylike ..✿◕‿◕✿ஐ✿◕‿◕✿,"Mens Fashion Blog,Look Fashion,Autumn Fashion,Fashion News,Fashion Sale,80s Fashion,Paris Fashion,Runway Fashion,Fashion Trends",image,https://i.pinimg.com/originals/e7/6e/8e/e76e8ed6cc838b84a934c6948a5caff7.jpg,/data/mens-fashion,mens-fashion
3156,fa6e31a4-18c2-4eca-a6d8-e903eee2c2a4,Handprint Reindeer Ornaments - Crafty Morning,"This post may contain affiliate links, read our Disclosure Policy for more information. As an Amazon Associate I earn from qualifying purchases, thank you! Make some cute handpr…",892000,Michelle {CraftyMorning.com},"Christmas Gifts For Parents,Christmas Decorations For Kids,Christmas Crafts For Toddlers,Preschool Christmas,Christmas Crafts For Gifts,Christmas Activities,Toddler Crafts,Kids Christmas,Christmas Feeling",image,https://i.pinimg.com/originals/ff/fe/38/fffe384f3ec18a0d87cb2d80cc8c1499.jpg,/data/diy-and-crafts,diy-and-crafts
2074,86ed09a7-842d-496d-9501-010c654eb340,35 Christmas Decorating Ideas We Bet You Haven't Thought Of,20 Christmas Decorating Ideas We Bet You Haven't Thought Of via @PureWow,868000,PureWow,"Holiday Centerpieces,Xmas Decorations,Centerpiece Ideas,Table Centerpieces,Valentine Decorations,Wedding Centerpieces,Outdoor Decorations,Christmas Centerpieces With Candles,Christmas Dining Table Decorations",image,https://i.pinimg.com/originals/e9/b9/f0/e9b9f01cc3b2cf41948b45854335396c.jpg,/data/christmas,christmas
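
The suffix substitution works for whole-number counts such as "51k". If the stream ever contained fractional values such as "1.5k" (an assumption, since none appear in the sample output), replacing "k" with "000" would give "1.5000" instead of 1500. One possible alternative multiplies by the suffix value instead. The sketch below is plain Python, not the notebook's Spark code, and the names `parse_follower_count` and `MULTIPLIERS` are hypothetical.

```python
from decimal import Decimal

# Multiplier for each supported suffix
MULTIPLIERS = {"k": 1_000, "M": 1_000_000}


def parse_follower_count(raw):
    """Convert strings such as '51k', '2M' or '940' to an int; None or blank stays None."""
    if raw is None or raw.strip() == "":
        return None
    text = raw.strip()
    suffix = text[-1]
    if suffix in MULTIPLIERS:
        # Decimal avoids floating-point rounding for values such as '1.5k'
        return int(Decimal(text[:-1]) * MULTIPLIERS[suffix])
    return int(text)


samples = ["51k", "892k", "940", "2M", "1.5k", None]
results = [parse_follower_count(s) for s in samples]
print(results)
```

Input that is neither numeric nor suffixed raises an error in this sketch; a production version would need to decide how to handle such values.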


#### Data Cleaning for df_geo

In [0]:
# Take the JSON string and convert into a data frame with the corresponding schema
# Define the schema based on the JSON structure
geo_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("country", StringType(), True)
    ])    

# Extract fields from the JSON string and create separate columns
df_geo = df_geo.select(from_json(col("data"), geo_schema).alias("data")).select("data.*")

# Combine latitude and longitude into a single array column
# (both values are strings here because the schema declares them as StringType)
df_geo = df_geo.withColumn('coordinates', array('latitude', 'longitude'))
df_geo = df_geo.drop('latitude', 'longitude')
# Convert the timestamp column from a string to a timestamp data type
df_geo = df_geo.withColumn('timestamp', df_geo['timestamp'].cast('timestamp'))
# Reorder the columns
df_geo = df_geo.select('ind', 'country', 'coordinates', 'timestamp')
display(df_geo.limit(5))

ind,country,coordinates,timestamp
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
7554,Sudan,"List(-51.2172, -77.9768)",2019-03-20T03:15:07.000+0000
3156,Armenia,"List(-84.738, -160.795)",2018-01-13T19:33:49.000+0000
2074,Central African Republic,"List(-52.3213, -50.11)",2019-11-03T05:41:59.000+0000
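
The parsing and reshaping steps for the geo data can be followed on a single record in plain Python. This is a sketch, not the notebook's Spark code. The raw JSON layout of the geo payload is not shown in this notebook, so the `sample` string below is an assumption reconstructed from the cleaned output, and `parse_geo` is a hypothetical helper. Fields that are absent from a record become None, and all values are kept as strings to mirror the StringType schema.

```python
import json

# Field names from geo_schema
GEO_FIELDS = ["ind", "timestamp", "latitude", "longitude", "country"]


def parse_geo(json_string):
    """Pick the schema fields out of one JSON record and build the coordinates list."""
    raw = json.loads(json_string)
    # Keep every value as a string; missing fields become None
    row = {name: (None if raw.get(name) is None else str(raw[name])) for name in GEO_FIELDS}
    # Replace latitude and longitude with a single two-element list
    row["coordinates"] = [row.pop("latitude"), row.pop("longitude")]
    return row


# Assumed raw payload, reconstructed from the first row of the cleaned output
sample = (
    '{"ind": 5730, "timestamp": "2021-04-19 17:37:03", '
    '"latitude": -77.015, "longitude": -101.437, "country": "Colombia"}'
)
row = parse_geo(sample)
print(row)
```

Because the coordinates are strings, numeric work such as distance calculations would first need a cast to a floating-point type.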


#### Data Cleaning for df_user

In [0]:
# Take the JSON string and convert into a data frame with the corresponding schema
# Define the schema based on the JSON structure
user_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True)
    ])

# Extract fields from the JSON string and create separate columns
df_user = df_user.select(from_json(col("data"), user_schema).alias("data")).select("data.*")

# Clean the df_user DataFrame
# Create a new column user_name that concatenates the first_name and last_name columns
df_user = df_user.withColumn('user_name', concat(df_user['first_name'], lit(' '), df_user['last_name']))
df_user = df_user.drop('first_name', 'last_name')
# Convert the date_joined column from a string to a timestamp data type
df_user = df_user.withColumn('date_joined', df_user['date_joined'].cast('timestamp'))
# Reorder the columns
df_user = df_user.select('ind', 'user_name', 'age', 'date_joined')
display(df_user.limit(5))

ind,user_name,age,date_joined
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
7554,Cheryl Huerta,20,2017-04-11T16:35:33.000+0000
3156,Andrew Baker,22,2015-12-21T08:06:54.000+0000
2074,Annette Forbes,21,2016-01-03T15:42:12.000+0000
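
The same transformation can be traced on one record in plain Python. This is a sketch with a hypothetical helper named `clean_user`, not the notebook's Spark code. One difference worth keeping in mind: Spark's concat returns null when any input is null, so a user with a missing first or last name would get a null user_name, whereas this sketch assumes both names are present.

```python
from datetime import datetime


def clean_user(record):
    """Build user_name from the two name fields and parse date_joined."""
    return {
        "ind": record["ind"],
        "user_name": f'{record["first_name"]} {record["last_name"]}',
        "age": record["age"],
        "date_joined": datetime.strptime(record["date_joined"], "%Y-%m-%d %H:%M:%S"),
    }


# First record from the user stream output in this notebook
raw_user = {
    "ind": 5730,
    "first_name": "Rachel",
    "last_name": "Davis",
    "age": 36,
    "date_joined": "2015-12-08 20:02:43",
}
cleaned = clean_user(raw_user)
print(cleaned["user_name"], cleaned["date_joined"].isoformat())
```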


#### Save Cleaned Data as Delta Tables

Each cleaned DataFrame (df_pin, df_geo, and df_user) is written to its own Delta table in append mode, so that the cleaned data is stored for future analysis and processing.
The checkpoint directory left by previous runs is deleted first so that each query starts from a clean state. This removes checkpoint data only; it does not drop the existing tables.
The option("checkpointLocation", ...) argument tells Spark where to record the progress of a streaming query so that it can recover consistently after a failure. Each streaming query needs its own checkpoint location, so the three writes use separate subdirectories under /tmp/kinesis/_checkpoints/.


In [0]:
# Delete checkpoints left over from previous runs
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

# Save df_pin as a Delta table
df_pin.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/").table("0a48d8473ced_pin_table")

# Save df_geo as a Delta table
df_geo.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo/").table("0a48d8473ced_geo_table")

# Save df_user as a Delta table
df_user.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/kinesis/_checkpoints/user/").table("0a48d8473ced_user_table")
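
A streaming query records its progress in its checkpoint directory, so the checkpoint path is normally unique per query. The sketch below shows one way to derive a table name and a dedicated checkpoint path for each stream from a single root. It is plain Python, and the names `CHECKPOINT_ROOT`, `USER_ID`, and `sink_config` are hypothetical.

```python
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints"
USER_ID = "0a48d8473ced"


def sink_config(suffix):
    """Return the Delta table name and a dedicated checkpoint path for one stream."""
    return {
        "table": f"{USER_ID}_{suffix}_table",
        "checkpoint": f"{CHECKPOINT_ROOT}/{suffix}/",
    }


sinks = {suffix: sink_config(suffix) for suffix in ("pin", "geo", "user")}

for suffix, cfg in sinks.items():
    print(suffix, cfg["table"], cfg["checkpoint"])
```

Each entry could then supply the checkpointLocation option and the table name for the corresponding writeStream call.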
