In [None]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the credentials table as a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both key columns from the SAME row in a single Spark job, so the
# access key and the secret key are guaranteed to belong together.
credentials_row = aws_keys_df.select("Access Key ID", "Secret access key").first()
if credentials_row is None:
    # first() returns None for an empty table; fail with a clear message
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row["Access Key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key (safe="" also escapes "/")
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# Turn off the Delta format check for this session.
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")


def read_kinesis_stream(stream_name, region="us-east-1", initial_position="earliest"):
    """Return a streaming DataFrame that reads from a Kinesis stream.

    Args:
        stream_name: Name of the Kinesis stream to subscribe to.
        region: AWS region that hosts the stream.
        initial_position: Value passed as the connector's ``initialPosition``
            option (the original code misspelled the option name as
            ``intitPosition``, so the requested position was never applied).

    Returns:
        The DataFrame produced by ``spark.readStream...load()``.

    Authenticates with the notebook-level ACCESS_KEY / SECRET_KEY values.
    """
    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", stream_name)
        .option("region", region)
        .option("initialPosition", initial_position)
        .option("awsAccessKey", ACCESS_KEY)
        .option("awsSecretKey", SECRET_KEY)
        .load()
    )


# One streaming DataFrame per Kinesis stream (pin, geo and user data)
df_pin = read_kinesis_stream("streaming-129bc7e0bd61-pin")
df_geo = read_kinesis_stream("streaming-129bc7e0bd61-geo")
df_user = read_kinesis_stream("streaming-129bc7e0bd61-user")

# Schemas for the JSON payloads of the three streams.
# Every field is added with the default nullability.
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)

user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

def _parse_stream_payload(stream_df, payload_schema):
    """Decode the `data` column as a JSON string and expand it into columns.

    The `data` column is cast to a string, parsed with `payload_schema`, and
    the fields of the parsed struct become the columns of the result.
    """
    json_text = stream_df.select(col("data").cast("string").alias("data"))
    parsed = json_text.withColumn("parsed_data", from_json(col("data"), payload_schema))
    return parsed.select(col("parsed_data.*"))


# Turn each raw stream into a typed DataFrame
df_pin = _parse_stream_payload(df_pin, pin_schema)
df_geo = _parse_stream_payload(df_geo, geo_schema)
df_user = _parse_stream_payload(df_user, user_schema)

display(df_user)

ind,first_name,last_name,age,date_joined
4654,Nathan,Howard,37,2016-03-06T21:35:04.000+0000
6939,Erin,Thompson,35,2016-10-21T03:38:15.000+0000
5243,David,Craig,30,2016-01-30T06:09:57.000+0000
2630,Alexis,Bryant,28,2016-10-13T13:33:09.000+0000
2454,Bernard,Arnold,21,2015-12-12T01:39:29.000+0000
4974,Beth,Ali,20,2015-12-08T16:44:30.000+0000
10637,John,Mckenzie,21,2016-11-10T03:18:38.000+0000
6008,Antonio,Aguilar,42,2015-11-16T00:07:37.000+0000
7362,Aaron,Anderson,21,2015-10-23T03:43:54.000+0000
4114,Haley,Malone,39,2016-01-14T01:50:15.000+0000


In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.functions import regexp_replace, col
# Placeholder values that mean "no data", keyed by the column they occur in.
# Each value is used as a SQL LIKE pattern, so "%" matches any trailing text.
irrelevant_values_dict = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}

# Replace every placeholder with null, column by column
for col_name, value in irrelevant_values_dict.items():
    df_pin = df_pin.withColumn(
        col_name,
        when(col(col_name).like(value), None).otherwise(col(col_name))
    )

# Expand the k / M suffixes of follower_count into zeros, then cast to int.
# NOTE(review): a fractional value such as "1.5k" would become "1.5000" and not
# 1500 - confirm the feed only ever sends whole numbers before the suffix.
df_pin = df_pin.withColumn("follower_count", regexp_replace(col("follower_count"), "[kK]", "000"))
df_pin = df_pin.withColumn("follower_count", regexp_replace(col("follower_count"), "[mM]", "000000"))
df_pin = df_pin.withColumn("follower_count", col("follower_count").cast('int'))

# Keep only the path in save_location. The result must overwrite the existing
# "save_location" column: the previous code wrote it to a new "save location"
# column (with a space) that the final select never picked up. The trailing
# \s* also removes the space that follows the "Local save in" prefix.
df_pin = df_pin.withColumn("save_location", regexp_replace(col("save_location"), r"Local save in\s*", ""))

# Align the key column name with the geo and user frames
df_pin = df_pin.withColumnRenamed("index", "ind")

# Final column order (the "downloaded" column is intentionally left out)
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]
df_pin = df_pin.select(new_pin_column_order)


display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
6072,7da9439a-fdc5-4f2c-85cc-afa123e27395,One Of A Kind Bathroom Decor & Design Checklist,Disciplined provided wonderful #bathroom remodeling as well as decor inspiration find out,1000000.0,Southern Living,"Shiplap Bathroom,Small Bathroom,Bathroom Ideas,Bathroom Vintage,Bathroom Fixtures,Bathroom Remodeling,Bathroom Cabinets,Relaxing Bathroom,Master Bathrooms",image,https://i.pinimg.com/originals/f3/a3/e8/f3a3e89199d1398dcd06086a89f4d9f2.jpg,Local save in /data/home-decor,home-decor
8250,09d2a5e3-8e0c-43cf-bb3b-edf241aff1c6,50 Spot-On Motivational Quotes That Will Make Your Heart Soar,You only fail when you stop trying.,166000.0,Women.com,"Great Motivational Quotes,Great Quotes,Inspirational Quotes,Quotes Positive,Great Woman Quotes,Positive Affirmations,Life Quotes Love,Quotes To Live By,Me Quotes",image,https://i.pinimg.com/originals/0f/cc/9e/0fcc9e4cb1f2b38f7a64f15f944c26b8.jpg,Local save in /data/quotes,quotes
7152,110b049c-0e83-4a3a-81b4-11ebc8b71351,Kings Of Leon - Sex On Fire - Women's T-shirt - Heather Dark Grey / XL,"Women's T-shirt. Design inspired by the song ""Sex On Fire"" by Kings Of Leon, a band based in Nashville, Tennessee. This song, which was the first single from Only by the Night,…",27.0,Mala Rock | Rock T-shirts,"Only By The Night,Dark Heather Color,Kings Of Leon,Music Charts,Rock T Shirts,Nashville Tennessee,Great Bands,Reception,September",image,https://i.pinimg.com/originals/c2/1c/6e/c21c6ebbca1dbab9c66c7fce73175c57.jpg,Local save in /data/mens-fashion,mens-fashion
6227,5708d340-7c06-4ce0-a7a2-b991123e2972,Small Entryway Ideas: 57 Foyer Decorating Ideas For Small Foyers and Apartment Entryways - Clever DIY Ideas,Beautiful entryway bench and small entryway decor ideas - would looks great in a small foyer or apartment entryway.,4000.0,Jen's Clever DIY,"Foyer Decorating,Farmhouse Style Decorating,Farmhouse Design,Interior Decorating,Decorating Tips,Decorating Ideas For The Home Living Room,Decorating Bedrooms,Farmhouse Furniture,Furniture Decor",image,https://i.pinimg.com/originals/68/8d/6b/688d6b09fe424138fe01af7c17b74d4c.jpg,Local save in /data/home-decor,home-decor
3923,8384d54e-6947-407a-bce9-f332b7ccaf91,Simple Steps for Effective Lesson Planning,Simple steps for lesson planning! Great for high school classes.,54000.0,The Daring English Teachier,"Instructional Strategies,Teaching Strategies,Teaching Tips,Teaching Art,Instructional Planning,Instructional Technology,English Lesson Plans,English Lessons,History Lesson Plans",image,https://i.pinimg.com/originals/76/47/0a/76470a883aa76f10e04ecc172a05f8a3.jpg,Local save in /data/education,education
6741,4ad74ca1-a56f-4439-b6f9-aea55271e654,Finities® Script Hoodie,The Finities® Script men's pullover hoodie is made with 80% cotton and 20% polyester premium fleece. This street-ready hoodie also features:Screen print graphicDrawcord-adjustab…,177.0,FINITIES,"Hooded Sweatshirts,Hoodies,Graphic Prints,Script,Screen Printing,Street Wear,Mens Fashion,Fashion Trends,Menswear",image,https://i.pinimg.com/originals/66/46/a7/6646a7d4f55632bc5aff82a4dc96d0fe.png,Local save in /data/mens-fashion,mens-fashion
6117,4aed8f62-76e4-4c7d-8544-40075ff7155e,Fall Decor Ideas - The Evolution of a Home Tour,Hello again friends. It's slowly starting to feel like Autumn around here and today I'm,47000.0,craftberry bush,"Fall Home Decor,Autumn Home,Diy Home Decor,Autumn Fall,Decoration Inspiration,Autumn Inspiration,Decor Ideas,Autumn Ideas,Fall Door Decorations",image,https://i.pinimg.com/originals/1b/33/8b/1b338b50a2ec909af1f0e6e1dd28b170.png,Local save in /data/home-decor,home-decor
3961,66d91344-bb57-4c32-aab6-dccd30d3e697,Kingsolver,"Easily the most eye-catching of our Education sets, Kingsolver will help your child make a fun & daring statement! Printed on thick 100# Lynx cardstock, this is a no-tear, no-bl…",43.0,List & File,,image,https://i.pinimg.com/originals/97/23/fb/9723fb85b8ec427bcf1c6e3245402d3b.jpg,Local save in /data/education,education
6783,66bd73a1-ed3f-4592-bb83-836978cd2ebc,My DeepBlue>Wood luxury bracelet with FC718,"This is my wood variation of DeepBlue luxury bracelets: the agate-blue stones are joined by wooden beads instead of onyx, jade, or lapis stones.",25.0,Vinchesi Designs,"Stainless Steel Wire,Wood Design,Wooden Beads,Bracelets For Men,Blue Stones,Sterling Silver,Luxury,Agate,Handmade",image,https://i.pinimg.com/originals/67/aa/4f/67aa4f18f661b22bd5639bedd70d7fa2.jpg,Local save in /data/mens-fashion,mens-fashion
4225,23cddfcf-bfaf-499a-8fbd-722957548916,Sticky Note Sight Word Match,"Learning sight words can be challenging, but it doesn't have to be boring. This sight word match game is perfect for early readers at school or home!",68000.0,The Kindergarten Connection,"Preschool Sight Words,Learning Sight Words,Sight Word Practice,Sight Word Activities,Phonics Activities,Listening Activities,Kindergarten Sight Word Games,Sight Word Bingo,Sight Word Flashcards",image,https://i.pinimg.com/originals/7e/25/9c/7e259cfa52efae67174e3d4c550cf2e6.jpg,Local save in /data/education,education


In [None]:
# Cleaning Geo stream data
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import array, col
# Build the cleaned geo frame in a single projection:
#   - coordinates: array of [latitude, longitude]
#   - timestamp:   passed through to_timestamp
# and emit the columns in the order ind, country, coordinates, timestamp.
df_geo = df_geo.select(
    "ind",
    "country",
    array(col("latitude"), col("longitude")).alias("coordinates"),
    to_timestamp("timestamp").alias("timestamp"),
)
display(df_geo)


ind,country,coordinates,timestamp
6117,Afghanistan,"List(33.2467, -132.693)",2022-09-09T07:13:46.000+0000
3961,Canada,"List(44.8253, -71.7851)",2020-01-29T21:54:03.000+0000
6783,Philippines,"List(64.2157, -147.73)",2021-10-27T01:48:34.000+0000
4225,New Zealand,"List(7.1723, 130.363)",2018-04-14T23:42:54.000+0000
180,Korea,"List(-51.051, -51.6278)",2021-12-12T03:32:23.000+0000
7583,Algeria,"List(-89.5173, -179.689)",2018-03-26T00:01:24.000+0000
8507,American Samoa,"List(-88.2286, -178.919)",2022-06-13T19:49:28.000+0000
3255,Puerto Rico,"List(-21.9964, -56.7605)",2017-10-24T14:58:02.000+0000
4505,Faroe Islands,"List(0.0485475, -16.5635)",2018-06-20T22:11:03.000+0000
9268,American Samoa,"List(-88.5255, -161.644)",2020-03-16T23:13:51.000+0000


In [None]:
# Cleaning User stream data
from pyspark.sql.functions import concat, col, lit
# Build the cleaned user frame in a single projection:
#   - user_name:   first_name and last_name joined by one space
#   - date_joined: cast to the timestamp type
# first_name and last_name are not selected, so they are absent from the result.
df_user = df_user.select(
    "ind",
    concat(col("first_name"), lit(" "), col("last_name")).alias("user_name"),
    "age",
    col("date_joined").cast("timestamp").alias("date_joined"),
)
display(df_user)



ind,user_name,age,date_joined
10616,Andrew Allen,38,2017-06-09T21:55:57.000+0000
1058,James Pennington,54,2016-11-11T00:52:42.000+0000
2140,Heather Adams,21,2015-12-30T17:47:03.000+0000
6871,Alejandra Acevedo,20,2015-11-24T21:01:23.000+0000
2320,Corey Carpenter,25,2015-10-28T09:03:45.000+0000
10593,Nicholas Brown,22,2016-09-02T13:30:34.000+0000
2528,Nicholas West,31,2016-09-23T23:14:01.000+0000
3016,Alan Burns,25,2015-10-22T19:42:37.000+0000
6348,Mark Brown,23,2016-07-24T20:27:24.000+0000
5716,Joshua Collins,43,2016-01-25T18:54:31.000+0000


In [None]:
# Writing streaming data to Delta Tables.
# Defining a method to write the data to a Delta Table
def write_to_delta_table(dataframe, table_name):
    """Start an append-mode streaming write of `dataframe` into a Delta table.

    Args:
        dataframe: Streaming DataFrame to write (must expose `writeStream`).
        table_name: Short name of the dataset (e.g. "pin"); it is embedded in
            both the target table name and the checkpoint directory.

    Returns:
        Whatever the writer's `.table(...)` call returns (the handle of the
        started stream), so the caller can monitor or stop it.
    """
    # The checkpoint path must be an f-string: without the prefix every stream
    # used the same literal "{table_name}" directory and shared one checkpoint.
    checkpoint_path = f"/tmp/kinesis/129bc7e0bd61_{table_name}_table_checkpoints/"
    return (
        dataframe.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .table(f"129bc7e0bd61_{table_name}_table")
    )

# Start one Delta writer per cleaned stream, in the order pin, geo, user
for stream_df, dataset_name in ((df_pin, "pin"), (df_geo, "geo"), (df_user, "user")):
    write_to_delta_table(stream_df, dataset_name)

In [None]:
%sql
-- Preview at most 10 rows of the pin Delta table.
-- The two statements below are disabled helpers for listing tables and
-- inspecting the table schema.
--SHOW TABLES;
--DESCRIBE TABLE `129bc7e0bd61_pin_table`;
SELECT * FROM `129bc7e0bd61_pin_table` LIMIT 10;



index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category


In [None]:
# Print the result of the last SQL query, which is held in _sqldf
_sqldf.show()

# Load the pin Delta table as a batch DataFrame and inspect it
df = spark.read.format("delta").table("129bc7e0bd61_pin_table")
df.show(n=20)  # same as the default row count
df.show(n=10)  # first 10 rows only
display(df)








index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
