In [None]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as F

# Delta table that stores the AWS credentials ("Access key ID" / "Secret access key")
# used to authenticate the Kinesis readers.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Batch-read the credentials table into a Spark DataFrame.
aws_keys_df = spark.read.load(delta_table_path, format="delta")


In [None]:
# Fetch the first credentials row with a single Spark job, selecting both key columns
# at once, and fail with a clear message if the table has no rows.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError("The AWS credentials table is empty; expected at least one row.")

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" makes quote() also encode "/".
# NOTE(review): not used in the cells shown here -- presumably kept for building an
# S3 URI elsewhere; confirm or remove.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
%sql
-- Set the session config spark.databricks.delta.formatCheck.enabled to false,
-- turning off Delta's format check for the rest of this Spark session.
-- NOTE(review): this check presumably guards against accessing a Delta table's
-- location with a non-Delta format; confirm why it must be disabled here and
-- whether this cell can be removed.
SET spark.databricks.delta.formatCheck.enabled=false

In [None]:
# Streaming read of the pin Kinesis stream, starting from the earliest available
# record and authenticating with the keys taken from the credentials table.
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-1209b9ad90a5-pin")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Keep only the record payload, cast to a string; the resulting column is still named `data`.
df_pin = df.selectExpr("CAST(data as STRING)")
display(df_pin)


In [None]:
# Persist the raw payloads (the single string column `data`) to a Delta table.
df_pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("1209b9ad90a5_pin_table")

# `df_pin` only has the `data` column, so the payload must be parsed before any field
# (follower_count, save_location, ...) can be referenced.
# NOTE(review): assumes every record is a JSON object with the keys below -- confirm
# against the producer. All fields are read as strings (a StringType field also accepts
# JSON numbers) and numeric fields are cast explicitly further down.
pin_schema = StructType([
    StructField(field_name, StringType())
    for field_name in [
        "index", "unique_id", "title", "description", "follower_count",
        "poster_name", "tag_list", "is_image_or_video", "image_src",
        "save_location", "category",
    ]
])
df_pin_parsed = (
    df_pin
    .select(F.from_json(F.col("data"), pin_schema).alias("record"))
    .select("record.*")
)

# Replace nulls in the string columns with empty strings
df_pin_cleaned = df_pin_parsed.na.fill(value="")

# Ensure that each column containing numeric data has a numeric data type.
# Strings that are not valid integers (including the "" filled in above) become null.
# NOTE(review): if follower_count carries suffixes such as "10k" or "1M", they must be
# expanded before this cast or those rows become null -- confirm the payload format.
numeric_columns = ["follower_count"]
for column_name in numeric_columns:
    df_pin_cleaned = df_pin_cleaned.withColumn(column_name, F.col(column_name).cast("int"))

# Keep only the last "/"-separated segment of save_location.
# element_at(..., -1) counts from the end of the array; Column.getItem(-1) does not
# (it yields null, or an error under ANSI mode).
df_pin_cleaned = df_pin_cleaned.withColumn(
    "save_location", F.element_at(F.split(F.col("save_location"), "/"), -1)
)

# Rename the index column to ind
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

# Reorder the DataFrame columns
desired_order = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
df_pin_cleaned = df_pin_cleaned.select(desired_order)

pin_table_path = "/delta/1209b9ad90a5_pin_table"

# df_pin_cleaned is a streaming DataFrame, so the batch `.write` API cannot be used on it;
# it is written with writeStream instead. It needs its own checkpoint (two streaming
# queries cannot share one), kept outside "/tmp/kinesis/_checkpoints/" so cleanup of that
# directory does not remove it.
pin_cleaned_query = df_pin_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints_cleaned/pin/") \
  .start(pin_table_path)

In [None]:
# Recursively delete the shared checkpoint directory so the geo stream, which reuses
# "/tmp/kinesis/_checkpoints/", does not start from the pin query's checkpoint state.
# NOTE(review): the pin query started with this checkpoint is still running unless it
# was stopped first -- stop it before running this cell.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)

In [None]:
# Streaming read of the geo Kinesis stream, starting from the earliest available
# record and authenticating with the keys taken from the credentials table.
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-1209b9ad90a5-geo")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Keep only the record payload, cast to a string; the resulting column is still named `data`.
df_geo = df.selectExpr("CAST(data as STRING)")
display(df_geo)



In [None]:
# Persist the raw payloads (the single string column `data`) to a Delta table.
df_geo.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("1209b9ad90a5_table_geo")

# `df_geo` only has the `data` column, so the payload must be parsed before
# latitude/longitude/timestamp can be referenced.
# NOTE(review): assumes every record is a JSON object with the keys below -- confirm
# against the producer. All fields are read as strings and cast explicitly below.
geo_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ["ind", "country", "latitude", "longitude", "timestamp"]
])
df_geo_parsed = (
    df_geo
    .select(F.from_json(F.col("data"), geo_schema).alias("record"))
    .select("record.*")
)

# Combine latitude and longitude into one coordinates array. Both are cast to double
# so coordinates is array<double> rather than array<string>; non-numeric values become null.
df_geo_cleaned = df_geo_parsed.withColumn(
    "coordinates",
    F.array(F.col("latitude").cast("double"), F.col("longitude").cast("double")),
)

# Drop the latitude and longitude columns
df_geo_cleaned = df_geo_cleaned.drop("latitude", "longitude")

# Convert the timestamp column from a string to a timestamp data type
df_geo_cleaned = df_geo_cleaned.withColumn("timestamp", F.col("timestamp").cast("timestamp"))

# Reorder the DataFrame columns
desired_order = ["ind", "country", "coordinates", "timestamp"]
df_geo_cleaned = df_geo_cleaned.select(desired_order)

geo_table_path = "/delta/1209b9ad90a5_geo_table"

# df_geo_cleaned is a streaming DataFrame, so the batch `.write` API cannot be used on it;
# it is written with writeStream instead. It needs its own checkpoint (two streaming
# queries cannot share one), kept outside "/tmp/kinesis/_checkpoints/" so cleanup of that
# directory does not remove it.
geo_cleaned_query = df_geo_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints_cleaned/geo/") \
  .start(geo_table_path)

In [None]:
# Recursively delete the shared checkpoint directory so the user stream, which reuses
# "/tmp/kinesis/_checkpoints/", does not start from the geo query's checkpoint state.
# NOTE(review): the geo query started with this checkpoint is still running unless it
# was stopped first -- stop it before running this cell.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)

In [None]:
# Streaming read of the user Kinesis stream, starting from the earliest available
# record and authenticating with the keys taken from the credentials table.
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-1209b9ad90a5-user")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Keep only the record payload, cast to a string; the resulting column is still named `data`.
df_user = df.selectExpr("CAST(data as STRING)")
display(df_user)




In [None]:
# Persist the raw payloads (the single string column `data`) to a Delta table.
df_user.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("1209b9ad90a5_table_user")

# `df_user` only has the `data` column, so the payload must be parsed before
# first_name/last_name/date_joined can be referenced.
# NOTE(review): assumes every record is a JSON object with the keys below -- confirm
# against the producer. All fields are read as strings and cast explicitly below.
user_schema = StructType([
    StructField(field_name, StringType())
    for field_name in ["ind", "first_name", "last_name", "age", "date_joined"]
])
df_user_parsed = (
    df_user
    .select(F.from_json(F.col("data"), user_schema).alias("record"))
    .select("record.*")
)

# Create a new column user_name that joins first_name and last_name with a space
# (concat_ws skips null inputs).
df_user_cleaned = df_user_parsed.withColumn(
    "user_name", F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
)

# Drop the first_name and last_name columns
df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

# Convert the date_joined column from a string to a timestamp data type
df_user_cleaned = df_user_cleaned.withColumn("date_joined", F.col("date_joined").cast("timestamp"))

# Reorder the DataFrame columns
desired_order = ["ind", "user_name", "age", "date_joined"]
df_user_cleaned = df_user_cleaned.select(desired_order)

user_table_path = "/delta/1209b9ad90a5_user_table"

# df_user_cleaned is a streaming DataFrame, so the batch `.write` API cannot be used on it;
# it is written with writeStream instead. It needs its own checkpoint (two streaming
# queries cannot share one), kept outside "/tmp/kinesis/_checkpoints/" so cleanup of that
# directory does not remove it.
user_cleaned_query = df_user_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints_cleaned/user/") \
  .start(user_table_path)
