In [None]:
%sql
-- Session setting: turn off Delta's format check (spark.databricks.delta.formatCheck.enabled=false).
-- NOTE(review): the reason for disabling it is not visible in this notebook — confirm it is still needed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib

def read_aws_keys(table_path):
    '''
    Load the AWS credentials Delta table as a DataFrame.

    Parameters
    ----------
    table_path : str
        Location of the Delta table that stores the credentials.

    Returns
    -------
    DataFrame
        Result of ``spark.read.format("delta").load(table_path)``; no rows are
        collected here.
    '''
    delta_reader = spark.read.format("delta")
    return delta_reader.load(table_path)

def extract_keys(tab_path):
    '''
    Return the AWS access key and secret key stored in the credentials Delta table.

    Parameters
    ----------
    tab_path : str
        Location of the credentials Delta table (loaded via ``read_aws_keys``).

    Returns
    -------
    tuple of (str, str)
        ``(ACCESS_KEY, SECRET_KEYS)`` taken from the ``'Access key ID'`` and
        ``'Secret access key'`` columns of the first row. The secret key is
        returned raw (not URL-encoded); that raw value is what
        ``read_streaming_data`` passes as the Kinesis ``awsSecretKey`` option.

    Raises
    ------
    ValueError
        If the credentials table contains no rows.
    '''
    aws_keys = read_aws_keys(tab_path)
    # Fetch both columns from the same row in a single Spark job (previously two
    # separate collect() calls, which could in principle read different rows).
    first_row = aws_keys.select('Access key ID', 'Secret access key').first()
    if first_row is None:
        raise ValueError(f"Credentials table at {tab_path!r} contains no rows")
    ACCESS_KEY = first_row['Access key ID']
    SECRET_KEYS = first_row['Secret access key']
    return ACCESS_KEY, SECRET_KEYS

def read_streaming_data(
    table_path: str,
    kinesis_stream_name: "str | list",
    aws_region: str,
    stream_position: str = 'earliest'
    ):
    '''
    Open a streaming DataFrame over one or more Kinesis streams.

    Parameters
    ----------
    table_path : str
        Location of the credentials Delta table; the access and secret keys are
        read from it with ``extract_keys``.
    kinesis_stream_name : str or list of str
        Kinesis stream name. A list/tuple of names is joined with commas before
        being passed as the ``streamName`` option (previously a list would have
        been stringified as "['a', 'b']").
    aws_region : str
        AWS region of the stream(s), e.g. ``'us-east-1'``.
    stream_position : str, default 'earliest'
        Passed through as the ``initialPosition`` option.

    Returns
    -------
    DataFrame
        Streaming DataFrame from ``spark.readStream.format('kinesis')...load()``.
        No streaming query is started here.
    '''
    if isinstance(kinesis_stream_name, (list, tuple)):
        kinesis_stream_name = ','.join(kinesis_stream_name)

    access_key, secret_key = extract_keys(table_path)
    streaming_data_df = (
        spark.readStream.format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', stream_position)
        .option('region', aws_region)
        .option('awsAccessKey', access_key)
        .option('awsSecretKey', secret_key)
        .load()
    )
    return streaming_data_df



In [None]:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType, DoubleType

# --- Configuration -----------------------------------------------------------
# Delta table holding the AWS credentials read by read_streaming_data().
user_delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
app_region = 'us-east-1'
# One Kinesis stream per entity (pin / geo / user).
data_streams_pin = "streaming-0e7ae8feb921-pin"
data_streams_geo = "streaming-0e7ae8feb921-geo"
data_streams_user = "streaming-0e7ae8feb921-user"
# NOTE(review): not referenced by any visible cell — remove if nothing else reads it.
data_format = 'JSON'

# --- JSON payload schemas ----------------------------------------------------
# Every pin field is read as a string; typed columns are derived in the pin cleaning cell.
_pin_field_names = [
    "index", "unique_id", "title", "description", "poster_name", "save_location",
    "tag_list", "is_image_or_video", "image_src", "downloaded", "follower_count", "category",
]
struct_stream_pin = StructType([StructField(name, StringType(), True) for name in _pin_field_names])

struct_stream_user = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", DateType(), True),
])

struct_stream_geo = StructType([
    StructField("country", StringType(), True),
    StructField("ind", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
])

# --- Streaming sources -------------------------------------------------------
# read_streaming_data() only defines the Kinesis sources; no streaming query is started here.
streaming_data_pin_df = read_streaming_data(user_delta_table_path, data_streams_pin, app_region, 'earliest')
streaming_data_geo_df = read_streaming_data(user_delta_table_path, data_streams_geo, app_region, 'earliest')
streaming_data_user_df = read_streaming_data(user_delta_table_path, data_streams_user, app_region, 'earliest')

# Schemas are applied per stream (from_json over the Kinesis `data` payload) in the cleaning cells below.




In [None]:
# TASK 1: clean the pin stream (empty strings -> None, poster_name/follower_count clean-up, type casts, column reordering)


import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import when,col, substring



# Parse each Kinesis record: the `data` payload is cast to a string and unpacked
# as JSON with the pin schema into one column per field.
streaming_data_pin_df_schema = (
    streaming_data_pin_df
    .selectExpr("CAST(data as STRING)as message")
    .select(from_json(col("message"), struct_stream_pin).alias("json"))
    .select("json.*")
)

# Empty strings become null in every column.
streaming_data_pin_df_schema = streaming_data_pin_df_schema.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in streaming_data_pin_df_schema.columns]
)

# poster_name: null out the two values treated as placeholders, then replace every
# non-alphanumeric character with a space (e.g. "Let's" -> "Let s").
streaming_data_pin_df_schema = (
    streaming_data_pin_df_schema
    .replace({'Bored Panda': None, 'User Info Error': None}, subset=['poster_name'])
    .withColumn('poster_name', regexp_replace('poster_name', '[^a-zA-Z0-9]', " "))
)

# follower_count: text such as "508", "472k" or "1.5k" / "1M" -> int.
# The number and its k/M suffix are parsed separately and multiplied, so "1.5k"
# becomes 1500 (a textual "k" -> "000" rewrite would give "1.5000" -> 1) and "1M"
# becomes 1000000 instead of null. Anything not matching the pattern becomes null.
_FOLLOWER_COUNT_PATTERN = r'^([0-9]+(?:\.[0-9]+)?)([kM]?)$'
_follower_raw = F.col('follower_count')
_follower_number = F.regexp_extract(_follower_raw, _FOLLOWER_COUNT_PATTERN, 1).cast("double")
_follower_suffix = F.regexp_extract(_follower_raw, _FOLLOWER_COUNT_PATTERN, 2)
_follower_multiplier = (
    F.when(_follower_suffix == "k", 1000)
    .when(_follower_suffix == "M", 1000000)
    .otherwise(1)
)
streaming_data_pin_df_schema = streaming_data_pin_df_schema.withColumn(
    "follower_count",
    F.when(
        _follower_raw.rlike(_FOLLOWER_COUNT_PATTERN),
        # round() guards against float artefacts such as 4.35 * 1000 = 4349.999...
        F.round(_follower_number * _follower_multiplier).cast("int"),
    ),
)

# index -> integer column `ind` (the key name used by the geo and user schemas);
# downloaded -> int.
streaming_data_pin_df_schema = (
    streaming_data_pin_df_schema
    .withColumn("index", col("index").cast("int"))
    .withColumnRenamed("index", "ind")
    .withColumn("downloaded", col("downloaded").cast("int"))
)

# save_location: keep characters 15..end, i.e. drop a fixed 14-character prefix.
# NOTE(review): presumably a constant text prefix in front of the "/data/..." path — confirm on raw data.
streaming_data_pin_df_schema = streaming_data_pin_df_schema.withColumn(
    "save_location", expr('substring(save_location, 15,length(save_location))')
)

# Reorder: listed columns first, any remaining ones (e.g. downloaded) after them.
df_sequence = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]
streaming_data_pin_df_schema = streaming_data_pin_df_schema.select(
    df_sequence + [c for c in streaming_data_pin_df_schema.columns if c not in df_sequence]
)

# Checkpoint directory dedicated to this query: reusing one directory (such as a
# single shared "tmp/checkpoints") across several streaming queries mixes their
# offset and commit logs.
query = (
    streaming_data_pin_df_schema
    .writeStream
    .format("delta")
    .queryName("cleaned_pin_table")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/0e7ae8feb921_pin_table/")
    .table("0e7ae8feb921_pin_table")
)

display(streaming_data_pin_df_schema)


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category,downloaded
3238,30e52c79-841e-4513-aebd-a1883a4b1d4a,Felt and Candy Cane Christmas Tree Favors,Use our printable template to make an easy Christmas tree favor using felt and candy canes! These Christmas tree favors are such a fun Christmas craft and a great craft for kids…,472000.0,One Little Project,"Christmas Candy Crafts,Candy Cane Christmas Tree,Christmas Crafts For Kids To Make,Xmas Crafts,Christmas Tree Decorations,Christmas Christmas,Christmas Gift Craft Ideas,Paper Christmas Trees,Christmas Crafts For Kindergarteners",video,https://i.pinimg.com/videos/thumbnails/originals/e0/85/02/e08502192f18e56a75bee823b3d3bccb.0000001.jpg,/data/diy-and-crafts,diy-and-crafts,1
8946,a8329861-86ca-4514-9bb0-c05874cddb3c,Nothing Says I Am the Night Like These TK Cute Bat Tattoos,"When you're the darkness, but also very smol. These cute bat tattoos are perfect to show off your Halloween spirit and goth aesthetic year round, but they're also too cute to be…",19000.0,Let s Eat Cake,"Body Art Tattoos,Small Tattoos,Tattoo Drawings,Bat Tattoos,Tattoos Pics,Phoenix Tattoos,Cute Halloween Tattoos,Spooky Tattoos,Spooky Halloween",image,https://i.pinimg.com/originals/21/59/14/2159144a8f9931f271dda263c33a8f95.jpg,/data/tattoos,tattoos,1
8014,515ac5f7-073c-4e7b-8cb2-736e81936c79,40 Powerful Quotes To Give You Strength When You Need It Most,"You are stronger than you think, and when you're faced with the toughest moments of your life, these powerful quotes about strength will remind you that you possess the strength…",942000.0,YourTango,"Now Quotes,Go For It Quotes,Life Quotes Love,Words Quotes,Quotes To Live By,Funny Quotes,Simple Quotes,Quotes About Loving Yourself,Quotes Images",image,https://i.pinimg.com/originals/8d/57/c9/8d57c9fe70a55e759deb9bfe48661745.jpg,/data/quotes,quotes,1
9739,141022ed-78b7-4d21-ae95-1735c361b626,Europe Bucket List - 50 awesome destinations,This is the ultimate Europe Bucket List! Add these stunning 50 destinations to your itinerary now! #europe #europetravel #bucketlist | Europe travel | European bucket list | Eur…,6000.0,Daily Travel Pill Travel Blogger Digital Nomad,"European Travel Tips,European Vacation,European Destination,Europe Travel Guide,Europe Tourism,Europe Europe,Travel Guides,Best Places In Europe,Best Places To Travel",image,https://i.pinimg.com/originals/f9/69/ff/f969ffd2b0839a29a294548f434400d7.jpg,/data/travel,travel,1
5942,a71ae47b-ee25-4e03-b15d-6f10e215a314,A Dedicated Work Place - Small Office Inspiration,"About three months into working from home, I finally realized that quarantine was going to be more permanent than I had originally anticipated. At the beginning of the stay-at h…",508.0,Hackett House Studio Interior Design and Decor,"Cosy Home,Sweet Home,Living Spaces,Living Room,My New Room,Home Fashion,Home Decor Inspiration,My Dream Home,Home And Living",image,https://i.pinimg.com/originals/fa/51/9c/fa519cd012e53eaef49176a7d1299002.jpg,/data/home-decor,home-decor,1
7919,309d1004-7ccb-4273-a4ba-900694c2c392,5 Empowering Quotes for International Women's Day,No description available Story format,4000.0,Creative Agency L Rae Design,"Now Quotes,Self Love Quotes,Wise Quotes,Faith Quotes,Words Quotes,Quotes To Live By,Best For Me Quotes,Me Time Quotes,Freedom Quotes",multi-video(story page format),Image src error.,/data/quotes,quotes,0
6740,441c8d4c-88f9-4557-a3b6-789259e097e9,Superb tailoring,Superb tailoring by Manolo Costa,237.0,Josie Moya,"White Dress Pants,White Trousers,Men Dress,Dress Shoes,Sharp Dressed Man,Well Dressed Men,Stylish Men,Men Casual,Brown Blazer",image,https://i.pinimg.com/originals/4f/1b/6e/4f1b6eb633942083c2e3f30a9c24f09e.png,/data/mens-fashion,mens-fashion,1
10230,b7f215fc-444b-493f-80b3-e220b34ad629,"Jessie Kahnweiler Talks Her Web Series ""The Skinny"": ""Here’s My F---ing Shame""","We talk to Jessie Kahnweiler about her new series, The Skinny.",,Refinery29,"I Want To Travel,Best Places To Travel,Places To Visit,Travel Advice,Travel Guides,Travel Tags,Destination Voyage,Belle Photo,New Mexico",image,https://i.pinimg.com/originals/34/4d/e9/344de9d18920dce209512192cfb3b678.jpg,/data/travel,travel,1
8583,bd450e37-a6ff-446d-b055-b9a04ac16796,Top 134 Best Funny Tattoos [2021 Inspiration Guide],"Discover the top 134 best funny tattoo ideas including alien, animal and quote tattoos for men and women. Check out these hilarious tattoos.",800000.0,Next Luxury,"Funny Tattoos,Cute Tattoos,Leg Tattoos,Body Art Tattoos,Tattoos For Guys,Funny Small Tattoos,Tatoos,Tattoo Art,Pretty Tattoos",image,https://i.pinimg.com/originals/6f/c4/a5/6fc4a544959cbeb872f4dcab9df70813.jpg,/data/tattoos,tattoos,1
8492,98746165-5503-4218-88c5-fbda751f7f99,16 Powerful Tarot Card Tattoo Ideas & Their Meanings,"If you're looking for tattoos with meaning, these tarot card tattoo ideas are just what you. Find out what the powerful imagery on these cards can mean for you.",942000.0,YourTango,"Jj Tattoos,Rebellen Tattoo,Tattoo Fairy,Grunge Tattoo,Tattoo Trend,Tattoos For Lovers,Dainty Tattoos,Pretty Tattoos,Piercing Tattoo",image,https://i.pinimg.com/originals/60/75/e7/6075e7b0d087ef9fc388523d4922c4a2.jpg,/data/tattoos,tattoos,1


In [None]:


# Parse each Kinesis record's `data` payload as JSON using the geo schema.
streaming_data_geo_df_schema = (
    streaming_data_geo_df
    .selectExpr("CAST(data as STRING)as message")
    .select(from_json(col("message"), struct_stream_geo).alias("json"))
    .select("json.*")
)

# TASK 2: combine latitude/longitude into a `coordinates` array, drop the originals,
# and ensure `timestamp` is a timestamp (the schema already declares TimestampType,
# so to_timestamp is a no-op safeguard here).
streaming_data_geo_df_schema = (
    streaming_data_geo_df_schema
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
)

# Reorder: listed columns first, any remaining ones after them.
geo_struct = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]
streaming_data_geo_df_schema = streaming_data_geo_df_schema.select(
    geo_struct + [c for c in streaming_data_geo_df_schema.columns if c not in geo_struct]
)

# Write the cleaned *geo* stream (previously the pin DataFrame was written here by
# mistake) with a checkpoint directory dedicated to this query.
query = (
    streaming_data_geo_df_schema
    .writeStream
    .format("delta")
    .queryName("cleaned_geo_table")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/0e7ae8feb921_geo_table/")
    .table("0e7ae8feb921_geo_table")
)

display(streaming_data_geo_df_schema)



In [None]:


# Parse each Kinesis record's `data` payload as JSON using the user schema.
streaming_data_user_df_schema = (
    streaming_data_user_df
    .selectExpr("CAST(data as STRING)as message")
    .select(from_json(col("message"), struct_stream_user).alias("json"))
    .select("json.*")
)

# TASK 3: build user_name as "<first_name> <last_name>" (concat yields null if either
# part is null), drop the name parts, and convert date_joined (DateType) to a timestamp.
streaming_data_user_df_schema = (
    streaming_data_user_df_schema
    .withColumn("user_name", concat("first_name", lit(" "), "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
)

# Reorder: listed columns first, any remaining ones after them.
user_struct = [
    "ind",
    "user_name",
    "age",
    "date_joined",
]
streaming_data_user_df_schema = streaming_data_user_df_schema.select(
    user_struct + [c for c in streaming_data_user_df_schema.columns if c not in user_struct]
)

# Write the cleaned *user* stream (previously the pin DataFrame was written here by
# mistake) with a checkpoint directory dedicated to this query.
query = (
    streaming_data_user_df_schema
    .writeStream
    .format("delta")
    .queryName("cleaned_user_table")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/0e7ae8feb921_user_table/")
    .table("0e7ae8feb921_user_table")
)

display(streaming_data_user_df_schema)






ind,user_name,age,date_joined
3844,Blake Bennett,20,2016-07-11T00:00:00.000+0000
7077,Matthew Gonzalez,21,2015-11-14T00:00:00.000+0000
3816,Alison Cole,21,2015-11-05T00:00:00.000+0000
9572,Tina King,36,2015-11-30T00:00:00.000+0000
9007,Adam Acosta,20,2015-10-21T00:00:00.000+0000
5406,Anthony Durham,21,2015-11-06T00:00:00.000+0000
4187,Emily Arroyo,29,2015-11-11T00:00:00.000+0000
743,Tina Mckee,48,2016-04-19T00:00:00.000+0000
3737,Alex Banks,27,2015-12-13T00:00:00.000+0000
8828,David Acosta,20,2015-11-09T00:00:00.000+0000
