#### Mounting S3 bucket to Databricks

- Mounting creates a link between a workspace and cloud object storage, which lets you interact
with cloud object storage using familiar file paths relative to the Databricks file system.
- To open a new notebook: `New > Notebook`
- Ref: [Databricks notebook](https://dbc-b54c5c54-233d.cloud.databricks.com/?o=1865928197306450#notebook/3155158148749029)

In [None]:
# List the FileStore tables folder; the CSV files uploaded earlier should
# now appear in it.
dbutils.fs.ls('/FileStore/tables')

In [None]:
# PySpark functions.
from pyspark.sql.functions import *
# URL processing.
import urllib

In [None]:
# Load the uploaded credentials CSV (comma separated, first row is the
# header) into a Spark DataFrame.
aws_keys_df = spark.read.csv(
    '/FileStore/tables/authentication_credentials.csv',
    header=True,
    sep=',',
)

aws_keys_df

In [None]:
# Get the AWS access key and secret key from the spark dataframe.
# Both values live on the same row, so fetch that row once instead of
# running (and collecting) the same filter twice.
databricks_user_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)

# first() returns None when nothing matches; fail with a clear message
# rather than an opaque indexing error.
if databricks_user_row is None:
    raise ValueError(
        "No row with User name 'databricks-user' found in the "
        "credentials CSV.")

ACCESS_KEY = databricks_user_row['Access key ID']
SECRET_KEY = databricks_user_row['Secret access key']

#### Read streaming data from Kinesis
- [Medium: stream-data-from-kinesis-to-databricks-with-pyspark](https://medium.com/road-to-data-engineering/stream-data-from-kinesis-to-databricks-with-pyspark-813c516b4233)
- [Learn how to process Steaming Data with DataBricks and Amazon Kinesis [ hands on Demo ]](https://www.youtube.com/watch?v=2s08mk6vfDk)

In [None]:
PIN_STREAM_NAME = "streaming-0a966c04ad33-pin"

# Options handed to the Kinesis source for the pin stream; reading starts
# from the earliest available record.
pin_stream_options = {
    'streamName': PIN_STREAM_NAME,
    'initialPosition': 'earliest',
    'format': 'json',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
    'inferSchema': 'true',
}

kinesis_df_pin = (
    spark.readStream
    .format('kinesis')
    .options(**pin_stream_options)
    .load()
)

display(kinesis_df_pin)

In [None]:
GEO_STREAM_NAME = "streaming-0a966c04ad33-geo"

# Options handed to the Kinesis source for the geolocation stream;
# reading starts from the earliest available record.
geo_stream_options = {
    'streamName': GEO_STREAM_NAME,
    'initialPosition': 'earliest',
    'format': 'json',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
    'inferSchema': 'true',
}

kinesis_df_geo = (
    spark.readStream
    .format('kinesis')
    .options(**geo_stream_options)
    .load()
)

display(kinesis_df_geo)

In [None]:
USER_STREAM_NAME = "streaming-0a966c04ad33-user"

# Options handed to the Kinesis source for the user stream; reading
# starts from the earliest available record.
user_stream_options = {
    'streamName': USER_STREAM_NAME,
    'initialPosition': 'earliest',
    'format': 'json',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
    'inferSchema': 'true',
}

kinesis_df_user = (
    spark.readStream
    .format('kinesis')
    .options(**user_stream_options)
    .load()
)

display(kinesis_df_user)

#### Define the schema for the binary data (payload)

In [None]:
from pyspark.sql.types import *


# Schema used to parse the JSON payload of each pin record. Fields are
# nullable (the StructField default).
pin_schema = StructType([
    StructField('category', StringType()),
    StructField('description', StringType()),
    StructField('downloaded', LongType()),
    # Kept as a string here; it is converted to a number during cleaning.
    StructField('follower_count', StringType()),
    StructField('image_src', StringType()),
    StructField('index', LongType()),
    StructField('is_image_or_video', StringType()),
    StructField('poster_name', StringType()),
    StructField('save_location', StringType()),
    StructField('tag_list', StringType()),
    StructField('title', StringType()),
    StructField('unique_id', StringType()),
])

In [None]:
# The timestamp arrives as a nested object with a single '$date' string.
geo_timestamp_schema = StructType([
    StructField('$date', StringType()),
])

# Schema used to parse the JSON payload of each geolocation record.
geo_schema = StructType([
    StructField('country', StringType()),
    StructField('ind', LongType()),
    StructField('latitude', DoubleType()),
    StructField('longitude', DoubleType()),
    StructField('timestamp', geo_timestamp_schema),
])

In [None]:
# The join date arrives as a nested object with a single '$date' string.
user_date_joined_schema = StructType([
    StructField('$date', StringType()),
])

# Schema used to parse the JSON payload of each user record.
user_schema = StructType([
    StructField('age', LongType()),
    StructField('date_joined', user_date_joined_schema),
    StructField('first_name', StringType()),
    StructField('ind', LongType()),
    StructField('last_name', StringType()),
])

#### Extract data from the payload and use transformation to do your analytics

In [None]:
# Decode the record payload to text, parse it as JSON with pin_schema and
# promote the parsed fields to top-level columns.
pin_json_text = col('data').cast('string').alias('jsonData')
df_pin = (
    kinesis_df_pin
    .select(pin_json_text)
    .select(from_json(col('jsonData'), pin_schema).alias('pin'))
    .select('pin.*')
)

In [None]:
# Decode the record payload to text, parse it as JSON with geo_schema and
# promote the parsed fields to top-level columns.
geo_json_text = col('data').cast('string').alias('jsonData')
df_geo = (
    kinesis_df_geo
    .select(geo_json_text)
    .select(from_json(col('jsonData'), geo_schema).alias('geo'))
    .select('geo.*')
)

In [None]:
# Decode the record payload to text, parse it as JSON with user_schema
# and promote the parsed fields to top-level columns.
user_json_text = col('data').cast('string').alias('jsonData')
df_user = (
    kinesis_df_user
    .select(user_json_text)
    .select(from_json(col('jsonData'), user_schema).alias('user'))
    .select('user.*')
)

#### Task 1: Clean the Pinterest post DataFrame

In [None]:
# Inspect the parsed pin stream before cleaning it. A notebook cell only
# echoes its final expression, so the intermediate values are printed
# explicitly.
print(type(df_pin))
df_pin.printSchema()
print(df_pin.dtypes)

# The cleaning steps rebind transformed_df_pin, leaving df_pin pointing
# at the parsed stream.
transformed_df_pin = df_pin

In [None]:
# Replace empty entries and entries with no relevant data in each column
# with Nones.
# https://www.databricks.com/blog/2017/08/09/apache-sparks-structured-streaming-with-amazon-kinesis-on-databricks.html

from pyspark.sql.functions import col, when


# Maps each column to a (condition, replacement) pair: rows matching the
# condition get the replacement, all other rows keep their value.
update_conditions = {
    'description':
        (col('description') == 'No description available Story format', None),
    'follower_count': (col('follower_count') == 'User Info Error', None),
    'image_src': (col('image_src') == 'Image src error.', None),
    # Anything that is not one of the known media types is treated as
    # missing.
    'is_image_or_video':
        (~col('is_image_or_video')
         .isin(['image', 'video', 'multi-video(story page format)']), None),
    'poster_name': (col('poster_name') == 'User Info Error', None),
    'tag_list': (col('tag_list') == 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e', None),
    'title': (col('title') == 'No Title Data Available', None),
}

# Apply conditional transformations to update multiple columns.
for column, (condition, replacement) in update_conditions.items():
    transformed_df_pin = transformed_df_pin.withColumn(
        column,
        when(condition, replacement).otherwise(col(column)),
    )

display(transformed_df_pin)

In [None]:
# Remove rows that are exact duplicates across all columns.
# NOTE(review): this runs on a streaming DataFrame with no watermark, so
# deduplication state presumably grows without bound — confirm.
transformed_df_pin = transformed_df_pin.drop_duplicates()
display(transformed_df_pin)

In [None]:
# Perform the necessary transformations on the follower_count to ensure
# every entry is a number. The values stay strings here (e.g. "10k" ->
# "10000", "1.5M" -> "1500000"); the cast to int happens in a later cell.

from pyspark.sql.functions import expr


# Pre-transformation of follower_count.
display(df_pin.select('follower_count').distinct())

# Scale the numeric part arithmetically instead of appending zeros, so
# fractional values such as "1.5k" become 1500 rather than "1.5000".
# round() guards against floating-point results like 1499.9999, and the
# final CAST ... AS STRING keeps every CASE branch the same type.
transformed_df_pin = transformed_df_pin.withColumn(
    'follower_count',
    expr("""
        CASE
            WHEN substring(follower_count, -1) = 'k'
                THEN CAST(CAST(round(CAST(
                    substring(follower_count, 1, length(follower_count) - 1)
                    AS DOUBLE) * 1000) AS BIGINT) AS STRING)
            WHEN substring(follower_count, -1) = 'M'
                THEN CAST(CAST(round(CAST(
                    substring(follower_count, 1, length(follower_count) - 1)
                    AS DOUBLE) * 1000000) AS BIGINT) AS STRING)
            ELSE follower_count
        END
    """)
)

# Post-transformation of follower_count.
display(transformed_df_pin.select('follower_count').distinct())

In [None]:
# Cast follower_count to an integer column, then print the schema to
# check the resulting type.
follower_count_as_int = col('follower_count').cast(IntegerType())
transformed_df_pin = transformed_df_pin.withColumn(
    "follower_count", follower_count_as_int)
transformed_df_pin.printSchema()

In [None]:
# Show the (column, type) pairs to check that the columns holding numbers
# are of a numeric type: downloaded, follower_count, index.
transformed_df_pin.dtypes

In [None]:
# Clean the data in the save_location column to include only the save
# location path.
# Assumes values look like "Local save in /data/<category>" — TODO
# confirm against the data. The label is removed by pattern rather than
# by fixed character positions, so no leading space is kept and long
# paths are not truncated. Values without the label are left unchanged.
transformed_df_pin = transformed_df_pin.withColumn(
    'save_location',
    regexp_replace(col('save_location'), r'^Local save in\s*', ''),
)
display(transformed_df_pin)

In [None]:
# Rename the 'index' column to 'ind', then list the resulting columns.
transformed_df_pin = transformed_df_pin.withColumnRenamed(
    existing='index', new='ind')
transformed_df_pin.columns

In [None]:
# Keep only the columns below, in this order, then list them to verify.
pin_column_order = [
    'ind',
    'unique_id',
    'title',
    'description',
    'follower_count',
    'poster_name',
    'tag_list',
    'is_image_or_video',
    'image_src',
    'save_location',
    'category',
]
transformed_df_pin = transformed_df_pin.select(*pin_column_order)
transformed_df_pin.columns

#### Task 2: Clean the geolocation DataFrame

In [None]:
# The cleaning steps rebind transformed_df_geo, leaving df_geo pointing
# at the parsed stream. Print the schema as the starting point.
transformed_df_geo = df_geo
transformed_df_geo.printSchema()

In [None]:
# Remove rows that are exact duplicates across all columns.
# NOTE(review): this runs on a streaming DataFrame with no watermark, so
# deduplication state presumably grows without bound — confirm.
transformed_df_geo = transformed_df_geo.drop_duplicates()
display(transformed_df_geo)

In [None]:
# Add a 'coordinates' column holding [latitude, longitude] as an array.
coordinate_pair = array(col('latitude'), col('longitude'))
transformed_df_geo = transformed_df_geo.withColumn(
    'coordinates', coordinate_pair)
display(transformed_df_geo)

In [None]:
# Remove latitude and longitude now that 'coordinates' carries them.
# This step is optional: the column re-ordering later on selects only the
# columns to keep, which excludes these two anyway.
# https://stackoverflow.com/questions/29600673/how-to-delete-columns-in-pyspark-dataframe
columns_to_drop = ['latitude', 'longitude']
for redundant_column in columns_to_drop:
    transformed_df_geo = transformed_df_geo.drop(redundant_column)
display(transformed_df_geo)

In [None]:
# Replace the nested timestamp struct with a real timestamp column parsed
# from its '$date' string.
geo_date_text = col('timestamp').getField('$date')
transformed_df_geo = transformed_df_geo.withColumn(
    'timestamp', to_timestamp(geo_date_text))
display(transformed_df_geo)

In [None]:
# Keep only the columns below, in this order, then list them to verify.
geo_column_order = ['ind', 'country', 'coordinates', 'timestamp']
transformed_df_geo = transformed_df_geo.select(*geo_column_order)
transformed_df_geo.columns

#### Task 3: Clean the user DataFrame

In [None]:
# The cleaning steps rebind transformed_df_user, leaving df_user pointing
# at the parsed stream. Print the schema as the starting point.
transformed_df_user = df_user
transformed_df_user.printSchema()

In [None]:
# Remove rows that are exact duplicates across all columns.
# NOTE(review): this runs on a streaming DataFrame with no watermark, so
# deduplication state presumably grows without bound — confirm.
transformed_df_user = transformed_df_user.drop_duplicates()
display(transformed_df_user)

In [None]:
# Create a new column user_name that concatenates the information found
# in the first_name and last_name columns.
# concat_ws puts a space between the two parts ("John Smith" rather than
# "JohnSmith") and skips a null part instead of nulling the whole name.
transformed_df_user = transformed_df_user.withColumn(
    'user_name', concat_ws(' ', 'first_name', 'last_name'))
display(transformed_df_user)

In [None]:
# Remove first_name and last_name now that 'user_name' carries them.
# This step is optional: the column re-ordering later on selects only the
# columns to keep, which excludes these two anyway.
columns_to_drop = ['first_name', 'last_name']
for redundant_column in columns_to_drop:
    transformed_df_user = transformed_df_user.drop(redundant_column)
display(transformed_df_user)

In [None]:
# Replace the nested date_joined struct with a real timestamp column
# parsed from its '$date' string.
joined_date_text = col('date_joined').getField('$date')
transformed_df_user = transformed_df_user.withColumn(
    'date_joined', to_timestamp(joined_date_text))
display(transformed_df_user)

In [None]:
# Keep only the columns below, in this order, then list them to verify.
user_column_order = ['ind', 'user_name', 'age', 'date_joined']
transformed_df_user = transformed_df_user.select(*user_column_order)
transformed_df_user.columns

#### Task 4: Write the streaming data to Delta Tables

In [None]:
# Mounting creates a link between a workspace and cloud object storage,
# which enables you to interact with cloud object storage using familiar
# file paths relative to the Databricks file system.

# `import urllib` alone does not guarantee the `parse` submodule is
# loaded, so import it explicitly.
import urllib.parse

# Percent-encode the secret key (safe="" also encodes "/") so it can be
# embedded in the source URL.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")
# AWS S3 bucket name.
AWS_S3_BUCKET = "user-0a966c04ad33-bucket"
# Mount name for the bucket.
MOUNT_NAME = "/mnt/0a966c04ad33-mount"
# Source url.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(
    ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# A mount point can only be mounted once, so skip the call when it
# already exists; this keeps the cell safe to re-run.
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)
# To unmount, run: dbutils.fs.unmount(MOUNT_NAME)

In [None]:
# Stream the cleaned pin data into a Delta table on the mounted bucket,
# partitioned by category, with the checkpoint kept under the table path.
# https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/#google_vignette
# https://www.youtube.com/watch?v=-OQGEc09xbY

pin_table_path = f'{MOUNT_NAME}/delta/0a966c04ad33_pin_table'
(
    transformed_df_pin.writeStream
    .format('delta')
    .outputMode("append")
    .partitionBy('category')
    .option('checkpointLocation', f'{pin_table_path}/_checkpoint')
    .start(pin_table_path)
)

In [None]:
# Stream the cleaned geolocation data into a Delta table on the mounted
# bucket, partitioned by country, with the checkpoint kept under the
# table path. The output mode is stated explicitly to match the pin
# table writer.
geo_table_path = f'{MOUNT_NAME}/delta/0a966c04ad33_geo_table'
(
    transformed_df_geo.writeStream
    .format('delta')
    .outputMode('append')
    .partitionBy('country')
    .option('checkpointLocation', f'{geo_table_path}/_checkpoint')
    .start(geo_table_path)
)

In [None]:
# Stream the cleaned user data into a Delta table on the mounted bucket,
# partitioned by age, with the checkpoint kept under the table path. The
# output mode is stated explicitly to match the pin table writer.
user_table_path = f'{MOUNT_NAME}/delta/0a966c04ad33_user_table'
(
    transformed_df_user.writeStream
    .format('delta')
    .outputMode('append')
    .partitionBy('age')
    .option('checkpointLocation', f'{user_table_path}/_checkpoint')
    .start(user_table_path)
)