In [1]:
# ============================================
# 0. Imports & Spark session
# ============================================

import time
import builtins  # <-- IMPORTANT
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    sum as _sum,
    col,
    desc
)

spark = (
    SparkSession.builder
    .appName("EcommerceQueriesSpark")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/tmp/spark-events")
    .config("spark.history.fs.logDirectory", "/tmp/spark-events")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

In [2]:
# ============================================
# 1. JDBC connection config
# ============================================

jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [3]:
# ============================================
# 2. Load orders_big from PostgreSQL
# ============================================

print("\n=== Loading orders_big from PostgreSQL ===")

start = time.time()

df_orders = spark.read.jdbc(
    url=jdbc_url,
    table="orders_big",
    properties=jdbc_props
)

# Force materialization
row_count = df_orders.count()

print(f"Rows loaded: {row_count}")
print("Load time:", builtins.round(time.time() - start, 2), "seconds")

# Register temp view
df_orders.createOrReplaceTempView("orders_big")


=== Loading orders_big from PostgreSQL ===
Rows loaded: 1000000
Load time: 1.18 seconds


In [4]:
# ============================================
# 3. Query A: Single item with highest price_per_unit
# ============================================

print("\n=== Query A: Highest price_per_unit ===")

start = time.time()

q_a = (
    df_orders
    .orderBy(col("price_per_unit").desc())
    .limit(1)
)

q_a.show(truncate=False)
print("Query A time:", builtins.round(time.time() - start, 2), "seconds")


=== Query A: Highest price_per_unit ===
+------+-------------+----------------+--------+--------------+----------+-------+
|id    |customer_name|product_category|quantity|price_per_unit|order_date|country|
+------+-------------+----------------+--------+--------------+----------+-------+
|841292|Emma Brown   |Automotive      |3       |2000.00       |2024-10-11|Italy  |
+------+-------------+----------------+--------+--------------+----------+-------+

Query A time: 1.38 seconds


In [5]:
# ============================================
# 4. Query B: Top 3 product categories by total quantity
# ============================================

print("\n=== Query B: Top 3 categories by quantity ===")

start = time.time()

q_b = (
    df_orders
    .groupBy("product_category")
    .agg(_sum("quantity").alias("total_quantity"))
    .orderBy(desc("total_quantity"))
    .limit(3)
)

q_b.show(truncate=False)
print("Query B time:", builtins.round(time.time() - start, 2), "seconds")


=== Query B: Top 3 categories by quantity ===
+----------------+--------------+
|product_category|total_quantity|
+----------------+--------------+
|Health & Beauty |300842        |
|Electronics     |300804        |
|Toys            |300598        |
+----------------+--------------+

Query B time: 0.45 seconds


In [6]:
# ============================================
# 5. Query C: Total revenue per product category
# ============================================

print("\n=== Query C: Revenue per category ===")

start = time.time()

q_c = (
    df_orders
    .withColumn("revenue", col("price_per_unit") * col("quantity"))
    .groupBy("product_category")
    .agg(_sum("revenue").alias("total_revenue"))
    .orderBy(desc("total_revenue"))
)

q_c.show(truncate=False)
print("Query C time:", builtins.round(time.time() - start, 2), "seconds")


=== Query C: Revenue per category ===
+----------------+-------------+
|product_category|total_revenue|
+----------------+-------------+
|Automotive      |306589798.86 |
|Electronics     |241525009.45 |
|Home & Garden   |78023780.09  |
|Sports          |61848990.83  |
|Health & Beauty |46599817.89  |
|Office Supplies |38276061.64  |
|Fashion         |31566368.22  |
|Toys            |23271039.02  |
|Grocery         |15268355.66  |
|Books           |12731976.04  |
+----------------+-------------+

Query C time: 0.59 seconds


In [7]:
# ============================================
# 6. Query D: Top 10 customers by total spending
# ============================================

print("\n=== Query D: Top 10 customers by spending ===")

start = time.time()

q_d = (
    df_orders
    .withColumn("spending", col("price_per_unit") * col("quantity"))
    .groupBy("customer_name")
    .agg(_sum("spending").alias("total_spending"))
    .orderBy(desc("total_spending"))
    .limit(10)
)

q_d.show(truncate=False)
print("Query D time:", builtins.round(time.time() - start, 2), "seconds")


=== Query D: Top 10 customers by spending ===
+--------------+--------------+
|customer_name |total_spending|
+--------------+--------------+
|Carol Taylor  |991179.18     |
|Nina Lopez    |975444.95     |
|Daniel Jackson|959344.48     |
|Carol Lewis   |947708.57     |
|Daniel Young  |946030.14     |
|Alice Martinez|935100.02     |
|Ethan Perez   |934841.24     |
|Leo Lee       |934796.48     |
|Eve Young     |933176.86     |
|Ivy Rodriguez |925742.64     |
+--------------+--------------+

Query D time: 0.54 seconds


In [8]:
# ============================================
# 7. Cleanup
# ============================================

spark.stop()