# Imports

In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# AWS Credentials

In [0]:
# Location of the Delta table that stores the AWS access/secret key pair
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that table as a batch Spark DataFrame (format passed directly to load)
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Fetch the credentials row once and take both keys from that same row:
# one Spark job instead of two, and the access key and secret are guaranteed
# to come from the same record even if the table ever holds several rows.
# NOTE(review): a Databricks secret scope is a safer home for AWS keys than a
# plain Delta table.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("The AWS credentials Delta table is empty")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key for use inside URIs; safe="" also escapes "/".
# Requires `import urllib.parse` (a bare `import urllib` does not load it).
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable Delta's format check for the current Spark session.
-- NOTE(review): this check presumably guards against reading a Delta path
-- with a non-Delta format; the credentials table above is read with the
-- delta format, so confirm whether this setting is still required.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


# Read & Write Stream Functions

In [0]:
def read_stream(kinesis_stream_name, region='us-east-1', initial_position='earliest'):
    """
    Creates a streaming DataFrame that reads records from a Kinesis stream.

    Args:
        kinesis_stream_name (str): Name of the Kinesis stream.
        region (str): AWS region hosting the stream. Defaults to 'us-east-1'.
        initial_position (str): Value of the Kinesis 'initialPosition' option,
            i.e. where to start reading. Defaults to 'earliest'.

    Returns:
        DataFrame: Streaming Spark DataFrame with the raw Kinesis records; the
        record payload is in the ``data`` column.
    """
    # Authenticates with the module-level ACCESS_KEY / SECRET_KEY. Kinesis takes
    # the raw secret key, not the URL-encoded ENCODED_SECRET_KEY.
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

def write_stream_df_to_table(dataframe, kinesis_stream_name, checkpoint_root="/tmp/kinesis/_checkpoints/"):
    '''
    Starts a streaming append of ``dataframe`` into a Delta table.

    Despite its name, ``kinesis_stream_name`` is the name of the target Delta
    table (callers pass e.g. "0ae9e110c9db_pin_table"). It is also used as this
    query's checkpoint sub-directory under ``checkpoint_root``.

    Args:
        dataframe (DataFrame): Streaming Spark DataFrame to be written.
        kinesis_stream_name (str): Target Delta table name.
        checkpoint_root (str): Directory prefix for checkpoints. Defaults to
            "/tmp/kinesis/_checkpoints/".

    Returns:
        StreamingQuery: Handle of the started query, so callers can
        ``awaitTermination()`` or ``stop()`` it.
    '''
    # Each table gets its own checkpoint directory. Reusing a checkpoint
    # across different queries would mix their progress state.
    return (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_root + kinesis_stream_name)
        .toTable(kinesis_stream_name)
    )
        

# Clean Data Functions

In [0]:
def clean_dataframe_pin(df_pin):
    """
    Cleans the Pinterest post DataFrame df_pin.

    Steps:
    - Replaces placeholder/error strings with the string "None" (or "0" for
      follower_count).
    - Converts abbreviated follower counts ("124k", "1.5M", "2B") to integers.
    - Strips the "Local save in " prefix from save_location.
    - Renames "index" to "ind" and returns the columns in a fixed order.
      Columns not listed there, such as "downloaded", are dropped.

    Parameters:
    - df_pin: Input DataFrame to be cleaned.

    Returns:
    - Cleaned DataFrame.
    """

    # Placeholder strings that mark missing or invalid data in each column
    null_dicts = {
        "description": "No description available",
        "follower_count": "User Info Error",
        "image_src": "Image src error.",
        "poster_name": "User Info Error",
        "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
        "title": "No Title Data Available"
    }

    # NOTE(review): placeholders become the literal string "None", not SQL NULL.
    # Confirm downstream queries expect that.
    for key, value in null_dicts.items():
        if key == "description":
            # Placeholder descriptions can carry a suffix (e.g. "No description
            # available Story format"), so match the prefix and replace the whole value.
            df_pin = df_pin.withColumn(key, when(col(key).startswith(value), "None").otherwise(col(key)))
        else:
            # regexp_replace takes a Java regex. \Q...\E quotes the placeholder so
            # characters such as the "." in "Image src error." match literally.
            replacement = "0" if key == "follower_count" else "None"
            df_pin = df_pin.withColumn(key, regexp_replace(key, "\\Q" + value + "\\E", replacement))

    # Expand the k/M/B suffix by multiplying the numeric part, so "124k" -> 124000,
    # "1M" -> 1000000 and "1.5M" -> 1500000. Padding with zeros cannot handle
    # decimals. Decimal arithmetic avoids float rounding. Values that are still
    # not numeric become null on the cast (with ANSI mode off).
    follower_count = col("follower_count")
    multiplier = (
        when(follower_count.endswith("k"), 1000)
        .when(follower_count.endswith("M"), 1000000)
        .when(follower_count.endswith("B"), 1000000000)
        .otherwise(1)
    )
    numeric_part = regexp_replace(follower_count, "[kMB]$", "").cast("decimal(20,3)")
    df_pin = df_pin.withColumn("follower_count", (numeric_part * multiplier).cast("int"))

    # Strip the "Local save in" prefix and the whitespace after it, leaving only the path
    df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "^Local save in\\s*", ""))

    # Rename the "index" column to "ind" to match the geo and user DataFrames
    df_pin = df_pin.withColumnRenamed("index", "ind")

    # Define the desired order of columns
    column_order_pin = [
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category"
    ]

    # Select and reorder the columns in the specified order
    return df_pin.select(column_order_pin)

In [0]:
def clean_dataframe_geo(df_geo):
    """
    Cleans the DataFrame containing geographical data.

    Combines "latitude" and "longitude" into a single "coordinates" array
    column ([latitude, longitude]) and converts "timestamp" to a timestamp.
    Returns the columns ind, country, coordinates and timestamp in that order.

    Args:
        df_geo (DataFrame): Input DataFrame with ind, timestamp, latitude,
            longitude and country columns.

    Returns:
        DataFrame: Cleaned DataFrame with specified columns and transformations.
    """
    # Transform the DataFrame passed in by the caller (not a notebook global)
    df_geo = df_geo.withColumn("coordinates", array("latitude", "longitude"))

    # The coordinate pair now lives in "coordinates"
    df_geo = df_geo.drop("latitude", "longitude")

    # Equivalent to casting to timestamp: parses strings and leaves timestamps as-is
    df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))

    # Define the desired order of columns
    column_order_geo = [
        "ind",
        "country",
        "coordinates",
        "timestamp"
    ]

    # Select and reorder the columns in the specified order
    return df_geo.select(column_order_geo)

In [0]:
def clean_dataframe_user(df_user):
    """
    Tidies the DataFrame containing user data.

    Merges "first_name" and "last_name" into one space-separated "user_name"
    column and converts "date_joined" to a timestamp. Keeps only ind,
    user_name, age and date_joined, in that order.

    Args:
        df_user (DataFrame): Input DataFrame containing user data.

    Returns:
        DataFrame: The cleaned user DataFrame.
    """
    # NOTE(review): concat yields NULL if either name part is NULL.
    full_name = concat(col("first_name"), lit(" "), col("last_name"))

    with_name = df_user.withColumn("user_name", full_name).drop("first_name", "last_name")
    with_joined_ts = with_name.withColumn("date_joined", to_timestamp(col("date_joined")))

    return with_joined_ts.select("ind", "user_name", "age", "date_joined")

In [0]:
# Open one streaming DataFrame per Kinesis stream (pin, geo and user records).
# Stream names follow the pattern "streaming-<user id>-<topic>".
pin_stream = read_stream('streaming-0ae9e110c9db-pin')
geo_stream = read_stream('streaming-0ae9e110c9db-geo')
user_stream = read_stream('streaming-0ae9e110c9db-user')

In [0]:
# JSON schemas used by from_json to decode each Kinesis stream's payload.
# Field order matters: it becomes the column order of the deserialised DataFrames.
# Every field is nullable (the StructField default).

# "pin" stream: one record per Pinterest post
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),  # raw text such as "124k"
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    ]
])

# "geo" stream: where and when each post was made
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    ]
])

# "user" stream: the user behind each post
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    ]
])

In [0]:
def deserialise_datastream(df, schema):
    """
    Deserializes the JSON payload of a Kinesis DataFrame using a specified schema.

    Args:
        df (DataFrame): DataFrame with a ``data`` column holding the serialized
            JSON records.
        schema (StructType): Spark StructType describing the JSON records.

    Returns:
        DataFrame: One column per field of ``schema``, in schema order.
            from_json yields nulls for records it cannot parse.
    """
    # Cast the payload column to a string so it can be parsed as JSON
    json_df = df.selectExpr("CAST(data as STRING)")

    # Alias the parsed struct explicitly instead of relying on Spark's
    # auto-generated column name ("from_json(data)"), which is not a stable API.
    parsed_df = json_df.select(from_json("data", schema).alias("parsed"))

    # Flatten the struct into top-level columns
    return parsed_df.select("parsed.*")

# Deserialize Streams

In [0]:
# Decode each stream's JSON payload into typed columns using its schema
df_pin = deserialise_datastream(pin_stream, pin_schema)
df_geo = deserialise_datastream(geo_stream, geo_schema)
df_user = deserialise_datastream(user_stream, user_schema)

In [0]:
# Preview the deserialised pin stream before cleaning
display(df_pin)

index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,Of Life & Lisa | Lifestyle Blog,124k,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,1,Local save in /data/diy-and-crafts,diy-and-crafts
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0,Local save in /data/mens-fashion,mens-fashion
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,Commitment Connection,51k,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,1,Local save in /data/quotes,quotes
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",Thrive Causemetics,43k,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,1,Local save in /data/beauty,beauty
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",Consuelo Aguirre,0,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,1,Local save in /data/finance,finance
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",TheTrendSpotter,211k,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,1,Local save in /data/tattoos,tattoos


In [0]:
# Preview the deserialised geo stream before cleaning
display(df_geo)

ind,timestamp,latitude,longitude,country
5730,2021-04-19T17:37:03.000+0000,-77.015,-101.437,Colombia
8304,2019-09-13T04:50:29.000+0000,-28.8852,-164.87,French Guiana
4315,2019-12-15T03:51:28.000+0000,-45.8508,66.1003,Cote d'Ivoire
5069,2021-03-20T09:32:44.000+0000,-63.0063,-157.474,Azerbaijan
2418,2022-05-27T11:30:59.000+0000,-88.4642,-171.061,Antarctica (the territory South of 60 deg S)
3156,2018-01-13T19:33:49.000+0000,-84.738,-160.795,Armenia
7343,2018-08-06T12:17:07.000+0000,-65.4428,-161.684,Australia
2074,2019-11-03T05:41:59.000+0000,-52.3213,-50.11,Central African Republic
2411,2020-11-15T17:10:07.000+0000,-71.6856,-179.126,Albania
2698,2021-11-24T08:33:51.000+0000,-72.7174,24.169,Egypt


In [0]:
# Preview the deserialised user stream before cleaning
display(df_user)

ind,first_name,last_name,age,date_joined
5494,Anne,Allen,27,2015-12-16T15:20:05.000+0000
2923,Brian,Nelson,26,2015-11-11T03:20:57.000+0000
9875,Brendan,Joseph,26,2015-12-20T10:28:00.000+0000
2418,Amanda,Adams,20,2015-10-21T08:27:36.000+0000
3156,Andrew,Baker,22,2015-12-21T08:06:54.000+0000
9672,Jennifer,Hudson,22,2016-02-11T20:46:04.000+0000
7922,Denise,Adams,21,2015-11-12T06:21:36.000+0000
4137,Michael,Decker,59,2017-06-29T22:35:17.000+0000
9546,Alex,Barnes,20,2016-02-27T21:13:44.000+0000
2698,Kayla,Burton,44,2017-06-21T19:53:27.000+0000


In [0]:
# Apply the per-stream cleaning functions to the deserialised DataFrames
cleaned_df_pin = clean_dataframe_pin(df_pin)
cleaned_df_geo = clean_dataframe_geo(df_geo)
cleaned_df_user = clean_dataframe_user(df_user)

# Write Data to Delta Tables

In [0]:
# Start one streaming write per cleaned DataFrame.
# - First argument: the cleaned streaming DataFrame.
# - Second argument: the target Delta table name, which also names the
#   query's checkpoint directory under /tmp/kinesis/_checkpoints/.
write_stream_df_to_table(cleaned_df_pin, "0ae9e110c9db_pin_table")
write_stream_df_to_table(cleaned_df_geo, "0ae9e110c9db_geo_table")
write_stream_df_to_table(cleaned_df_user, "0ae9e110c9db_user_table")