# Important Notes:
- **Ensure that our streams are correctly configured and that data is being sent to these streams before running these commands.**
> This means our `user_posting_emulation_streaming.py` file should be running!
- **The display function will continuously update the Databricks notebook as new data arrives in the stream.**

# Step 1: Load AWS Credentials
Ensure that we load our AWS credentials:

In [None]:
import urllib
import urllib.parse

from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Define the path to the Delta table containing AWS credentials
delta_table_path = "<your_delta_table_path>"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row in a single Spark job.
# Collecting each column separately runs two jobs and, if the table ever holds
# more than one row, could pair an access key with another row's secret.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path!r}")

# Extract AWS Access Key and Secret Key
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (safe="" also encodes "/") for contexts that embed
# it in a URL; the Kinesis reader options take the raw SECRET_KEY instead.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Step 2: Read Data from Each Stream
We will repeat the reading process for each of the three streams (PIN_STREAM, GEO_STREAM, and USER_STREAM from our `user_posting_emulation_streaming.py`).

In [None]:
%sql
/* Disable format checks during the reading of Delta tables.
   NOTE(review): the reads that follow use the Kinesis source, not Delta;
   confirm this setting is still required for this notebook. */
SET spark.databricks.delta.formatCheck.enabled=false

In [None]:
def _read_kinesis_stream(stream_name, region='us-east-1', initial_position='earliest'):
    """Return a streaming DataFrame that reads from one Kinesis data stream.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region: AWS region passed to the Kinesis source.
        initial_position: Value for the source's ``initialPosition`` option.

    Returns:
        The streaming DataFrame produced by ``load()``. The reader
        authenticates with the notebook-level ACCESS_KEY / SECRET_KEY.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


# Read data from the Pinterest stream
pin_df = _read_kinesis_stream('streaming-<your_user_ID>-pin')

# Read data from the Geolocation stream
geo_df = _read_kinesis_stream('streaming-<your_user_ID>-geo')

# Read data from the User stream
user_df = _read_kinesis_stream('streaming-<your_user_ID>-user')


# Step 3: Display the Streaming Data
We can now display the streaming data for each stream using the display function:

In [None]:
# Display the raw Pinterest stream (pin_df); the output keeps updating as
# new records arrive in the stream.
display(pin_df)

In [None]:
# Display the raw geolocation stream (geo_df); the output keeps updating as
# new records arrive in the stream.
display(geo_df)

In [None]:
# Display the raw user stream (user_df); the output keeps updating as
# new records arrive in the stream.
display(user_df)

# Step 4: Deserialize the Data Columns
If we want to view the actual data contained in the stream, we can cast the data column to a string for each DataFrame, then view the deserialized data to better understand what needs to be done to clean it:

In [None]:
# Expose each stream's `data` column as a string column named `jsonData`.
# The same projection is applied to all three streams, in pin/geo/user order.
cast_pin_df, cast_geo_df, cast_user_df = (
    raw_df.selectExpr("CAST(data AS STRING) as jsonData")
    for raw_df in (pin_df, geo_df, user_df)
)

In [None]:
# Show the Pinterest records with the data column cast to a string (jsonData)
display(cast_pin_df)

In [None]:
# Show the geolocation records with the data column cast to a string (jsonData)
display(cast_geo_df) 

In [None]:
# Show the user records with the data column cast to a string (jsonData)
display(cast_user_df) 

# Step 5: Parse the JSON Strings
The data is currently stored as JSON strings within a single column. We need to parse these JSON strings to convert them into separate columns for easier manipulation.

Here's how to do that:

In [None]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType

# Schemas for the JSON payload of each stream. Every field is declared as a
# nullable string here; type conversion is left to the cleaning step.
geo_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "timestamp", "latitude", "longitude", "country")
])

pin_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "index",
        "unique_id",
        "title",
        "description",
        "poster_name",
        "follower_count",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "downloaded",
        "save_location",
        "category",
    )
])

user_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "first_name", "last_name", "age", "date_joined")
])


In [None]:
# Parse each jsonData string with its schema into a struct column, then
# expand that struct so every JSON field becomes a top-level column.
parsed_pin_df = (
    cast_pin_df
    .select(from_json("jsonData", pin_schema).alias("pin_data"))
    .select("pin_data.*")
)
parsed_geo_df = (
    cast_geo_df
    .select(from_json("jsonData", geo_schema).alias("geo_data"))
    .select("geo_data.*")
)
parsed_user_df = (
    cast_user_df
    .select(from_json("jsonData", user_schema).alias("user_data"))
    .select("user_data.*")
)

#### Display the parsed dataframes to better understand what transformations need to be made:

In [None]:
# Show the Pinterest stream after its JSON has been expanded into columns
display(parsed_pin_df)

In [None]:
# Show the geolocation stream after its JSON has been expanded into columns
display(parsed_geo_df)

In [None]:
# Show the user stream after its JSON has been expanded into columns
display(parsed_user_df)

# Step 6: Transform the Data
Once we've parsed the JSON strings, we need to clean the data. We can copy most of our existing cleaning classes used for the batch processing steps in a prior milestone.

Check whether any adjustments to the cleaning classes are necessary (I had to remove some methods that were focused on the `_corrupt_record` column, which no longer exists as a result of changing the way I send data to Kinesis and read it into Databricks).

In [None]:
class PinterestDataCleaner:
    """Chainable cleaning steps for the parsed Pinterest post stream.

    Every step replaces ``self.df`` with a transformed DataFrame and returns
    ``self`` so that steps can be chained; ``clean_pin_df`` runs the full
    pipeline and returns the resulting DataFrame.

    The ``_corrupt_record`` handling from the batch version of this class is
    intentionally absent: that column is no longer present in the DataFrame
    once the data is read from the stream.
    """

    def __init__(self, df: DataFrame):
        """Store the DataFrame that the cleaning steps will transform."""
        self.df = df

    def replace_empty_with_none(self) -> "PinterestDataCleaner":
        """Replace empty-string values with null in every column."""
        # A single select keeps this to one projection. Calling withColumn once
        # per column adds a projection each time and inflates the query plan.
        self.df = self.df.select([
            when(col(name) == "", None).otherwise(col(name)).alias(name)
            for name in self.df.columns
        ])
        return self

    def clean_follower_count(self) -> "PinterestDataCleaner":
        """Convert ``follower_count`` values such as "12k" or "3M" to integers.

        Digits followed by "k" are multiplied by 1000, digits followed by "M"
        by 1000000; any other value is cast to int as-is.
        """
        followers = col("follower_count")
        self.df = self.df.withColumn(
            "follower_count",
            when(followers.rlike(r'^[\d]+k$'), (regexp_replace(followers, "k", "").cast("int") * 1000))
            .when(followers.rlike(r'^[\d]+M$'), (regexp_replace(followers, "M", "").cast("int") * 1000000))
            .otherwise(followers.cast("int"))
        )
        return self

    def convert_columns_to_int(self, columns) -> "PinterestDataCleaner":
        """Cast each of the named columns to int."""
        for column in columns:
            self.df = self.df.withColumn(column, col(column).cast("int"))
        return self

    def clean_save_location(self) -> "PinterestDataCleaner":
        """Remove any ``http(s)://<host>/`` segment from ``save_location``."""
        self.df = self.df.withColumn(
            "save_location",
            regexp_replace(col("save_location"), r"https?://[^/]+/", "").cast("string")
        )
        return self

    def rename_index_column(self, old_name: str, new_name: str) -> "PinterestDataCleaner":
        """Rename column ``old_name`` to ``new_name``."""
        self.df = self.df.withColumnRenamed(old_name, new_name)
        return self

    def reorder_columns(self, column_order: list) -> "PinterestDataCleaner":
        """Select the columns in ``column_order``, in that order."""
        self.df = self.df.select(column_order)
        return self

    def clean_pin_df(self) -> DataFrame:
        """Run the full Pinterest cleaning pipeline and return the DataFrame."""
        columns_to_convert = ["index", "follower_count", "downloaded"]

        # The int conversion runs before the rename, so it still refers to the
        # column by its original name "index".
        self.replace_empty_with_none()\
            .clean_follower_count()\
            .clean_save_location()\
            .convert_columns_to_int(columns_to_convert)\
            .rename_index_column(old_name="index", new_name="ind")\
            .reorder_columns([
                "ind", "unique_id", "title", "description", "follower_count",
                "poster_name", "tag_list", "is_image_or_video", "image_src",
                "save_location", "category", "downloaded",
            ])
        return self.df


In [None]:
class GeoDataCleaner:
    """Chainable cleaning steps for the parsed geolocation stream.

    Each step swaps ``self.df`` for a transformed DataFrame and hands back
    ``self``; ``clean_geo_df`` applies all steps and returns the DataFrame.
    """

    def __init__(self, df: DataFrame):
        self.df = df

    def create_coordinates(self):
        """Add a ``coordinates`` struct of longitude and latitude as doubles."""
        longitude = col("longitude").cast("double").alias("longitude")
        latitude = col("latitude").cast("double").alias("latitude")
        self.df = self.df.withColumn("coordinates", struct(longitude, latitude))
        return self

    def drop_columns(self, columns):
        """Drop every column named in ``columns``."""
        remaining = self.df.drop(*columns)
        self.df = remaining
        return self

    def convert_timestamp(self, column_name: str):
        """Replace ``column_name`` with its ``to_timestamp`` conversion."""
        as_timestamp = to_timestamp(col(column_name))
        self.df = self.df.withColumn(column_name, as_timestamp)
        return self

    def reorder_columns(self, column_order: list):
        """Select the columns in ``column_order``, in that order."""
        reordered = self.df.select(column_order)
        self.df = reordered
        return self

    def clean_geo_df(self):
        """Apply every geolocation cleaning step and return the DataFrame."""
        # The source columns are dropped only after the struct is built.
        self.create_coordinates()
        self.drop_columns(["latitude", "longitude"])
        self.convert_timestamp("timestamp")
        # NOTE(review): "ind" is never cast here, while the Pinterest and user
        # cleaners in this notebook cast it to int; confirm that is intended.
        self.reorder_columns(["ind", "country", "coordinates", "timestamp"])
        return self.df


In [None]:
class UserDataCleaner:
    """Chainable cleaning steps for the parsed user stream.

    Each step swaps ``self.df`` for a transformed DataFrame and hands back
    ``self``; ``clean_user_df`` applies all steps and returns the DataFrame.
    """

    def __init__(self, df: DataFrame):
        self.df = df

    def create_user_name(self):
        """Add ``user_name``: first and last name joined by a single space."""
        full_name = concat_ws(" ", col("first_name"), col("last_name"))
        self.df = self.df.withColumn("user_name", full_name.cast("string"))
        return self

    def drop_columns(self, columns):
        """Drop every column named in ``columns``."""
        remaining = self.df.drop(*columns)
        self.df = remaining
        return self

    def convert_timestamp(self, column_name: str):
        """Replace ``column_name`` with its ``to_timestamp`` conversion."""
        as_timestamp = to_timestamp(col(column_name))
        self.df = self.df.withColumn(column_name, as_timestamp)
        return self

    def convert_columns_to_int(self, columns):
        """Cast each of the named columns to int."""
        for name in columns:
            as_int = col(name).cast("int")
            self.df = self.df.withColumn(name, as_int)
        return self

    def reorder_columns(self, column_order: list):
        """Select the columns in ``column_order``, in that order."""
        reordered = self.df.select(column_order)
        self.df = reordered
        return self

    def clean_user_df(self):
        """Apply every user cleaning step and return the DataFrame."""
        # The name parts are dropped only after user_name has been built.
        self.create_user_name()
        self.drop_columns(["first_name", "last_name"])
        self.convert_timestamp("date_joined")
        self.convert_columns_to_int(["ind", "age"])
        self.reorder_columns(["ind", "user_name", "age", "date_joined"])
        return self.df


In [None]:
# For each stream: wrap the parsed DataFrame in its cleaner, then run that
# cleaner's full pipeline to obtain the cleaned DataFrame.
pin_cleaner = PinterestDataCleaner(parsed_pin_df)
df_pin_cleaned = pin_cleaner.clean_pin_df()

geo_cleaner = GeoDataCleaner(parsed_geo_df)
df_geo_cleaned = geo_cleaner.clean_geo_df()

user_cleaner = UserDataCleaner(parsed_user_df)
df_user_cleaned = user_cleaner.clean_user_df()


#### Check the cleaned dataframes to see if you're happy with the transformations that have been made:

In [None]:
# Show the cleaned Pinterest stream produced by clean_pin_df()
display(df_pin_cleaned)

In [None]:
# Show the cleaned geolocation stream produced by clean_geo_df()
display(df_geo_cleaned)

In [None]:
# Show the cleaned user stream produced by clean_user_df()
display(df_user_cleaned)

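# Step 7: Write the Streaming Data to Delta Tables
Each cleaned streaming DataFrame is written in append mode to its own Delta table, using a separate checkpoint location per table: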



In [None]:
def _write_stream_to_delta(stream_df, table_name, checkpoint_root="/mnt/delta/checkpoints"):
    """Start an append-mode streaming write of ``stream_df`` into a Delta table.

    Args:
        stream_df: Streaming DataFrame to write.
        table_name: Target table name; also names the checkpoint sub-directory.
        checkpoint_root: Directory under which the checkpoint folder is placed.

    Returns:
        The value returned by ``toTable`` (the handle of the started query),
        so that the caller can monitor or stop the stream later.
    """
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoint_root}/{table_name}/")
        .toTable(table_name)
    )


# Keep the query handles instead of discarding them, so each stream can be
# inspected or stopped later (e.g. pin_query.stop()).

# For df_pin_cleaned writing to <your_user_ID>_pin_table
pin_query = _write_stream_to_delta(df_pin_cleaned, "<your_user_ID>_pin_table")

# For df_geo_cleaned writing to <your_user_ID>_geo_table
geo_query = _write_stream_to_delta(df_geo_cleaned, "<your_user_ID>_geo_table")

# For df_user_cleaned writing to <your_user_ID>_user_table
user_query = _write_stream_to_delta(df_user_cleaned, "<your_user_ID>_user_table")

# Step 8: Testing Delta Table 'writes'
The steps below confirm that the streaming data was written successfully to the new Delta tables and gather information about the current state of the cleaned streaming data.

In [None]:
# List the contents of the checkpoint directory.
# dbutils.fs.ls is used rather than the %fs magic: a magic command has to be
# the first line of its cell, so it cannot follow a comment line like this one.
display(dbutils.fs.ls("/mnt/delta/checkpoints/"))

In [None]:
%sql
-- Preview up to 10 rows of the pin table to confirm the stream is writing data
SELECT * FROM <your_user_ID>_pin_table LIMIT 10;


In [None]:
%sql
-- Preview up to 10 rows of the geo table to confirm the stream is writing data
SELECT * FROM <your_user_ID>_geo_table LIMIT 10;

In [None]:
%sql
-- Preview up to 10 rows of the user table to confirm the stream is writing data
SELECT * FROM <your_user_ID>_user_table LIMIT 10;

In [None]:
%sql
-- Show table-level metadata (DESCRIBE DETAIL) for the pin table
DESCRIBE DETAIL <your_user_ID>_pin_table;

In [None]:
%sql
-- Show table-level metadata (DESCRIBE DETAIL) for the geo table
DESCRIBE DETAIL <your_user_ID>_geo_table;

In [None]:
%sql
-- Show table-level metadata (DESCRIBE DETAIL) for the user table
DESCRIBE DETAIL <your_user_ID>_user_table;


In [None]:
%sql
-- View the history of operations recorded for the pin table (DESCRIBE HISTORY)
DESCRIBE HISTORY <your_user_ID>_pin_table;

In [None]:
%sql
-- View the history of operations recorded for the geo table (DESCRIBE HISTORY)
DESCRIBE HISTORY <your_user_ID>_geo_table;

In [None]:
%sql
-- View the history of operations recorded for the user table (DESCRIBE HISTORY)
DESCRIBE HISTORY <your_user_ID>_user_table;

In [None]:
%sql
-- Count the rows currently in the pin table
SELECT COUNT(*) FROM <your_user_ID>_pin_table;


In [None]:
%sql
-- Count the rows currently in the geo table
SELECT COUNT(*) FROM <your_user_ID>_geo_table;

In [None]:
%sql
-- Count the rows currently in the user table
SELECT COUNT(*) FROM <your_user_ID>_user_table;