#### AWS access key loaded in the same way as for batch processing

In [0]:
# URL processing
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *
# pyspark schema types used for the JSON schemas further down
from pyspark.sql.types import StringType, StructField, StructType

# Define the path to the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns with a single Spark action instead of
# running one collect() per column; first() returns None for an empty table.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also percent-encodes '/')
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable format checks during the reading of Delta tables
-- NOTE(review): this cell runs after the Delta read of the credentials table
-- in the previous cell, so it cannot influence that read — confirm whether it
-- is still needed here or should be moved above it.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


#### Streaming data from Kinesis read into three dataframes (one for each topic)

In [0]:
# Open a streaming read on the Kinesis stream that carries the pin records,
# authenticating with the access/secret key pair loaded earlier.
df_pin = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0ad8a60ac12f-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Open a streaming read on the Kinesis stream that carries the geo records,
# authenticating with the access/secret key pair loaded earlier.
df_geo = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0ad8a60ac12f-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Open a streaming read on the Kinesis stream that carries the user records,
# authenticating with the access/secret key pair loaded earlier.
df_user = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0ad8a60ac12f-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Preview the raw records of the pin stream. The captured output lists the
# columns partitionKey, data, stream, shardId, sequenceNumber and
# approximateArrivalTimestamp, with no rows at the time of capture.
display(df_pin)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# Preview the raw records of the geo stream. The captured output lists the
# columns partitionKey, data, stream, shardId, sequenceNumber and
# approximateArrivalTimestamp, with no rows at the time of capture.
display(df_geo)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# Preview the raw records of the user stream. In the captured output the
# `data` column is rendered as base64 text; decoding a row yields a JSON
# object with the keys ind, first_name, last_name, age and date_joined.
display(df_user)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
partition-user,eyJpbmQiOjc1MjgsImZpcnN0X25hbWUiOiJBYmlnYWlsIiwibGFzdF9uYW1lIjoiQWxpIiwiYWdlIjoyMCwiZGF0ZV9qb2luZWQiOiIyMDE1LTEwLTI0IDExOjIzOjUxIn0=,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050223309610985396448393091350530,2024-01-14T15:59:44.638+0000
partition-user,eyJpbmQiOjI4NjMsImZpcnN0X25hbWUiOiJEeWxhbiIsImxhc3RfbmFtZSI6IkhvbG1lcyIsImFnZSI6MzIsImRhdGVfam9pbmVkIjoiMjAxNi0xMC0yMyAxNDowNjo1MSJ9,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050225081896236951494900649558018,2024-01-14T15:59:47.500+0000
partition-user,eyJpbmQiOjU3MzAsImZpcnN0X25hbWUiOiJSYWNoZWwiLCJsYXN0X25hbWUiOiJEYXZpcyIsImFnZSI6MzYsImRhdGVfam9pbmVkIjoiMjAxNS0xMi0wOCAyMDowMjo0MyJ9,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050226828794046294634264258412546,2024-01-14T15:59:50.415+0000
partition-user,eyJpbmQiOjgzMDQsImZpcnN0X25hbWUiOiJDaGFybGVzIiwibGFzdF9uYW1lIjoiQmVycnkiLCJhZ2UiOjI1LCJkYXRlX2pvaW5lZCI6IjIwMTUtMTItMjggMDQ6MjE6MzkifQ==,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050228711091547434612095434358786,2024-01-14T15:59:53.296+0000
partition-user,eyJpbmQiOjg3MzEsImZpcnN0X25hbWUiOiJBbmRyZWEiLCJsYXN0X25hbWUiOiJBbGV4YW5kZXIiLCJhZ2UiOjIxLCJkYXRlX2pvaW5lZCI6IjIwMTUtMTEtMTAgMDk6Mjc6NDIifQ==,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050229799124785087778421389393922,2024-01-14T15:59:55.151+0000
partition-user,eyJpbmQiOjEzMTMsImZpcnN0X25hbWUiOiJCcml0dGFueSIsImxhc3RfbmFtZSI6IkpvbmVzIiwiYWdlIjozMiwiZGF0ZV9qb2luZWQiOiIyMDE2LTA0LTAyIDAzOjUxOjIzIn0=,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050230962111423557051893615165442,2024-01-14T15:59:56.951+0000
partition-user,eyJpbmQiOjQzMTUsImZpcnN0X25hbWUiOiJNaWNoZWxsZSIsImxhc3RfbmFtZSI6IlByaW5jZSIsImFnZSI6MzYsImRhdGVfam9pbmVkIjoiMjAxNS0xMi0yMCAxNjozODoxMyJ9,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050232641309387001771954720997378,2024-01-14T15:59:59.837+0000
partition-user,eyJpbmQiOjEwNzk0LCJmaXJzdF9uYW1lIjoiVGhvbWFzIiwibGFzdF9uYW1lIjoiVHVybmVyIiwiYWdlIjozNCwiZGF0ZV9qb2luZWQiOiIyMDE2LTEyLTIyIDAwOjAyOjAyIn0=,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050234877822153288836202805329922,2024-01-14T16:00:03.681+0000
partition-user,eyJpbmQiOjU0OTQsImZpcnN0X25hbWUiOiJBbm5lIiwibGFzdF9uYW1lIjoiQWxsZW4iLCJhZ2UiOjI3LCJkYXRlX2pvaW5lZCI6IjIwMTUtMTItMTYgMTU6MjA6MDUifQ==,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050236147194263884196973685768194,2024-01-14T16:00:05.554+0000
partition-user,eyJpbmQiOjUwNjksImZpcnN0X25hbWUiOiJBbWFuZGEiLCJsYXN0X25hbWUiOiJCYWxsIiwiYWdlIjoyNSwiZGF0ZV9qb2luZWQiOiIyMDE2LTAxLTEzIDE3OjM2OjMwIn0=,streaming-0ad8a60ac12f-user,shardId-000000000000,49648256421248313573246050237383925377349962756849139714,2024-01-14T16:00:07.311+0000


#### Deserialising the data column of each stream

In [0]:
# Keep only the payload column of every stream, cast to a string; the other
# Kinesis metadata columns are dropped by this projection.
df_pin, df_geo, df_user = (
    stream_df.selectExpr("CAST(data as STRING)")
    for stream_df in (df_pin, df_geo, df_user)
)

#### JSON in the data column is parsed and flattened into a regular dataframe for each data stream (like the ones used for batch processing)

In [0]:
from pyspark.sql.functions import col, from_json

# Names of the fields read from each pin record; every field is declared as a
# nullable string and converted to its final type in a later cell.
pin_field_names = [
    "index",
    "unique_id",
    "title",
    "description",
    "poster_name",
    "follower_count",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "downloaded",
    "save_location",
    "category",
]
json_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in pin_field_names]
)

df_pin = (
    df_pin
    # String copy of the payload that from_json parses
    .withColumn("data_str", col("data").cast("string"))
    # Parse the JSON text into a struct column following the schema above
    .withColumn("parsed_data", from_json("data_str", json_schema))
    # Promote the struct's fields to top-level columns
    .select("parsed_data.*")
)


In [0]:
# Names of the fields read from each geo record; every field is declared as a
# nullable string and converted to its final type in a later cell.
# NOTE(review): the captured user-stream payloads carry their id under the key
# "ind" rather than "index" — confirm which key the geo records use, since a
# field missing from the JSON is parsed as null.
geo_field_names = ["index", "country", "latitude", "longitude", "timestamp"]
json_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in geo_field_names]
)

df_geo = (
    df_geo
    # String copy of the payload that from_json parses
    .withColumn("data_str", col("data").cast("string"))
    # Parse the JSON text into a struct column following the schema above
    .withColumn("parsed_data", from_json("data_str", json_schema))
    # Promote the struct's fields to top-level columns
    .select("parsed_data.*")
)


In [0]:
# Convert the 'data' column from binary to string
df_user = df_user.withColumn("data_str", col("data").cast("string"))

# Define json schema.
# The user payloads shown in the captured display(df_user) output carry the
# record id under the key "ind" (e.g. {"ind": 7528, "first_name": ...}), not
# "index". from_json returns null for a schema field that is absent from the
# JSON, so the schema has to use the key that is actually present.
json_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True),
])

# Apply the from_json function to parse the 'data_str' column
df_user = df_user.withColumn("parsed_data", from_json("data_str", json_schema))

# Selecting parsed_data. The id is exposed as "index" so this cell keeps
# producing the same column names as before (index, first_name, last_name,
# age, date_joined) for the transformation cell that follows.
df_user = df_user.select("parsed_data.*").withColumnRenamed("ind", "index")

#### Each of the dataframes transformed in the same way as for batch processing

In [0]:
from pyspark.sql.functions import when

# Descriptions are matched with contains() rather than an exact replace
# because some records carry extra text after the placeholder.
is_placeholder_description = col('description').contains('No description available')
df_pin = df_pin.withColumn(
    'description',
    when(is_placeholder_description, None).otherwise(col('description')),
)

# Exact-match placeholder values, paired with the columns in which they are
# replaced by null.
placeholder_columns = [
    ('Image src error.', ['image_src']),
    ('N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e', ['tag_list']),
    ('No Title Data Available', ['title']),
    ('User Info Error', ['follower_count', 'poster_name']),
]
for placeholder, columns in placeholder_columns:
    df_pin = df_pin.replace({placeholder: None}, subset=columns)

In [0]:
from pyspark.sql.functions import col, regexp_replace

df_pin = (
    df_pin
    # Remove the 'Local save in ' prefix so only the path remains
    .withColumn('save_location', regexp_replace(col('save_location'), 'Local save in ', ''))
    # Replace every 'k' with three zeros and every 'M' with six zeros so the
    # count can be cast to a number later.
    # NOTE(review): a fractional value such as "1.5k" would become "1.5000" —
    # confirm the source never sends fractional counts.
    .withColumn('follower_count', regexp_replace(col('follower_count'), 'k', '000'))
    .withColumn('follower_count', regexp_replace(col('follower_count'), 'M', '000000'))
)

In [0]:
# Final column order of the pin dataframe; columns not listed here are dropped.
pin_output_columns = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]

df_pin = (
    df_pin
    # Numeric columns arrive as strings and are cast to integers here
    .withColumn("index", col("index").cast("integer"))
    .withColumn("follower_count", col("follower_count").cast("integer"))
    .withColumnRenamed("index", "ind")
    .select(*pin_output_columns)
)

In [0]:
from pyspark.sql.functions import array

df_geo = (
    df_geo
    # Combine latitude and longitude into a single array column
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    # The timestamp arrives as a string; cast it to a real timestamp
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .withColumnRenamed("index", "ind")
    .select("ind", "country", "coordinates", "timestamp")
)

In [0]:
from pyspark.sql.functions import concat, lit
# Build a single user_name column from the first and last name
df_user = df_user.withColumn("user_name", concat("first_name", lit(" "), "last_name"))
df_user = df_user.drop("first_name", "last_name")
# Cast date_joined itself to a timestamp. The cast used to be written to a
# separate "timestamp" column that the final select discarded, which left
# date_joined as a plain string in the output.
# NOTE(review): if the target Delta table was already created with a string
# date_joined column, its schema must be migrated (or the table recreated)
# before this stream can append to it.
df_user = df_user.withColumn("date_joined", df_user["date_joined"].cast("timestamp"))
df_user = df_user.withColumnRenamed("index", "ind")
df_user = df_user.select("ind", "user_name", "age", "date_joined")

#### Streaming data written to delta tables 

In [0]:
# Append the transformed pin stream to its Delta table.
# Every streaming query must own its checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory between
# the pin, geo and user queries lets them overwrite each other's progress.
# NOTE(review): a new checkpoint path means the query starts without prior
# progress; with initialPosition 'earliest' already-written records may be
# re-read — clear or deduplicate the target table if that matters.
(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/pin/_checkpoints/")
    .table("0ad8a60ac12f_pin_table")
)

In [0]:
# Append the transformed geo stream to its Delta table.
# Every streaming query must own its checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory between
# the pin, geo and user queries lets them overwrite each other's progress.
# NOTE(review): a new checkpoint path means the query starts without prior
# progress; with initialPosition 'earliest' already-written records may be
# re-read — clear or deduplicate the target table if that matters.
(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/geo/_checkpoints/")
    .table("0ad8a60ac12f_geo_table")
)

In [0]:
# Append the transformed user stream to its Delta table.
# Every streaming query must own its checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory between
# the pin, geo and user queries lets them overwrite each other's progress.
# NOTE(review): a new checkpoint path means the query starts without prior
# progress; with initialPosition 'earliest' already-written records may be
# re-read — clear or deduplicate the target table if that matters.
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/user/_checkpoints/")
    .table("0ad8a60ac12f_user_table")
)