# Exercise 1

1. `cd ecommerce`
2. `python3 dataset_generator.py`
3. `wc -l orders_1M.csv`
4. I moved the CSV file to the `data` directory, because the import could not find it while it was in `ecommerce`. Then I created the table and loaded the file:
``` sql
DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  customer_name TEXT,
  product_category TEXT,
  quantity INTEGER,
  price_per_unit FLOAT,
  order_date DATE,
  country TEXT
);

-- psql meta-commands such as \copy must be written on a single line
\COPY orders(customer_name, product_category, quantity, price_per_unit, order_date, country) FROM '/data/orders_1M.csv' DELIMITER ',' CSV HEADER;
```

### A. What is the single item with the highest price_per_unit?
``` sql 
SELECT product_category, price_per_unit
FROM orders
ORDER BY price_per_unit DESC
LIMIT 1;
```

| product_category | price_per_unit |
|----|----|
|Automotive|2000|

### B. What are the top 3 products with the highest total quantity sold across all orders?

``` sql
SELECT product_category, SUM(quantity) AS total_quantity_sold
FROM orders
GROUP BY product_category
ORDER BY total_quantity_sold DESC
LIMIT 3;
```
| product_category | total_quantity_sold |
|------------------|---------------------|
| Health & Beauty  |              300842|
| Electronics      |              300804|
| Toys             |              300598|


### C. What is the total revenue per product category? (Revenue = price_per_unit × quantity)

``` sql
SELECT product_category, SUM(price_per_unit * quantity) AS total_revenue
FROM orders
GROUP BY product_category
ORDER BY total_revenue DESC;
```

| product_category |   total_revenue |   
|------------------|--------------------|
| Automotive       | 306589798.86000013|
| Electronics      | 241525009.44999987|
| Home & Garden    |  78023780.09000014|
| Sports           |  61848990.83000007|
| Health & Beauty  | 46599817.890000135|
| Office Supplies  |  38276061.63999981|
| Fashion          |  31566368.22000009|
| Toys             | 23271039.020000048|
| Grocery          |  15268355.66000005|
| Books            | 12731976.040000036|

### D. Which customers have the highest total spending?

``` sql
SELECT customer_name, SUM(price_per_unit * quantity) AS total_spent
FROM orders
GROUP BY customer_name
ORDER BY total_spent DESC
LIMIT 10;
```

| customer_name  |    total_spent  |  
|----------------|-------------------|
| Carol Taylor   | 991179.1800000002|
| Nina Lopez     | 975444.9500000002|
| Daniel Jackson |         959344.48|
| Carol Lewis    | 947708.5699999998|
| Daniel Young   | 946030.1400000001|
| Alice Martinez | 935100.0200000006|
| Ethan Perez    | 934841.2400000001|
| Leo Lee        | 934796.4799999995|
| Eve Young      | 933176.8600000003|
| Ivy Rodriguez  | 925742.6400000006|

# Exercise 2

The first big issue is the query itself:

``` sql
SELECT COUNT(*)
FROM people_big p1
JOIN people_big p2
  ON p1.country = p2.country;
```

This query joins the table with itself on `country`, which blows up the intermediate result, and then counts the resulting rows. For example, given this table:

|name|country|
|--|--|
|Jonathan S.| UK|
|Martin B.| UK|
|Pat G.| Latvia|
|Andrej M.| Latvia|

After the join:

|name 1|name 2| country |
|--|--|--|
|Jonathan S.| Martin B.|UK|
|Jonathan S.| Jonathan S.|UK|
|Martin B.|Jonathan S.| UK|
|Martin B.| Martin B.|UK|
|Pat G.| Andrej M.|Latvia|
|Pat G.| Pat G.|Latvia|
|Andrej M.| Pat G.|Latvia|
|Andrej M.| Andrej M.|Latvia|

The join is unnecessary; the same count can be computed much more cheaply:

``` sql
SELECT SUM(count * count)
FROM (
  SELECT country, COUNT(*) AS count
  FROM people_big
  GROUP BY country
) temp;
```

This returns the same result but avoids the costly self-join: it counts the people in each country, squares each count (a country with n people produces n × n pairs in the join), and sums the squares.
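The equivalence can be checked on the four-row example table above. The following is a minimal sketch in plain Python (no database involved); the `people` list and the helper names are illustrative and not part of the exercise code.

```python
from collections import Counter
from itertools import product

# The four example rows from the table above: (name, country)
people = [
    ("Jonathan S.", "UK"),
    ("Martin B.", "UK"),
    ("Pat G.", "Latvia"),
    ("Andrej M.", "Latvia"),
]


def self_join_count(rows):
    """Count the pairs a self-join on country produces (quadratic work)."""
    return sum(1 for a, b in product(rows, repeat=2) if a[1] == b[1])


def sum_of_squares_count(rows):
    """Count the same pairs from the per-country totals (linear work)."""
    per_country = Counter(country for _, country in rows)
    return sum(n * n for n in per_country.values())


print(self_join_count(people))       # 8
print(sum_of_squares_count(people))  # 8
```

Both functions return 8 for the example, matching the eight rows of the joined table, but the second one never builds the pairs.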

Of course, it is not really possible to force users to write smart queries. What is possible is limiting how long a query may run, so that a badly written query is stopped before it ties up the server.
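One way to set such a limit in PostgreSQL is the `statement_timeout` setting. The sketch below assumes the `psycopg2` driver and reuses the host and credentials from the Spark examples in this report (`pg-bigdata`, `postgres`/`postgres`); the helper names and the 5-second limit are illustrative assumptions.

```python
def timeout_options(timeout_ms: int) -> str:
    """Build the libpq `options` string that sets statement_timeout (in ms)."""
    if timeout_ms < 0:
        raise ValueError("timeout_ms must be non-negative")
    return f"-c statement_timeout={timeout_ms}"


def connect_with_timeout(timeout_ms: int = 5000):
    """Open a connection whose statements are cancelled after `timeout_ms`.

    Assumes psycopg2 is installed and the server is reachable; the host
    and credentials below are assumptions, not verified values.
    """
    import psycopg2  # imported lazily so the helper above works without it

    return psycopg2.connect(
        host="pg-bigdata",
        dbname="postgres",
        user="postgres",
        password="postgres",
        options=timeout_options(timeout_ms),
    )


print(timeout_options(5000))  # -c statement_timeout=5000
```

With this in place, a statement that runs longer than the limit is cancelled by the server with an error instead of running indefinitely.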

Apart from that, the usual solutions such as indexes or views would not help much, because the main problem is that this is an analytical query, which is not what an OLTP database is optimised for. OLTP systems are optimised for many small, simple, concurrent queries; OLAP systems are optimised for infrequent but complex queries. A good idea in this case would therefore be to switch to an OLAP database.

# Exercise 3

``` python
# import os
# os.makedirs("/tmp/spark-events", exist_ok=True)
```

``` python
# ============================================
# 0. Imports & Spark session
# ============================================

import time
import builtins  # <-- IMPORTANT
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    round as spark_round,   # Spark round ONLY for Columns
    count,
    col,
    sum as _sum
)

spark = (
    SparkSession.builder
    .appName("PostgresVsSparkBenchmark")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/tmp/spark-events")
    .config("spark.history.fs.logDirectory", "/tmp/spark-events")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

# ============================================
# 1. JDBC connection config
# ============================================

jdbc_url = "jdbc:postgresql://pg-bigdata:5432/postgres"
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

# ============================================
# 2. Load data from PostgreSQL
# ============================================

print("\n=== Loading people_big from PostgreSQL ===")

start = time.time()

df_big = spark.read.jdbc(
    url=jdbc_url,
    table="people_big",
    properties=jdbc_props
)

# Force materialization
row_count = df_big.count()

print(f"Rows loaded: {row_count}")
print("Load time:", builtins.round(time.time() - start, 2), "seconds")

# Register temp view
df_big.createOrReplaceTempView("people_big")

# ============================================
# 3. Query (a): Simple aggregation
# ============================================

print("\n=== Query (a): AVG salary per department ===")

start = time.time()

q_a = (
    df_big
    .groupBy("department")
    .agg(spark_round(avg("salary"), 2).alias("avg_salary"))
    .orderBy("department", ascending=False)
    .limit(10)
)

q_a.collect()
q_a.show(truncate=False)
print("Query (a) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 4. Query (b): Nested aggregation
# ============================================

print("\n=== Query (b): Nested aggregation ===")

start = time.time()

q_b = spark.sql("""
SELECT country, AVG(avg_salary) AS avg_salary
FROM (
    SELECT country, department, AVG(salary) AS avg_salary
    FROM people_big
    GROUP BY country, department
) sub
GROUP BY country
ORDER BY avg_salary DESC
LIMIT 10
""")

q_b.collect()
q_b.show(truncate=False)
print("Query (b) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 5. Query (c): Sorting + Top-N
# ============================================

print("\n=== Query (c): Top 10 salaries ===")

start = time.time()

q_c = (
    df_big
    .orderBy(col("salary").desc())
    .limit(10)
)

q_c.collect()
q_c.show(truncate=False)
print("Query (c) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 6. Query (d): Heavy self-join (COUNT only)
# ============================================

print("\n=== Query (d): Heavy self-join COUNT (DANGEROUS) ===")

start = time.time()

q_d = (
    df_big.alias("p1")
    .join(df_big.alias("p2"), on="country")
    .count()
)

print("Join count:", q_d)
print("Query (d) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 7. Query (d-safe): Join-equivalent rewrite
# ============================================

print("\n=== Query (d-safe): Join-equivalent rewrite ===")

start = time.time()

grouped = df_big.groupBy("country").agg(count("*").alias("cnt"))

q_d_safe = grouped.select(
    _sum(col("cnt") * col("cnt")).alias("total_pairs")
)

q_d_safe.collect()
q_d_safe.show()
print("Query (d-safe) time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 8. Cleanup
# ============================================

spark.stop()
```



```
=== Loading people_big from PostgreSQL ===
Rows loaded: 1000000
Load time: 25.77 seconds

=== Query (a): AVG salary per department ===
+------------------+----------+
|department        |avg_salary|
+------------------+----------+
|Workforce Planning|85090.82  |
|Web Development   |84814.36  |
|UX Design         |84821.2   |
|UI Design         |85164.64  |
|Treasury          |84783.27  |
|Training          |85148.1   |
|Tax               |85018.57  |
|Sustainability    |85178.99  |
|Supply Chain      |84952.89  |
|Subscriptions     |84899.19  |
+------------------+----------+

Query (a) time: 18.29 seconds

=== Query (b): Nested aggregation ===
+------------+-----------------+
|country     |avg_salary       |
+------------+-----------------+
|Egypt       |87382.229633112  |
|Kuwait      |87349.3517377211 |
|Saudi Arabia|87348.80512175433|
|Panama      |87345.00623707911|
|Denmark     |87328.03514120901|
|Jamaica     |87305.437352083  |
|Lebanon     |87292.76891750695|
|Turkey      |87
```

## 1. What the Spark code does

### 1. Spark session initialisation
- Starts a Spark driver
- Creates a local environment (4 shuffle partitions, event logging enabled)
- Adds the PostgreSQL JDBC driver package

### 2. Loading data from Postgres
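- Connects to PostgreSQL over JDBC
- Reads the `people_big` table
- Exposes it as a Spark DataFrame
- Forces a full read with `count()`; the rows are not kept in memory, because the DataFrame is never cached
- Creates a temporary view, which allows SQL-style queries

After this step the query logic (grouping, sorting, joining) runs inside Spark. Because the DataFrame is not cached, however, each action reads the rows from PostgreSQL again over JDBC, so PostgreSQL still serves the data for every query.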

### 3. Executing queries
#### Query (a): Simple aggregation
Equivalent, apart from the rounding, the ordering and the limit of 10 rows, to
``` sql
SELECT department, AVG(salary)
FROM people_big
GROUP BY department;
```

The aggregation runs in parallel across worker threads: each partition is aggregated separately and the partial results are then merged.
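The idea behind a parallel average can be sketched in plain Python: each partition produces a partial (sum, count) per department, and a final step merges them. This is a simplified model with hypothetical data and helper names, not Spark's actual implementation.

```python
from collections import defaultdict


def partial_aggregate(partition):
    """Per-partition step: running [sum, count] of salary per department."""
    acc = defaultdict(lambda: [0.0, 0])
    for department, salary in partition:
        acc[department][0] += salary
        acc[department][1] += 1
    return acc


def merge_and_average(partials):
    """Final step: merge the partial [sum, count] pairs and divide."""
    total = defaultdict(lambda: [0.0, 0])
    for acc in partials:
        for department, (s, n) in acc.items():
            total[department][0] += s
            total[department][1] += n
    return {dept: round(s / n, 2) for dept, (s, n) in total.items()}


# Two hypothetical partitions of (department, salary) rows
partitions = [
    [("Tax", 80000.0), ("Treasury", 90000.0)],
    [("Tax", 90000.0), ("Treasury", 70000.0), ("Tax", 85000.0)],
]

result = merge_and_average(partial_aggregate(p) for p in partitions)
print(result)  # {'Tax': 85000.0, 'Treasury': 80000.0}
```

Sums and counts are merged rather than averages, because averaging the per-partition averages would give the wrong answer when partitions hold different numbers of rows.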

#### Query (b): Nested aggregation
This one is already written as an SQL query. It demonstrates multi-stage aggregation with an intermediate result set (the average per country and department) and Spark's ability to pipeline more complex queries.
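The two stages can be illustrated in plain Python. The rows below are hypothetical; the sketch only shows that the nested query averages department averages, which is not the same as averaging all salaries of a country directly.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (country, department, salary) rows
rows = [
    ("Egypt", "Tax", 100.0),
    ("Egypt", "Tax", 100.0),
    ("Egypt", "Tax", 100.0),
    ("Egypt", "Treasury", 200.0),
]

# Stage 1: average salary per (country, department)
by_country_dept = defaultdict(list)
for country, department, salary in rows:
    by_country_dept[(country, department)].append(salary)
stage1 = {key: mean(vals) for key, vals in by_country_dept.items()}

# Stage 2: average of the department averages per country
by_country = defaultdict(list)
for (country, _), avg_salary in stage1.items():
    by_country[country].append(avg_salary)
stage2 = {country: mean(vals) for country, vals in by_country.items()}

# For comparison: a single-stage average over all rows
flat_average = mean(salary for _, _, salary in rows)

print(stage2["Egypt"])  # 150.0
print(flat_average)     # 125.0
```

Every department counts equally in the nested result, regardless of how many people work in it.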


#### Query (c): Sorting + Top-N
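Sorting the whole table would be a global, shuffle-heavy sort. Because the sort is followed by `limit(10)`, Spark can instead take the top 10 rows of each partition in parallel and merge those small results.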
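One way a top-N query can be parallelised is to take the top N of each partition and then merge those candidates. A minimal sketch in plain Python, with hypothetical salary partitions and helper names; it models the idea only and is not Spark's actual implementation.

```python
import heapq


def top_n_per_partition(partition, n):
    """Local step: the n largest values of a single partition."""
    return heapq.nlargest(n, partition)


def top_n(partitions, n):
    """Global step: merge the local candidates and keep the n largest."""
    candidates = []
    for partition in partitions:
        candidates.extend(top_n_per_partition(partition, n))
    return heapq.nlargest(n, candidates)


# Three hypothetical partitions of salaries
partitions = [
    [52000, 98000, 61000],
    [120000, 45000],
    [99000, 101000, 30000],
]

print(top_n(partitions, 3))  # [120000, 101000, 99000]
```

Only n values per partition reach the merge step, so the full data set never has to be sorted or moved.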


#### Query (d): Heavy self-join (COUNT only)
This is the same quadratic explosion as in Exercise 2. Spark can run it, but it is still better not to.

#### Query (d-safe): Join-equivalent rewrite
Same result as the previous query but without the join. The work grows with the number of rows instead of the number of row pairs.


## 2. Architectural contrasts with PostgreSQL
|Feature|PostgreSQL|Spark|
|----|----|----|
|Architecture|Single-node (by default)|Distributed|
|Scaling|Vertical|Horizontal|
|Execution|Row-based|Partition-based|
|Memory usage|Limited|In-memory first|
|Best for|OLTP|OLAP / Analytics|

## 3. Advantages and limitations
### Advantages
- Scalability (adding nodes adds compute; can handle billions of rows)
- Performance (in-memory computation, parallel aggregations, efficient shuffles)

### Limitations
- Overhead (startup time; not ideal for small datasets)
- Not OLTP (no transactions, indexes, or constraints)

## 4. Relation to Exercise 2
### Exercise 2 showed that:
- Naive analytical queries perform poorly on OLTP databases
- The usual solutions like views or indexes can help a bit but are not enough

### Exercise 3 shows that:
- Distributed engines like Spark handle such queries better but are still not ideal
- The best way to counter bad query design is to rewrite the query

# Exercise 4

``` python
# Imports repeated here so the cell also runs on its own
import time
import builtins
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PostgresVsSparkBenchmark")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.2")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "/tmp/spark-events")
    .config("spark.history.fs.logDirectory", "/tmp/spark-events")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

jdbc_url = "jdbc:postgresql://postgres:5432/postgres"
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

print("\n=== Loading orders from PostgreSQL ===")

start = time.time()

df_big = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    properties=jdbc_props
)

# Force materialization
row_count = df_big.count()

print(f"Rows loaded: {row_count}")
print("Load time:", builtins.round(time.time() - start, 2), "seconds")

# Register temp view
df_big.createOrReplaceTempView("orders")

# ============================================
# 1. Query A: item with the highest price_per_unit
# ============================================

print("\n=== Query A ===")

start = time.time()

q_a = spark.sql("""
SELECT product_category, price_per_unit
FROM orders
ORDER BY price_per_unit DESC
LIMIT 1;
""")

q_a.collect()
q_a.show(truncate=False)
print("Query A time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 2. Query B: top 3 categories by quantity sold
# ============================================

print("\n=== Query B ===")

start = time.time()

q_b = spark.sql("""
SELECT product_category, SUM(quantity) AS total_quantity_sold
FROM orders
GROUP BY product_category
ORDER BY total_quantity_sold DESC
LIMIT 3;
""")

q_b.collect()
q_b.show(truncate=False)
print("Query B time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 3. Query C: total revenue per category
# ============================================

print("\n=== Query C ===")

start = time.time()

q_c = spark.sql("""
SELECT product_category, SUM(price_per_unit * quantity) AS total_revenue
FROM orders
GROUP BY product_category
ORDER BY total_revenue DESC;
""")

q_c.collect()
q_c.show(truncate=False)
print("Query C time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 4. Query D: top 10 customers by total spending
# ============================================

print("\n=== Query D ===")

start = time.time()

q_d = spark.sql("""
SELECT customer_name, SUM(price_per_unit * quantity) AS total_spent
FROM orders
GROUP BY customer_name
ORDER BY total_spent DESC
LIMIT 10;
""")

q_d.collect()
q_d.show(truncate=False)
print("Query D time:", builtins.round(time.time() - start, 2), "seconds")

# ============================================
# 5. Cleanup
# ============================================

spark.stop()
```


=== Loading orders from PostgreSQL ===
Rows loaded: 1000000
Load time: 3.14 seconds

=== Query A ===
+----------------+--------------+
|product_category|price_per_unit|
+----------------+--------------+
|Automotive      |2000.0        |
+----------------+--------------+

Query A time: 8.09 seconds

=== Query B ===
+----------------+-------------------+
|product_category|total_quantity_sold|
+----------------+-------------------+
|Health & Beauty |300842             |
|Electronics     |300804             |
|Toys            |300598             |
+----------------+-------------------+

Query B time: 5.17 seconds

=== Query C ===
+----------------+--------------------+
|product_category|total_revenue       |
+----------------+--------------------+
|Automotive      |3.065897988599943E8 |
|Electronics     |2.4152500945000267E8|
|Home & Garden   |7.80237800900001E7  |
|Sports          |6.1848990830000326E7|
|Health & Beauty |4.65998178900003E7  |
|Office Supplies |3.8276061640000574E7|
|Fash