## Connect to AWS and mount S3 drive to Databricks

In [None]:
# Databricks notebook source
# pyspark functions 
from pyspark.sql.functions import *
# pyspark window 
from pyspark.sql.window import Window
# URL processing
import urllib

# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.parse` has been loaded.
import urllib.parse

# The credentials file is a CSV
file_type = "csv"
# Its first row is the header
first_row_is_header = "true"
# Its fields are separated by commas
delimiter = ","
# Read the CSV file into a Spark dataframe
aws_keys_df = spark.read.format(file_type)\
    .option("header", first_row_is_header)\
    .option("sep", delimiter)\
    .load("/FileStore/tables/authentication_credentials.csv")

# Fetch both keys of the Databricks user with a single collect
credential_rows = aws_keys_df.where(col('User name') == 'databricks-user')\
    .select('Access key ID', 'Secret access key')\
    .collect()
# Fail with a clear message instead of an IndexError when the user is missing
if not credential_rows:
    raise ValueError("No credentials found for user 'databricks-user' in /FileStore/tables/authentication_credentials.csv")

ACCESS_KEY = credential_rows[0]['Access key ID']
SECRET_KEY = credential_rows[0]['Secret access key']
# URL-encode the secret key (safe="" also encodes "/") so it can be embedded in the source URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# AWS S3 bucket name
AWS_S3_BUCKET = "user-124df56aef51-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/pinterest-bucket"
# Source URL
# NOTE(review): the credentials are embedded in this URL; consider a Databricks secret scope instead
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the bucket only when the mount point is not in use yet, so that
# re-running this cell does not fail on an already mounted path
if MOUNT_NAME not in {mount.mountPoint for mount in dbutils.fs.mounts()}:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

## Load batch-processed data from S3 bucket partitions

In [None]:
# Where each topic's batch data lives on the mounted bucket.
# The trailing *.json glob selects every JSON file inside the partition folder.
file_location_pin = "/mnt/pinterest-bucket/topics/124df56aef51.pin/partition=0/*.json"
file_location_geo = "/mnt/pinterest-bucket/topics/124df56aef51.geo/partition=0/*.json"
file_location_user = "/mnt/pinterest-bucket/topics/124df56aef51.user/partition=0/*.json"
file_type = "json"

# Let Spark work out the schema on its own
infer_schema = "true"

# Load the three topics from the mounted S3 bucket, in the order pin, geo, user
df_pin, df_geo, df_user = (
    spark.read.format(file_type).option("inferSchema", infer_schema).load(topic_location)
    for topic_location in (file_location_pin, file_location_geo, file_location_user)
)

# Show each dataframe to check what was loaded
for loaded_df in (df_pin, df_geo, df_user):
    display(loaded_df)

## Clean dataframe for Pin data

In [None]:
# Adds a "multiplier" column derived from the follower count suffix (k or M)
def get_follower_count_multiplier(df):
    # Remove every digit so that only the non-numeric part of the count is left
    without_digits = regexp_replace(col("follower_count"), "[0123456789]", "")
    # Replace the suffix letters by the numbers they stand for
    thousands_spelled_out = regexp_replace(without_digits, "[k]", "1000")
    suffix_as_number = regexp_replace(thousands_spelled_out, "[M]", "1000000")
    # Multiply by "1" (presumably to have Spark treat the text as a number - confirm)
    with_multiplier = df.withColumn("multiplier", "1" * suffix_as_number)
    # Null multipliers are replaced by 1
    return with_multiplier.na.fill(value=1, subset=["multiplier"])

# Converts follower count in k or M to a raw integer number
def follower_count_to_int(df):
    """Replace "follower_count" by its numeric value scaled by "multiplier".

    Expects ``df`` to have a "follower_count" column and a "multiplier"
    column. Letters are stripped from "follower_count", the remainder is cast
    to int and multiplied by "multiplier".

    Returns the dataframe with "follower_count" overwritten.
    """
    numeric_part = regexp_replace(col("follower_count"), "[A-Za-z]", "").cast("int")
    # Cast the product back to int: if "multiplier" is not an integer column
    # (it looks like a double as built upstream - confirm), the product would
    # not be an integer either, which contradicts this function's purpose.
    return df.withColumn("follower_count", (numeric_part * col("multiplier")).cast("int"))

# Placeholder strings that stand for a missing value, keyed by the column they occur in
pin_placeholders = {
    "description": "No description available Story format",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available",
}

# Empty strings become nulls in every column
df_pin = df_pin.select(
    [when(col(name) == "", None).otherwise(col(name)).alias(name) for name in df_pin.columns]
)

# Placeholder strings become nulls as well
for placeholder_column, placeholder in pin_placeholders.items():
    df_pin = df_pin.withColumn(
        placeholder_column,
        when(col(placeholder_column) == placeholder, None).otherwise(col(placeholder_column)),
    )

# Turn follower counts written with k / M into plain numbers; the helper column is dropped afterwards
df_pin = (
    df_pin.transform(get_follower_count_multiplier)
    .transform(follower_count_to_int)
    .drop("multiplier")
)

# Cast the numeric columns, rename index to ind and strip the save location prefix
df_pin = (
    df_pin.withColumn("downloaded", col("downloaded").cast("int"))
    .withColumn("index", col("index").cast("int"))
    .withColumnRenamed("index", "ind")
    .withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))
)

# Keep the columns below, in this order
pin_column_order = [
    "ind", "unique_id", "title", "description", "follower_count", "poster_name",
    "tag_list", "is_image_or_video", "image_src", "save_location", "category",
]
df_pin = df_pin.select(*pin_column_order)

# Show the cleaned dataframe
display(df_pin)

## Clean dataframe for Geo data

In [None]:
# Empty strings become nulls in every column of the geographical dataframe
geo_blank_to_null = [
    when(col(name) == "", None).otherwise(col(name)).alias(name) for name in df_geo.columns
]
df_geo = df_geo.select(geo_blank_to_null)

# Combine latitude and longitude into a single array column
df_geo = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))
df_geo = df_geo.drop("latitude", "longitude")

# Parse the timestamp text and cast ind to an integer
df_geo = (
    df_geo.withColumn("timestamp", to_timestamp("timestamp", 'MM/dd/yyyy, HH:mm:ss'))
    .withColumn("ind", col("ind").cast("int"))
)

# Keep the columns below, in this order
df_geo = df_geo.select("ind", "country", "coordinates", "timestamp")

# Show the cleaned dataframe
display(df_geo)

## Clean dataframe for User data

In [None]:
# Empty strings become nulls in every column of the user dataframe
user_blank_to_null = [
    when(col(name) == "", None).otherwise(col(name)).alias(name) for name in df_user.columns
]
df_user = df_user.select(user_blank_to_null)

# Join first and last name into one column, separated by a space
df_user = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
df_user = df_user.drop("first_name", "last_name")

# Parse the joining date text and cast ind and age to integers
df_user = (
    df_user.withColumn("date_joined", to_timestamp("date_joined", 'MM/dd/yyyy, HH:mm:ss'))
    .withColumn("ind", col("ind").cast("int"))
    .withColumn("age", col("age").cast("int"))
)

# Keep the columns below, in this order
df_user = df_user.select("ind", "user_name", "age", "date_joined")

# Show the cleaned dataframe
display(df_user)

## Utility functions - run this before querying data below

In [None]:
# Split user age into age categories
def get_age_category(age):
    """Return the age-group label for ``age``.

    Labels: "18-24" for ages below 25 (including any age below 18), "25-35"
    for 25 to 35, "36-50" for 36 to 50 and "50+" for 51 and above.

    Returns None when ``age`` is None (a null value passed by the caller) or
    when it matches no range (e.g. NaN).
    """
    # A null age cannot be compared with a number: `None < 25` raises TypeError
    if age is None:
        return None
    if age < 25:
        return "18-24"
    if age < 36:
        return "25-35"
    if age < 51:
        return "36-50"
    if age >= 51:
        return "50+"
    # Only reached by values that fail every comparison, such as NaN
    return None

# Wrap get_age_category as a Spark user defined function that returns a string
# https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.udf.html
get_age_category_udf = udf(get_age_category, returnType="string")

## Data Querying

1. The most popular category in each country

In [None]:
# Count the rows of each category per country (geo and pin rows are matched on ind)
df_category_count_by_country = df_geo.select(col("ind"), col("country"))\
    .join(df_pin.select(col("ind"), col("category")), df_geo["ind"] == df_pin["ind"])\
    .drop("ind")\
    .groupBy("country", "category")\
    .agg(count('category').alias("category_count"))

# Rank the categories inside each country by their count. Aggregating
# max("category") and max("category_count") separately would pair the
# alphabetically last category with the highest count, not the category that
# actually has that count. Ties are broken alphabetically so that exactly one
# category is kept per country.
category_rank_by_country = Window.partitionBy("country")\
    .orderBy(col("category_count").desc(), col("category").asc())

# Keep only the top-ranked category of each country
df_most_popular_category = df_category_count_by_country\
    .withColumn("category_rank", row_number().over(category_rank_by_country))\
    .where(col("category_rank") == 1)\
    .drop("category_rank")\
    .orderBy(col("country").asc())

# Display table ordered by country in alphabetical order
df_most_popular_category.show()

2. The most popular category each year

In [None]:
# Count the rows of each category per posting year (geo and pin rows are matched on ind)
df_category_count_by_year = df_geo.select(col("ind"), year("timestamp").alias("post_year"))\
    .join(df_pin.select(col("ind"), col("category")), df_geo["ind"] == df_pin["ind"])\
    .drop("ind")\
    .groupBy("post_year", "category")\
    .agg(count('category').alias("category_count"))

# Rank the categories inside each year by their count; ties are broken
# alphabetically so that exactly one category is kept per year
category_rank_by_year = Window.partitionBy("post_year")\
    .orderBy(col("category_count").desc(), col("category").asc())

# Keep only the top-ranked category of each year. The dataframe is assigned
# before it is shown: show() returns None, so chaining it onto the assignment
# would leave the variable empty.
df_most_popular_category_by_year = df_category_count_by_year\
    .withColumn("category_rank", row_number().over(category_rank_by_year))\
    .where(col("category_rank") == 1)\
    .drop("category_rank")\
    .orderBy(col("post_year").asc())

df_most_popular_category_by_year.show()

3. The user with the most followers in each country

In [None]:
# Highest follower count of every poster in every country (geo and pin rows are matched on ind)
df_follower_count_by_country_and_user = df_geo.select(col("ind"), col("country"))\
    .join(df_pin.select(col("ind"), col("poster_name"), col("follower_count")), df_geo["ind"] == df_pin["ind"])\
    .drop("ind")\
    .groupBy("country", "poster_name")\
    .agg(max("follower_count").alias("follower_count"))

# Rank the posters inside each country by follower count. Grouping by
# (country, poster_name) alone returns every poster, not the top one.
# Ties are broken alphabetically so that exactly one poster is kept per country.
follower_rank_by_country = Window.partitionBy("country")\
    .orderBy(col("follower_count").desc(), col("poster_name").asc())

# Keep only the top-ranked poster of each country
df_most_followed_user_by_country = df_follower_count_by_country_and_user\
    .withColumn("follower_rank", row_number().over(follower_rank_by_country))\
    .where(col("follower_rank") == 1)\
    .drop("follower_rank")\
    .orderBy(col("country").asc())

# Display table with most followed user in each country
display(df_most_followed_user_by_country)

# Keep only the #1 most followed user globally. The dataframe is assigned
# before it is shown: show() returns None, so chaining it onto the assignment
# would leave the variable empty.
df_most_followers_country = df_most_followed_user_by_country\
    .orderBy(col("follower_count").desc())\
    .limit(1)

df_most_followers_country.show()


4. The most popular category for each age group

In [None]:

# Pair each user's age with the category of the pin row that shares the same ind
age_and_category = (
    df_user.select(col("ind"), col("age"))
    .join(df_pin.select(col("ind"), col("category")), df_user["ind"] == df_pin["ind"])
    .drop("ind")
)

# Swap the raw age for its age group
age_group_and_category = (
    age_and_category.withColumn("age_group", get_age_category_udf(col("age")))
    .drop("age")
)

# Count the rows of every category within each age group, listed by category
df_most_popular_category_by_age = (
    age_group_and_category.groupBy("age_group", "category")
    .agg(count('category').alias("category_count"))
    .orderBy(col("category").asc())
)

display(df_most_popular_category_by_age)

# Highest category count reached within each age group
top_count_by_age_group = (
    df_most_popular_category_by_age.select("age_group", "category", "category_count")
    .groupBy("age_group")
    .agg(max("category_count").alias("category_count"))
)

# Join back on (age_group, category_count) to recover the matching category rows
(
    top_count_by_age_group
    .join(df_most_popular_category_by_age, on=['age_group','category_count'], how='inner')
    .orderBy(col("age_group").asc())
    .show()
)

5. The median follower count for each age group

In [None]:
# Pair each user's age with the follower count of the pin row that shares the same ind
age_and_followers = (
    df_user.select(col("ind"), col("age"))
    .join(df_pin.select(col("ind"), col("follower_count")), df_user["ind"] == df_pin["ind"])
    .drop("ind")
)

# Swap the raw age for its age group
age_group_and_followers = (
    age_and_followers.withColumn("age_group", get_age_category_udf(col("age")))
    .drop("age")
)

# Approximate median (50th percentile) of the follower count per age group
df_median_follower_count_by_age = (
    age_group_and_followers.groupBy("age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy(col("age_group").asc())
)

display(df_median_follower_count_by_age)

6. The number of users joining each year

In [None]:
# Reduce every user row to its ind and the year taken from date_joined
user_join_years = df_user.select(col("ind"), year("date_joined").alias("year_joined"))

# Count the non-null ind values per joining year, earliest year first
df_users_joined_by_year = (
    user_join_years.groupBy("year_joined")
    .agg(count("ind").alias("number_users_joined"))
    .orderBy(col("year_joined").asc())
)

display(df_users_joined_by_year)

7. The median follower count of users based on their joining year

In [None]:
# Pair each user's joining year with the follower count of the pin row that shares the same ind
join_year_and_followers = (
    df_user.select(col("ind"), year("date_joined").alias("year_joined"))
    .join(df_pin.select(col("ind"), col("follower_count")), df_user["ind"] == df_pin["ind"])
    .drop("ind")
)

# Approximate median (50th percentile) of the follower count per joining year, earliest year first
df_median_follower_count_by_year = (
    join_year_and_followers.groupBy("year_joined")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy(col("year_joined").asc())
)

display(df_median_follower_count_by_year)

8. The median follower count of users based on their joining year and age group

In [None]:
# Pair each user's age and joining year with the follower count of the pin row that shares the same ind
age_year_and_followers = (
    df_user.select(col("ind"), col("age"), year("date_joined").alias("year_joined"))
    .join(df_pin.select(col("ind"), col("follower_count")), df_user["ind"] == df_pin["ind"])
    .drop("ind")
)

# Swap the raw age for its age group
age_group_year_and_followers = (
    age_year_and_followers.withColumn("age_group", get_age_category_udf(col("age")))
    .drop("age")
)

# Approximate median (50th percentile) of the follower count per age group and joining year
df_median_follower_count_by_age_and_year = (
    age_group_year_and_followers.groupBy("age_group", "year_joined")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy(col("age_group").asc(), col("year_joined").asc())
)

display(df_median_follower_count_by_age_and_year)