### Read topics from the S3 bucket into DataFrames

In [None]:
from pyspark.sql.functions import *

# Suffixes that select one topic folder each under the mounted S3 bucket
topics = [".pin", ".geo", ".user"]

def read_topics_into_dataframe(topic):
    """Load all JSON files in partition 0 of one topic into a Spark DataFrame.

    Args:
        topic: topic suffix such as ".pin"; it is appended to the
            "0ecac53030fd" prefix to build the topic folder name.

    Returns:
        The DataFrame produced by Spark's "json" reader for that folder.
    """
    source_glob = f"/mnt/pinterest_0ecac53030fd/topics/0ecac53030fd{topic}/partition=0/*.json"
    # NOTE(review): Spark's JSON reader infers the schema on its own; inferSchema
    # appears to be a CSV-only option and is likely ignored here — confirm.
    reader = spark.read.format("json").option("inferSchema", "true")
    return reader.load(source_glob)

# Load every topic into a dict keyed by its suffix without the leading dot
# (".pin" -> "pin"), rendering each one with Databricks' display() as it loads.
dataframes = {}
for suffix in topics:
    name = suffix[1:]
    loaded = read_topics_into_dataframe(suffix)
    dataframes[name] = loaded
    display(loaded)

# Unpack the per-topic DataFrames used by the later cells
df_pin, df_geo, df_user = (dataframes[key] for key in ("pin", "geo", "user"))

### Clean df_pin

In [None]:
def add_nulls_to_dataframe_column(dataframe, column, value_to_replace):
    """Return ``dataframe`` with entries of ``column`` that match a pattern set to null.

    ``value_to_replace`` is applied with SQL LIKE, so ``%`` matches any run of
    characters and ``_`` matches exactly one. Entries that do not match (and
    entries that are already null) are left unchanged.
    """
    target = col(column)
    is_placeholder = target.like(value_to_replace)
    return dataframe.withColumn(column, when(is_placeholder, None).otherwise(target))

from pyspark.sql.functions import col, regexp_replace, round as spark_round, when

# Placeholder strings the source uses instead of real data, per column.
# Each value is a SQL LIKE pattern ("%" matches any trailing text); every
# matching entry is replaced with null.
columns_and_values_for_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}

for key, value in columns_and_values_for_null.items():
    df_pin = add_nulls_to_dataframe_column(df_pin, key, value)

# follower_count arrives as text with an optional "k" (thousands) or "M"
# (millions) suffix. Strip the suffix and multiply by its factor rather than
# replacing the letter with zeros: text substitution turns "1.5k" into "1.5000",
# which the integer cast truncates to 1. Unsuffixed numbers are multiplied by 1,
# and non-numeric text (including the nulls set above) still ends up null.
follower_text = col("follower_count")
follower_factor = (
    when(follower_text.endswith("k"), 1000)
    .when(follower_text.endswith("M"), 1000000)
    .otherwise(1)
)
follower_value = regexp_replace(follower_text, "[kM]$", "").cast("double")
df_pin = df_pin.withColumn(
    "follower_count",
    spark_round(follower_value * follower_factor).cast("int"),
)

# Keep only the path part of save_location
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
# Rename index -> ind so it matches the "ind" key selected in the geo and user tables
df_pin = df_pin.withColumnRenamed("index", "ind")

# Output column order for the cleaned pin table (columns not listed are dropped)
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]
df_pin = df_pin.select(new_pin_column_order)

# Display the final schema and data
df_pin.printSchema()
display(df_pin)


### Clean df_geo

In [None]:
# Functions used by this cell
from pyspark.sql.functions import array, to_timestamp

# Output column order for the cleaned geo table
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]

# 1. pack latitude and longitude into one array column "coordinates"
#    ([latitude, longitude]) and drop the two source columns,
# 2. convert the "timestamp" string column with to_timestamp,
# 3. keep only the columns listed above, in that order.
df_geo = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select(new_geo_column_order)
)

# Show the resulting schema and rows
df_geo.printSchema()
display(df_geo)

### Clean df_user

In [None]:
from pyspark.sql.functions import concat, lit, to_timestamp

# Output column order for the cleaned user table
new_user_column_order = [
    "ind",
    "user_name",
    "age",
    "date_joined",
]

# Build user_name as first_name + " " + last_name (concat yields null when
# either part is null), drop the two name columns, convert date_joined with
# to_timestamp, and keep only the columns listed above.
df_user = (
    df_user
    .withColumn("user_name", concat("first_name", lit(" "), "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select(new_user_column_order)
)

# Show the resulting schema and rows
df_user.printSchema()
display(df_user)


In [None]:
# Shared inputs for the query cells: pins joined with geo data, and pins
# joined with user data plus an age_group bucket.
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Inner join of pins and geo records on "ind"; both ind columns are kept and
# can be addressed as pin.ind / geo.ind.
pin_geo = df_pin.alias("pin").join(
    df_geo.alias("geo"),
    col("pin.ind") == col("geo.ind"),
    "inner",
)

# Join pins to users on "ind" (default inner join) and register the result as
# the temp view "category_age" so the bucketing below can be written in SQL.
(
    df_pin.alias("pin")
    .join(df_user.alias("user"), col("pin.ind") == col("user.ind"))
    .createOrReplaceTempView("category_age")
)

# Prepend an age_group bucket to every row of the view. The CASE has no ELSE,
# so ages matching no WHEN branch (e.g. under 18) get a NULL age_group.
pin_user_age_group = spark.sql(
    "SELECT CASE \
        WHEN age between 18 and 24 then '18-24' \
        WHEN age between 25 and 35 then '25-35' \
        WHEN age between 36 and 50 then '36-50' \
        WHEN age > 50 then '50+' \
        END as age_group, * FROM category_age")

In [None]:
from pyspark.sql.functions import col, count
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Task4 : Find the most popular category in each country.
# Rank categories within each country by pin count. The category name breaks
# ties so row_number() picks the same winner on every run; ordering by the
# count alone leaves tied categories in an arbitrary order.
windowCountryByCategoryCount = (
    Window.partitionBy("country")
    .orderBy(col("category_count").desc(), col("category").asc())
)

# Keep only the top-ranked category for each country
(
    pin_geo
    .groupBy("country", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowCountryByCategoryCount))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)


In [None]:
# Task 5: Find which was the most popular category each year
from pyspark.sql.functions import col, year, count, row_number
from pyspark.sql.window import Window

# Rank categories within each post year by pin count; the category name breaks
# ties so the reported winner is stable between runs.
windowYearByCategoryCount = (
    Window.partitionBy("post_year")
    .orderBy(col("category_count").desc(), col("category").asc())
)

# Most popular category for each year from 2018 to 2022 inclusive
(
    pin_geo
    .withColumn("post_year", year("timestamp"))
    .filter(col("post_year").between(2018, 2022))
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowYearByCategoryCount))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)


In [None]:
# Task 6: Find the user with most followers in each country
from pyspark.sql.functions import col, row_number
from pyspark.sql.functions import max as spark_max
from pyspark.sql.window import Window

# Order posters within each country by follower_count, highest first (desc()
# places nulls last). poster_name breaks ties so the chosen user is stable.
windowCountryByFollowers = (
    Window.partitionBy("country")
    .orderBy(col("follower_count").desc(), col("poster_name").asc())
)

# One row per country: the poster with the highest follower_count
max_followers_by_country = (
    pin_geo
    .withColumn("rank", row_number().over(windowCountryByFollowers))
    .filter(col("rank") == 1)
    .select("country", "poster_name", "follower_count")
)

# Highest follower count across all countries. Spark's max is imported
# explicitly: a bare `max` only refers to it through a wildcard import, and the
# Python builtin would return the character "w" instead of a column expression.
max_followers_all_countries = (
    max_followers_by_country.select(spark_max("follower_count")).collect()[0][0]
)

# Country (several rows if tied) whose top poster has that follower count
country_with_max_followers = max_followers_by_country.where(
    col("follower_count") == max_followers_all_countries
)

max_followers_by_country.show()
country_with_max_followers.show()


In [None]:
# Task 7: Find the most popular category for different age groups
from pyspark.sql.functions import col, count, row_number
from pyspark.sql.window import Window

# Rank categories within each age group by pin count; the category name breaks
# ties so row_number() returns the same winner on every run. Rows whose age
# matched no bucket carry a null age_group and form their own partition.
windowAgeGroupByCategory = (
    Window.partitionBy("age_group")
    .orderBy(col("category_count").desc(), col("category").asc())
)

# Most popular category for each age group
(
    pin_user_age_group
    .groupBy("age_group", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowAgeGroupByCategory))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)

In [None]:
# Task 8: Find the median follower count for different age groups
from pyspark.sql.functions import percentile_approx

# Approximate median follower_count per age group.
# Deduplicate on the pin's unique_id (dropping repeated records of the same pin),
# not on the (age_group, follower_count) pair. Deduplicating on the pair would
# keep one value per distinct follower count and distort the median.
# NOTE(review): assumes unique_id identifies a single pin — confirm against the source data.
(
    pin_user_age_group
    .select("unique_id", "age_group", "follower_count")
    .distinct()
    .groupBy("age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("age_group")
    .show()
)


In [None]:
# Task 9: How many users have joined each year
# Number of users per join year for 2015-2020 inclusive. Fully identical rows
# are removed first. The year column is named "post_year" as in the other
# tasks, and count("user_name") skips rows whose user_name is null.
(
    df_user
    .withColumn("post_year", year("date_joined"))
    .distinct()
    .filter(year("date_joined").between(2015, 2020))
    .groupBy("post_year")
    .agg(count("user_name").alias("number_users_joined"))
    .orderBy("post_year")
    .show()
)


In [None]:
from pyspark.sql.functions import col, year, percentile_approx
# Task 10 : Find the median follower count of users have joined between 2015 and 2020.

# Inner join of pins and users on "ind"
pin_user = df_pin.alias("pin").join(
    df_user.alias("user"),
    col("pin.ind") == col("user.ind"),
    "inner",
)

# Approximate median follower_count per join year ("post_year"), 2015-2020 inclusive
(
    pin_user
    .withColumn("post_year", year("date_joined"))
    .where(col("post_year").between(2015, 2020))
    .groupBy("post_year")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("post_year")
    .show()
)


In [None]:
# Task 11: Find the median follower count of users that have joined between 2015 and 2020, based on which age group they are part of.
from pyspark.sql.functions import col, year, percentile_approx

# Approximate median follower_count for every (age_group, join year) pair,
# limited to join years 2015-2020 inclusive.
(
    pin_user_age_group
    .withColumn("post_year", year("date_joined"))
    .where(col("post_year").between(2015, 2020))
    .groupBy("age_group", "post_year")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("age_group", "post_year")
    .show()
)
