#Note: this is all synthetic data 

In [0]:
import urllib
import urllib.parse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

Clear the Kinesis checkpoint folder before re-running any writeStream cell

In [0]:
# Checkpoint data left over from a previous run must be removed before the
# writeStream cells are executed again; the second argument makes the
# delete recursive.
checkpoint_root = "/tmp/kinesis/_checkpoints/"

dbutils.fs.rm(checkpoint_root, True)

True

Reading in credentials 

In [0]:
# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Get the AWS access key and secret key from the Spark DataFrame.
# Both values are taken from the same row in a single Spark job, so the
# access key and the secret key are guaranteed to belong together.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    # Fail with an explicit message instead of an IndexError on an empty table
    raise ValueError(
        "The authentication_credentials Delta table returned no rows; "
        "cannot read the AWS keys."
    )

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" means "/" is percent-encoded as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable format checks during the reading of Delta tables by setting
-- spark.databricks.delta.formatCheck.enabled to false.
-- NOTE(review): presumably required for the Delta reads/writes in this notebook; confirm it is still needed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


Getting the pin stream from Kinesis, cleaning it, then writing it to a Delta Table

In [0]:
# Configure a streaming reader for the pin Kinesis stream (initial position
# 'earliest'), authenticating with the AWS keys loaded earlier in the notebook.
pin_stream_reader = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0ebb0073c95b-pin')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
)
df_pin_raw = pin_stream_reader.load()

display(df_pin_raw)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# The record payload sits in the `data` column; cast it to a string so it
# can be parsed as JSON in the next step.
pin_payload_expr = "CAST(data as STRING)"
df_pin_string = df_pin_raw.selectExpr(pin_payload_expr)

display(df_pin_string)

data


In [0]:
# Fields expected in each pin JSON record, in order, with their Spark types.
pin_field_types = [
    ("index", LongType()),
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", LongType()),
    ("save_location", StringType()),
    ("category", StringType()),
    ("poster_name", StringType()),
]

# Build the schema from the list above; every field is declared nullable.
custom_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in pin_field_types]
)

# Parse the JSON text in `data` into a struct column named `data_parsed`
pin_json_struct = F.from_json(F.col("data"), custom_schema)
df_pin_parsed = df_pin_string.withColumn("data_parsed", pin_json_struct)

# Promote every field of the parsed struct to a top-level column
df_pin = df_pin_parsed.select("data_parsed.*")

# Show the parsed stream
display(df_pin)





index,unique_id,title,description,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category,poster_name


In [0]:
# Cleaning the pin data

# Replace null and empty-string values with the placeholder text "None".
# A string fill value is only applied to string columns; numeric columns are
# left untouched.
df_pin = df_pin.fillna("None")
df_pin = df_pin.replace("", "None")

# Convert follower_count from text to an integer.
# Simply deleting every non-digit character would turn an abbreviated value
# such as "12k" into 12, so values that consist of a number followed by a
# k / M suffix are scaled by 1,000 / 1,000,000 first.
# NOTE(review): assumes abbreviated counts like "12k" or "3M" can appear in
# the feed - confirm against the producer.
follower_text = F.col("follower_count")
abbreviated_count = r"^\s*([0-9]+(?:\.[0-9]+)?)\s*([kKmM])\s*$"
follower_magnitude = F.regexp_extract(follower_text, abbreviated_count, 1).cast("double")
follower_suffix = F.upper(F.regexp_extract(follower_text, abbreviated_count, 2))

# Null whenever the value is not in the abbreviated "<number><k|M>" form
follower_scaled = (
    F.when(follower_suffix == "K", follower_magnitude * 1_000)
    .when(follower_suffix == "M", follower_magnitude * 1_000_000)
)

# Fallback for every other value: keep only the digits, exactly as before.
# Text without any digit becomes an empty string, which casts to null.
follower_digits_only = F.regexp_replace(follower_text, "[^0-9]", "")

df_pin = df_pin.withColumn(
    "follower_count",
    F.coalesce(
        F.round(follower_scaled).cast("int"),
        follower_digits_only.cast("int"),
    ),
)

# Clean the data in the save_location column to include only the save location path
df_pin = df_pin.withColumn("save_location", F.regexp_replace("save_location", "Local save in ", ""))

# Rename the index column to ind
df_pin = df_pin.withColumnRenamed("index", "ind")

# Reorder the columns; columns not listed here (such as `downloaded`) are dropped
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category


In [0]:
# List every column of the pin DataFrame together with its Spark data type
pin_column_types = df_pin.dtypes
display(pin_column_types)

_1,_2
ind,bigint
unique_id,string
title,string
description,string
follower_count,int
poster_name,string
tag_list,string
is_image_or_video,string
image_src,string
save_location,string


In [0]:
# Writing the data to Delta Table
#
# Every streaming query must have a checkpoint directory of its own: the
# checkpoint records the offsets and commits of exactly one query, so sharing
# it between queries corrupts their progress tracking. The pin stream
# therefore uses a dedicated sub-folder. It is still located under
# /tmp/kinesis/_checkpoints/, so the cleanup cell at the top of the notebook
# removes it as well.
(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/")
    .table("0ebb0073c95b_pin_table")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f9a7477e2f0>

Getting the geo stream from Kinesis, cleaning it, then writing it to a Delta Table

In [0]:
# Configure a streaming reader for the geo Kinesis stream (initial position
# 'earliest'), authenticating with the AWS keys loaded earlier in the notebook.
geo_stream_reader = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0ebb0073c95b-geo')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
)
df_geo_raw = geo_stream_reader.load()

display(df_geo_raw)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# The record payload sits in the `data` column; cast it to a string first
geo_payload_expr = "CAST(data as STRING)"
df_geo_string = df_geo_raw.selectExpr(geo_payload_expr)

# Fields expected in each geo JSON record, in order, with their Spark types.
geo_field_types = [
    ("country", StringType()),
    ("ind", LongType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("timestamp", StringType()),
]

# Build the schema from the list above; every field is declared nullable.
custom_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in geo_field_types]
)

# Parse the JSON text in `data` into a struct column named `data_parsed`
geo_json_struct = F.from_json(F.col("data"), custom_schema)
df_geo_parsed = df_geo_string.withColumn("data_parsed", geo_json_struct)

# Promote every field of the parsed struct to a top-level column
df_geo = df_geo_parsed.select("data_parsed.*")

# Show the parsed stream
display(df_geo)



country,ind,latitude,longitude,timestamp


In [0]:
# Cleaning the geo dataframe

# Column expressions used below:
#   - latitude and longitude combined into a single array
#   - the textual timestamp cast to Spark's timestamp type
geo_coordinates = F.array(F.col("latitude"), F.col("longitude"))
geo_event_time = F.col("timestamp").cast("timestamp")

df_geo = (
    df_geo
    # add the combined coordinates, then drop the two source columns
    .withColumn("coordinates", geo_coordinates)
    .drop("latitude", "longitude")
    # replace the string timestamp with a real timestamp
    .withColumn("timestamp", geo_event_time)
    # final column order
    .select("ind", "country", "coordinates", "timestamp")
)

display(df_geo.limit(10))

ind,country,coordinates,timestamp


In [0]:
# Writing the data to Delta Table
#
# Every streaming query must have a checkpoint directory of its own: the
# checkpoint records the offsets and commits of exactly one query, so sharing
# it between queries corrupts their progress tracking. The geo stream
# therefore uses a dedicated sub-folder. It is still located under
# /tmp/kinesis/_checkpoints/, so the cleanup cell at the top of the notebook
# removes it as well.
(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo/")
    .table("0ebb0073c95b_geo_table")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f9a74be2140>

Getting the user data stream from Kinesis, cleaning it, then writing it to a Delta Table

In [0]:
# Configure a streaming reader for the user Kinesis stream (initial position
# 'earliest'), authenticating with the AWS keys loaded earlier in the notebook.
user_stream_reader = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0ebb0073c95b-user')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
)
df_user_raw = user_stream_reader.load()

display(df_user_raw)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# The record payload sits in the `data` column; cast it to a string first
user_payload_expr = "CAST(data as STRING)"
df_user_string = df_user_raw.selectExpr(user_payload_expr)

# Fields expected in each user JSON record, in order, with their Spark types.
user_field_types = [
    ("age", LongType()),
    ("date_joined", StringType()),
    ("first_name", StringType()),
    ("ind", LongType()),
    ("last_name", StringType()),
]

# Build the schema from the list above; every field is declared nullable.
custom_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in user_field_types]
)

# Parse the JSON text in `data` into a struct column named `data_parsed`
user_json_struct = F.from_json(F.col("data"), custom_schema)
df_user_parsed = df_user_string.withColumn("data_parsed", user_json_struct)

# Promote every field of the parsed struct to a top-level column
df_user = df_user_parsed.select("data_parsed.*")

# Show the parsed stream
display(df_user)

age,date_joined,first_name,ind,last_name


In [0]:
# Cleaning the user data

# Column expressions used below:
#   - first and last name joined with a single space
#   - the textual join date cast to Spark's timestamp type
full_name = F.concat(F.col("first_name"), F.lit(" "), F.col("last_name"))
joined_at = F.col("date_joined").cast("timestamp")

df_user = (
    df_user
    # add the combined name, then drop the two source columns
    .withColumn("user_name", full_name)
    .drop("first_name", "last_name")
    # replace the string date with a real timestamp
    .withColumn("date_joined", joined_at)
    # final column order
    .select("ind", "user_name", "age", "date_joined")
)

display(df_user)

ind,user_name,age,date_joined


In [0]:

# Writing the data to Delta Table
#
# Every streaming query must have a checkpoint directory of its own: the
# checkpoint records the offsets and commits of exactly one query, so sharing
# it between queries corrupts their progress tracking. The user stream
# therefore uses a dedicated sub-folder. It is still located under
# /tmp/kinesis/_checkpoints/, so the cleanup cell at the top of the notebook
# removes it as well.
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user/")
    .table("0ebb0073c95b_user_table")
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7f9a74be0df0>