In [0]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import regexp_replace

# GEO DATAFRAME (Databricks)
# Read the geo topic as JSON, expose the raw rows as the temporary view "geo",
# then derive the cleaned df_geo from that view.
# file_type / infer_schema are also reused by the USER and PIN cells below.
file_type = "json"
infer_schema = "true"
file_location_geo = "/mnt/0a65154c50dd-mount/topics/0a65154c50dd.geo/partition=0//*.json"

geo_raw = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location_geo)
)
geo_raw.createOrReplaceTempView("geo")

# Data cleaning: pack latitude/longitude into one "coordinates" array column,
# cast "timestamp" to a real timestamp type, and fix the output column order.
geo_view = spark.sql("SELECT * FROM geo")
df_geo = (
    geo_view
    .withColumn('coordinates', F.array(geo_view.latitude, geo_view.longitude))
    .drop("latitude", "longitude")
    .withColumn("timestamp", F.col("timestamp").cast('timestamp'))
    .select("ind", "country", "coordinates", "timestamp")
)
df_geo.show()

In [0]:
# USER DATAFRAME
# Read the user topic (same format/options as the geo cell), register the raw rows
# as the temporary view "user", then build the cleaned df_user from that view.
file_location_user = "/mnt/0a65154c50dd-mount/topics/0a65154c50dd.user/partition=0//*.json"
user_raw = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location_user)
)
user_raw.createOrReplaceTempView("user")

# Data cleaning: join first/last name into a single space-separated user_name,
# cast date_joined to timestamp and age to int, and fix the output column order.
name_parts = ['first_name', 'last_name']
df_user = (
    spark.sql("SELECT * FROM user")
    .withColumn('user_name', concat_ws(' ', *name_parts))
    .drop('first_name', 'last_name')
    .withColumn("date_joined", F.col("date_joined").cast('timestamp'))
    .withColumn("age", F.col("age").cast('int'))
    .select("ind", "user_name", "age", "date_joined")
)

In [0]:
#PIN DATAFRAME
# Read the pin topic (same format/options as the geo cell) and register the raw rows
# as the temporary view "pin"; the cleaned df_pin is then derived from that view.
file_location_pin = "/mnt/0a65154c50dd-mount/topics/0a65154c50dd.pin/partition=0//*.json"
df_pin = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location_pin)
)

df_pin.createOrReplaceTempView("pin")

#Data Cleaning

# follower_count is parsed as "<number><optional k/M suffix>", e.g. "850", "10k", "1.2M".
# NOTE(review): assumed raw format - confirm against the topic.
# Scaling numerically avoids the text-substitution pitfall ('k' -> '000'), which turned
# "1.2k" into "1.2000" and lost the magnitude on the int cast.
# Values that do not match the pattern (e.g. error strings) end up null.
follower_pattern = r"^([0-9]+(?:\.[0-9]+)?)([kM]?)$"
follower_text = F.trim(F.col("follower_count"))
follower_number = F.regexp_extract(follower_text, follower_pattern, 1)
follower_suffix = F.regexp_extract(follower_text, follower_pattern, 2)
follower_multiplier = (
    F.when(follower_suffix == "k", 1000)
    .when(follower_suffix == "M", 1000000)
    .otherwise(1)
)
# regexp_extract yields "" when the pattern does not match; map that case to null.
# round() guards against binary floating-point error (e.g. 4.35 * 1000) before the int cast.
follower_count_clean = F.when(
    follower_number != "",
    F.round(follower_number.cast("double") * follower_multiplier),
).cast("int")

df_pin = (
    spark.sql("SELECT * FROM pin")
    .withColumnRenamed('index', 'ind')
    .withColumn('follower_count', follower_count_clean)
    .withColumn('save_location', regexp_replace('save_location', 'Local save in /', ''))
    # 'downloaded' is not part of the selected output, so no cast is applied to it.
    .select("ind","unique_id","title","description","follower_count","poster_name","tag_list","is_image_or_video","image_src","save_location","category")
    # Fill nulls with the literal "Nones"; a string fill value only affects string columns.
    .na.fill("Nones")
)

In [0]:
#SQL QUERIES
# Re-register the cleaned DataFrames: the views created in the loading cells still
# point at the raw, uncleaned data.
df_pin.createOrReplaceTempView("pin")
df_user.createOrReplaceTempView("user")
df_geo.createOrReplaceTempView("geo")

#Most popular Pinterest category people post to based on their country
# Count posts per (country, category), then keep only the top-ranked category in each
# country. RANK() keeps every category that ties for first place.
df_popular_categories_by_country = spark.sql("""
    WITH category_counts AS (
        SELECT country, category, COUNT(category) AS category_count
        FROM geo
        INNER JOIN pin ON pin.ind = geo.ind
        GROUP BY country, category
    ),
    ranked AS (
        SELECT country, category, category_count,
               RANK() OVER (PARTITION BY country ORDER BY category_count DESC) AS category_rank
        FROM category_counts
    )
    SELECT country, category, category_count
    FROM ranked
    WHERE category_rank = 1
    ORDER BY category_count DESC
""")
df_popular_categories_by_country.show()

In [0]:
#Most popular category per year between 2018 and 2022
# Count posts per (post year, category) for 2018-2022 inclusive, then keep the
# top-ranked category in each year (ties are all kept). A global LIMIT would return
# the top pairs overall, not one winner per year.
df_popular_categories_per_year_between_2018_and_2022 = spark.sql("""
    WITH category_counts AS (
        SELECT YEAR(timestamp) AS `year`, category, COUNT(category) AS category_count
        FROM geo
        INNER JOIN pin ON pin.ind = geo.ind
        WHERE YEAR(timestamp) BETWEEN 2018 AND 2022
        GROUP BY YEAR(timestamp), category
    ),
    ranked AS (
        SELECT `year`, category, category_count,
               RANK() OVER (PARTITION BY `year` ORDER BY category_count DESC) AS category_rank
        FROM category_counts
    )
    SELECT `year`, category, category_count
    FROM ranked
    WHERE category_rank = 1
    ORDER BY `year`
""")
df_popular_categories_per_year_between_2018_and_2022.show()

In [0]:
#User with most followers per country
# For each country keep the user(s) with the highest follower_count. DISTINCT removes
# repeated rows for a user who posted several pins; rows with an unknown (null)
# follower_count are excluded so they cannot be reported as a country's top user.
df_user_with_most_followers_per_country = spark.sql("""
    WITH user_followers AS (
        SELECT DISTINCT country, user_name, follower_count
        FROM user
        INNER JOIN pin ON pin.ind = user.ind
        INNER JOIN geo ON geo.ind = pin.ind
        WHERE follower_count IS NOT NULL
    ),
    ranked AS (
        SELECT country, user_name, follower_count,
               RANK() OVER (PARTITION BY country ORDER BY follower_count DESC) AS follower_rank
        FROM user_followers
    )
    SELECT country, user_name, follower_count
    FROM ranked
    WHERE follower_rank = 1
    ORDER BY follower_count DESC
""")
df_user_with_most_followers_per_country.show()

In [0]:
#Country with the user with the most followers
# Take each country's maximum follower_count, then keep the single top country.
# A sort inside the CTE cannot influence the grouped result, so none is applied there;
# `country` is a secondary sort key so LIMIT 1 is deterministic when counts tie.
df_most_followers_per_country = spark.sql("""
    WITH user_followers AS (
        SELECT country, user_name, follower_count
        FROM user
        INNER JOIN pin ON pin.ind = user.ind
        INNER JOIN geo ON geo.ind = pin.ind
    )
    SELECT country, MAX(follower_count) AS follower_count
    FROM user_followers
    GROUP BY country
    ORDER BY follower_count DESC, country
    LIMIT 1
""")
df_most_followers_per_country.show()

In [0]:
#Most popular category for different age groups
# Bucket each poster's age, count posts per (age_group, category) and keep the
# top-ranked category per bucket (ties are all kept).
# The 36-50 bucket stops at 50 so it matches its label; ages under 18 or null get a
# NULL age_group instead of being mislabelled '50+'.
df_most_popular_category_per_age_group = spark.sql("""
    WITH aged_posts AS (
        SELECT category,
               CASE
                   WHEN age BETWEEN 18 AND 24 THEN '18-24'
                   WHEN age BETWEEN 25 AND 35 THEN '25-35'
                   WHEN age BETWEEN 36 AND 50 THEN '36-50'
                   WHEN age > 50 THEN '50+'
               END AS age_group
        FROM pin
        INNER JOIN user ON pin.ind = user.ind
    ),
    category_counts AS (
        SELECT age_group, category, COUNT(category) AS category_count
        FROM aged_posts
        GROUP BY age_group, category
    ),
    ranked AS (
        SELECT age_group, category, category_count,
               RANK() OVER (PARTITION BY age_group ORDER BY category_count DESC) AS category_rank
        FROM category_counts
    )
    SELECT age_group, category, category_count
    FROM ranked
    WHERE category_rank = 1
    ORDER BY age_group
""")
df_most_popular_category_per_age_group.show()

In [0]:
#Median follower_count for each age group
# Median (continuous percentile 0.5) of follower_count per age bucket.
# The 36-50 bucket stops at 50 so it matches its label; ages under 18 or null get a
# NULL age_group instead of being mislabelled '50+'.
df_median_follower_count_per_age_group = spark.sql("""
    SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count,
           CASE
               WHEN age BETWEEN 18 AND 24 THEN '18-24'
               WHEN age BETWEEN 25 AND 35 THEN '25-35'
               WHEN age BETWEEN 36 AND 50 THEN '36-50'
               WHEN age > 50 THEN '50+'
           END AS age_group
    FROM pin
    INNER JOIN user ON pin.ind = user.ind
    GROUP BY age_group
    ORDER BY median_follower_count DESC
""")
df_median_follower_count_per_age_group.show()

In [0]:
#Amount of users joined per year
# Group by the year each user joined (date_joined), not by the geo post timestamp:
# grouping by post year counts posts, not users joining.
# The user view is keyed by ind (the same key as pin), so a user who posted several
# pins presumably appears several times. NOTE(review): (user_name, age, date_joined) is
# used as a proxy for a unique user - confirm there is no better user key upstream.
df_amount_of_users_joined_per_year = spark.sql("""
    SELECT YEAR(date_joined) AS join_year,
           COUNT(DISTINCT user_name, age, date_joined) AS number_users_joined
    FROM user
    WHERE YEAR(date_joined) BETWEEN 2015 AND 2022
    GROUP BY YEAR(date_joined)
    ORDER BY join_year
""")
df_amount_of_users_joined_per_year.show()

In [0]:
#Median follower count based on joining year
# Join pin to user so the grouping key is the year the user joined (date_joined),
# rather than the geo post timestamp. Restricted to users who joined 2015-2020.
df_median_follower_count_per_joining_year = spark.sql("""
    SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count,
           YEAR(date_joined) AS join_year
    FROM pin
    INNER JOIN user ON pin.ind = user.ind
    WHERE YEAR(date_joined) BETWEEN 2015 AND 2020
    GROUP BY YEAR(date_joined)
    ORDER BY median_follower_count DESC
""")
df_median_follower_count_per_joining_year.show()

In [0]:
#Median follower count based on year and age group
# Median follower_count per (post year, age bucket) for posts from 2015-2020.
# The 36-50 bucket stops at 50 so it matches its label; ages under 18 or null get a
# NULL age_group instead of being mislabelled '50+'.
df_median_follower_count_per_age_group_and_year = spark.sql("""
    SELECT YEAR(timestamp) AS post_year,
           PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count,
           CASE
               WHEN age BETWEEN 18 AND 24 THEN '18-24'
               WHEN age BETWEEN 25 AND 35 THEN '25-35'
               WHEN age BETWEEN 36 AND 50 THEN '36-50'
               WHEN age > 50 THEN '50+'
           END AS age_group
    FROM pin
    INNER JOIN geo ON pin.ind = geo.ind
    INNER JOIN user ON geo.ind = user.ind
    WHERE YEAR(timestamp) BETWEEN 2015 AND 2020
    GROUP BY age_group, post_year
    ORDER BY median_follower_count DESC
""")
df_median_follower_count_per_age_group_and_year.show()