In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Location of the Delta table that stores the AWS credentials used by the Kinesis readers.
# NOTE(review): keeping secrets in a warehouse table exposes them to anyone who can read it;
# consider a Databricks secret scope instead.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table into a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Take both keys from ONE row. A single `.first()` avoids collecting the whole table twice
# and guarantees the access key and secret key belong to the same credential pair.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table {delta_table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Percent-encode the secret key, including '/', so it can be embedded in a URL.
# NOTE(review): the Kinesis readers in this notebook use the raw SECRET_KEY; confirm
# whether anything still needs ENCODED_SECRET_KEY.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable the Delta format check (spark.databricks.delta.formatCheck.enabled) for this session.
-- NOTE(review): presumably this suppresses the error raised when files of a Delta table are read
-- with a non-Delta format. The credentials cell above already reads its table with format("delta")
-- and runs before this cell, so confirm whether this setting is still required and where.
SET spark.databricks.delta.formatCheck.enabled=false


key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Settings shared by every Kinesis stream read in this notebook
KINESIS_REGION = 'us-east-1'
KINESIS_STREAM_PREFIX = 'streaming-1279c94681db'


def read_kinesis_stream(stream_name, region=KINESIS_REGION, initial_position='earliest'):
    """Open a streaming DataFrame over a single Kinesis stream.

    Uses the module-level ACCESS_KEY / SECRET_KEY loaded in the credentials cell.

    Args:
        stream_name: Full Kinesis stream name.
        region: AWS region of the stream.
        initial_position: Value for the Kinesis `initialPosition` option.

    Returns:
        The streaming DataFrame produced by the Kinesis source.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


# One raw stream each for pin, geo and user records
df_pin = read_kinesis_stream(f'{KINESIS_STREAM_PREFIX}-pin')
df_geo = read_kinesis_stream(f'{KINESIS_STREAM_PREFIX}-geo')
df_user = read_kinesis_stream(f'{KINESIS_STREAM_PREFIX}-user')

In [0]:
# Schema of the JSON payload carried in each record of the pin stream
schema = StructType([
    StructField("index", IntegerType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

# Cast the Kinesis `data` payload to STRING explicitly (as the geo and user cells do)
# instead of relying on implicit coercion, then parse it with from_json.
df_transformed_pin = df_pin.select(
    from_json(col("data").cast("string"), schema).alias("json_data")
)

# Flatten the parsed struct: one top-level column per schema field, in schema order.
df_pin = df_transformed_pin.select("json_data.*")

In [0]:
# Cleaning df_pin: null out placeholder values, normalise follower counts, tidy
# save_location and shape the columns for the pin Delta table.

# Placeholder strings used for missing values -> NULL
cleaned_df_pin = df_pin.replace({'User Info Error': None})
cleaned_df_pin = cleaned_df_pin.replace({'No description available Story format': None}, subset=['description'])

# Convert follower counts such as "22k", "1M" or "1.5M" to integers.
# The numeric part is parsed as DECIMAL so fractional values with a suffix are exact
# ("1.5M" -> 1500000) instead of being truncated to their integer prefix.
# Anything matching none of the patterns, including NULLs (for example former
# 'User Info Error' entries), becomes 0.
follower_count_raw = col("follower_count")
follower_count_number = regexp_extract(follower_count_raw, r"(\d+(?:\.\d+)?)", 1).cast("decimal(18,3)")
cleaned_df_pin = cleaned_df_pin.withColumn(
    "follower_count",
    when(follower_count_raw.rlike(r"\d+(\.\d+)?M"), (follower_count_number * 1000000).cast("int"))
    .when(follower_count_raw.rlike(r"\d+(\.\d+)?k"), (follower_count_number * 1000).cast("int"))
    .when(follower_count_raw.rlike(r"\d+"), follower_count_raw.cast("int"))
    .otherwise(0)
)

# Keep `index` explicitly integer and rename it to `ind`.
# (`downloaded` is not cast: the final selection below does not keep it.)
cleaned_df_pin = cleaned_df_pin.withColumn("index", col("index").cast("integer"))
cleaned_df_pin = cleaned_df_pin.withColumnRenamed("index", "ind")

# Strip the leading "Local save in" label and the whitespace after it so only the path
# remains, e.g. "Local save in /data/x" -> "/data/x".
cleaned_df_pin = cleaned_df_pin.withColumn(
    "save_location", regexp_replace(col("save_location"), r"^Local save in\s*", "")
)

# Final column order for the pin table
cleaned_df_pin = cleaned_df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

In [0]:
# Expose the Kinesis payload as a STRING column named `data`, ready for JSON parsing
df_geo = df_geo.select(col("data").cast("string").alias("data"))

# Preview the raw JSON records (NOTE(review): on a streaming DataFrame this presumably
# runs a live preview query; drop it for unattended runs)
display(df_geo)

data
"{""ind"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""ind"":2863,""timestamp"":""2020-04-27T13:34:16"",""latitude"":-5.34445,""longitude"":-177.924,""country"":""Armenia""}"
"{""ind"":5730,""timestamp"":""2021-04-19T17:37:03"",""latitude"":-77.015,""longitude"":-101.437,""country"":""Colombia""}"
"{""ind"":8304,""timestamp"":""2019-09-13T04:50:29"",""latitude"":-28.8852,""longitude"":-164.87,""country"":""French Guiana""}"
"{""ind"":8731,""timestamp"":""2020-07-17T04:39:09"",""latitude"":-83.104,""longitude"":-171.302,""country"":""Aruba""}"
"{""ind"":1313,""timestamp"":""2018-06-26T02:39:25"",""latitude"":77.0447,""longitude"":61.9119,""country"":""Maldives""}"
"{""ind"":4315,""timestamp"":""2019-12-15T03:51:28"",""latitude"":-45.8508,""longitude"":66.1003,""country"":""Cote d'Ivoire""}"
"{""ind"":10794,""timestamp"":""2022-01-01T02:26:50"",""latitude"":-89.5236,""longitude"":-154.567,""country"":""Cocos (Keeling) Islands""}"
"{""ind"":5494,""timestamp"":""2021-07-21T02:02:35"",""latitude"":-82.6768,""longitude"":-129.202,""country"":""Bulgaria""}"
"{""ind"":5069,""timestamp"":""2021-03-20T09:32:44"",""latitude"":-63.0063,""longitude"":-157.474,""country"":""Azerbaijan""}"


In [0]:
# JSON layout of each geo record
schema_geo = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("timestamp", TimestampType(), True)
    .add("latitude", DoubleType(), True)
    .add("longitude", DoubleType(), True)
    .add("country", StringType(), True)
)

# Parse the JSON string, then flatten the parsed struct into top-level columns
# (ind, timestamp, latitude, longitude, country)
df_geo = (
    df_geo
    .select(from_json(col("data"), schema_geo).alias("json_data"))
    .select("json_data.*")
)

In [0]:
# Shape the geo table in one pass:
# - merge latitude/longitude into a `coordinates` array [latitude, longitude],
# - drop the separate latitude/longitude columns,
# - normalise `timestamp` (already TimestampType from schema_geo; to_timestamp keeps it so),
# - fix the column order.
cleaned_df_geo = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("longitude", "latitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

In [0]:
# Expose the Kinesis payload as a STRING column named `data`, ready for JSON parsing
df_user = df_user.select(col("data").cast("string").alias("data"))

# Preview the raw JSON records (NOTE(review): on a streaming DataFrame this presumably
# runs a live preview query; drop it for unattended runs)
display(df_user)

data
"{""ind"":7528,""first_name"":""Abigail"",""last_name"":""Ali"",""age"":20,""date_joined"":""2015-10-24T11:23:51""}"
"{""ind"":2863,""first_name"":""Dylan"",""last_name"":""Holmes"",""age"":32,""date_joined"":""2016-10-23T14:06:51""}"
"{""ind"":5730,""first_name"":""Rachel"",""last_name"":""Davis"",""age"":36,""date_joined"":""2015-12-08T20:02:43""}"
"{""ind"":8304,""first_name"":""Charles"",""last_name"":""Berry"",""age"":25,""date_joined"":""2015-12-28T04:21:39""}"
"{""ind"":8731,""first_name"":""Andrea"",""last_name"":""Alexander"",""age"":21,""date_joined"":""2015-11-10T09:27:42""}"
"{""ind"":1313,""first_name"":""Brittany"",""last_name"":""Jones"",""age"":32,""date_joined"":""2016-04-02T03:51:23""}"
"{""ind"":4315,""first_name"":""Michelle"",""last_name"":""Prince"",""age"":36,""date_joined"":""2015-12-20T16:38:13""}"
"{""ind"":10794,""first_name"":""Thomas"",""last_name"":""Turner"",""age"":34,""date_joined"":""2016-12-22T00:02:02""}"
"{""ind"":5494,""first_name"":""Anne"",""last_name"":""Allen"",""age"":27,""date_joined"":""2015-12-16T15:20:05""}"
"{""ind"":5069,""first_name"":""Amanda"",""last_name"":""Ball"",""age"":25,""date_joined"":""2016-01-13T17:36:30""}"


In [0]:
# JSON layout of each user record
schema_user = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("date_joined", TimestampType(), True)
)

# Parse the JSON string, then flatten the parsed struct into top-level columns
# (ind, first_name, last_name, age, date_joined)
df_user = (
    df_user
    .select(from_json(col("data"), schema_user).alias("json_data"))
    .select("json_data.*")
)

In [0]:
# Shape the user table in one pass:
# - combine the name parts into `user_name`,
# - drop first_name/last_name,
# - normalise date_joined (timestamp) and ind/age (integer),
# - fix the column order.
# NOTE(review): `user_name` is an ARRAY [first_name, last_name], not one concatenated
# string; confirm this is the intended table schema before changing it, because the
# Delta table presumably already stores this type.
cleaned_df_user = (
    df_user
    .withColumn("user_name", array("first_name", "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .withColumn("ind", col("ind").cast("integer"))
    .withColumn("age", col("age").cast("integer"))
    .select("ind", "user_name", "age", "date_joined")
)

In [0]:
# Base directory for streaming checkpoints. Each streaming query needs its OWN checkpoint
# location: the original cell pointed all three queries at the same directory, which mixes
# their offsets/metadata and breaks recovery.
# NOTE(review): switching to per-table checkpoints makes each query start fresh from the
# source's initialPosition ('earliest'); existing rows in the tables may be duplicated.
# /tmp may also not be durable; consider a persistent path.
CHECKPOINT_BASE = "/tmp/kinesis/_checkpoints"


def write_stream_to_delta(streaming_df, table_name, checkpoint_base=CHECKPOINT_BASE):
    """Append a streaming DataFrame to a Delta table with a dedicated checkpoint.

    Args:
        streaming_df: Streaming DataFrame to write.
        table_name: Target table; also used as the checkpoint sub-directory name.
        checkpoint_base: Directory under which the per-table checkpoint lives.

    Returns:
        Whatever `DataStreamWriter.table(...)` returns (presumably the started query).
    """
    return (
        streaming_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoint_base}/{table_name}/")
        .table(table_name)
    )


# Keep references so the queries can be monitored or stopped later
pin_query = write_stream_to_delta(cleaned_df_pin, "1279c94681db_pin_table")
geo_query = write_stream_to_delta(cleaned_df_geo, "1279c94681db_geo_table")
user_query = write_stream_to_delta(cleaned_df_user, "1279c94681db_user_table")

