Problem Statement:

Generate a monthly sales report for all products in the year 2024. The report should include the total sales for each product by month, ensuring that even months with no sales data are represented with a total sales value of zero.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime

# Start (or reuse) the Spark session that backs this notebook
spark = SparkSession.builder.appName("ProductTransactions").getOrCreate()

# Products table layout: every column is declared nullable
products_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("pid", IntegerType()),
        ("pname", StringType()),
        ("price", IntegerType()),
    ]
])

# Transactions table layout: every column is declared nullable
transactions_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("pid", IntegerType()),
        ("sold_date", DateType()),
        ("qty", IntegerType()),
        ("amount", IntegerType()),
    ]
])

# Sample products as (pid, pname, price) tuples
products_data = [(1, 'A', 1000), (2, 'B', 400), (3, 'C', 500)]
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Sample transactions as (pid, sold_date, qty, amount) tuples.
# Dates are written as ISO strings and parsed into `date` objects for DateType.
transactions_data = [
    (pid, datetime.strptime(sold_on, '%Y-%m-%d').date(), qty, amount)
    for pid, sold_on, qty, amount in [
        (1, '2024-02-01', 2, 2000),
        (1, '2024-03-01', 4, 4000),
        (1, '2024-03-15', 2, 2000),
        (3, '2024-04-24', 3, 1500),
        (3, '2024-05-16', 5, 2500),
    ]
]
transactions_df = spark.createDataFrame(transactions_data, schema=transactions_schema)

# Render both input tables
print("Products:")
products_df.display()

print("Transactions:")
transactions_df.display()

# Attach each transaction's product attributes (pname, price) via an inner join
# on `pid`; products without any transaction do not appear in this result.
joined_df = transactions_df.join(products_df, 'pid', 'inner')
joined_df.display()


Products:


pid,pname,price
1,A,1000
2,B,400
3,C,500


Transactions:


pid,sold_date,qty,amount
1,2024-02-01,2,2000
1,2024-03-01,4,4000
1,2024-03-15,2,2000
3,2024-04-24,3,1500
3,2024-05-16,5,2500


pid,sold_date,qty,amount,pname,price
1,2024-02-01,2,2000,A,1000
1,2024-03-01,4,4000,A,1000
1,2024-03-15,2,2000,A,1000
3,2024-04-24,3,1500,C,500
3,2024-05-16,5,2500,C,500


In [0]:
from pyspark.sql import functions as F

# Assuming `products_df` and `transactions_df` are already created

# The report covers calendar year 2024 only (see the problem statement), so the
# year is a constant instead of being derived from the transaction dates.
REPORT_YEAR = 2024

# Step 1: Create the base DataFrame with `pid`, `pname`, `years`, and `month` initialized to 1.
# It is built from `products_df` alone: every product gets a row even when it has
# no sales, and sales dated in other years can neither add extra year grids nor
# remove a product's rows for the report year.
initial_df = products_df.select(
    F.col("pid"),
    F.col("pname"),
    F.lit(REPORT_YEAR).alias("years"),
    F.lit(1).alias("month"),
).distinct()  # guards against duplicated product rows double-counting sales

# Step 2: Create DataFrame for all months (1 to 12) using a loop.
months_df = initial_df
for i in range(2, 13):  # Loop from month 2 to 12
    new_month_df = initial_df.withColumn("month", F.lit(i))
    months_df = months_df.union(new_month_df)

# Step 3: Join with `transactions_df` again to calculate total_sales for each `pid`, `pname`, `years`, and `month`.
# The join matches on product, month AND year; without the year condition a sale
# from the same month of a different year would be summed into this year's row.
final_df = months_df.alias("r").join(
    transactions_df.alias("t"),
    (F.col("r.pid") == F.col("t.pid"))
    & (F.month("t.sold_date") == F.col("r.month"))
    & (F.year("t.sold_date") == F.col("r.years")),
    'left',
)

# Step 4: Perform group by to aggregate `total_sales`
# (months with no matching transaction sum to NULL at this point).
final_df = final_df.groupBy("r.pid", "r.pname", "r.years", "r.month") \
    .agg(F.sum("t.amount").alias("total_sales"))

# Step 5: Replace `NULL` values in `total_sales` with 0.
final_df = final_df.withColumn("total_sales", F.coalesce(F.col("total_sales"), F.lit(0)))

# Step 6: Order the final result.
final_df = final_df.orderBy("r.pid", "r.pname", "r.years", "r.month")

# Display the final result
final_df.display()



pid,pname,years,month,total_sales
1,A,2024,1,0
1,A,2024,2,2000
1,A,2024,3,6000
1,A,2024,4,0
1,A,2024,5,0
1,A,2024,6,0
1,A,2024,7,0
1,A,2024,8,0
1,A,2024,9,0
1,A,2024,10,0


In [0]:
# Expose both DataFrames to Spark SQL under the table names used by the %sql cell
transactions_df.createOrReplaceTempView('transactions')
products_df.createOrReplaceTempView('products')


In [0]:
%sql
-- Monthly sales report for 2024: one row per product per month, zero-filled.
WITH initial_data AS (
    -- Step 1: One row per product, pinned to the report year.
    -- Built from `products` alone so that products without sales still appear and
    -- sales dated in other years cannot add or remove rows of the 2024 report.
    SELECT DISTINCT
        pid,
        pname,
        2024 AS years
    FROM products
),

-- Step 2: Expand every product to all months (1 to 12)
all_months_data AS (
    SELECT
        i.pid,
        i.pname,
        i.years,
        m.month
    FROM initial_data i
    CROSS JOIN (SELECT explode(sequence(1, 12)) AS month) m
),

-- Step 3: Join with `transactions` to calculate total_sales.
-- Matching on product, month and year keeps sales from other years out of the sums.
joined_data AS (
    SELECT
        am.pid,
        am.pname,
        am.years,
        am.month,
        SUM(t.amount) AS total_sales
    FROM all_months_data am
    LEFT JOIN transactions t
        ON am.pid = t.pid
        AND MONTH(t.sold_date) = am.month
        AND YEAR(t.sold_date) = am.years
    GROUP BY am.pid, am.pname, am.years, am.month
)

-- Step 4: Replace NULL values in `total_sales` with 0 and order the result
SELECT
    pid,
    pname,
    years,
    month,
    COALESCE(total_sales, 0) AS total_sales
FROM joined_data
ORDER BY pid, pname, years, month;


pid,pname,years,month,total_sales
1,A,2024,1,0
1,A,2024,2,2000
1,A,2024,3,6000
1,A,2024,4,0
1,A,2024,5,0
1,A,2024,6,0
1,A,2024,7,0
1,A,2024,8,0
1,A,2024,9,0
1,A,2024,10,0


Explanation:

1. Initial DataFrame (initial_df): This corresponds to the anchor (first) part of a recursive CTE (the `initial_data` CTE in the SQL cell), where we select distinct products and set the initial month to 1.

2. Manual Recursion (Loop): Instead of using recursion (which PySpark doesn't support), we generate the next months (2 to 12) by looping and unioning the DataFrame for each month.

3. Join and Aggregation: We use a left join to match each product's month with transaction data and aggregate the amount column to calculate total_sales.
   Handling NULL values: months with no matching transactions aggregate to NULL, so the coalesce() function replaces NULL values in total_sales with 0.

4. Ordering: The result is ordered by pid, pname, years, and month, mimicking the final ORDER BY in the SQL query.