In [0]:
# Import the credentials setup function
%run ./aws_credentials

set_aws_credentials(spark)

# Then proceed to load your data
df_geo = spark.read.json("s3a://kafka-bucket-osaze/topics/testbuck1.geo/partition=0/")
df_geo.show(5)

df_user = spark.read.json("s3a://kafka-bucket-osaze/topics/testbuck1.user/partition=0/")
df_user.show(5)

df_pin = spark.read.json("s3a://kafka-bucket-osaze/topics/testbuck1.pin/partition=0/")
df_pin.show(5)


+--------------------+----+--------+---------+-------------------+
|             country| ind|latitude|longitude|          timestamp|
+--------------------+----+--------+---------+-------------------+
|British Indian Oc...|3337|-65.2363|  21.9622|2020-06-29T05:02:22|
|Antarctica (the t...|2418|-88.4642| -171.061|2022-05-27T11:30:59|
|Antarctica (the t...|1434|-39.6186| -128.291|2020-03-06T21:00:23|
|Antarctica (the t...|5162|-71.6607| -149.206|2019-09-27T19:06:43|
|Antarctica (the t...|1335|-77.9931| -175.682|2022-03-19T17:29:42|
+--------------------+----+--------+---------+-------------------+
only showing top 5 rows

+---+-------------------+-----------+-----+---------+
|age|        date_joined| first_name|  ind|last_name|
+---+-------------------+-----------+-----+---------+
| 27|2016-03-08T13:38:37|Christopher| 2015| Bradshaw|
| 59|2017-05-12T21:22:17|  Alexander|10673|Cervantes|
| 39|2016-06-29T20:43:59|  Christina| 6398|Davenport|
| 20|2015-10-23T04:13:23| Alexandria| 3599| Alva

In [0]:
from pyspark.sql.functions import col, regexp_replace, when
from pyspark.sql.types import IntegerType

# Replace empty strings and null-like values with None
df_pin_cleaned = df_pin.replace(['', ' ', 'N/A', 'null', 'None'], None)

# Fix follower_count to be numeric
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    when(col("follower_count").contains("K"), regexp_replace("follower_count", "K", "e3"))
    .when(col("follower_count").contains("M"), regexp_replace("follower_count", "M", "e6"))
    .when(col("follower_count").contains("B"), regexp_replace("follower_count", "B", "e9"))
    .otherwise(col("follower_count"))
)
df_pin_cleaned = df_pin_cleaned.withColumn("follower_count", col("follower_count").cast("double").cast("int"))

# Clean save_location
df_pin_cleaned = df_pin_cleaned.withColumn("save_location", regexp_replace(col("save_location"), r".*/", ""))

# Rename and reorder
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

ordered_cols = [
    "ind", "unique_id", "title", "description", "follower_count", "poster_name",
    "tag_list", "is_image_or_video", "image_src", "save_location", "category"
]
df_pin_cleaned = df_pin_cleaned.select(*ordered_cols)

# Preview
df_pin_cleaned.show(5)


+----+--------------------+---------------------+--------------------+--------------+--------------------+--------------------+-----------------+--------------------+--------------+--------------+
| ind|           unique_id|                title|         description|follower_count|         poster_name|            tag_list|is_image_or_video|           image_src| save_location|      category|
+----+--------------------+---------------------+--------------------+--------------+--------------------+--------------------+-----------------+--------------------+--------------+--------------+
|8586|c338b1c8-7c6a-4a1...| Pretty ♥ discover...|Image uploaded by...|      15000000|         We Heart It|Mens Body Tattoos...|            image|https://i.pinimg....|       tattoos|       tattoos|
|6447|d3039535-5767-426...|〚 Warm natural to...|И хоть у шведов л...|          null|PUFIK Interiors &...|Cheap Home Decor,...|            image|https://i.pinimg....|    home-decor|    home-decor|
|1706|b5c8a1b5-9

In [0]:
from pyspark.sql.functions import array, col, to_timestamp

# Create coordinates column as an array of latitude and longitude
df_geo_cleaned = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))

# Drop latitude and longitude columns
df_geo_cleaned = df_geo_cleaned.drop("latitude", "longitude")

# Convert timestamp to proper timestamp format
df_geo_cleaned = df_geo_cleaned.withColumn("timestamp", to_timestamp("timestamp"))

# Reorder columns
geo_cols_order = ["ind", "country", "coordinates", "timestamp"]
df_geo_cleaned = df_geo_cleaned.select(*geo_cols_order)

# Show cleaned DataFrame
df_geo_cleaned.show(5)


+----+--------------------+--------------------+-------------------+
| ind|             country|         coordinates|          timestamp|
+----+--------------------+--------------------+-------------------+
|3337|British Indian Oc...| [-65.2363, 21.9622]|2020-06-29 05:02:22|
|2418|Antarctica (the t...|[-88.4642, -171.061]|2022-05-27 11:30:59|
|1434|Antarctica (the t...|[-39.6186, -128.291]|2020-03-06 21:00:23|
|5162|Antarctica (the t...|[-71.6607, -149.206]|2019-09-27 19:06:43|
|1335|Antarctica (the t...|[-77.9931, -175.682]|2022-03-19 17:29:42|
+----+--------------------+--------------------+-------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import concat_ws

# Create user_name column by combining first_name and last_name
df_user_cleaned = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

# Drop first_name and last_name columns
df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

# Convert date_joined column to timestamp
df_user_cleaned = df_user_cleaned.withColumn("date_joined", to_timestamp("date_joined"))

# Reorder columns
user_cols_order = ["ind", "user_name", "age", "date_joined"]
df_user_cleaned = df_user_cleaned.select(*user_cols_order)

# Show cleaned DataFrame
df_user_cleaned.show(5)


+-----+--------------------+---+-------------------+
|  ind|           user_name|age|        date_joined|
+-----+--------------------+---+-------------------+
| 2015|Christopher Bradshaw| 27|2016-03-08 13:38:37|
|10673| Alexander Cervantes| 59|2017-05-12 21:22:17|
| 6398| Christina Davenport| 39|2016-06-29 20:43:59|
| 3599| Alexandria Alvarado| 20|2015-10-23 04:13:23|
| 5623| Alexander Blanchard| 32|2015-12-18 05:07:36|
+-----+--------------------+---+-------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import count, row_number
from pyspark.sql.window import Window

# Step 1: Join the geo and pin cleaned DataFrames on 'ind'
df_joined = df_pin_cleaned.join(df_geo_cleaned, on="ind")

# Step 2: Group by country and category, and count how many posts per category per country
df_grouped = df_joined.groupBy("country", "category").agg(count("*").alias("category_count"))

# Step 3: Use window function to rank categories by popularity within each country
window_spec = Window.partitionBy("country").orderBy(col("category_count").desc())

df_ranked = df_grouped.withColumn("rank", row_number().over(window_spec))

# Step 4: Filter only the top-ranked (most popular) category per country
df_result = df_ranked.filter(col("rank") == 1).drop("rank")

# Show result
df_result.show(truncate=False)


+--------------------------------------------+--------------+--------------+
|country                                     |category      |category_count|
+--------------------------------------------+--------------+--------------+
|Afghanistan                                 |education     |5             |
|Albania                                     |art           |11            |
|Algeria                                     |quotes        |7             |
|American Samoa                              |travel        |4             |
|Andorra                                     |tattoos       |4             |
|Angola                                      |diy-and-crafts|1             |
|Anguilla                                    |home-decor    |3             |
|Antarctica (the territory South of 60 deg S)|beauty        |2             |
|Antigua and Barbuda                         |art           |3             |
|Argentina                                   |tattoos       |4             |

In [0]:
from pyspark.sql.functions import year, col, count

# Step 1: Join the cleaned pin and geo DataFrames
df_joined = df_pin_cleaned.join(df_geo_cleaned, on="ind")

# Step 2: Extract the year from timestamp
df_with_year = df_joined.withColumn("post_year", year(col("timestamp")))

# Step 3: Filter for years between 2018 and 2022
df_filtered = df_with_year.filter((col("post_year") >= 2018) & (col("post_year") <= 2022))

# Step 4: Group by year and category, and count the number of posts
df_category_year = df_filtered.groupBy("post_year", "category").agg(count("*").alias("category_count"))

# Step 5: Order by post_year and category_count (optional)
df_category_year.orderBy("post_year", "category_count", ascending=[True, False]).show(truncate=False)


+---------+--------------+--------------+
|post_year|category      |category_count|
+---------+--------------+--------------+
|2018     |travel        |12            |
|2018     |quotes        |12            |
|2018     |education     |11            |
|2018     |diy-and-crafts|10            |
|2018     |beauty        |10            |
|2018     |art           |8             |
|2018     |mens-fashion  |8             |
|2018     |christmas     |7             |
|2018     |home-decor    |5             |
|2018     |tattoos       |5             |
|2018     |vehicles      |5             |
|2018     |event-planning|2             |
|2018     |finance       |1             |
|2019     |christmas     |23            |
|2019     |diy-and-crafts|22            |
|2019     |education     |13            |
|2019     |mens-fashion  |12            |
|2019     |art           |11            |
|2019     |finance       |11            |
|2019     |travel        |10            |
+---------+--------------+--------

In [0]:
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Step 1: Join pin and geo data
df_joined = df_pin_cleaned.join(df_geo_cleaned, on="ind")

# Step 2: Create a window partitioned by country and ordered by follower_count descending
window_spec = Window.partitionBy("country").orderBy(F.desc("follower_count"))

# Step 3: Rank users by follower count in each country
df_ranked = df_joined.withColumn("rank", F.row_number().over(window_spec))

# Step 4: Filter to get top user per country
df_top_per_country = df_ranked.filter(col("rank") == 1).select("country", "poster_name", "follower_count")

df_top_per_country.show(truncate=False)


+--------------------------------------------+----------------------------------------------------------------+--------------+
|country                                     |poster_name                                                     |follower_count|
+--------------------------------------------+----------------------------------------------------------------+--------------+
|Afghanistan                                 |9GAG                                                            |3000000       |
|Albania                                     |The Minds Journal                                               |5000000       |
|Algeria                                     |Apartment Therapy                                               |5000000       |
|American Samoa                              |Mamas Uncut                                                     |8000000       |
|Andorra                                     |Teachers Pay Teachers                                           |

In [0]:
# Use the result from previous query
df_most_followed_user = df_top_per_country.orderBy(F.desc("follower_count")).limit(1)

df_most_followed_user.select("country", "follower_count").show()


+--------+--------------+
| country|follower_count|
+--------+--------------+
|Anguilla|      15000000|
+--------+--------------+



In [0]:
from pyspark.sql.functions import col, when, count, row_number
from pyspark.sql.window import Window

# Step 1: Join pin and user data on 'ind'
df_joined_age = df_pin_cleaned.join(df_user_cleaned, on="ind")

# Step 2: Create age_group column
df_age_grouped = df_joined_age.withColumn(
    "age_group",
    when((col("age") >= 18) & (col("age") <= 24), "18-24")
    .when((col("age") >= 25) & (col("age") <= 35), "25-35")
    .when((col("age") >= 36) & (col("age") <= 50), "36-50")
    .otherwise("+50")
)

# Step 3: Group and count categories per age group
df_age_category_popular = (
    df_age_grouped.groupBy("age_group", "category")
    .agg(count("*").alias("category_count"))
)

# Step 4: Rank categories within each age_group
windowSpec = Window.partitionBy("age_group").orderBy(col("category_count").desc())

# Step 5: Filter to get the most popular category per age group
top_category_per_age_group = df_age_category_popular.withColumn(
    "rank", row_number().over(windowSpec)
).filter(col("rank") == 1).drop("rank")

# Show result
top_category_per_age_group.show(truncate=False)


+---------+----------+--------------+
|age_group|category  |category_count|
+---------+----------+--------------+
|+50      |travel    |7             |
|18-24    |art       |33            |
|25-35    |christmas |21            |
|36-50    |home-decor|12            |
+---------+----------+--------------+



In [0]:
from pyspark.sql.functions import col, when

# Step 1: Join user and pin data
df_joined_age = df_user_cleaned.join(df_pin_cleaned, on="ind")

# Step 2: Create age_group column
df_age_grouped = df_joined_age.withColumn(
    "age_group",
    when((col("age") >= 18) & (col("age") <= 24), "18-24")
    .when((col("age") >= 25) & (col("age") <= 35), "25-35")
    .when((col("age") >= 36) & (col("age") <= 50), "36-50")
    .otherwise("+50")
).select("age_group", "follower_count")

# Step 3: Collect median follower count per age group using approxQuantile
age_groups = df_age_grouped.select("age_group").distinct().rdd.flatMap(lambda x: x).collect()

# Step 4: Loop through each group and get median
results = []
for group in age_groups:
    median = df_age_grouped.filter(col("age_group") == group)\
                           .approxQuantile("follower_count", [0.5], 0.01)[0]
    results.append((group, median))

# Step 5: Create DataFrame from results
median_schema = ["age_group", "median_follower_count"]
df_median_followers = spark.createDataFrame(results, median_schema)

# Display result
df_median_followers.show(truncate=False)


+---------+---------------------+
|age_group|median_follower_count|
+---------+---------------------+
|36-50    |213.0                |
|+50      |196.0                |
|18-24    |1000000.0            |
|25-35    |412.0                |
+---------+---------------------+



In [0]:
from pyspark.sql.functions import year, col, count

# Step 1: Extract the year from the date_joined column
df_joined_year = df_user_cleaned.withColumn("post_year", year(col("date_joined")))

# Step 2: Filter years between 2015 and 2020 and group by year
df_users_joined_filtered = df_joined_year.filter(
    (col("post_year") >= 2015) & (col("post_year") <= 2020)
).groupBy("post_year").agg(count("*").alias("number_users_joined"))

# Step 3: Display the result
df_users_joined_filtered.orderBy("post_year").show()


+---------+-------------------+
|post_year|number_users_joined|
+---------+-------------------+
|     2015|                214|
|     2016|                224|
|     2017|                 87|
+---------+-------------------+



In [0]:
from pyspark.sql.functions import year, expr, col

# Step 1: Join pin and user data on `ind`
df_user_pin = df_user_cleaned.join(df_pin_cleaned, on="ind")

# Step 2: Extract post year
df_with_year = df_user_pin.withColumn("post_year", year("date_joined"))

# Step 3: Filter users who joined between 2015 and 2020
df_filtered = df_with_year.filter((col("post_year") >= 2015) & (col("post_year") <= 2020))

# Step 4: Compute median follower count
df_median = df_filtered.groupBy("post_year").agg(
    expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count")
)

# Step 5: Show result
df_median.orderBy("post_year").show()


+---------+---------------------+
|post_year|median_follower_count|
+---------+---------------------+
|     2015|              2000000|
|     2016|                  313|
|     2017|                  213|
+---------+---------------------+



In [0]:
from pyspark.sql.functions import col, year, expr, when

# Step 1: Join user and pin DataFrames
df_joined = df_user_cleaned.join(df_pin_cleaned, on="ind")

# Step 2: Extract year from date_joined
df_with_year = df_joined.withColumn("post_year", year("date_joined"))

# Step 3: Create age_group column
df_with_age_group = df_with_year.withColumn(
    "age_group",
    when((col("age") >= 18) & (col("age") <= 24), "18-24")
    .when((col("age") >= 25) & (col("age") <= 35), "25-35")
    .when((col("age") >= 36) & (col("age") <= 50), "36-50")
    .otherwise("+50")
)

# Step 4: Filter years between 2015 and 2020
df_filtered = df_with_age_group.filter((col("post_year") >= 2015) & (col("post_year") <= 2020))

# Step 5: Compute median follower count grouped by age_group and post_year
df_median_grouped = df_filtered.groupBy("age_group", "post_year").agg(
    expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count")
)

# Step 6: Show results
df_median_grouped.orderBy("age_group", "post_year").show(truncate=False)


+---------+---------+---------------------+
|age_group|post_year|median_follower_count|
+---------+---------+---------------------+
|+50      |2015     |196                  |
|+50      |2016     |189                  |
|+50      |2017     |409                  |
|18-24    |2015     |2000000              |
|18-24    |2016     |49                   |
|18-24    |2017     |612                  |
|25-35    |2015     |null                 |
|25-35    |2016     |437                  |
|25-35    |2017     |26                   |
|36-50    |2015     |0                    |
|36-50    |2016     |399                  |
|36-50    |2017     |213                  |
+---------+---------+---------------------+

