In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']
# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

USER_ID = "0affd9571f39"

In [0]:
%sql
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
df_pin = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0affd9571f39-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

df_geo = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0affd9571f39-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()
df_user = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0affd9571f39-user') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define schemas for each table
schema_pin = StructType([
    StructField("ind", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("follower_count", IntegerType(), True),
    StructField("poster_name", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

schema_geo = StructType([
    StructField("ind", StringType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

schema_user = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("date_joined", TimestampType(), True)
])

# Decode and parse the JSON data for each stream

# Pin Data
df_pin_decoded = df_pin.withColumn(
    "decoded_data", expr("CAST(data AS STRING)")
).withColumn(
    "parsed_data", from_json(col("decoded_data"), schema_pin)
).select("parsed_data.*")

# Geo Data
df_geo_decoded = df_geo.withColumn(
    "decoded_data", expr("CAST(data AS STRING)")
).withColumn(
    "parsed_data", from_json(col("decoded_data"), schema_geo)
).select("parsed_data.*")

# User Data
df_user_decoded = df_user.withColumn(
    "decoded_data", expr("CAST(data AS STRING)")
).withColumn(
    "parsed_data", from_json(col("decoded_data"), schema_user)
).select("parsed_data.*")

# Show the restored columns (for debugging or testing in batch mode)
df_pin_decoded.printSchema()
df_geo_decoded.printSchema()
df_user_decoded.printSchema()


root
 |-- ind: string (nullable = true)
 |-- unique_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- follower_count: integer (nullable = true)
 |-- poster_name: string (nullable = true)
 |-- tag_list: string (nullable = true)
 |-- is_image_or_video: string (nullable = true)
 |-- image_src: string (nullable = true)
 |-- save_location: string (nullable = true)
 |-- category: string (nullable = true)

root
 |-- ind: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- ind: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- date_joined: timestamp (nullable = true)



In [0]:
display(df_pin)
display(df_geo)
display(df_user)

data


In [0]:
# Data Cleaning
from pyspark.sql.functions import col, when, regexp_replace, udf
from pyspark.sql.types import IntegerType

# 1. Replace empty entries and entries with no relevant data with None
df_pin_cleaned = df_pin_decoded.replace(["", " ", "null", "N/A"], None)

# 2. Transform `follower_count` to ensure every entry is a number
# Handle cases where follower_count contains strings like "1k" or "1M"
def parse_follower_count(follower_count):
    if follower_count is None:
        return None
    if isinstance(follower_count, str):
        if "k" in follower_count.lower():
            return int(float(follower_count.lower().replace("k", "")) * 1000)
        elif "m" in follower_count.lower():
            return int(float(follower_count.lower().replace("m", "")) * 1000000)
    try:
        return int(follower_count)
    except ValueError:
        return None

parse_follower_count_udf = udf(parse_follower_count, IntegerType())

df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count", parse_follower_count_udf(col("follower_count"))
)

# 3. Ensure numeric columns have numeric data types
numeric_columns = ["follower_count"]  # Add other numeric columns if necessary
for column in numeric_columns:
    df_pin_cleaned = df_pin_cleaned.withColumn(column, col(column).cast("int"))

# 4. Clean `save_location` to include only the save location path
# Assuming `save_location` has some unwanted prefixes or suffixes to clean
df_pin_cleaned = df_pin_cleaned.withColumn(
    "save_location", regexp_replace(col("save_location"), r"unwanted_pattern", "")
)

# 5. Rename the `index` column to `ind`
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

# 6. Reorder columns to the desired order
desired_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]

df_pin_cleaned = df_pin_cleaned.select(*desired_column_order)

from pyspark.sql.functions import array, to_timestamp

# 1. Create a new column `coordinates` that contains an array based on the latitude and longitude columns
df_geo_cleaned = df_geo_decoded.withColumn("coordinates", array("latitude", "longitude"))

# 2. Drop the `latitude` and `longitude` columns
df_geo_cleaned = df_geo_cleaned.drop("latitude", "longitude")

# 3. Convert the `timestamp` column from a string to a timestamp data type
df_geo_cleaned = df_geo_cleaned.withColumn("timestamp", to_timestamp("timestamp"))

# 4. Reorder the DataFrame columns to have the desired column order
desired_column_order = ["ind", "country", "coordinates", "timestamp"]
df_geo_cleaned = df_geo_cleaned.select(*desired_column_order)


from pyspark.sql.functions import concat_ws, to_timestamp

# 1. Create a new column `user_name` by concatenating `first_name` and `last_name`
df_user_cleaned = df_user_decoded.withColumn("user_name", concat_ws(" ", "first_name", "last_name"))

# 2. Drop the `first_name` and `last_name` columns
df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

# 3. Convert the `date_joined` column from a string to a timestamp data type
df_user_cleaned = df_user_cleaned.withColumn("date_joined", to_timestamp("date_joined"))

# 4. Reorder the DataFrame columns to have the desired column order
desired_column_order = ["ind", "user_name", "age", "date_joined"]
df_user_cleaned = df_user_cleaned.select(*desired_column_order)



In [0]:
%sql
DROP TABLE IF EXISTS pinterest_table;
DROP TABLE IF EXISTS geo_table;
DROP TABLE IF EXISTS user_table;

In [0]:
# Write Pin Data to Delta Table
df_pin_cleaned.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/") \
    .table("pinterest_table")

# Write Geo Data to Delta Table
df_geo_cleaned.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo/") \
    .table("geo_table")

# Write User Data to Delta Table
df_user_cleaned.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user/") \
    .table("user_table")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5568318b20>