In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# `import urllib` alone does not guarantee the `urllib.parse` submodule is
# loaded; import it explicitly so `urllib.parse.quote` below always resolves.
import urllib.parse

# Path to the Delta table that stores the AWS credentials
creds_path = 'path_to_credentials'
# Read the credentials table into a DataFrame
aws_keys_df = spark.read.format("delta").load(creds_path)

# Take the AWS access key and secret key from the first row of the table.
# A single `first()` fetches both columns in one Spark job instead of two collects.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    raise ValueError(f"No AWS credentials found in the Delta table at {creds_path!r}")
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded too.
# NOTE(review): not used in this notebook section — presumably kept for building
# an S3 URI elsewhere; confirm before removing.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")



In [0]:
%sql
-- Disable format checks during the reading of Delta tables (session-level setting).
-- NOTE(review): this switches off a safety check that normally rejects reading
-- a Delta location with the wrong reader; confirm why this pipeline needs it
-- and whether it can be removed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Read the Kinesis stream as a streaming DataFrame.
# initialPosition='earliest': a query with no checkpoint starts from the oldest
# record still available in the stream.
# NOTE(review): the credentials are passed as plain options; consider an
# instance profile / secret scope instead.
kinesis_options = {
    'streamName': 'Kinesis-Prod-Stream',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}

stream_df = (
    spark.readStream
    .format('kinesis')
    .options(**kinesis_options)
    .load()
)

display(stream_df)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
pin-partition,eyJpbmRleCI6NzUyOCwiZmlyc3RfbmFtZSI6IkFiaWdhaWwiLCJsYXN0X25hbWUiOiJBbGkiLCJhZ2UiOjIwLCJkYXRlX2pvaW5lZCI6IjIwMTUtMTAtMjRUMTE6MjM6NTEifQ==,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122021495210832194713232343895638082,2025-01-27T16:28:47.309Z
pin-partition,eyJpbmRleCI6NzUyOCwiZmlyc3RfbmFtZSI6IkFiaWdhaWwiLCJsYXN0X25hbWUiOiJBbGkiLCJhZ2UiOjIwLCJkYXRlX2pvaW5lZCI6IjIwMTUtMTAtMjRUMTE6MjM6NTEifQ==,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122021602281765260522100369972527170,2025-01-27T16:30:13.615Z
pin-partition,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122022234656354391099290364019736642,2025-01-27T16:40:56.266Z
pin-partition,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122022296229364235711588094359633986,2025-01-27T16:42:03.907Z
pin-partition,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122022335269205728526810781925244994,2025-01-27T16:42:44.855Z
pin-partition,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122022351445842120790165143058120770,2025-01-27T16:43:03.205Z
df_geo,IiI=,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122029603707209181577997219050553410,2025-01-27T18:29:04.059Z
df_geo,IiI=,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122029606880639458066399077532172354,2025-01-27T18:29:06.767Z
df_geo,IiI=,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122029610362345818556531238124912706,2025-01-27T18:29:09.457Z
df_geo,IiI=,Kinesis-Prod-Stream,shardId-000000000004,49659751887085965470122029614003630387235794449778868290,2025-01-27T18:29:12.253Z


In [0]:
# Split the single Kinesis stream into one stream per table, routing each
# record by the partitionKey attached to it.
# NOTE(review): the sample stream_df output shows records keyed "df_geo" (which
# matches none of the filters below) and user-shaped payloads sent under
# "pin-partition" — confirm the producer's keys match these literals.
user_df = stream_df.where(col("partitionKey") == "user-partition")  # User table
pin_df = stream_df.where(col("partitionKey") == "pin-partition")    # Pin table
geo_df = stream_df.where(col("partitionKey") == "geo-partition")    # Geo table


In [0]:
# The Kinesis `data` column is not text (it displays base64-encoded); cast each
# stream's payload to a string column holding the JSON document.
user_df = user_df.select(col("data").cast("string").alias("UserjsonData"))
pin_df = pin_df.select(col("data").cast("string").alias("PinjsonData"))
geo_df = geo_df.select(col("data").cast("string").alias("GeojsonData"))

In [0]:
# Preview the geo stream's payloads as raw JSON strings, before schema parsing.
display(geo_df)

GeojsonData
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""index"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"


In [0]:
# Schemas used by from_json to parse each stream's JSON payload.
# Every table starts with a non-nullable integer `index`; every other field is nullable.
# NOTE(review): latitude/longitude are parsed as strings although the payloads
# carry JSON numbers; switching to DoubleType would change the Delta table schema.

def _payload_schema(*fields):
    """Return a StructType of a non-null `index` int followed by nullable (name, type) fields."""
    index_field = StructField("index", IntegerType(), False)
    return StructType([index_field] + [StructField(name, data_type, True) for name, data_type in fields])

# User table schema
user_struct = _payload_schema(
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", StringType()),
)

# Pin table schema
pin_struct = _payload_schema(
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("poster_name", StringType()),
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", IntegerType()),
    ("save_location", StringType()),
    ("category", StringType()),
)

# Geo table schema
geo_struct = _payload_schema(
    ("timestamp", StringType()),
    ("latitude", StringType()),
    ("longitude", StringType()),
    ("country", StringType()),
)


In [0]:
def _parse_json_column(df, json_col, schema):
    """Parse the JSON string column `json_col` with `schema` and expand the struct into top-level columns."""
    parsed = from_json(col(json_col), schema).alias("data")
    return df.select(parsed).select("data.*")

user_df = _parse_json_column(user_df, "UserjsonData", user_struct)
pin_df = _parse_json_column(pin_df, "PinjsonData", pin_struct)
geo_df = _parse_json_column(geo_df, "GeojsonData", geo_struct)

# NOTE(review): rows below with only `index` populated are likely user-shaped
# payloads that arrived under the "pin-partition" key.
display(pin_df)

index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
7528,,,,,,,,,,,
7528,,,,,,,,,,,
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0.0,Local save in /data/mens-fashion,mens-fashion


In [0]:
# Preview the parsed geo stream (columns from geo_struct: index, timestamp, latitude, longitude, country).
# NOTE(review): the saved output below shows an `ind` column with float values and
# empty latitude/longitude, which does not match geo_struct at this point —
# probably stale output from an earlier run; re-run this cell to refresh it.
display(geo_df)

ind,timestamp,latitude,longitude,country
7528.0,2020-08-28T03:52:47,,,Albania
2863.0,2020-04-27T13:34:16,,,Armenia
5730.0,2021-04-19T17:37:03,,,Colombia
8304.0,2019-09-13T04:50:29,,,French Guiana
8731.0,2020-07-17T04:39:09,,,Aruba
1313.0,2018-06-26T02:39:25,,,Maldives
4315.0,2019-12-15T03:51:28,,,Cote d'Ivoire
10794.0,2022-01-01T02:26:50,,,Cocos (Keeling) Islands
5494.0,2021-07-21T02:02:35,,,Bulgaria
5069.0,2021-03-20T09:32:44,,,Azerbaijan


In [0]:
# Preview the parsed (not yet cleaned) user stream.
display(user_df)

index,first_name,last_name,age,date_joined
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51
7528,Abigail,Ali,20,2015-10-24T11:23:51


In [0]:
from pyspark.sql.functions import array, to_timestamp, concat, lit, regexp_replace, col, when

# Clean User streaming data:
#   * combine first_name and last_name into a space-separated user_name
#     (concat yields null if either part is null), then drop the originals,
#   * parse date_joined into a timestamp,
#   * rename index -> ind and fix the column order.
user_df = (
    user_df
    .withColumn("user_name", concat("first_name", lit(" "), "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .withColumnRenamed("index", "ind")
    .select("ind", "user_name", "age", "date_joined")
)


In [0]:
# Clean Pin streaming data
# Placeholder strings that stand in for missing values; replace each with null
# in every string column using a single replace call.
pin_placeholders = {
    "User Info Error": None,
    "No description available": None,
    "No description available Story format": None,
    "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e": None,
    "No Title Data Available": None,
    "Image src error.": None,
}
pin_df = pin_df.replace(pin_placeholders)

# Amend "save_location" to show only the filepath
pin_df = pin_df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Convert abbreviated follower counts ("12k", "1.5M", "873") to integers.
# The numeric part is cast to an exact decimal BEFORE scaling, so "1.5M" becomes
# 1500000; casting "1.5" straight to int would truncate it to 1 first.
# The suffix is stripped with an anchored regex so only the trailing k/M is removed.
follower_count_str = col("follower_count")
pin_df = pin_df.withColumn(
    "follower_count",
    when(follower_count_str.endswith("k"),
         (regexp_replace(follower_count_str, "k$", "").cast("decimal(18,3)") * 1000).cast("int"))
    .when(follower_count_str.endswith("M"),
          (regexp_replace(follower_count_str, "M$", "").cast("decimal(18,3)") * 1000000).cast("int"))
    .otherwise(follower_count_str.cast("int"))
)

# Rename index -> ind and store it as a 32-bit int. pin_struct already declares
# index as IntegerType, so the cast is a safeguard (use long for larger ids).
pin_df = pin_df.withColumnRenamed("index", "ind")
pin_df = pin_df.withColumn("ind", pin_df["ind"].cast("int"))

# Final column order. `downloaded` is not in this list, so the select drops it
# from the output; add it here if the pin table should keep it (this changes
# the Delta table schema).
pin_df = pin_df.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")



In [0]:
# Clean Geo streaming data:
#   * pack latitude and longitude into a two-element `coordinates` array
#     (latitude first), then drop the separate columns,
#   * parse `timestamp` into a timestamp type,
#   * rename index -> ind and fix the column order.
geo_df = (
    geo_df
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .withColumnRenamed("index", "ind")
    .select("ind", "country", "coordinates", "timestamp")
)

In [0]:
# Preview the cleaned geo stream: ind, country, coordinates [latitude, longitude], timestamp.
display(geo_df)

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47Z


In [0]:
# Preview the cleaned pin stream (placeholders nulled, save_location reduced to a path).
display(pin_df)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,,,,,,,,,,
7528,,,,,,,,,,
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion


In [0]:
# Preview the cleaned user stream: ind, user_name, age, date_joined.
display(user_df)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z
7528,Abigail Ali,20,2015-10-24T11:23:51Z


In [0]:
# Save each cleaned stream to its own Delta table.
# Spark requires every streaming query to have its OWN checkpoint location;
# previously all three queries shared "/tmp/kinesis/_checkpoints/", so they
# overwrote each other's offsets/metadata. Each query now gets a sub-directory
# named after its table, under the same root, so removing the root
# recursively still clears all of them.
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints/"

def write_stream_to_delta(df, table_name, checkpoint_root=CHECKPOINT_ROOT):
    """Start an append-mode streaming write of `df` into the Delta table `table_name`.

    The checkpoint is kept in `<checkpoint_root><table_name>/`.
    Returns the StreamingQuery handle so it can be monitored or stopped.
    """
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoint_root}{table_name}/")
        .table(table_name)
    )

user_query = write_stream_to_delta(user_df, "c1b2415b9314_user_table")
pin_query = write_stream_to_delta(pin_df, "c1b2415b9314_pin_table")
geo_query = write_stream_to_delta(geo_df, "c1b2415b9314_geo_table")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f7867887370>

In [0]:
# Delete the checkpoint folder (second argument True = recursive) so the
# writeStream cell can be run again from scratch.
# NOTE(review): without a checkpoint, the streams restart from the Kinesis
# initialPosition ('earliest' in the read cell) and append to the existing
# Delta tables, so records already written will be written again. Confirm this
# is intended, and presumably stop any active queries writing here first.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

True