In [None]:
# Wildcard imports stay first so the explicit module imports below win on any name clash.
from pyspark.sql.functions import *
from pyspark.sql.types import *
import traceback
import urllib
import urllib.parse

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the delta table to a Spark DataFrame
aws_keys_df = spark.read.format('delta').load(delta_table_path)

# Fetch both credential columns in a single job so the key ID and the secret
# are guaranteed to come from the same row of the table.
credentials_row = aws_keys_df.select('Access Key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with an explicit message.
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row['Access Key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-safe form of the secret: safe="" makes quote() escape '/' as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Debug only: uncommenting the line below prints the stored AWS credentials in the cell output
# aws_keys_df.show()

In [None]:
# Schema applied to the JSON payload of the pin stream (all fields nullable).
pin_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("unique_id", StringType(), True)
    .add("title", StringType(), True)
    .add("description", StringType(), True)
    .add("poster_name", StringType(), True)
    .add("follower_count", StringType(), True)
    .add("tag_list", StringType(), True)
    .add("is_image_or_video", StringType(), True)
    .add("image_src", StringType(), True)
    .add("downloaded", IntegerType(), True)
    .add("save_location", StringType(), True)
    .add("category", StringType(), True)
)

# Schema applied to the JSON payload of the geo stream (all fields nullable).
geo_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("timestamp", TimestampType(), True)
    .add("latitude", FloatType(), True)
    .add("longitude", FloatType(), True)
    .add("country", StringType(), True)
)

# Schema applied to the JSON payload of the user stream (all fields nullable).
user_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", StringType(), True)
    .add("date_joined", TimestampType(), True)
)

def get_stream(stream_name: str, region: str = 'us-east-1', initial_position: str = 'earliest'):
    '''Uses spark.readStream to retrieve a Kinesis stream and returns it as a streaming dataframe.

    Args:
        stream_name: name of the Kinesis stream to read.
        region: AWS region passed to the 'region' option (defaults to the
            previously hard-coded 'us-east-1').
        initial_position: value passed to the 'initialPosition' option
            (defaults to the previously hard-coded 'earliest').

    Returns:
        The dataframe produced by spark.readStream...load().

    Relies on the notebook globals `spark`, ACCESS_KEY and SECRET_KEY.
    '''
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .option("format", "json")
        .load()
    )

def deserialize_stream(stream, schema):
    '''Parses the JSON held in the stream's `data` column and returns its fields as columns.

    Args:
        stream: dataframe exposing a `data` column (as returned by get_stream).
        schema: StructType describing the JSON document stored in `data`.

    Returns:
        A dataframe with one top-level column per field of `schema`.
    '''
    # Keep only the payload, cast to text so from_json can parse it.
    payload_as_text = stream.selectExpr("CAST(data as STRING)")

    # Replace the raw text with the parsed struct.
    parsed_payload = payload_as_text.withColumn("data", from_json(col("data"), schema))

    # Flatten the struct into top-level columns.
    return parsed_payload.select(col("data.*"))


In [None]:
# Validate save_location values: keep those carrying the 'Local save in' marker, null out the rest
def check_save_location(value):
    '''Returns `value` unchanged when it contains 'Local save in', otherwise None.

    None is accepted and returned as None, so the function is safe to use on
    nullable columns (a Spark UDF receives None for null cells).
    '''
    if value is None or 'Local save in' not in value:
        return None
    return value

def pin_data_clean(datas):
    '''Cleans the deserialized pin dataframe and returns it with a fixed column order.

    Steps:
      * replaces the placeholder 'User Info Error' with null;
      * casts `downloaded` and `index` to integers;
      * converts `follower_count` to an integer, expanding a trailing
        'k' (thousands) or 'M' (millions) instead of nulling such values;
      * nulls out `save_location` values lacking the 'Local save in' marker
        (same rule as check_save_location, which was previously never applied);
      * renames `index` to `ind` and reorders the columns.

    Args:
        datas: dataframe with the columns described by pin_schema.

    Returns:
        The cleaned dataframe. Its schema is printed as a side effect.
    '''
    # Replacing Missing Values
    cleaned_df = datas.replace({'User Info Error': None})

    # Columns that hold plain integers only need a cast.
    for col_name in ("downloaded", "index"):
        cleaned_df = cleaned_df.withColumn(col_name, cleaned_df[col_name].cast(IntegerType()))

    # follower_count may carry a magnitude suffix ('12k', '3M'); a direct cast
    # turns those into null, so split the number from the suffix first.
    follower_text = col("follower_count")
    follower_number = regexp_extract(
        follower_text, r'^\s*([+-]?\d+(?:\.\d+)?)\s*[kKM]?\s*$', 1
    )
    follower_multiplier = (
        when(follower_text.rlike(r'[kK]\s*$'), 1000)
        .when(follower_text.rlike(r'M\s*$'), 1000000)
        .otherwise(1)
    )
    follower_value = follower_number.cast("double") * follower_multiplier
    cleaned_df = cleaned_df.withColumn(
        "follower_count",
        # regexp_extract yields '' when nothing matches; those rows, nulls and
        # values outside the 32-bit integer range all become null.
        when(
            (follower_number != "") & follower_value.between(-2147483648, 2147483647),
            follower_value.cast(IntegerType()),
        ),
    )

    # Apply the save_location rule with native column functions (null-safe,
    # no Python UDF round trip): keep marked values, null everything else.
    cleaned_df = cleaned_df.withColumn(
        "save_location",
        when(col("save_location").contains("Local save in"), col("save_location")),
    )

    # Rename column 'index' to 'ind'
    renamed_df = cleaned_df.withColumnRenamed('index', 'ind')

    # Reorder the DataFrame columns
    # NOTE(review): 'description' is part of pin_schema but is not selected here,
    # so it is dropped from the output — confirm this is intentional.
    new_order = ["ind", "unique_id", "title", "downloaded", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
    cleaned_df = renamed_df.select(*new_order)

    cleaned_df.printSchema()
    return cleaned_df
            

In [None]:
def geo_data_clean(datas):
    '''Cleans the deserialized geo dataframe.

    Combines latitude and longitude into a `coordinates` array, casts
    `timestamp` to a timestamp, renames `index` to `ind`, drops duplicate rows
    and returns the columns in the order ind, country, coordinates, timestamp.
    The resulting schema is printed as a side effect.
    '''
    # NOTE(review): when `datas` is a streaming dataframe, dropDuplicates()
    # without a watermark keeps state for every row seen — confirm acceptable.
    geo_cleaned = (
        datas
        # Build [latitude, longitude] and discard the two source columns.
        .withColumn("coordinates", array(col("latitude"), col("longitude")))
        .drop("latitude", "longitude")
        .withColumn("timestamp", col("timestamp").cast("timestamp"))
        .withColumnRenamed('index', 'ind')
        .dropDuplicates()
        .select("ind", "country", "coordinates", "timestamp")
    )

    geo_cleaned.printSchema()
    return geo_cleaned

def user_data_clean(datas):
    '''Cleans the deserialized user dataframe.

    Joins first_name and last_name (separated by a space) into `user_name`,
    casts `date_joined` to a timestamp, renames `index` to `ind`, keeps the
    columns ind, user_name, age, date_joined and drops duplicate rows.
    The resulting schema is printed as a side effect.
    '''
    full_name = concat(col("first_name"), lit(" "), col("last_name"))

    user_cleaned = (
        datas
        # Add the combined name, then discard its two source columns.
        .withColumn("user_name", full_name)
        .drop("first_name", "last_name")
        .withColumn("date_joined", col("date_joined").cast("timestamp"))
        .withColumnRenamed('index', 'ind')
        .select("ind", "user_name", "age", "date_joined")
        .dropDuplicates()
    )

    user_cleaned.printSchema()
    return user_cleaned

In [None]:
# Pin pipeline: read the Kinesis stream, parse its JSON payload with pin_schema, then clean it.
pin_steam = get_stream("streaming-0a2f66c3e41f-pin")
pin_deserialize = deserialize_stream(pin_steam, pin_schema)
df_pin = pin_data_clean(pin_deserialize)
# display() is not defined in this notebook; presumably supplied by the notebook runtime.
display(df_pin)

ind,unique_id,title,downloaded,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,0,,,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,Local save in /data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,1,,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,Local save in /data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,1,0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,Local save in /data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,1,,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,Local save in /data/quotes,quotes
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,1,,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,Local save in /data/tattoos,tattoos
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,1,,Thrive Causemetics,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,Local save in /data/beauty,beauty
4315,21b59ba9-829d-4c33-8c27-4cd4c56d26b8,Podcasts for Teachers or Parents of Teenagers,1,,Math Giraffe,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",image,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,Local save in /data/education,education
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,1,437.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,Local save in /data/vehicles,vehicles
5494,8fb2af68-543b-4639-8119-de33d28706ed,Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,1,,"Living Low Key | Save Money, Make Money, & Frugal Living","Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",image,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,Local save in /data/finance,finance
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,1,,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,Local save in /data/event-planning,event-planning


In [None]:
# Geo pipeline: read the Kinesis stream, parse its JSON payload with geo_schema, then clean it.
geo_stream = get_stream("streaming-0a2f66c3e41f-geo")
geo_deserialize = deserialize_stream(geo_stream,geo_schema)
df_geo = geo_data_clean(geo_deserialize)
# display() is not defined in this notebook; presumably supplied by the notebook runtime.
display(df_geo)

ind,country,coordinates,timestamp
10625,Jamaica,"List(-84.4944, -81.0613)",2018-07-13T11:51:15.000+0000
428,Bangladesh,"List(-83.4105, -150.788)",2020-09-27T18:46:41.000+0000
9979,Dominican Republic,"List(14.9967, -120.682)",2018-07-18T19:01:46.000+0000
7922,Antigua and Barbuda,"List(-88.0974, -172.052)",2021-01-27T09:14:19.000+0000
7790,Papua New Guinea,"List(-43.692, 64.9839)",2018-07-31T08:19:15.000+0000
9875,Barbados,"List(-74.3382, -110.484)",2020-03-20T13:03:18.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
8653,Seychelles,"List(48.4569, -139.658)",2022-04-11T18:30:19.000+0000
3156,Armenia,"List(-84.738, -160.795)",2018-01-13T19:33:49.000+0000
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000


In [None]:
# User pipeline: read the Kinesis stream, parse its JSON payload with user_schema, then clean it.
user_stream = get_stream("streaming-0a2f66c3e41f-user")
user_deserialize = deserialize_stream(user_stream,user_schema)
df_user = user_data_clean(user_deserialize)
# display() is not defined in this notebook; presumably supplied by the notebook runtime.
display(df_user)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
428,Claudia Adams,20,2015-11-28T02:20:29.000+0000
3454,Robert Murphy,48,2017-09-26T16:31:56.000+0000
205,Brett Bryant,20,2015-10-23T12:40:19.000+0000
7166,Alvin Adams,20,2016-01-01T13:50:40.000+0000
9979,Kaylee Miller,31,2016-11-09T19:50:51.000+0000
5069,Amanda Ball,25,2016-01-13T17:36:30.000+0000
8887,Austin Rodriguez,24,2016-03-31T20:56:39.000+0000
10663,Julie Cox,23,2016-06-23T14:38:00.000+0000
5468,Lisa Gamble,20,2016-07-23T20:51:06.000+0000


In [None]:
# Root folder for streaming checkpoints; every query gets its own sub-folder.
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints"

# Each streaming query must own its checkpoint location: the checkpoint stores
# that query's progress, so sharing one folder between the three writers makes
# them overwrite each other's state.
# NOTE: these are new checkpoint folders, so the first run after this change
# starts each stream from the initial position configured in get_stream.
for stream_df, table_name in (
    (df_pin, "0a2f66c3e41f_pin_table"),
    (df_geo, "0a2f66c3e41f_geo_table"),
    (df_user, "0a2f66c3e41f_user_table"),
):
    (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{CHECKPOINT_ROOT}/{table_name}/")
        .table(table_name)
    )

In [None]:
# Read back the Delta table that the user stream writer appends to (batch read, not a stream)
# NOTE(review): presumably this needs the writer to have created the table already — confirm.
df = spark.read.format("delta").table("0a2f66c3e41f_user_table")

# Print the DataFrame rows without truncating long cell values
df.show(truncate=False)