In [0]:
# URL processing
# `import urllib` alone does not load the `parse` submodule, so import it explicitly
# for the urllib.parse.quote call further down.
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import col, split, count, rank, percentile_approx, when, regexp_extract, regexp_replace, array, length, substring, to_timestamp,length,concat,rank,max,lit, row_number, desc,year, explode
from pyspark.sql.window import Window

In [0]:
# Location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Get the AWS access key and secret key from the Spark DataFrame.
# Both columns are fetched with a single first() call, so only one Spark job runs
# and only one row is brought back to the driver.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with an explicit message.
    raise ValueError("aws_keys_df is empty: no AWS credentials available to read")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key; safe="" means '/' is percent-encoded as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable format checks during the reading of Delta tables
-- (sets spark.databricks.delta.formatCheck.enabled to false).
-- NOTE(review): presumably needed to avoid a format-mismatch error on a later read; confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Read the pin stream from Kinesis and keep only its payload column, cast to a
# string so that get_json_object can parse it as JSON.
df_pin_deserial = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0a54b96ac143-pin')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
    .selectExpr("CAST(data as STRING)")
)

# Extract each JSON field of interest into its own column.
df_pin = df_pin_deserial.selectExpr(
    "get_json_object(data, '$.index') as index",
    "get_json_object(data, '$.title') as title",
    "get_json_object(data, '$.description') as description",
    "get_json_object(data, '$.poster_name') as poster_name",
    "get_json_object(data, '$.follower_count') as follower_count",
    "get_json_object(data, '$.tag_list') as tag_list",
    "get_json_object(data, '$.is_image_or_video') as is_image_or_video",
    "get_json_object(data, '$.image_src') as image_src",
    "get_json_object(data, '$.downloaded') as downloaded",
    # Fixed: the path used to be '$save_location' (missing '.'), which is not a
    # valid JSON path, so this column was NULL for every record.
    "get_json_object(data, '$.save_location') as save_location",
    "get_json_object(data, '$.category') as category",
)
                            



In [0]:
# Preview the pin stream payloads (the `data` column cast to STRING) before the
# individual JSON fields are extracted.
df_pin_deserial.display()

data
"{""index"":8304,""unique_id"":""5b6d0913-25e4-43ab-839d-85d5516f78a4"",""title"":""The #1 Reason You’re Not His Priority Anymore - Matthew Coast"",""description"":""#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself"",""poster_name"":""Commitment Connection"",""follower_count"":""51k"",""tag_list"":""Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png"",""downloaded"":1,""save_location"":""Local save in /data/quotes"",""category"":""quotes""}"
"{""index"":4315,""unique_id"":""21b59ba9-829d-4c33-8c27-4cd4c56d26b8"",""title"":""Podcasts for Teachers or Parents of Teenagers"",""description"":""Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng… "",""poster_name"":""Math Giraffe"",""follower_count"":""25k"",""tag_list"":""Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg"",""downloaded"":1,""save_location"":""Local save in /data/education"",""category"":""education""}"
"{""index"":5494,""unique_id"":""8fb2af68-543b-4639-8119-de33d28706ed"",""title"":""Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You"",""description"":""If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca… "",""poster_name"":""Living Low Key | Save Money, Make Money, & Frugal Living"",""follower_count"":""26k"",""tag_list"":""Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png"",""downloaded"":1,""save_location"":""Local save in /data/finance"",""category"":""finance""}"
"{""index"":5069,""unique_id"":""b75b6f87-deb3-444f-b29e-ce9161b2df49"",""title"":""The Vault: Curated & Refined Wedding Inspiration"",""description"":""Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY"",""poster_name"":""Style Me Pretty"",""follower_count"":""6M"",""tag_list"":""60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg"",""downloaded"":1,""save_location"":""Local save in /data/event-planning"",""category"":""event-planning""}"
"{""index"":2923,""unique_id"":""52fa3af5-24a4-4ccb-8f17-9c3eb12327ee"",""title"":""UFO Paper Plate Craft"",""description"":""A fun space activity for kids. Preshoolers and kindergartners will love making their own alien spacecraft!"",""poster_name"":""The Crafting Chicks"",""follower_count"":""192k"",""tag_list"":""Paper Plate Crafts For Kids,Fun Crafts For Kids,Summer Crafts,Toddler Crafts,Art For Kids,Outer Space Crafts For Kids,Kid Crafts,Space Kids,Back To School Crafts For Kids"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/6f/e8/aa/6fe8aa405513c6d2f77b5f47d17cdce8.jpg"",""downloaded"":1,""save_location"":""Local save in /data/diy-and-crafts"",""category"":""diy-and-crafts""}"
"{""index"":3089,""unique_id"":""88f9227e-88d0-4b1c-b0be-bcfc3028b8e2"",""title"":""No Title Data Available"",""description"":""No description available Story format"",""poster_name"":""User Info Error"",""follower_count"":""User Info Error"",""tag_list"":""N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"",""is_image_or_video"":""multi-video(story page format)"",""image_src"":""Image src error."",""downloaded"":0,""save_location"":""Local save in /data/diy-and-crafts"",""category"":""diy-and-crafts""}"
"{""index"":6063,""unique_id"":""60693727-4927-4bd6-a8c5-096a392d63e6"",""title"":""41 Gorgeous Fall Decor Ideas For Your Home - Chaylor & Mads"",""description"":""Beautiful and easy ways to update every room in your home with fall decor. Plus, my favorite finds in fall decor for 2020!"",""poster_name"":""Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog"",""follower_count"":""92k"",""tag_list"":""Fall Home Decor,Autumn Home,Fall Decor Outdoor,Front Porch Fall Decor,Home Decor Ideas,Porch Ideas For Fall,Fall Outdoor Decorating,Decorating Ideas For Fall,Fall Front Doors"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/e5/ae/dc/e5aedc14ce557e3a69f672e0f8c88f6e.png"",""downloaded"":1,""save_location"":""Local save in /data/home-decor"",""category"":""home-decor""}"
"{""index"":3454,""unique_id"":""46bd3f86-b09d-4e29-9033-7ff2df595e51"",""title"":""What can you use to color resin?"",""description"":""HELPFUL RESOURCES – Check out my resin colorants resources page here with links to all the products mentioned in this article (and more). Let me know if you have any that you lo… "",""poster_name"":""Mixed Media Crafts"",""follower_count"":""6k"",""tag_list"":""Epoxy Resin Art,Diy Resin Art,Diy Resin Crafts,Resin Molds,Ice Resin,Resin Pour,Diy Epoxy,Diy Resin Painting,Diy Resin Dice"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/d4/12/78/d4127833023ca32600571ddca16f1556.jpg"",""downloaded"":1,""save_location"":""Local save in /data/diy-and-crafts"",""category"":""diy-and-crafts""}"
"{""index"":7554,""unique_id"":""c6fa12f4-0d4a-4b07-a335-5bf9f37f8281"",""title"":""Craig Style"",""description"":""imgentleboss: “ - More about men’s fashion at @Gentleboss - GB’s Facebook - ”"",""poster_name"":""iElylike ..✿◕‿◕✿ஐ✿◕‿◕✿"",""follower_count"":""940"",""tag_list"":""Mens Fashion Blog,Look Fashion,Autumn Fashion,Fashion News,Fashion Sale,80s Fashion,Paris Fashion,Runway Fashion,Fashion Trends"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/e7/6e/8e/e76e8ed6cc838b84a934c6948a5caff7.jpg"",""downloaded"":1,""save_location"":""Local save in /data/mens-fashion"",""category"":""mens-fashion""}"
"{""index"":6145,""unique_id"":""82e13a07-db99-43a3-b1c0-89a4b75821da"",""title"":""HOLIDAY MANTLE DECOR - @AMAZON & @TARGET FINDS"",""description"":""Holiday mantle decor, Christmas decor, metallic mercury glass style Christmas trees, eucalyptus vine, evergreen pine branches, white neutral holiday decor, cozy mantle for the h… "",""poster_name"":""Stylin by Aylin"",""follower_count"":""83k"",""tag_list"":""Winter Home Decor,Christmas Living Room Decor,Living Room Decor Cozy,Christmas Decor,Cozy Fireplace,Rustic Fireplace Decor,Fireplace Decorations,Rustic Room,House Decorations"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/9d/82/1a/9d821a80acd8f90c16454e978bd9b115.jpg"",""downloaded"":1,""save_location"":""Local save in /data/home-decor"",""category"":""home-decor""}"


In [0]:
# Read the geo stream from Kinesis and keep only its payload column, cast to a
# string so that get_json_object can parse it as JSON.
df_geo_deserial = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0a54b96ac143-geo')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
    .selectExpr("CAST(data as STRING)")
)

# Extract each JSON field of interest into its own column.
geo_field_expressions = [
    "get_json_object(data, '$.index') as index",
    "get_json_object(data, '$.timestamp') as timestamp",
    "get_json_object(data, '$.latitude') as latitude",
    "get_json_object(data, '$.longitude') as longitude",
    "get_json_object(data, '$.country') as country",
]
df_geo = df_geo_deserial.selectExpr(*geo_field_expressions)


In [0]:
# Read the user stream from Kinesis and keep only its payload column, cast to a
# string so that get_json_object can parse it as JSON.
df_user_deserial = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-0a54b96ac143-user')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
    .selectExpr("CAST(data as STRING)")
)

# Extract each JSON field of interest into its own column.
user_field_expressions = [
    "get_json_object(data, '$.index') as index",
    "get_json_object(data, '$.first_name') as first_name",
    "get_json_object(data, '$.last_name') as last_name",
    "get_json_object(data, '$.age') as age",
    "get_json_object(data, '$.date_joined') as date_joined",
]
df_user = df_user_deserial.selectExpr(*user_field_expressions)

In [0]:
class CleanData:
    """Apply cleaning steps to a Spark DataFrame.

    The DataFrame is held in ``self.df``. Every transformation method replaces
    ``self.df`` with the transformed DataFrame and also returns it, so a method
    can be called either for its side effect or for its return value.
    """

    def __init__(self, df):
        """Store the DataFrame that the cleaning methods operate on."""
        self.df = df

    def _cast_columns(self, columns_to_convert, data_type):
        """Cast each column named in ``columns_to_convert`` to ``data_type``."""
        for column_name in columns_to_convert:
            self.df = self.df.withColumn(column_name, self.df[column_name].cast(data_type))
        return self.df

    def check_for_missing_data(self):
        """Print the number of null values in every column.

        Runs a Spark action, so it is meant for batch DataFrames.

        Returns:
            The (unchanged) DataFrame.
        """
        column_names = self.df.columns
        if not column_names:
            return self.df

        # Count the nulls of all columns in one aggregation instead of launching
        # a separate filter/count job per column. count() ignores the NULLs that
        # when() yields for rows where the column is populated.
        null_counts = self.df.select(
            [count(when(col(name).isNull(), 1)).alias(name) for name in column_names]
        ).first()

        for position, name in enumerate(column_names):
            print(f"Number of null values in {name}: {null_counts[position]}")

        return self.df

    def treat_missing_data(self):
        """Replace null values in every column with the string ``'None'``."""
        for column_to_treat in self.df.columns:
            self.df = self.df.withColumn(
                column_to_treat,
                when(col(column_to_treat).isNull(), 'None').otherwise(col(column_to_treat)),
            )
        return self.df

    def convert_follower_count(self):
        """Convert ``follower_count`` strings such as ``'940'``, ``'51k'`` or ``'6M'`` to integers.

        A ``k`` suffix multiplies the number by 1,000 and an ``M`` suffix by
        1,000,000. Values that are not a whole number with an optional ``k``/``M``
        suffix (for example ``'User Info Error'``) become NULL.

        Returns:
            The DataFrame with ``follower_count`` cast to ``int``.
        """
        raw_count = col("follower_count")
        count_pattern = r"^\s*(\d+)\s*([kM]?)\s*$"

        # regexp_extract returns '' when the pattern does not match.
        digits = regexp_extract(raw_count, count_pattern, 1)
        suffix = regexp_extract(raw_count, count_pattern, 2)

        multiplier = (
            when(suffix == "k", 1_000)
            .when(suffix == "M", 1_000_000)
            .otherwise(1)
        )

        # No otherwise(): rows without a numeric part fall through to NULL.
        follower_count = when(digits != "", digits.cast("long") * multiplier)

        self.df = self.df.withColumn("follower_count", follower_count.cast("int"))
        return self.df

    def print_column_data_types(self):
        """Print the name and data type of every column."""
        for column, dtype in self.df.dtypes:
            print(f"Column: {column}, Data Type: {dtype}")

    def cast_to_numeric(self, columns_to_convert):
        """Cast the given columns to ``double``."""
        return self._cast_columns(columns_to_convert, 'double')

    def cast_to_float(self, columns_to_convert):
        """Cast the given columns to ``float``."""
        return self._cast_columns(columns_to_convert, 'float')

    def tidy_file_path(self):
        """Remove the ``'Local save in '`` text from ``save_location``."""
        self.df = self.df.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))
        return self.df

    def rename_index(self):
        """Rename the ``index`` column to ``ind``."""
        self.df = self.df.withColumnRenamed('index', 'ind')
        return self.df

    def reorder_columns(self, column_order):
        """Select the columns in ``column_order``; columns not listed are dropped."""
        self.df = self.df.select(column_order)
        return self.df

    def add_array_column(self, column1: str, column2: str, new_column_name: str):
        """Add ``new_column_name`` as an array built from ``column1`` and ``column2``.

        Args:
            column1: Name of the column providing the first array element.
            column2: Name of the column providing the second array element.
            new_column_name: Name of the array column to create.
        """
        self.df = self.df.withColumn(new_column_name, array(col(column1), col(column2)))
        return self.df

    def drop_columns(self, column_drop):
        """Drop every column named in ``column_drop``."""
        self.df = self.df.drop(*column_drop)
        return self.df

    def cast_to_timestamp(self, column: str):
        """Parse the first 10 characters of ``column`` as a ``yyyy-MM-dd`` timestamp.

        Only the leading date part is parsed, so anything after the tenth
        character (for example a time of day) is discarded.
        """
        # substring() returns the whole value when it is 10 characters or fewer,
        # so no separate length check is needed.
        date_part = substring(self.df[column], 1, 10)
        self.df = self.df.withColumn(column, to_timestamp(date_part, 'yyyy-MM-dd'))
        return self.df

    def combine_names(self, column1: str, column2: str, new_column: str):
        """Add ``new_column`` holding ``column1`` and ``column2`` joined by a single space."""
        self.df = self.df.withColumn(new_column, concat(col(column1), lit(' '), col(column2)))
        return self.df


In [0]:
# Clean the pin stream. Each CleanData method updates cleaned_df.df in place,
# so the steps below have to run in this order.
cleaned_df = CleanData(df_pin)

# Fill nulls with the string 'None', then turn follower_count into an integer.
cleaned_df.treat_missing_data()
cleaned_df.convert_follower_count()

# Cast the numeric columns to double.
columns_convert = [
    'downloaded',
    'follower_count',
    'index',
]
cleaned_df.cast_to_numeric(columns_convert)

# Strip the 'Local save in ' text from save_location and rename index -> ind.
cleaned_df.tidy_file_path()
cleaned_df.rename_index()

# Keep only the columns listed here, in this order ('downloaded' is not kept).
order = [
    'ind',
    'title',
    'description',
    'follower_count',
    'poster_name',
    'tag_list',
    'is_image_or_video',
    'image_src',
    'save_location',
    'category',
]
pin = cleaned_df.reorder_columns(order)

# Show the resulting schema.
cleaned_df.print_column_data_types()


#check_null = cleaned_df.check_for_missing_data()
#test = cleaned_df.treat_data_types('follower_count', 'int')
#df_type = df_wo_null.treat_data_types(col('follower_count'), 'int')
#pin.display()

In [0]:
# Append the cleaned pin stream to its Delta table.
# The checkpoint directory is dedicated to this query: a streaming query stores
# its progress there, so it must not share the directory with another query.
# Note: starting with a new, empty checkpoint directory makes the query read the
# stream again from its configured initial position.
pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/0a54b96ac143_pin/_checkpoints/") \
  .table("0a54b96ac143_pin")

In [0]:
# Tidy the geographic data. Each CleanData method updates tidy_geo.df in place,
# so the steps below have to run in this order.
tidy_geo = CleanData(df_geo)

# Show the schema before any transformation.
tidy_geo.print_column_data_types()

# Cast latitude/longitude to float, combine them into a 'coordinates' array,
# then drop the two source columns.
columns = [
    'latitude',
    'longitude',
]
tidy_geo.cast_to_float(columns)
tidy_geo.add_array_column('latitude', 'longitude', 'coordinates')
tidy_geo.drop_columns(columns)

# Convert the timestamp string to a timestamp and rename index -> ind.
tidy_geo.cast_to_timestamp('timestamp')
tidy_geo.rename_index()

# Final column order.
reorder_geo = [
    'ind',
    'country',
    'coordinates',
    'timestamp',
]
geo = tidy_geo.reorder_columns(reorder_geo)

#geo.display()

In [0]:
# Append the cleaned geo stream to its Delta table.
# The checkpoint directory is dedicated to this query: a streaming query stores
# its progress there, so it must not share the directory with another query.
# Note: starting with a new, empty checkpoint directory makes the query read the
# stream again from its configured initial position.
geo.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/0a54b96ac143_geo/_checkpoints/") \
  .table("0a54b96ac143_geo")

In [0]:
# Tidy the user data. Each CleanData method updates tidy_user.df in place,
# so the steps below have to run in this order.
tidy_user = CleanData(df_user)

# Build 'user_name' from first and last name, then drop the two source columns.
tidy_user.combine_names('first_name', 'last_name', 'user_name')
name_columns = [
    'first_name',
    'last_name',
]
tidy_user.drop_columns(name_columns)

# Convert date_joined to a timestamp and rename index -> ind.
tidy_user.cast_to_timestamp('date_joined')
tidy_user.rename_index()

# Final column order.
col_order = [
    'ind',
    'user_name',
    'age',
    'date_joined',
]
user = tidy_user.reorder_columns(col_order)

#user.display()

In [0]:
# Append the cleaned user stream to its Delta table.
# The checkpoint directory is dedicated to this query: a streaming query stores
# its progress there, so it must not share the directory with another query.
# Note: starting with a new, empty checkpoint directory makes the query read the
# stream again from its configured initial position.
user.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/0a54b96ac143_user/_checkpoints/") \
  .table("0a54b96ac143_user")