
# Scenario 1


Scenario 1: Data Skew and Optimization

You have two large dataframes:

transactions_df: Contains millions of transaction records, each with a user_id, amount, and timestamp.

users_df: Contains user attributes (user_id, country, email).

You need to aggregate total amount per country, but when you join transactions_df with users_df using user_id, the job is very slow and fails due to data skew.

Task:
Write PySpark code to efficiently perform this aggregation while handling data skew. Explain your strategy, such as using broadcast joins, repartitioning, or adding salt keys.

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("DailyRevenue").getOrCreate()

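Handling the skew by salting the join key: a random salt is appended to user_id on the transactions side, and each user row is replicated once per salt value so every salted key still finds its match.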

In [12]:
data=[(1, "2023-01-01 10:00:00", "100"),(1, "2023-01-02 10:05:00", "200"),(2, "2023-01-03 11:00:00", "300"),\
        (2, "2023-01-04 11:10:00", "400"),(1, "2023-02-10 10:15:00", "500"),(3, "2023-01-11 12:00:00", "600"),\
        (3, "2023-09-01 12:05:00", "700"),(3, "2023-01-12 12:10:00", "800") ]
columns = ["user_id", "timestamp", "amount"]
transactions_df = spark.createDataFrame(data , columns)
transactions_df.show()

+-------+-------------------+------+
|user_id|          timestamp|amount|
+-------+-------------------+------+
|      1|2023-01-01 10:00:00|   100|
|      1|2023-01-02 10:05:00|   200|
|      2|2023-01-03 11:00:00|   300|
|      2|2023-01-04 11:10:00|   400|
|      1|2023-02-10 10:15:00|   500|
|      3|2023-01-11 12:00:00|   600|
|      3|2023-09-01 12:05:00|   700|
|      3|2023-01-12 12:10:00|   800|
+-------+-------------------+------+



In [13]:
users_data = [
    (1, "USA", "user1@example.com"),
    (2, "Canada", "user2@example.com"),
    (3, "USA", "user3@example.com"),
    (4, "UK", "user4@example.com"),
    (5, "Canada", "user5@example.com")
]
users_columns = ["user_id", "country", "email"]
users_df = spark.createDataFrame(users_data, users_columns)
users_df.show()

+-------+-------+-----------------+
|user_id|country|            email|
+-------+-------+-----------------+
|      1|    USA|user1@example.com|
|      2| Canada|user2@example.com|
|      3|    USA|user3@example.com|
|      4|     UK|user4@example.com|
|      5| Canada|user5@example.com|
+-------+-------+-----------------+



In [24]:
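# Large side: append a random salt 0-9 to user_id so rows for a hot user spread over 10 join keys.
transactions_df_salted = transactions_df.withColumn("user_id_salted", concat(col("user_id"), lit("_"), (rand() * 10).cast("int")))
transactions_df_salted = transactions_df_salted.repartition(col("user_id_salted"))
# Small side: replicate each user once per salt value. users_df itself stays unchanged so later cells can reuse it.
users_df_exploded = users_df.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(10)])))
users_df_salted = users_df_exploded.withColumn("user_id_salted", concat(col("user_id"), lit("_"), col("salt")))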

In [21]:
merged_df = transactions_df_salted.join(users_df_salted, on=users_df_salted.user_id_salted == transactions_df_salted.user_id_salted, how="inner")
merged_df.show()

+-------+-------------------+------+--------------+-------+-------+-----------------+
|user_id|          timestamp|amount|user_id_salted|user_id|country|            email|
+-------+-------------------+------+--------------+-------+-------+-----------------+
|      1|2023-01-01 10:00:00|   100|           1_1|      1|    USA|user1@example.com|
|      1|2023-01-02 10:05:00|   200|           1_2|      1|    USA|user1@example.com|
|      1|2023-02-10 10:15:00|   500|           1_0|      1|    USA|user1@example.com|
|      2|2023-01-03 11:00:00|   300|           2_0|      2| Canada|user2@example.com|
|      2|2023-01-04 11:10:00|   400|           2_7|      2| Canada|user2@example.com|
|      3|2023-01-11 12:00:00|   600|           3_5|      3|    USA|user3@example.com|
|      3|2023-09-01 12:05:00|   700|           3_7|      3|    USA|user3@example.com|
|      3|2023-01-12 12:10:00|   800|           3_1|      3|    USA|user3@example.com|
+-------+-------------------+------+--------------+-------+-------+-----------------+
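
The salting idea can be checked without a cluster. The following is a plain-Python model of the logic, not Spark code: it assumes 10 salt buckets (matching `range(10)` in the cell above), reuses the sample rows, and uses a hypothetical helper `total_per_country` to confirm that the salted join produces the same per-country totals as the unsalted one.

```python
import random
from collections import defaultdict

NUM_SALTS = 10  # assumed bucket count, matching range(10) in the notebook cell

# (user_id, amount) pairs taken from the sample transactions_df
transactions = [
    (1, 100), (1, 200), (2, 300), (2, 400),
    (1, 500), (3, 600), (3, 700), (3, 800),
]
# user_id -> country, taken from the sample users_df
users = {1: "USA", 2: "Canada", 3: "USA", 4: "UK", 5: "Canada"}

rng = random.Random(0)  # seeded only so the sketch is repeatable

# Large side: one random salt per row, so a hot user_id is spread over NUM_SALTS keys.
salted_transactions = [
    (f"{user_id}_{rng.randrange(NUM_SALTS)}", amount)
    for user_id, amount in transactions
]

# Small side: every user is replicated once per salt value,
# so each salted transaction key is guaranteed to find its match.
salted_users = {
    f"{user_id}_{salt}": country
    for user_id, country in users.items()
    for salt in range(NUM_SALTS)
}


def total_per_country(rows, lookup):
    """Inner-join rows to lookup on the key, then sum amounts per country."""
    totals = defaultdict(int)
    for key, amount in rows:
        if key in lookup:
            totals[lookup[key]] += amount
    return dict(totals)


salted_totals = total_per_country(salted_transactions, salted_users)
plain_totals = total_per_country(transactions, users)
print(salted_totals)
```

Salting changes only how rows are distributed across join keys, never which country a row maps to, which is why both totals agree. The cost is that the small side grows by a factor of `NUM_SALTS`.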

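Using a broadcast join: if users_df is small enough to fit in each executor's memory, broadcasting it lets Spark join without shuffling transactions_df, so a skewed user_id never concentrates on one partition.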

In [None]:
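# Broadcast only the columns the aggregation needs, one row per user, so the large side is joined without a shuffle.
merged_df = transactions_df.join(broadcast(users_df.select("user_id", "country").distinct()), on="user_id", how="inner")
# Total amount per country (amount was created as a string, so cast it before summing).
merged_df.groupBy("country").agg(F.sum(col("amount").cast("double")).alias("total_amount")).show()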
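
To illustrate why a broadcast join sidesteps skew, here is a plain-Python model (again, not Spark code). The split of the sample rows into three partitions and the helper name `aggregate_partition` are assumptions for illustration. Each partition gets its own copy of the small lookup table, joins and pre-aggregates locally, and only the small partial results are merged.

```python
from collections import Counter

# Small table, copied to every partition (this is the "broadcast").
users = {1: "USA", 2: "Canada", 3: "USA", 4: "UK", 5: "Canada"}

# Hypothetical split of the sample transactions into partitions of (user_id, amount).
partitions = [
    [(1, 100), (1, 200), (2, 300)],
    [(2, 400), (1, 500), (3, 600)],
    [(3, 700), (3, 800)],
]


def aggregate_partition(rows, broadcast_lookup):
    """Join each row against the local lookup copy and sum amounts per country."""
    partial = Counter()
    for user_id, amount in rows:
        country = broadcast_lookup.get(user_id)
        if country is not None:  # inner join: drop rows with no matching user
            partial[country] += amount
    return partial


# No row ever leaves its partition for the join; only partial sums are combined.
partials = [aggregate_partition(rows, users) for rows in partitions]
totals = sum(partials, Counter())
print(dict(totals))
```

Because transaction rows stay where they are, the amount of work per partition depends on how the data was already laid out, not on how many rows share a user_id.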

# Scenario 2


Scenario 2: Rolling Aggregation with Window Functions

You have a DataFrame called web_events_df with columns: user_id, event_time, and page_category. Each user can have multiple web events across different time points.

Task:
Write PySpark code to calculate the rolling 7-day count of events for each user: for each event, show how many events the user performed in the previous 7 days (including the current event).
Explain your approach using PySpark’s window functions.

In [15]:
# create a DataFrame called web_events_df with columns: user_id, event_time, and page_category.
# Each user can have multiple web events across different time points.
data=[(1, "2023-01-01 10:00:00", "Home"),(1, "2023-01-02 10:05:00", "Products"),(2, "2023-01-03 11:00:00", "Home"),\
        (2, "2023-01-04 11:10:00", "Contact"),(1, "2023-02-10 10:15:00", "Cart"),(3, "2023-01-11 12:00:00", "Home"),\
        (3, "2023-09-01 12:05:00", "Products"),(3, "2023-01-12 12:10:00", "Checkout")]
columns = ["user_id", "event_time", "page_category"]
web_events_df=spark.createDataFrame(data, columns)
web_events_df=web_events_df.withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
web_events_df.orderBy("user_id","event_time").show()
#web_events_df.printSchema()

+-------+-------------------+-------------+
|user_id|         event_time|page_category|
+-------+-------------------+-------------+
|      1|2023-01-01 10:00:00|         Home|
|      1|2023-01-02 10:05:00|     Products|
|      1|2023-02-10 10:15:00|         Cart|
|      2|2023-01-03 11:00:00|         Home|
|      2|2023-01-04 11:10:00|      Contact|
|      3|2023-01-11 12:00:00|         Home|
|      3|2023-01-12 12:10:00|     Checkout|
|      3|2023-09-01 12:05:00|     Products|
+-------+-------------------+-------------+



In [16]:
web_events_df.createOrReplaceTempView("vw_web_events_df")

In [17]:
# Per user, ordered by event_time, count the events from 7 days before the current row up to and including it.
agg_df = spark.sql("""
    select user_id, event_time, page_category,
           count(page_category) over (partition by user_id order by event_time
                                      range between interval '7' day preceding and current row) as rolling_7_sum
    from vw_web_events_df""")
agg_df.orderBy("user_id", "event_time").show()

+-------+-------------------+-------------+-------------+
|user_id|         event_time|page_category|rolling_7_sum|
+-------+-------------------+-------------+-------------+
|      1|2023-01-01 10:00:00|         Home|            1|
|      1|2023-01-02 10:05:00|     Products|            2|
|      1|2023-02-10 10:15:00|         Cart|            1|
|      2|2023-01-03 11:00:00|         Home|            1|
|      2|2023-01-04 11:10:00|      Contact|            2|
|      3|2023-01-11 12:00:00|         Home|            1|
|      3|2023-01-12 12:10:00|     Checkout|            2|
|      3|2023-09-01 12:05:00|     Products|            1|
+-------+-------------------+-------------+-------------+



The same logic using the PySpark DataFrame API


In [18]:

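# rangeBetween takes numeric offsets, so order by epoch seconds rather than by the timestamp itself.
web_events_df = web_events_df.withColumn("event_ts", F.unix_timestamp("event_time"))
# Frame: from 7 days (7 * 86400 seconds) before the current event up to the current event, per user.
# Named rolling_window so it does not shadow the window() function pulled in by the wildcard import.
rolling_window = Window.partitionBy(col("user_id")).orderBy("event_ts").rangeBetween(-7 * 86400, 0)
ans_df = web_events_df.select(col("user_id"), col("event_time"), col("page_category"), count("*").over(rolling_window).alias("rolling_7_sum"))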

In [19]:
ans_df.show()

+-------+-------------------+-------------+-------------+
|user_id|         event_time|page_category|rolling_7_sum|
+-------+-------------------+-------------+-------------+
|      1|2023-01-01 10:00:00|         Home|            1|
|      1|2023-01-02 10:05:00|     Products|            2|
|      1|2023-02-10 10:15:00|         Cart|            1|
|      2|2023-01-03 11:00:00|         Home|            1|
|      2|2023-01-04 11:10:00|      Contact|            2|
|      3|2023-01-11 12:00:00|         Home|            1|
|      3|2023-01-12 12:10:00|     Checkout|            2|
|      3|2023-09-01 12:05:00|     Products|            1|
+-------+-------------------+-------------+-------------+
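
As a cross-check on the window results above, the same rolling count can be computed in plain Python. This is a minimal reference sketch, not Spark code; the function name `rolling_counts` is hypothetical, and the frame is assumed to be inclusive at both ends, as in `range between interval '7' day preceding and current row`.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from datetime import datetime, timedelta

# (user_id, event_time) pairs taken from the sample web_events_df
events = [
    (1, "2023-01-01 10:00:00"), (1, "2023-01-02 10:05:00"),
    (2, "2023-01-03 11:00:00"), (2, "2023-01-04 11:10:00"),
    (1, "2023-02-10 10:15:00"), (3, "2023-01-11 12:00:00"),
    (3, "2023-09-01 12:05:00"), (3, "2023-01-12 12:10:00"),
]


def rolling_counts(rows, days=7):
    """Return (user_id, event_time, count) with the count over [t - days, t] per user."""
    by_user = defaultdict(list)
    for user_id, ts in rows:
        by_user[user_id].append(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"))

    result = []
    for user_id in sorted(by_user):
        times = sorted(by_user[user_id])  # same role as ORDER BY event_time
        for t in times:
            # First event at or after the lower bound of the frame (inclusive).
            start = bisect_left(times, t - timedelta(days=days))
            # One past the last event at or before the current event time (inclusive).
            end = bisect_right(times, t)
            result.append((user_id, t, end - start))
    return result


rolling = rolling_counts(events)
for user_id, t, n in rolling:
    print(user_id, t, n)
```

The counts come out in the same user and time order as the tables above, so they can be compared row by row.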

