# Kinesis Data Pipeline
A streaming pipeline that reads data from AWS Kinesis, cleans it, and stores it in Databricks Delta tables.

In [None]:
# pyspark functions (wildcard import: later cells use col, from_json, regexp_replace, array, concat, lit)
from pyspark.sql.functions import *
# URL processing. Import the submodule explicitly: a bare `import urllib`
# does not guarantee that `urllib.parse` has been loaded.
import urllib.parse

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the first row with a single Spark job,
# so the access key and the secret key are guaranteed to come from the same row.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # Fail with an explicit message instead of an opaque IndexError on an empty table
    raise ValueError(f"No AWS credentials found in Delta table {delta_table_path}")

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# URL-encode the secret key (safe="" also escapes "/")
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
%sql 
-- Disable the Delta format check (spark.databricks.delta.formatCheck.enabled).
-- NOTE(review): presumably needed by a read or write elsewhere in this notebook; confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType

# Open the Kinesis pin stream from its earliest available record and keep
# only the record payload, cast to a string.
pin_df = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0afff2eeb7e3-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
    .selectExpr("CAST(data as STRING)")
)




In [None]:
# Open the Kinesis geo stream from its earliest available record and keep
# only the record payload, cast to a string.
geo_df = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0afff2eeb7e3-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
    .selectExpr("CAST(data as STRING)")
)
# Preview the raw JSON strings arriving on the stream
display(geo_df)

data
"{""ind"":4873,""timestamp"":""2019-06-03 06:29:14"",""latitude"":-58.7041,""longitude"":-161.128,""country"":""Iraq""}"
"{""ind"":910,""timestamp"":""2018-06-22 00:51:33"",""latitude"":-66.1915,""longitude"":-95.9974,""country"":""Cayman Islands""}"
"{""ind"":3221,""timestamp"":""2022-08-30 21:09:21"",""latitude"":-71.6856,""longitude"":-179.126,""country"":""Albania""}"
"{""ind"":3197,""timestamp"":""2021-11-09 19:56:14"",""latitude"":50.3185,""longitude"":-61.8984,""country"":""Azerbaijan""}"
"{""ind"":10571,""timestamp"":""2022-08-07 15:53:04"",""latitude"":-14.5736,""longitude"":21.4795,""country"":""Saint Vincent and the Grenadines""}"
"{""ind"":1264,""timestamp"":""2021-04-09 12:29:33"",""latitude"":3.94678,""longitude"":-124.477,""country"":""French Southern Territories""}"
"{""ind"":9764,""timestamp"":""2021-08-01 17:51:18"",""latitude"":-0.859891,""longitude"":56.9958,""country"":""Cote d'Ivoire""}"
"{""ind"":4785,""timestamp"":""2022-07-28 22:42:04"",""latitude"":37.3013,""longitude"":-58.7017,""country"":""British Virgin Islands""}"
"{""ind"":6046,""timestamp"":""2020-01-05 19:06:07"",""latitude"":30.527,""longitude"":-66.8356,""country"":""South Georgia and the South Sandwich Islands""}"
"{""ind"":9958,""timestamp"":""2019-09-30 17:39:58"",""latitude"":-88.3092,""longitude"":125.397,""country"":""Portugal""}"


In [None]:
# Open the Kinesis user stream from its earliest available record and keep
# only the record payload, cast to a string.
user_df = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0afff2eeb7e3-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
    .selectExpr("CAST(data as STRING)")
)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
# Schema of the JSON document carried in each pin record's `data` string
pin_json_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("unique_id", StringType(), True)
    .add("description", StringType(), True)
    .add("title", StringType(), True)
    .add("image_src", StringType(), True)
    .add("save_location", StringType(), True)
    .add("category", StringType(), True)
    .add("is_image_or_video", StringType(), True)
    .add("downloaded", IntegerType(), True)
    .add("tag_list", StringType(), True)
    .add("poster_name", StringType(), True)
    .add("follower_count", StringType(), True)
)

# Parse the JSON string into a single struct column
parsed_df = pin_df.select(from_json("data", pin_json_schema).alias("parsed_data"))

# Flatten the struct into top-level columns; `index` is exposed as `ind`,
# every other selected field keeps its own name. `downloaded` is not selected.
structured_pin_df = parsed_df.select(
    col("parsed_data.index").alias("ind"),
    *[
        col(f"parsed_data.{field_name}").alias(field_name)
        for field_name in (
            "title",
            "description",
            "follower_count",
            "image_src",
            "save_location",
            "category",
            "is_image_or_video",
            "tag_list",
            "poster_name",
            "unique_id",
        )
    ],
)
display(structured_pin_df)


ind,title,description,follower_count,image_src,save_location,category,is_image_or_video,tag_list,poster_name,unique_id
5628,How to Make Money Investing With Only a Few Dollars,"A comprehensive analysis of VYM - Vanguard High Dividend Yield ETF. Including VYM dividend yield, dividend history, dividend growth & stock holdings.",28k,https://i.pinimg.com/originals/de/56/8a/de568abdccea841991dd06c6ebfdbbfe.png,Local save in /data/finance,finance,image,"Investment Tips,Budget Planer,Investing Money,Stock Investing,Silver Investing,Budgeting Finances,Budgeting Tips,Financial Tips,Financial Peace",Dividends Diversify: Money Matters So Build Wealth & Be Rich,d6680f43-a3cc-4c5a-9170-165adaecbb9f
4873,20 Ideas For Throwing a Legendary Summer Pool Party,"If anyone dares to throw you in the water, #4 might soften the blow.",876k,https://i.pinimg.com/originals/90/79/88/9079885636af5ec5b43fb0e0d3ef3e70.jpg,Local save in /data/event-planning,event-planning,image,"Garden Parties,Outdoor Parties,Summer Parties,Backyard Parties,Summer Events,Pool Party Ideias,Sommer Pool Party,Pool Party Decorations,Wedding Decorations",House Beautiful,92396d76-61a5-4085-bd6b-cdc9dea0c808
910,Make Up For Ever's Nude Lipstick Was Tested on 25 Skin Tones,Make Up For Ever's Rosewood nude lipstick was tested on 25 skin tones and was designed to create a flattering look for everyone. Click here to learn more about the satin-finish…,1M,https://i.pinimg.com/originals/10/10/c1/1010c1144c345e6960c0e4b501286acb.jpg,Local save in /data/beauty,beauty,image,"Makeup Tips,Beauty Makeup,Eye Makeup,Hair Makeup,Hair Beauty,Makeup Brushes,Makeup Ideas,Bride Makeup,Full Makeup",InStyle,878db9bf-f989-4eec-a169-1191a8d2b3f8
3221,Autumn Tree Painting with Cotton Balls,Create this gorgeous autumn tree painting using cotton balls. Kids will love creating this fall craft with all of the beautiful colors of autumn! Includes a branch template to m…,20k,https://i.pinimg.com/originals/18/68/60/186860c65f6357e2fceeab2ab1f86c44.jpg,Local save in /data/diy-and-crafts,diy-and-crafts,image,"Easy Fall Crafts,Fall Crafts For Kids,Projects For Kids,Kids Crafts,Art For Kids,Autumn Art Ideas For Kids,Craft Projects,Craft Ideas,Quick Crafts",Projects with Kids,6943c657-1872-4cf3-b46c-e5b24d4b2928
3197,25 Best Christmas Nativity Crafts for Kids in Your Children’s Ministry,Here are 25 of the best nativity Christmas crafts we found for you on Pinterest and the internet.,19k,https://i.pinimg.com/originals/df/80/10/df801006b36dec981ca0fc11e273882b.jpg,Local save in /data/diy-and-crafts,diy-and-crafts,image,"Christian Christmas Crafts,Christmas Gifts For Parents,Christmas Bible,Christmas Nativity,Christmas Fun,Christmas Craft Religious,Church Christmas Craft,Bible Crafts For Kids,Christmas Crafts For Kids To Make",ChurchLeaders,a61630ee-2d65-478c-b677-952777f1b859
10571,shushpanzer_ru,shushpanzer_ru - the new blog in LiveJournal. There should be new interesting records soon.,272,https://i.pinimg.com/originals/1c/59/72/1c59725fc7946c7b146a81f7203d0536.jpg,Local save in /data/vehicles,vehicles,image,"Army Vehicles,Armored Vehicles,Monster Car,Armored Truck,By Any Means Necessary,Expedition Vehicle,Futuristic Cars,Jeep Truck,Emergency Vehicles",David O'Connor,ea84197e-5935-4121-a358-0b6886b87538
1264,Your Complete Guide to Makeup Brushes and How to Use Them,Let's break this down.,889,https://i.pinimg.com/originals/d5/d8/05/d5d8051c808cca91518df053a4b1cf0a.jpg,Local save in /data/beauty,beauty,image,"All Things Beauty,Beauty Make Up,Diy Beauty,Beauty Hacks,Fashion Beauty,Beauty Box,Luxury Beauty,Girly Things,Make Up Brush",Sammy,529f6246-d1ca-43ad-b181-3f508d3a6575
9764,What to Wear In Alaska: An All-Season Packing List and Guide for Outfits,"Are you planning to travel to Alaska and wondering what to wear? If so, then this guide is for you! From outfits to toiletries, hiking gear, electronics, miscellaneous items, an…",22k,https://i.pinimg.com/originals/d1/c4/f4/d1c4f432c62ff0826646d78700d5cbbd.jpg,Local save in /data/travel,travel,image,"Packing For Alaska,Alaska Travel,Travel Usa,Alaska Trip,Alaska Summer,Alaska Winter,Fairbanks Alaska,Anchorage Alaska,Summer Packing Lists",Jasmine Alley,804c3b50-a10f-4f0b-bf01-6aa5e4bf051c
4785,Special Event Restrooms for any Occasion,Special event restrooms by the company that changed the portable restroom industry. Experience the Difference with Royal Restrooms.,9k,https://i.pinimg.com/originals/02/fe/ee/02feeebf81bca5c32570b7af83be58cf.jpg,Local save in /data/event-planning,event-planning,image,"Event Planning Tips,Event Planning Business,Party Planning,Business Ideas,Event Guide,Catering Business,Business Events,Business Goals,Family Business",RoyalRestrooms,0e53d4b8-f1bc-47f7-a501-de3c22c527e4
6046,Well Said Friend Card Inspired by Home Decor,Today's card is a rustic friend card inspired by the Home Decor pictured below. Key to the card's WOW! factor is using the Well-Said Cling Stamp Set and Well-Written Framelits D…,67k,https://i.pinimg.com/originals/d9/24/91/d9249124e7b2650eedbae0d63f084f07.jpg,Local save in /data/home-decor,home-decor,image,"Farmhouse Frames,Country Farmhouse Decor,Rustic Decor,Farmhouse Style,Farmhouse Shelving,Farmhouse Mantel,Fresh Farmhouse,Modern Farmhouse,Farmhouse Design",Stampin' Pretty,432dd5cc-d712-4d7a-9b0d-2e6b4085ea07


In [None]:
# Placeholder strings that carry no real information; they are replaced with nulls below
irrelevant_data = [
    "null",
    "N/A",
    "n/a",
    "none",
    "None",
    "User Info Error",
    "Image src error.",
    "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "No Title Data Available",
    "No description available",
    "No description available Story format",
    "follower_count",
]
replace_dict = dict.fromkeys(irrelevant_data)

# Final column order of the cleaned pin data
required_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]

structured_pin_df = (
    structured_pin_df
    # Strip the "Local save in " prefix so save_location holds only the path
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # Null out the placeholder values in the text columns
    .replace(replace_dict, subset=["title", "description", "poster_name", "image_src", "tag_list"])
    # Expand the M / k suffixes into zeros, then cast follower_count to an int
    .withColumn('follower_count', regexp_replace('follower_count', 'M', '000000'))
    .withColumn('follower_count', regexp_replace('follower_count', 'k', '000'))
    .withColumn("follower_count", col("follower_count").cast("int"))
    # Reorder the columns
    .select(*required_order)
)

In [None]:
# Append the cleaned, structured pin stream to its Delta table.
# The checkpoint directory is dedicated to this query: a checkpoint stores one
# query's offsets and commit log, so it must not be shared with any other
# streaming query.
# NOTE(review): pointing at a fresh checkpoint makes the query re-read the
# source from its initial position on the next start.
(
    structured_pin_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0afff2eeb7e3_pin_table/")
    .table("0afff2eeb7e3_pin_table")
)

In [None]:
# Schema of the JSON document carried in each geo record's `data` string.
# latitude / longitude are fractional numbers in the payload (e.g. -58.7041),
# so they are declared as doubles; an integer type cannot represent them and
# from_json does not coerce fractional JSON numbers into integers.
# NOTE(review): if the target Delta table already exists with integer
# coordinates, its schema must be reconciled before writing.
geo_json_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
])

# Parse the JSON string into a single struct column
parsed_geo_df = geo_df.select(
    from_json(col("data"), geo_json_schema).alias("parsed_data")
)

# Flatten the struct into top-level columns
structured_geo_df = parsed_geo_df.select(
    col("parsed_data.ind").alias("ind"),
    col("parsed_data.country").alias("country"),
    col("parsed_data.latitude").alias("latitude"),
    col("parsed_data.longitude").alias("longitude"),
    col("parsed_data.timestamp").alias("timestamp")
)
display(structured_geo_df)

ind,country,latitude,longitude,timestamp
4873,Iraq,-58.7041,-161.128,2019-06-03 06:29:14
910,Cayman Islands,-66.1915,-95.9974,2018-06-22 00:51:33
3221,Albania,-71.6856,-179.126,2022-08-30 21:09:21
3197,Azerbaijan,50.3185,-61.8984,2021-11-09 19:56:14
10571,Saint Vincent and the Grenadines,-14.5736,21.4795,2022-08-07 15:53:04
1264,French Southern Territories,3.94678,-124.477,2021-04-09 12:29:33
9764,Cote d'Ivoire,-0.859891,56.9958,2021-08-01 17:51:18
4785,British Virgin Islands,37.3013,-58.7017,2022-07-28 22:42:04
6046,South Georgia and the South Sandwich Islands,30.527,-66.8356,2020-01-05 19:06:07
9958,Portugal,-88.3092,125.397,2019-09-30 17:39:58


In [None]:
# Combine latitude and longitude into one `coordinates` array column, drop the
# two source columns and emit the columns in their final order, all in one projection.
structured_geo_df = structured_geo_df.select(
    "ind",
    "country",
    array("latitude", "longitude").alias("coordinates"),
    "timestamp",
)

In [None]:
# Append the structured geo stream to its Delta table.
# The checkpoint directory is dedicated to this query: a checkpoint stores one
# query's offsets and commit log, so it must not be shared with any other
# streaming query.
# NOTE(review): pointing at a fresh checkpoint makes the query re-read the
# source from its initial position on the next start.
(
    structured_geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0afff2eeb7e3_geo_table/")
    .table("0afff2eeb7e3_geo_table")
)



In [None]:
# Schema of the JSON document carried in each user record's `data` string
user_json_schema = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("date_joined", TimestampType(), True)
)

# Parse the JSON string into a single struct column
parsed_user_df = user_df.select(from_json("data", user_json_schema).alias("parsed_data"))

# Flatten the struct into top-level columns that keep the field names
structured_user_df = parsed_user_df.select(
    *[
        col(f"parsed_data.{field_name}").alias(field_name)
        for field_name in ("ind", "first_name", "last_name", "age", "date_joined")
    ]
)
display(structured_user_df)

ind,first_name,last_name,age,date_joined
4873,Beth,Ali,20,2015-12-08 16:44:30
910,Alexandria,Davis,27,2016-02-24 12:13:37
3221,Aaron,Bartlett,21,2015-11-24 02:15:36
3197,Joel,Craig,50,2016-07-01 10:53:41
10571,Cheryl,Matthews,42,2017-06-11 03:32:20
1264,Stephanie,Duffy,51,2017-05-13 01:42:43
9764,Luis,Shea,29,2016-06-17 04:50:42
4785,Dominique,Ford,42,2016-01-04 17:04:43
6046,Jonathan,Chan,33,2016-07-10 12:35:42
9958,Julie,Weber,36,2017-07-04 23:42:54


In [None]:
# Build `user_name` as "<first_name> <last_name>", drop the two source columns
# and emit the columns in their final order, all in one projection.
structured_user_df = structured_user_df.select(
    "ind",
    concat("first_name", lit(" "), "last_name").alias("user_name"),
    "age",
    "date_joined",
)


In [None]:
# Append the structured user stream to its Delta table.
# The checkpoint directory is dedicated to this query: a checkpoint stores one
# query's offsets and commit log, so it must not be shared with any other
# streaming query.
# NOTE(review): pointing at a fresh checkpoint makes the query re-read the
# source from its initial position on the next start.
(
    structured_user_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0afff2eeb7e3_user_table/")
    .table("0afff2eeb7e3_user_table")
)