In [0]:
# Inspect what is currently stored under /FileStore/tables; the cell renders the listing.
filestore_tables_dir = "/FileStore/tables"
dbutils.fs.ls(filestore_tables_dir)

# Retrieving Credentials

In [0]:
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
# URL processing
import urllib

In [0]:
# Load the AWS credentials CSV from DBFS into a Spark DataFrame.
file_type = "csv"             # source file format
first_row_is_header = "true"  # the first row holds the column names
delimiter = ","               # fields are comma-separated
csv_reader = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
)
aws_keys_df = csv_reader.load("/FileStore/tables/authentication_credentials.csv")

In [0]:
# `import urllib` alone does not load the `parse` submodule, so import it explicitly.
import urllib.parse

# Pull the AWS access key and secret key from the first row of the credentials DataFrame.
# NOTE(review): keeping keys in a FileStore CSV is risky; a Databricks secret scope
# would be safer if one is available.
credential_rows = aws_keys_df.select('Access key ID', 'Secret access key').collect()
if not credential_rows:
    raise ValueError("No rows found in the AWS credentials DataFrame")
first_credential_row = credential_rows[0]
ACCESS_KEY = first_credential_row['Access key ID']
SECRET_KEY = first_credential_row['Secret access key']

# Percent-encode the secret key. safe="" means every reserved character (including "/")
# is encoded, which makes the key safe to embed in a URL. This is encoding, not encryption.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Read Streaming Data from Kinesis

In [0]:
# a function that retrieves Kienesis stream and returns a df
def read_stream(stream_name: str):
    df_stream_raw = spark \
    .readStream \
    .format('kinesis') \
    .option('streamName','stream_name') \
    .option('initialPosition','earliest') \
    .option('region','us-east-1') \
    .option('awsAccessKey', ACCESS_KEY) \
    .option('awsSecretKey', SECRET_KEY) \
    .load()
    return df_stream_raw
    
# a function that gets stream dataframe and schema,  
# deserializes data from stream and return a df
def deserializer(stream, schema):
    df_stream_raw = stream \
    .selectExpr("CAST(data as STRING)") \
    .withColumn("data", from_json(col("data"), schema)) \
    .select(col("data.*"))
    return df_stream_raw

# a function that writes a cleaned streaming DataFrame to a Delta table
def create_delta_table(df, topic_name: str, checkpoint_root: str = "/tmp/kinesis/_checkpoints/"):
    """Start an append-mode stream from ``df`` into the Delta table for ``topic_name``.

    Each topic gets its own checkpoint directory, ``<checkpoint_root>/<topic_name>/``.
    Concurrently running streaming queries must not share a checkpoint location. If any
    checkpoint is cleared before restarting a query, clear only that topic's subdirectory.

    Args:
        df: Cleaned streaming DataFrame to write.
        topic_name: Topic suffix (e.g. "pin", "geo", "user") used in the table name
            ``0a5e6ec37a2f_<topic_name>_table`` and in the checkpoint path.
        checkpoint_root: Parent directory for per-topic checkpoints.

    Returns:
        The started streaming query, so callers can monitor or stop it.
    """
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{topic_name}/"
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(f"0a5e6ec37a2f_{topic_name}_table")
    )


In [0]:
# JSON schemas for the three Kinesis payloads. Each field is nullable (StructField default).

# pin records
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    ]
])

# geo records
geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    ]
])

# user records
user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    ]
])

In [0]:
# Open one raw Kinesis stream per topic, in pin -> geo -> user order.
df_pin_stream_raw, df_geo_stream_raw, df_user_stream_raw = (
    read_stream(kinesis_stream)
    for kinesis_stream in (
        'streaming-0a5e6ec37a2f-pin',
        'streaming-0a5e6ec37a2f-geo',
        'streaming-0a5e6ec37a2f-user',
    )
)

# Preview the raw pin stream.
display(df_pin_stream_raw)

In [0]:
# Decode each raw stream's JSON payload with its matching schema.
df_pin_stream_de, df_geo_stream_de, df_user_stream_de = (
    deserializer(raw_stream, stream_schema)
    for raw_stream, stream_schema in (
        (df_pin_stream_raw, pin_schema),
        (df_geo_stream_raw, geo_schema),
        (df_user_stream_raw, user_schema),
    )
)


# Data Cleaning

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Clean the streaming pin records: de-duplicate, null out placeholder values,
# normalise follower_count / save_location, and align columns with geo and user.

# Drop exact duplicate records.
# NOTE(review): on a streaming DataFrame, dropDuplicates() without a watermark keeps
# every record seen so far in state; consider a watermark if the stream is long-lived.
df_pin = df_pin_stream_de.dropDuplicates()

# Replace empty / placeholder strings in every string column with null.
df_pin = df_pin.replace(['', ' ', 'NULL', 'null'], None)

# Column-specific "no data" markers, each replaced with null. Every step builds on the
# previous df_pin so no cleaning step is lost.
pin_missing_markers = {
    "description": "No description available Story format",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available",
}
for column_name, missing_marker in pin_missing_markers.items():
    df_pin = df_pin.withColumn(
        column_name,
        when(col(column_name) == missing_marker, None).otherwise(col(column_name)),
    )

# Keep only the digits of follower_count and cast to int. Values with no digits become
# null, so the cast never sees an empty string.
# NOTE(review): stripping non-digits turns suffixed counts such as "12k" into 12. If the
# source uses k/M suffixes, expand them before this step. TODO confirm against the data.
# NOTE(review): if the Delta table already exists with a string follower_count, appending
# an int column may be rejected by schema enforcement.
follower_digits = regexp_replace(col("follower_count"), "[^0-9]", "")
df_pin = df_pin.withColumn(
    "follower_count",
    when(follower_digits == "", None).otherwise(follower_digits).cast(IntegerType()),
)

# Keep only the path part of save_location.
df_pin = df_pin.withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))

# Rename index -> ind to match the geo and user DataFrames.
df_pin = df_pin.withColumnRenamed("index", "ind")

# Fix the output column order ("downloaded" is not carried forward).
df_pin = df_pin.select(["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"])

display(df_pin)


In [0]:
# Clean the streaming geo records:
#  - drop exact duplicates
#  - fold latitude/longitude into a single `coordinates` array column
#  - cast `timestamp` to TimestampType (geo_schema already declares it as a
#    timestamp, so the cast should be a no-op safeguard)
#  - fix the output column order
df_geo = (
    df_geo_stream_de
    .dropDuplicates()
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
    .select(["ind", "country", "coordinates", "timestamp"])
)

display(df_geo)


ind,country,coordinates,timestamp
9812,Benin,"List(-44.6251, 72.8949)",2021-11-20T00:08:55.000+0000
7068,Aruba,"List(-74.1238, -64.0839)",2017-10-27T06:07:40.000+0000
5561,Bangladesh,"List(-80.6398, -57.7465)",2022-06-21T20:46:51.000+0000
6521,Australia,"List(-82.0198, -155.211)",2020-05-27T11:27:08.000+0000
8887,Botswana,"List(-28.0137, -160.708)",2021-09-19T05:27:43.000+0000
771,Montserrat,"List(-29.1712, -107.111)",2018-06-21T08:42:57.000+0000
7586,Andorra,"List(-84.7363, -179.087)",2021-02-07T22:17:31.000+0000
4530,Angola,"List(3.30986, 6.98283)",2021-09-10T22:13:57.000+0000
5763,Brunei Darussalam,"List(-5.58912, -18.8417)",2021-10-10T09:15:43.000+0000
10179,Algeria,"List(-74.7055, -177.87)",2019-11-06T05:51:25.000+0000


In [0]:
# Clean the streaming user records:
#  - drop exact duplicates
#  - combine first_name and last_name into a space-separated `user_name`
#  - cast `date_joined` to TimestampType (user_schema already declares it as a
#    timestamp, so the cast should be a no-op safeguard)
#  - fix the output column order
df_user = (
    df_user_stream_de
    .dropDuplicates()
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", col("date_joined").cast(TimestampType()))
    .select(["ind", "user_name", "age", "date_joined"])
)

display(df_user)

ind,user_name,age,date_joined
2668,April Brown,20,2016-04-10T02:39:57.000+0000
6797,Janet Shaw,27,2016-11-02T03:54:55.000+0000
8204,David Davis,38,2016-11-20T06:55:49.000+0000
5293,David Taylor,36,2016-06-03T23:53:50.000+0000
4561,Madeline Brown,43,2016-11-11T15:53:15.000+0000
6957,Brittany Brown,37,2016-06-25T14:36:22.000+0000
7446,Melissa Weiss,32,2017-09-19T15:10:12.000+0000
926,Thomas Stone,21,2016-07-18T18:19:31.000+0000
552,Regina Morales,20,2017-01-17T01:49:27.000+0000
6615,Barry West,44,2016-08-13T21:13:14.000+0000


# Create Delta Table

In [0]:
# Clear the checkpoint directory, then start streaming the cleaned pin records into Delta.
# NOTE(review): this recursively removes the whole checkpoint root, including the state of
# any stream started in another cell; consider clearing only this topic's checkpoint.
checkpoint_root = "/tmp/kinesis/_checkpoints/"
dbutils.fs.rm(checkpoint_root, True)
create_delta_table(df=df_pin, topic_name="pin")


In [0]:
# Clear the checkpoint directory, then start streaming the cleaned geo records into Delta.
# NOTE(review): this recursively removes the whole checkpoint root, including the state of
# any stream started in another cell; consider clearing only this topic's checkpoint.
checkpoint_root = "/tmp/kinesis/_checkpoints/"
dbutils.fs.rm(checkpoint_root, True)
create_delta_table(df=df_geo, topic_name="geo")


In [0]:
# Clear the checkpoint directory, then start streaming the cleaned user records into Delta.
# NOTE(review): this recursively removes the whole checkpoint root, including the state of
# any stream started in another cell; consider clearing only this topic's checkpoint.
checkpoint_root = "/tmp/kinesis/_checkpoints/"
dbutils.fs.rm(checkpoint_root, True)
create_delta_table(df=df_user, topic_name="user")
