In [0]:

# import URL processing
import urllib
import urllib.parse

# import pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType

In [0]:
# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
delta_reader = spark.read.format("delta")
aws_keys_df = delta_reader.load(delta_table_path)

In [0]:
# Get the AWS access key and secret key from the spark dataframe.
# Both columns are fetched with a single job; first() returns None for an
# empty table, which is reported explicitly instead of as an IndexError.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("The authentication credentials table is empty: no AWS keys to read")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Encode the secret key so it can be embedded in a URL (safe="" also escapes "/")
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0affec486183-bucket"
# Mount name for the bucket
MOUNT_PATH = "/mnt/mount_path"
# Source url (carries the access key and the URL-encoded secret key)
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# Remove any existing mount first so the bucket is always mounted afresh.
# Only ordinary errors are tolerated here; KeyboardInterrupt / SystemExit
# propagate and skip the mount instead of being swallowed.
try:
  dbutils.fs.unmount(MOUNT_PATH)
except Exception:
  print("Exception: Directory is not mounted!")

# Mount the drive
print("Finally: Mount Directory anyway!")
dbutils.fs.mount(SOURCE_URL, MOUNT_PATH)

In [0]:
def get_kinesis_stream(stream_name: str):
    '''Uses spark.readStream to retrieve Kinesis stream and returns stream as dataframe

    Args:
        stream_name: name of the Kinesis stream to read from.

    Returns:
        The streaming dataframe produced by the Kinesis source.

    Relies on the notebook-level `spark`, `ACCESS_KEY` and `SECRET_KEY`.
    '''
    # The chain is wrapped in parentheses rather than joined with backslashes:
    # a comment placed after a backslash continuation is a SyntaxError.
    dataframe = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')  # earliest vs latest
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    return dataframe

def deserialize_kinesis_stream(stream, schema):
    '''Decodes the JSON payload of a Kinesis stream dataframe.

    Args:
        stream: stream dataframe with a `data` column.
        schema: schema passed to from_json to parse the payload.

    Returns:
        A dataframe holding the fields of the parsed `data` struct as columns.
    '''
    # Keep only the payload, cast to a string
    payload_as_text = stream.selectExpr("CAST(data as STRING)")
    # Parse the JSON text into a struct according to the schema
    parsed_payload = payload_as_text.withColumn("data", from_json(col("data"), schema))
    # Expand the struct fields into top-level columns
    return parsed_payload.select(col("data.*"))

def write_Kinesis_stream_df_to_table(dataframe, name: str):
    '''Appends a streaming dataframe to a delta table whose name is derived from `name`.

    Args:
        dataframe: streaming dataframe to write.
        name: short label inserted into both the checkpoint path and the table name.
    '''
    checkpoint_dir = f"/tmp/kinesis/1215be80977f_{name}_table_checkpoints/"
    target_table = f"1215be80977f_{name}_table"

    # Configure the writer step by step, then write to the target table
    stream_writer = dataframe.writeStream.format("delta").outputMode("append")
    stream_writer = stream_writer.option("checkpointLocation", checkpoint_dir)
    stream_writer.table(target_table)

In [0]:
# Schemas used to parse the JSON payload of each stream (pin, geo, user).
# Every field is declared with the default nullability.
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)
geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

In [0]:
# Open the pin stream and preview its raw records; the payload is still in the
# undecoded `data` column at this point.
kstream_pin = get_kinesis_stream('streaming-0affec486183-pin')
# The geo and user streams are opened in their own cells below.
display(kstream_pin)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
partition-1,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173156775567464098671253650407426,2024-08-12T12:21:11.473+0000
partition-1,eyJpbmRleCI6Mjg2MywidW5pcXVlX2lkIjoiOWJmMzk0MzctNDJhNi00ZjAyLTk5YTAtOWEwMzgzZDhjZDcwIiwidGl0bGUiOiIyNSBTdXBlciBGdW4gU3VtbWVyIENyYWZ0cyBmb3IgS2lkcyAtIE9mIExpZmUgYW5kIExpc2E= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173161714029437224431569764089858,2024-08-12T12:21:13.127+0000
partition-1,eyJpbmRleCI6NTczMCwidW5pcXVlX2lkIjoiMWUxZjBjOGItOWZjZi00NjBiLTkxNTQtYzc3NTgyNzIwNmViIiwidGl0bGUiOiJJc2xhbmQgT2FzaXMgQ291cG9uIE9yZ2FuaXplciIsImRlc2NyaXB0aW9uIjoiRGVzY3JpcHQ= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173170971983363833261927102939138,2024-08-12T12:21:16.233+0000
partition-1,eyJpbmRleCI6ODMwNCwidW5pcXVlX2lkIjoiNWI2ZDA5MTMtMjVlNC00M2FiLTgzOWQtODVkNTUxNmY3OGE0IiwidGl0bGUiOiJUaGUgIzEgUmVhc29uIFlvdeKAmXJlIE5vdCBIaXMgUHJpb3JpdHkgQW55bW9yZSAtIE1hdHQ= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173179545685276540212240277569538,2024-08-12T12:21:18.745+0000
partition-1,eyJpbmRleCI6ODczMSwidW5pcXVlX2lkIjoiZWE3NjBmNzEtZmViZi00MDIzLWI1OTItZDE3Mzk2NjU5MDM5IiwidGl0bGUiOiIyMCBLb2kgRmlzaCBUYXR0b29zIEZvciBMdWNreSBNZW4iLCJkZXNjcmlwdGlvbiI6IktvaSA= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173183333249869392845513351495682,2024-08-12T12:21:19.926+0000
partition-1,eyJpbmRleCI6MTMxMywidW5pcXVlX2lkIjoiNDQ2NjIwNDUtZTg5MS00ODIxLThhMTktZWJlN2VlZGQzNzFhIiwidGl0bGUiOiJMaXF1aWQgTGFzaCBFeHRlbnNpb25zIE1hc2NhcmEiLCJkZXNjcmlwdGlvbiI6Ikluc3RhbnQ= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173185143011821355945456606117890,2024-08-12T12:21:20.543+0000
partition-1,eyJpbmRleCI6NDMxNSwidW5pcXVlX2lkIjoiMjFiNTliYTktODI5ZC00YzMzLThjMjctNGNkNGM1NmQyNmI4IiwidGl0bGUiOiJQb2RjYXN0cyBmb3IgVGVhY2hlcnMgb3IgUGFyZW50cyBvZiBUZWVuYWdlcnMiLCJkZXNjcmk= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173192035097918978946587764457474,2024-08-12T12:21:23.197+0000
partition-1,eyJpbmRleCI6MTA3OTQsInVuaXF1ZV9pZCI6ImM0YmQyNTc3LWE3YmItNDQwOS1iYjdhLTE3ZDVlZDdlMWNmMSIsInRpdGxlIjoiVGlyZUJ1eWVyIiwiZGVzY3JpcHRpb24iOiJOaXNzYW4gR1QtUi4gU2ljay4iLCJwb3N0ZXI= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173194183359100434142699936808962,2024-08-12T12:21:23.874+0000
partition-1,eyJpbmRleCI6NTQ5NCwidW5pcXVlX2lkIjoiOGZiMmFmNjgtNTQzYi00NjM5LTgxMTktZGUzM2QyODcwNmVkIiwidGl0bGUiOiJEYXZlIFJhbXNleSdzIDcgQmFieSBTdGVwczogV2hhdCBBcmUgVGhleSBBbmQgV2lsbCBUaGU= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173195962897906906876845104300034,2024-08-12T12:21:24.503+0000
partition-1,eyJpbmRleCI6NTA2OSwidW5pcXVlX2lkIjoiYjc1YjZmODctZGViMy00NDRmLWIyOWUtY2U5MTYxYjJkZjQ5IiwidGl0bGUiOiJUaGUgVmF1bHQ6IEN1cmF0ZWQgJiBSZWZpbmVkIFdlZGRpbmcgSW5zcGlyYXRpb24iLCJkZXM= (truncated),streaming-0affec486183-pin,shardId-000000000000,49654776698500171106604173199009390972335742434083340290,2024-08-12T12:21:25.730+0000


In [0]:
# Open the geo stream and preview its raw records.
# Live data needs to be flowing into the stream for the preview to show rows.
kstream_geo = get_kinesis_stream('streaming-0affec486183-geo')
display(kstream_geo)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# Open the user stream and preview its raw records.
# Live data needs to be flowing into the stream for the preview to show rows.
kstream_user = get_kinesis_stream('streaming-0affec486183-user')
display(kstream_user)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp


In [0]:
# Parse the JSON payload of each raw stream with its matching schema
df_pin = deserialize_kinesis_stream(kstream_pin, pin_schema)
df_geo = deserialize_kinesis_stream(kstream_geo, geo_schema)
df_user = deserialize_kinesis_stream(kstream_user, user_schema)

In [0]:
# Placeholder values that stand for "no data", keyed by the column they occur in.
# Each value is used as a LIKE pattern, so "%" acts as a wildcard.
replace_dict = dict(
    description="No description available%",
    follower_count="User Info Error",
    image_src="Image src error.",
    poster_name="User Info Error",
    tag_list="N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    title="No Title Data Available",
)

# Null out every placeholder, leaving all other values untouched
for column_name in replace_dict:
    matches_placeholder = col(column_name).like(replace_dict[column_name])
    df_pin = df_pin.withColumn(
        column_name,
        when(matches_placeholder, None).otherwise(col(column_name)),
    )

df_pin = (
    df_pin
    # expand the "k" / "M" suffixes of follower_count into zeros ...
    .withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    .withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    # ... and turn the resulting digits into an integer
    .withColumn("follower_count", col("follower_count").cast('int'))
    # keep only the path part of save_location
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # index -> ind
    .withColumnRenamed("index", "ind")
)

# Final column order for df_pin (columns not listed here are dropped)
new_order_pin = [
    "ind", "unique_id", "title", "description",
    "follower_count", "poster_name", "tag_list",
    "is_image_or_video", "image_src", "save_location", "category",
]
df_pin = df_pin.select(new_order_pin)
     

In [0]:
from pyspark.sql.types import ArrayType, DoubleType

# Columns folded into `coordinates` and then removed
cols_to_drop = ("latitude", "longitude")

# Final column order for df_geo
new_order_geo = ["ind", "country", "coordinates", "timestamp"]

df_geo = (
    df_geo
    # combine latitude and longitude into a single [lat, long] array column
    .withColumn("coordinates", array("latitude", "longitude"))
    # the separate latitude / longitude columns are no longer needed
    .drop(*cols_to_drop)
    # make sure the timestamp column is of timestamp type
    .withColumn("timestamp", to_timestamp("timestamp"))
    # reorder the columns
    .select(new_order_geo)
)
     

In [0]:
# Final column order for df_user
new_order_user = ["ind", "user_name", "age", "date_joined"]

df_user = (
    df_user
    # build user_name out of first_name and last_name
    # NOTE(review): the two names are joined without a separator - confirm this is intended
    .withColumn("user_name", concat("first_name", "last_name"))
    # the individual name columns are no longer needed
    .drop('first_name', 'last_name')
    # make sure date_joined is of timestamp type
    .withColumn("date_joined", to_timestamp("date_joined"))
    # reorder the columns
    .select(new_order_user)
)

In [0]:
# inspect df_pin
display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category


In [0]:
#df_pin.schema
#df_pin.schema.simpleString()
df_pin.printSchema()

In [0]:
display(df_geo)

In [0]:
df_geo.printSchema()

In [0]:
display(df_user)

In [0]:
df_user.printSchema()

In [0]:
# Print the (rows, columns) shape of df_pin.
# A streaming DataFrame cannot be counted with a batch action, so the row
# count is reported as None when df_pin is a stream.
pin_row_count = None if df_pin.isStreaming else df_pin.count()
print((pin_row_count, len(df_pin.columns)))

In [0]:
# Write each cleaned stream to its Delta table using the helper defined above.
# The helper already wraps the name as "1215be80977f_<name>_table", so only the
# short label is passed (e.g. "pin" -> "1215be80977f_pin_table").
# NOTE(review): the helper's hard-coded "1215be80977f" prefix differs from the
# "0affec486183" id used for the bucket and stream names - confirm which is intended.
write_Kinesis_stream_df_to_table(df_pin, "pin")
write_Kinesis_stream_df_to_table(df_geo, "geo")
write_Kinesis_stream_df_to_table(df_user, "user")