# Step 1 - Read Data from Kinesis into Databricks

## Retrieving the authentication credentials

In [None]:
from pyspark.sql import *
import urllib.parse  # binds `urllib` too; importing the submodule explicitly guarantees `urllib.parse` is loaded

# Delta table holding the AWS credentials used to authenticate against Kinesis.
# NOTE(review): a Databricks secret scope (dbutils.secrets) would avoid keeping
# raw keys in a readable table - consider migrating.
table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
aws_keys_df = spark.read.format("delta").load(table_path)

# Fetch both keys from the first row in a single Spark job
# (instead of one collect() per column).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table {table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (every character outside the unreserved set,
# including '/', is percent-encoded) for contexts that need a URL-safe key.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Disable format checking

Disable format checks during the reading of Delta tables. 

In [None]:
%sql
-- Disable format checks during the reading of Delta tables.
-- SET applies to the current Spark session only, so this cell must run before
-- any reads that depend on it.
SET spark.databricks.delta.formatCheck.enabled=false

## Define the connections to the three Kinesis streams.

In [None]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *

# Settings shared by all three Kinesis streams.
KINESIS_STREAM_PREFIX = 'streaming-124a514b9149'
KINESIS_REGION = 'us-east-1'


def _string_schema(*field_names):
    """Return a StructType of nullable StringType fields, in the given order."""
    return StructType([StructField(name, StringType(), True) for name in field_names])


def _read_kinesis_json(stream_name, schema, region=KINESIS_REGION, initial_position='trim_horizon'):
    """Open a Kinesis stream and parse each record's `data` payload as JSON.

    Args:
        stream_name: Name of the Kinesis stream to read.
        schema: StructType describing the JSON payload.
        region: AWS region passed as the connector's 'region' option.
        initial_position: Value for the connector's 'initialPosition' option.

    Returns:
        A streaming DataFrame with one top-level column per field of `schema`.
    """
    raw = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    return (
        raw.selectExpr("CAST(data as STRING)")
        .select(from_json("data", schema).alias("jsonData"))
        .select("jsonData.*")
    )


# Payload schemas: every field arrives as a string and is typed during cleansing.
schema_pin = _string_schema(
    "index", "unique_id", "title", "description", "poster_name",
    "follower_count", "tag_list", "is_image_or_video", "image_src",
    "save_location", "category",
)
schema_geo = _string_schema("ind", "timestamp", "latitude", "longitude", "country")
schema_user = _string_schema("ind", "first_name", "last_name", "age", "date_joined")

# pinterest
df_pin = _read_kinesis_json(f'{KINESIS_STREAM_PREFIX}-pin', schema_pin)
# geo
df_geo = _read_kinesis_json(f'{KINESIS_STREAM_PREFIX}-geo', schema_geo)
# users
df_user = _read_kinesis_json(f'{KINESIS_STREAM_PREFIX}-user', schema_user)


# Step 2 - Cleanse the three data streams in Databricks

## Cleansing the Pinterest DataFrame

* Replace empty entries and entries with no relevant data in each column with `None`
* Perform the necessary transformations on the `follower_count` to ensure every entry is a number. Make sure the data type of this column is an `int`.
* Ensure that each column containing numeric data has a numeric data type
* Clean the data in the `save_location` column to include only the save location path
* Rename the `index` column to `ind`.
* Reorder the DataFrame columns to have the following column order:
    * `ind`
    * `unique_id`
    * `title`
    * `description`
    * `follower_count`
    * `poster_name`
    * `tag_list`
    * `is_image_or_video`
    * `image_src`
    * `save_location`
    * `category`

In [None]:
from pyspark.sql.functions import col, when, regexp_extract

# Remove rows where the "unique_id" column does not exist.
df_pin = df_pin.filter(col("unique_id").isNotNull())

# Replace empty string entries in every (string) column with None / null.
# NOTE(review): placeholder strings that carry "no relevant data" are not
# mapped yet - list them and replace them with None here as well.
df_pin = df_pin.replace("", None)

# Rename the `index` column to `ind`.
df_pin = df_pin.withColumnRenamed('index', 'ind')

# Ensure that each column containing numeric data has a numeric data type.

## Convert follower_count to int.
# Counts suffixed with 'k' are scaled by 1,000 and with 'M' by 1,000,000.
# The numeric prefix is parsed as an exact decimal (not `(\d+)` -> int), so
# '1.5k' becomes 1500 instead of being truncated to 1000. Anything that does
# not parse ends up null.
_follower_number = regexp_extract("follower_count", r'(\d+(?:\.\d+)?)', 1).cast("decimal(18,6)")
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").contains('k'), (_follower_number * 1000).cast("int"))
    .when(col("follower_count").contains('M'), (_follower_number * 1000000).cast("int"))
    .otherwise(col("follower_count").cast("int")),
)

## Convert ind to int.
df_pin = df_pin.withColumn("ind", col("ind").cast("int"))

# Keep only the path part of `save_location`: everything from the first '/'
# onwards (empty string if there is no '/').
df_pin = df_pin.withColumn("save_location", regexp_extract("save_location", r"/.*", 0))

# Re-order columns.
df_pin = df_pin.select(
    'ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name',
    'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category',
)


## Clean the `df_geo` DataFrame with the following transformations:

  
* Create a new column `coordinates` that contains an array based on the `latitude` and `longitude` columns
* Drop the `latitude` and `longitude` columns from the DataFrame
* Convert the `timestamp` column from a string to a timestamp data type
* Reorder the DataFrame columns to have the following column order:
    * `ind`
    * `country`
    * `coordinates`
    * `timestamp`

In [None]:
from pyspark.sql.functions import array, col, to_timestamp

# Cast `ind` to int and both coordinates to 32-bit floats.
# NOTE(review): 'float' keeps roughly 7 significant digits; 'double' would keep
# full coordinate precision but changes the target table's schema.
df_geo = (
    df_geo
    .withColumn('ind', col('ind').cast('int'))
    .withColumn('latitude', col('latitude').cast('float'))
    .withColumn('longitude', col('longitude').cast('float'))
)

# Pack latitude/longitude into one `coordinates` array column, then drop the
# two source columns.
if {'latitude', 'longitude'}.issubset(df_geo.columns):
    df_geo = df_geo.withColumn('coordinates', array(col('latitude'), col('longitude')))
df_geo = df_geo.drop('latitude', 'longitude')

# Parse `timestamp` from string to a timestamp type and fix the column order.
df_geo = (
    df_geo
    .withColumn('timestamp', to_timestamp(col('timestamp')))
    .select('ind', 'country', 'coordinates', 'timestamp')
)


## Clean the DataFrame that contains information about users

* Create a new column `user_name` that concatenates the information found in the `first_name` and `last_name` columns
* Drop the `first_name` and `last_name` columns from the DataFrame
* Convert the `date_joined` column from a string to a timestamp data type
* Reorder the DataFrame columns to have the following column order:
    * `ind`
    * `user_name`
    * `age`
    * `date_joined`


In [None]:
from pyspark.sql.functions import col, concat, lit, to_timestamp

# Cast `ind` and `age` to int.
df_user = (
    df_user
    .withColumn('ind', col('ind').cast('int'))
    .withColumn('age', col('age').cast('int'))
)

# Build `user_name` as "<first_name> <last_name>" (Spark's concat returns null
# if any input is null), then drop the two source columns.
if {'first_name', 'last_name'}.issubset(df_user.columns):
    df_user = df_user.withColumn(
        'user_name', concat(col('first_name'), lit(' '), col('last_name'))
    )
df_user = df_user.drop('first_name', 'last_name')

# Parse `date_joined` from string to a timestamp type and fix the column order.
df_user = (
    df_user
    .withColumn('date_joined', to_timestamp(col('date_joined')))
    .select('ind', 'user_name', 'age', 'date_joined')
)


# Step 3 - Write the cleansed data to tables in Databricks

In [None]:
# Write the cleansed data to Delta tables.


def _write_stream_to_table(df, name):
    """Start an append-mode Delta stream from `df` into `124a514b9149_<name>_table`.

    Each stream gets its own checkpoint directory, `/tmp/kinesis/<name>_checkpoints/`.
    Uses the standard `DataStreamWriter.toTable` API (Spark 3.1+).

    Returns:
        The started StreamingQuery, so callers can monitor, await or stop it.
    """
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/tmp/kinesis/{name}_checkpoints/")
        .toTable(f"124a514b9149_{name}_table")
    )


pin_query = _write_stream_to_table(df_pin, "pin")
geo_query = _write_stream_to_table(df_geo, "geo")
user_query = _write_stream_to_table(df_user, "user")
