In [0]:
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
# URL processing
import urllib
# `import urllib` alone does not guarantee the `parse` submodule is loaded
import urllib.parse

# Define the path to the Delta table holding the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
# NOTE(review): keeping access keys in a readable table is weaker than a
# secrets manager (e.g. Databricks secret scopes) - consider migrating.
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both keys from the first row in a single job. `.first()` brings back
# one row, whereas `.collect()[0]` pulled the whole table to the driver (twice,
# once per key). An empty table now fails with a descriptive error instead of
# an IndexError.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table {delta_table_path}")
ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" makes quote() escape "/" as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


In [0]:
%sql
-- Disable format checks during the reading of Delta tables.
-- NOTE(review): the credentials Delta table is already read and collected in the
-- first cell, before this SET runs, so this setting does not affect that read.
SET spark.databricks.delta.formatCheck.enabled=false

In [0]:
# Read the pin, user and geo Kinesis streams into streaming DataFrames.
# All three share the same region, start position and credentials, so one
# parameterised helper replaces three copy-pasted reader chains.
KINESIS_STREAM_PREFIX = "streaming-0a1667ad2f7f"
KINESIS_REGION = "us-east-1"


def read_kinesis_stream(stream_name, region=KINESIS_REGION, initial_position="earliest"):
    """Return a streaming DataFrame reading the named Kinesis stream.

    Args:
        stream_name: Full name of the Kinesis stream to read.
        region: AWS region passed to the connector's `region` option.
        initial_position: Value for the connector's `initialPosition` option.

    Uses the module-level ACCESS_KEY / SECRET_KEY loaded in the first cell.
    """
    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", stream_name)
        .option("initialPosition", initial_position)
        .option("region", region)
        .option("awsAccessKey", ACCESS_KEY)
        .option("awsSecretKey", SECRET_KEY)
        .load()
    )


df_pin = read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-pin")
df_user = read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-user")
df_geo = read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-geo")

In [0]:
# Deserialize the pin stream: cast the Kinesis `data` column to a string and
# parse it as JSON. Every field is declared as a string here; numeric
# conversions happen in the cleaning cell.
# NOTE: this cell overwrites df_pin, so re-running it without re-running the
# Kinesis read cell fails (the parsed frame no longer has a `data` column).
pin_schema = StructType([
    StructField("index", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

df_pin = (
    df_pin
    .selectExpr("CAST(data as STRING)")
    .withColumn("jsonData", from_json("data", pin_schema))
    .select("jsonData.*")
    # Drop rows in which every parsed field is null
    .dropna(how="all")
)
display(df_pin)

In [0]:
# Clean the df_pin DataFrame

# Placeholder strings that stand in for missing data, keyed by the column they
# appear in. Each one is replaced with None so missing values become real nulls.
# TODO: decide whether 'Untitled' should also be treated as missing (an earlier
# version left that replacement commented out for the description column).
PIN_PLACEHOLDERS = {
    "description": ["No description available Story format", "No description available"],
    "follower_count": ["User Info Error"],
    "image_src": ["Image src Error.", "Image src error."],
    "tag_list": ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"],
    "title": ["No Title Data Available"],
    "poster_name": ["User Info Error"],
}

cleaned_df_pin = df_pin
for column_name, placeholders in PIN_PLACEHOLDERS.items():
    # dict.fromkeys maps every placeholder to None
    cleaned_df_pin = cleaned_df_pin.replace(dict.fromkeys(placeholders), subset=[column_name])

# follower_count holds text such as "211", "22k" or "1.2M". Multiply the numeric
# part by the suffix's scale instead of appending zeros to the text: appending
# zeros turned "1.2k" into "1.2000", which casts to 1 rather than 1200.
follower_text = trim(col("follower_count"))
follower_scale = (when(follower_text.endswith("k"), 1000)
                  .when(follower_text.endswith("M"), 1000000)
                  .otherwise(1))
# decimal keeps fractional counts exact before the final integer cast
follower_number = regexp_replace(follower_text, "[kM]$", "").cast("decimal(20,3)")
cleaned_df_pin = cleaned_df_pin.withColumn(
    "follower_count", (follower_number * follower_scale).cast("integer"))

cleaned_df_pin = (
    cleaned_df_pin
    # Rename index to ind and make it an integer
    .withColumnRenamed("index", "ind")
    .withColumn("ind", col("ind").cast("integer"))
    # Keep only the save location path
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # Reorder the columns; `downloaded` is intentionally left out
    .select("ind", "unique_id", "title", "description", "follower_count", "poster_name",
            "tag_list", "is_image_or_video", "image_src", "save_location", "category")
)

In [0]:
# Preview the cleaned pin DataFrame
display(cleaned_df_pin)

In [0]:
# Check the column names and types produced by the pin cleaning cell
cleaned_df_pin.printSchema()

In [0]:
# Deserialize the geo stream: cast the Kinesis `data` column to a string and
# parse it as JSON. Every field is declared as a string here; the cleaning cell
# converts `timestamp`.
# NOTE: this cell overwrites df_geo, so re-running it without re-running the
# Kinesis read cell fails (the parsed frame no longer has a `data` column).
geo_schema = StructType([
    StructField("country", StringType(), True),
    StructField("ind", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timestamp", StringType(), True)
])

df_geo = (
    df_geo
    .selectExpr("CAST(data as STRING)")
    .withColumn("jsonData", from_json("data", geo_schema))
    .select("jsonData.*")
    # Drop rows in which every parsed field is null
    .dropna(how="all")
)
display(df_geo)

In [0]:
# Clean df_geo DataFrame as one pipeline:
#   1. fold latitude/longitude into a single `coordinates` array column,
#   2. drop the two source columns,
#   3. parse `timestamp` from a string into a timestamp type,
#   4. fix the output column order.
# NOTE(review): latitude/longitude are parsed as StringType, so `coordinates`
# is array<string>; consider casting to double if numeric coordinates are wanted.
from pyspark.sql.functions import array, to_timestamp

cleaned_df_geo = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

In [0]:
# Preview the cleaned geo DataFrame
display(cleaned_df_geo)

In [0]:
# Check the column names and types produced by the geo cleaning cell
cleaned_df_geo.printSchema()

In [0]:
# Deserialize the user stream: cast the Kinesis `data` column to a string and
# parse it as JSON. Every field is declared as a string here; the cleaning cell
# converts `date_joined`.
# NOTE: this cell overwrites df_user, so re-running it without re-running the
# Kinesis read cell fails (the parsed frame no longer has a `data` column).
user_schema = StructType([
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("ind", StringType(), True),
    StructField("last_name", StringType(), True)
])

df_user = (
    df_user
    .selectExpr("CAST(data as STRING)")
    .withColumn("jsonData", from_json("data", user_schema))
    .select("jsonData.*")
    # Drop rows in which every parsed field is null
    .dropna(how="all")
)
display(df_user)

In [0]:
# Clean df_user DataFrame:
#   1. build `user_name` from first_name and last_name,
#   2. drop the two source columns,
#   3. parse `date_joined` from a string into a timestamp type,
#   4. fix the output column order.
from pyspark.sql.functions import concat_ws, to_timestamp

cleaned_df_user = (
    df_user
    # concat_ws puts a space between the names ("Jane Doe", not "JaneDoe") and
    # skips a null part instead of nulling the whole user_name the way concat
    # does (it yields an empty string only when both parts are null).
    .withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select("ind", "user_name", "age", "date_joined")
)

In [0]:
# Preview the cleaned user DataFrame (its schema is printed in the next cell)
display(cleaned_df_user)

In [0]:
# Check the column names and types produced by the user cleaning cell
cleaned_df_user.printSchema()

In [0]:
# Continuously append the cleaned pin stream to its Delta table.
# Structured Streaming records a query's offsets and commits in its checkpoint
# directory, so every streaming query needs its own location. Sharing
# "/tmp/kinesis/_checkpoints/" with the geo and user writers would mix their state.
# The handle lets the query be inspected or stopped (pin_query.stop()).
pin_query = (
    cleaned_df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0a1667ad2f7f_pin_table/")
    .table("0a1667ad2f7f_pin_table")
)

In [0]:
# Continuously append the cleaned geo stream to its Delta table.
# Structured Streaming records a query's offsets and commits in its checkpoint
# directory, so every streaming query needs its own location. Sharing
# "/tmp/kinesis/_checkpoints/" with the pin and user writers would mix their state.
# The handle lets the query be inspected or stopped (geo_query.stop()).
geo_query = (
    cleaned_df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0a1667ad2f7f_geo_table/")
    .table("0a1667ad2f7f_geo_table")
)

In [0]:
# Continuously append the cleaned user stream to its Delta table.
# Structured Streaming records a query's offsets and commits in its checkpoint
# directory, so every streaming query needs its own location. Sharing
# "/tmp/kinesis/_checkpoints/" with the pin and geo writers would mix their state.
# The handle lets the query be inspected or stopped (user_query.stop()).
user_query = (
    cleaned_df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0a1667ad2f7f_user_table/")
    .table("0a1667ad2f7f_user_table")
)