# Assignment Lesson 6. Pagila Database Analysis

## Requirements.

1. Using Spark, compute monthly revenue by film category.
2. Define customer lifetime value (CLV) using Spark.
3. Identify the top 1% of customers generating 80% of revenue.
4. Propose a partitioning strategy for the payment table:
    - by date?
    - by store?
    - by customer?
    
    Explain trade-offs.

5. The following join is very slow at scale:

    `payment -> rental -> inventory -> film -> film_category -> category`

    Propose:

    - join order optimization
    - indexing strategies
    - caching or materialized views

## My Solution

### Set Up Spark Session and JDBC Connection

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, count, avg, round as _round, max as _max, min as _min, \
    stddev, percentile_approx, \
    date_format, datediff,  \
    countDistinct, concat_ws, \
    row_number, broadcast
from pyspark.sql.window import Window
import time

spark = SparkSession.builder \
    .appName("PagilaAnalysis") \
    .config("spark.jars", "../jars/postgresql-42.7.8.jar") \
    .config("spark.driver.extraClassPath", "../jars/postgresql-42.7.8.jar") \
    .master("local[*]") \
    .getOrCreate()
    
jdbc_url = "jdbc:postgresql://localhost:5432/analytics"
db_properties = {
    "user": "spark_user",
    "password": "spark_password",
    "driver": "org.postgresql.Driver"
}



### Load Data

In [2]:
print("\n=== READING PAGILA DATA FROM POSTGRESQL ===")

# Read all tables needed
df_film = spark.read.jdbc(url=jdbc_url, table="film", properties=db_properties)
df_film_category = spark.read.jdbc(url=jdbc_url, table="film_category", properties=db_properties)
df_category = spark.read.jdbc(url=jdbc_url, table="category", properties=db_properties)
df_inventory = spark.read.jdbc(url=jdbc_url, table="inventory", properties=db_properties)
df_rental = spark.read.jdbc(url=jdbc_url, table="rental", properties=db_properties)
df_payment = spark.read.jdbc(url=jdbc_url, table="payment", properties=db_properties)
df_store = spark.read.jdbc(url=jdbc_url, table="store", properties=db_properties)
df_customer = spark.read.jdbc(url=jdbc_url, table="customer", properties=db_properties)

print("\n=== All tables loaded! ===")

=== READING PAGILA DATA FROM POSTGRESQL ===

=== All tables loaded! ===


### Exercise 1. Monthly Revenue by Film Category

In [3]:
print("\n=== EXERCISE 1. Monthly Revenue by Film Category")

monthly_revenue_by_category = df_payment \
    .join(df_rental, "rental_id") \
    .join(df_inventory, "inventory_id") \
    .join(df_film, "film_id") \
    .join(df_film_category, "film_id") \
    .join(df_category, "category_id") \
    .withColumn("year_month", date_format(col("payment_date"), "yyyy-MM")) \
    .groupBy("year_month", col("name").alias("category_name")) \
    .agg(
        _sum("amount").alias("total_revenue")
    ) \
    .orderBy("year_month", col("total_revenue").desc())

# Show result
print("\nMonthly Revenue by Category:")
monthly_revenue_by_category.show(truncate=False)

pivot_revenue = (
    monthly_revenue_by_category
    .groupBy("category_name")      
    .pivot("year_month")           
    .agg(_round(_sum("total_revenue"), 2))
    .orderBy("category_name")
)
    
print("\nPivot View (Month x Category):")
pivot_revenue.show(truncate=False)

=== EXERCISE 1. Monthly Revenue by Film Category

Monthly Revenue by Category:

+----------+-------------+-------------+
|year_month|category_name|total_revenue|
+----------+-------------+-------------+
|2022-01   |Sci-Fi       |244.43       |
|2022-01   |Action       |237.44       |
|2022-01   |New          |236.47       |
|2022-01   |Sports       |235.48       |
|2022-01   |Comedy       |226.57       |
|2022-01   |Drama        |218.55       |
|2022-01   |Foreign      |210.48       |
|2022-01   |Documentary  |195.51       |
|2022-01   |Family       |189.53       |
|2022-01   |Classics     |168.58       |
|2022-01   |Animation    |164.54       |
|2022-01   |Travel       |150.68       |
|2022-01   |Horror       |147.68       |
|2022-01   |Music        |136.63       |
|2022-01   |Games        |129.71       |
|2022-01   |Children     |123.68       |
|2022-02   |Sports       |821.20       |
|2022-02   |Games        |728.34       |
|2022-02   |Family       |709.12       |
|2022-02   |Action       |684.34       |
+----------+-------------+-------------+
only showing top

                                                                                

Pivot View (Month x Category):

+-------------+-------+-------+-------+-------+-------+-------+-------+
|category_name|2022-01|2022-02|2022-03|2022-04|2022-05|2022-06|2022-07|
+-------------+-------+-------+-------+-------+-------+-------+-------+
|Action       |237.44 |684.34 |839.90 |652.34 |628.36 |741.06 |592.41 |
|Animation    |164.54 |623.48 |802.02 |779.04 |882.86 |757.02 |647.34 |
|Children     |123.68 |520.76 |620.36 |698.27 |611.35 |636.28 |444.85 |
|Classics     |168.58 |541.59 |604.43 |579.46 |643.37 |621.44 |480.72 |
|Comedy       |226.57 |640.66 |736.31 |633.57 |749.44 |730.42 |666.61 |
|Documentary  |195.51 |585.46 |756.24 |646.48 |697.38 |752.02 |584.43 |
|Drama        |218.55 |678.36 |768.24 |748.39 |809.11 |720.35 |644.39 |
|Family       |189.53 |709.12 |684.28 |744.12 |657.30 |688.20 |553.52 |
|Foreign      |210.48 |663.48 |736.24 |623.40 |756.20 |609.50 |671.37 |
|Games        |129.71 |728.34 |740.31 |589.64 |753.42 |678.40 |661.51 |
|Horror       |147.68 |589.76 |730.41 |561.65 |587.67 |553.71 |5

                                                                                

### Exercise 2. Define Customer Lifetime Value (CLV)

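In its basic form, `CLV = total revenue from the customer`, i.e. the sum of all of that customer's payments. The summary below also reports transaction count, active months, average transaction value, tenure (days between first and last payment) and average monthly revenue.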

In [4]:
print("\n=== EXERCISE 2. Customer Lifetime Value")

clv_summary = df_payment \
    .join(df_customer, "customer_id") \
    .groupBy(
        "customer_id",
        concat_ws(" ", col("first_name"), col("last_name")).alias("customer_name")
    ) \
    .agg(
        # Total Revenue (CLV)
        _sum("amount").alias("clv"),
        # Total Transactions
        count("payment_id").alias("total_trans"),
        # Number of active months
        countDistinct(date_format(col("payment_date"), "yyyy-MM")).alias("active_months"),
        # Average transaction value
        _round(avg("amount"), 2).alias("avg_trans_value"),
        # Tenure: days between first and last payment
        datediff(_max("payment_date"), _min("payment_date")).alias("tenure_days"),
        # _max("payment_date").alias("last_purchase_date"),
        # _min("payment_date").alias("first_purchase_date")
    ) \
    .withColumn(
        "avg_monthly_revenue",
        _round(col("clv")/col("active_months"), 2)
    ) \
    .orderBy(col("clv").desc())

# Show result
print("\nTop 20 Customers by CLV:")
clv_summary.show(20, truncate=False)

# Statistic Summary
print("\nCLV Statistic Summary:")
clv_summary.select(
    _round(avg("clv"), 2).alias("avg_clv"),
    _round(stddev("clv"), 2).alias("stddev_clv"),
    _round(_min("clv"), 2).alias("min_clv"),
    _round(_max("clv"), 2).alias("max_clv"),
    _round(percentile_approx("clv", 0.25), 2).alias("25th_percentile_clv"),
    _round(percentile_approx("clv", 0.5), 2).alias("median_clv"),
    _round(percentile_approx("clv", 0.75), 2).alias("75th_percentile_clv")
).show()

=== EXERCISE 2. Customer Lifetime Value

Top 20 Customers by CLV:

+-----------+--------------+------+-----------+-------------+---------------+-----------+-------------------+
|customer_id|customer_name |clv   |total_trans|active_months|avg_trans_value|tenure_days|avg_monthly_revenue|
+-----------+--------------+------+-----------+-------------+---------------+-----------+-------------------+
|526        |KARL SEAL     |221.55|45         |6            |4.92           |171        |36.93              |
|148        |ELEANOR HUNT  |216.54|46         |7            |4.71           |180        |30.93              |
|144        |CLARA SHAW    |195.58|42         |7            |4.66           |179        |27.94              |
|137        |RHONDA KENNEDY|194.61|39         |7            |4.99           |180        |27.80              |
|178        |MARION SNYDER |194.61|39         |7            |4.99           |176        |27.80              |
|459        |TOMMY COLLAZO |186.62|38         |7            |4.91           |182        |26.66              |
|469      

### Exercise 3. Identify The Top 1% of Customers Generating 80% of Revenue.

In [5]:
print("\n=== EXERCISE 3. Top 1% of Customers Generating 80% of Revenue")

# Compute total revenue
total_revenue = clv_summary.agg(_sum("clv")).first()[0]
# Compute total customers
total_customers = clv_summary.count()

# Compute cumulative revenue
windowSpec = Window.orderBy(col("clv").desc())

# Pareto analysis, one row per customer:
## rank customers by CLV (highest first)
## cumulative revenue from customer #1 up to the current row
## % of total revenue covered up to the current row
## % of customers included up to the current row
customer_pareto = clv_summary \
    .withColumn("row_num", row_number().over(windowSpec)) \
    .withColumn("cumulative_revenue", _sum("clv").over(windowSpec.rowsBetween(Window.unboundedPreceding, 0))) \
    .withColumn("pct_revenue", col("cumulative_revenue") / total_revenue * 100) \
    .withColumn("pct_customer", col("row_num") / total_customers * 100)

# PART 1: how much revenue do the top 1% of customers generate?
# int() rounds down, so 599 * 0.01 = 5.99 gives a cutoff of 5 customers
top1pct_cutoff = int(total_customers * 0.01)

top1pct_customer_revenue = customer_pareto \
    .filter(col("row_num") <= top1pct_cutoff) \
    .agg(_max("pct_revenue").alias("top1pct_customer_revenue")) \
    .collect()[0]["top1pct_customer_revenue"]

top1pct_customer = customer_pareto \
    .filter(col("row_num") <= top1pct_cutoff) \
    .select("customer_id", "customer_name", "clv", "total_trans", "pct_revenue")

# PART 2: how many customers are needed to reach 80% of revenue?
pct_customer_gen_80pct_revenue = customer_pareto \
    .filter(col("pct_revenue") >= 80) \
    .agg(_min("pct_customer").alias("pct_customer_gen_80pct_revenue")) \
    .collect()[0]["pct_customer_gen_80pct_revenue"]


# Summary Result
print(f"Total Revenue: ${total_revenue}")
print(f"Total Customers: {total_customers} customers")
print(f"1. Top 1% customers generate approximately {top1pct_customer_revenue}% of total revenue.")
print("\n== Top 1% Customers:")
top1pct_customer.show(20, truncate=False)
print(f"2. To generate 80% of revenue, approximately {pct_customer_gen_80pct_revenue}% of customers are required.")

=== EXERCISE 3. Top 1% of Customers Generating 80% of Revenue


25/12/30 10:36:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Total Revenue: $67416.51
Total Customers: 599 customers
1. Top 1% customers generate approximately 1.517269290% of total revenue.
== Top 1% Customers:



+-----------+--------------+------+-----------+-----------+
|customer_id|customer_name |clv   |total_trans|pct_revenue|
+-----------+--------------+------+-----------+-----------+
|526        |KARL SEAL     |221.55|45         |0.328628700|
|148        |ELEANOR HUNT  |216.54|46         |0.649825980|
|144        |CLARA SHAW    |195.58|42         |0.939932960|
|137        |RHONDA KENNEDY|194.61|39         |1.228601120|
|178        |MARION SNYDER |194.61|39         |1.517269290|
+-----------+--------------+------+-----------+-----------+

2. To generate 80% of revenue, approximately 73.12186978297161% of customers are required.
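One detail worth checking in the Pareto logic above: `int(total_customers * 0.01)` rounds 5.99 down to 5, so the "top 1%" here is slightly under 1% of the 599 customers. A minimal plain-Python sketch of the same two questions is shown below. It rounds the cutoff up with `math.ceil` instead, which is a design choice and not what the notebook ran; `pareto_summary` is a hypothetical helper and the CLV values in the usage lines are made up.

```python
import math
from itertools import accumulate


def pareto_summary(clv_values, top_share=0.01, revenue_target=0.80):
    """Return (top_k, revenue share of top_k, customer share needed for revenue_target)."""
    values = sorted(clv_values, reverse=True)  # rank customers by CLV, highest first
    total = sum(values)
    n = len(values)

    # Round the cutoff up so "top 1%" always includes at least one customer.
    top_k = max(1, math.ceil(n * top_share))
    top_revenue_share = sum(values[:top_k]) / total

    # First rank at which cumulative revenue reaches the target share.
    needed = next(
        rank
        for rank, cumulative in enumerate(accumulate(values), start=1)
        if cumulative >= revenue_target * total
    )
    return top_k, top_revenue_share, needed / n


# Floor vs ceiling for the notebook's 599 customers.
print(int(599 * 0.01), math.ceil(599 * 0.01))  # 5 6

# Hypothetical CLV values, just to show the shape of the output.
print(pareto_summary([50, 25, 10, 10, 5]))  # (1, 0.5, 0.6)
```

With only 599 rows, computing this on the driver is cheap; in Spark, the same cumulative sum needs the unpartitioned window used above, which is what triggers the `WindowExec` warning.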


### Exercise 4. Propose a partitioning strategy for the payment table:
- by date?
- by store?
- by customer?

In [6]:
# print("\\n=== EXERCISE 4. Partitioning Strategy Analysis")

# # ---- Strategy 1: By Date ----
# print("\n 1. PARTITIONING BY DATE (`payment_date`)")
# print("-" * 60)

# date_distribution = df_payment \
#     .withColumn("payment_month", date_format(col("payment_date"), "yyyy-MM")) \
#     .groupBy("payment_month") \
#     .agg(
#         count("payment_id").alias("payment_count"),
#         _round(_sum("amount"), 2).alias("total_revenue")
#     ) \
#     .orderBy("payment_month")

# date_distribution.show()

# date_stats = date_distribution.agg(
#     count("payment_month").alias("total_months"),
#     _round(avg("payment_count"), 0).alias("avg_payments_per_month"),
#     _round(stddev("payment_count"), 0).alias("stddev_payments"),
#     _max("payment_count").alias("max_payments"),
#     _min("payment_count").alias("min_payments")
# )
# date_stats.show()

# # ---- Strategy 2: By Store ----
# print("\n 2. PARTITIONING BY STORE")
# print("-" * 60)

# store_distribution = df_payment \
#     .join(df_rental, "rental_id") \
#     .join(df_inventory, "inventory_id") \
#     .groupBy("store_id") \
#     .agg(
#         count("payment_id").alias("payment_count"),
#         _round(_sum("amount"), 2).alias("total_revenue"),
#         countDistinct("customer_id").alias("unique_customers")
#     ) \
#     .orderBy("store_id")

# store_distribution.show()
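The commented-out cell above measures how payments spread across months and stores. As a hedged illustration of the trade-offs, the plain-Python sketch below groups a handful of made-up payment rows by each candidate key and shows partition pruning for a one-month query. It assumes `store_id` has already been attached to each payment (in Pagila that takes the `rental` -> `inventory` join used in the commented cell). Roughly: date partitions match time-range queries and allow pruning; store partitions are few and can be skewed (Pagila has only two stores); customer partitions multiply into one small partition per customer (599 here). The helpers `partition` and `pruned_scan` are hypothetical.

```python
from collections import defaultdict

# Hypothetical toy rows: (payment_id, payment_date, store_id, customer_id, amount)
PAYMENTS = [
    (1, "2022-01-28", 1, 1, 2.99),
    (2, "2022-02-14", 2, 1, 4.99),
    (3, "2022-02-15", 1, 2, 0.99),
    (4, "2022-03-01", 1, 3, 5.99),
    (5, "2022-03-20", 2, 2, 3.99),
]

# Candidate partition keys for the payment table.
STRATEGIES = {
    "date": lambda row: row[1][:7],  # "yyyy-MM", like date_format(payment_date, "yyyy-MM")
    "store": lambda row: row[2],
    "customer": lambda row: row[3],
}


def partition(rows, key_fn):
    """Group rows into partitions keyed by key_fn (one directory per key on disk)."""
    parts = defaultdict(list)
    for row in rows:
        parts[key_fn(row)].append(row)
    return dict(parts)


def pruned_scan(parts, keep_key):
    """Read only partitions whose key passes the filter; the others are never opened."""
    keys = [key for key in parts if keep_key(key)]
    return keys, [row for key in keys for row in parts[key]]


# Partition count and size per strategy.
for name, key_fn in STRATEGIES.items():
    parts = partition(PAYMENTS, key_fn)
    print(name, "->", {key: len(rows) for key, rows in parts.items()})

# A February-only query touches a single date partition.
by_date = partition(PAYMENTS, STRATEGIES["date"])
keys, rows = pruned_scan(by_date, lambda month: month == "2022-02")
print("February query reads", keys, "->", [row[0] for row in rows])
```

In Spark, the date option would correspond to deriving a month column and writing with `DataFrameWriter.partitionBy(...)`, so that filters on that column can skip whole directories.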

### Exercise 5. Join Optimization Analysis

The following join is very slow at scale:
    
`payment -> rental -> inventory -> film -> film_category -> category`

Propose:
- join order optimization
- indexing strategies
- caching or materialized views

In [10]:
print("\n=== Exercise 5. Join Optimization Analysis")
print("\nTABLE SIZE ANALYSIS:")
table_counts = {
    "payment": df_payment.count(),
    "rental": df_rental.count(),
    "inventory": df_inventory.count(),
    "film": df_film.count(),
    "film_category": df_film_category.count(),
    "category": df_category.count()
}

# n_rows instead of count: reusing the name count would shadow pyspark's count()
for table, n_rows in table_counts.items():
    print(f"   {table:15s}: {n_rows:,} rows")

=== Exercise 5. Join Optimization Analysis

TABLE SIZE ANALYSIS:
   payment        : 16,049 rows
   rental         : 16,044 rows
   inventory      : 4,581 rows
   film           : 1,000 rows
   film_category  : 1,000 rows
   category       : 16 rows


#### 1. Join Order Optimization

*The goal of join order optimization is to **reduce the row count as early as possible**.*

**Strategy:** Pre-join the dimension tables and broadcast them.

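The current order starts with `payment` and `rental`, the two largest tables (~16k rows each here, and potentially millions in production). If we only need one category, such as *Action* movies, every payment is carried through five joins only to be filtered out at the very last step. Reversing the order pays off when a selective filter is applied to the small tables first (for example `name = 'Action'` on `category`); for an unfiltered query over all categories, the final join still has to process every payment, so the row-count argument alone promises little.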

Therefore, the **optimal join order** reverses the join:

1. Start with the smallest tables:
   `category` -> `film_category` -> `film`

   - Together these form a small lookup set of films and their categories.
   
2. Then join the medium-sized table: -> `inventory`
   
3. Finally, join the largest tables: -> `rental` -> `payment`

Resulting order: `category` -> `film_category` -> `film` -> `inventory` -> `rental` -> `payment`
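To make the row-count argument concrete, here is a plain-Python sketch that compares intermediate row counts for both orders when only *Action* payments are needed. The tables are made up but keyed like Pagila, `film` is omitted because `film_category` already carries `film_id`, and `hash_join` / `run_plan` are hypothetical helpers, not Spark APIs.

```python
def hash_join(left, right, key):
    """Inner equi-join: build a hash table on `right`, then probe it with `left`."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l_row, **r_row} for l_row in left for r_row in index.get(l_row[key], [])]


def run_plan(start, steps, row_filter=None):
    """Apply joins in order, recording the row count after each step."""
    rows, sizes = start, []
    for table, key in steps:
        rows = hash_join(rows, table, key)
        sizes.append(len(rows))
    if row_filter is not None:
        rows = [row for row in rows if row_filter(row)]
        sizes.append(len(rows))
    return rows, sizes


# Hypothetical toy data: odd film_ids are Action, even film_ids are Comedy.
category = [{"category_id": 1, "name": "Action"}, {"category_id": 2, "name": "Comedy"}]
film_category = [{"film_id": f, "category_id": 1 if f % 2 else 2} for f in range(1, 11)]
inventory = [{"inventory_id": i, "film_id": i % 10 + 1} for i in range(1, 41)]
rental = [{"rental_id": r, "inventory_id": r % 40 + 1} for r in range(1, 201)]
payment = [{"payment_id": p, "rental_id": p, "amount": 1.0} for p in range(1, 201)]

# Original order: join everything, filter on the category name at the very end.
rows_fact_first, sizes_fact_first = run_plan(
    payment,
    [(rental, "rental_id"), (inventory, "inventory_id"),
     (film_category, "film_id"), (category, "category_id")],
    row_filter=lambda row: row["name"] == "Action",
)

# Reversed order: filter the tiny category table first, then walk out to the facts.
rows_dim_first, sizes_dim_first = run_plan(
    [c for c in category if c["name"] == "Action"],
    [(film_category, "category_id"), (inventory, "film_id"),
     (rental, "inventory_id"), (payment, "rental_id")],
)

print("fact-first intermediate sizes:", sizes_fact_first)
print("dimension-first intermediate sizes:", sizes_dim_first)
```

Both plans return the same payments, but the fact-first plan carries every payment through every join, while the dimension-first plan only grows to the size of the final answer.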

In [8]:
print("\n=== 1. Join Order Optimization ===")
# Original join order (fact tables first)
start_bad = time.time()
result_bad = df_payment \
    .join(df_rental, "rental_id") \
    .join(df_inventory, "inventory_id") \
    .join(df_film, "film_id") \
    .join(df_film_category, "film_id") \
    .join(df_category, "category_id") \
    .select("payment_id", "amount", col("name").alias("category_name"))
count_bad = result_bad.count()
time_bad = time.time() - start_bad

# Good join order (optimized)
start_good = time.time()
result_good = df_category \
    .join(df_film_category, "category_id") \
    .join(df_film, "film_id") \
    .join(df_inventory, "film_id") \
    .join(df_rental, "inventory_id") \
    .join(df_payment, "rental_id") \
    .select("payment_id", "amount", col("name").alias("category_name"))
count_good = result_good.count()
time_good = time.time() - start_good

print(f"""Original Join Order: {time_bad:.2f} seconds ({count_bad:,} rows)
Optimized Join Order: {time_good:.2f} seconds ({count_good:,} rows)
Improvement: {((time_bad - time_good) / time_bad * 100):.1f}% faster.""")

=== 1. Join Order Optimization ===

Original Join Order: 3.49 seconds (16,049 rows)
Optimized Join Order: 2.20 seconds (16,049 rows)
Improvement: 36.9% faster.


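These timings come from a single wall-clock run of each query on ~16k rows, so treat the percentages as indicative rather than as a benchmark.

**Advanced:** Instead of a linear chain, first join the small dimension tables (`category`, `film_category`, `film`, `inventory`) into one lookup table. When that result is small enough to fit in executor memory, Spark can use a **broadcast join**: the lookup table is copied to every executor, which turns the shuffle join into a local map-side join and avoids shuffling the large `payment`/`rental` side for that join. Because Spark usually has no size statistics for tables read over JDBC, it may not pick a broadcast join on its own, which is why the code below uses an explicit `broadcast()` hint.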

Proposed Order:

1. Join `category` -> `film_category` -> `film` -> `inventory`

    Result: `ref_inventory_dim` (broadcast).

2. Join `payment` -> `rental`

    Result: `fact_transactions`.

3. Join `fact_transactions` with the broadcast `ref_inventory_dim` on `inventory_id`.

In [14]:
start_broadcast = time.time()
ref_inventory_dim = df_category \
    .join(df_film_category, "category_id") \
    .join(df_film, "film_id") \
    .join(df_inventory, "film_id")
result_broadcast = df_payment \
    .join(df_rental, "rental_id") \
    .join(broadcast(ref_inventory_dim), "inventory_id") \
    .select("payment_id", "amount", col("name").alias("category_name"))
count_broadcast = result_broadcast.count()
time_broadcast = time.time() - start_broadcast

print(f"""Without Broadcast: {time_bad:.2f} seconds
With Broadcast: {time_broadcast:.2f} seconds
Improvement: {((time_bad - time_broadcast) / time_bad * 100):.1f}% faster
""")

                                                                                

Without Broadcast: 3.49 seconds
With Broadcast: 1.81 seconds
Improvement: 48.1% faster
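Conceptually, the broadcast join above works like the plain-Python sketch below: the small dimension is turned into a hash map once, every partition of the large fact side probes it locally, and no fact row has to move to another partition. The partition layout and row values are made up, `broadcast_hash_join` is a hypothetical helper, and the sketch assumes `inventory_id` is unique in the dimension (one category per film, as the equal row counts of `film` and `film_category` suggest).

```python
def broadcast_hash_join(fact_partitions, dim_rows, key):
    """Join each fact partition locally against a hash map built from the small side."""
    # Built once; Spark would ship a copy to every executor, here all partitions share it.
    lookup = {row[key]: row for row in dim_rows}
    joined = []
    for part in fact_partitions:
        # Each partition is probed in place: no fact row moves between partitions.
        joined.append([{**fact, **lookup[fact[key]]} for fact in part if fact[key] in lookup])
    return joined


# Hypothetical stand-in for broadcast(ref_inventory_dim).
ref_inventory_dim = [
    {"inventory_id": 1, "category_name": "Action"},
    {"inventory_id": 2, "category_name": "Comedy"},
]

# Hypothetical fact rows (payment joined to rental), already split into two partitions.
fact_partitions = [
    [{"payment_id": 10, "inventory_id": 1, "amount": 2.99}],
    [{"payment_id": 11, "inventory_id": 2, "amount": 4.99},
     {"payment_id": 12, "inventory_id": 1, "amount": 0.99}],
]

result = broadcast_hash_join(fact_partitions, ref_inventory_dim, "inventory_id")
for i, part in enumerate(result):
    print(i, [(row["payment_id"], row["category_name"]) for row in part])
```

In the notebook itself, `result_broadcast.explain()` should show a `BroadcastHashJoin` node for this step if the hint was applied.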



#### 2. Indexing Strategies

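A. For the relational database (PostgreSQL): index the foreign-key columns used in these joins (`payment.rental_id`, `rental.inventory_id`, `inventory.film_id`, `film_category.category_id`), checking first which indexes Pagila already ships. These indexes speed up joins executed inside PostgreSQL and filtered lookups that Spark's JDBC reader pushes down; they do not help when Spark reads each table in full.

B. For the distributed system (Apache Spark): Spark DataFrames have no secondary indexes, so the closest equivalent is the partitioning proposed in Exercise 4: store the data in directories keyed by a column (e.g. payment month). This enables **partition pruning**, letting Spark skip entire directories that fall outside the query's time range.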
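To see what a foreign-key index changes, here is a small sketch that uses Python's built-in `sqlite3` as a stand-in for PostgreSQL. The `CREATE INDEX` statement has the same form in both; in PostgreSQL you would inspect the plan with `EXPLAIN` rather than SQLite's `EXPLAIN QUERY PLAN`. The stripped-down `rental` table, its data and the index name `idx_rental_inventory_id` are all hypothetical.

```python
import sqlite3


def query_plan(conn, sql, params=()):
    """Return SQLite's EXPLAIN QUERY PLAN details as one string."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return " | ".join(str(row[-1]) for row in rows)  # last column holds the detail text


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rental (rental_id INTEGER PRIMARY KEY, inventory_id INTEGER, customer_id INTEGER)"
)
conn.executemany(
    "INSERT INTO rental VALUES (?, ?, ?)",
    [(i, i % 50, i % 7) for i in range(1, 1001)],
)

# The join rental.inventory_id = inventory.inventory_id performs this kind of lookup.
lookup = "SELECT rental_id, customer_id FROM rental WHERE inventory_id = ?"

before = query_plan(conn, lookup, (5,))
conn.execute("CREATE INDEX idx_rental_inventory_id ON rental (inventory_id)")
after = query_plan(conn, lookup, (5,))

print("without index:", before)  # a full SCAN of rental
print("with index:   ", after)   # a SEARCH using idx_rental_inventory_id
```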


#### 3. Caching and Materialized Views

To avoid re-computing this expensive 6-table join for every query, I propose implementing a multi-layer caching strategy.

**Materialized Views**:

- Denormalize the schema into a Flattened Fact Table. Pre-join all 6 tables into a single wide table: `dm_rental_analysis`.

- Trade-off: much faster reads, paid for with extra storage and refresh latency (the table is only as fresh as its last refresh).
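PostgreSQL supports this directly with `CREATE MATERIALIZED VIEW ... AS SELECT ...` and `REFRESH MATERIALIZED VIEW`. SQLite has no materialized views, so the sketch below, again using Python's built-in `sqlite3`, emulates one with a table that is rebuilt on refresh. It illustrates the refresh-latency trade-off: a new payment is invisible to `dm_rental_analysis` until the next refresh. The two-table schema (with `category_id` placed directly on `payment`) is a hypothetical simplification of the six-table join, and `refresh_dm` / `revenue_by_category` are made-up helpers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE category (category_id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE payment (payment_id INTEGER PRIMARY KEY, category_id INTEGER, amount REAL);
INSERT INTO category VALUES (1, 'Action'), (2, 'Comedy');
INSERT INTO payment VALUES (1, 1, 2.99), (2, 2, 4.99), (3, 1, 0.99);
""")


def refresh_dm(conn):
    """Emulate REFRESH MATERIALIZED VIEW: rebuild the flattened table from the sources."""
    conn.executescript("""
    DROP TABLE IF EXISTS dm_rental_analysis;
    CREATE TABLE dm_rental_analysis AS
    SELECT p.payment_id, p.amount, c.name AS category_name
    FROM payment AS p JOIN category AS c ON c.category_id = p.category_id;
    """)


def revenue_by_category(conn):
    """Read only the pre-joined table: no joins at query time."""
    return dict(conn.execute(
        "SELECT category_name, ROUND(SUM(amount), 2) FROM dm_rental_analysis "
        "GROUP BY category_name ORDER BY category_name"
    ).fetchall())


refresh_dm(conn)
before_insert = revenue_by_category(conn)

conn.execute("INSERT INTO payment VALUES (4, 2, 1.00)")
stale = revenue_by_category(conn)  # the new payment is not visible yet

refresh_dm(conn)
fresh = revenue_by_category(conn)  # now includes payment 4
print(before_insert, stale, fresh)
```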

**Distributed Caching**: 

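- If the result of the `inventory` + `film` + `category` join is reused by several downstream models, persist it with `.persist(StorageLevel.MEMORY_AND_DISK)` (imported via `from pyspark import StorageLevel`).
- Benefit: the computed partitions are kept in executor memory, spilling to disk when they do not fit, so later actions reuse them instead of re-running the JDBC reads and joins of the full lineage. Persistence is lazy: the data is materialized by the first action that uses it.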