# Notebook for all stream data processing

Run all the code together in this notebook to get actual results from the data

## Imports Required

In [0]:
import urllib
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

## Retrieving AWS Credentials

In [0]:
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Specify file type to be csv
file_type = "csv"
# Indicates file has first row as the header
first_row_is_header = "true"
# Indicates file has comma as the delimeter
delimiter = ","
# Read the CSV file to spark dataframe
aws_keys_df = spark.read.format(file_type)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load("/FileStore/tables/authentication_credentials.csv")

In [0]:
# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']
# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Reading in stream data

In [0]:
pin_kinesis_df = (
    spark
    .readStream
    .format('kinesis')
    .option('streamName','streaming-0a40ea42f8d1-pin')
    .option('initialPosition','earliest')
    .option('region','us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

geo_kinesis_df = (
    spark
    .readStream
    .format('kinesis')
    .option('streamName','streaming-0a40ea42f8d1-geo')
    .option('initialPosition','earliest')
    .option('region','us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

user_kinesis_df = (
    spark
    .readStream
    .format('kinesis')
    .option('streamName','streaming-0a40ea42f8d1-user')
    .option('initialPosition','earliest')
    .option('region','us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

## Defining schema for each stream table

In [0]:
pin_schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])
geo_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("country", StringType(), True)
])
user_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True)
])

# Fields "timestamp" and "date_joined" set to StringType initially as they will be converted to TimestampType in the cleaning process

## Deserialising stream data

In [0]:
dirty_pin_df = (
  pin_kinesis_df
  .selectExpr("CAST(data as STRING)")
  .select(from_json(col("data"), pin_schema).alias("pin_data"))
  .select("pin_data.*")
  )

dirty_geo_df = (
  geo_kinesis_df
  .selectExpr("CAST(data as STRING)")
  .select(from_json(col("data"), geo_schema).alias("geo_data"))
  .select("geo_data.*")
)

dirty_user_df = (
  user_kinesis_df
  .selectExpr("CAST(data as STRING)")
  .select(from_json(col("data"), user_schema).alias("user_data"))
  .select("user_data.*")
)

## Cleaning streaming data

### Cleaning pin dataframe

In [0]:
pin_df = dirty_pin_df

# Cleaning all the invalid data

# Column names that contain invalid data
columns_for_null = ['description', 'follower_count', 'image_src', 'poster_name', 'tag_list', 'title']

# Dictionary of invalid data entries to change to null
values_for_null = {"No description available%": None,
                   "User Info Error": None,
                   "Image src error.": None,
                   "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e": None,
                   "No Title Data Available": None}

# Loops through the relative columns and changes the data to None where the invalid values in the dictionary are present
for column in columns_for_null:
  pin_df = pin_df.replace(values_for_null, subset=[column])
  

In [0]:
# Converts follower_count to ensure every entry is a number
pin_df = pin_df.withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
pin_df = pin_df.withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
# Cast follower_count column to integer type
pin_df = pin_df.withColumn("follower_count", col("follower_count").cast('int'))

# Converts save_location column to include only the save location path
pin_df = pin_df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Renames the index column to ind
pin_df = pin_df.withColumnRenamed("index", "ind")

# Reorders columns
pin_df = pin_df.select([ "ind",
                        "unique_id",
                        "title",
                        "description",
                        "follower_count",
                        "poster_name",
                        "tag_list",
                        "is_image_or_video",
                        "image_src",
                        "save_location",
                        "category"])


In [0]:
# Checks all the data types are correct via schema
pin_df.printSchema()
# Display data for visual check over
display(pin_df)

index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
1666,876b6e98-8f85-44a5-9724-453ce72b727d,Type A Style Christmas Decorating Series | J. Crew Tartan Plaid,"Next on the list is this classic beauty. This J. Crew-inspired tree is a repeat from last year, but still one of my faves. This 7’ tree brings all the traditional vibes. The onl…",Type A Style,1k,"Tartan Christmas,Nutcracker Christmas,Christmas Mood,Merry Little Christmas,Christmas Crafts,Xmas,Blue Christmas Tree Decorations,Christmas Tablescapes,Christmas Tree Inspiration",image,https://i.pinimg.com/originals/b9/d0/df/b9d0dfe0fc30ccdc4aa800b13630cda4.jpg,1,Local save in /data/christmas,christmas
1841,1fcf501b-42d2-4114-8cc6-9586eaf326db,Paper Plate Christmas Tree Craft,"Fun paper plate Christmas tree craft for kids, preschool Christmas crafts, Christmas fine motor activities, Christmas art projects for kids.",I Heart Crafty Things,276k,"Preschool Christmas Crafts,Christmas Art Projects,Christmas Trees For Kids,Christmas Crafts For Kids To Make,Decoration Christmas,Christmas Tree Crafts,Christmas Fun,Preschool Projects,Christmas Crafts Paper Plates",image,https://i.pinimg.com/originals/0c/fa/c8/0cfac89288d5593da751a641d6fedda4.jpg,1,Local save in /data/christmas,christmas
1129,3a34cc8a-4c4b-42eb-a6bd-571a903b5569,Drugstore Dupe Series: Concealers - Creativity Jar,Next up in my drugstore dupe series is concealers! These are 5 affordable options to some of the most popular high-end products and you need to try them!,Kelsie Ebel | Creativity Jar,10k,"Make Up Kits,Best Drugstore Concealer,Make Up Dupes Drugstore,Mac Lipstick Dupes,Best Drugstore Foundation,Foundation Dupes,Highlighter Makeup,Best Foundation,Makeup Eyes",image,https://i.pinimg.com/originals/0e/f1/b6/0ef1b68964e508a2303cd132d173657d.png,1,Local save in /data/beauty,beauty
10113,be06168d-f571-45ca-814b-c746727215dd,How cheap is a day on BALI,No description available Story format,Ton Trip | Backpack | Aventure | Lifestyle,340,"Fun Places To Go,Beautiful Places To Travel,Vacation Places,Dream Vacations,Vacation Ideas,Bali Travel,Future Travel,Travel Aesthetic,Travel Essentials",multi-video(story page format),https://i.pinimg.com/videos/thumbnails/originals/a4/91/03/a49103ff06554f01ecff7eef09637c2f.0000001.jpg,1,Local save in /data/travel,travel
9014,45c2e92a-5daf-40f3-9732-771c186c0757,75 More Small Tattoo Ideas from Playground Tattoo - Crestfox,"Hi everyone! My last small tattoo ideas post was really popular on Pinterest, so I decided to put together this post with even more tiny tattoo ideas. Just like the other post,…",Sarah Wahl | Crestfox,17k,"Little Tattoos,Mini Tattoos,Body Art Tattoos,Sleeve Tattoos,Tatoos,Flower Tattoos,White Tattoos,Arrow Tattoos,Word Tattoos",image,https://i.pinimg.com/originals/b5/72/b5/b572b5641d4efd2e9a13de2506b9e721.png,1,Local save in /data/tattoos,tattoos
819,79337521-36dd-44d2-9335-f6420a2606ff,How to draw a butterfly step by step easy and fast,How to draw a butterfly step by step tutorial with free printable guide. A quick butterfly anatomy diagram with some interesting facts.,Craft-Mart,132k,"Art Drawings Sketches Simple,Pencil Art Drawings,Doodle Drawings,Easy Drawings,Dress Sketches,Butterfly Sketch,Butterfly Art,How To Draw Butterfly,Easy Butterfly Drawing",image,https://i.pinimg.com/originals/2f/6a/da/2f6ada62e881a29f9a61c03865e0af62.jpg,1,Local save in /data/art,art
4973,ed5a6f9d-bedf-4c2d-a15c-07922eb8cd0d,"USB Plug in, 300 LED 9.8 Ft x 9.8 Ft Curtain Fairy Lights for Chrismas","Battery Powered, USB Powered, or Plug in with an Extra Charger: No access to any outlets? Never mind. It doesn’t even need connection to electricity. Use 3 AA batteries (not inc…",ifyousayido,5k,"Decoration Evenementielle,Bridal Shower Decorations,Ceremony Decorations,Diy Birthday Decorations At Home,Diy Birthday Backdrop,Wedding Reception Backdrop,Tulle Wedding,Decor Wedding,Boho Wedding",image,https://i.pinimg.com/originals/0e/d7/6d/0ed76da5fbed6f4bafdd129353d6b342.jpg,1,Local save in /data/event-planning,event-planning
1244,8eddc5e2-92c0-43b9-a971-96ca6c8d5cf5,ALLURE PEACH BEAUTY BLENDER,"Allure Peach Beauty Blender Latex Free Feature: We are proud to present this beautiful blender, that is soft but strong in it's flawless support of applying makeup.It's Shapes s…",Goo Goo Lashes,7,"Beauty Blender,Makeup Blender,Beauty Sponge,Makeup Sponge,Makeup Box,Beauty Makeup,How To Apply Makeup,Applying Makeup,Packing",image,https://i.pinimg.com/originals/71/36/93/713693c0fbd0496dbcba58ea3bc6bac8.jpg,1,Local save in /data/beauty,beauty
3176,f450701d-af66-4b02-8d16-f0aa820415d1,Stuffed Felt Alphabet Letters (Sewing Tutorial),"DIY Stuffed Felt Toy Letters of the Alphabet: Easy sewing tutorial for homemade ABCs. Great hands-on learning toy for name practice, letter sounds, letter recognition and other…",Buggy and Buddy,185k,"Sewing Projects For Beginners,Sewing Tutorials,Tutorial Sewing,Felt Tutorial,Sewing Hacks,Diy Tutorial,Diy Projects,Toddler Gifts,Toddler Toys",image,https://i.pinimg.com/originals/b8/65/10/b86510b95c87a416cf861694121ee5ad.jpg,1,Local save in /data/diy-and-crafts,diy-and-crafts
4631,f6f9dfd9-ca55-47f1-9253-d1a7bc8776c5,“All you need is love!”,No description available Story format,Little Hill Floral Designs,875,"Romantic Wedding Decor,Beach Wedding Reception,Wedding Set Up,Tent Wedding,Wedding Chairs,Elegant Wedding,Wedding Dinner,Wedding Tables,Wedding Receptions",multi-video(story page format),https://i.pinimg.com/videos/thumbnails/originals/b5/d3/64/b5d36417e1b6d38c0ae372eb3c91b602.0000001.jpg,1,Local save in /data/event-planning,event-planning


### Cleaning geo dataframe

In [0]:
geo_df = dirty_geo_df

# Creates a new column 'coordinates' containing an array of latitude and longitude
geo_df = geo_df.withColumn("coordinates", array(col("latitude"), col("longitude")))

# Drops the latitude and longitude columns
geo_df = geo_df.drop("latitude", "longitude")

# Cast timestamp column to timestamp data type 
geo_df = geo_df.withColumn("timestamp", geo_df["timestamp"].cast(TimestampType()))

# Reorders columns
geo_df = geo_df.select(["ind",
                        "country", 
                        "coordinates", 
                        "timestamp"])


In [0]:
# Checks all the data types are correct via schema
geo_df.printSchema()
# Display data for visual check over
display(geo_df)

ind,country,coordinates,timestamp
576,Argentina,"List(-36.9507, -4.59275)",2021-05-06T11:14:22.000+0000
4008,Afghanistan,"List(-88.5478, -174.971)",2022-08-10T11:12:47.000+0000
9889,El Salvador,"List(69.5941, -35.2369)",2019-06-18T21:23:55.000+0000
10419,Namibia,"List(-24.884, 108.456)",2019-09-05T08:41:01.000+0000
164,Albania,"List(-71.6856, -179.126)",2020-01-16T14:17:45.000+0000
4767,Hungary,"List(-63.6785, -29.2943)",2022-03-15T14:29:47.000+0000
1865,Anguilla,"List(-84.6446, -173.058)",2021-05-30T23:28:17.000+0000
2367,Portugal,"List(-18.0104, 14.7975)",2018-03-22T05:22:50.000+0000
10086,Korea,"List(-3.11171, -81.2853)",2018-02-19T04:07:26.000+0000
825,Falkland Islands (Malvinas),"List(5.38301, 83.0372)",2020-07-28T23:59:30.000+0000


### Cleaning user dataframe

In [0]:
user_df = dirty_user_df

# Creates a new column 'user_name' by concatenating 'first_name' and 'last_name' with a space in between
user_df = user_df.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

# Drops the 'first_name' and 'last_name' columns
user_df = user_df.drop("first_name", "last_name")

# Cast date_joined column to timestamp data type 
user_df = user_df.withColumn("date_joined", user_df["date_joined"].cast(TimestampType()))

# Reorders columns
user_df = user_df.select(["ind",
                          "user_name",
                          "age",
                          "date_joined"])
     

In [0]:
# Checks all the data types are correct via schema
user_df.printSchema()
# Display data for visual check over
display(user_df)

ind,user_name,age,date_joined
576,Sandra Allen,60,2016-12-16T14:41:24.000+0000
4008,Alexandria Alvarado,20,2015-10-23T04:13:23.000+0000
9889,Cristina Morrow,38,2016-03-19T16:07:08.000+0000
10419,Scott Jackson,38,2016-07-07T10:13:24.000+0000
164,Aaron Bartlett,21,2015-11-24T02:15:36.000+0000
4767,Madeline Brown,43,2016-11-11T15:53:15.000+0000
1865,Carol Alexander,25,2016-03-17T05:33:57.000+0000
2367,William Saunders,20,2016-03-28T16:52:43.000+0000
10086,Haley Mccarthy,36,2017-01-08T13:56:27.000+0000
825,Brandon Walker,33,2016-06-22T02:30:49.000+0000


## Writing stream data to delta tables

In [0]:
# Removes checkpoint folder before each writeStream function
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

(
    pin_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/")
    .table("0a40ea42f8d1_pin_table")
)


In [0]:
# Removes checkpoint folder before each writeStream function
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

(
    geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/")
    .table("0a40ea42f8d1_geo_table")
)


In [0]:
# Removes checkpoint folder before each writeStream function
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

(
    user_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/")
    .table("0a40ea42f8d1_user_table")
)


*NOTE*: Each of the ```writeStream```s must be interrupted before the next one can run 