In [0]:
'''
AiCore Pinterest Data Pipeline Project
Stream Pinterest data from Kinesis; clean and analyse the data with PySpark.
This code is intended to run in a Databricks notebook.
Author: Kristina Gorkovskaya
Date: 2023-11-10
'''

import urllib
import urllib.parse

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Replace with your AWS IAM user id
user_id ='0ec858bf1407'

# Read the CSV file (credentials) to spark dataframe
file_type = "csv"
first_row_is_header = "true"
delimiter = ","
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Get the AWS access key and secret key from the spark dataframe.
# Both values live on the same row, so filter and fetch it once instead of
# running a separate Spark job per key.
credentials_user = 'databricks-user'
credentials_row = (
    aws_keys_df
    .where(aws_keys_df['User name'] == credentials_user)
    .select('Access key ID', 'Secret access key')
    .first()
)
# .first() returns None when no row matches; fail with a readable message
# rather than an IndexError from indexing an empty collect() result.
if credentials_row is None:
    raise ValueError(f"No credentials found for user '{credentials_user}' in the credentials file.")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Encode the secret key; safe="" means every char will be encoded.
# Kept for compatibility: nothing in the cells visible here reads it
# (read_stream passes the raw SECRET_KEY to the Kinesis source).
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

def read_stream(stream_name_suffix) -> pyspark.sql.dataframe.DataFrame:
  '''Open a streaming DataFrame on one of this user's Kinesis streams.

  The stream name is built as 'streaming-<user_id>-<stream_name_suffix>' from
  the module-level user_id, and the module-level ACCESS_KEY / SECRET_KEY are
  passed to the Kinesis source as credentials.

  Args:
    stream_name_suffix: Last part of the stream name (this notebook uses
      'geo', 'pin' and 'user').

  Returns:
    The DataFrame produced by the Kinesis source's load().
  '''
  stream_name = f'streaming-{user_id}-{stream_name_suffix}'
  print(f'Reading stream {stream_name}...')

  # Same source options as a chain of .option() calls, gathered in one place.
  kinesis_options = {
    'streamName': stream_name,
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
  }
  reader = spark.readStream.format('kinesis').options(**kinesis_options)
  return reader.load()


def show_non_numeric_patterns(df: pyspark.sql.dataframe.DataFrame, col: str) -> pyspark.sql.dataframe.DataFrame:
  '''Look for non-numeric patterns in a field that is expected to be numeric.

  Adds a column named '<col>_pattern' in which every run of digits in `col` is
  replaced by a single '9' (so '12k' and '340k' both become '9k'), then prints
  how often each pattern occurs.

  Args:
    df: DataFrame containing the column to inspect.
    col: Name of the column to inspect. Inside this function the name shadows
      pyspark's `col` function, so column expressions here go through `F`.

  Returns:
    `df` with the extra '<col>_pattern' column added.
  '''
  new_col = col + '_pattern'
  df = df.withColumn(new_col, F.regexp_replace(col, '[0-9]+', '9'))
  if df.isStreaming:
    # .show() is a batch action; Spark rejects it on a streaming DataFrame.
    print(f'Skipping pattern counts for {col}: .show() is not supported on a streaming DataFrame.')
  else:
    df.groupBy(new_col).count().show()
  return df


In [0]:
# Load data
# Field names expected in each stream's payload. These are the columns that the
# cleaning cells below reference.
# NOTE(review): assumes each record's `data` is a JSON object with these keys —
# confirm against the producer that writes to the Kinesis streams.
PIN_FIELDS = [
    'index', 'unique_id', 'title', 'description', 'follower_count', 'poster_name',
    'tag_list', 'is_image_or_video', 'image_src', 'downloaded', 'save_location', 'category'
]
GEO_FIELDS = ['ind', 'country', 'latitude', 'longitude', 'timestamp']
USER_FIELDS = ['ind', 'first_name', 'last_name', 'age', 'date_joined']


def parse_payload(df, field_names):
  '''Turn the Kinesis `data` column into one string column per payload field.

  Casting `data` to a string on its own leaves a single column holding the
  whole payload, so none of the named columns used by the cleaning cells
  would exist. Every field is read as a string here; type conversion is left
  to the cleaning steps.
  '''
  schema = StructType([StructField(name, StringType(), True) for name in field_names])
  return (
      df.selectExpr("CAST(data as STRING) AS json_data")
        .select(F.from_json('json_data', schema).alias('payload'))
        .select('payload.*')
  )


df_geo = parse_payload(read_stream('geo'), GEO_FIELDS)
df_pin = parse_payload(read_stream('pin'), PIN_FIELDS)
df_user = parse_payload(read_stream('user'), USER_FIELDS)

# The geo cleaning step builds an array of doubles from these two columns,
# so give them a numeric type up front.
for coordinate in ('latitude', 'longitude'):
    df_geo = df_geo.withColumn(coordinate, df_geo[coordinate].cast(DoubleType()))

# NOTE(review): `ind` (geo, user) and `age` (user) stay as strings because no
# later cell casts them — confirm the types the target tables expect.

In [0]:
#############################################################################################################
# TASK 1: Clean the df_pin DataFrame
#############################################################################################################

# (1) Replace nans and empty strings with None
df_pin = df_pin.replace(float('nan'), None).replace("", None)

# (2) Perform the necessary transformations on the follower_count to ensure every entry is a number.
# Assumptions: a suffix of "k" in follower_count means a multiplier of 1,000; a suffix of "M" means 10^6
follower_count_multiplier = (
    F.when(F.col('follower_count').like('%k'), 1000)
    .when(F.col('follower_count').like('%M'), 1000000)
    .otherwise(1)
)

# Keep digits AND the decimal point: stripping the point would turn "1.5k" into 15 * 1000.
# 'User Info Error' carries no number, so it becomes null.
follower_count_number = (
    F.when(F.col('follower_count') == 'User Info Error', None)
    .otherwise(F.regexp_replace('follower_count', '[^0-9.]+', ''))
    .cast(DoubleType())
)

# Round before the integer cast so floating-point noise (e.g. 2299.9999999999995)
# is not truncated to the wrong whole number.
df_pin = df_pin.withColumn(
    'follower_count',
    F.round(follower_count_number * follower_count_multiplier).cast(IntegerType())
)

# (3) Ensure that each column containing numeric data has a numeric data type
# The loop variable is deliberately not called `col`: that would rebind pyspark's
# star-imported col() function to a string for the rest of the session.
numeric_cols = ['downloaded', 'index']
for numeric_col in numeric_cols:
    df_pin = df_pin.withColumn(numeric_col, df_pin[numeric_col].cast(IntegerType()))

# (4) Clean the data in the save_location column to include only the save location path
df_pin = df_pin.withColumn('save_location', F.regexp_replace('save_location', 'Local save in ', ''))

# (5) Rename the index column to ind, and reorder the columns.
# Columns not listed here (e.g. downloaded) are dropped by the select.
cols = [
    'ind',
    'unique_id',
    'title',
    'description',
    'follower_count',
    'poster_name',
    'tag_list',
    'is_image_or_video',
    'image_src',
    'save_location',
    'category'
    ]
df_pin = df_pin.withColumnRenamed('index', 'ind').select(cols)

In [0]:
#############################################################################################################
# TASK 2: Clean the df_geo (geolocation) DataFrame
#############################################################################################################

# (1) Create a new column coordinates that contains an array based on the latitude and longitude columns.
# The built-in array() does this inside Spark, so no Python UDF is needed.
# The explicit casts keep the element type as double whatever type the inputs arrive in.
df_geo = df_geo.withColumn(
    'coordinates',
    F.array(
        F.col('latitude').cast(DoubleType()),
        F.col('longitude').cast(DoubleType())
    )
)

# Preview the new column. .show() is a batch action and raises on a streaming
# DataFrame, so only run it when df_geo is not a stream.
if not df_geo.isStreaming:
    df_geo.select('latitude', 'longitude', 'coordinates').show(10, truncate=False)

# (2) Drop the latitude and longitude columns from the DataFrame
df_geo = df_geo.drop('latitude', 'longitude')

# (3) Convert the timestamp column from a string to a timestamp data type
df_geo = df_geo.withColumn('timestamp', F.to_timestamp('timestamp'))

# (4) Reorder columns
df_geo = df_geo.select('ind', 'country', 'coordinates', 'timestamp')

In [0]:
#############################################################################################################
# TASK 3: Clean the df_user DataFrame
#############################################################################################################
# (1) Create a new column user_name that concatenates the information found in the first_name and last_name columns.
# Trim the ends, then collapse any run of whitespace into a single space.
# The pattern is a raw string so that \s reaches the regex engine without
# Python flagging it as an invalid escape sequence.
user_name = F.regexp_replace(
    F.trim(F.concat_ws(' ', 'first_name', 'last_name')),
    r'\s+',
    ' '
)

df_user = (
    df_user
    .withColumn('user_name', user_name)
    # (2) Drop the first_name and last_name columns from the DataFrame
    .drop('first_name', 'last_name')
    # (3) Convert the date_joined column from a string to a timestamp data type
    .withColumn('date_joined', F.to_timestamp('date_joined'))
    # (4) Reorder columns.
    .select('ind', 'user_name', 'age', 'date_joined')
)

In [0]:
#############################################################################################################
# FINAL TASK: Write transformed data to Databricks delta tables. 
#############################################################################################################

# One streaming query per cleaned DataFrame. Each query gets its own checkpoint
# directory, which Structured Streaming uses to record progress and to resume
# after a restart; sharing one directory between queries would mix their state.
# NOTE(review): the /tmp/kinesis/... location is a placeholder choice — point it
# at durable storage if the tables must survive a workspace cleanup.
# NOTE(review): .table() is kept from the original; open-source PySpark names
# this method toTable() — confirm which one your runtime provides.
cleaned_streams = {
    'geo': df_geo,
    'pin': df_pin,
    'user': df_user,
}

for table_suffix, cleaned_df in cleaned_streams.items():
    table_name = f'{user_id}_{table_suffix}_table'
    cleaned_df.writeStream \
      .format('delta') \
      .outputMode('append') \
      .option('checkpointLocation', f'/tmp/kinesis/{table_name}/_checkpoints/') \
      .table(table_name)
