In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, regexp_replace, regexp_extract, udf, expr, struct, to_timestamp, concat_ws,
    count, year, row_number, max as spark_max,
)
from pyspark.sql.window import Window

# Initialize Spark session (use Databricks-compatible settings)
spark = SparkSession.builder.appName("UseSpark").getOrCreate()

# S3 bucket details
bucket_name = "user-b194464884bf-bucket"

# Map each folder under topics/ to the DataFrame name it is loaded into.
# NOTE: despite the variable name, the keys are topic folders; the partition
# number itself is fixed in the path below.
partitions = {
    "b194464884bf.geo": "df_geo",
    "b194464884bf.pin": "df_pin",
    "b194464884bf.user": "df_user"
}

# Read the JSON files of each topic folder into a PySpark DataFrame, keyed by target name.
raw_dataframes = {
    df_name: spark.read.json(f"s3a://{bucket_name}/topics/{topic}/partition=2/")
    for topic, df_name in partitions.items()
}

# Bind the DataFrames to explicit notebook variables rather than creating them with
# exec(): the names stay visible to readers/linters and no code is built from strings.
df_geo = raw_dataframes["df_geo"]
df_pin = raw_dataframes["df_pin"]
df_user = raw_dataframes["df_user"]

In [0]:
#Task 1
# Clean df_pin: null out empty/placeholder strings, drop incomplete rows,
# normalise save_location and follower_count, then fix the ind type and column order.

# Placeholder text meaning "no data"; any string value matching one of these
# regexes is replaced with None (every replacement value here is None).
replace_patterns = [
    (".*User Info Error.*", None),
    (".*N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e.*", None),
    (".*No description available Story format.*", None),
    (".*No Title Data Available.*", None),
    (".*Image src error.*", None),
    (".*No description available.*", None)
]

# rlike() matches anywhere in the value, so OR-ing the patterns into one regex nulls
# exactly the values the per-pattern loop did, in a single expression per column.
placeholder_regex = "|".join(f"(?:{pattern})" for pattern, _ in replace_patterns)

# Compute the set of string columns once instead of rebuilding dict(dtypes) per column.
string_columns = {name for name, dtype in df_pin.dtypes if dtype == "string"}

# Replace empty strings and placeholder values with None in every string column.
df_pin = df_pin.select([
    (
        when((col(name) == "") | col(name).rlike(placeholder_regex), None)
        .otherwise(col(name))
        .alias(name)
    )
    if name in string_columns
    else col(name)
    for name in df_pin.columns
])

# Drop rows with any null value (including the placeholders nulled above).
df_pin = df_pin.dropna(how="any")

# Remove "Local save in " from the 'save_location' column
df_pin = df_pin.withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))

# Expand abbreviated follower counts ("22k", "3M") to integers. Multiplying the
# numeric prefix (instead of textually replacing "k" with "000") also handles
# decimal values such as "1.5k" -> 1500; decimal arithmetic avoids float rounding
# before the final integer cast. Unparseable values become null, as before.
follower_count_raw = col("follower_count")
follower_count_number = regexp_extract(follower_count_raw, r"^([0-9.]+)", 1).cast("decimal(20,3)")
df_pin = df_pin.withColumn(
    "follower_count",
    when(follower_count_raw.endswith("k"), follower_count_number * 1000)
    .when(follower_count_raw.endswith("M"), follower_count_number * 1000000)
    .otherwise(follower_count_raw.cast("decimal(20,3)"))
    .cast("int"),
)

# Rename 'index' to 'ind' and store it as a 64-bit integer.
df_pin = df_pin.withColumnRenamed("index", "ind").withColumn("ind", col("ind").cast("bigint"))

# Reorder columns in the specified order
df_pin = df_pin.select(
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
)

# Show the result
display(df_pin)




In [0]:
#Task 2
# Clean df_geo: fold latitude/longitude into a string-typed 'coordinates' column,
# parse 'timestamp', and keep ind, country, coordinates, timestamp in that order.

# The membership guard keeps the cell re-runnable: once the first run has dropped
# latitude/longitude, the merge step is simply skipped.
if {"latitude", "longitude"}.issubset(df_geo.columns):
    df_geo = (
        df_geo
        .withColumn("coordinates", struct(col("latitude"), col("longitude")).cast("string"))
        .drop("latitude", "longitude")
    )

# Parse 'timestamp' and project the final column order.
df_geo = (
    df_geo
    .withColumn("timestamp", to_timestamp(col("timestamp")))
    .select("ind", "country", "coordinates", "timestamp")
)

df_geo.show(truncate=False)


In [0]:
#Task 3
# Clean df_user: merge the name columns into user_name, convert date_joined to a
# timestamp, and keep only ind, user_name, age, date_joined.

# The guard keeps this cell re-runnable: after the first run the name columns are gone.
if "first_name" in df_user.columns and "last_name" in df_user.columns:
    df_user = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    df_user = df_user.drop("first_name", "last_name")

# Convert 'date_joined' itself to a timestamp. Writing the result to a separate
# 'timestamp' column (as before) left date_joined a string, because the select
# below discards that extra column.
df_user = df_user.withColumn("date_joined", to_timestamp(col("date_joined")))

# Reorder the columns
df_user = df_user.select("ind", "user_name", "age", "date_joined")

# Show the result
df_user.show(truncate=False)


In [0]:
# Combine the cleaned pin, user and geo records into a single DataFrame.
# Rows of the three topics that share an 'ind' value are joined together. Aligning
# rows by position instead (row_number over some ordering) pairs unrelated records
# as soon as cleaning drops rows from one table, e.g. df_pin.dropna in Task 1.
# NOTE(review): assumes 'ind' identifies the same post across all three topics — confirm.
COMBINED_ROW_LIMIT = 1000  # keep at most this many joined rows (lowest 'ind' first)

df_combined = (
    df_pin
    .join(df_user, on="ind", how="inner")
    .join(df_geo, on="ind", how="inner")
    .orderBy("ind")  # makes the limit deterministic
    .limit(COMBINED_ROW_LIMIT)
)

display(df_combined)


In [0]:
#Task 4
# Most popular category in each country, measured by number of rows in df_combined.

# Count occurrences of each (country, category) pair.
df_category_count = df_combined.groupBy("country", "category").agg(count("*").alias("category_count"))

# Rank categories within each country by count; 'category' is a secondary sort key
# so ties resolve deterministically (alphabetically) rather than arbitrarily.
window_spec = Window.partitionBy("country").orderBy(col("category_count").desc(), col("category"))

# Keep only the top-ranked category per country.
df_most_popular_category_by_country = (
    df_category_count
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .drop("rank")
)

# Show the result
display(df_most_popular_category_by_country)


In [0]:
#Task 5
# Most popular category in each year, using the year of the 'timestamp' column
# (which df_combined gets from df_geo).

# Derive post_year on a separate DataFrame so this cell does not modify df_combined.
df_with_post_year = df_combined.withColumn("post_year", year(col("timestamp")))

# Group by year and category, then count occurrences
df_category_count = df_with_post_year.groupBy("post_year", "category").agg(count("*").alias("category_count"))

# Rank categories within each year; 'category' breaks ties deterministically.
window_spec = Window.partitionBy("post_year").orderBy(col("category_count").desc(), col("category"))

# Keep only the most popular category (rank 1) per year.
df_most_popular_category_by_year = (
    df_category_count
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .drop("rank")
)

display(df_most_popular_category_by_year)


In [0]:
#Task 6
#Step 1
# Find the user with the most followers in each country.

# follower_count appears to be an attribute of the poster, repeated on each of their
# posts, so summing it over posts would multiply followers by post count; take the
# per-user maximum instead. (The previous code also called Python's built-in sum()
# on a column name, which raises TypeError.)
# NOTE(review): assumes follower_count is per-poster, not per-post — confirm.
# The alias 'total_followers' is kept because Step 2 reads it.
df_followers = df_combined.groupBy("country", "poster_name").agg(
    spark_max("follower_count").alias("total_followers")
)

# Rank users within each country by follower count; poster_name breaks ties deterministically.
window_spec = Window.partitionBy("country").orderBy(col("total_followers").desc(), col("poster_name"))

# Keep only the top-ranked user per country.
df_top_users_per_country = (
    df_followers
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .drop("rank")
)

display(df_top_users_per_country)


In [0]:
#Task 6
#Step 2
# Country whose top user (from Step 1) has the highest follower count.

# A global sort + limit(1) replaces a window with no partitionBy, which Spark must
# evaluate on a single partition; 'country' breaks ties deterministically.
df_top_country = (
    df_top_users_per_country
    .orderBy(col("total_followers").desc(), col("country"))
    .limit(1)
    .select("country", "total_followers")
)

display(df_top_country)


In [0]:
#Task 7
# Most popular category in each age group.

# Bucket ages on a separate DataFrame so this cell does not modify df_combined.
# Every bucket has an explicit condition, so a null age yields a null age_group
# instead of falling through to "50+".
df_with_age_group = df_combined.withColumn(
    "age_group",
    when(col("age") <= 24, "18-24")
    .when(col("age") <= 35, "25-35")
    .when(col("age") <= 50, "36-50")
    .when(col("age") > 50, "50+")
)

# Group by age_group and category, then count occurrences
df_category_count = df_with_age_group.groupBy("age_group", "category").agg(count("*").alias("category_count"))

# Rank categories within each age group; 'category' breaks ties deterministically.
window_spec = Window.partitionBy("age_group").orderBy(col("category_count").desc(), col("category"))

# Keep only the most popular category per age group.
df_most_popular_category_by_age_group = (
    df_category_count
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") == 1)
    .drop("rank")
)

# Show the result
df_most_popular_category_by_age_group.show()

In [0]:
#Task 8
# Median follower count for each age group.

# Create a new DataFrame with age groups (without modifying df_combined).
# Every bucket has an explicit condition, so a null age yields a null age_group
# instead of being counted as "50+".
df_age_grouped = df_combined.withColumn(
    "age_group",
    when(col("age") <= 24, "18-24")
    .when(col("age") <= 35, "25-35")
    .when(col("age") <= 50, "36-50")
    .when(col("age") > 50, "50+")
)

# Approximate median per age group via Spark SQL's percentile_approx(col, 0.5).
df_median_followers = df_age_grouped.groupBy("age_group").agg(
    expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count")
)

# Order the groups youngest to oldest using an explicit rank (a null age_group goes last),
# then drop the helper column.
df_median_followers = (
    df_median_followers
    .withColumn(
        "sort_order",
        when(col("age_group") == "18-24", 1)
        .when(col("age_group") == "25-35", 2)
        .when(col("age_group") == "36-50", 3)
        .when(col("age_group") == "50+", 4)
    )
    .orderBy(col("sort_order").asc_nulls_last())
    .drop("sort_order")
)

# Show the sorted result
df_median_followers.show()


In [0]:
#Task 9
# Number of users who joined in each year from 2015 through 2020 (inclusive).
# 'post_year' here holds the year of date_joined.
# NOTE(review): this counts rows of df_combined, so a user appearing on several
# rows is counted once per row — confirm that is the intended measure.

df_with_year = df_combined.withColumn("post_year", year(col("date_joined")))

# between() is inclusive at both ends.
df_filtered = df_with_year.where(col("post_year").between(2015, 2020))

df_users_per_year = (
    df_filtered
    .groupBy("post_year")
    .agg(count("*").alias("number_users_joined"))
    .orderBy("post_year")
)

df_users_per_year.show()




In [0]:
#Task 10
# Approximate median follower count grouped by the year users joined (date_joined),
# for join years 2015 through 2020 inclusive. 'post_year' holds that join year.

df_with_year = df_combined.withColumn("post_year", year(col("date_joined")))

# between() is inclusive at both ends.
df_filtered = df_with_year.where(col("post_year").between(2015, 2020))

# percentile_approx(col, 0.5) is Spark SQL's approximate median.
df_median_followers = (
    df_filtered
    .groupBy("post_year")
    .agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))
    .orderBy("post_year")
)

df_median_followers.show()


In [0]:
#Task 11
# Median follower count by join year (2015-2020 inclusive) and age group.
# The previous version used pandas syntax (item assignment, pd.to_datetime,
# .groupby().median()) on a Spark DataFrame, and `pd` was never imported, so it
# could not run. This is the PySpark equivalent. The median is approximate
# (percentile_approx), consistent with Tasks 8 and 10.

# Derive join year and age group here rather than relying on another cell having
# added 'age_group' to df_combined; a null age yields a null age_group.
df_join_year_age = (
    df_combined
    .withColumn("post_year", year(col("date_joined")))
    .withColumn(
        "age_group",
        when(col("age") <= 24, "18-24")
        .when(col("age") <= 35, "25-35")
        .when(col("age") <= 50, "36-50")
        .when(col("age") > 50, "50+"),
    )
    .where(col("post_year").between(2015, 2020))
)

df_median_followers_by_age_group = (
    df_join_year_age
    .groupBy("post_year", "age_group")
    .agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))
    .orderBy("post_year", "age_group")
)

# Display the resulting DataFrame
display(df_median_followers_by_age_group)

