In [0]:
import urllib

from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window

#### Clean df_pin

In [0]:
# Load the raw pin JSON files from the mounted S3 bucket (partition 0 of the pin topic).
pin_source_path = "/mnt/s3bucket-121ca9f7ce2b/topics/121ca9f7ce2b.pin/partition=0/*.json"
data_pin = spark.read.json(pin_source_path)

In [0]:
# Treat empty strings as missing values (na.replace is the same operation as DataFrame.replace).
cleaned_df_pin = data_pin.na.replace({"": None})

In [0]:
# Replace placeholder strings that carry no real information with None.
# Each column maps to a *list* of placeholders because "description" has two
# distinct placeholder texts; a dict literal cannot repeat a key (the second
# entry would silently overwrite the first).
placeholder_values = {
    "description": ["No description available Story format", "No description available"],
    "follower_count": ["User Info Error"],
    "image_src": ["Image src error."],
    "poster_name": ["User Info Error"],
    "tag_list": ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"],
    "title": ["No Title Data Available"],
}

# NOTE: the loop variable must not be named `col`: that would rebind
# pyspark.sql.functions.col (from the star import) to a string and break
# every later `col("...")` call in this notebook.
for column_name, placeholders in placeholder_values.items():
    cleaned_df_pin = cleaned_df_pin.replace(
        {placeholder: None for placeholder in placeholders}, subset=[column_name]
    )

In [0]:
# Convert follower_count strings such as "22k" or "1.2M" to integers.
# The numeric part is multiplied by the suffix factor (k/K = 1e3, m/M = 1e6)
# rather than having zeros appended, so decimal prefixes scale correctly and
# both letter cases are handled. Plain numbers use factor 1; values that do
# not match the pattern (or are already None) become null, as cast('int') did.
follower_pattern = r"^\s*(\d*\.?\d+)\s*([kKmM]?)\s*$"
follower_digits = F.regexp_extract("follower_count", follower_pattern, 1)
follower_suffix = F.upper(F.regexp_extract("follower_count", follower_pattern, 2))
follower_multiplier = (
    F.when(follower_suffix == "K", 1_000)
    .when(follower_suffix == "M", 1_000_000)
    .otherwise(1)
)
# regexp_extract yields "" when nothing matches; guard so "" is never cast.
follower_value = F.when(
    follower_digits != "", follower_digits.cast("double") * follower_multiplier
)

# round() before the int cast so floating-point error (e.g. 1199999.9999)
# cannot truncate the result down by one.
cleaned_df_pin = cleaned_df_pin.withColumn(
    "follower_count", F.round(follower_value).cast("int")
)

In [0]:
# Remove the "Local save in " text so that save_location holds only the path.
cleaned_df_pin = cleaned_df_pin.withColumn(
    "save_location",
    regexp_replace(cleaned_df_pin["save_location"], "Local save in ", ""),
)

In [0]:
# The pin data calls its row identifier "index"; expose it as "ind" like the geo and user tables.
cleaned_df_pin = cleaned_df_pin.withColumnRenamed(existing="index", new="ind")

In [0]:
# Fix the column order of the cleaned pin table (this also drops any other columns).
pin_column_order = [
    "ind", "unique_id", "title", "description", "follower_count",
    "poster_name", "tag_list", "is_image_or_video", "image_src",
    "save_location", "category",
]
cleaned_df_pin = cleaned_df_pin.select(*pin_column_order)

In [0]:
# Inspect the cleaned pin schema (follower_count should now be an int column).
cleaned_df_pin.printSchema()

#### Clean df_geo

In [0]:
# Load the raw geolocation JSON files from the mounted S3 bucket (partition 0 of the geo topic).
geo_source_path = "/mnt/s3bucket-121ca9f7ce2b/topics/121ca9f7ce2b.geo/partition=0/*.json"
df_geo = spark.read.json(geo_source_path)

In [0]:
# Pack latitude and longitude into a single two-element array column "coordinates".
cleaned_df_geo = df_geo.withColumn(
    "coordinates", array(df_geo["latitude"], df_geo["longitude"])
)

In [0]:
# The separate coordinate columns are redundant once "coordinates" exists.
coordinate_parts = ("latitude", "longitude")
cleaned_df_geo = cleaned_df_geo.drop(*coordinate_parts)

In [0]:
# Parse the timestamp strings into TimestampType (no explicit format, so Spark's default parsing applies).
cleaned_df_geo = cleaned_df_geo.withColumn(
    "timestamp", to_timestamp(cleaned_df_geo["timestamp"])
)

In [0]:
# Final column order for the cleaned geo table.
geo_column_order = ["ind", "country", "coordinates", "timestamp"]
cleaned_df_geo = cleaned_df_geo.select(geo_column_order)

In [0]:
# Inspect the cleaned geo schema (coordinates array, timestamp as TimestampType).
cleaned_df_geo.printSchema()

#### Clean df_user

In [0]:
# Load the raw user JSON files from the mounted S3 bucket (partition 0 of the user topic).
user_source_path = "/mnt/s3bucket-121ca9f7ce2b/topics/121ca9f7ce2b.user/partition=0/*.json"
df_user = spark.read.json(user_source_path)

In [0]:
# Build a single "user_name" column ("<first_name> <last_name>") and drop the parts.
# concat() yields null when either name part is null.
cleaned_df_user = (
    df_user
    .withColumn("user_name", concat(df_user["first_name"], lit(" "), df_user["last_name"]))
    .drop("first_name", "last_name")
)

In [0]:
# Parse date_joined strings into TimestampType (no explicit format, so Spark's default parsing applies).
cleaned_df_user = cleaned_df_user.withColumn(
    "date_joined", to_timestamp(cleaned_df_user["date_joined"])
)

In [0]:
# Final column order for the cleaned user table.
user_column_order = ("ind", "user_name", "age", "date_joined")
cleaned_df_user = cleaned_df_user.select(*user_column_order)

In [0]:
# Inspect the cleaned user schema (user_name string, date_joined as TimestampType).
cleaned_df_user.printSchema()

### Data Analysis


#### Find the most popular category in each country

In [0]:
# Inner join pins with their geolocation rows on the shared "ind" key.
# Passing on="ind" (instead of an explicit equality expression) keeps a single
# "ind" column in the result, so later references to "ind" are not ambiguous.
pin_geo = cleaned_df_pin.join(cleaned_df_geo, on="ind", how="inner")

In [0]:
# For every country, count posts per category and keep the top category.
# Ties on category_count are broken alphabetically by category so the result
# is deterministic (row_number alone picks an arbitrary row among ties).
cat_country = pin_geo.select("country", "category")
result_cat_country = cat_country.groupBy("country", "category").agg(
    count("category").alias("category_count")
)

window_spec = Window.partitionBy("country").orderBy(desc("category_count"), asc("category"))

result_cat_country = (
    result_cat_country
    .withColumn("rank", row_number().over(window_spec))
    .filter(F.col("rank") == 1)
    .drop("rank")
)
display(result_cat_country)

country,category,category_count
Afghanistan,education,55
Albania,art,64
Algeria,quotes,113
American Samoa,tattoos,40
Andorra,tattoos,30
Angola,diy-and-crafts,18
Anguilla,diy-and-crafts,17
Antarctica (the territory South of 60 deg S),christmas,18
Antigua and Barbuda,christmas,24
Argentina,tattoos,46


#### Find the most popular category each year between 2018 and 2022



In [0]:
# Tag each post with the year of its timestamp and keep only 2018-2022 (inclusive).
timestamp_year = year("timestamp")
cat_year = (
    pin_geo
    .where(timestamp_year.between(2018, 2022))
    .select(timestamp_year.alias("post_year"), "category")
)

In [0]:
# Preview the filtered (post_year, category) rows.
display(cat_year)

post_year,category
2020,event-planning
2022,home-decor
2022,home-decor
2022,home-decor
2022,home-decor
2021,event-planning
2021,event-planning
2018,home-decor
2020,christmas
2018,diy-and-crafts


In [0]:
# Rank categories within each year by post count; ties are broken
# alphabetically by category so the selected "top" row is deterministic.
window_spec = Window.partitionBy("post_year").orderBy(desc("category_count"), asc("category"))

# Count posts per (year, category) and keep the most popular category of each year.
result_cat_year = (
    cat_year.groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(window_spec))
    .filter("rank = 1")
    .drop("rank")
)

In [0]:
# Print the most popular category for each year.
result_cat_year.show()

#### User With Most Followers Per Country

In [0]:
# For each country, find the poster with the highest follower_count.
# The maximum row is selected with a window, so poster_name really belongs to
# that maximum. Pairing max(follower_count) with first(poster_name) in a plain
# aggregation returns an arbitrary poster instead. desc() puts nulls last;
# ties are broken alphabetically by poster_name for determinism.
top_follower_window = Window.partitionBy("country").orderBy(
    desc("follower_count"), asc("poster_name")
)

follower_country = (
    pin_geo
    .withColumn("follower_rank", row_number().over(top_follower_window))
    .filter(F.col("follower_rank") == 1)
    .select("country", "follower_count", "poster_name")
)

follower_country.printSchema()

In [0]:
# Country whose top poster has the most followers overall.
# orderBy + limit(1) takes the true first row; a global first() aggregate after
# orderBy is not guaranteed to respect the sort. The DataFrame itself is kept
# (previously the variable held the None returned by .show()).
result_user_most_fol = (
    follower_country
    .orderBy(desc("follower_count"), asc("country"))
    .select("country", "follower_count")
    .limit(1)
)
result_user_most_fol.show()

In [0]:
# Inspect the schema of the joined pin + geo DataFrame.
pin_geo.printSchema()

#### The most popular category for different age groups

- 18-24
- 25-35
- 36-50
- +50

In [0]:
# Inner join pins with their user rows on the shared "ind" key.
# Passing on="ind" keeps a single "ind" column in the result (an explicit
# equality condition would keep both and make "ind" ambiguous).
pin_user = cleaned_df_pin.join(cleaned_df_user, on="ind", how="inner")


In [0]:
# Bucket users into the age groups listed above (ages below 18 or null get a
# null group) and keep only the columns needed for the category analysis.
# F.col is used through the module alias so this cell does not depend on the
# bare name `col`, which is easy to shadow in a notebook namespace.
cat_age_group = pin_user.withColumn(
    "age_group",
    F.when((F.col("age") >= 18) & (F.col("age") < 25), "18-24")
    .when((F.col("age") >= 25) & (F.col("age") < 36), "25-35")
    .when((F.col("age") >= 36) & (F.col("age") < 51), "36-50")
    .when(F.col("age") > 50, "+50"),
).select("age_group", "category")

In [0]:
# Most frequent category per age group. rank() (unlike row_number) keeps
# every category tied for first place, so a group may yield several rows.
age_cat_window = Window.partitionBy("age_group").orderBy(desc("category_count"))

category_counts_by_age = cat_age_group.groupBy("age_group", "category").agg(
    count("category").alias("category_count")
)
result_cat_age = (
    category_counts_by_age
    .withColumn("rank", rank().over(age_cat_window))
    .where("rank = 1")
    .drop("rank")
)

In [0]:
# Print the most popular category (or tied categories) for each age group.
result_cat_age.show()

#### Find the Median Follower Count for Different Age Groups

In [0]:
# Bucket users into age groups (ages below 18 or null get a null group) and
# keep follower_count for the median analysis.
# F.col is used through the module alias so this cell does not depend on the
# bare name `col`, which is easy to shadow in a notebook namespace.
follower_age_group = pin_user.withColumn(
    "age_group",
    F.when((F.col("age") >= 18) & (F.col("age") < 25), "18-24")
    .when((F.col("age") >= 25) & (F.col("age") < 36), "25-35")
    .when((F.col("age") >= 36) & (F.col("age") < 51), "36-50")
    .when(F.col("age") > 50, "+50"),
).select("age_group", "follower_count")

In [0]:
# Inspect the (age_group, follower_count) schema before computing medians.
follower_age_group.printSchema()

In [0]:
# Approximate median follower_count per age group, computed with one grouped
# aggregation. (A window aggregate followed by distinct() evaluates the
# percentile for every row and then discards the duplicates.)
median_follower_age_group = follower_age_group.groupBy("age_group").agg(
    percentile_approx("follower_count", 0.5).alias("median_follower_count")
)

In [0]:
# Print the approximate median follower count for each age group.
median_follower_age_group.show()

In [0]:
# Alternative approach: the SQL `percentile` aggregate rather than percentile_approx.
median_follower_age_group2 = (
    follower_age_group
    .groupBy("age_group")
    .agg(expr("percentile(follower_count, 0.5)").alias("median_follower_count"))
)

In [0]:
# Print the percentile-based median follower count for each age group.
median_follower_age_group2.show()

#### Find how many users have joined each year between 2015 and 2020

In [0]:
# Year each user joined, restricted to 2015-2020 inclusive.
# The year column is named post_year to match the other year-based tables.
join_year = year("date_joined")
user_year = (
    cleaned_df_user
    .where(join_year.between(2015, 2020))
    .select(join_year.alias("post_year"), "user_name")
)

In [0]:
# Inspect the (post_year, user_name) schema.
user_year.printSchema()

In [0]:
# Users joined per year (counts non-null user_name values in each year).
num_user_year = (
    user_year
    .groupBy("post_year")
    .agg(count(user_year["user_name"]).alias("numbers_users_joined"))
)

In [0]:
# Print the number of users that joined in each year.
num_user_year.show()

In [0]:
# Sanity check: list any users who joined after 2017, to confirm whether such rows exist.
cleaned_df_user.createOrReplaceTempView("df_user_temp")

joined_after_2017_query = """
SELECT
    YEAR(date_joined) AS post_year,
    user_name
FROM
    df_user_temp
WHERE
    YEAR(date_joined) > 2017
"""
year_result = spark.sql(joined_after_2017_query)

year_result.show()

#### Find the median follower count of users based on their joining year

In [0]:
# Joining year (2015-2020 inclusive) alongside each pin's follower_count.
join_year = year("date_joined")
user_year_follower = (
    pin_user
    .where(join_year.between(2015, 2020))
    .select(join_year.alias("post_year"), "user_name", "follower_count")
)

In [0]:
# Median follower_count per joining year using the SQL percentile aggregate.
median_follower_year = (
    user_year_follower
    .groupBy("post_year")
    .agg(expr("percentile(follower_count, 0.5)").alias("median_follower_count"))
)

In [0]:
# Print the median follower count for each joining year.
median_follower_year.show()

#### Find median follower count of users based on their joining year and age group


In [0]:
# Tag each pin/user row with the user's joining year (2015-2020 only) and age
# group (ages below 18 or null get a null group), keeping follower_count.
# F.col is used through the module alias so this cell does not depend on the
# bare name `col`, which is easy to shadow in a notebook namespace.
age_year_follower = (
    pin_user
    .filter((year("date_joined") >= 2015) & (year("date_joined") <= 2020))
    .withColumn("post_year", year("date_joined"))
    .withColumn(
        "age_group",
        F.when((F.col("age") >= 18) & (F.col("age") < 25), "18-24")
        .when((F.col("age") >= 25) & (F.col("age") < 36), "25-35")
        .when((F.col("age") >= 36) & (F.col("age") < 51), "36-50")
        .when(F.col("age") > 50, "+50"),
    )
    .select("post_year", "age_group", "follower_count")
)

In [0]:
# Median follower_count per (joining year, age group), sorted for readability.
median_follower_year_age = (
    age_year_follower
    .groupBy("post_year", "age_group")
    .agg(expr("percentile(follower_count, 0.5)").alias("median_follower_count"))
    .orderBy("post_year", "age_group")
)

In [0]:
# Print the median follower count per joining year and age group.
median_follower_year_age.show()