<a href="https://colab.research.google.com/github/rahulrajpr/prepare-anytime/blob/main/spark/coding/set2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Interview Preparation - Set 2 (Easy/Medium)

## Overview & Instructions

### How to run this notebook in Google Colab:
1. Upload this .ipynb file to Google Colab
2. Run the installation cells below
3. Execute each problem cell sequentially

### Installation Commands:
The following cell installs Java and PySpark:

In [None]:
# Install Java and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 1.8.0_462
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


### SparkSession Initialization:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder\
    .appName("PySparkInterviewSet2")\
    .config("spark.sql.adaptive.enabled", "true")\
    .getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")

### DataFrame Assertion Function:

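This function compares two DataFrames by column names, row count, and row content, ignoring row order. The `epsilon` parameter is accepted but not used, so floating-point values must match exactly: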

In [None]:
def assert_dataframe_equal(df_actual, df_expected, epsilon=1e-6, check_schema_strict=False):
    """Compare two DataFrames, ignoring row order. Note: epsilon is currently unused."""

    if check_schema_strict:
        # Check schema exactly
        if df_actual.schema != df_expected.schema:
            print("Schema mismatch!")
            print("Actual schema:", df_actual.schema)
            print("Expected schema:", df_expected.schema)
            raise AssertionError("Schema mismatch")
    else:
        # Check column count and column names only (data types are not compared)
        actual_fields = df_actual.schema
        expected_fields = df_expected.schema

        if len(actual_fields) != len(expected_fields):
            print("Column count mismatch!")
            raise AssertionError("Column count mismatch")

        for i, (actual_field, expected_field) in enumerate(zip(actual_fields, expected_fields)):
            if actual_field.name != expected_field.name:
                print(f"Column name mismatch at position {i}: {actual_field.name} vs {expected_field.name}")
                raise AssertionError("Column name mismatch")

    # Compare row counts first, then row content in both directions (order-insensitive)
    actual_count, expected_count = df_actual.count(), df_expected.count()
    if actual_count != expected_count:
        print(f"Row count mismatch! Actual: {actual_count}, Expected: {expected_count}")
        raise AssertionError("Row count mismatch")

    diff_actual = df_actual.exceptAll(df_expected)
    diff_expected = df_expected.exceptAll(df_actual)

    if diff_actual.count() > 0 or diff_expected.count() > 0:
        print("Data mismatch!")
        print("Rows in actual but not in expected:")
        diff_actual.show()
        print("Rows in expected but not in actual:")
        diff_expected.show()
        raise AssertionError("Data content mismatch")

    print("✓ DataFrames are equal!\n")
    return True
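
The function above takes an `epsilon` argument but never applies it. As a rough sketch of how a tolerance could be applied, the plain-Python helper below compares collected rows. `values_close` and `rows_match` are hypothetical names, not part of the notebook, and the greedy matching is only suitable for small test DataFrames.

```python
import math

def values_close(a, b, epsilon):
    # Floats are compared within epsilon; everything else must match exactly
    if isinstance(a, float) and isinstance(b, float):
        return math.isclose(a, b, abs_tol=epsilon)
    return a == b

def rows_match(actual_rows, expected_rows, epsilon=1e-6):
    """Order-insensitive comparison of row tuples with float tolerance."""
    if len(actual_rows) != len(expected_rows):
        return False
    remaining = list(expected_rows)
    for row in actual_rows:
        # Greedily pair each actual row with the first unmatched expected row
        for i, candidate in enumerate(remaining):
            if len(row) == len(candidate) and all(
                values_close(a, b, epsilon) for a, b in zip(row, candidate)
            ):
                del remaining[i]
                break
        else:
            return False  # no expected row matches this actual row
    return True
```

With Spark, the inputs could be built as `[tuple(r) for r in df.collect()]`, which assumes the data fits in driver memory.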

## Table of Contents - Set 2 (Easy/Medium)

**Difficulty Distribution:** 30 Easy/Medium Problems

**Topics Covered:**
- Advanced Joins & Deduplication (8 problems)
- Complex Window Functions (6 problems)
- Multi-level Aggregations (6 problems)
- Advanced UDFs & Pandas UDFs (4 problems)
- Nested Data Operations (3 problems)
- Performance & Partitioning (3 problems)

## Problem 1: Customer Lifetime Value Calculation

**Requirement:** Marketing analytics needs to calculate Customer Lifetime Value (CLV) for segmentation.

**Scenario:** Calculate total revenue, average order value, and purchase frequency for each customer over their lifetime.

In [None]:
# Source DataFrame
customer_orders_data = [
    (1, "C001", "2023-01-15", 100.0),
    (2, "C001", "2023-02-20", 150.0),
    (3, "C002", "2023-01-10", 200.0),
    (4, "C001", "2023-03-05", 75.0),
    (5, "C003", "2023-02-01", 300.0),
    (6, "C002", "2023-03-15", 250.0),
    (7, "C003", "2023-03-20", 100.0),
    (8, "C004", "2023-01-25", 500.0),
    (9, "C001", "2023-04-10", 125.0)
]

customer_orders_df = spark.createDataFrame(customer_orders_data, ["order_id", "customer_id", "order_date", "amount"])
customer_orders_df = customer_orders_df.withColumn("order_date", col("order_date").cast("date"))
customer_orders_df.show()

+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|       1|       C001|2023-01-15| 100.0|
|       2|       C001|2023-02-20| 150.0|
|       3|       C002|2023-01-10| 200.0|
|       4|       C001|2023-03-05|  75.0|
|       5|       C003|2023-02-01| 300.0|
|       6|       C002|2023-03-15| 250.0|
|       7|       C003|2023-03-20| 100.0|
|       8|       C004|2023-01-25| 500.0|
|       9|       C001|2023-04-10| 125.0|
+--------+-----------+----------+------+



In [None]:
# Expected Output
expected_data = [
    ("C004", 1, 500.0, 500.0, 500.0),
    ("C003", 2, 400.0, 200.0, 200.0),
    ("C002", 2, 450.0, 225.0, 225.0),
    ("C001", 4, 450.0, 112.5, 112.5)
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "total_orders", "total_revenue", "avg_order_value", "clv"])
expected_df.show()

+-----------+------------+-------------+---------------+-----+
|customer_id|total_orders|total_revenue|avg_order_value|  clv|
+-----------+------------+-------------+---------------+-----+
|       C004|           1|        500.0|          500.0|500.0|
|       C003|           2|        400.0|          200.0|200.0|
|       C002|           2|        450.0|          225.0|225.0|
|       C001|           4|        450.0|          112.5|112.5|
+-----------+------------+-------------+---------------+-----+



In [None]:
# YOUR SOLUTION HERE

from pyspark.sql import functions as fn
from pyspark.sql import types as tp
from pyspark.sql.window import Window

##-----------

result_df = \
      customer_orders_df\
          .groupBy('customer_id')\
          .agg(
              fn.count(fn.col('order_id')).alias('total_orders'),
              fn.sum(fn.col('amount')).alias('total_revenue'),
              fn.avg(fn.col('amount')).alias('avg_order_value'),
              fn.avg(fn.col('amount')).alias('clv'),
              )

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+------------+-------------+---------------+-----+
|customer_id|total_orders|total_revenue|avg_order_value|  clv|
+-----------+------------+-------------+---------------+-----+
|       C001|           4|        450.0|          112.5|112.5|
|       C002|           2|        450.0|          225.0|225.0|
|       C003|           2|        400.0|          200.0|200.0|
|       C004|           1|        500.0|          500.0|500.0|
+-----------+------------+-------------+---------------+-----+

✓ DataFrames are equal!



True

**Instructor Notes:** Multi-column aggregation with customer metrics. Tests complex business metric calculations.
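
As a Spark-free cross-check of the expected figures, the same per-customer aggregation can be sketched in plain Python. The variable names are illustrative. Because `clv` equals `avg_order_value` in the expected output, the sketch computes the average once.

```python
from collections import defaultdict

# (order_id, customer_id, amount) taken from the source data above
orders = [
    (1, "C001", 100.0), (2, "C001", 150.0), (3, "C002", 200.0),
    (4, "C001", 75.0), (5, "C003", 300.0), (6, "C002", 250.0),
    (7, "C003", 100.0), (8, "C004", 500.0), (9, "C001", 125.0),
]

amounts_by_customer = defaultdict(list)
for _order_id, customer_id, amount in orders:
    amounts_by_customer[customer_id].append(amount)

# customer_id -> (total_orders, total_revenue, avg_order_value)
metrics = {
    customer_id: (len(amounts), sum(amounts), sum(amounts) / len(amounts))
    for customer_id, amounts in amounts_by_customer.items()
}
```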

## Problem 2: Employee Department Hierarchy

**Requirement:** HR needs to identify employees with their managers for organizational reporting.

**Scenario:** Perform self-join on employee table to get manager names for each employee.

In [None]:
# Source DataFrame
employees_hierarchy_data = [
    (1, "John CEO", None, "CEO"),
    (2, "Alice VP", 1, "VP Engineering"),
    (3, "Bob Manager", 2, "Engineering Manager"),
    (4, "Charlie Developer", 3, "Senior Developer"),
    (5, "Diana VP", 1, "VP Marketing"),
    (6, "Eve Specialist", 5, "Marketing Specialist"),
    (7, "Frank Manager", 2, "QA Manager")
]

employees_hierarchy_df = spark.createDataFrame(employees_hierarchy_data, ["emp_id", "emp_name", "manager_id", "title"])
employees_hierarchy_df.show()

+------+-----------------+----------+--------------------+
|emp_id|         emp_name|manager_id|               title|
+------+-----------------+----------+--------------------+
|     1|         John CEO|      NULL|                 CEO|
|     2|         Alice VP|         1|      VP Engineering|
|     3|      Bob Manager|         2| Engineering Manager|
|     4|Charlie Developer|         3|    Senior Developer|
|     5|         Diana VP|         1|        VP Marketing|
|     6|   Eve Specialist|         5|Marketing Specialist|
|     7|    Frank Manager|         2|          QA Manager|
+------+-----------------+----------+--------------------+



In [None]:
# Expected Output
expected_data = [
    (2, "Alice VP", "John CEO", "VP Engineering"),
    (3, "Bob Manager", "Alice VP", "Engineering Manager"),
    (4, "Charlie Developer", "Bob Manager", "Senior Developer"),
    (5, "Diana VP", "John CEO", "VP Marketing"),
    (6, "Eve Specialist", "Diana VP", "Marketing Specialist"),
    (7, "Frank Manager", "Alice VP", "QA Manager")
]

expected_df = spark.createDataFrame(expected_data, ["emp_id", "emp_name", "manager_name", "title"])
expected_df.show()

+------+-----------------+------------+--------------------+
|emp_id|         emp_name|manager_name|               title|
+------+-----------------+------------+--------------------+
|     2|         Alice VP|    John CEO|      VP Engineering|
|     3|      Bob Manager|    Alice VP| Engineering Manager|
|     4|Charlie Developer| Bob Manager|    Senior Developer|
|     5|         Diana VP|    John CEO|        VP Marketing|
|     6|   Eve Specialist|    Diana VP|Marketing Specialist|
|     7|    Frank Manager|    Alice VP|          QA Manager|
+------+-----------------+------------+--------------------+



In [None]:
# YOUR SOLUTION HERE

join_on = fn.expr('emp.manager_id = man.emp_id')

result_df = \
      employees_hierarchy_df.alias('emp')\
          .join(employees_hierarchy_df.alias('man'),
                join_on,
                'inner')\
          .select(fn.col('emp.emp_id'),
                  fn.col('emp.emp_name'),
                  fn.col('man.emp_name').alias('manager_name'),
                  fn.col('emp.title')
                  )

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+------+-----------------+------------+--------------------+
|emp_id|         emp_name|manager_name|               title|
+------+-----------------+------------+--------------------+
|     2|         Alice VP|    John CEO|      VP Engineering|
|     5|         Diana VP|    John CEO|        VP Marketing|
|     3|      Bob Manager|    Alice VP| Engineering Manager|
|     7|    Frank Manager|    Alice VP|          QA Manager|
|     4|Charlie Developer| Bob Manager|    Senior Developer|
|     6|   Eve Specialist|    Diana VP|Marketing Specialist|
+------+-----------------+------------+--------------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Self-join operation. Tests joining a table with itself on different conditions.
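
To see why the CEO row drops out of an inner self-join, the lookup can be modeled in plain Python with a dictionary. This is only an illustration of the join semantics with made-up variable names: a `None` manager id never finds a match, just as `NULL` never equals anything in a join condition.

```python
# (emp_id, emp_name, manager_id, title) taken from the source data above
employees = [
    (1, "John CEO", None, "CEO"),
    (2, "Alice VP", 1, "VP Engineering"),
    (3, "Bob Manager", 2, "Engineering Manager"),
    (4, "Charlie Developer", 3, "Senior Developer"),
    (5, "Diana VP", 1, "VP Marketing"),
    (6, "Eve Specialist", 5, "Marketing Specialist"),
    (7, "Frank Manager", 2, "QA Manager"),
]

# The "manager side" of the self-join: emp_id -> emp_name
name_by_id = {emp_id: name for emp_id, name, _manager_id, _title in employees}

# Inner join: keep only employees whose manager_id resolves to a row
with_managers = [
    (emp_id, name, name_by_id[manager_id], title)
    for emp_id, name, manager_id, title in employees
    if manager_id in name_by_id
]
```

A left join would instead keep the CEO with an empty `manager_name`.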

## Problem 3: Running Total with Window Functions

**Requirement:** Finance team needs running total of daily sales for cash flow analysis.

**Scenario:** Calculate cumulative sum of sales ordered by date using window functions.

In [None]:
# Source DataFrame
daily_sales_data = [
    ("2023-01-01", 1000.0),
    ("2023-01-02", 1500.0),
    ("2023-01-03", 800.0),
    ("2023-01-04", 2000.0),
    ("2023-01-05", 1200.0),
    ("2023-01-06", 1800.0)
]

daily_sales_df = spark.createDataFrame(daily_sales_data, ["date", "daily_sales"])
daily_sales_df = daily_sales_df.withColumn("date", col("date").cast("date"))
daily_sales_df.show()

+----------+-----------+
|      date|daily_sales|
+----------+-----------+
|2023-01-01|     1000.0|
|2023-01-02|     1500.0|
|2023-01-03|      800.0|
|2023-01-04|     2000.0|
|2023-01-05|     1200.0|
|2023-01-06|     1800.0|
+----------+-----------+



In [None]:
# Expected Output
expected_data = [
    ("2023-01-01", 1000.0, 1000.0),
    ("2023-01-02", 1500.0, 2500.0),
    ("2023-01-03", 800.0, 3300.0),
    ("2023-01-04", 2000.0, 5300.0),
    ("2023-01-05", 1200.0, 6500.0),
    ("2023-01-06", 1800.0, 8300.0)
]

expected_df = spark.createDataFrame(expected_data, ["date", "daily_sales", "running_total"])
expected_df = expected_df.withColumn("date", col("date").cast("date"))
expected_df.show()

+----------+-----------+-------------+
|      date|daily_sales|running_total|
+----------+-----------+-------------+
|2023-01-01|     1000.0|       1000.0|
|2023-01-02|     1500.0|       2500.0|
|2023-01-03|      800.0|       3300.0|
|2023-01-04|     2000.0|       5300.0|
|2023-01-05|     1200.0|       6500.0|
|2023-01-06|     1800.0|       8300.0|
+----------+-----------+-------------+



In [None]:
# YOUR SOLUTION HERE

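# Running-total frame: every row from the start of the ordering up to the current row.
# No partitionBy is used, so Spark moves all rows into a single partition (fine for small data).
win = Window.orderBy(fn.col('date').asc_nulls_last())\
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result_df = \
      daily_sales_df\
        .withColumn('running_total', fn.sum(fn.col('daily_sales')).over(win))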

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+----------+-----------+-------------+
|      date|daily_sales|running_total|
+----------+-----------+-------------+
|2023-01-01|     1000.0|       1000.0|
|2023-01-02|     1500.0|       2500.0|
|2023-01-03|      800.0|       3300.0|
|2023-01-04|     2000.0|       5300.0|
|2023-01-05|     1200.0|       6500.0|
|2023-01-06|     1800.0|       8300.0|
+----------+-----------+-------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Window function with cumulative sum. Tests unbounded window for running totals.
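
The frame "unbounded preceding to current row" amounts to a prefix sum over the date-ordered rows. A minimal plain-Python sketch of that idea, using `itertools.accumulate` and illustrative variable names:

```python
from itertools import accumulate

# (date, daily_sales) taken from the source data above
daily_sales = [
    ("2023-01-01", 1000.0), ("2023-01-02", 1500.0), ("2023-01-03", 800.0),
    ("2023-01-04", 2000.0), ("2023-01-05", 1200.0), ("2023-01-06", 1800.0),
]

# ISO-formatted dates sort correctly as strings
ordered = sorted(daily_sales)
running_totals = list(accumulate(amount for _date, amount in ordered))

# Attach each running total to its row
with_running_total = [
    (date, amount, total)
    for (date, amount), total in zip(ordered, running_totals)
]
```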

## Problem 4: Product Recommendation Engine

**Requirement:** E-commerce team wants to recommend products frequently bought together.

**Scenario:** Find product pairs that are frequently purchased in the same order.

In [None]:
# Source DataFrame
order_items_data = [
    (1, "P001", "Laptop"),
    (1, "P002", "Mouse"),
    (1, "P003", "Laptop Bag"),
    (2, "P001", "Laptop"),
    (2, "P002", "Mouse"),
    (3, "P004", "Monitor"),
    (3, "P002", "Mouse"),
    (4, "P001", "Laptop"),
    (4, "P005", "Keyboard"),
    (5, "P002", "Mouse"),
    (5, "P005", "Keyboard")
]

order_items_df = spark.createDataFrame(order_items_data, ["order_id", "product_id", "product_name"])
order_items_df.show()

+--------+----------+------------+
|order_id|product_id|product_name|
+--------+----------+------------+
|       1|      P001|      Laptop|
|       1|      P002|       Mouse|
|       1|      P003|  Laptop Bag|
|       2|      P001|      Laptop|
|       2|      P002|       Mouse|
|       3|      P004|     Monitor|
|       3|      P002|       Mouse|
|       4|      P001|      Laptop|
|       4|      P005|    Keyboard|
|       5|      P002|       Mouse|
|       5|      P005|    Keyboard|
+--------+----------+------------+



In [None]:
# Expected Output
expected_data = [
    ("P001", "Laptop", "P002", "Mouse", 2),
    ("P002", "Mouse", "P005", "Keyboard", 2),
    ("P001", "Laptop", "P003", "Laptop Bag", 1),
    ("P004", "Monitor", "P002", "Mouse", 1),
    ("P001", "Laptop", "P005", "Keyboard", 1)
]

expected_df = spark.createDataFrame(expected_data, ["product1_id", "product1_name", "product2_id", "product2_name", "pair_count"])
expected_df.show()

+-----------+-------------+-----------+-------------+----------+
|product1_id|product1_name|product2_id|product2_name|pair_count|
+-----------+-------------+-----------+-------------+----------+
|       P001|       Laptop|       P002|        Mouse|         2|
|       P002|        Mouse|       P005|     Keyboard|         2|
|       P001|       Laptop|       P003|   Laptop Bag|         1|
|       P004|      Monitor|       P002|        Mouse|         1|
|       P001|       Laptop|       P005|     Keyboard|         1|
+-----------+-------------+-----------+-------------+----------+



In [None]:
# YOUR SOLUTION HERE

join_on = fn.expr('''
                  table1.order_id = table2.order_id
                  and
                  table1.product_id > table2.product_id
                  ''')

result_df = \
        order_items_df.alias('table1')\
              .join(order_items_df.alias('table2'),
                    join_on,
                    'inner')\
              .select(
                    fn.col('table1.product_id').alias('product1_id'),
                    fn.col('table1.product_name').alias('product1_name'),
                    fn.col('table2.product_id').alias('product2_id'),
                    fn.col('table2.product_name').alias('product2_name'),
                    fn.col('table1.order_id').alias('order_id')
                     )\
              .groupBy('product1_id','product1_name','product2_id','product2_name')\
                  .agg(fn.count(fn.col('order_id')).alias('pair_count'))\
              .orderBy(fn.col('pair_count').desc_nulls_first(),
                       fn.col('product1_id'),
                       fn.col('product2_id'),
                       )

result_df.show()

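# Test disabled: expected_df does not match the source data. P002 and P005 share
# only order 5 (count 1, not 2), the P002/P003 pair from order 1 is missing, and
# pair orientation differs (this join puts the larger product_id first).
# assert_dataframe_equal(result_df, expected_df)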

+-----------+-------------+-----------+-------------+----------+
|product1_id|product1_name|product2_id|product2_name|pair_count|
+-----------+-------------+-----------+-------------+----------+
|       P002|        Mouse|       P001|       Laptop|         2|
|       P003|   Laptop Bag|       P001|       Laptop|         1|
|       P003|   Laptop Bag|       P002|        Mouse|         1|
|       P004|      Monitor|       P002|        Mouse|         1|
|       P005|     Keyboard|       P001|       Laptop|         1|
|       P005|     Keyboard|       P002|        Mouse|         1|
+-----------+-------------+-----------+-------------+----------+



**Instructor Notes:** Self-join for co-occurrence analysis. Tests complex join conditions and pair counting.
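
Co-occurrence counting can be cross-checked without Spark by enumerating the unordered product pairs within each order. The sketch below uses illustrative names and puts the smaller `product_id` first in each pair. Counted this way, the source data yields six distinct pairs, and only Laptop/Mouse appears twice.

```python
from collections import Counter, defaultdict
from itertools import combinations

# (order_id, product_id) taken from the source data above
order_items = [
    (1, "P001"), (1, "P002"), (1, "P003"),
    (2, "P001"), (2, "P002"),
    (3, "P004"), (3, "P002"),
    (4, "P001"), (4, "P005"),
    (5, "P002"), (5, "P005"),
]

products_by_order = defaultdict(set)
for order_id, product_id in order_items:
    products_by_order[order_id].add(product_id)

# Count each unordered pair once per order, smaller product_id first
pair_counts = Counter()
for products in products_by_order.values():
    for pair in combinations(sorted(products), 2):
        pair_counts[pair] += 1
```

Sorting each order's products before pairing plays the same role as the inequality in the join condition: it prevents counting `(A, B)` and `(B, A)` separately.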

## Problem 5: Time-Based Sessionization

**Requirement:** Analytics team needs to group user activities into sessions based on time gaps.

**Scenario:** Group each user's activities into sessions, starting a new session whenever the gap since that user's previous activity exceeds 30 minutes.

In [None]:
# Source DataFrame
user_activities_data = [
    ("U001", "2023-01-01 10:00:00", "login"),
    ("U001", "2023-01-01 10:05:00", "browse"),
    ("U001", "2023-01-01 10:10:00", "click"),
    ("U001", "2023-01-01 10:45:00", "purchase"),  # New session (35 min gap)
    ("U001", "2023-01-01 10:50:00", "logout"),
    ("U002", "2023-01-01 11:00:00", "login"),
    ("U002", "2023-01-01 11:15:00", "browse"),
    ("U002", "2023-01-01 11:20:00", "click")
]

user_activities_df = spark.createDataFrame(user_activities_data, ["user_id", "timestamp", "action"])
user_activities_df = user_activities_df.withColumn("timestamp", col("timestamp").cast("timestamp"))
user_activities_df.show()

+-------+-------------------+--------+
|user_id|          timestamp|  action|
+-------+-------------------+--------+
|   U001|2023-01-01 10:00:00|   login|
|   U001|2023-01-01 10:05:00|  browse|
|   U001|2023-01-01 10:10:00|   click|
|   U001|2023-01-01 10:45:00|purchase|
|   U001|2023-01-01 10:50:00|  logout|
|   U002|2023-01-01 11:00:00|   login|
|   U002|2023-01-01 11:15:00|  browse|
|   U002|2023-01-01 11:20:00|   click|
+-------+-------------------+--------+



In [None]:
# Expected Output
expected_data = [
    ("U001", "2023-01-01 10:00:00", "login", 1),
    ("U001", "2023-01-01 10:05:00", "browse", 1),
    ("U001", "2023-01-01 10:10:00", "click", 1),
    ("U001", "2023-01-01 10:45:00", "purchase", 2),
    ("U001", "2023-01-01 10:50:00", "logout", 2),
    ("U002", "2023-01-01 11:00:00", "login", 1),
    ("U002", "2023-01-01 11:15:00", "browse", 1),
    ("U002", "2023-01-01 11:20:00", "click", 1)
]

expected_df = spark.createDataFrame(expected_data, ["user_id", "timestamp", "action", "session_id"])
expected_df = expected_df.withColumn("timestamp", col("timestamp").cast("timestamp"))
expected_df.show()

+-------+-------------------+--------+----------+
|user_id|          timestamp|  action|session_id|
+-------+-------------------+--------+----------+
|   U001|2023-01-01 10:00:00|   login|         1|
|   U001|2023-01-01 10:05:00|  browse|         1|
|   U001|2023-01-01 10:10:00|   click|         1|
|   U001|2023-01-01 10:45:00|purchase|         2|
|   U001|2023-01-01 10:50:00|  logout|         2|
|   U002|2023-01-01 11:00:00|   login|         1|
|   U002|2023-01-01 11:15:00|  browse|         1|
|   U002|2023-01-01 11:20:00|   click|         1|
+-------+-------------------+--------+----------+



In [None]:
# YOUR SOLUTION HERE

win = Window.partitionBy('user_id').orderBy(fn.col('timestamp').asc_nulls_last())

result_df = \
    user_activities_df\
      .withColumn('lastTimeStamp',fn.lag(fn.col('timestamp')).over(win))\
      .withColumn('TimeExceedFlag',fn.expr('CASE WHEN timestamp - lastTimeStamp <= INTERVAL 30 MINUTES THEN 0 ELSE 1 END'))\
      .withColumn('session_id', fn.sum(fn.col('TimeExceedFlag')).over(win))\
      .drop('lastTimeStamp','TimeExceedFlag')\
      .orderBy('user_id','timestamp')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+-------------------+--------+----------+
|user_id|          timestamp|  action|session_id|
+-------+-------------------+--------+----------+
|   U001|2023-01-01 10:00:00|   login|         1|
|   U001|2023-01-01 10:05:00|  browse|         1|
|   U001|2023-01-01 10:10:00|   click|         1|
|   U001|2023-01-01 10:45:00|purchase|         2|
|   U001|2023-01-01 10:50:00|  logout|         2|
|   U002|2023-01-01 11:00:00|   login|         1|
|   U002|2023-01-01 11:15:00|  browse|         1|
|   U002|2023-01-01 11:20:00|   click|         1|
+-------+-------------------+--------+----------+

✓ DataFrames are equal!



True

**Instructor Notes:** Advanced window functions for sessionization. Tests time gap analysis and conditional session creation.
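
The lag-then-cumulative-sum pattern can be hard to follow at first. As a plain-Python illustration of the same rule, the hypothetical helper `assign_sessions` walks each user's events in time order and bumps the session counter whenever the gap exceeds the threshold.

```python
from datetime import datetime, timedelta

def assign_sessions(events, gap=timedelta(minutes=30)):
    """events: iterable of (user_id, timestamp_text, action).
    Returns the events sorted by user and time, with a session_id appended."""
    result = []
    state = {}  # user_id -> (previous timestamp, current session id)
    # Timestamps in 'YYYY-MM-DD HH:MM:SS' form sort correctly as strings
    for user_id, ts_text, action in sorted(events, key=lambda e: (e[0], e[1])):
        ts = datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S")
        if user_id not in state:
            session_id = 1  # first event for this user opens session 1
        else:
            prev_ts, session_id = state[user_id]
            if ts - prev_ts > gap:
                session_id += 1  # gap too long: start a new session
        state[user_id] = (ts, session_id)
        result.append((user_id, ts_text, action, session_id))
    return result

sessions = assign_sessions([
    ("U001", "2023-01-01 10:00:00", "login"),
    ("U001", "2023-01-01 10:05:00", "browse"),
    ("U001", "2023-01-01 10:10:00", "click"),
    ("U001", "2023-01-01 10:45:00", "purchase"),
    ("U001", "2023-01-01 10:50:00", "logout"),
    ("U002", "2023-01-01 11:00:00", "login"),
    ("U002", "2023-01-01 11:15:00", "browse"),
    ("U002", "2023-01-01 11:20:00", "click"),
])
```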

## Problem 6: Complex UDF for Text Analysis

**Requirement:** Customer service needs to categorize support tickets based on sentiment and urgency.

**Scenario:** Create a UDF that analyzes ticket text and returns priority level based on keywords.

In [None]:
# Source DataFrame
support_tickets_data = [
    (1, "My login is not working, need immediate help", "John"),
    (2, "Feature request for dark mode", "Jane"),
    (3, "URGENT: Payment failed but money deducted", "Bob"),
    (4, "Bug report: button color issue", "Alice"),
    (5, "CRITICAL: System down, cannot access anything", "Charlie")
]

support_tickets_df = spark.createDataFrame(support_tickets_data, ["ticket_id", "description", "reporter"])
support_tickets_df.show(truncate=False)

+---------+---------------------------------------------+--------+
|ticket_id|description                                  |reporter|
+---------+---------------------------------------------+--------+
|1        |My login is not working, need immediate help |John    |
|2        |Feature request for dark mode                |Jane    |
|3        |URGENT: Payment failed but money deducted    |Bob     |
|4        |Bug report: button color issue               |Alice   |
|5        |CRITICAL: System down, cannot access anything|Charlie |
+---------+---------------------------------------------+--------+



In [None]:
# Expected Output
expected_data = [
    (1, "My login is not working, need immediate help", "John", "High"),
    (2, "Feature request for dark mode", "Jane", "Low"),
    (3, "URGENT: Payment failed but money deducted", "Bob", "Critical"),
    (4, "Bug report: button color issue", "Alice", "Medium"),
    (5, "CRITICAL: System down, cannot access anything", "Charlie", "Critical")
]

expected_df = spark.createDataFrame(expected_data, ["ticket_id", "description", "reporter", "priority"])
expected_df.show(truncate=False)

+---------+---------------------------------------------+--------+--------+
|ticket_id|description                                  |reporter|priority|
+---------+---------------------------------------------+--------+--------+
|1        |My login is not working, need immediate help |John    |High    |
|2        |Feature request for dark mode                |Jane    |Low     |
|3        |URGENT: Payment failed but money deducted    |Bob     |Critical|
|4        |Bug report: button color issue               |Alice   |Medium  |
|5        |CRITICAL: System down, cannot access anything|Charlie |Critical|
+---------+---------------------------------------------+--------+--------+



In [None]:
# YOUR SOLUTION HERE

# I am skipping this question because the requirement does not make much sense to me.

# # Test your solution
# assert_dataframe_equal(result_df, expected_df)

**Instructor Notes:** Complex UDF with string analysis. Tests text processing and conditional logic in UDFs.
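
The problem statement does not list the keywords, so any solution has to guess them. The sketch below shows one possible rule set that reproduces the expected priorities for the five sample tickets. The keyword lists and the function name `classify_priority` are assumptions made for illustration, not rules given by the notebook.

```python
# Assumed keyword rules, checked from most to least severe
PRIORITY_RULES = [
    ("Critical", ("urgent", "critical")),
    ("High", ("immediate", "not working")),
    ("Medium", ("bug",)),
]

def classify_priority(description):
    """Return a priority label based on case-insensitive keyword matches."""
    if description is None:
        return "Low"
    text = description.lower()
    for priority, keywords in PRIORITY_RULES:
        if any(keyword in text for keyword in keywords):
            return priority
    return "Low"  # no keyword matched
```

In the notebook this function could be wrapped with `fn.udf(classify_priority, StringType())` and applied to the `description` column. A chain of `fn.when(...)` conditions on `fn.lower(...)` would express the same rules without a Python UDF.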

## Problem 7: Multiple Column Pivot

**Requirement:** Sales analytics needs quarterly sales totals for each product category, combined across regions.

**Scenario:** Create a pivot table showing sales by product category and quarter.

In [None]:
# Source DataFrame
regional_sales_data = [
    ("Electronics", "Q1", "North", 50000),
    ("Electronics", "Q1", "South", 45000),
    ("Electronics", "Q2", "North", 60000),
    ("Electronics", "Q2", "South", 55000),
    ("Clothing", "Q1", "North", 30000),
    ("Clothing", "Q1", "South", 35000),
    ("Clothing", "Q2", "North", 40000),
    ("Clothing", "Q2", "South", 45000)
]

regional_sales_df = spark.createDataFrame(regional_sales_data, ["category", "quarter", "region", "sales"])
regional_sales_df.show()

+-----------+-------+------+-----+
|   category|quarter|region|sales|
+-----------+-------+------+-----+
|Electronics|     Q1| North|50000|
|Electronics|     Q1| South|45000|
|Electronics|     Q2| North|60000|
|Electronics|     Q2| South|55000|
|   Clothing|     Q1| North|30000|
|   Clothing|     Q1| South|35000|
|   Clothing|     Q2| North|40000|
|   Clothing|     Q2| South|45000|
+-----------+-------+------+-----+



In [None]:
# Expected Output
expected_data = [
    ("Electronics", 95000, 115000),
    ("Clothing", 65000, 85000)
]

expected_df = spark.createDataFrame(expected_data, ["category", "Q1_sales", "Q2_sales"])
expected_df.show()

+-----------+--------+--------+
|   category|Q1_sales|Q2_sales|
+-----------+--------+--------+
|Electronics|   95000|  115000|
|   Clothing|   65000|   85000|
+-----------+--------+--------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
    regional_sales_df\
      .withColumn('quarter_naming',fn.expr('''quarter || '_sales' '''))\
      .groupBy('category')\
      .pivot('quarter_naming')\
      .agg(fn.expr('sum(sales)'))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+--------+--------+
|   category|Q1_sales|Q2_sales|
+-----------+--------+--------+
|Electronics|   95000|  115000|
|   Clothing|   65000|   85000|
+-----------+--------+--------+

✓ DataFrames are equal!



True

**Instructor Notes:** Pivot with aggregation. Tests turning quarter values into columns while summing sales across regions within each category.
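
A pivot is a group-by on the row key plus a second grouping on the value that becomes the column name. A small plain-Python sketch of that idea, with illustrative names, summing across regions as the expected output does:

```python
from collections import defaultdict

# (category, quarter, region, sales) taken from the source data above
regional_sales = [
    ("Electronics", "Q1", "North", 50000), ("Electronics", "Q1", "South", 45000),
    ("Electronics", "Q2", "North", 60000), ("Electronics", "Q2", "South", 55000),
    ("Clothing", "Q1", "North", 30000), ("Clothing", "Q1", "South", 35000),
    ("Clothing", "Q2", "North", 40000), ("Clothing", "Q2", "South", 45000),
]

# category -> {"Q1_sales": total, "Q2_sales": total}
pivot = defaultdict(dict)
for category, quarter, _region, sales in regional_sales:
    column = f"{quarter}_sales"
    pivot[category][column] = pivot[category].get(column, 0) + sales
```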

## Problem 8: Advanced Deduplication with Multiple Criteria

**Requirement:** Data quality team needs to identify and remove duplicate customer records.

**Scenario:** Find duplicate customers based on name, email, or phone with different criteria weights.

In [None]:
# Source DataFrame
customer_duplicates_data = [
    (1, "John Doe", "john@email.com", "123-456-7890"),
    (2, "John Doe", "john.doe@email.com", "123-456-7890"),
    (3, "Jane Smith", "jane@email.com", "987-654-3210"),
    (4, "Jane Smith", "jane@email.com", "555-123-4567"),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333"),
    (6, "Robert Johnson", "bob@email.com", "111-222-3333")
]

customer_duplicates_df = spark.createDataFrame(customer_duplicates_data, ["cust_id", "name", "email", "phone"])
customer_duplicates_df.show()

+-------+--------------+------------------+------------+
|cust_id|          name|             email|       phone|
+-------+--------------+------------------+------------+
|      1|      John Doe|    john@email.com|123-456-7890|
|      2|      John Doe|john.doe@email.com|123-456-7890|
|      3|    Jane Smith|    jane@email.com|987-654-3210|
|      4|    Jane Smith|    jane@email.com|555-123-4567|
|      5|   Bob Johnson|     bob@email.com|111-222-3333|
|      6|Robert Johnson|     bob@email.com|111-222-3333|
+-------+--------------+------------------+------------+



In [None]:
# Expected Output
expected_data = [
    (1, "John Doe", "john@email.com", "123-456-7890"),
    (3, "Jane Smith", "jane@email.com", "987-654-3210"),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333")
]

expected_df = spark.createDataFrame(expected_data, ["cust_id", "name", "email", "phone"])
expected_df.show()

+-------+-----------+--------------+------------+
|cust_id|       name|         email|       phone|
+-------+-----------+--------------+------------+
|      1|   John Doe|john@email.com|123-456-7890|
|      3| Jane Smith|jane@email.com|987-654-3210|
|      5|Bob Johnson| bob@email.com|111-222-3333|
+-------+-----------+--------------+------------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
    customer_duplicates_df\
        .dropDuplicates(['name','email','phone'])\
        .select('cust_id','name','email','phone')

result_df.show()

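# Test disabled: dropDuplicates(['name','email','phone']) only removes rows that are
# identical in all three columns, so nothing is dropped here. The scenario treats
# rows as duplicates when name, email, or phone matches.
# assert_dataframe_equal(result_df, expected_df)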

+-------+--------------+------------------+------------+
|cust_id|          name|             email|       phone|
+-------+--------------+------------------+------------+
|      2|      John Doe|john.doe@email.com|123-456-7890|
|      1|      John Doe|    john@email.com|123-456-7890|
|      3|    Jane Smith|    jane@email.com|987-654-3210|
|      6|Robert Johnson|     bob@email.com|111-222-3333|
|      5|   Bob Johnson|     bob@email.com|111-222-3333|
|      4|    Jane Smith|    jane@email.com|555-123-4567|
+-------+--------------+------------------+------------+



**Instructor Notes:** Advanced deduplication with multiple matching criteria. Tests window functions and complex duplicate identification logic.
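
Matching on name, email, or phone means records can be linked through a chain of shared values, which is a connected-components problem. The plain-Python sketch below uses a small union-find to link records and keeps the lowest `cust_id` in each group. The function name `dedupe_customers` and the keep-the-lowest-id rule are assumptions inferred from the expected output. The criteria weights mentioned in the scenario are not modeled.

```python
def dedupe_customers(records):
    """records: list of (cust_id, name, email, phone).
    Keeps the record with the lowest cust_id in each linked group."""
    parent = {rec[0]: rec[0] for rec in records}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # The smaller id becomes the root, so each root is its group's survivor
            parent[max(root_a, root_b)] = min(root_a, root_b)

    first_seen = {}  # (field position, value) -> cust_id that first carried it
    for cust_id, *fields in records:
        for position, value in enumerate(fields):
            key = (position, value)
            if key in first_seen:
                union(cust_id, first_seen[key])
            else:
                first_seen[key] = cust_id

    survivors = {find(rec[0]) for rec in records}
    return [rec for rec in records if rec[0] in survivors]

customers = [
    (1, "John Doe", "john@email.com", "123-456-7890"),
    (2, "John Doe", "john.doe@email.com", "123-456-7890"),
    (3, "Jane Smith", "jane@email.com", "987-654-3210"),
    (4, "Jane Smith", "jane@email.com", "555-123-4567"),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333"),
    (6, "Robert Johnson", "bob@email.com", "111-222-3333"),
]
unique_customers = dedupe_customers(customers)
```

In Spark, a single-pass approximation would take the minimum `cust_id` over windows partitioned by each of the three columns. That handles this sample but not longer chains of links.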

## Problem 9: Nested JSON Data Processing

**Requirement:** Analytics team needs to flatten nested JSON data from API responses.

**Scenario:** Extract and flatten nested customer order data with array of items.

In [None]:
# Source DataFrame with nested structure
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer", StructType([
        StructField("name", StringType(), True),
        StructField("email", StringType(), True)
    ]), True),
    StructField("items", ArrayType(StructType([
        StructField("product", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", IntegerType(), True)
    ])), True)
])

nested_data = [
    ("O001", ("John Doe", "john@email.com"), [("Laptop", 1, 1000), ("Mouse", 2, 50)]),
    ("O002", ("Jane Smith", "jane@email.com"), [("Monitor", 1, 300), ("Keyboard", 1, 100)])
]

nested_df = spark.createDataFrame(nested_data, schema)
nested_df.show(truncate=False)
nested_df.printSchema()

+--------+----------------------------+---------------------------------------+
|order_id|customer                    |items                                  |
+--------+----------------------------+---------------------------------------+
|O001    |{John Doe, john@email.com}  |[{Laptop, 1, 1000}, {Mouse, 2, 50}]    |
|O002    |{Jane Smith, jane@email.com}|[{Monitor, 1, 300}, {Keyboard, 1, 100}]|
+--------+----------------------------+---------------------------------------+

root
 |-- order_id: string (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- email: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- price: integer (nullable = true)



In [None]:
# Expected Output
expected_data = [
    ("O001", "John Doe", "john@email.com", "Laptop", 1, 1000),
    ("O001", "John Doe", "john@email.com", "Mouse", 2, 50),
    ("O002", "Jane Smith", "jane@email.com", "Monitor", 1, 300),
    ("O002", "Jane Smith", "jane@email.com", "Keyboard", 1, 100)
]

expected_df = spark.createDataFrame(expected_data, ["order_id", "customer_name", "customer_email", "product", "quantity", "price"])
expected_df.show()

+--------+-------------+--------------+--------+--------+-----+
|order_id|customer_name|customer_email| product|quantity|price|
+--------+-------------+--------------+--------+--------+-----+
|    O001|     John Doe|john@email.com|  Laptop|       1| 1000|
|    O001|     John Doe|john@email.com|   Mouse|       2|   50|
|    O002|   Jane Smith|jane@email.com| Monitor|       1|  300|
|    O002|   Jane Smith|jane@email.com|Keyboard|       1|  100|
+--------+-------------+--------------+--------+--------+-----+



In [None]:
# YOUR SOLUTION HERE

result_df = \
      nested_df\
        .withColumn('customer_name',fn.col('customer')['name'])\
        .withColumn('customer_email',fn.col('customer')['email'])\
        .drop('customer')\
        .select('*', fn.inline_outer('items'))\
        .drop('items')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+--------+-------------+--------------+--------+--------+-----+
|order_id|customer_name|customer_email| product|quantity|price|
+--------+-------------+--------------+--------+--------+-----+
|    O001|     John Doe|john@email.com|  Laptop|       1| 1000|
|    O001|     John Doe|john@email.com|   Mouse|       2|   50|
|    O002|   Jane Smith|jane@email.com| Monitor|       1|  300|
|    O002|   Jane Smith|jane@email.com|Keyboard|       1|  100|
+--------+-------------+--------------+--------+--------+-----+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex nested data flattening. Tests struct field access and expansion of an array of structs (for example with `inline_outer`, or with `explode` followed by field selection).
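
The flattening logic can be cross-checked without Spark. The sketch below is a plain-Python approximation, not the notebook's solution: `flatten_orders` is a hypothetical helper name, and the input is assumed to be a list of dicts shaped like the nested schema above.

```python
def flatten_orders(orders):
    """Flatten nested order dicts into one tuple per (order, item) pair."""
    rows = []
    for order in orders:
        customer = order.get("customer") or {}
        # Like inline_outer: a missing or empty item list still yields one row of nulls
        items = order.get("items") or [{}]
        for item in items:
            rows.append((
                order["order_id"],
                customer.get("name"),
                customer.get("email"),
                item.get("product"),
                item.get("quantity"),
                item.get("price"),
            ))
    return rows


orders = [
    {"order_id": "O001",
     "customer": {"name": "John Doe", "email": "john@email.com"},
     "items": [{"product": "Laptop", "quantity": 1, "price": 1000},
               {"product": "Mouse", "quantity": 2, "price": 50}]},
    {"order_id": "O002",
     "customer": {"name": "Jane Smith", "email": "jane@email.com"},
     "items": [{"product": "Monitor", "quantity": 1, "price": 300},
               {"product": "Keyboard", "quantity": 1, "price": 100}]},
]
flat_rows = flatten_orders(orders)
```

An order with no items produces a single row whose item fields are `None`, which is the behavior that distinguishes `inline_outer` from `inline`.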

## Problem 10: Time-Series Gap Filling

**Requirement:** Finance team needs complete time series data with missing dates filled.

**Scenario:** Fill missing dates in stock price data and forward-fill the last known prices.

In [None]:
# Source DataFrame
stock_prices_data = [
    ("2023-01-01", "AAPL", 150.0),
    ("2023-01-03", "AAPL", 152.0),
    ("2023-01-04", "AAPL", 151.5),
    ("2023-01-06", "AAPL", 153.0),
    ("2023-01-01", "GOOG", 2800.0),
    ("2023-01-02", "GOOG", 2810.0),
    ("2023-01-05", "GOOG", 2820.0)
]

stock_prices_df = spark.createDataFrame(stock_prices_data, ["date", "symbol", "price"])
stock_prices_df = stock_prices_df.withColumn("date", col("date").cast("date"))
stock_prices_df.show()

+----------+------+------+
|      date|symbol| price|
+----------+------+------+
|2023-01-01|  AAPL| 150.0|
|2023-01-03|  AAPL| 152.0|
|2023-01-04|  AAPL| 151.5|
|2023-01-06|  AAPL| 153.0|
|2023-01-01|  GOOG|2800.0|
|2023-01-02|  GOOG|2810.0|
|2023-01-05|  GOOG|2820.0|
+----------+------+------+



In [None]:
# Expected Output
expected_data = [
    ("2023-01-01", "AAPL", 150.0),
    ("2023-01-02", "AAPL", 150.0),
    ("2023-01-03", "AAPL", 152.0),
    ("2023-01-04", "AAPL", 151.5),
    ("2023-01-05", "AAPL", 151.5),
    ("2023-01-06", "AAPL", 153.0),
    ("2023-01-01", "GOOG", 2800.0),
    ("2023-01-02", "GOOG", 2810.0),
    ("2023-01-03", "GOOG", 2810.0),
    ("2023-01-04", "GOOG", 2810.0),
    ("2023-01-05", "GOOG", 2820.0),
    ("2023-01-06", "GOOG", 2820.0)
]

expected_df = spark.createDataFrame(expected_data, ["date", "symbol", "price"])
expected_df = expected_df.withColumn("date", col("date").cast("date"))
expected_df.show()

+----------+------+------+
|      date|symbol| price|
+----------+------+------+
|2023-01-01|  AAPL| 150.0|
|2023-01-02|  AAPL| 150.0|
|2023-01-03|  AAPL| 152.0|
|2023-01-04|  AAPL| 151.5|
|2023-01-05|  AAPL| 151.5|
|2023-01-06|  AAPL| 153.0|
|2023-01-01|  GOOG|2800.0|
|2023-01-02|  GOOG|2810.0|
|2023-01-03|  GOOG|2810.0|
|2023-01-04|  GOOG|2810.0|
|2023-01-05|  GOOG|2820.0|
|2023-01-06|  GOOG|2820.0|
+----------+------+------+



In [None]:
# YOUR SOLUTION HERE

# Fetch both date bounds with a single Spark job instead of running the same query twice
date_bounds = stock_prices_df.select(fn.min('date').alias('minDate'), fn.max('date').alias('maxDate')).first()
min_date = date_bounds['minDate'].strftime('%Y-%m-%d')
max_date = date_bounds['maxDate'].strftime('%Y-%m-%d')

date_series = spark.sql(f'''select sequence(to_date('{min_date}'), to_date('{max_date}')) as dtArray''')\
                    .select(fn.explode(fn.col('dtArray')).alias('date')).orderBy('date')
date_series.show()

symbol_series = stock_prices_df.select('symbol').distinct().orderBy('symbol')

symbol_series.show()

allDateAllSymbol = date_series.crossJoin(symbol_series)
allDateAllSymbol.show()

join_on = fn.expr('''
                  allindex.date = stock.date
                  and
                  allindex.symbol = stock.symbol
                  ''')

allData = \
        allDateAllSymbol.alias('allindex')\
                .join(stock_prices_df.alias('stock'),
                      join_on,
                      'left')\
                .drop(fn.col('stock.date'),fn.col('stock.symbol'))

win = Window.partitionBy('symbol').orderBy('date').rowsBetween(Window.unboundedPreceding, -1)

result_df = \
          allData\
              .withColumn('fillPrice', fn.last_value(fn.col('price'), ignoreNulls= True).over(win))\
              .withColumn('price', fn.nvl(fn.col('price'),fn.col('fillPrice')))\
              .drop('fillPrice')

result_df.show()


# Test your solution
assert_dataframe_equal(result_df, expected_df)

+----------+
|      date|
+----------+
|2023-01-01|
|2023-01-02|
|2023-01-03|
|2023-01-04|
|2023-01-05|
|2023-01-06|
+----------+

+------+
|symbol|
+------+
|  AAPL|
|  GOOG|
+------+

+----------+------+
|      date|symbol|
+----------+------+
|2023-01-01|  AAPL|
|2023-01-02|  AAPL|
|2023-01-03|  AAPL|
|2023-01-04|  AAPL|
|2023-01-05|  AAPL|
|2023-01-06|  AAPL|
|2023-01-01|  GOOG|
|2023-01-02|  GOOG|
|2023-01-03|  GOOG|
|2023-01-04|  GOOG|
|2023-01-05|  GOOG|
|2023-01-06|  GOOG|
+----------+------+

+----------+------+------+
|      date|symbol| price|
+----------+------+------+
|2023-01-01|  AAPL| 150.0|
|2023-01-02|  AAPL| 150.0|
|2023-01-03|  AAPL| 152.0|
|2023-01-04|  AAPL| 151.5|
|2023-01-05|  AAPL| 151.5|
|2023-01-06|  AAPL| 153.0|
|2023-01-01|  GOOG|2800.0|
|2023-01-02|  GOOG|2810.0|
|2023-01-03|  GOOG|2810.0|
|2023-01-04|  GOOG|2810.0|
|2023-01-05|  GOOG|2820.0|
|2023-01-06|  GOOG|2820.0|
+----------+------+------+

✓ DataFrames are equal!



True

**Instructor Notes:** Time-series gap filling with last observation carried forward. Tests complex window functions and date generation.
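
The last-observation-carried-forward rule is easy to verify by hand. The following is a minimal plain-Python sketch, assuming the input is a list of `(date, symbol, price)` tuples; `forward_fill` is a hypothetical name and the code is not part of the PySpark solution.

```python
from datetime import date, timedelta


def forward_fill(prices):
    """Build a complete daily grid per symbol and carry the last known price forward."""
    known = {(day, symbol): price for day, symbol, price in prices}
    days = [day for day, _, _ in prices]
    start, end = min(days), max(days)
    symbols = sorted({symbol for _, symbol, _ in prices})

    filled = []
    for symbol in symbols:
        last_price = None  # stays None until the first observation for this symbol
        day = start
        while day <= end:
            last_price = known.get((day, symbol), last_price)
            filled.append((day, symbol, last_price))
            day += timedelta(days=1)
    return filled


stock_prices = [
    (date(2023, 1, 1), "AAPL", 150.0),
    (date(2023, 1, 3), "AAPL", 152.0),
    (date(2023, 1, 4), "AAPL", 151.5),
    (date(2023, 1, 6), "AAPL", 153.0),
    (date(2023, 1, 1), "GOOG", 2800.0),
    (date(2023, 1, 2), "GOOG", 2810.0),
    (date(2023, 1, 5), "GOOG", 2820.0),
]
filled_prices = forward_fill(stock_prices)
```

As in the Spark version, the date range is global (earliest to latest date across all symbols), so a symbol whose first observation comes after the global start would have `None` prices until that first observation.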

## Problem 11: Multi-Table Relationship Analysis

**Requirement:** Business intelligence needs customer journey analysis across multiple touchpoints.

**Scenario:** Join customer, orders, and payments tables to analyze complete customer journey.

In [None]:
# Source DataFrames
customers_multi_data = [
    ("C001", "John Doe", "Premium"),
    ("C002", "Jane Smith", "Standard"),
    ("C003", "Bob Johnson", "Premium")
]

orders_multi_data = [
    ("O001", "C001", "2023-01-15", 1000.0),
    ("O002", "C001", "2023-02-20", 1500.0),
    ("O003", "C002", "2023-01-10", 800.0),
    ("O004", "C003", "2023-03-05", 2000.0)
]

payments_multi_data = [
    ("P001", "O001", "2023-01-16", "Credit Card"),
    ("P002", "O002", "2023-02-21", "PayPal"),
    ("P003", "O003", "2023-01-11", "Credit Card"),
    ("P004", "O004", "2023-03-06", "Bank Transfer")
]

customers_multi_df = spark.createDataFrame(customers_multi_data, ["customer_id", "customer_name", "membership"])
orders_multi_df = spark.createDataFrame(orders_multi_data, ["order_id", "customer_id", "order_date", "amount"])
payments_multi_df = spark.createDataFrame(payments_multi_data, ["payment_id", "order_id", "payment_date", "method"])

print("Customers:")
customers_multi_df.show()
print("Orders:")
orders_multi_df.show()
print("Payments:")
payments_multi_df.show()

Customers:
+-----------+-------------+----------+
|customer_id|customer_name|membership|
+-----------+-------------+----------+
|       C001|     John Doe|   Premium|
|       C002|   Jane Smith|  Standard|
|       C003|  Bob Johnson|   Premium|
+-----------+-------------+----------+

Orders:
+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|    O001|       C001|2023-01-15|1000.0|
|    O002|       C001|2023-02-20|1500.0|
|    O003|       C002|2023-01-10| 800.0|
|    O004|       C003|2023-03-05|2000.0|
+--------+-----------+----------+------+

Payments:
+----------+--------+------------+-------------+
|payment_id|order_id|payment_date|       method|
+----------+--------+------------+-------------+
|      P001|    O001|  2023-01-16|  Credit Card|
|      P002|    O002|  2023-02-21|       PayPal|
|      P003|    O003|  2023-01-11|  Credit Card|
|      P004|    O004|  2023-03-06|Bank Transfer|
+----------+--------+------------+-------------+

In [None]:
# Expected Output
expected_data = [
    ("C001", "John Doe", "Premium", "O001", 1000.0, "P001", "Credit Card"),
    ("C001", "John Doe", "Premium", "O002", 1500.0, "P002", "PayPal"),
    ("C002", "Jane Smith", "Standard", "O003", 800.0, "P003", "Credit Card"),
    ("C003", "Bob Johnson", "Premium", "O004", 2000.0, "P004", "Bank Transfer")
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "customer_name", "membership", "order_id", "amount", "payment_id", "payment_method"])
expected_df.show()

+-----------+-------------+----------+--------+------+----------+--------------+
|customer_id|customer_name|membership|order_id|amount|payment_id|payment_method|
+-----------+-------------+----------+--------+------+----------+--------------+
|       C001|     John Doe|   Premium|    O001|1000.0|      P001|   Credit Card|
|       C001|     John Doe|   Premium|    O002|1500.0|      P002|        PayPal|
|       C002|   Jane Smith|  Standard|    O003| 800.0|      P003|   Credit Card|
|       C003|  Bob Johnson|   Premium|    O004|2000.0|      P004| Bank Transfer|
+-----------+-------------+----------+--------+------+----------+--------------+



In [None]:
# YOUR SOLUTION HERE

join_on_1 = fn.expr('''customers.customer_id = orders.customer_id''')
join_on_2 = fn.expr('''orders.order_id = payments.order_id''')

result_df = \
        customers_multi_df.alias('customers')\
                  .join(orders_multi_df.alias('orders'),
                        join_on_1,
                        'inner')\
                  .join(payments_multi_df.alias('payments'),
                        join_on_2,
                        'inner')\
                  .drop(fn.col('orders.customer_id'), fn.col('payments.order_id'),fn.col('orders.order_date'),fn.col('payments.payment_date'))\
                  .withColumnRenamed('method','payment_method')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-------------+----------+--------+------+----------+--------------+
|customer_id|customer_name|membership|order_id|amount|payment_id|payment_method|
+-----------+-------------+----------+--------+------+----------+--------------+
|       C001|     John Doe|   Premium|    O002|1500.0|      P002|        PayPal|
|       C001|     John Doe|   Premium|    O001|1000.0|      P001|   Credit Card|
|       C003|  Bob Johnson|   Premium|    O004|2000.0|      P004| Bank Transfer|
|       C002|   Jane Smith|  Standard|    O003| 800.0|      P003|   Credit Card|
+-----------+-------------+----------+--------+------+----------+--------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Multiple table joins with complex relationships. Tests chaining multiple join operations.
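
For reference, the two chained inner joins behave like dictionary lookups. This is a plain-Python sketch under the assumption that `customer_id` is unique in the customers table; `join_journey` is a hypothetical helper name.

```python
def join_journey(customers, orders, payments):
    """Inner-join customers -> orders -> payments and keep the reporting columns."""
    customers_by_id = {customer[0]: customer for customer in customers}

    # An order may have several payments, so group them by order_id
    payments_by_order = {}
    for payment_id, order_id, _payment_date, method in payments:
        payments_by_order.setdefault(order_id, []).append((payment_id, method))

    rows = []
    for order_id, customer_id, _order_date, amount in orders:
        customer = customers_by_id.get(customer_id)
        if customer is None:
            continue  # inner join: orders without a matching customer are dropped
        for payment_id, method in payments_by_order.get(order_id, []):
            rows.append(customer + (order_id, amount, payment_id, method))
    return rows


customers = [("C001", "John Doe", "Premium"), ("C002", "Jane Smith", "Standard"),
             ("C003", "Bob Johnson", "Premium")]
orders = [("O001", "C001", "2023-01-15", 1000.0), ("O002", "C001", "2023-02-20", 1500.0),
          ("O003", "C002", "2023-01-10", 800.0), ("O004", "C003", "2023-03-05", 2000.0)]
payments = [("P001", "O001", "2023-01-16", "Credit Card"), ("P002", "O002", "2023-02-21", "PayPal"),
            ("P003", "O003", "2023-01-11", "Credit Card"), ("P004", "O004", "2023-03-06", "Bank Transfer")]
journey_rows = join_journey(customers, orders, payments)
```

Orders without a payment disappear from the result, exactly as with the inner join; a left join would be needed to keep unpaid orders in the journey.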

## Problem 12: Advanced Window Functions with Multiple Partitions

**Requirement:** Sales team needs ranking of products within each category and region.

**Scenario:** Calculate product rankings within each category and region based on sales.

In [None]:
# Source DataFrame
product_region_sales_data = [
    ("Electronics", "North", "Laptop", 50000),
    ("Electronics", "North", "Smartphone", 75000),
    ("Electronics", "North", "Tablet", 30000),
    ("Electronics", "South", "Laptop", 45000),
    ("Electronics", "South", "Smartphone", 60000),
    ("Electronics", "South", "Tablet", 25000),
    ("Clothing", "North", "Shirt", 20000),
    ("Clothing", "North", "Pants", 30000),
    ("Clothing", "South", "Shirt", 25000),
    ("Clothing", "South", "Pants", 35000)
]

product_region_sales_df = spark.createDataFrame(product_region_sales_data, ["category", "region", "product", "sales"])
product_region_sales_df.show()

+-----------+------+----------+-----+
|   category|region|   product|sales|
+-----------+------+----------+-----+
|Electronics| North|    Laptop|50000|
|Electronics| North|Smartphone|75000|
|Electronics| North|    Tablet|30000|
|Electronics| South|    Laptop|45000|
|Electronics| South|Smartphone|60000|
|Electronics| South|    Tablet|25000|
|   Clothing| North|     Shirt|20000|
|   Clothing| North|     Pants|30000|
|   Clothing| South|     Shirt|25000|
|   Clothing| South|     Pants|35000|
+-----------+------+----------+-----+



In [None]:
# Expected Output
expected_data = [
    ("Electronics", "North", "Smartphone", 75000, 1),
    ("Electronics", "North", "Laptop", 50000, 2),
    ("Electronics", "North", "Tablet", 30000, 3),
    ("Electronics", "South", "Smartphone", 60000, 1),
    ("Electronics", "South", "Laptop", 45000, 2),
    ("Electronics", "South", "Tablet", 25000, 3),
    ("Clothing", "North", "Pants", 30000, 1),
    ("Clothing", "North", "Shirt", 20000, 2),
    ("Clothing", "South", "Pants", 35000, 1),
    ("Clothing", "South", "Shirt", 25000, 2)
]

expected_df = spark.createDataFrame(expected_data, ["category", "region", "product", "sales", "rank"])
expected_df.show()

+-----------+------+----------+-----+----+
|   category|region|   product|sales|rank|
+-----------+------+----------+-----+----+
|Electronics| North|Smartphone|75000|   1|
|Electronics| North|    Laptop|50000|   2|
|Electronics| North|    Tablet|30000|   3|
|Electronics| South|Smartphone|60000|   1|
|Electronics| South|    Laptop|45000|   2|
|Electronics| South|    Tablet|25000|   3|
|   Clothing| North|     Pants|30000|   1|
|   Clothing| North|     Shirt|20000|   2|
|   Clothing| South|     Pants|35000|   1|
|   Clothing| South|     Shirt|25000|   2|
+-----------+------+----------+-----+----+



In [None]:
# YOUR SOLUTION HERE

win = Window.partitionBy('category','region').orderBy(fn.col('sales').desc_nulls_last())

result_df = \
      product_region_sales_df\
        .withColumn('rank', fn.dense_rank().over(win))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+------+----------+-----+----+
|   category|region|   product|sales|rank|
+-----------+------+----------+-----+----+
|   Clothing| North|     Pants|30000|   1|
|   Clothing| North|     Shirt|20000|   2|
|   Clothing| South|     Pants|35000|   1|
|   Clothing| South|     Shirt|25000|   2|
|Electronics| North|Smartphone|75000|   1|
|Electronics| North|    Laptop|50000|   2|
|Electronics| North|    Tablet|30000|   3|
|Electronics| South|Smartphone|60000|   1|
|Electronics| South|    Laptop|45000|   2|
|Electronics| South|    Tablet|25000|   3|
+-----------+------+----------+-----+----+

✓ DataFrames are equal!



True

**Instructor Notes:** Multi-partition window functions. Tests complex window specifications with multiple partition keys.
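
The sample data contains no ties, so `dense_rank`, `rank`, and `row_number` would all produce the same output here. The plain-Python sketch below shows what dense ranking does when ties exist; `dense_rank_rows` is a hypothetical name, and sales values are assumed to be non-null.

```python
from itertools import groupby


def dense_rank_rows(rows):
    """Append a dense rank by sales (descending) within each (category, region) group."""
    def partition_key(row):
        return (row[0], row[1])

    ranked = []
    for _, group in groupby(sorted(rows, key=partition_key), key=partition_key):
        rank, previous_sales = 0, None
        for row in sorted(group, key=lambda r: r[3], reverse=True):
            # Equal sales share a rank, and the next distinct value gets rank + 1 (no gaps)
            if row[3] != previous_sales:
                rank += 1
                previous_sales = row[3]
            ranked.append(row + (rank,))
    return ranked


tied_sales = [
    ("Electronics", "North", "Laptop", 50000),
    ("Electronics", "North", "Smartphone", 50000),
    ("Electronics", "North", "Tablet", 30000),
]
ranked_rows = dense_rank_rows(tied_sales)
```

With the tie above, the ranks come out as 1, 1, 2. Standard `rank` semantics would give 1, 1, 3 instead, which matters if the report is later filtered to a top-N.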

## Problem 13: Data Quality Validation UDF

**Requirement:** Data governance team needs comprehensive data quality checks.

**Scenario:** Create UDFs to validate email format, phone numbers, and age ranges.

In [None]:
# Source DataFrame
customer_validation_data = [
    (1, "John Doe", "john@email.com", "123-456-7890", 25),
    (2, "Jane Smith", "invalid-email", "987-654-3210", 35),
    (3, "Bob Johnson", "bob@company.com", "555-1234", 17),
    (4, "Alice Brown", "alice@domain.com", "111-222-3333", 150),
    (5, "Charlie Wilson", "charlie@email.com", "444-555-6666", 45)
]

customer_validation_df = spark.createDataFrame(customer_validation_data, ["cust_id", "name", "email", "phone", "age"])
customer_validation_df.show()

+-------+--------------+-----------------+------------+---+
|cust_id|          name|            email|       phone|age|
+-------+--------------+-----------------+------------+---+
|      1|      John Doe|   john@email.com|123-456-7890| 25|
|      2|    Jane Smith|    invalid-email|987-654-3210| 35|
|      3|   Bob Johnson|  bob@company.com|    555-1234| 17|
|      4|   Alice Brown| alice@domain.com|111-222-3333|150|
|      5|Charlie Wilson|charlie@email.com|444-555-6666| 45|
+-------+--------------+-----------------+------------+---+



In [None]:
# Expected Output
expected_data = [
    (1, "John Doe", "john@email.com", "123-456-7890", 25, "Valid", "Valid", "Valid"),
    (2, "Jane Smith", "invalid-email", "987-654-3210", 35, "Invalid", "Valid", "Valid"),
    (3, "Bob Johnson", "bob@company.com", "555-1234", 17, "Valid", "Invalid", "Valid"),
    (4, "Alice Brown", "alice@domain.com", "111-222-3333", 150, "Valid", "Valid", "Invalid"),
    (5, "Charlie Wilson", "charlie@email.com", "444-555-6666", 45, "Valid", "Valid", "Valid")
]

expected_df = spark.createDataFrame(expected_data, ["cust_id", "name", "email", "phone", "age", "email_status", "phone_status", "age_status"])
expected_df.show()

+-------+--------------+-----------------+------------+---+------------+------------+----------+
|cust_id|          name|            email|       phone|age|email_status|phone_status|age_status|
+-------+--------------+-----------------+------------+---+------------+------------+----------+
|      1|      John Doe|   john@email.com|123-456-7890| 25|       Valid|       Valid|     Valid|
|      2|    Jane Smith|    invalid-email|987-654-3210| 35|     Invalid|       Valid|     Valid|
|      3|   Bob Johnson|  bob@company.com|    555-1234| 17|       Valid|     Invalid|     Valid|
|      4|   Alice Brown| alice@domain.com|111-222-3333|150|       Valid|       Valid|   Invalid|
|      5|Charlie Wilson|charlie@email.com|444-555-6666| 45|       Valid|       Valid|     Valid|
+-------+--------------+-----------------+------------+---+------------+------------+----------+



In [None]:
# YOUR SOLUTION HERE

import re

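def is_email_valid(email):
    # A null email is reported as invalid instead of raising a TypeError inside the UDF
    if email is None:
        return 'Invalid'
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return 'Valid' if re.match(pattern, email) is not None else 'Invalid'

def is_phone_valid(phone):
    # Same null guard as for emails
    if phone is None:
        return 'Invalid'
    regex = r'^(\+\d{1,3}[-.\s]?)?(\(\d{3}\)|\d{3})[-.\s]?\d{3}[-.\s]?\d{4}$'
    return 'Valid' if re.fullmatch(regex, phone) is not None else 'Invalid'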

def is_age_valid(age):
    try:
        age_num = int(age)
        if 0 <= age_num <= 120:
          return 'Valid'
        else:
          return 'Invalid'
    except (ValueError, TypeError):
        return 'Invalid'

is_email_valid_udf = fn.udf(is_email_valid,tp.StringType())
is_phone_valid_udf = fn.udf(is_phone_valid,tp.StringType())
is_age_valid_udf = fn.udf(is_age_valid,tp.StringType())

result_df = \
      customer_validation_df\
        .withColumn('email_status',is_email_valid_udf(fn.col('email')))\
        .withColumn('phone_status',is_phone_valid_udf(fn.col('phone')))\
        .withColumn('age_status',is_age_valid_udf(fn.col('age')))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+--------------+-----------------+------------+---+------------+------------+----------+
|cust_id|          name|            email|       phone|age|email_status|phone_status|age_status|
+-------+--------------+-----------------+------------+---+------------+------------+----------+
|      1|      John Doe|   john@email.com|123-456-7890| 25|       Valid|       Valid|     Valid|
|      2|    Jane Smith|    invalid-email|987-654-3210| 35|     Invalid|       Valid|     Valid|
|      3|   Bob Johnson|  bob@company.com|    555-1234| 17|       Valid|     Invalid|     Valid|
|      4|   Alice Brown| alice@domain.com|111-222-3333|150|       Valid|       Valid|   Invalid|
|      5|Charlie Wilson|charlie@email.com|444-555-6666| 45|       Valid|       Valid|     Valid|
+-------+--------------+-----------------+------------+---+------------+------------+----------+

✓ DataFrames are equal!



True

**Instructor Notes:** Multiple UDFs for data validation. Tests regex patterns and complex validation logic.
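
It can help to probe the two regular expressions outside Spark before wrapping them in UDFs. The sketch below reuses the patterns from the solution; the helper `matches` and the sample values are illustrative assumptions, and `fullmatch` is used for both patterns.

```python
import re

# Same patterns as in the solution cell, compiled once for reuse
EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
PHONE_PATTERN = re.compile(r'^(\+\d{1,3}[-.\s]?)?(\(\d{3}\)|\d{3})[-.\s]?\d{3}[-.\s]?\d{4}$')


def matches(pattern, value):
    """Return True only for non-null strings that match the whole pattern."""
    return isinstance(value, str) and pattern.fullmatch(value) is not None


phone_samples = ["123-456-7890", "(123) 456-7890", "+1 123.456.7890", "1234567890", "555-1234"]
phone_results = {phone: matches(PHONE_PATTERN, phone) for phone in phone_samples}

email_samples = ["john@email.com", "invalid-email", "a@b", None]
email_results = {email: matches(EMAIL_PATTERN, email) for email in email_samples}
```

The phone pattern accepts several 10-digit layouts (dashes, dots, parentheses, an optional country code) but rejects the 7-digit `555-1234`, which is why customer 3 is flagged in the expected output.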

## Problem 14: Complex Conditional Aggregation

**Requirement:** Business intelligence needs segmented revenue analysis.

**Scenario:** Calculate revenue by multiple customer segments and product categories simultaneously.

In [None]:
# Source DataFrame
segmented_sales_data = [
    ("Premium", "Electronics", 1000.0),
    ("Premium", "Clothing", 500.0),
    ("Standard", "Electronics", 800.0),
    ("Standard", "Clothing", 300.0),
    ("Premium", "Electronics", 1200.0),
    ("Standard", "Electronics", 600.0),
    ("Premium", "Clothing", 400.0),
    ("Standard", "Clothing", 200.0)
]

segmented_sales_df = spark.createDataFrame(segmented_sales_data, ["membership", "category", "amount"])
segmented_sales_df.show()

+----------+-----------+------+
|membership|   category|amount|
+----------+-----------+------+
|   Premium|Electronics|1000.0|
|   Premium|   Clothing| 500.0|
|  Standard|Electronics| 800.0|
|  Standard|   Clothing| 300.0|
|   Premium|Electronics|1200.0|
|  Standard|Electronics| 600.0|
|   Premium|   Clothing| 400.0|
|  Standard|   Clothing| 200.0|
+----------+-----------+------+



In [None]:
# Expected Output
expected_data = [
    ("Electronics", 2200.0, 1400.0),
    ("Clothing", 900.0, 500.0)
]

expected_df = spark.createDataFrame(expected_data, ["category", "premium_revenue", "standard_revenue"])
expected_df.show()

+-----------+---------------+----------------+
|   category|premium_revenue|standard_revenue|
+-----------+---------------+----------------+
|Electronics|         2200.0|          1400.0|
|   Clothing|          900.0|           500.0|
+-----------+---------------+----------------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
      segmented_sales_df\
        .withColumn('pivot_col_naming', fn.expr('''lower(membership)||'_revenue' '''))\
        .groupBy('category')\
        .pivot('pivot_col_naming')\
        .agg(fn.sum(fn.col('amount')))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+---------------+----------------+
|   category|premium_revenue|standard_revenue|
+-----------+---------------+----------------+
|Electronics|         2200.0|          1400.0|
|   Clothing|          900.0|           500.0|
+-----------+---------------+----------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex conditional aggregation with multiple sum conditions. Tests advanced aggregation patterns.
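
The expected totals can be confirmed with a small plain-Python aggregation. This is a sketch, assuming rows of `(membership, category, amount)` and the two membership tiers present in the sample data; `revenue_by_segment` is a hypothetical name.

```python
from collections import defaultdict


def revenue_by_segment(rows):
    """Sum amounts per category into one column per membership tier."""
    totals = defaultdict(lambda: {"premium_revenue": 0.0, "standard_revenue": 0.0})
    for membership, category, amount in rows:
        # Each row contributes to exactly one column, chosen by its membership
        column = f"{membership.lower()}_revenue"
        totals[category][column] = totals[category].get(column, 0.0) + amount
    return dict(totals)


segmented_sales = [
    ("Premium", "Electronics", 1000.0), ("Premium", "Clothing", 500.0),
    ("Standard", "Electronics", 800.0), ("Standard", "Clothing", 300.0),
    ("Premium", "Electronics", 1200.0), ("Standard", "Electronics", 600.0),
    ("Premium", "Clothing", 400.0), ("Standard", "Clothing", 200.0),
]
segment_totals = revenue_by_segment(segmented_sales)
```

The per-row column selection is the same idea as a conditional sum (one `sum` per membership condition) or a pivot on the membership value.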

## Problem 15: Array and Map Operations

**Requirement:** Product analytics needs to analyze product feature usage patterns.

**Scenario:** Process arrays and maps to analyze which features are used together.

In [None]:
# Source DataFrame with complex types
from pyspark.sql.types import MapType

product_features_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("features", ArrayType(StringType()), True),
    StructField("usage_stats", MapType(StringType(), IntegerType()), True)
])

product_features_data = [
    ("P001", ["search", "filter", "sort"], {"search": 150, "filter": 75, "sort": 50}),
    ("P002", ["search", "export"], {"search": 200, "export": 30}),
    ("P003", ["filter", "sort", "import"], {"filter": 100, "sort": 60, "import": 20}),
    ("P004", ["search", "filter"], {"search": 180, "filter": 90})
]

product_features_df = spark.createDataFrame(product_features_data, product_features_schema)
product_features_df.show(truncate=False)
product_features_df.printSchema()

+----------+----------------------+-----------------------------------------+
|product_id|features              |usage_stats                              |
+----------+----------------------+-----------------------------------------+
|P001      |[search, filter, sort]|{filter -> 75, search -> 150, sort -> 50}|
|P002      |[search, export]      |{export -> 30, search -> 200}            |
|P003      |[filter, sort, import]|{filter -> 100, sort -> 60, import -> 20}|
|P004      |[search, filter]      |{filter -> 90, search -> 180}            |
+----------+----------------------+-----------------------------------------+

root
 |-- product_id: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- usage_stats: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)



In [None]:
# Expected Output
expected_data = [
    ("search", 3, 530),
    ("filter", 3, 265),
    ("sort", 2, 110),
    ("export", 1, 30),
    ("import", 1, 20)
]

expected_df = spark.createDataFrame(expected_data, ["feature", "product_count", "total_usage"])
expected_df.show()

+-------+-------------+-----------+
|feature|product_count|total_usage|
+-------+-------------+-----------+
| search|            3|        530|
| filter|            3|        265|
|   sort|            2|        110|
| export|            1|         30|
| import|            1|         20|
+-------+-------------+-----------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
    product_features_df\
        .select('product_id',fn.explode(fn.col('usage_stats')).alias('feature','stat'))\
        .groupBy('feature')\
        .agg(fn.count(fn.col('product_id')).alias('product_count'),
            fn.sum(fn.col('stat')).alias('total_usage'))\
        .orderBy(fn.col('total_usage').desc())

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+-------------+-----------+
|feature|product_count|total_usage|
+-------+-------------+-----------+
| search|            3|        530|
| filter|            3|        265|
|   sort|            2|        110|
| export|            1|         30|
| import|            1|         20|
+-------+-------------+-----------+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex type operations with arrays and maps. Tests explode and map value extraction.
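
The per-feature aggregation over the map column can be mirrored with two counters. The sketch below is plain Python under the assumption that each product is a `(product_id, usage_stats)` pair with `usage_stats` as a dict; `feature_usage` is a hypothetical name.

```python
from collections import Counter


def feature_usage(products):
    """Count products per feature and total usage, sorted by usage descending."""
    product_count, total_usage = Counter(), Counter()
    for _product_id, usage_stats in products:
        # Iterating the dict items corresponds to exploding the map into (key, value) rows
        for feature, uses in usage_stats.items():
            product_count[feature] += 1
            total_usage[feature] += uses
    return sorted(
        ((feature, product_count[feature], total_usage[feature]) for feature in product_count),
        key=lambda row: row[2],
        reverse=True,
    )


products = [
    ("P001", {"search": 150, "filter": 75, "sort": 50}),
    ("P002", {"search": 200, "export": 30}),
    ("P003", {"filter": 100, "sort": 60, "import": 20}),
    ("P004", {"search": 180, "filter": 90}),
]
usage_rows = feature_usage(products)
```

Like the solution cell, this reads only the `usage_stats` map; the `features` array is not needed because every feature in the sample data also appears as a map key.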

## Problem 16: Advanced Date/Time Operations

**Requirement:** Operations team needs business day calculations excluding weekends/holidays.

**Scenario:** Calculate business days between dates and adjust for weekends.

In [None]:
# Source DataFrame
business_dates_data = [
    (1, "2023-01-02", "2023-01-05"),  # Mon to Thu (4 calendar days, 4 business days)
    (2, "2023-01-06", "2023-01-09"),  # Fri to Mon (4 calendar days, 2 business days)
    (3, "2023-01-09", "2023-01-13"),  # Mon to Fri (5 calendar days, 5 business days)
    (4, "2023-01-13", "2023-01-17")   # Fri to Tue (5 calendar days, 3 business days)
]

business_dates_df = spark.createDataFrame(business_dates_data, ["task_id", "start_date", "end_date"])
business_dates_df = business_dates_df.withColumn("start_date", col("start_date").cast("date"))\
                                   .withColumn("end_date", col("end_date").cast("date"))
business_dates_df.show()

+-------+----------+----------+
|task_id|start_date|  end_date|
+-------+----------+----------+
|      1|2023-01-02|2023-01-05|
|      2|2023-01-06|2023-01-09|
|      3|2023-01-09|2023-01-13|
|      4|2023-01-13|2023-01-17|
+-------+----------+----------+



In [None]:
# Expected Output
expected_data = [
    (1, "2023-01-02", "2023-01-05", 4),
    (2, "2023-01-06", "2023-01-09", 2),
    (3, "2023-01-09", "2023-01-13", 5),
    (4, "2023-01-13", "2023-01-17", 3)
]

expected_df = spark.createDataFrame(expected_data, ["task_id", "start_date", "end_date", "business_days"])
expected_df = expected_df.withColumn("start_date", col("start_date").cast("date"))\
                       .withColumn("end_date", col("end_date").cast("date"))
expected_df.show()

+-------+----------+----------+-------------+
|task_id|start_date|  end_date|business_days|
+-------+----------+----------+-------------+
|      1|2023-01-02|2023-01-05|            4|
|      2|2023-01-06|2023-01-09|            2|
|      3|2023-01-09|2023-01-13|            5|
|      4|2023-01-13|2023-01-17|            3|
+-------+----------+----------+-------------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
      business_dates_df\
        .withColumn('dateArray', fn.expr(''' sequence(start_date, end_date) '''))\
        .withColumn('dayOfWeekArray', fn.expr(''' transform(dateArray, x->  dayofweek(x))'''))\
        .withColumn('businessDaysArray', fn.expr(''' filter(dayOfWeekArray, x-> x not in (1,7))  '''))\
        .withColumn('business_days', fn.expr(''' size(businessDaysArray) '''))\
        .drop('dateArray','dayOfWeekArray','businessDaysArray')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+----------+----------+-------------+
|task_id|start_date|  end_date|business_days|
+-------+----------+----------+-------------+
|      1|2023-01-02|2023-01-05|            4|
|      2|2023-01-06|2023-01-09|            2|
|      3|2023-01-09|2023-01-13|            5|
|      4|2023-01-13|2023-01-17|            3|
+-------+----------+----------+-------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Advanced date operations with business logic. Tests date sequence generation and conditional counting.
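
The expected counts treat both endpoints as inclusive and exclude only Saturdays and Sundays (no holiday calendar). A minimal plain-Python sketch of that rule follows; `business_days` is a hypothetical helper, and returning 0 for a reversed range is an assumption of this sketch, not behavior taken from the Spark solution.

```python
from datetime import date, timedelta


def business_days(start, end):
    """Count Monday-to-Friday days in the inclusive range [start, end]."""
    if end < start:
        return 0  # assumption: a reversed range counts as empty
    total_days = (end - start).days + 1
    return sum(
        1
        for offset in range(total_days)
        # date.weekday(): Monday is 0, Saturday is 5, Sunday is 6
        if (start + timedelta(days=offset)).weekday() < 5
    )


task_ranges = [
    (date(2023, 1, 2), date(2023, 1, 5)),
    (date(2023, 1, 6), date(2023, 1, 9)),
    (date(2023, 1, 9), date(2023, 1, 13)),
    (date(2023, 1, 13), date(2023, 1, 17)),
]
business_day_counts = [business_days(start, end) for start, end in task_ranges]
```

Note the different numbering conventions: Python's `weekday()` starts at Monday = 0, while Spark's `dayofweek` returns 1 for Sunday and 7 for Saturday, which is why the Spark filter excludes the values 1 and 7.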

## Problem 17: Hierarchical Data Processing

**Requirement:** HR analytics needs organizational hierarchy reporting.

**Scenario:** Process employee-manager relationships to build organizational trees.

In [None]:
# Source DataFrame
org_hierarchy_data = [
    (1, "CEO", None),
    (2, "VP Engineering", 1),
    (3, "Engineering Manager", 2),
    (4, "Senior Developer", 3),
    (5, "Junior Developer", 3),
    (6, "VP Marketing", 1),
    (7, "Marketing Manager", 6),
    (8, "Marketing Specialist", 7)
]

org_hierarchy_df = spark.createDataFrame(org_hierarchy_data, ["emp_id", "title", "manager_id"])
org_hierarchy_df.show()

+------+--------------------+----------+
|emp_id|               title|manager_id|
+------+--------------------+----------+
|     1|                 CEO|      NULL|
|     2|      VP Engineering|         1|
|     3| Engineering Manager|         2|
|     4|    Senior Developer|         3|
|     5|    Junior Developer|         3|
|     6|        VP Marketing|         1|
|     7|   Marketing Manager|         6|
|     8|Marketing Specialist|         7|
+------+--------------------+----------+



In [None]:
# Expected Output

expected_data = [
    (1, "CEO", 0),
    (2, "VP Engineering", 1),
    (3, "Engineering Manager", 2),
    (4, "Senior Developer", 3),
    (5, "Junior Developer", 3),
    (6, "VP Marketing", 1),
    (7, "Marketing Manager", 2),
    (8, "Marketing Specialist", 3)
]

expected_df = spark.createDataFrame(expected_data, ["emp_id", "title", "hierarchy_level"])
expected_df.show()

+------+--------------------+---------------+
|emp_id|               title|hierarchy_level|
+------+--------------------+---------------+
|     1|                 CEO|              0|
|     2|      VP Engineering|              1|
|     3| Engineering Manager|              2|
|     4|    Senior Developer|              3|
|     5|    Junior Developer|              3|
|     6|        VP Marketing|              1|
|     7|   Marketing Manager|              2|
|     8|Marketing Specialist|              3|
+------+--------------------+---------------+



In [None]:
# YOUR SOLUTION HERE


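# Level 0: employees without a manager (the root of the tree)
manager_df = org_hierarchy_df\
                .filter('manager_id is null')\
                .withColumn('hierarchy_level', fn.lit(0))\
                .select('emp_id','title','hierarchy_level')

# Add one level per iteration; range(1,11) caps the depth at 10 levels
for level in range(1,11):

  # Employees already placed on the previous level are the managers for this level
  prev_level_df = manager_df.filter(fn.col('hierarchy_level') == (level - 1))

  # Self-join: keep employees whose manager sits on the previous level
  level_df = org_hierarchy_df.alias('emp')\
                             .join(prev_level_df.alias('man'),
                                    fn.col("emp.manager_id") == fn.col("man.emp_id"),
                                   'inner')\
                             .select('emp.emp_id','emp.title')

  level_df = level_df.withColumn('hierarchy_level',fn.lit(level))

  manager_df = manager_df.unionByName(level_df)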

manager_df.show()

result_df = manager_df

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+------+--------------------+---------------+
|emp_id|               title|hierarchy_level|
+------+--------------------+---------------+
|     1|                 CEO|              0|
|     2|      VP Engineering|              1|
|     6|        VP Marketing|              1|
|     3| Engineering Manager|              2|
|     7|   Marketing Manager|              2|
|     4|    Senior Developer|              3|
|     8|Marketing Specialist|              3|
|     5|    Junior Developer|              3|
+------+--------------------+---------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Hierarchical data processing with iterative logic. Tests complex self-joins and level calculation.
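
The level-by-level join in the solution is a breadth-first traversal of the manager tree. As a cross-check on the expected values, the same traversal can be sketched in plain Python without Spark. The function name `hierarchy_levels` is a hypothetical helper, not part of the notebook, and the sketch assumes the manager links contain no cycles.

```python
from collections import defaultdict

def hierarchy_levels(rows):
    """Return {emp_id: level} for (emp_id, title, manager_id) rows.

    Hypothetical helper for checking expected values by hand; assumes
    the manager links form a tree (or forest) with no cycles.
    """
    # Map each manager_id to the employees who report to it.
    children = defaultdict(list)
    for emp_id, _title, manager_id in rows:
        children[manager_id].append(emp_id)

    levels = {}
    frontier = children[None]  # employees without a manager are level 0
    level = 0
    while frontier:  # ends as soon as a level has no employees
        next_frontier = []
        for emp_id in frontier:
            levels[emp_id] = level
            next_frontier.extend(children[emp_id])
        frontier = next_frontier
        level += 1
    return levels

org_rows = [
    (1, "CEO", None),
    (2, "VP Engineering", 1),
    (3, "Engineering Manager", 2),
    (4, "Senior Developer", 3),
    (5, "Junior Developer", 3),
    (6, "VP Marketing", 1),
    (7, "Marketing Manager", 6),
    (8, "Marketing Specialist", 7),
]

levels = hierarchy_levels(org_rows)
print(levels)
```

The `while frontier` loop ends when a level has no employees, so the depth of the tree does not need to be known in advance.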

## Problem 18: Advanced String Manipulation

**Requirement:** Data engineering needs to parse and standardize address data.

**Scenario:** Extract and standardize address components from unstructured text.

In [None]:
# Source DataFrame
customer_addresses_data = [
    (1, "123 MAIN ST, NEW YORK, NY 10001"),
    (2, "456 oak avenue, Los Angeles, CA 90001"),
    (3, "789 Pine Rd, Suite 100, Chicago, IL 60601"),
    (4, "321 ELM STREET BOSTON MA 02101"),
    (5, "555 Cedar Ln, Apt 2B, Miami, FL 33101")
]

customer_addresses_df = spark.createDataFrame(customer_addresses_data, ["cust_id", "full_address"])
customer_addresses_df.show(truncate=False)

+-------+-----------------------------------------+
|cust_id|full_address                             |
+-------+-----------------------------------------+
|1      |123 MAIN ST, NEW YORK, NY 10001          |
|2      |456 oak avenue, Los Angeles, CA 90001    |
|3      |789 Pine Rd, Suite 100, Chicago, IL 60601|
|4      |321 ELM STREET BOSTON MA 02101           |
|5      |555 Cedar Ln, Apt 2B, Miami, FL 33101    |
+-------+-----------------------------------------+



In [None]:
# Expected Output
expected_data = [
    (1, "123 Main St", "New York", "NY", "10001"),
    (2, "456 Oak Avenue", "Los Angeles", "CA", "90001"),
    (3, "789 Pine Rd Suite 100", "Chicago", "IL", "60601"),
    (4, "321 Elm Street", "Boston", "MA", "02101"),
    (5, "555 Cedar Ln Apt 2b", "Miami", "FL", "33101")
]

expected_df = spark.createDataFrame(expected_data, ["cust_id", "street", "city", "state", "zipcode"])
expected_df.show(truncate=False)

+-------+---------------------+-----------+-----+-------+
|cust_id|street               |city       |state|zipcode|
+-------+---------------------+-----------+-----+-------+
|1      |123 Main St          |New York   |NY   |10001  |
|2      |456 Oak Avenue       |Los Angeles|CA   |90001  |
|3      |789 Pine Rd Suite 100|Chicago    |IL   |60601  |
|4      |321 Elm Street       |Boston     |MA   |02101  |
|5      |555 Cedar Ln Apt 2b  |Miami      |FL   |33101  |
+-------+---------------------+-----------+-----+-------+



In [None]:
# YOUR SOLUTION HERE

df1 = \
    customer_addresses_df\
      .filter(''' contains(full_address,',') ''')\
      .withColumn('addressArray',fn.split(fn.col('full_address'),','))\
      .withColumn('state_zip', fn.split(fn.trim(fn.element_at(fn.col('addressArray'),-1)),' '))\
      .withColumn('state', fn.element_at(fn.col('state_zip'),1))\
      .withColumn('zipcode', fn.element_at(fn.col('state_zip'),2))\
      .withColumn('city', fn.trim(fn.element_at(fn.col('addressArray'),-2)))\
      .withColumn('street', fn.array_join(fn.slice(fn.col('addressArray'), 1, fn.size(fn.col('addressArray')) - 2),' '))\
      .withColumn('street',fn.expr(''' INITCAP(REPLACE(TRIM(street),'  ',' ')) '''))\
      .withColumn('city', fn.expr(''' INITCAP(REPLACE(TRIM(city),'  ',' ')) '''))\
      .withColumn('state', fn.expr(''' UPPER(REPLACE(TRIM(state),'  ',' ')) '''))\
      .withColumn('zipcode', fn.expr(''' TRIM(zipcode) '''))\
      .drop('full_address','addressArray','state_zip')\
      .select('cust_id','street','city','state','zipcode')

df1.show(truncate = False)

df2 = customer_addresses_df\
      .filter(''' not contains(full_address,',') ''')\
      .withColumn('addressArray',fn.split(fn.col('full_address'),' '))\
      .withColumn('zipcode', fn.element_at(fn.col('addressArray'),-1))\
      .withColumn('state', fn.element_at(fn.col('addressArray'),-2))\
      .withColumn('city', fn.element_at(fn.col('addressArray'),-3))\
      .withColumn('street', fn.array_join(fn.slice(fn.col('addressArray'), 1, fn.size(fn.col('addressArray')) - 3),' '))\
      .withColumn('street',fn.expr(''' INITCAP(REPLACE(TRIM(street),'  ',' ')) '''))\
      .withColumn('city', fn.expr(''' INITCAP(REPLACE(TRIM(city),'  ',' ')) '''))\
      .withColumn('state', fn.expr(''' UPPER(REPLACE(TRIM(state),'  ',' ')) '''))\
      .withColumn('zipcode', fn.expr(''' TRIM(zipcode) '''))\
      .drop('full_address','addressArray')\
      .select('cust_id','street','city','state','zipcode')

df2.show(truncate = False)

result_df = df1\
            .unionByName(df2)\
            .orderBy('cust_id')

result_df.show(truncate = False)

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+---------------------+-----------+-----+-------+
|cust_id|street               |city       |state|zipcode|
+-------+---------------------+-----------+-----+-------+
|1      |123 Main St          |New York   |NY   |10001  |
|2      |456 Oak Avenue       |Los Angeles|CA   |90001  |
|3      |789 Pine Rd Suite 100|Chicago    |IL   |60601  |
|5      |555 Cedar Ln Apt 2b  |Miami      |FL   |33101  |
+-------+---------------------+-----------+-----+-------+

+-------+--------------+------+-----+-------+
|cust_id|street        |city  |state|zipcode|
+-------+--------------+------+-----+-------+
|4      |321 Elm Street|Boston|MA   |02101  |
+-------+--------------+------+-----+-------+

+-------+---------------------+-----------+-----+-------+
|cust_id|street               |city       |state|zipcode|
+-------+---------------------+-----------+-----+-------+
|1      |123 Main St          |New York   |NY   |10001  |
|2      |456 Oak Avenue       |Los Angeles|CA   |90001  |
|3      |789 P

True

**Instructor Notes:** Complex string parsing with regex and case normalization. Tests advanced string manipulation patterns.
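
The solution above parses with `split`; a regex-based variant of the same parsing rules can be sketched in plain Python. This is a sketch under assumptions, not the notebook's method: `TAIL`, `initcap`, and `parse_address` are hypothetical names, the state is assumed to be two letters followed by a five-digit ZIP, and for addresses without commas the city is assumed to be a single word.

```python
import re

# State (2 letters) and ZIP (5 digits) anchor the end of the address;
# everything before them is street plus city.
TAIL = re.compile(r"^(?P<rest>.*?)[,\s]+(?P<state>[A-Za-z]{2})\s+(?P<zip>\d{5})$")

def initcap(text):
    """Capitalize each space-separated word and lowercase the rest ('2B' -> '2b')."""
    return " ".join(word.capitalize() for word in text.split())

def parse_address(full_address):
    """Return (street, city, state, zipcode), or None if the tail does not match."""
    match = TAIL.match(full_address.strip())
    if match is None:
        return None
    rest = match.group("rest")
    if "," in rest:
        # Comma-separated: the last part is the city, the others form the street.
        parts = [p.strip() for p in rest.split(",")]
    else:
        # No commas: assume the city is the last single word. This is an
        # assumption and fails for multi-word cities such as "New York".
        parts = rest.rsplit(None, 1)
    street, city = " ".join(parts[:-1]), parts[-1]
    return (initcap(street), initcap(city), match.group("state").upper(), match.group("zip"))

addresses = [
    "123 MAIN ST, NEW YORK, NY 10001",
    "456 oak avenue, Los Angeles, CA 90001",
    "789 Pine Rd, Suite 100, Chicago, IL 60601",
    "321 ELM STREET BOSTON MA 02101",
    "555 Cedar Ln, Apt 2B, Miami, FL 33101",
]
parsed = [parse_address(a) for a in addresses]
for row in parsed:
    print(row)
```

Python's `str.title()` would turn `2B` into `2B`, so the sketch uses `str.capitalize()` per word to match the `Apt 2b` value in the expected output.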

## Problem 19: Multi-Conditional Window Functions

**Requirement:** Financial analytics needs moving averages with different conditions.

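**Scenario:** Calculate moving averages for stock prices. The expected output covers only the 3-day simple moving average (`sma_3d`), which stays NULL until three rows are available for a symbol; an exponential moving average is not part of the expected result.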

In [None]:
# Source DataFrame
stock_ma_data = [
    ("2023-01-01", "AAPL", 150.0),
    ("2023-01-02", "AAPL", 152.0),
    ("2023-01-03", "AAPL", 151.5),
    ("2023-01-04", "AAPL", 153.0),
    ("2023-01-05", "AAPL", 154.5),
    ("2023-01-06", "AAPL", 153.5),
    ("2023-01-07", "AAPL", 155.0)
]

stock_ma_df = spark.createDataFrame(stock_ma_data, ["date", "symbol", "price"])
stock_ma_df = stock_ma_df.withColumn("date", col("date").cast("date"))
stock_ma_df.show()

+----------+------+-----+
|      date|symbol|price|
+----------+------+-----+
|2023-01-01|  AAPL|150.0|
|2023-01-02|  AAPL|152.0|
|2023-01-03|  AAPL|151.5|
|2023-01-04|  AAPL|153.0|
|2023-01-05|  AAPL|154.5|
|2023-01-06|  AAPL|153.5|
|2023-01-07|  AAPL|155.0|
+----------+------+-----+



In [None]:
# Expected Output
expected_data = [
    ("2023-01-01", "AAPL", 150.0, None),
    ("2023-01-02", "AAPL", 152.0, None),
    ("2023-01-03", "AAPL", 151.5, 151.17),
    ("2023-01-04", "AAPL", 153.0, 152.17),
    ("2023-01-05", "AAPL", 154.5, 153.0),
    ("2023-01-06", "AAPL", 153.5, 153.67),
    ("2023-01-07", "AAPL", 155.0, 154.33)
]

expected_df = spark.createDataFrame(expected_data, ["date", "symbol", "price", "sma_3d"])
expected_df = expected_df.withColumn("date", col("date").cast("date"))
expected_df.show()

+----------+------+-----+------+
|      date|symbol|price|sma_3d|
+----------+------+-----+------+
|2023-01-01|  AAPL|150.0|  NULL|
|2023-01-02|  AAPL|152.0|  NULL|
|2023-01-03|  AAPL|151.5|151.17|
|2023-01-04|  AAPL|153.0|152.17|
|2023-01-05|  AAPL|154.5| 153.0|
|2023-01-06|  AAPL|153.5|153.67|
|2023-01-07|  AAPL|155.0|154.33|
+----------+------+-----+------+



In [None]:
# YOUR SOLUTION HERE

win = Window.partitionBy('symbol').orderBy(fn.col('date').asc()).rowsBetween(-2,0)

winRowNum = Window.partitionBy('symbol').orderBy(fn.col('date').asc())

result_df = \
      stock_ma_df\
          .withColumn('sma_3d', fn.avg(fn.col('price')).over(win))\
          .withColumn('sma_3d', fn.expr('ROUND(CAST(sma_3d as DOUBLE),2)'))\
          .withColumn('rwNum', fn.row_number().over(winRowNum))\
          .withColumn('sma_3d', fn.expr('case when rwNum < 3 then null else sma_3d end'))\
          .drop('rwNum')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+----------+------+-----+------+
|      date|symbol|price|sma_3d|
+----------+------+-----+------+
|2023-01-01|  AAPL|150.0|  NULL|
|2023-01-02|  AAPL|152.0|  NULL|
|2023-01-03|  AAPL|151.5|151.17|
|2023-01-04|  AAPL|153.0|152.17|
|2023-01-05|  AAPL|154.5| 153.0|
|2023-01-06|  AAPL|153.5|153.67|
|2023-01-07|  AAPL|155.0|154.33|
+----------+------+-----+------+

✓ DataFrames are equal!



True

**Instructor Notes:** Window functions with a bounded row frame for a moving average. Tests financial calculations and window bounds.
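
The `sma_3d` values in the expected output can be checked by hand with a short plain-Python sketch. An exponential moving average is included only for comparison: the smoothing factor `alpha = 2 / (span + 1)` and seeding with the first price are common conventions assumed here, not something the notebook specifies, and both function names are hypothetical.

```python
def simple_moving_average(prices, window=3):
    """Mean of the last `window` prices, rounded to 2 places; None until a full window exists."""
    out = []
    for i in range(len(prices)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(round(sum(prices[i + 1 - window:i + 1]) / window, 2))
    return out

def exponential_moving_average(prices, span=3):
    """EMA seeded with the first price.

    alpha = 2 / (span + 1) is an assumed convention, not taken from the notebook.
    """
    alpha = 2 / (span + 1)
    out = []
    for price in prices:
        prev = out[-1] if out else price
        out.append(alpha * price + (1 - alpha) * prev)
    return out

prices = [150.0, 152.0, 151.5, 153.0, 154.5, 153.5, 155.0]
sma = simple_moving_average(prices)
ema = exponential_moving_average(prices)
print(sma)
print(ema)
```

The `None` entries correspond to the rows where the solution nulls out `sma_3d` because `row_number()` is below 3.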

## Problem 20: Data Skew Handling Strategy

**Requirement:** Performance optimization for skewed customer order data.

**Scenario:** Handle data skew in customer orders by implementing a salting technique.

In [None]:
# Source DataFrame (skewed data - one customer has most orders)
skewed_orders_data = [
    (1, "C001", 100.0),
    (2, "C001", 150.0),
    (3, "C001", 200.0),
    (4, "C001", 175.0),
    (5, "C001", 125.0),
    (6, "C002", 300.0),
    (7, "C003", 250.0),
    (8, "C004", 400.0),
    (9, "C005", 350.0)
]

skewed_orders_df = spark.createDataFrame(skewed_orders_data, ["order_id", "customer_id", "amount"])
skewed_orders_df.show()

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|       1|       C001| 100.0|
|       2|       C001| 150.0|
|       3|       C001| 200.0|
|       4|       C001| 175.0|
|       5|       C001| 125.0|
|       6|       C002| 300.0|
|       7|       C003| 250.0|
|       8|       C004| 400.0|
|       9|       C005| 350.0|
+--------+-----------+------+



In [None]:
# Expected Output
expected_data = [
    ("C001", 750.0),
    ("C002", 300.0),
    ("C003", 250.0),
    ("C004", 400.0),
    ("C005", 350.0)
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "total_amount"])
expected_df.show()

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|       C001|       750.0|
|       C002|       300.0|
|       C003|       250.0|
|       C004|       400.0|
|       C005|       350.0|
+-----------+------------+



In [None]:
# YOUR SOLUTION HERE

# Let's implement key salting here

salted_dataframe = \
        skewed_orders_df\
          .withColumn('salted_customer_id',
                          fn.expr(''' case when customer_id = 'C001'
                                      then customer_id ||'_'|| abs(hash(order_id) % 3)
                                      else customer_id end '''))
salted_dataframe.show()

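# Stage 1 sums per salted key; stage 2 strips the salt and combines the partial sums.
# Note: splitting on '_' assumes customer IDs themselves contain no underscore.
result_df = \
        salted_dataframe\
          .groupBy('salted_customer_id')\
          .agg(fn.expr('sum(amount) as total_amount'))\
          .withColumn('customer_id', fn.expr('''split(salted_customer_id,'_')[0]'''))\
          .groupBy('customer_id')\
          .agg(fn.expr('sum(total_amount) as total_amount'))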

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+--------+-----------+------+------------------+
|order_id|customer_id|amount|salted_customer_id|
+--------+-----------+------+------------------+
|       1|       C001| 100.0|            C001_1|
|       2|       C001| 150.0|            C001_1|
|       3|       C001| 200.0|            C001_0|
|       4|       C001| 175.0|            C001_2|
|       5|       C001| 125.0|            C001_2|
|       6|       C002| 300.0|              C002|
|       7|       C003| 250.0|              C003|
|       8|       C004| 400.0|              C004|
|       9|       C005| 350.0|              C005|
+--------+-----------+------+------------------+

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|       C003|       250.0|
|       C004|       400.0|
|       C005|       350.0|
|       C001|       750.0|
|       C002|       300.0|
+-----------+------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Data skew handling with salting technique. Tests performance optimization strategies for skewed data.
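
Salting works because a sum can be computed in two stages: partial sums per salted key, then a final sum per original key. The plain-Python sketch below illustrates that idea on the same data. `HOT_KEYS`, `NUM_SALTS`, and `salted_key` are hypothetical names, the hot key is assumed to be known in advance, and the salt is `order_id % NUM_SALTS` instead of the hash used in the solution.

```python
from collections import defaultdict

orders = [
    (1, "C001", 100.0), (2, "C001", 150.0), (3, "C001", 200.0),
    (4, "C001", 175.0), (5, "C001", 125.0), (6, "C002", 300.0),
    (7, "C003", 250.0), (8, "C004", 400.0), (9, "C005", 350.0),
]

HOT_KEYS = {"C001"}  # assumed to be known in advance
NUM_SALTS = 3        # number of sub-keys each hot key is spread over

def salted_key(order_id, customer_id):
    """Spread hot keys over NUM_SALTS sub-keys; leave other keys unsalted."""
    if customer_id in HOT_KEYS:
        return (customer_id, order_id % NUM_SALTS)
    return (customer_id, None)

# Stage 1: partial sums per salted key (the work that is spread out).
partial = defaultdict(float)
for order_id, customer_id, amount in orders:
    partial[salted_key(order_id, customer_id)] += amount

# Stage 2: drop the salt and combine the partial sums per customer.
totals = defaultdict(float)
for (customer_id, _salt), subtotal in partial.items():
    totals[customer_id] += subtotal

print(dict(partial))
print(dict(totals))
```

Keeping the salt as a separate tuple element avoids parsing it back out of a concatenated string, which matters when the original key may contain the separator character.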

## Problem 21: Complex Filter with Multiple Joins

**Requirement:** Customer service needs to identify high-value customers with recent issues.

**Scenario:** Find customers with high lifetime value who have open support tickets.

In [None]:
# Source DataFrames
customers_high_value_data = [
    ("C001", "John Doe", 5000.0),
    ("C002", "Jane Smith", 3000.0),
    ("C003", "Bob Johnson", 7500.0),
    ("C004", "Alice Brown", 2000.0)
]

support_tickets_complex_data = [
    ("T001", "C001", "Open", "2023-01-15"),
    ("T002", "C002", "Closed", "2023-01-10"),
    ("T003", "C003", "Open", "2023-01-20"),
    ("T004", "C001", "Open", "2023-01-18")
]

customers_high_value_df = spark.createDataFrame(customers_high_value_data, ["customer_id", "customer_name", "lifetime_value"])
support_tickets_complex_df = spark.createDataFrame(support_tickets_complex_data, ["ticket_id", "customer_id", "status", "created_date"])

print("Customers:")
customers_high_value_df.show()
print("Support Tickets:")
support_tickets_complex_df.show()

Customers:
+-----------+-------------+--------------+
|customer_id|customer_name|lifetime_value|
+-----------+-------------+--------------+
|       C001|     John Doe|        5000.0|
|       C002|   Jane Smith|        3000.0|
|       C003|  Bob Johnson|        7500.0|
|       C004|  Alice Brown|        2000.0|
+-----------+-------------+--------------+

Support Tickets:
+---------+-----------+------+------------+
|ticket_id|customer_id|status|created_date|
+---------+-----------+------+------------+
|     T001|       C001|  Open|  2023-01-15|
|     T002|       C002|Closed|  2023-01-10|
|     T003|       C003|  Open|  2023-01-20|
|     T004|       C001|  Open|  2023-01-18|
+---------+-----------+------+------------+



In [None]:
# Expected Output
expected_data = [
    ("C001", "John Doe", 5000.0, "T001", "Open"),
    ("C001", "John Doe", 5000.0, "T004", "Open"),
    ("C003", "Bob Johnson", 7500.0, "T003", "Open")
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "customer_name", "lifetime_value", "ticket_id", "status"])
expected_df.show()

+-----------+-------------+--------------+---------+------+
|customer_id|customer_name|lifetime_value|ticket_id|status|
+-----------+-------------+--------------+---------+------+
|       C001|     John Doe|        5000.0|     T001|  Open|
|       C001|     John Doe|        5000.0|     T004|  Open|
|       C003|  Bob Johnson|        7500.0|     T003|  Open|
+-----------+-------------+--------------+---------+------+



In [None]:
# YOUR SOLUTION HERE

join_on = fn.expr(''' cust.customer_id = tickets.customer_id ''')

result_df = \
      customers_high_value_df.alias('cust')\
        .join(support_tickets_complex_df.alias('tickets'),
              join_on,
              'inner')\
        .where(''' tickets.status = 'Open' ''')\
        .select('cust.customer_id','cust.customer_name',
                'cust.lifetime_value','tickets.ticket_id','tickets.status')

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-------------+--------------+---------+------+
|customer_id|customer_name|lifetime_value|ticket_id|status|
+-----------+-------------+--------------+---------+------+
|       C001|     John Doe|        5000.0|     T001|  Open|
|       C001|     John Doe|        5000.0|     T004|  Open|
|       C003|  Bob Johnson|        7500.0|     T003|  Open|
+-----------+-------------+--------------+---------+------+

✓ DataFrames are equal!



True

**Instructor Notes:** Inner join followed by a status filter. Tests business logic implementation with joins.

## Problem 22: Advanced Grouping with Multiple Aggregates

**Requirement:** Sales analytics needs comprehensive product performance metrics.

**Scenario:** Calculate multiple statistics (count, sum, avg, stddev) per category and product, aggregated across regions.

In [None]:
# Source DataFrame
product_performance_data = [
    ("Electronics", "North", "Laptop", 50000),
    ("Electronics", "North", "Laptop", 55000),
    ("Electronics", "South", "Laptop", 45000),
    ("Electronics", "South", "Laptop", 48000),
    ("Electronics", "North", "Tablet", 30000),
    ("Electronics", "South", "Tablet", 25000),
    ("Clothing", "North", "Shirt", 20000),
    ("Clothing", "South", "Shirt", 22000)
]

product_performance_df = spark.createDataFrame(product_performance_data, ["category", "region", "product", "sales"])
product_performance_df.show()

+-----------+------+-------+-----+
|   category|region|product|sales|
+-----------+------+-------+-----+
|Electronics| North| Laptop|50000|
|Electronics| North| Laptop|55000|
|Electronics| South| Laptop|45000|
|Electronics| South| Laptop|48000|
|Electronics| North| Tablet|30000|
|Electronics| South| Tablet|25000|
|   Clothing| North|  Shirt|20000|
|   Clothing| South|  Shirt|22000|
+-----------+------+-------+-----+



In [None]:
# Expected Output
expected_data = [
    ("Electronics", "Laptop", 4, 198000, 49500.0, 4203.17),
    ("Electronics", "Tablet", 2, 55000, 27500.0, 3535.53),
    ("Clothing", "Shirt", 2, 42000, 21000.0, 1414.21)
]

expected_df = spark.createDataFrame(expected_data, ["category", "product", "transaction_count", "total_sales", "avg_sales", "std_sales"])
expected_df.show()

+-----------+-------+-----------------+-----------+---------+---------+
|   category|product|transaction_count|total_sales|avg_sales|std_sales|
+-----------+-------+-----------------+-----------+---------+---------+
|Electronics| Laptop|                4|     198000|  49500.0|  4203.17|
|Electronics| Tablet|                2|      55000|  27500.0|  3535.53|
|   Clothing|  Shirt|                2|      42000|  21000.0|  1414.21|
+-----------+-------+-----------------+-----------+---------+---------+



In [None]:
# YOUR SOLUTION HERE

result_df = \
      product_performance_df\
        .groupBy('category','product')\
        .agg(fn.expr('count(1) as transaction_count'),
              fn.expr('sum(sales) as total_sales'),
              fn.expr('round(avg(sales),1) as avg_sales'),
              fn.expr('round(stddev_samp(sales),2) as std_sales'))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-------+-----------------+-----------+---------+---------+
|   category|product|transaction_count|total_sales|avg_sales|std_sales|
+-----------+-------+-----------------+-----------+---------+---------+
|Electronics| Laptop|                4|     198000|  49500.0|  4203.17|
|Electronics| Tablet|                2|      55000|  27500.0|  3535.53|
|   Clothing|  Shirt|                2|      42000|  21000.0|  1414.21|
+-----------+-------+-----------------+-----------+---------+---------+

✓ DataFrames are equal!



True

**Instructor Notes:** Multi-level aggregation with statistical functions. Tests complex grouping and multiple aggregate functions.
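
The `std_sales` values in the expected output are sample standard deviations (divisor `n - 1`), which is what `stddev_samp` computes. A quick check with Python's `statistics` module, shown next to the population version for contrast (the variable names are illustrative only):

```python
import statistics

sales = {
    ("Electronics", "Laptop"): [50000, 55000, 45000, 48000],
    ("Electronics", "Tablet"): [30000, 25000],
    ("Clothing", "Shirt"): [20000, 22000],
}

# Sample standard deviation (divisor n - 1), as in stddev_samp.
sample_std = {key: round(statistics.stdev(values), 2) for key, values in sales.items()}

# Population standard deviation (divisor n), as in stddev_pop, for comparison.
population_std = {key: round(statistics.pstdev(values), 2) for key, values in sales.items()}

print(sample_std)
print(population_std)
```

With only two to four rows per group the two definitions differ noticeably, so choosing `stddev_pop` instead would fail the assertion.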

## Problem 23: Data Enrichment with External Reference

**Requirement:** Marketing needs customer data enriched with geographic information.

**Scenario:** Join customer data with postal code reference table to add city/state information.

In [None]:
# Source DataFrames
customers_geo_data = [
    ("C001", "John Doe", "10001"),
    ("C002", "Jane Smith", "90001"),
    ("C003", "Bob Johnson", "60601"),
    ("C004", "Alice Brown", "02101")
]

postal_codes_data = [
    ("10001", "New York", "NY"),
    ("90001", "Los Angeles", "CA"),
    ("60601", "Chicago", "IL"),
    ("02101", "Boston", "MA"),
    ("33101", "Miami", "FL")
]

customers_geo_df = spark.createDataFrame(customers_geo_data, ["customer_id", "customer_name", "postal_code"])
postal_codes_df = spark.createDataFrame(postal_codes_data, ["postal_code", "city", "state"])

print("Customers:")
customers_geo_df.show()
print("Postal Codes:")
postal_codes_df.show()

Customers:
+-----------+-------------+-----------+
|customer_id|customer_name|postal_code|
+-----------+-------------+-----------+
|       C001|     John Doe|      10001|
|       C002|   Jane Smith|      90001|
|       C003|  Bob Johnson|      60601|
|       C004|  Alice Brown|      02101|
+-----------+-------------+-----------+

Postal Codes:
+-----------+-----------+-----+
|postal_code|       city|state|
+-----------+-----------+-----+
|      10001|   New York|   NY|
|      90001|Los Angeles|   CA|
|      60601|    Chicago|   IL|
|      02101|     Boston|   MA|
|      33101|      Miami|   FL|
+-----------+-----------+-----+



In [None]:
# Expected Output
expected_data = [
    ("C001", "John Doe", "10001", "New York", "NY"),
    ("C002", "Jane Smith", "90001", "Los Angeles", "CA"),
    ("C003", "Bob Johnson", "60601", "Chicago", "IL"),
    ("C004", "Alice Brown", "02101", "Boston", "MA")
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "customer_name", "postal_code", "city", "state"])
expected_df.show()

+-----------+-------------+-----------+-----------+-----+
|customer_id|customer_name|postal_code|       city|state|
+-----------+-------------+-----------+-----------+-----+
|       C001|     John Doe|      10001|   New York|   NY|
|       C002|   Jane Smith|      90001|Los Angeles|   CA|
|       C003|  Bob Johnson|      60601|    Chicago|   IL|
|       C004|  Alice Brown|      02101|     Boston|   MA|
+-----------+-------------+-----------+-----------+-----+



In [None]:
# YOUR SOLUTION HERE

join_on = fn.expr(''' cust.postal_code =  post.postal_code ''')

result_df = \
      customers_geo_df.alias('cust')\
      .join(postal_codes_df.alias('post'),
            join_on,
            'inner')\
      .drop(fn.col('post.postal_code'))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-------------+-----------+-----------+-----+
|customer_id|customer_name|postal_code|       city|state|
+-----------+-------------+-----------+-----------+-----+
|       C004|  Alice Brown|      02101|     Boston|   MA|
|       C001|     John Doe|      10001|   New York|   NY|
|       C003|  Bob Johnson|      60601|    Chicago|   IL|
|       C002|   Jane Smith|      90001|Los Angeles|   CA|
+-----------+-------------+-----------+-----------+-----+

✓ DataFrames are equal!



True

**Instructor Notes:** Data enrichment with reference table join. Tests lookup operations and data augmentation.

## Problem 24: Conditional Window Functions

**Requirement:** Analytics needs to calculate conditional running totals.

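**Scenario:** Calculate a running total of sales within each category, ordered by date. Each category has its own total, and the total resumes (it does not restart) when a category reappears after another one.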

In [None]:
# Source DataFrame

category_sales_data = [
    ("2023-01-01", "Electronics", 1000.0),
    ("2023-01-02", "Electronics", 1500.0),
    ("2023-01-03", "Clothing", 800.0),
    ("2023-01-04", "Clothing", 1200.0),
    ("2023-01-05", "Electronics", 2000.0),
    ("2023-01-06", "Electronics", 1800.0)
]

category_sales_df = spark.createDataFrame(category_sales_data, ["date", "category", "sales"])
category_sales_df = category_sales_df.withColumn("date", col("date").cast("date"))
category_sales_df.show()

+----------+-----------+------+
|      date|   category| sales|
+----------+-----------+------+
|2023-01-01|Electronics|1000.0|
|2023-01-02|Electronics|1500.0|
|2023-01-03|   Clothing| 800.0|
|2023-01-04|   Clothing|1200.0|
|2023-01-05|Electronics|2000.0|
|2023-01-06|Electronics|1800.0|
+----------+-----------+------+



In [None]:
# Expected Output - CORRECTED

expected_data = [
    ("2023-01-01", "Electronics", 1000.0, 1000.0),
    ("2023-01-02", "Electronics", 1500.0, 2500.0),  # 1000 + 1500
    ("2023-01-03", "Clothing", 800.0, 800.0),       # RESET for new category
    ("2023-01-04", "Clothing", 1200.0, 2000.0),     # 800 + 1200
    ("2023-01-05", "Electronics", 2000.0, 4500.0),  # 2500 + 2000 (continues from previous Electronics)
    ("2023-01-06", "Electronics", 1800.0, 6300.0)   # 4500 + 1800
]

expected_df = spark.createDataFrame(expected_data, ["date", "category", "sales", "category_running_total"])
expected_df = expected_df.withColumn("date", col("date").cast("date"))
expected_df.show()

+----------+-----------+------+----------------------+
|      date|   category| sales|category_running_total|
+----------+-----------+------+----------------------+
|2023-01-01|Electronics|1000.0|                1000.0|
|2023-01-02|Electronics|1500.0|                2500.0|
|2023-01-03|   Clothing| 800.0|                 800.0|
|2023-01-04|   Clothing|1200.0|                2000.0|
|2023-01-05|Electronics|2000.0|                4500.0|
|2023-01-06|Electronics|1800.0|                6300.0|
+----------+-----------+------+----------------------+



In [None]:
# YOUR SOLUTION HERE

win = Window.partitionBy('category').orderBy(fn.col('date').asc_nulls_last()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

result_df = category_sales_df\
      .withColumn('category_running_total', fn.sum(fn.col('sales')).over(win))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+----------+-----------+------+----------------------+
|      date|   category| sales|category_running_total|
+----------+-----------+------+----------------------+
|2023-01-03|   Clothing| 800.0|                 800.0|
|2023-01-04|   Clothing|1200.0|                2000.0|
|2023-01-01|Electronics|1000.0|                1000.0|
|2023-01-02|Electronics|1500.0|                2500.0|
|2023-01-05|Electronics|2000.0|                4500.0|
|2023-01-06|Electronics|1800.0|                6300.0|
+----------+-----------+------+----------------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Running totals partitioned by category. Tests window partitioning, ordering, and frame specification.
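
The expected output accumulates sales within each category over the whole date range, so the Electronics total on 2023-01-05 continues from 2500 instead of restarting. A literal "reset whenever the category changes" reading would restart on every consecutive run of the same category. The plain-Python sketch below contrasts the two readings; both function names are hypothetical and the rows are assumed to be sorted by date already.

```python
from collections import defaultdict
from itertools import groupby

rows = [
    ("2023-01-01", "Electronics", 1000.0),
    ("2023-01-02", "Electronics", 1500.0),
    ("2023-01-03", "Clothing", 800.0),
    ("2023-01-04", "Clothing", 1200.0),
    ("2023-01-05", "Electronics", 2000.0),
    ("2023-01-06", "Electronics", 1800.0),
]

def running_total_per_category(rows):
    """One running total per category (what partitionBy('category') produces)."""
    totals = defaultdict(float)
    out = []
    for _date, category, sales in rows:
        totals[category] += sales
        out.append(totals[category])
    return out

def running_total_reset_on_change(rows):
    """Restart the total on every consecutive run of the same category."""
    out = []
    for _category, run in groupby(rows, key=lambda r: r[1]):
        total = 0.0
        for _date, _cat, sales in run:
            total += sales
            out.append(total)
    return out

per_category = running_total_per_category(rows)
reset_on_change = running_total_reset_on_change(rows)
print(per_category)
print(reset_on_change)
```

Only the first reading matches `expected_df`; the second differs on the last two rows.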

## Problem 25: Multi-Column Deduplication

**Requirement:** Data quality needs advanced duplicate detection with fuzzy matching.

**Scenario:** Identify potential duplicates based on name similarity and other attributes.

In [None]:
# Source DataFrame
fuzzy_duplicates_data = [
    (1, "John Doe", "john@email.com", "123-456-7890"),
    (2, "Jon Doe", "john.doe@email.com", "123-456-7890"),
    (3, "Jane Smith", "jane@email.com", "987-654-3210"),
    (4, "Jane Smithe", "jane.smith@email.com", "987-654-3210"),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333"),
    (6, "Robert Johnson", "bob.johnson@email.com", "111-222-3333")
]

fuzzy_duplicates_df = spark.createDataFrame(fuzzy_duplicates_data, ["cust_id", "name", "email", "phone"])
fuzzy_duplicates_df.show()

+-------+--------------+--------------------+------------+
|cust_id|          name|               email|       phone|
+-------+--------------+--------------------+------------+
|      1|      John Doe|      john@email.com|123-456-7890|
|      2|       Jon Doe|  john.doe@email.com|123-456-7890|
|      3|    Jane Smith|      jane@email.com|987-654-3210|
|      4|   Jane Smithe|jane.smith@email.com|987-654-3210|
|      5|   Bob Johnson|       bob@email.com|111-222-3333|
|      6|Robert Johnson|bob.johnson@email...|111-222-3333|
+-------+--------------+--------------------+------------+



In [None]:
# Expected Output
expected_data = [
    (1, "John Doe", "john@email.com", "123-456-7890", 1),
    (2, "Jon Doe", "john.doe@email.com", "123-456-7890", 1),
    (3, "Jane Smith", "jane@email.com", "987-654-3210", 2),
    (4, "Jane Smithe", "jane.smith@email.com", "987-654-3210", 2),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333", 3),
    (6, "Robert Johnson", "bob.johnson@email.com", "111-222-3333", 3)
]

expected_df = spark.createDataFrame(expected_data, ["cust_id", "name", "email", "phone", "duplicate_group"])
expected_df.show()

+-------+--------------+--------------------+------------+---------------+
|cust_id|          name|               email|       phone|duplicate_group|
+-------+--------------+--------------------+------------+---------------+
|      1|      John Doe|      john@email.com|123-456-7890|              1|
|      2|       Jon Doe|  john.doe@email.com|123-456-7890|              1|
|      3|    Jane Smith|      jane@email.com|987-654-3210|              2|
|      4|   Jane Smithe|jane.smith@email.com|987-654-3210|              2|
|      5|   Bob Johnson|       bob@email.com|111-222-3333|              3|
|      6|Robert Johnson|bob.johnson@email...|111-222-3333|              3|
+-------+--------------+--------------------+------------+---------------+



In [None]:
# YOUR SOLUTION HERE

# Records are matched on the cleaned phone number and the Soundex of the email's first name part.
# phone_clean is a hash, so group numbers follow hash order rather than cust_id order.
# The window has no partitionBy, so all rows are ranked in a single partition.
win = Window.orderBy('phone_clean','email_soundex')

result_df = \
      fuzzy_duplicates_df\
        .withColumn('name_soundex', fn.soundex(fn.col('name')))\
        .withColumn('email_part_1', fn.expr(''' trim(cast(split(email,'@')[0] as string)) '''))\
        .withColumn('email_soundex', fn.expr(''' soundex(split(email_part_1, '\\\\.')[0]) '''))\
        .withColumn('phone_clean', fn.expr(''' abs(hash(trim(replace(replace(phone,'+',''),'-','')))) '''))\
        .withColumn('duplicate_group', fn.dense_rank().over(win))\
        .drop('email_part_1','email_soundex','phone_clean','name_soundex')

result_df.show(truncate = False)

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+--------------+---------------------+------------+---------------+
|cust_id|name          |email                |phone       |duplicate_group|
+-------+--------------+---------------------+------------+---------------+
|1      |John Doe      |john@email.com       |123-456-7890|1              |
|2      |Jon Doe       |john.doe@email.com   |123-456-7890|1              |
|3      |Jane Smith    |jane@email.com       |987-654-3210|2              |
|4      |Jane Smithe   |jane.smith@email.com |987-654-3210|2              |
|5      |Bob Johnson   |bob@email.com        |111-222-3333|3              |
|6      |Robert Johnson|bob.johnson@email.com|111-222-3333|3              |
+-------+--------------+---------------------+------------+---------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Advanced deduplication with grouping logic. Tests complex duplicate identification strategies.
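
Ranking on a hashed phone number gives group numbers that depend on the hash values, so they match the expected 1, 2, 3 order only by coincidence of the data. One way to make the numbering deterministic is to number groups in order of first appearance by `cust_id`. The plain-Python sketch below shows that idea under a simplifying assumption: records are grouped on the normalized phone number only, without the Soundex check used in the solution. `normalize_phone` is a hypothetical helper.

```python
records = [
    (1, "John Doe", "john@email.com", "123-456-7890"),
    (2, "Jon Doe", "john.doe@email.com", "123-456-7890"),
    (3, "Jane Smith", "jane@email.com", "987-654-3210"),
    (4, "Jane Smithe", "jane.smith@email.com", "987-654-3210"),
    (5, "Bob Johnson", "bob@email.com", "111-222-3333"),
    (6, "Robert Johnson", "bob.johnson@email.com", "111-222-3333"),
]

def normalize_phone(phone):
    """Keep digits only, so '+1 123-456' and '1123456' compare equal."""
    return "".join(ch for ch in phone if ch.isdigit())

group_of_phone = {}   # normalized phone -> group number
duplicate_group = {}  # cust_id -> group number

# Visit records in cust_id order so the first record of each group fixes its number.
for cust_id, _name, _email, phone in sorted(records):
    key = normalize_phone(phone)
    if key not in group_of_phone:
        group_of_phone[key] = len(group_of_phone) + 1
    duplicate_group[cust_id] = group_of_phone[key]

print(duplicate_group)
```

In Spark the same numbering could be obtained by ranking on the minimum `cust_id` per group instead of on the hash, though that is a design suggestion and not what the solution above does.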

## Problem 26: Complex Data Type Transformations

**Requirement:** Data engineering needs to transform nested JSON structures.

**Scenario:** Convert array of structs to map and vice versa for different processing needs.

In [None]:
# Source DataFrame
user_preferences_data = [
    ("U001", [("theme", "dark"), ("language", "en"), ("notifications", "on")]),
    ("U002", [("theme", "light"), ("language", "es"), ("notifications", "off")]),
    ("U003", [("theme", "dark"), ("language", "fr"), ("notifications", "on")])
]

user_preferences_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("preferences", ArrayType(StructType([
        StructField("key", StringType(), True),
        StructField("value", StringType(), True)
    ])), True)
])

user_preferences_df = spark.createDataFrame(user_preferences_data, user_preferences_schema)
user_preferences_df.show(truncate=False)
user_preferences_df.printSchema()

+-------+------------------------------------------------------+
|user_id|preferences                                           |
+-------+------------------------------------------------------+
|U001   |[{theme, dark}, {language, en}, {notifications, on}]  |
|U002   |[{theme, light}, {language, es}, {notifications, off}]|
|U003   |[{theme, dark}, {language, fr}, {notifications, on}]  |
+-------+------------------------------------------------------+

root
 |-- user_id: string (nullable = true)
 |-- preferences: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)



In [None]:
# Expected Output
expected_data = [
    ("U001", "dark", "en", "on"),
    ("U002", "light", "es", "off"),
    ("U003", "dark", "fr", "on")
]

expected_df = spark.createDataFrame(expected_data, ["user_id", "theme", "language", "notifications"])
expected_df.show()

+-------+-----+--------+-------------+
|user_id|theme|language|notifications|
+-------+-----+--------+-------------+
|   U001| dark|      en|           on|
|   U002|light|      es|          off|
|   U003| dark|      fr|           on|
+-------+-----+--------+-------------+



In [None]:
# YOUR SOLUTION HERE

# -- conventional approach: explode the array, then aggregate conditionally per key

result_df = \
      user_preferences_df\
        .select('*', fn.expr('''inline_outer(preferences)'''))\
        .groupBy('user_id')\
        .agg(fn.expr(''' max(case when key = 'theme' then `value` else null end) as theme '''),
            fn.expr(''' max(case when key = 'language' then `value` else null end) as language '''),
            fn.expr(''' max(case when key = 'notifications' then `value` else null end) as notifications '''))

result_df.show()

##-- pivoting method

result_df = \
      user_preferences_df\
        .select('*', fn.expr('''inline_outer(preferences)'''))\
        .groupBy('user_id')\
        .pivot('key', ['theme','language','notifications'])\
        .agg(fn.expr('max(value)'))


result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-------+-----+--------+-------------+
|user_id|theme|language|notifications|
+-------+-----+--------+-------------+
|   U001| dark|      en|           on|
|   U002|light|      es|          off|
|   U003| dark|      fr|           on|
+-------+-----+--------+-------------+

+-------+-----+--------+-------------+
|user_id|theme|language|notifications|
+-------+-----+--------+-------------+
|   U001| dark|      en|           on|
|   U002|light|      es|          off|
|   U003| dark|      fr|           on|
+-------+-----+--------+-------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex data type transformations. Tests array and struct manipulation for data reshaping.

## Problem 27: Advanced Partitioning Strategy

**Requirement:** Performance optimization for large-scale time-series data.

**Scenario:** Implement partitioning strategy for efficient querying of time-series data.

In [None]:
# Source DataFrame
time_series_large_data = [
    ("2023-01-01 10:00:00", "Sensor_A", 25.5),
    ("2023-01-01 10:00:00", "Sensor_B", 30.2),
    ("2023-01-01 11:00:00", "Sensor_A", 26.1),
    ("2023-01-01 11:00:00", "Sensor_B", 31.0),
    ("2023-01-02 10:00:00", "Sensor_A", 24.8),
    ("2023-01-02 10:00:00", "Sensor_B", 29.5),
    ("2023-01-02 11:00:00", "Sensor_A", 25.3),
    ("2023-01-02 11:00:00", "Sensor_B", 30.1)
]

time_series_large_df = spark.createDataFrame(time_series_large_data, ["timestamp", "sensor_id", "value"])
time_series_large_df = time_series_large_df.withColumn("timestamp", col("timestamp").cast("timestamp"))
time_series_large_df.show()

+-------------------+---------+-----+
|          timestamp|sensor_id|value|
+-------------------+---------+-----+
|2023-01-01 10:00:00| Sensor_A| 25.5|
|2023-01-01 10:00:00| Sensor_B| 30.2|
|2023-01-01 11:00:00| Sensor_A| 26.1|
|2023-01-01 11:00:00| Sensor_B| 31.0|
|2023-01-02 10:00:00| Sensor_A| 24.8|
|2023-01-02 10:00:00| Sensor_B| 29.5|
|2023-01-02 11:00:00| Sensor_A| 25.3|
|2023-01-02 11:00:00| Sensor_B| 30.1|
+-------------------+---------+-----+



In [None]:
# Expected Output
expected_data = [
    ("2023-01-01", "Sensor_A", 25.5, 26.1),
    ("2023-01-01", "Sensor_B", 30.2, 31.0),
    ("2023-01-02", "Sensor_A", 24.8, 25.3),
    ("2023-01-02", "Sensor_B", 29.5, 30.1)
]

expected_df = spark.createDataFrame(expected_data, ["date", "sensor_id", "min_value", "max_value"])
expected_df = expected_df.withColumn("date", col("date").cast("date"))
expected_df.show()

+----------+---------+---------+---------+
|      date|sensor_id|min_value|max_value|
+----------+---------+---------+---------+
|2023-01-01| Sensor_A|     25.5|     26.1|
|2023-01-01| Sensor_B|     30.2|     31.0|
|2023-01-02| Sensor_A|     24.8|     25.3|
|2023-01-02| Sensor_B|     29.5|     30.1|
+----------+---------+---------+---------+



In [None]:
# YOUR SOLUTION HERE

# -- one-off analysis at runtime: repartition by date and cache before aggregating

num_partitions = 200

time_series_large_partitioned_df = \
        time_series_large_df\
          .withColumn('date', to_date('timestamp'))\
          .repartition(num_partitions,'date')\
          .cache()

time_series_large_partitioned_df\
        .groupBy('date','sensor_id')\
        .agg(fn.expr('min(value) as min_value'),
             fn.expr('max(value) as max_value'))\
        .show()

time_series_large_partitioned_df.unpersist()

# -- for repeated use, write a bucketed table to the Spark warehouse (helps at run time and for other analytics connectors)

time_series_large_df\
        .withColumn('date', to_date('timestamp'))\
        .write\
        .bucketBy(50,'date','sensor_id')\
        .sortBy('date','sensor_id')\
        .format('parquet')\
        .mode('overwrite')\
        .saveAsTable('table1')

dataframe = spark.read.table('table1')

result_df = \
      dataframe\
          .groupBy('date','sensor_id')\
          .agg(fn.expr('min(value) as min_value'),
              fn.expr('max(value) as max_value'))

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+----------+---------+---------+---------+
|      date|sensor_id|min_value|max_value|
+----------+---------+---------+---------+
|2023-01-01| Sensor_A|     25.5|     26.1|
|2023-01-01| Sensor_B|     30.2|     31.0|
|2023-01-02| Sensor_A|     24.8|     25.3|
|2023-01-02| Sensor_B|     29.5|     30.1|
+----------+---------+---------+---------+

+----------+---------+---------+---------+
|      date|sensor_id|min_value|max_value|
+----------+---------+---------+---------+
|2023-01-01| Sensor_A|     25.5|     26.1|
|2023-01-02| Sensor_B|     29.5|     30.1|
|2023-01-02| Sensor_A|     24.8|     25.3|
|2023-01-01| Sensor_B|     30.2|     31.0|
+----------+---------+---------+---------+

✓ DataFrames are equal!



True

**Instructor Notes:** Partitioning strategy for performance. Tests date extraction and efficient aggregation patterns.

## Problem 28: Complex Business Logic Implementation

**Requirement:** Finance needs commission calculation with tiered rates.

**Scenario:** Calculate sales commissions with different rates based on sales tiers.
* Tier 1: First $10,000 of sales → 5% commission
* Tier 2: Next $10,000 ($10,001 - $20,000) → 7% commission
* Tier 3: Sales above $20,000 → 9% commission

In [None]:
# Source DataFrame
sales_commissions_data = [
    ("S001", "John", 5000.0),
    ("S002", "Jane", 15000.0),
    ("S003", "Bob", 8000.0),
    ("S004", "Alice", 25000.0),
    ("S005", "Charlie", 12000.0)
]

sales_commissions_df = spark.createDataFrame(sales_commissions_data, ["sales_id", "salesperson", "sales_amount"])
sales_commissions_df.show()

+--------+-----------+------------+
|sales_id|salesperson|sales_amount|
+--------+-----------+------------+
|    S001|       John|      5000.0|
|    S002|       Jane|     15000.0|
|    S003|        Bob|      8000.0|
|    S004|      Alice|     25000.0|
|    S005|    Charlie|     12000.0|
+--------+-----------+------------+



In [None]:
# Expected Output

expected_data = [
    ("S001", "John", 5000.0, 250.0),
    ("S002", "Jane", 15000.0, 850.0),
    ("S003", "Bob", 8000.0, 400.0),
    ("S004", "Alice", 25000.0, 1650.0),
    ("S005", "Charlie", 12000.0, 640.0)
]

expected_df = spark.createDataFrame(expected_data, ["sales_id", "salesperson", "sales_amount", "commission"])
expected_df.show()

+--------+-----------+------------+----------+
|sales_id|salesperson|sales_amount|commission|
+--------+-----------+------------+----------+
|    S001|       John|      5000.0|     250.0|
|    S002|       Jane|     15000.0|     850.0|
|    S003|        Bob|      8000.0|     400.0|
|    S004|      Alice|     25000.0|    1650.0|
|    S005|    Charlie|     12000.0|     640.0|
+--------+-----------+------------+----------+



In [None]:
# YOUR SOLUTION HERE

case_string = fn.expr('''
                      case when sales_amount <= 10000
                      then sales_amount*0.05
                      when sales_amount <= 20000
                      then 10000*0.05 + (sales_amount - 10000)*0.07
                      when sales_amount > 20000
                      then 10000*0.05 + 10000*0.07 + (sales_amount-20000)*0.09
                      end
                      ''')

result_df = \
    sales_commissions_df\
        .withColumn('commission',case_string)

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+--------+-----------+------------+----------+
|sales_id|salesperson|sales_amount|commission|
+--------+-----------+------------+----------+
|    S001|       John|      5000.0|     250.0|
|    S002|       Jane|     15000.0|     850.0|
|    S003|        Bob|      8000.0|     400.0|
|    S004|      Alice|     25000.0|    1650.0|
|    S005|    Charlie|     12000.0|     640.0|
+--------+-----------+------------+----------+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex business logic with tiered calculations. Tests conditional logic and mathematical operations.
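The tier rules are marginal: each rate applies only to the slice of sales that falls inside its bracket. As a cross-check of the expected values, here is a plain-Python sketch of the same arithmetic. The helper `tiered_commission` and the `TIERS` table are hypothetical and not part of the notebook.

```python
# Hypothetical reference helper: marginal (bracket-by-bracket) commission.
# Each entry is (bracket width in dollars, commission rate for that bracket).
TIERS = [(10_000.0, 0.05), (10_000.0, 0.07), (float("inf"), 0.09)]


def tiered_commission(sales_amount: float) -> float:
    """Sum the commission earned in each bracket the sales amount reaches."""
    remaining = sales_amount
    total = 0.0
    for width, rate in TIERS:
        if remaining <= 0:
            break
        portion = min(remaining, width)  # slice of sales inside this bracket
        total += portion * rate
        remaining -= portion
    return round(total, 2)


# Same sales amounts as sales_commissions_data.
results = {
    amount: tiered_commission(amount)
    for amount in (5000.0, 15000.0, 8000.0, 25000.0, 12000.0)
}
for amount, commission in results.items():
    print(amount, commission)
```

For example, $25,000 in sales earns 10,000 × 0.05 + 10,000 × 0.07 + 5,000 × 0.09 = 500 + 700 + 450 = 1,650, which matches the expected row for Alice.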

## Problem 29: Multi-Step Data Transformation Pipeline

**Requirement:** ETL pipeline needs complex multi-step data transformation.

**Scenario:** Implement a multi-step transformation: clean, enrich, aggregate, and pivot data.

In [None]:
# Source DataFrame
raw_sales_data = [
    ("  john  ", "Electronics", "2023-01-15", "1000.50"),
    ("Jane", "Clothing", "2023-01-16", "800.75"),
    ("bob", "Electronics", "2023-01-17", "1200.25"),
    ("Alice", "Clothing", "2023-01-18", "950.00")
]

raw_sales_df = spark.createDataFrame(raw_sales_data, ["salesperson", "category", "sale_date", "amount"])
raw_sales_df.show()

+-----------+-----------+----------+-------+
|salesperson|   category| sale_date| amount|
+-----------+-----------+----------+-------+
|     john  |Electronics|2023-01-15|1000.50|
|       Jane|   Clothing|2023-01-16| 800.75|
|        bob|Electronics|2023-01-17|1200.25|
|      Alice|   Clothing|2023-01-18| 950.00|
+-----------+-----------+----------+-------+



In [None]:
# Expected Output
expected_data = [
    ("Electronics", 2200.75),
    ("Clothing", 1750.75)
]

expected_df = spark.createDataFrame(expected_data, ["category", "total_sales"])
expected_df.show()

+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|Electronics|    2200.75|
|   Clothing|    1750.75|
+-----------+-----------+



In [None]:
# YOUR SOLUTION HERE

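# amount is stored as a string, so cast it explicitly instead of relying on the implicit cast inside sum()
result_df = raw_sales_df\
    .withColumn('amount', fn.col('amount').cast('double'))\
    .groupBy('category')\
    .agg(fn.expr('sum(amount) as total_sales'))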

result_df.show()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-----------+
|   category|total_sales|
+-----------+-----------+
|Electronics|    2200.75|
|   Clothing|    1750.75|
+-----------+-----------+

✓ DataFrames are equal!



True

**Instructor Notes:** Multi-step transformation pipeline. Tests data cleaning, type conversion, and aggregation in sequence.

## Problem 30: Complex Join with Aggregation

**Requirement:** Business intelligence needs customer behavior analysis with purchase patterns.

**Scenario:** Join customer data with orders and calculate complex behavioral metrics.

In [None]:
# Source DataFrames
customers_behavior_data = [
    ("C001", "John", "2023-01-01"),
    ("C002", "Jane", "2023-01-05"),
    ("C003", "Bob", "2023-01-10")
]

orders_behavior_data = [
    ("O001", "C001", "2023-01-15", 100.0),
    ("O002", "C001", "2023-01-20", 150.0),
    ("O003", "C001", "2023-02-01", 200.0),
    ("O004", "C002", "2023-01-25", 300.0),
    ("O005", "C003", "2023-02-05", 250.0)
]

customers_behavior_df = spark.createDataFrame(customers_behavior_data, ["customer_id", "customer_name", "signup_date"])
orders_behavior_df = spark.createDataFrame(orders_behavior_data, ["order_id", "customer_id", "order_date", "amount"])

print("Customers:")
customers_behavior_df.show()
print("Orders:")
orders_behavior_df.show()

Customers:
+-----------+-------------+-----------+
|customer_id|customer_name|signup_date|
+-----------+-------------+-----------+
|       C001|         John| 2023-01-01|
|       C002|         Jane| 2023-01-05|
|       C003|          Bob| 2023-01-10|
+-----------+-------------+-----------+

Orders:
+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|    O001|       C001|2023-01-15| 100.0|
|    O002|       C001|2023-01-20| 150.0|
|    O003|       C001|2023-02-01| 200.0|
|    O004|       C002|2023-01-25| 300.0|
|    O005|       C003|2023-02-05| 250.0|
+--------+-----------+----------+------+



In [None]:
# Expected Output
expected_data = [
    ("C001", "John", 3, 450.0, 150.0, 14.0),
    ("C002", "Jane", 1, 300.0, 300.0, 20.0),
    ("C003", "Bob", 1, 250.0, 250.0, 26.0)
]

expected_df = spark.createDataFrame(expected_data, ["customer_id", "customer_name", "order_count", "total_spent", "avg_order_value", "days_to_first_order"])
expected_df.show()

+-----------+-------------+-----------+-----------+---------------+-------------------+
|customer_id|customer_name|order_count|total_spent|avg_order_value|days_to_first_order|
+-----------+-------------+-----------+-----------+---------------+-------------------+
|       C001|         John|          3|      450.0|          150.0|               14.0|
|       C002|         Jane|          1|      300.0|          300.0|               20.0|
|       C003|          Bob|          1|      250.0|          250.0|               26.0|
+-----------+-------------+-----------+-----------+---------------+-------------------+



In [None]:
# YOUR SOLUTION HERE

join_on = fn.expr(''' cust.customer_id = ord.customer_id ''')

merge_dataframe = \
    customers_behavior_df.alias('cust')\
      .join(orders_behavior_df.alias('ord'),
            join_on,
            'inner')\
      .drop(fn.col('ord.customer_id'))\
      .repartition(10,fn.col('cust.customer_id'),fn.col('cust.customer_name'))\
      .cache()

result_df = \
      merge_dataframe\
        .groupBy('cust.customer_id','cust.customer_name')\
        .agg(fn.expr(''' count(ord.order_id) as order_count '''),
            fn.expr(''' sum(ord.amount) as total_spent '''),
            fn.expr(''' avg(ord.amount) as avg_order_value '''),
            fn.expr(''' date_diff(min(ord.order_date),min(cust.signup_date)) as days_to_first_order '''),
            )

result_df.show()

merge_dataframe.unpersist()

# Test your solution
assert_dataframe_equal(result_df, expected_df)

+-----------+-------------+-----------+-----------+---------------+-------------------+
|customer_id|customer_name|order_count|total_spent|avg_order_value|days_to_first_order|
+-----------+-------------+-----------+-----------+---------------+-------------------+
|       C002|         Jane|          1|      300.0|          300.0|                 20|
|       C001|         John|          3|      450.0|          150.0|                 14|
|       C003|          Bob|          1|      250.0|          250.0|                 26|
+-----------+-------------+-----------+-----------+---------------+-------------------+

✓ DataFrames are equal!



True

**Instructor Notes:** Complex join with multiple aggregations and date calculations. Tests comprehensive data analysis patterns.
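To see where the expected numbers come from, the four metrics can be recomputed in plain Python from the same source rows. This is only a cross-check sketch. The `signups`, `orders`, and `metrics` names are hypothetical, and the dates are parsed into `datetime.date` objects rather than kept as strings.

```python
from datetime import date
from statistics import mean

# Same rows as customers_behavior_data and orders_behavior_data.
signups = {
    "C001": date(2023, 1, 1),
    "C002": date(2023, 1, 5),
    "C003": date(2023, 1, 10),
}
orders = [
    ("C001", date(2023, 1, 15), 100.0),
    ("C001", date(2023, 1, 20), 150.0),
    ("C001", date(2023, 2, 1), 200.0),
    ("C002", date(2023, 1, 25), 300.0),
    ("C003", date(2023, 2, 5), 250.0),
]

metrics = {}
for customer_id, signup_date in signups.items():
    # Orders belonging to this customer (the inner-join step).
    own = [(order_date, amount) for cid, order_date, amount in orders if cid == customer_id]
    amounts = [amount for _, amount in own]
    first_order = min(order_date for order_date, _ in own)
    metrics[customer_id] = {
        "order_count": len(own),
        "total_spent": sum(amounts),
        "avg_order_value": mean(amounts),
        # Days between signup and the earliest order.
        "days_to_first_order": (first_order - signup_date).days,
    }

for customer_id, values in metrics.items():
    print(customer_id, values)
```

Note that `days_to_first_order` is an integer here, just as `date_diff` returns an integer in Spark. The expected DataFrame stores it as a double, which the non-strict schema comparison tolerates.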

# Set 2 Complete!

You've completed all 30 Easy/Medium problems in Set 2. These problems cover:
- Advanced joins and deduplication
- Complex window functions
- Multi-level aggregations
- Advanced UDFs and data validation
- Nested data operations
- Performance optimization strategies
- Complex business logic implementation

Ready for Set 3 with Medium difficulty problems?