In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import random
spark = SparkSession.builder.appName("HardPySparkQuestions").getOrCreate()

## Hard Level PySpark Coding Questions with Solutions

`Question 1: Complex Window Functions with Multiple Partitions
Problem: Given a sales dataset, find the running total of sales for each product category
sale within its category, and the percentage contribution of each sale to the total category`

In [35]:
# Sample sales 
datasales_data = [ 
    ("2023-01-01", "Electronics", "Laptop", 1200),
    ("2023-01-02", "Electronics", "Phone", 800),
    ("2023-01-03", "Clothing", "Shirt", 50),
    ("2023-01-04", "Electronics", "Tablet", 600),
    ("2023-01-05", "Clothing", "Pants", 80),
    ("2023-01-06", "Electronics", "Headphones", 200),
    ("2023-01-07", "Clothing", "Jacket", 120),
    ("2023-01-08", "Electronics", "Mouse", 25)
]
schema = StructType([
    StructField('date', StringType(), True),
    StructField('category', StringType(), True),
    StructField('product', StringType(), True),
    StructField('amount', IntegerType(), True)
])

df = spark.createDataFrame(datasales_data, schema)
df.show()

+----------+-----------+----------+------+
|      date|   category|   product|amount|
+----------+-----------+----------+------+
|2023-01-01|Electronics|    Laptop|  1200|
|2023-01-02|Electronics|     Phone|   800|
|2023-01-03|   Clothing|     Shirt|    50|
|2023-01-04|Electronics|    Tablet|   600|
|2023-01-05|   Clothing|     Pants|    80|
|2023-01-06|Electronics|Headphones|   200|
|2023-01-07|   Clothing|    Jacket|   120|
|2023-01-08|Electronics|     Mouse|    25|
+----------+-----------+----------+------+



In [36]:
# Define winddow specifications
window_space_category_orderd = Window.partitionBy("category").orderBy("amount")
window_category_unorderd = Window.partitionBy("category")

# make the results
result = df.withColumn('running_total', sum('amount').over(window_space_category_orderd)) \
    .withColumn('rank_in_category', rank().over(window_space_category_orderd))\
    .withColumn('category_total', sum('amount').over(window_category_unorderd))\
    .withColumn('percentage_contribution', round((col('amount') / col("category_total"))*100,2)).orderBy('category', 'amount')

result.show()


+----------+-----------+----------+------+-------------+----------------+--------------+-----------------------+
|      date|   category|   product|amount|running_total|rank_in_category|category_total|percentage_contribution|
+----------+-----------+----------+------+-------------+----------------+--------------+-----------------------+
|2023-01-03|   Clothing|     Shirt|    50|           50|               1|           250|                   20.0|
|2023-01-05|   Clothing|     Pants|    80|          130|               2|           250|                   32.0|
|2023-01-07|   Clothing|    Jacket|   120|          250|               3|           250|                   48.0|
|2023-01-08|Electronics|     Mouse|    25|           25|               1|          2825|                   0.88|
|2023-01-06|Electronics|Headphones|   200|          225|               2|          2825|                   7.08|
|2023-01-04|Electronics|    Tablet|   600|          825|               3|          2825|        

`Question 2: Complex JOIN with Data Deduplication and Aggregation
Problem: You have customer and order datasets with duplicate records. Clean the data,
find customers who have made orders in consecutive months with their total spending`

In [37]:
# Customer data with duplicates
customers_data = [
    (1, "John Doe", "john@email.com", "2023-01-01"),
    (1, "John Doe", "john@email.com", "2023-01-01"), # duplicate
    (2, "Jane Smith", "jane@email.com", "2023-02-15"),
    (3, "Bob Johnson", "bob@email.com", "2023-01-10"),
    (3, "Bob Johnson", "bob_new@email.com", "2023-01-10"), # different emailHarshavardhana
]

customers_schema = StructType([
    StructField('customer_id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('email', StringType(), True),
    StructField('registration_date', StringType(), True)
])

cst_df = spark.createDataFrame(customers_data, customers_schema)
cst_df.show()

+-----------+-----------+-----------------+-----------------+
|customer_id|       name|            email|registration_date|
+-----------+-----------+-----------------+-----------------+
|          1|   John Doe|   john@email.com|       2023-01-01|
|          1|   John Doe|   john@email.com|       2023-01-01|
|          2| Jane Smith|   jane@email.com|       2023-02-15|
|          3|Bob Johnson|    bob@email.com|       2023-01-10|
|          3|Bob Johnson|bob_new@email.com|       2023-01-10|
+-----------+-----------+-----------------+-----------------+



In [38]:
# Orders 
dataorders_data = [ 
    (101, 1, "2023-01-15",250.00), 
    (102, 1, "2023-02-10", 180.00), 
    (103, 2, "2023-02-20", 320.00),
    (104, 2, "2023-03-05",150.00),
    (105, 3, "2023-01-25",400.00),
    (106, 1, "2023-03-12",275.00),
    (107, 2, "2023-04-08",190.00)]
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("amount", DoubleType(), True)])

order_df = spark.createDataFrame(dataorders_data, orders_schema)
order_df.show()


+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|     101|          1|2023-01-15| 250.0|
|     102|          1|2023-02-10| 180.0|
|     103|          2|2023-02-20| 320.0|
|     104|          2|2023-03-05| 150.0|
|     105|          3|2023-01-25| 400.0|
|     106|          1|2023-03-12| 275.0|
|     107|          2|2023-04-08| 190.0|
+--------+-----------+----------+------+



### Step-1 : Clean Customer data  - remove exact duplicates and handle email conflicts

In [39]:
cst_window = Window.partitionBy("customer_id","name","registration_date").orderBy('email')

cleaned_customers_df = cst_df.withColumn("row_number", row_number().over(cst_window))
cleaned_customers_df.show()
cleaned_customers_df =cleaned_customers_df.filter(col('row_number')==1).drop('row_number') 
cleaned_customers_df.show()







+-----------+-----------+-----------------+-----------------+----------+
|customer_id|       name|            email|registration_date|row_number|
+-----------+-----------+-----------------+-----------------+----------+
|          1|   John Doe|   john@email.com|       2023-01-01|         1|
|          1|   John Doe|   john@email.com|       2023-01-01|         2|
|          2| Jane Smith|   jane@email.com|       2023-02-15|         1|
|          3|Bob Johnson|    bob@email.com|       2023-01-10|         1|
|          3|Bob Johnson|bob_new@email.com|       2023-01-10|         2|
+-----------+-----------+-----------------+-----------------+----------+

+-----------+-----------+--------------+-----------------+
|customer_id|       name|         email|registration_date|
+-----------+-----------+--------------+-----------------+
|          1|   John Doe|john@email.com|       2023-01-01|
|          2| Jane Smith|jane@email.com|       2023-02-15|
|          3|Bob Johnson| bob@email.com|       

### Step 2: Add month-year to orders

In [40]:
order_with_month = order_df.withColumn('order_month', date_format(to_date(col("order_date")),"yyyy-MM"))
order_with_month.show()

+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|order_month|
+--------+-----------+----------+------+-----------+
|     101|          1|2023-01-15| 250.0|    2023-01|
|     102|          1|2023-02-10| 180.0|    2023-02|
|     103|          2|2023-02-20| 320.0|    2023-02|
|     104|          2|2023-03-05| 150.0|    2023-03|
|     105|          3|2023-01-25| 400.0|    2023-01|
|     106|          1|2023-03-12| 275.0|    2023-03|
|     107|          2|2023-04-08| 190.0|    2023-04|
+--------+-----------+----------+------+-----------+



### Step 3: Join and aggregate by customer and month

In [41]:
customer_monthly_order= cleaned_customers_df.join(order_with_month,"customer_id",'inner').groupBy('customer_id', 'name', 'order_month')\
    .agg(sum("amount").alias('monthly_total'), count('order_id').alias('order_count'))

customer_monthly_order.show()

+-----------+-----------+-----------+-------------+-----------+
|customer_id|       name|order_month|monthly_total|order_count|
+-----------+-----------+-----------+-------------+-----------+
|          3|Bob Johnson|    2023-01|        400.0|          1|
|          1|   John Doe|    2023-01|        250.0|          1|
|          2| Jane Smith|    2023-02|        320.0|          1|
|          1|   John Doe|    2023-02|        180.0|          1|
|          2| Jane Smith|    2023-04|        190.0|          1|
|          1|   John Doe|    2023-03|        275.0|          1|
|          2| Jane Smith|    2023-03|        150.0|          1|
+-----------+-----------+-----------+-------------+-----------+



In [42]:
customer_order_window = Window.partitionBy("customer_id").orderBy("order_month")
consecutive_month = customer_monthly_order.withColumn('prev_month', lag('order_month').over(customer_order_window)).withColumn("month_diff", months_between(to_date(col("order_month")), to_date(col("prev_month"))))\
    .filter(col('month_diff')==1)

consecutive_month.show()


+-----------+----------+-----------+-------------+-----------+----------+----------+
|customer_id|      name|order_month|monthly_total|order_count|prev_month|month_diff|
+-----------+----------+-----------+-------------+-----------+----------+----------+
|          1|  John Doe|    2023-02|        180.0|          1|   2023-01|       1.0|
|          1|  John Doe|    2023-03|        275.0|          1|   2023-02|       1.0|
|          2|Jane Smith|    2023-03|        150.0|          1|   2023-02|       1.0|
|          2|Jane Smith|    2023-04|        190.0|          1|   2023-03|       1.0|
+-----------+----------+-----------+-------------+-----------+----------+----------+



In [43]:
# Step 5: Final result with spending pattern
result_df = consecutive_month.select("customer_id","name","prev_month","order_month","monthly_total","order_count",)\
    .withColumnRenamed('prev_month', 'first_month')\
    .withColumnRenamed('order_month', 'second_month')\
    .withColumnRenamed('monthly_total', 'secont_month_total')\
    .orderBy('customer_id', 'second_month')

result_df.show()

+-----------+----------+-----------+------------+------------------+-----------+
|customer_id|      name|first_month|second_month|secont_month_total|order_count|
+-----------+----------+-----------+------------+------------------+-----------+
|          1|  John Doe|    2023-01|     2023-02|             180.0|          1|
|          1|  John Doe|    2023-02|     2023-03|             275.0|          1|
|          2|Jane Smith|    2023-02|     2023-03|             150.0|          1|
|          2|Jane Smith|    2023-03|     2023-04|             190.0|          1|
+-----------+----------+-----------+------------+------------------+-----------+



`Question 3: Advanced Data Skewness Handling with Custom Partition
Problem: Handle a highly skewed dataset where one partition key has 80% of the data.
salting technique and compare performance.`

In [44]:
# Create skewed data - most record have the same key
skewd_data = []

#80% of data with key 'A'
for i in range(8000):
    skewd_data.append(('A', f'value_{i}', random.randint(1,100)))

# 20% orf data with other keys
for key in ['B','C','D','E'] :
    for i in range(1000):
        skewd_data.append((key, f'value_{i}', random.randint(1,100)))

skewd_schema = StructType([
    StructField('key', StringType(), True),
    StructField('value', StringType(), True),
    StructField('amount', IntegerType(), True)
])

skewd_df = spark.createDataFrame(skewd_data, skewd_schema)
skewd_df.show()

+---+--------+------+
|key|   value|amount|
+---+--------+------+
|  A| value_0|    39|
|  A| value_1|    41|
|  A| value_2|     9|
|  A| value_3|    60|
|  A| value_4|    16|
|  A| value_5|    78|
|  A| value_6|    58|
|  A| value_7|    95|
|  A| value_8|     4|
|  A| value_9|    52|
|  A|value_10|    75|
|  A|value_11|    40|
|  A|value_12|    56|
|  A|value_13|    74|
|  A|value_14|    29|
|  A|value_15|    77|
|  A|value_16|    75|
|  A|value_17|    43|
|  A|value_18|     7|
|  A|value_19|    88|
+---+--------+------+
only showing top 20 rows


In [45]:
# Method 1: Without salting (original skewed approach)
def process_without_salting():
    result = skewd_df.groupBy('key').agg(
        sum('amount').alias('total_amount'),
        avg('amount').alias('avarage_amount'),
        count('*').alias("record_count"),
    )
    return result

#Method 2: With salting to handle skew
def process_with_salting():
    slat_factor = 10 #number of slats
    # Step 1: Ass salt to the skewed key
    salted_df = skewd_df.withColumn('salt', when(col('key')=='A', (rand()*slat_factor).cast('int')).otherwise(lit('0')))\
        .withColumn('salted_key', concat(col('key'), lit('_'), col('salt')))
    
    # Step 2 : Group by salted key first

    intermediate_result = salted_df.groupBy("key", "salted_key").agg(
        sum("amount").alias("partial_sum"),
        sum(lit(1)).alias('partial_count')
    )

    # Step 3 : Final ggregation to get original key result

    final_result = intermediate_result.groupBy('key').agg(
        sum('partial_sum').alias('total_amount'),
        (sum('partial_sum')/sum('partial_count')).alias('avg_amount'),
        sum('partial_count').alias('record_count')
    )

    return final_result


# Execute both method
print('================-------->Without Salting<--------==================')
result_no_salt = process_without_salting()
result_no_salt.show()


print('================-------->With Salting<--------==================')
result_salt = process_with_salting()
result_salt.show()


## # Performance comparison function
def compare_partitions():
    print("\n==================Partition Analisys=============")
    #Check partition distribution
    partition_count = skewd_df.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).collect()
    print(f"Original partition didtribution : {partition_count}")

    #With salting
    salted_temp = skewd_df.rdd.withColumn("salt", when(col("key") == "A", (rand() * 10).cast("int")).otherwise(lit(0)))\
        .withColumn("salted_key", concat(col("key"), lit("_"), col('salt')))
    
    repartitioned_df = salted_temp.repartition(col('salted_key'))
    partition_count_salted = repartitioned_df.mapPartitions(lambda x: [sum(1 for _ in x)]).collect()
    print(f"Salted partition distribution : {partition_count_salted}")

compare_partitions()






+---+------------+--------------+------------+
|key|total_amount|avarage_amount|record_count|
+---+------------+--------------+------------+
|  E|       49910|         49.91|        1000|
|  B|       50219|        50.219|        1000|
|  D|       51708|        51.708|        1000|
|  C|       51237|        51.237|        1000|
|  A|      397773|     49.721625|        8000|
+---+------------+--------------+------------+

+---+------------+----------+------------+
|key|total_amount|avg_amount|record_count|
+---+------------+----------+------------+
|  E|       49910|     49.91|        1000|
|  B|       50219|    50.219|        1000|
|  D|       51708|    51.708|        1000|
|  C|       51237|    51.237|        1000|
|  A|      397773| 49.721625|        8000|
+---+------------+----------+------------+




PySparkNotImplementedError: [NOT_IMPLEMENTED] rdd is not implemented.