In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Specify file type to be csv
file_type = "csv"
# Indicates file has first row as the header
first_row_is_header = "true"
# Indicates file has comma as the delimeter
delimiter = ","
# Read the CSV file to spark dataframe
aws_keys_df = spark.read.format(file_type)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load("/FileStore/tables/authentication_credentials.csv")

# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.where(col('User name')=='databricks-user').select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.where(col('User name')=='databricks-user').select('Secret access key').collect()[0]['Secret access key']
# Encode the secret key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
import json 

df_user = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a8597384a69-user') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

df_geo = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a8597384a69-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

df_pin = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a8597384a69-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

df_user = df_user.selectExpr("CAST(data as STRING)")
json_schema = "index INT, first_name STRING, last_name STRING, age INT, date_joined STRING"
df_user = df_user.withColumn("data", from_json(df_user["data"], json_schema))
df_user = df_user.selectExpr("data.*")

df_geo = df_geo.selectExpr("CAST(data as STRING)")
json_schema = "index INT, country STRING, timestamp STRING, latitude STRING, longitude STRING"
df_geo = df_geo.withColumn("data", from_json(df_geo["data"], json_schema))
df_geo = df_geo.selectExpr("data.*")

df_pin = df_pin.selectExpr("CAST(data as STRING)")
json_schema = "index INT, unique_id STRING, title STRING, description STRING, poster_name STRING, follower_count STRING, tag_list STRING, is_image_or_video STRING, image_src STRING, downloaded STRING, save_location STRING, category STRING"
df_pin = df_pin.withColumn("data", from_json(df_pin["data"], json_schema))
df_pin = df_pin.selectExpr("data.*")

display(df_user)


index,first_name,last_name,age,date_joined
8304,Charles,Berry,25,2015-12-28T04:21:39
8731,Andrea,Alexander,21,2015-11-10T09:27:42
1313,Brittany,Jones,32,2016-04-02T03:51:23
4315,Michelle,Prince,36,2015-12-20T16:38:13
5494,Anne,Allen,27,2015-12-16T15:20:05
7528,Abigail,Ali,20,2015-10-24T11:23:51
2863,Dylan,Holmes,32,2016-10-23T14:06:51
5730,Rachel,Davis,36,2015-12-08T20:02:43
8304,Charles,Berry,25,2015-12-28T04:21:39
8731,Andrea,Alexander,21,2015-11-10T09:27:42


In [None]:
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract

df_pin = df_pin.withColumn("description", when(col("description").contains("No description available"), "None").otherwise(col("description")))
df_pin = df_pin.withColumn("image_src", when(col("image_src").contains("Image src error"), "None").otherwise(col("image_src")))
df_pin = df_pin.withColumn("follower_count", when(col("follower_count").contains("User Info Error"), "None").otherwise(col("follower_count")))
df_pin = df_pin.withColumn("follower_count", regexp_replace(df_pin["follower_count"], "M", "000000"))
df_pin = df_pin.withColumn("follower_count", regexp_replace(df_pin["follower_count"], "k", "000"))
df_pin = df_pin.withColumn("downloaded",col("downloaded").cast("int"))
df_pin = df_pin.withColumn("follower_count",col("follower_count").cast("int"))
df_pin = df_pin.withColumn("index",col("index").cast("int"))
df_pin = df_pin.withColumnRenamed("index","ind")
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")
display(df_pin)


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,Local save in /data/finance,finance
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),,Local save in /data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,Local save in /data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,Local save in /data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,Local save in /data/quotes,quotes
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",211000.0,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,Local save in /data/tattoos,tattoos
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43000.0,Thrive Causemetics,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,Local save in /data/beauty,beauty
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),,Local save in /data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,Local save in /data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,Local save in /data/finance,finance


In [None]:
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract

# df_geo = df_geo.select("index", "country", array(col("longitude"), col("latitude")).alias("coordinates"), "timestamp")
df_geo = df_geo.withColumn("timestamp", col("timestamp").cast("timestamp"))
df_geo = df_geo.withColumnRenamed("index","ind")

display(df_geo)

ind,country,timestamp,latitude,longitude
1313,Maldives,2018-06-26T02:39:25.000+0000,77.0447,61.9119
4315,Cote d'Ivoire,2019-12-15T03:51:28.000+0000,-45.8508,66.1003
7528,Albania,2020-08-28T03:52:47.000+0000,-89.9787,-173.293
2863,Armenia,2020-04-27T13:34:16.000+0000,-5.34445,-177.924
5730,Colombia,2021-04-19T17:37:03.000+0000,-77.015,-101.437
8304,French Guiana,2019-09-13T04:50:29.000+0000,-28.8852,-164.87
8731,Aruba,2020-07-17T04:39:09.000+0000,-83.104,-171.302
1313,Maldives,2018-06-26T02:39:25.000+0000,77.0447,61.9119
4315,Cote d'Ivoire,2019-12-15T03:51:28.000+0000,-45.8508,66.1003
10794,Cocos (Keeling) Islands,2022-01-01T02:26:50.000+0000,-89.5236,-154.567


In [None]:
df_user = df_user.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
df_user = df_user.withColumn("date_joined", col("date_joined").cast("timestamp"))
df_user = df_user.withColumnRenamed("index","ind")
df_user = df_user.select("ind", "user_name", "age", "date_joined")
display(df_user)

ind,user_name,age,date_joined
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000
5494,Anne Allen,27,2015-12-16T15:20:05.000+0000
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000


In [None]:
df_user.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a8597384a69_user_table")

df_geo.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a8597384a69_geo_table")

df_pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a8597384a69_pin_table")