In [0]:
# Reader options shared by the three CSV extracts: the first row is the header,
# column types are inferred by Spark, and fields are comma-separated.
# NOTE(review): category values such as " Value Twin Pack", "1" and "2k" show up
# in later cell output, which may mean quoted/multi-line fields are being split
# with these default options — confirm against the raw CSV before changing them.
csv_options = {"header": True, "inferSchema": True, "sep": ","}

# One DataFrame per source file; the cleaning cells below derive from these.
df_pin = spark.read.csv("/FileStore/tables/pin_data.csv", **csv_options)
df_geo = spark.read.csv("/FileStore/tables/geo_data.csv", **csv_options)
df_user = spark.read.csv("/FileStore/tables/user_data.csv", **csv_options)

In [0]:
from pyspark.sql.functions import regexp_replace, when, col, lower
# df_pin cleaning
# Builds df_pin_cleaned from df_pin: placeholder values become null,
# follower_count becomes an int, numeric columns are cast, save_location is
# reduced to its path, idx is renamed to ind and the columns are reordered.

##1 Replace empty entries and entries with no relevant data in each column with None

# List of placeholders to treat as null
null_like_values = ["", "N/A", "n/a", "na", "NaN", "None", "null"]

# Replace in all columns.
# The accumulator must start as df_pin and be updated from itself on every
# iteration; rebuilding it from df_pin each time would keep only the last
# column's replacement and silently discard all the others.
df_pin_cleaned = df_pin
for c in df_pin.columns:
    df_pin_cleaned = df_pin_cleaned.withColumn(
        c, when(col(c).isin(null_like_values), None).otherwise(col(c))
    )

##2 Perform the necessary transformations on the follower_count to ensure every entry is a number. Make sure the data type of this column is an int
# Step 1: Cast to string, strip thousands separators and lowercase so that
# suffixes written as "K"/"M" are matched by the patterns in step 2.
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    lower(regexp_replace(col("follower_count").cast("string"), ",", "")),
)

# Step 2: Convert 'k' and 'm' to numeric values
# Anything that matches neither suffix pattern is cast directly; values that
# are not numeric become null through the cast.
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    when(col("follower_count").rlike("^[0-9.]+k$"),
         (regexp_replace("follower_count", "k", "").cast("float") * 1000).cast("int"))
    .when(col("follower_count").rlike("^[0-9.]+m$"),
          (regexp_replace("follower_count", "m", "").cast("float") * 1000000).cast("int"))
    .otherwise(col("follower_count").cast("int"))
)

##3 Ensure that each column containing numeric data has a numeric data type

df_pin_cleaned = df_pin_cleaned.withColumn("idx", col("idx").cast("integer")) \
         .withColumn("downloaded", col("downloaded").cast("int"))

##4 Clean the data in the save_location column to include only the save location path
# Only a leading "Local save in " prefix is removed; the rest is kept as-is.
df_pin_cleaned = df_pin_cleaned.withColumn("save_location",
    regexp_replace(col("save_location"), "^(Local save in )", ""))

##5 Rename the index column to ind
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("idx", "ind")

##6 Reorder the DataFrame columns
# Columns not listed here (e.g. downloaded) are dropped by the select below.
new_column_order_pin = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]

df_pin_cleaned = df_pin_cleaned.select(new_column_order_pin)
# Show result
df_pin_cleaned.limit(10).toPandas()





Unnamed: 0,ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
0,4097,bf1bf0af-01ee-469c-9709-acc562a64fbf,How to Easily Homeschool Consistently,How can you homeschool consistently when it's ...,13000.0,Classically Homeschooling | Education at the K...,"How To Start Homeschooling,Homeschooling Resou...",image,https://i.pinimg.com/originals/20/33/85/203385...,/data/education,education
1,8204,8d2c3ddb-144d-4e2c-8824-d73484834973,You Will Never Understand The Damage You Did T...,You will never understand the damage you did t...,6000.0,Andrea Groneman,"Letting Go Quotes,Go For It Quotes,Hurt Quotes...",image,https://i.pinimg.com/originals/9e/7c/b2/9e7cb2...,/data/quotes,quotes
2,10253,cab8a53e-40e1-4324-9e25-9b339ade9fd2,Romantic Travel Destinations For Adventurous C...,Romantic travel destinations shouldn't be limi...,,SOCIETY19,"Places To Travel,Travel Destinations,Places To...",image,https://i.pinimg.com/originals/37/82/eb/3782eb...,/data/travel,travel
3,6156,02b1529c-2fd3-47a8-af1e-e48e936df09c,1867 Italianate Villa In Manchester New Hampsh...,CLICK HERE 1867 Italianate Villa In Manchester...,40000.0,Captivating Houses | Old Houses & Home decor,"Home Decor Kitchen,Rustic Kitchen,Interior Des...",image,https://i.pinimg.com/originals/80/f7/eb/80f7eb...,/data/home-decor,home-decor
4,10256,13a1a2ad-a1e4-4bca-8e70-41cb4ca4530c,Portugal Travel Guide: An Epic 10 Day Itinerar...,Use this 10-day Portugal itinerary to help you...,617.0,Travel + Food Blogger ➳ TeriakiTalks,"Visit Portugal,Spain And Portugal,Lisbon Portu...",image,https://i.pinimg.com/originals/85/e6/03/85e603...,/data/travel,travel
5,4116,efcd3e17-5d87-4610-9470-c7c4e2707ce3,Teach Genetics With Edible DNA - The Happy Hou...,My kids would be thrilled to learn about genet...,68000.0,Toni Herrbach (Happy Housewife),"Kid Science,Food Science Experiments,Science P...",image,https://i.pinimg.com/originals/fb/a1/28/fba128...,/data/education,education
6,2069,4b494dc9-7c21-4f43-ac9e-8bb64488ea83,10 of the Best Red Christmas Tree Ideas - Back...,The color of passion is also the color of Chri...,76000.0,Backyard Boss,"Red And Gold Christmas Tree,Elegant Christmas ...",image,https://i.pinimg.com/originals/e1/d2/93/e1d293...,/data/christmas,christmas
7,30,1e39f505-1561-440e-a76e-1b84ea3df078,How to Draw a Realistic Face Step By Step | Po...,Most people believe that drawing a realistic f...,65000.0,Pouted Lifestyle Magazine,"Realistic Pencil Drawings,Pencil Drawing Tutor...",image,https://i.pinimg.com/originals/89/4a/a7/894aa7...,/data/art,art
8,8222,f4902a59-1708-4fa1-8916-141f19b0225d,A Religious Person Will Do What He Is Told,Religious | God | Jesus | Quotes | Inspiration...,,The Minds Journal,"Quotable Quotes,Wisdom Quotes,Book Quotes,Word...",image,https://i.pinimg.com/originals/e3/cd/3b/e3cd3b...,/data/quotes,quotes
9,8226,443624b9-8320-4a4b-ac4d-0761307790d9,100 Inspirational Quotes to Make You Feel Bett...,Words of wisdom for all of life's crazy moments.,133000.0,REDBOOK Magazine,"Quotes Thoughts,Life Quotes Love,Inspiring Quo...",image,https://i.pinimg.com/originals/8b/40/66/8b4066...,/data/quotes,quotes


In [0]:

from pyspark.sql.functions import array, col
# df_geo cleaning

# Final column layout of the cleaned geo frame.
new_column_order_geo = ["ind", "country", "coordinates", "timestamp"]

df_geo_cleaned = (
    df_geo
    # Pack latitude and longitude into one array column, then drop the originals.
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    # Cast the timestamp column to Spark's timestamp type.
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    # Rename the index column, then apply the final column order.
    .withColumnRenamed("idx", "ind")
    .select(*new_column_order_geo)
)

# Preview the first ten cleaned rows.
df_geo_cleaned.limit(10).toPandas()

Unnamed: 0,ind,country,coordinates,timestamp
0,4097,Denmark,"[1.28495, 74.2608]",2020-01-09 11:55:49
1,8204,Maldives,"[-18.1973, -126.543]",2019-05-28 05:38:33
2,10253,Azerbaijan,"[-89.3669, -170.886]",2018-03-07 12:37:25
3,6156,Norway,"[-9.45376, -77.97]",2018-07-23 00:14:40
4,10256,Azerbaijan,"[17.7952, -44.441]",2020-09-21 22:31:19
5,4116,Saint Pierre and Miquelon,"[-7.80883, 19.4768]",2017-12-15 07:59:58
6,2069,Chile,"[-59.0335, -161.59]",2018-09-21 01:14:59
7,30,Saint Barthelemy,"[61.9983, 65.5766]",2022-07-07 18:43:40
8,8222,Albania,"[-87.2, -177.109]",2022-05-07 18:07:02
9,8226,Bahrain,"[-83.9199, -162.946]",2019-11-03 16:00:21


In [0]:
from pyspark.sql.functions import concat, lit, col
# df_user cleaning
# Builds df_user_cleaned from df_user: first/last name are merged into
# user_name, date_joined becomes a timestamp, idx is renamed to ind and the
# columns are reordered.

##Create a new column user_name that concatenates the information found in the first_name and last_name columns
# NOTE(review): concat yields null when either input is null — confirm that is
# acceptable for rows with a missing first or last name.
df_user_cleaned = df_user.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))

##Drop the first_name and last_name columns from the DataFrame
df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

##Convert the date_joined column from a string to a timestamp data type
df_user_cleaned = df_user_cleaned.withColumn("date_joined", col("date_joined").cast("timestamp"))

##Rename the index column to ind and reorder the DataFrame columns
df_user_cleaned = df_user_cleaned.withColumnRenamed("idx", "ind")

new_column_order_user = ["ind",
                         "user_name",
                         "age",
                         "date_joined"
]

df_user_cleaned = df_user_cleaned.select(new_column_order_user)
# Preview the first ten cleaned rows (the only action this cell triggers).
df_user_cleaned.limit(10).toPandas()

Unnamed: 0,ind,user_name,age,date_joined
0,4097,Kristen Lyons,21,2016-03-24 02:42:31
1,8204,David Davis,38,2016-11-20 06:55:49
2,10253,Aaron Anderson,21,2015-10-23 04:43:54
3,6156,Christopher Alvarez,28,2016-10-26 20:31:58
4,10256,Amber Morris,24,2016-03-02 09:47:07
5,4116,Jason Luna,23,2017-06-11 02:08:47
6,2069,Alexandra Lawrence,34,2016-04-12 09:18:00
7,30,Jeffery Richardson,43,2016-07-22 03:44:24
8,8222,Aaron Anderson,21,2015-10-24 06:35:27
9,8226,Austin Alvarez,38,2015-11-27 20:15:05


In [0]:
##Find the most popular category in each country
from pyspark.sql.functions import count, row_number
from pyspark.sql.window import Window

# Step 1: Join on 'ind'
df_joined = df_pin_cleaned.join(df_geo_cleaned, on="ind", how="inner")

# Step 2: Group by country and category, count posts
df_grouped = df_joined.groupBy("country", "category").agg(count("*").alias("category_count"))

# Step 3: Use window function to get the top category per country
# row_number() needs a total order to be reproducible: when two categories tie
# on category_count within a country, the category name (ascending) decides
# which one is ranked first, instead of leaving the pick to partition order.
window_spec = Window.partitionBy("country").orderBy(
    df_grouped["category_count"].desc(),
    df_grouped["category"].asc(),
)

df_result = df_grouped.withColumn("row_num", row_number().over(window_spec)) \
                      .filter("row_num = 1") \
                      .select("country", "category", "category_count")

# Show result
df_result.orderBy(
    ["category_count", "country"],
    ascending=[False, True]  # False=DESC, True=ASC
).show(truncate=False)


+--------------------------------------------+--------------+--------------+
|country                                     |category      |category_count|
+--------------------------------------------+--------------+--------------+
|Algeria                                     |quotes        |11            |
|Albania                                     |art           |8             |
|Afghanistan                                 |finance       |7             |
|Aruba                                       |art           |6             |
|Andorra                                     |tattoos       |4             |
|Azerbaijan                                  |finance       |4             |
|Antarctica (the territory South of 60 deg S)|tattoos       |3             |
|Belgium                                     |christmas     |3             |
|Brazil                                      |home-decor    |3             |
|American Samoa                              |beauty        |2             |

In [0]:
##Find how many posts each category had between 2018 and 2022.

from pyspark.sql.functions import year, col, count

# Attach each pin's geo record (matched on "ind") so its timestamp is available.
df_joined = df_pin_cleaned.join(df_geo_cleaned, "ind", "inner")

# Derive the calendar year of the geo timestamp.
df_with_year = df_joined.withColumn("post_year", year("timestamp"))

# Keep only rows whose year lies in 2018..2022, both ends included.
df_filtered = df_with_year.where(col("post_year").between(2018, 2022))

# Number of rows for every (year, category) pair.
df_result = df_filtered.groupBy("post_year", "category").agg(
    count("*").alias("category_count")
)

# Display ordered by year, then category.
df_result.orderBy("post_year", "category").show(truncate=False)


+---------+----------------+--------------+
|post_year|category        |category_count|
+---------+----------------+--------------+
|2018     | Value Twin Pack|1             |
|2018     |art             |8             |
|2018     |beauty          |7             |
|2018     |christmas       |12            |
|2018     |diy-and-crafts  |8             |
|2018     |education       |6             |
|2018     |event-planning  |10            |
|2018     |finance         |8             |
|2018     |home-decor      |8             |
|2018     |mens-fashion    |8             |
|2018     |quotes          |5             |
|2018     |tattoos         |9             |
|2018     |travel          |12            |
|2018     |vehicles        |8             |
|2019     |1               |2             |
|2019     |2k              |1             |
|2019     |art             |10            |
|2019     |beauty          |4             |
|2019     |christmas       |14            |
|2019     |diy-and-crafts  |8   

In [0]:
#Step 1: For each country find the user with the most followers.
# col is imported here so the cell does not depend on an earlier cell's import.
# Spark's max is imported under an alias so Python's builtin max() is not
# shadowed for the remainder of the notebook.
from pyspark.sql.functions import col, row_number
from pyspark.sql.functions import max as spark_max
from pyspark.sql.window import Window

# Join on 'ind' to access country + followers + poster_name
df_joined = df_pin_cleaned.join(df_geo_cleaned, on="ind", how="inner")

# Define window partitioned by country, ordered by follower_count desc.
# poster_name is a secondary key so that ties on follower_count resolve the
# same way on every run; without it row_number() picks a tied row arbitrarily.
window_spec = Window.partitionBy("country").orderBy(
    col("follower_count").desc(),
    col("poster_name").asc(),
)

# Add row number to pick top poster per country
df_top_per_country = df_joined.select("country", "poster_name", "follower_count") \
                              .withColumn("row_num", row_number().over(window_spec)) \
                              .filter("row_num = 1") \
                              .select("country", "poster_name", "follower_count")

# Show Results
df_top_per_country.orderBy("follower_count", ascending=False).show()

# Step 2: Based on the above query, find the country with the user with most followers.

# Find the max follower count globally (a single-row DataFrame)
max_followers = df_top_per_country.agg(spark_max("follower_count").alias("max_follower_count"))

# Join back to filter the country that has this max; if several countries share
# the maximum, all of them are returned.
df_country_with_max_user = df_top_per_country.join(
    max_followers,
    df_top_per_country.follower_count == max_followers.max_follower_count,
    how="inner"
).select("country", "follower_count")

df_country_with_max_user.show()


+--------------------+--------------------+--------------+
|             country|         poster_name|follower_count|
+--------------------+--------------------+--------------+
|             Algeria|           YourTango|        942000|
|            Mongolia|               ZAFUL|        893000|
|             Armenia|Michelle {CraftyM...|        892000|
|              Brazil|              Lively|        892000|
|               Aruba|         GQ Magazine|        874000|
|Central African R...|             PureWow|        868000|
|           Argentina|         Next Luxury|        800000|
|             Andorra|           Glaminati|        799000|
|            Anguilla|           dresslily|        760000|
|         Afghanistan|        TheUnstitchd|        723000|
|           Swaziland|          Casa Vogue|        685000|
|          Bangladesh|  Smart School House|        673000|
|             Burundi|        Studio McGee|        662000|
|               Gabon|               SHAPE|        63700

In [0]:
# What is the most popular category people post to based on the following age groups:

# 18-24
# 25-35
# 36-50
# 50+

from pyspark.sql.functions import when, col, count
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Step 1: Join pins with users to get access to age
df_joined = df_pin_cleaned.join(df_user_cleaned, on="ind", how="inner")

# Step 2: Create age_group column
# The chain has no otherwise(), so ages below 18 (and null ages) get a null
# age_group and are counted as their own group in the steps below.
df_with_age_group = df_joined.withColumn(
    "age_group",
    when((col("age") >= 18) & (col("age") <= 24), "18-24")
    .when((col("age") >= 25) & (col("age") <= 35), "25-35")
    .when((col("age") >= 36) & (col("age") <= 50), "36-50")
    .when(col("age") > 50, "50+")
)

# Step 3: Group by age_group and category, count posts
df_grouped = df_with_age_group.groupBy("age_group", "category") \
                              .agg(count("*").alias("category_count"))

# Step 4: Get most popular category per age group using window
# category (ascending) is a tie-breaker so that row_number() returns the same
# winner on every run when two categories share the top count in a group.
window_spec = Window.partitionBy("age_group").orderBy(
    col("category_count").desc(),
    col("category").asc(),
)

df_result = df_grouped.withColumn("row_num", row_number().over(window_spec)) \
                      .filter("row_num = 1") \
                      .select("age_group", "category", "category_count")

# Show the final result
df_result.show(truncate=False)


+---------+------------+--------------+
|age_group|category    |category_count|
+---------+------------+--------------+
|18-24    |quotes      |27            |
|25-35    |christmas   |20            |
|36-50    |travel      |14            |
|50+      |mens-fashion|7             |
+---------+------------+--------------+



In [0]:
#What is the median follower count for users in the following age groups:

# 18-24
# 25-35
# 36-50
# 50+

from pyspark.sql.functions import when, col, expr

# Pair every pin with its user record (matched on "ind") to get follower_count and age together.
df_joined = df_pin_cleaned.join(df_user_cleaned, "ind", "inner")

# Label expression for the age buckets; bounds are inclusive and there is no
# fallback branch, so ages under 18 or null ages produce a null label.
age_column = col("age")
age_group_label = (
    when(age_column.between(18, 24), "18-24")
    .when(age_column.between(25, 35), "25-35")
    .when(age_column.between(36, 50), "36-50")
    .when(age_column > 50, "50+")
)
df_with_age_group = df_joined.withColumn("age_group", age_group_label)

# Median per bucket, taken as the 0.5 approximate percentile of follower_count.
df_median = df_with_age_group.groupBy("age_group").agg(
    expr("approx_percentile(follower_count, 0.5)").alias("median_follower_count")
)

# Display ordered by age bucket.
df_median.orderBy("age_group").show(truncate=False)


+---------+---------------------+
|age_group|median_follower_count|
+---------+---------------------+
|18-24    |50000                |
|25-35    |19000                |
|36-50    |7000                 |
|50+      |5000                 |
+---------+---------------------+



In [0]:
# Find how many users have joined between 2015 and 2020.

from pyspark.sql.functions import year, col, count

# Year in which each user joined (stored under the column name "post_year").
df_with_year = df_user_cleaned.withColumn("post_year", year("date_joined"))

# Restrict to join years 2015..2020, both ends included.
df_filtered = df_with_year.where(col("post_year").between(2015, 2020))

# Number of user rows per join year.
df_result = df_filtered.groupBy("post_year").agg(
    count("*").alias("number_users_joined")
)

# Display in chronological order.
df_result.orderBy("post_year").show(truncate=False)


+---------+-------------------+
|post_year|number_users_joined|
+---------+-------------------+
|2015     |183                |
|2016     |238                |
|2017     |79                 |
+---------+-------------------+



In [0]:
# Find the median follower count of users have joined between 2015 and 2020.

from pyspark.sql.functions import year, col, expr

# Pair every pin with its user record (matched on "ind") to get follower_count and date_joined together.
df_joined = df_pin_cleaned.join(df_user_cleaned, "ind", "inner")

# Year in which the user joined (stored under the column name "post_year").
df_with_year = df_joined.withColumn("post_year", year("date_joined"))

# Restrict to join years 2015..2020, both ends included.
df_filtered = df_with_year.where(col("post_year").between(2015, 2020))

# Median per join year, taken as the 0.5 approximate percentile of follower_count.
df_median = df_filtered.groupBy("post_year").agg(
    expr("approx_percentile(follower_count, 0.5)").alias("median_follower_count")
)

# Display in chronological order.
df_median.orderBy("post_year").show(truncate=False)


+---------+---------------------+
|post_year|median_follower_count|
+---------+---------------------+
|2015     |79000                |
|2016     |15000                |
|2017     |4000                 |
+---------+---------------------+

