In [None]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Location of the Delta table that stores the AWS authentication credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table as a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [None]:
# Fetch the first credentials row in a single Spark job instead of collecting
# the whole table to the driver once per column.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # rather than an opaque IndexError.
    raise ValueError("The credentials Delta table is empty; cannot read the AWS keys")

# AWS access key and secret key taken from the same row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also escapes "/")
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
%sql
-- Disable format checks during the reading of Delta tables.
-- NOTE(review): the credentials Delta table is loaded in an earlier cell, so this
-- setting can only influence reads/writes issued after this cell — confirm it is still needed.
SET spark.databricks.delta.formatCheck.enabled=false


In [None]:
# Recursively remove any checkpoint data under this directory before the streams are started.
# NOTE(review): presumably this forces the streams to restart from scratch — confirm that is intended.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)

In [None]:
def read_stream_data(data_name):
    """Open the Kinesis stream for ``data_name`` as a streaming DataFrame.

    Args:
        data_name: Suffix appended to ``streaming-0e0816526d11-`` to form the
            Kinesis stream name (the notebook uses "pin", "geo" and "user").

    Returns:
        A streaming DataFrame with a single column ``data`` holding the
        record payload cast to a string.
    """
    kinesis_reader = (
        spark.readStream
        .format('kinesis')
        .option('streamName', f'streaming-0e0816526d11-{data_name}')
        .option('initialPosition', 'earliest')
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
    )
    raw_records = kinesis_reader.load()
    # Keep only the record payload, cast to a string
    return raw_records.selectExpr("CAST(data as STRING)")

def write_stream_data(df, data_name, checkpoint_root="/tmp/kinesis/_checkpoints/"):
    """Start a streaming write of ``df`` into the Delta table for ``data_name``.

    Every stream gets its own checkpoint directory,
    ``<checkpoint_root>/<data_name>/``. A checkpoint location holds the state of
    exactly one streaming query, so sharing a single directory between the
    pin, geo and user streams makes them clash.

    Args:
        df: Streaming DataFrame to write.
        data_name: Stream identifier; used for both the checkpoint
            sub-directory and the target table ``0e0816526d11_<data_name>_table``.
        checkpoint_root: Parent directory under which the per-stream
            checkpoint directory is created.

    Returns:
        The input ``df``, unchanged.
    """
    # Normalise the root so exactly one "/" separates it from the stream name
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{data_name}/"
    df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", checkpoint_location) \
        .table(f"0e0816526d11_{data_name}_table")
    return df


In [None]:
# Schema used to parse the JSON payload of each Pinterest post record
pin_streaming_schema = StructType([
    StructField("index", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

df_pin = read_stream_data("pin")

# Parse the JSON string in `data` and expand its fields into top-level columns
df_pin = df_pin.withColumn("jsonData", from_json(col("data"), pin_streaming_schema)) \
               .select("jsonData.*")

# Placeholder strings that stand for "no data"; each one is replaced with null
pin_placeholder_values = [
    "No description available Story format",
    "null",
    "User Info Error",
    "Image src error",
    "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "No Title Data Available",
]
for placeholder in pin_placeholder_values:
    df_pin = df_pin.replace(placeholder, None)

# Convert 'follower_count' from string to integer: a value matching "<digits>k" is
# multiplied by 1,000 and one matching "<digits>M" by 1,000,000; anything else is cast as-is.
# Raw strings keep "\d" a regex token instead of an invalid Python escape sequence.
follower_digits = regexp_extract(col("follower_count"), r"(\d+)", 1).cast("integer")
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").rlike(r"\d+k"), follower_digits * 1000)
    .when(col("follower_count").rlike(r"\d+M"), follower_digits * 1000000)
    .otherwise(col("follower_count").cast("integer")),
)

# Strip the 'Local save in ' prefix so 'save_location' holds only the path
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Rename 'index' to 'ind'
df_pin = df_pin.withColumnRenamed("index", "ind")

# Drop every row that has a null in any column (including 'downloaded', which is not kept below)
df_pin = df_pin.na.drop()

# Select the output columns in their final order
reorder_col = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
df_pin = df_pin.select(reorder_col)

display(df_pin)
write_stream_data(df_pin, "pin")


In [None]:
# Schema used to parse the JSON payload of each geolocation record (all fields are nullable strings)
geo_streaming_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "timestamp", "latitude", "longitude", "country")
])

# Final column order of the geolocation output
geo_reorder_col = ["ind", "country", "coordinates", "timestamp"]

df_geo = (
    read_stream_data("geo")
    # parse the JSON string and expand its fields into columns
    .withColumn("jsonData", from_json(col("data"), geo_streaming_schema))
    .select("jsonData.*")
    # combine latitude and longitude into one array column, then drop the originals
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    # convert the timestamp string into a timestamp type
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select(geo_reorder_col)
)

display(df_geo)
write_stream_data(df_geo, "geo")

In [None]:
# Schema used to parse the JSON payload of each user record
user_streaming_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True)
])

df_user = read_stream_data("user")

# Parse the JSON string in `data` and expand its fields into top-level columns
df_user = df_user.withColumn("jsonData", from_json(col("data"), user_streaming_schema)) \
                 .select("jsonData.*")

# Build a full name column from first and last name, then drop the two source columns.
# The schema has no "index" column (the identifier is "ind"), so it is not listed here.
df_user = df_user.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
df_user = df_user.drop("first_name", "last_name")

# Convert the join date string into a timestamp type
df_user = df_user.withColumn("date_joined", col("date_joined").cast("timestamp"))

# Remove rows that repeat the same name, age and join date.
# NOTE(review): there is no watermark on this stream, so the de-duplication state
# presumably grows for as long as the query runs — confirm this is acceptable.
df_user = df_user.dropDuplicates(["user_name", "age", "date_joined"])

# Select the output columns in their final order
user_reorder_col = ["ind", "user_name", "age", "date_joined"]
df_user = df_user.select(user_reorder_col)

display(df_user)
write_stream_data(df_user, "user")