### Exercise 3 : Run with Spark (inside Jupyter)

In [1]:
# ============================================
# 0. Imports & Spark session
# ============================================

import time
import builtins  # <-- IMPORTANT
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    round as spark_round,   # Spark round ONLY for Columns
    count,
    col,
    sum as _sum
)

spark = (
    SparkSession.builder
    .appName("PostgresVsSparkBenchmark")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/tmp/spark-events")
    .config("spark.history.fs.logDirectory", "/tmp/spark-events")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

# ============================================
# 1. JDBC connection config
# ============================================

jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

# ============================================
# 2. Load data from PostgreSQL
# ============================================

print("\n=== Loading people_big from PostgreSQL ===")

start = time.time()

df_big = spark.read.jdbc(
    url=jdbc_url,
    table="people_big",
    properties=jdbc_props
)

# Force materialization
row_count = df_big.count()

print(f"Rows loaded: {row_count}")
print("Load time:", builtins.round(time.time() - start, 2), "seconds")

# Register temp view
df_big.createOrReplaceTempView("people_big")

# ============================================
# 3. Query (a): Simple aggregation
# ============================================

print("\n=== Query (a): AVG salary per department ===")

start = time.time()

q_a = (
    df_big
    .groupBy("department")
    .agg(spark_round(avg("salary"), 2).alias("avg_salary"))
    .orderBy("department", ascending=False)
    .limit(10)
)

q_a.collect()
q_a.show(truncate=False)
print("Query (a) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 4. Query (b): Nested aggregation
# ============================================

print("\n=== Query (b): Nested aggregation ===")

start = time.time()

q_b = spark.sql("""
SELECT country, AVG(avg_salary) AS avg_salary
FROM (
    SELECT country, department, AVG(salary) AS avg_salary
    FROM people_big
    GROUP BY country, department
) sub
GROUP BY country
ORDER BY avg_salary DESC
LIMIT 10
""")

q_b.collect()
q_b.show(truncate=False)
print("Query (b) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 5. Query (c): Sorting + Top-N
# ============================================

print("\n=== Query (c): Top 10 salaries ===")

start = time.time()

q_c = (
    df_big
    .orderBy(col("salary").desc())
    .limit(10)
)

q_c.collect()
q_c.show(truncate=False)
print("Query (c) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 6. Query (d): Heavy self-join (COUNT only)
# ============================================

print("\n=== Query (d): Heavy self-join COUNT (DANGEROUS) ===")

start = time.time()

q_d = (
    df_big.alias("p1")
    .join(df_big.alias("p2"), on="country")
    .count()
)

print("Join count:", q_d)
print("Query (d) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 7. Query (d-safe): Join-equivalent rewrite
# ============================================

print("\n=== Query (d-safe): Join-equivalent rewrite ===")

start = time.time()

grouped = df_big.groupBy("country").agg(count("*").alias("cnt"))

q_d_safe = grouped.select(
    _sum(col("cnt") * col("cnt")).alias("total_pairs")
)

q_d_safe.collect()
q_d_safe.show()
print("Query (d-safe) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 8. Cleanup
# ============================================

#spark.stop()


=== Loading people_big from PostgreSQL ===
Rows loaded: 1000000
Load time: 9.36 seconds

=== Query (a): AVG salary per department ===
+------------------+----------+
|department        |avg_salary|
+------------------+----------+
|Workforce Planning|85090.82  |
|Web Development   |84814.36  |
|UX Design         |84821.2   |
|UI Design         |85164.64  |
|Treasury          |84783.27  |
|Training          |85148.1   |
|Tax               |85018.57  |
|Sustainability    |85178.99  |
|Supply Chain      |84952.89  |
|Subscriptions     |84899.19  |
+------------------+----------+

Query (a) time: 11.36 seconds

=== Query (b): Nested aggregation ===
+------------+-----------------+
|country     |avg_salary       |
+------------+-----------------+
|Egypt       |87382.22963311202|
|Kuwait      |87349.3517377211 |
|Saudi Arabia|87348.80512175431|
|Panama      |87345.00623707911|
|Denmark     |87328.03514120901|
|Jamaica     |87305.43735208298|
|Lebanon     |87292.76891750698|
|Turkey      |872

## Exercise 4: Port the SQL queries from exercise 1 to spark
Exercise 1 - PostgreSQL Analytical Queries (E-commerce)
- A. What is the single item with the highest price_per_unit?  
- B. What are the top 3 products with the highest total quantity sold across all orders?  
- C. What is the total revenue per product category?  
  (Revenue = price_per_unit Ã— quantity)
- D. Which customers have the highest total spending?  

In [2]:
#Load the orders table from PostgreSQL
df_orders = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    properties=jdbc_props
)

df_orders.createOrReplaceTempView("orders")

#A. Single item with highest price_per_unit
q_A = df_orders.orderBy(col("price_per_unit").desc()).limit(1)
q_A.show()

+-------------+----------------+--------+--------------+----------+-------+
|customer_name|product_category|quantity|price_per_unit|order_date|country|
+-------------+----------------+--------+--------------+----------+-------+
|   Emma Brown|      Automotive|       3|       2000.00|2024-10-11|  Italy|
+-------------+----------------+--------+--------------+----------+-------+



In [3]:
#B. Top 3 products with highest total quantity sold
q_B = (
    df_orders
    .groupBy("product_category")
    .agg(_sum("quantity").alias("total_quantity_sold"))
    .orderBy(col("total_quantity_sold").desc())
    .limit(3)
)
q_B.show()

+----------------+-------------------+
|product_category|total_quantity_sold|
+----------------+-------------------+
| Health & Beauty|             300842|
|     Electronics|             300804|
|            Toys|             300598|
+----------------+-------------------+



In [4]:
#C. Total revenue per product category
q_C = (
    df_orders
    .groupBy("product_category")
    .agg(_sum(col("price_per_unit") * col("quantity")).alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
)
q_C.show()

+----------------+-------------+
|product_category|total_revenue|
+----------------+-------------+
|      Automotive| 306589798.86|
|     Electronics| 241525009.45|
|   Home & Garden|  78023780.09|
|          Sports|  61848990.83|
| Health & Beauty|  46599817.89|
| Office Supplies|  38276061.64|
|         Fashion|  31566368.22|
|            Toys|  23271039.02|
|         Grocery|  15268355.66|
|           Books|  12731976.04|
+----------------+-------------+



In [5]:
#D. Customers with highest total spending
q_D = (
    df_orders
    .groupBy("customer_name")
    .agg(_sum(col("price_per_unit") * col("quantity")).alias("total_spent"))
    .orderBy(col("total_spent").desc())
    .limit(20)
)
q_D.show()

+--------------+-----------+
| customer_name|total_spent|
+--------------+-----------+
|  Carol Taylor|  991179.18|
|    Nina Lopez|  975444.95|
|Daniel Jackson|  959344.48|
|   Carol Lewis|  947708.57|
|  Daniel Young|  946030.14|
|Alice Martinez|  935100.02|
|   Ethan Perez|  934841.24|
|       Leo Lee|  934796.48|
|     Eve Young|  933176.86|
| Ivy Rodriguez|  925742.64|
| Nina Robinson|  923627.53|
|   John Thomas|  922858.59|
|    Eve Taylor|  917620.11|
|   Henry Smith|  916273.48|
|    Mia Wilson|  914740.05|
|  Liam Johnson|  914266.79|
|   Alice Brown|  913262.30|
|    Bob Walker|  913202.74|
|Olivia Sanchez|  912017.65|
|  Amelia Moore|  911627.44|
+--------------+-----------+



In [6]:
# ============================================
# Final Step: Cleanup
# ============================================

spark.stop()