In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

In [0]:
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
aws_keys_df = spark.read.format("delta").load(delta_table_path)

ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']

# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable format checks during the reading of Delta tables
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Define the schema for the streams
pin_schema = StructType([
    StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("downloaded", LongType(), True),
    StructField("follower_count", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("index", LongType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("title", StringType(), True),
    StructField("unique_id", StringType(), True)
])

geo_schema = StructType([
    StructField("country", StringType(), True),
    StructField("ind", LongType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

user_schema = StructType([
    StructField("age", LongType(), True),
    StructField("date_joined", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("ind", LongType(), True),
    StructField("last_name", StringType(), True)
])


In [0]:
# Define function to read streams 
def read_stream(stream_name, schema):
    df = spark \
    .readStream \
    .format('kinesis') \
    .option('streamName', stream_name) \
    .option('initialPosition','earliest') \
    .option('region','us-east-1') \
    .option('awsAccessKey', ACCESS_KEY) \
    .option('awsSecretKey', SECRET_KEY) \
    .load()

    df = df.selectExpr("CAST(data as STRING)").select(from_json("data", schema).alias("data")).select("data.*")
    return df

In [0]:
# Read data from streams 
pin_stream = read_stream('streaming-0affc011d3cf-pin', pin_schema)
geo_stream = read_stream('streaming-0affc011d3cf-geo', geo_schema)
user_stream = read_stream('streaming-0affc011d3cf-user', user_schema)

In [0]:
# Clean data from pinterest table 
column_and_string = [
    ("description", "No description available$"),
    ("follower_count", "User Info Error"),
    ("image_src", "Image src error."),
    ("poster_name", "User Info Error"),
    ("tag_list", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"),
    ("title", "No Title Data Available")
]
def clean_non_relevant(df, column_and_string):
    for column, string in column_and_string:
        df = df.withColumn(column, regexp_replace(col(column), string, "None"))
    return df

def clean_empty_string(df):
    df = df.select([when(col(c)=="",None).otherwise(col(c)).alias(c) for c in df.columns])
    return df.na.fill('None')

def clean_follower_count(df):
    df = df.withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    df = df.withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    return df.withColumn("follower_count", df["follower_count"].cast("integer"))

def clean_save_location(df):
    return df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

def rename_index(df):
    return df.withColumnRenamed("index", "ind")

def order_columns(df):
    return df.select('ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category')
  
pin_df = clean_non_relevant(pin_stream, column_and_string)
pin_df = clean_empty_string(pin_df)
pin_df = clean_follower_count(pin_df)
pin_df = clean_empty_string(pin_df)
pin_df = clean_save_location(pin_df)
pin_df = rename_index(pin_df)
pin_df = order_columns(pin_df)

# Display 10 rows of the streaming DataFrame
pin_df.limit(10).display()

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,,No description available Story format,,,,multi-video(story page format),,/data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,/data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,/data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,/data/quotes,quotes
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",211000.0,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,/data/tattoos,tattoos
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43000.0,Thrive Causemetics,,video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,/data/beauty,beauty
4315,21b59ba9-829d-4c33-8c27-4cd4c56d26b8,Podcasts for Teachers or Parents of Teenagers,"Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng…",25000.0,Math Giraffe,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",image,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,/data/education,education
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,437.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,/data/vehicles,vehicles
5494,8fb2af68-543b-4639-8119-de33d28706ed,Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,"If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca…",26000.0,"Living Low Key | Save Money, Make Money, & Frugal Living","Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",image,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,/data/finance,finance
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,6000000.0,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,/data/event-planning,event-planning


In [0]:
# Clean data from geolocation table 
def create_coordinates_col(df):
    return df.withColumn("coordinates", array("latitude", "longitude"))

def drop_long_lat(df):
    return df.drop("latitude", "longitude")

def convert_to_timestamp(df):
    return df.withColumn("timestamp", to_timestamp("timestamp"))

def order_columns(df):
    return df.select('ind', 'country', 'coordinates', 'timestamp')

geo_df = create_coordinates_col(geo_stream)
geo_df = drop_long_lat(geo_df)
geo_df = convert_to_timestamp(geo_df)
geo_df = order_columns(geo_df)

# Display 10 rows of the streaming DataFrame
geo_df.limit(10).display()

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.34445, -177.924)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
8731,Aruba,"List(-83.104, -171.302)",2020-07-17T04:39:09.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
10794,Cocos (Keeling) Islands,"List(-89.5236, -154.567)",2022-01-01T02:26:50.000+0000
5494,Bulgaria,"List(-82.6768, -129.202)",2021-07-21T02:02:35.000+0000
5069,Azerbaijan,"List(-63.0063, -157.474)",2021-03-20T09:32:44.000+0000


In [0]:
# Clean data from users tables 
def create_user_name(df):
    return df.withColumn("user_name", concat("first_name", lit(" "), "last_name"))

def drop_first_last_name(df):
    return df.drop("first_name", "last_name")

def convert_to_timestamp(df):
    return df.withColumn("date_joined", to_timestamp("date_joined"))

def order_columns(df):
    return df.select('ind', 'user_name', 'age', 'date_joined')

user_df = create_user_name(user_stream)
user_df = drop_first_last_name(user_df)
user_df = convert_to_timestamp(user_df)
user_df = order_columns(user_df)

# Display 10 rows of the streaming DataFrame
user_df.limit(10).display()



ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000
10794,Thomas Turner,34,2016-12-22T00:02:02.000+0000
5494,Anne Allen,27,2015-12-16T15:20:05.000+0000
5069,Amanda Ball,25,2016-01-13T17:36:30.000+0000


In [0]:
# Define function to write stream data to Delta Lake table
def write_stream_to_delta(df, name):
    df.writeStream \
      .format("delta") \
      .outputMode("append") \
      .option("checkpointLocation", f"/tmp/kinesis/_checkpoints/0affc011d3cf_{name}") \
      .table(f"0affc011d3cf_{name}_stream")

#Write stream data to Delta Lake table
write_stream_to_delta(pin_df, "pin")
write_stream_to_delta(geo_df, "geo")
write_stream_to_delta(user_df, "user")