# Load AWS access key and secret key

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType
import urllib

In [None]:
# Parsing options for the uploaded credentials file: CSV, comma-separated,
# with the first row holding the column names.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Configure the reader first, then load the credentials table into a DataFrame.
csv_reader = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
)
aws_keys_df = csv_reader.load("/FileStore/tables/authentication_credentials.csv")

In [None]:
# `import urllib` alone does not guarantee the `urllib.parse` submodule is loaded,
# so import it explicitly before calling urllib.parse.quote below.
import urllib.parse

# Get the AWS access key and secret key for "databricks-user" from the credentials
# DataFrame, filtering and collecting only once.
credentials_rows = (
    aws_keys_df.where(F.col("User name") == "databricks-user")
    .select("Access key ID", "Secret access key")
    .limit(1)
    .collect()
)
if not credentials_rows:
    raise ValueError("No credentials found for user 'databricks-user'")

ACCESS_KEY = credentials_rows[0]["Access key ID"]
SECRET_KEY = credentials_rows[0]["Secret access key"]

# Percent-encode every reserved character of the secret key (safe="" means "/" is encoded too)
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Stream transformations

In [None]:
# Delta table location for the cleaned pin data, plus the checkpoint directory
# used by its streaming write.
outputPath = '/mnt/pinterest_data/test_streaming_delta_tables/0e3bbd435bfb_pin_table'
checkpointPath = '/mnt/pinterest_data/test_streaming_delta_tables/checkpoints/pin'

# Partition key used to pick this table's records out of the stream.
partition_key = "test_pin"

# Kinesis connection settings.
awsAccessKeyId = ACCESS_KEY
awsSecretKey = SECRET_KEY
kinesisStreamName = "streaming-0e3bbd435bfb-pin"
kinesisRegion = "us-east-1"

# All reader options in one place; every value is passed to the Kinesis source as-is.
kinesis_options = {
    "streamName": kinesisStreamName,
    "region": kinesisRegion,
    "initialPosition": "LATEST",
    "format": "json",
    "awsAccessKey": awsAccessKeyId,
    "awsSecretKey": awsSecretKey,
    "inferSchema": "true",
    "minFetchPeriod": "200ms",
}
df = spark.readStream.format("kinesis").options(**kinesis_options).load()

# Field names and types of the JSON payload, in column order. All fields are nullable
# (the StructField default).
pin_schema_fields = [
    ("index", LongType()),
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("poster_name", StringType()),
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", LongType()),
    ("save_location", StringType()),
    ("category", StringType()),
]
schema = StructType([StructField(field_name, field_type) for field_name, field_type in pin_schema_fields])

# define a udf to convert numerical abbreviation to string numeric form
@F.udf(returnType=StringType())
def convert_numeric_abb_to_str_numeric_representation(value: str) -> str:
    """
    Convert a count written in abbreviated form (e.g. "12k", "1.5M") to a plain integer string.

    The numeric part is scaled with exact decimal arithmetic. A fractional abbreviation
    such as "1.2k" therefore becomes "1200" rather than "1.2000", which a later int cast
    would truncate to 1. Recognised suffixes are "k"/"K" (thousand), "M" (million) and
    "B" (billion).

    Args:
        value (str): value in numerical abbreviation form, or None

    Returns:
        str: the integer string for an abbreviated value. Returns None for None or empty
        input. Returns the input unchanged when it has no recognised suffix or its numeric
        part is not a finite number.
    """
    # Imported locally so the UDF is self-contained when shipped to executors.
    from decimal import Decimal

    if not value:
        return None

    multiplier = {"k": 1000, "K": 1000, "M": 1000000, "B": 1000000000}.get(value[-1])
    if multiplier is None:
        # Already a plain number (or an unknown format): pass it through untouched.
        return value

    try:
        # int() truncates any remaining fraction, e.g. "1.2345k" -> "1234".
        return str(int(Decimal(value[:-1]) * multiplier))
    except (ArithmeticError, ValueError):
        # Non-numeric or non-finite prefix (e.g. "abck", "nank", "Infinityk"); decimal
        # errors subclass ArithmeticError. Leave the value as-is.
        return value

# transformations
transformed_df = (
    # keep only the records written with this table's partition key
    df.filter(F.col("partitionKey") == partition_key)
    # base64-decode the data column and read the result as a string
    .withColumn("decoded_data", F.unbase64(F.col("data")).cast("string"))
    # parse the JSON string in decoded_data using the expected schema
    .withColumn("parsed_data", F.from_json(F.col("decoded_data"), schema))
    # expand the parsed struct into one top-level column per schema field, in schema order
    .select("parsed_data.*")
)

# Replace empty strings with null in a single projection. Only string columns are touched.
# Comparing a numeric column with "" depends on an implicit string-to-number cast, which is
# a no-op by default but can raise a cast error when spark.sql.ansi.enabled is true.
string_columns = {name for name, dtype in transformed_df.dtypes if dtype == "string"}
transformed_df = transformed_df.select(
    [
        F.when(F.col(name) == "", None).otherwise(F.col(name)).alias(name)
        if name in string_columns
        else F.col(name)
        for name in transformed_df.columns
    ]
)

# define irrelevant values for the description column
irrelevant_data = ["No description available Story format", "No description available"]

# Placeholder values that stand in for missing data, per column; each is replaced with null.
placeholder_values = {
    "description": irrelevant_data,
    "follower_count": ["User Info Error"],
    "image_src": ["Image src error"],
    "poster_name": ["User Info Error"],
    # "No Tags Available" with a comma inserted between every character
    "tag_list": ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"],
    "title": ["No Title Data Available"],
}
for column_name, placeholders in placeholder_values.items():
    transformed_df = transformed_df.withColumn(
        column_name,
        F.when(F.col(column_name).isin(placeholders), None).otherwise(F.col(column_name)),
    )

transformed_df = (
    transformed_df
    # expand abbreviated follower counts (e.g. "10k") and store them as int
    .withColumn(
        "follower_count",
        convert_numeric_abb_to_str_numeric_representation(F.col("follower_count")),
    )
    .withColumn("follower_count", F.col("follower_count").cast("int"))
    # keep save_location from its first "/" onwards, i.e. just the path
    .withColumn(
        "save_location",
        F.col("save_location").substr(
            F.locate(substr="/", str="save_location", pos=1),
            F.length(F.col("save_location")),
        ),
    )
    # rename 'index' column to ind
    .withColumnRenamed("index", "ind")
)

# Append each micro-batch of cleaned rows to the Delta table at outputPath,
# recording streaming progress in checkpointPath.
stream_writer = (
    transformed_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)
)
query = stream_writer.start(outputPath)

# Block this cell until the streaming query stops.
query.awaitTermination()