## Get Access Keys

In [None]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, from_json, lit


In [None]:
# Location of the Delta table that holds the authentication credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")


### Extraction of the access key and secret key

In [None]:
# Fetch the credentials row once (a single Spark job) instead of collecting per column
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message instead of an IndexError
    raise ValueError("No AWS credentials found: the credentials Delta table is empty")

# AWS access key and secret key read from the credentials row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

#### Reading the streaming data

In [None]:
# Switch off the Delta format check for this Spark session
spark.conf.set(
    "spark.databricks.delta.formatCheck.enabled",
    "false",
)

In [None]:
# Open a streaming read on the Kinesis stream that carries the pin records,
# authenticating with the AWS keys loaded above.
df_pin_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-121d2a86d1ab-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

display(df_pin_stream)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
desired-name,eyJpbmRleCI6Mjg2MywidW5pcXVlX2lkIjoiOWJmMzk0MzctNDJhNi00ZjAyLTk5YTAtOWEwMzgzZDhjZDcwIiwidGl0bGUiOiIyNSBTdXBlciBGdW4gU3VtbWVyIENyYWZ0cyBmb3IgS2lkcyAtIE9mIExpZmUgYW5kIExpc2E= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985720289197149397396134377291810,2024-04-05T16:28:20.397+0000
desired-name,eyJpbmRleCI6ODMwNCwidW5pcXVlX2lkIjoiNWI2ZDA5MTMtMjVlNC00M2FiLTgzOWQtODVkNTUxNmY3OGE0IiwidGl0bGUiOiJUaGUgIzEgUmVhc29uIFlvdeKAmXJlIE5vdCBIaXMgUHJpb3JpdHkgQW55bW9yZSAtIE1hdHQ= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985726391854686812044483171975202,2024-04-05T16:28:24.590+0000
desired-name,eyJpbmRleCI6MTMxMywidW5pcXVlX2lkIjoiNDQ2NjIwNDUtZTg5MS00ODIxLThhMTktZWJlN2VlZGQzNzFhIiwidGl0bGUiOiJMaXF1aWQgTGFzaCBFeHRlbnNpb25zIE1hc2NhcmEiLCJkZXNjcmlwdGlvbiI6Ikluc3RhbnQ= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985731310973846823970869929312290,2024-04-05T16:28:28.160+0000
desired-name,eyJpbmRleCI6MTA3OTQsInVuaXF1ZV9pZCI6ImM0YmQyNTc3LWE3YmItNDQwOS1iYjdhLTE3ZDVlZDdlMWNmMSIsInRpdGxlIjoiVGlyZUJ1eWVyIiwiZGVzY3JpcHRpb24iOiJOaXNzYW4gR1QtUi4gU2ljay4iLCJwb3N0ZXI= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985737790816239958383452512845858,2024-04-05T16:28:32.501+0000
desired-name,eyJpbmRleCI6NTA2OSwidW5pcXVlX2lkIjoiYjc1YjZmODctZGViMy00NDRmLWIyOWUtY2U5MTYxYjJkZjQ5IiwidGl0bGUiOiJUaGUgVmF1bHQ6IEN1cmF0ZWQgJiBSZWZpbmVkIFdlZGRpbmcgSW5zcGlyYXRpb24iLCJkZXM= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985742591460669648076180148977698,2024-04-05T16:28:35.572+0000
desired-name,eyJpbmRleCI6MzA4OSwidW5pcXVlX2lkIjoiODhmOTIyN2UtODhkMC00YjFjLWIwYmUtYmNmYzMwMjhiOGUyIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985748500690075924383860990672930,2024-04-05T16:28:39.223+0000
desired-name,eyJpbmRleCI6NjcxMiwidW5pcXVlX2lkIjoiNWNkOGFlYmQtNjA3ZS00OTVhLWE3YTYtY2VhMGFmMmMwYTJkIiwidGl0bGUiOiJDaHJpc3RtYXMgS2l0Y2hlbiBEZWNvciBJZGVhcyB3aXRoIEpPQU5OIHwgQmxlc3MgVGhpcyA= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985754909205845701533390986018850,2024-04-05T16:28:43.497+0000
desired-name,eyJpbmRleCI6OTA5OSwidW5pcXVlX2lkIjoiMzdiOTcwODctMDRmMi00OTM4LTkzOWMtNDlkMGY3NjJjMWI0IiwidGl0bGUiOiJzY2h3YXJ6IHdlacOfZXMgZm90byB0YXR0b28gdm9ybGFnZW4gZnJhdWVuIHZvbiB6d2VpIGc= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985759521257847531343898648510498,2024-04-05T16:28:46.802+0000
desired-name,eyJpbmRleCI6MTA2MjUsInVuaXF1ZV9pZCI6ImQzMTg4NWI3LTc0MmEtNGUyYS1iYmI3LWFjNWY5ZDMzNDM0MCIsInRpdGxlIjoiSmFndWFyIFR5cGUgRSIsImRlc2NyaXB0aW9uIjoiMTkzNyBKYWd1YXIgU1MxMDAgLSAyIDE= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985765662601011173660381033791522,2024-04-05T16:28:51.017+0000
desired-name,eyJpbmRleCI6OTg3NSwidW5pcXVlX2lkIjoiNzgyZGNiYWQtZmY5MS00MGE2LWJhNjAtMjE2ZWZlMjlhZGI3IiwidGl0bGUiOiJFdXJvcGVhbiBCdWNrZXQgTGlzdDogMzUgVGhpbmdzIE5PVCBUbyBNaXNzIFdoZW4gVHJhdmU= (truncated),streaming-121d2a86d1ab-pin,shardId-000000000002,49650497534146423513851985769960332299903667303272677410,2024-04-05T16:28:54.044+0000


In [None]:
# Open a streaming read on the Kinesis stream that carries the geo records,
# authenticating with the AWS keys loaded above.
df_geo_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-121d2a86d1ab-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

display(df_geo_stream)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
desired-name,eyJpbmQiOjc1MjgsInRpbWVzdGFtcCI6IjIwMjAtMDgtMjhUMDM6NTI6NDciLCJsYXRpdHVkZSI6LTg5Ljk3ODcsImxvbmdpdHVkZSI6LTE3My4yOTMsImNvdW50cnkiOiJBbGJhbmlhIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656190808941531509527080580677666,2024-04-05T16:28:20.877+0000
desired-name,eyJpbmQiOjU3MzAsInRpbWVzdGFtcCI6IjIwMjEtMDQtMTlUMTc6Mzc6MDMiLCJsYXRpdHVkZSI6LTc3LjAxNSwibG9uZ2l0dWRlIjotMTAxLjQzNywiY291bnRyeSI6IkNvbG9tYmlhIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656198067332152475760920394465314,2024-04-05T16:28:25.068+0000
desired-name,eyJpbmQiOjg3MzEsInRpbWVzdGFtcCI6IjIwMjAtMDctMTdUMDQ6Mzk6MDkiLCJsYXRpdHVkZSI6LTgzLjEwNCwibG9uZ2l0dWRlIjotMTcxLjMwMiwiY291bnRyeSI6IkFydWJhIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656204646306462818573232742858786,2024-04-05T16:28:28.627+0000
desired-name,eyJpbmQiOjQzMTUsInRpbWVzdGFtcCI6IjIwMTktMTItMTVUMDM6NTE6MjgiLCJsYXRpdHVkZSI6LTQ1Ljg1MDgsImxvbmdpdHVkZSI6NjYuMTAwMywiY291bnRyeSI6IkNvdGUgZCdJdm9pcmUifQ==,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656211762043837070280829941317666,2024-04-05T16:28:32.978+0000
desired-name,eyJpbmQiOjU0OTQsInRpbWVzdGFtcCI6IjIwMjEtMDctMjFUMDI6MDI6MzUiLCJsYXRpdHVkZSI6LTgyLjY3NjgsImxvbmdpdHVkZSI6LTEyOS4yMDIsImNvdW50cnkiOiJCdWxnYXJpYSJ9,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656217296506239266053329185144866,2024-04-05T16:28:36.052+0000
desired-name,eyJpbmQiOjI5MjMsInRpbWVzdGFtcCI6IjIwMTktMDktMDhUMjI6NTM6MDkiLCJsYXRpdHVkZSI6LTg0LjYzMDIsImxvbmdpdHVkZSI6LTE2NC41MDcsImNvdW50cnkiOiJDb3RlIGQnSXZvaXJlIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656224562150415149974944047169570,2024-04-05T16:28:39.680+0000
desired-name,eyJpbmQiOjYwNjMsInRpbWVzdGFtcCI6IjIwMjEtMDctMjBUMDk6MDI6NDciLCJsYXRpdHVkZSI6LTg5LjE3OTcsImxvbmdpdHVkZSI6LTE3NC4wMTUsImNvdW50cnkiOiJBbmd1aWxsYSJ9,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656232352468396746645689451151394,2024-04-05T16:28:43.988+0000
desired-name,eyJpbmQiOjY2MDMsInRpbWVzdGFtcCI6IjIwMjAtMDgtMDlUMjM6MTQ6MjAiLCJsYXRpdHVkZSI6LTc4LjcyMjcsImxvbmdpdHVkZSI6LTc0LjQxNTQsImNvdW50cnkiOiJNYWRhZ2FzY2FyIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656238157730182536095123829162018,2024-04-05T16:28:47.281+0000
desired-name,eyJpbmQiOjYxNDUsInRpbWVzdGFtcCI6IjIwMTktMTItMDVUMDI6MDk6NDQiLCJsYXRpdHVkZSI6LTY1LjkwNzksImxvbmdpdHVkZSI6LTE0My44NDUsImNvdW50cnkiOiJNb3phbWJpcXVlIn0=,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656245810230620696698143316639778,2024-04-05T16:28:51.503+0000
desired-name,eyJpbmQiOjI5NTksInRpbWVzdGFtcCI6IjIwMTktMDgtMTlUMDg6MjI6MDIiLCJsYXRpdHVkZSI6LTY4LjAwOTUsImxvbmdpdHVkZSI6LTE1Ny4yMjcsImNvdW50cnkiOiJBbmdvbGEifQ==,streaming-121d2a86d1ab-geo,shardId-000000000002,49650516444464728021467656251447451717559714191129968674,2024-04-05T16:28:54.578+0000


In [None]:
# Open a streaming read on the Kinesis stream that carries the user records,
# authenticating with the AWS keys loaded above.
df_user_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-121d2a86d1ab-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

display(df_user_stream)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
desired-name,eyJpbmQiOjc0NTMsImZpcnN0X25hbWUiOiJBbHZpbiIsImxhc3RfbmFtZSI6IkFkYW1zIiwiYWdlIjoyMCwiZGF0ZV9qb2luZWQiOiIyMDE2LTAxLTAxVDEzOjUwOjQwIn0=,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767848417911818917586435557031970,2024-04-05T16:28:21.389+0000
desired-name,eyJpbmQiOjcxMDIsImZpcnN0X25hbWUiOiJUcmFjeSIsImxhc3RfbmFtZSI6Ildlc3QiLCJhZ2UiOjI0LCJkYXRlX2pvaW5lZCI6IjIwMTYtMTAtMjlUMTU6NDk6NDEifQ==,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767856170753100106203607825645602,2024-04-05T16:28:25.543+0000
desired-name,eyJpbmQiOjE5ODcsImZpcnN0X25hbWUiOiJBbXkiLCJsYXN0X25hbWUiOiJBZGFtcyIsImFnZSI6MjAsImRhdGVfam9pbmVkIjoiMjAxNS0xMC0yNFQwNTowNToyOCJ9,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767862563552834228362958549811234,2024-04-05T16:28:29.131+0000
desired-name,eyJpbmQiOjc4NCwiZmlyc3RfbmFtZSI6IkFuZ2VsYSIsImxhc3RfbmFtZSI6IkJhdGVzIiwiYWdlIjoyMywiZGF0ZV9qb2luZWQiOiIyMDE1LTEwLTMwVDE1OjA4OjU3In0=,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767870744353855560558858664411170,2024-04-05T16:28:33.448+0000
desired-name,eyJpbmQiOjM3NjMsImZpcnN0X25hbWUiOiJUaWZmYW55IiwibGFzdF9uYW1lIjoiS25veCIsImFnZSI6NDUsImRhdGVfam9pbmVkIjoiMjAxNS0xMS0yNVQyMToxNjo0OSJ9,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767876433558762667003960990105634,2024-04-05T16:28:36.529+0000
desired-name,eyJpbmQiOjIzMDcsImZpcnN0X25hbWUiOiJLYXJlbiIsImxhc3RfbmFtZSI6IkhhbWlsdG9uIiwiYWdlIjozNCwiZGF0ZV9qb2luZWQiOiIyMDE1LTEyLTEyVDE2OjQ4OjE1In0=,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767883178155910297020401553768482,2024-04-05T16:28:40.152+0000
desired-name,eyJpbmQiOjEwMjYxLCJmaXJzdF9uYW1lIjoiTWFyaW8iLCJsYXN0X25hbWUiOiJCcmlnaHQiLCJhZ2UiOjYwLCJkYXRlX2pvaW5lZCI6IjIwMTYtMTAtMjlUMTc6NDQ6MTYifQ==,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767891299719566468099472107765794,2024-04-05T16:28:44.555+0000
desired-name,eyJpbmQiOjc1NTQsImZpcnN0X25hbWUiOiJDaGVyeWwiLCJsYXN0X25hbWUiOiJIdWVydGEiLCJhZ2UiOjIwLCJkYXRlX2pvaW5lZCI6IjIwMTctMDQtMTFUMTY6MzU6MzMifQ==,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767896830555191205028083827474466,2024-04-05T16:28:47.758+0000
desired-name,eyJpbmQiOjI2MjgsImZpcnN0X25hbWUiOiJKZWZmcmV5IiwibGFzdF9uYW1lIjoiQmFrZXIiLCJhZ2UiOjI0LCJkYXRlX2pvaW5lZCI6IjIwMTctMDMtMjhUMTM6MzI6MjQifQ==,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767904551964401083664966273204258,2024-04-05T16:28:51.966+0000
desired-name,eyJpbmQiOjkyLCJmaXJzdF9uYW1lIjoiQW1hbmRhIiwibGFzdF9uYW1lIjoiQWx2YXJleiIsImFnZSI6MjEsImRhdGVfam9pbmVkIjoiMjAxNS0xMC0zMFQwMTozMTowOSJ9,streaming-121d2a86d1ab-user,shardId-000000000002,49650497554127891211735767910310078079908143931557150754,2024-04-05T16:28:55.194+0000


## Cleaning the streaming data

#### Define the streaming schemas for geo, pin and user

In [None]:
# Schema of the JSON payload carried by the geo stream (all fields nullable).
# Extend the chain with further .add(...) calls if more fields are needed.
geo_streaming_schema = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("timestamp", TimestampType(), True)
    .add("latitude", StringType(), True)
    .add("longitude", StringType(), True)
    .add("country", StringType(), True)
)

In [None]:
# Schema of the JSON payload carried by the pin stream (all fields nullable).
# Every field except the integer index is read as a string.
# Extend the chain with further .add(...) calls if more fields are needed.
pin_streaming_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("unique_id", StringType(), True)
    .add("title", StringType(), True)
    .add("description", StringType(), True)
    .add("poster_name", StringType(), True)
    .add("follower_count", StringType(), True)
    .add("tag_list", StringType(), True)
    .add("is_image_or_video", StringType(), True)
    .add("image_src", StringType(), True)
    .add("downloaded", StringType(), True)
    .add("save_location", StringType(), True)
    .add("category", StringType(), True)
)

In [None]:
# Schema of the JSON payload carried by the user stream (all fields nullable).
# date_joined is read as a string here.
# Extend the chain with further .add(...) calls if more fields are needed.
user_streaming_schema = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("date_joined", StringType(), True)
)

#### Reading the streams into structured DataFrames

In [None]:
# Decode the record's data column as a string, parse it as JSON with the geo schema,
# and expand the parsed struct into top-level columns.
df_geo = (
    df_geo_stream
    .select(from_json(col("data").cast("string"), geo_streaming_schema).alias("jsonData"))
    .select("jsonData.*")
)
display(df_geo)

ind,timestamp,latitude,longitude,country
7528,2020-08-28T03:52:47.000+0000,-89.9787,-173.293,Albania
5730,2021-04-19T17:37:03.000+0000,-77.015,-101.437,Colombia
8731,2020-07-17T04:39:09.000+0000,-83.104,-171.302,Aruba
4315,2019-12-15T03:51:28.000+0000,-45.8508,66.1003,Cote d'Ivoire
5494,2021-07-21T02:02:35.000+0000,-82.6768,-129.202,Bulgaria
2923,2019-09-08T22:53:09.000+0000,-84.6302,-164.507,Cote d'Ivoire
6063,2021-07-20T09:02:47.000+0000,-89.1797,-174.015,Anguilla
6603,2020-08-09T23:14:20.000+0000,-78.7227,-74.4154,Madagascar
6145,2019-12-05T02:09:44.000+0000,-65.9079,-143.845,Mozambique
2959,2019-08-19T08:22:02.000+0000,-68.0095,-157.227,Angola


In [None]:
# Decode the record's data column as a string, parse it as JSON with the user schema,
# and expand the parsed struct into top-level columns.
df_user = (
    df_user_stream
    .select(from_json(col("data").cast("string"), user_streaming_schema).alias("jsonData"))
    .select("jsonData.*")
)
display(df_user)

ind,first_name,last_name,age,date_joined
7453,Alvin,Adams,20,2016-01-01T13:50:40
7102,Tracy,West,24,2016-10-29T15:49:41
1987,Amy,Adams,20,2015-10-24T05:05:28
784,Angela,Bates,23,2015-10-30T15:08:57
3763,Tiffany,Knox,45,2015-11-25T21:16:49
2307,Karen,Hamilton,34,2015-12-12T16:48:15
10261,Mario,Bright,60,2016-10-29T17:44:16
7554,Cheryl,Huerta,20,2017-04-11T16:35:33
2628,Jeffrey,Baker,24,2017-03-28T13:32:24
92,Amanda,Alvarez,21,2015-10-30T01:31:09


In [None]:
# Decode the record's data column as a string, parse it as JSON with the pin schema,
# and expand the parsed struct into top-level columns.
df_pin = (
    df_pin_stream
    .select(from_json(col("data").cast("string"), pin_streaming_schema).alias("jsonData"))
    .select("jsonData.*")
)
display(df_pin)

index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,Of Life & Lisa | Lifestyle Blog,124k,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,1,Local save in /data/diy-and-crafts,diy-and-crafts
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,Commitment Connection,51k,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,1,Local save in /data/quotes,quotes
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",Thrive Causemetics,43k,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,1,Local save in /data/beauty,beauty
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,Ray Uyemura,437,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,1,Local save in /data/vehicles,vehicles
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,Style Me Pretty,6M,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,1,Local save in /data/event-planning,event-planning
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0,Local save in /data/diy-and-crafts,diy-and-crafts
6712,5cd8aebd-607e-495a-a7a6-cea0af2c0a2d,Christmas Kitchen Decor Ideas with JOANN | Bless This Nest,I used Christmas kitchen decor from Joann to bring Christmas to life in my space. See how I used their items to create the perfect look.,Bless This Nest Blog | Farmhouse Home Decor | DIY,54k,"Modern Farmhouse Kitchens,Farmhouse Kitchen Decor,Kitchen Redo,Home Decor Kitchen,New Kitchen,Home Kitchens,Diy Home Decor,Rustic Farmhouse,Farmhouse Ideas",image,https://i.pinimg.com/originals/f1/d7/1a/f1d71a2a155cf808cc6e3bc8bcd663d3.jpg,1,Local save in /data/home-decor,home-decor
9099,37b97087-04f2-4938-939c-49d0f762c1b4,schwarz weißes foto tattoo vorlagen frauen von zwei gesichter simple ideen tattoos auf dem fuß inspi,schwarz weißes foto tattoo vorlagen frauen von zwei gesichter simple ideen tattoos auf dem fuß inspiration,Archzine.net,353k,"Twin Tattoos,Dainty Tattoos,Music Tattoos,Family Tattoos,Couple Tattoos,Body Art Tattoos,Girl Tattoos,Pretty Tattoos,Quote Tattoos",image,https://i.pinimg.com/originals/9d/2a/65/9d2a657081c268af852b72613ca96b58.jpg,1,Local save in /data/tattoos,tattoos
10625,d31885b7-742a-4e2a-bbb7-ac5f9d334340,Jaguar Type E,"1937 Jaguar SS100 - 2 1/2 Litre Roadster, one of the most sought after pre-war sports cars",hobbyDB,8k,"Jaguar Type E,Jaguar Xk,Jaguar Cars,Jaguar Roadster,Jaguar Sport,Retro Cars,Vintage Cars,Antique Cars,British Sports Cars",image,https://i.pinimg.com/originals/26/81/a7/2681a71bd0c8f7fd0ab79c455338a49a.jpg,1,Local save in /data/vehicles,vehicles
9875,782dcbad-ff91-40a6-ba60-216efe29adb7,European Bucket List: 35 Things NOT To Miss When Traveling Europe,"35 European bucket list destinations for any traveler heading to Europe. From Cinque Terre, Italy to Iceland - there are so many amazing sites to see in Europe.",Nicki,28k,"Backpacking Europe,Europe Travel Guide,Travel Guides,Travel Packing,Traveling Europe,Travelling,Travel Backpack,Budget Travel,2 Week Europe Itinerary",image,https://i.pinimg.com/originals/71/04/1a/71041ad83ede43d9665741e719c58a86.jpg,1,Local save in /data/travel,travel


## Cleaning operations

##### Cleaning the geo dataframe

In [None]:
# Clean the geo frame in a single chain:
#   1. coordinates - array built from the latitude and longitude columns
#   2. timestamp   - passed through to_timestamp; the geo streaming schema already declares
#                    this field as a timestamp, so this step is only a safeguard
#   3. select      - keeps the final columns in the required order, which also drops the
#                    latitude and longitude columns (no separate drop step is needed)
# Note: the chain consumes latitude/longitude, so re-run the parsing cell before re-running this one.
df_geo = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select('ind', 'country', 'coordinates', 'timestamp')
)
df_geo.display()

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8731,Aruba,"List(-83.104, -171.302)",2020-07-17T04:39:09.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
5494,Bulgaria,"List(-82.6768, -129.202)",2021-07-21T02:02:35.000+0000
2923,Cote d'Ivoire,"List(-84.6302, -164.507)",2019-09-08T22:53:09.000+0000
6063,Anguilla,"List(-89.1797, -174.015)",2021-07-20T09:02:47.000+0000
6603,Madagascar,"List(-78.7227, -74.4154)",2020-08-09T23:14:20.000+0000
6145,Mozambique,"List(-65.9079, -143.845)",2019-12-05T02:09:44.000+0000
2959,Angola,"List(-68.0095, -157.227)",2019-08-19T08:22:02.000+0000


##### Cleaning the user dataframe

In [None]:
# Clean the user frame in a single chain and keep the result in df_user, so that the
# cleaned frame (not the raw parsed one) is what later cells write out:
#   1. user_name   - concatenation of first_name and last_name
#   2. date_joined - converted in place from string to timestamp, so the selected column
#                    really is the converted one
#   3. select      - keeps the final columns in the required order, which also drops
#                    first_name and last_name (no separate drop step is needed)
# NOTE(review): concat joins the names without a separator (e.g. "AlvinAdams");
# switch to concat_ws(" ", ...) if a space between the names is wanted - confirm.
# Note: the chain consumes first_name/last_name, so re-run the parsing cell before re-running this one.
df_user = (
    df_user
    .withColumn("user_name", concat(col("first_name"), col("last_name")))
    .withColumn("date_joined", to_timestamp(col("date_joined"), "yyyy-MM-dd'T'HH:mm:ss"))
    .select('ind', 'user_name', 'age', 'date_joined')
)
df_user.display()

ind,user_name,age,date_joined
7453,AlvinAdams,20,2016-01-01T13:50:40
7102,TracyWest,24,2016-10-29T15:49:41
1987,AmyAdams,20,2015-10-24T05:05:28
784,AngelaBates,23,2015-10-30T15:08:57
3763,TiffanyKnox,45,2015-11-25T21:16:49
2307,KarenHamilton,34,2015-12-12T16:48:15
10261,MarioBright,60,2016-10-29T17:44:16
7554,CherylHuerta,20,2017-04-11T16:35:33
2628,JeffreyBaker,24,2017-03-28T13:32:24
92,AmandaAlvarez,21,2015-10-30T01:31:09


##### Cleaning the pin dataframe

In [None]:
# Clean the pin frame in a single chain. DataFrames are immutable, so every step must feed
# the next one: a withColumn whose result is not kept has no effect.
# NOTE(review): other placeholder values appear in the sample output (e.g. "No Title Data
# Available", "Image src error.") and are left untouched here - confirm whether they should
# also become nulls.
# Note: the chain renames index to ind, so re-run the parsing cell before re-running this one.
df_pin = (
    df_pin
    # Blank strings carry no information; treat them as missing values
    .na.replace('', None)
    # "User Info Error" is a placeholder for unavailable user details, not real data
    .withColumn("poster_name", when(col("poster_name") == "User Info Error", None).otherwise(col("poster_name")))
    .withColumn("follower_count", when(col("follower_count") == "User Info Error", None).otherwise(col("follower_count")))
    # Expand the magnitude suffix before casting: 124k -> 124000, 6M -> 6000000, 437 -> 437.
    # Assumes counts are whole numbers with an optional trailing k/M, as in the sample output.
    .withColumn("follower_count", regexp_replace("follower_count", "k$", "000"))
    .withColumn("follower_count", regexp_replace("follower_count", "M$", "000000"))
    # Rows with an unknown follower count stay in the frame with a null value
    .withColumn("follower_count", col("follower_count").cast("int"))
    # Rename index to ind and make sure it is numeric
    .withColumnRenamed("index", "ind")
    .withColumn("ind", col("ind").cast("int"))
    # Keep only the path part of save_location ("Local save in /data/x" -> "/data/x")
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # Final column order
    .select('ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category')
)
df_pin.display()


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,Local save in /data/diy-and-crafts,diy-and-crafts
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000000.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,Local save in /data/quotes,quotes
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43000000.0,Thrive Causemetics,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,Local save in /data/beauty,beauty
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,437000000.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,Local save in /data/vehicles,vehicles
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,6000000.0,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,Local save in /data/event-planning,event-planning
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,No Title Data Available,No description available Story format,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,Local save in /data/diy-and-crafts,diy-and-crafts
6712,5cd8aebd-607e-495a-a7a6-cea0af2c0a2d,Christmas Kitchen Decor Ideas with JOANN | Bless This Nest,I used Christmas kitchen decor from Joann to bring Christmas to life in my space. See how I used their items to create the perfect look.,54000000.0,Bless This Nest Blog | Farmhouse Home Decor | DIY,"Modern Farmhouse Kitchens,Farmhouse Kitchen Decor,Kitchen Redo,Home Decor Kitchen,New Kitchen,Home Kitchens,Diy Home Decor,Rustic Farmhouse,Farmhouse Ideas",image,https://i.pinimg.com/originals/f1/d7/1a/f1d71a2a155cf808cc6e3bc8bcd663d3.jpg,Local save in /data/home-decor,home-decor
9099,37b97087-04f2-4938-939c-49d0f762c1b4,schwarz weißes foto tattoo vorlagen frauen von zwei gesichter simple ideen tattoos auf dem fuß inspi,schwarz weißes foto tattoo vorlagen frauen von zwei gesichter simple ideen tattoos auf dem fuß inspiration,353000000.0,Archzine.net,"Twin Tattoos,Dainty Tattoos,Music Tattoos,Family Tattoos,Couple Tattoos,Body Art Tattoos,Girl Tattoos,Pretty Tattoos,Quote Tattoos",image,https://i.pinimg.com/originals/9d/2a/65/9d2a657081c268af852b72613ca96b58.jpg,Local save in /data/tattoos,tattoos
10625,d31885b7-742a-4e2a-bbb7-ac5f9d334340,Jaguar Type E,"1937 Jaguar SS100 - 2 1/2 Litre Roadster, one of the most sought after pre-war sports cars",8000000.0,hobbyDB,"Jaguar Type E,Jaguar Xk,Jaguar Cars,Jaguar Roadster,Jaguar Sport,Retro Cars,Vintage Cars,Antique Cars,British Sports Cars",image,https://i.pinimg.com/originals/26/81/a7/2681a71bd0c8f7fd0ab79c455338a49a.jpg,Local save in /data/vehicles,vehicles
9875,782dcbad-ff91-40a6-ba60-216efe29adb7,European Bucket List: 35 Things NOT To Miss When Traveling Europe,"35 European bucket list destinations for any traveler heading to Europe. From Cinque Terre, Italy to Iceland - there are so many amazing sites to see in Europe.",28000000.0,Nicki,"Backpacking Europe,Europe Travel Guide,Travel Guides,Travel Packing,Traveling Europe,Travelling,Travel Backpack,Budget Travel,2 Week Europe Itinerary",image,https://i.pinimg.com/originals/71/04/1a/71041ad83ede43d9665741e719c58a86.jpg,Local save in /data/travel,travel


## Writing the streams to Delta tables

In [None]:
def write_df_to_deltatable(dataframe, name: str):
    """Start a streaming append of ``dataframe`` into the Delta table ``name``.

    The table name drives every per-stream setting, so the same function can be
    used for any stream:

    * target table:        ``name``
    * checkpoint location: ``/tmp/checkpoints/<name>/``
    * query name:          ``df_<name without a trailing "_table">_streaming_query``

    Args:
        dataframe: Streaming DataFrame to write.
        name: Name of the destination Delta table, e.g. ``"121d2a86d1ab_geo_table"``.
    """
    table_suffix = "_table"
    # Strip the "_table" suffix so the query name keeps the df_<stream>_streaming_query form
    stream_id = name[:-len(table_suffix)] if name.endswith(table_suffix) else name
    dataframe.writeStream \
        .format("delta") \
        .queryName(f"df_{stream_id}_streaming_query") \
        .outputMode("append") \
        .option("checkpointLocation", f"/tmp/checkpoints/{name}/") \
        .table(name)

# Start the streaming write of df_geo into the 121d2a86d1ab_geo_table Delta table
write_df_to_deltatable(df_geo, "121d2a86d1ab_geo_table")

In [None]:
def write_df_to_deltatable(dataframe, name: str):
    """Start a streaming append of ``dataframe`` into the Delta table ``name``.

    The table name drives every per-stream setting, so the same function can be
    used for any stream:

    * target table:        ``name``
    * checkpoint location: ``/tmp/checkpoints/<name>/``
    * query name:          ``df_<name without a trailing "_table">_streaming_query``

    Args:
        dataframe: Streaming DataFrame to write.
        name: Name of the destination Delta table, e.g. ``"121d2a86d1ab_user_table"``.
    """
    table_suffix = "_table"
    # Strip the "_table" suffix so the query name keeps the df_<stream>_streaming_query form
    stream_id = name[:-len(table_suffix)] if name.endswith(table_suffix) else name
    dataframe.writeStream \
        .format("delta") \
        .queryName(f"df_{stream_id}_streaming_query") \
        .outputMode("append") \
        .option("checkpointLocation", f"/tmp/checkpoints/{name}/") \
        .table(name)

# Start the streaming write of df_user into the 121d2a86d1ab_user_table Delta table
write_df_to_deltatable(df_user, "121d2a86d1ab_user_table")


In [None]:
def write_df_to_deltatable(dataframe, name: str):
    """Start a streaming append of ``dataframe`` into the Delta table ``name``.

    The table name drives every per-stream setting, so the same function can be
    used for any stream:

    * target table:        ``name``
    * checkpoint location: ``/tmp/checkpoints/<name>/``
    * query name:          ``df_<name without a trailing "_table">_streaming_query``

    Args:
        dataframe: Streaming DataFrame to write.
        name: Name of the destination Delta table, e.g. ``"121d2a86d1ab_pin_table"``.
    """
    table_suffix = "_table"
    # Strip the "_table" suffix so the query name keeps the df_<stream>_streaming_query form
    stream_id = name[:-len(table_suffix)] if name.endswith(table_suffix) else name
    dataframe.writeStream \
        .format("delta") \
        .queryName(f"df_{stream_id}_streaming_query") \
        .outputMode("append") \
        .option("checkpointLocation", f"/tmp/checkpoints/{name}/") \
        .table(name)
# Start the streaming write of df_pin into the 121d2a86d1ab_pin_table Delta table
write_df_to_deltatable(df_pin, "121d2a86d1ab_pin_table")


###### For debugging purposes

In [None]:

# To reset a stream, first remove its checkpoint directory:

# %fs rm -r dbfs:/tmp/checkpoints/121d2a86d1ab_pin_table

# (replace the table name accordingly for the geo and user streams).

# Then run a DROP TABLE SQL statement to drop the tables from the database.

