# EX3 - Jonas Gstoettemayr

In [1]:
# Bind the Tab key to completion for the readline-based prompt.
import readline
import rlcompleter  # not referenced below; imported so the completer gets registered

readline.parse_and_bind("tab: complete")

In [2]:
# Turn on IPython's "greedy" tab completion for this kernel.
# NOTE(review): newer IPython releases appear to deprecate `greedy` in favour of
# `IPCompleter.evaluation` - confirm against the installed IPython version.
%config IPCompleter.greedy=True

In [4]:
# ============================================
# 0. Imports & Spark session
# ============================================

import builtins
import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    round as spark_round,   # Spark round ONLY for Columns
    count,
    col,
    sum as _sum,
    product
)

# Session settings, applied to the builder in the order listed.
_spark_settings = [
    # Maven coordinates of the PostgreSQL JDBC driver.
    ("spark.jars.packages", "org.postgresql:postgresql:42.7.2"),
    # Event logging, with the history-server directory pointed at the same path.
    ("spark.eventLog.enabled", "true"),
    ("spark.eventLog.dir", "/tmp/spark-events"),
    ("spark.history.fs.logDirectory", "/tmp/spark-events"),
    # Partition / parallelism settings.
    ("spark.sql.shuffle.partitions", "4"),
    ("spark.default.parallelism", "4"),
]

_builder = SparkSession.builder.appName("PostgresVsSparkBenchmark")
for _key, _value in _spark_settings:
    _builder = _builder.config(_key, _value)

# Reuses an already-running session if one exists, otherwise creates it.
spark = _builder.getOrCreate()

# Keep the notebook output readable: only warnings and errors are logged.
spark.sparkContext.setLogLevel("WARN")

# ============================================
# 1. JDBC connection config
# ============================================

# Connection settings come from the environment so real credentials do not have
# to be committed with the notebook. Every default equals the value that used
# to be hard-coded, so nothing changes when the variables are not set.
jdbc_url = os.environ.get(
    "POSTGRES_JDBC_URL",
    "jdbc:postgresql://postgres:5432/postgres",
)
jdbc_props = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    # JDBC driver class name handed to spark.read.jdbc via `properties`.
    "driver": "org.postgresql.Driver",
}

# ============================================
# 2. Load data from PostgreSQL
# ============================================

print("\n=== Loading orders from PostgreSQL ===")

start = time.time()

# Define the JDBC-backed DataFrame over the `orders` table.
df_order = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    properties=jdbc_props
)

# count() is an action, so it triggers the JDBC read that "Load time" measures.
# No cache()/persist() is applied, so later actions on df_order go back to
# PostgreSQL instead of reusing rows held by Spark.
row_count = df_order.count()

print(f"Rows loaded: {row_count}")
print("Load time:", builtins.round(time.time() - start, 2), "seconds")

# Register temp view so the table can also be queried through spark.sql(...)
df_order.createOrReplaceTempView("orders")

df_order.printSchema()

# Elapsed time since `start`, i.e. load + view registration + schema print.
# (This line used to carry a copy-pasted "Query (d-safe)" label although no
# query had run at this point.)
print("Setup (load + view + schema) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# Query 1 -> A. What is the single item with the highest price_per_unit?
# ============================================

print("\n=== Query (1): Highest unit price ===")

start = time.time()

# Sort by unit price, most expensive first, and keep the top rows.
# NOTE(review): the question asks for a single item but three rows are kept -
# presumably to make ties / runners-up visible; use limit(1) for a strict answer.
q_a = (
    df_order
    .orderBy("price_per_unit", ascending=False)
    .limit(3)
)

# show() is the only action in the timed window. Calling collect() as well
# would execute the whole query a second time and inflate the measured time.
q_a.show(truncate=False)
print("Query (1) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# Query 2 -> B. What are the top 3 products with the highest total quantity sold across all orders?
# ============================================

print("\n=== Query (2): Top 3 products ===")

start = time.time()

# Total quantity per product_category, largest first, top three kept.
# NOTE(review): the grouping key is product_category - presumably the closest
# thing to a "product" in the orders table; confirm against the schema.
q_b = (
    df_order
    .groupBy("product_category")
    .agg(_sum("quantity").alias("sum_quantity"))
    .orderBy("sum_quantity", ascending=False)
    .limit(3)
)

# show() is the only action in the timed window. Calling collect() as well
# would execute the whole query a second time and inflate the measured time.
q_b.show(truncate=False)
print("Query (2) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# Query 3 -> C. What is the total revenue per product category?
# ============================================

print("\n=== Query (3): Total revenue ===")

start = time.time()

# Revenue of an order row is quantity * price_per_unit; sum it per category.
# The aggregate is aliased so the result column has a readable name instead of
# the auto-generated "sum(revenue)".
q_c = (
    df_order
    .groupBy("product_category")
    .agg(_sum(col("quantity") * col("price_per_unit")).alias("total_revenue"))
    .orderBy("product_category")
)

# show() is the only action in the timed window. Calling collect() as well
# would execute the whole query a second time and inflate the measured time.
# show() prints at most its default number of rows.
q_c.show(truncate=False)
print("Query (3) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# Query 4 -> D. Which customers have the highest total spending?
# ============================================

print("\n=== Query (4): Highest spending ===")

start = time.time()

# Runs against the "orders" temp view registered in the load section.
# Stored under its own name (q_d) so the Query 3 DataFrame in q_c is not
# silently overwritten. The statement carries no trailing ';' because
# spark.sql takes a single statement and some Spark versions reject it.
q_d = spark.sql("""
select customer_name, sum(quantity * price_per_unit) as revenue
from orders
group by customer_name
order by revenue desc
limit 5
""")

# show() is the only action in the timed window. Calling collect() as well
# would execute the whole query a second time and inflate the measured time.
q_d.show(truncate=False)
print("Query (4) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# Cleanup
# ============================================

# Stop the Spark session. `spark` cannot run further queries after this call
# until a new session is created.
spark.stop()


=== Loading orders from PostgreSQL ===
Rows loaded: 1000000
Load time: 0.18 seconds
root
 |-- id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_per_unit: decimal(18,2) (nullable = true)
 |-- order_date: date (nullable = true)
 |-- country: string (nullable = true)

Query (d-safe) time: 0.18 seconds

=== Query (1): Highest unit price ===
+------+-------------+----------------+--------+--------------+----------+-------+
|id    |customer_name|product_category|quantity|price_per_unit|order_date|country|
+------+-------------+----------------+--------+--------------+----------+-------+
|841292|Emma Brown   |Automotive      |3       |2000.00       |2024-10-11|Italy  |
|719717|Leo Smith    |Automotive      |3       |1999.97       |2024-01-19|Sweden |
|218300|Amir Wilson  |Automotive      |5       |1999.96       |2025-01-06|USA    |
+------+-------------+---------------