# PySpark ETL

The goal of this notebook is to show how to use PySpark for distributed Big Data analysis, from an introduction to the architecture through to complete ETL pipelines with Delta Lake.

- **Dataset**: 100M e-commerce transactions
- **Storage**: MinIO (S3-compatible)
- **Output**: ML-ready data in Delta Lake for Notebook 4, Avro data for Notebook 5 (Streaming)

# Setup & Environment

Spark environment configuration with Delta Lake support.

1. Import the required libraries
2. Initialize the SparkSession with the MinIO configuration
3. Define helper functions
4. Verify the Delta Lake connection

### Spark Configuration
- **App Name**: Notebook-03-PySpark
- **Delta Lake**: Enabled (2.4.0)
- **MinIO**: Configured as S3-compatible storage
- **Driver Memory**: 10GB (local mode, single JVM)
- **Spark UI**: http://localhost:4040

### MinIO Storage
- **MinIO Console**: http://localhost:9001
- **User/Password**: minioadmin

In [1]:
# IMPORTS

import os
import sys
import time
import pandas as pd
from pathlib import Path
#from datetime import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Add project root to path
project_root = Path.cwd().parent if "notebooks" in str(Path.cwd()) else Path.cwd()
sys.path.append(str(project_root))

# Import configurations
from config.spark_config import get_spark_session
from config.minio_config import get_s3a_path, BUCKET_NAME

print("All imports successful!")

All imports successful!


In [2]:
# PATHS CONFIGURATION

# Output directories
RESULTS_DIR = project_root / "docs" / "results"
GRAPH_DIR = project_root / "docs" / "graphs"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
GRAPH_DIR.mkdir(parents=True, exist_ok=True)

# MinIO paths  
transactions_path = get_s3a_path("raw/", "transactions")
customers_path = get_s3a_path("raw/", "customers.parquet")
products_path = get_s3a_path("raw/", "products.parquet")
als_delta_path = get_s3a_path("ml_data/", "user_item_interactions")
rf_delta_path = get_s3a_path("ml_data/", "customer_features")
avro_output = get_s3a_path("streaming/source/", "transactions_avro")

print("MinIO paths configured:")
print(f"  Transactions: {transactions_path}")
print(f"  Customers: {customers_path}")
print(f"  Products: {products_path}")
print(f"  User-Item Interactions for ML: {als_delta_path}")
print(f"  Customer Features for ML: {rf_delta_path}")
print(f"  Streaming Source: {avro_output}")

MinIO paths configured:
  Transactions: s3a://bigdata-ecommerce/raw/transactions
  Customers: s3a://bigdata-ecommerce/raw/customers.parquet
  Products: s3a://bigdata-ecommerce/raw/products.parquet
  User-Item Interactions for ML: s3a://bigdata-ecommerce/ml_data/user_item_interactions
  Customer Features for ML: s3a://bigdata-ecommerce/ml_data/customer_features
  Streaming Source: s3a://bigdata-ecommerce/streaming/source/transactions_avro


In [3]:
# HELPER FUNCTIONS

def timer_decorator(func):
    """
    Decorator to measure execution time of functions.

    Args:
        func: Function to time.

    Returns:
        Wrapped function that prints the elapsed time and returns
        a (result, duration_seconds) tuple.
    """
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start_time
        print(f"\n    {func.__name__} completed in {duration:.2f} seconds")
        return result, duration
    return wrapper

def count_with_time(df, description="Operation"):
    """
    Count DataFrame rows with timing.

    Args:
        df: Spark DataFrame.
        description: Operation description for display.

    Returns:
        tuple: (count, duration_seconds)
    """
    start = time.time()
    n = df.count()
    duration = time.time() - start
    print(f"    {description}: {n:,} rows in {duration:.2f}s")
    return n, duration

def show_partitions(df, name="DataFrame"):
    """
    Display partition information.

    Args:
        df: Spark DataFrame.
        name: DataFrame name for display.

    Returns:
        int: Number of partitions.
    """
    num_partitions = df.rdd.getNumPartitions()
    print(f"{name} partitions: {num_partitions}")
    return num_partitions

def save_results(data, filename):
    """
    Save results to CSV in docs/results/.

    Args:
        data: Dictionary or pandas DataFrame.
        filename: Output filename.
    """
    filepath = RESULTS_DIR / filename
    if isinstance(data, dict):
        pd.DataFrame([data]).to_csv(filepath, index=False)
    else:
        data.to_csv(filepath, index=False)
    print(f"Saved: {filename}")

print("Helper functions defined!")

Helper functions defined!


In [5]:
# INITIALIZE SPARK SESSION

os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

spark = get_spark_session(
    app_name="Notebook-03-PySpark",
    enable_delta=True
)

# Display key configuration
print("\nKey Spark Configurations:")
conf = spark.sparkContext.getConf()
important_configs = [
    "spark.app.name",
    "spark.driver.memory",
    "spark.sql.shuffle.partitions",
    "spark.default.parallelism",
    "spark.memory.fraction",
    "spark.sql.adaptive.enabled"
]
for config in important_configs:
    print(f"  {config}: {conf.get(config, 'Not set')}")

Found 5 JAR files
Spark Session created: Notebook-03-PySpark
Spark Version: 3.4.1
Spark UI: http://localhost:4040
Delta Lake + MinIO: Enabled

Key Spark Configurations:
  spark.app.name: Notebook-03-PySpark
  spark.driver.memory: 10g
  spark.sql.shuffle.partitions: 24
  spark.default.parallelism: 12
  spark.memory.fraction: 0.6
  spark.sql.adaptive.enabled: true


## MinIO Storage Screenshots

**MinIO Data**

<img src="./screenshots/03_minio_data_1.png" alt="MinIo Data" >

**Transaction Partitions**

<img src="./screenshots/03_minio_data_2.png" alt="MinIo Data Transaction" >

# Architettura PySpark

PySpark uses **lazy evaluation**: transformations are not executed immediately but are recorded in a **DAG** (Directed Acyclic Graph) of operations. The **Catalyst Optimizer** analyzes this graph to find an efficient execution plan, which runs only when an action is invoked.
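The record-now, run-later behavior can be mimicked in plain Python. The sketch below is only an analogy: `LazyPipeline` is a hypothetical toy class, not a Spark API, and it performs no optimization. It shows that a transformation only extends a recorded plan, while the action is what executes it.

```python
class LazyPipeline:
    """Toy stand-in for a lazily evaluated DataFrame (not a Spark API)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)  # recorded plan; nothing has run yet

    def filter(self, predicate):
        # Transformation: return a new pipeline with one more recorded step.
        return LazyPipeline(self._rows, self._steps + (("filter", predicate),))

    def map(self, func):
        # Transformation: also only recorded.
        return LazyPipeline(self._rows, self._steps + (("map", func),))

    def plan(self):
        return [name for name, _ in self._steps]

    def count(self):
        # Action: only now is the recorded plan executed.
        rows = iter(self._rows)
        for name, func in self._steps:
            rows = filter(func, rows) if name == "filter" else map(func, rows)
        return sum(1 for _ in rows)


amounts = [120.0, 1500.0, 80.0, 2300.0]  # made-up sample amounts
evaluated = []

def is_high_value(amount):
    evaluated.append(amount)  # side effect that reveals when the work happens
    return amount > 1000

pipeline = LazyPipeline(amounts).filter(is_high_value)
calls_before_action = len(evaluated)  # the filter has only been recorded
high_value_count = pipeline.count()   # the action runs the plan
calls_after_action = len(evaluated)
```

Because `filter` returns a new object instead of modifying the old one, the toy class also mirrors the immutability of DataFrames.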

**Transformations**

They create new DataFrames without computing anything immediately. They fall into two categories:

1. **Narrow transformations**: operate on individual partitions and require no shuffle or network transfer
    - `select()`, `filter()`, `withColumn()`

2. **Wide transformations**: redistribute data across partitions and require a shuffle over the network
    - `groupBy()`, `join()`, `orderBy()`

**Actions**

They trigger execution of the computation plan and return concrete results to the user.
- `count()`, `collect()`, `show()`


Two fundamental principles characterize PySpark:

1. **Immutability**: every DataFrame is immutable, and every transformation produces a new DataFrame

2. **Distributed Computing**: data is partitioned across multiple executors, so operations run in parallel
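To make the narrow/wide distinction concrete, here is a small plain-Python simulation. It assumes partitions can be modeled as lists of rows; the function names and sample data are hypothetical and this is not how Spark is implemented. The filter touches each partition independently, while the grouped sum first has to bring equal keys into the same partition.

```python
from collections import defaultdict
import zlib

# Two "partitions" of (customer_segment, final_amount) rows; values are made up.
sample_partitions = [
    [("VIP", 1200.0), ("Regular", 80.0), ("VIP", 300.0)],
    [("Regular", 150.0), ("Occasional", 40.0), ("VIP", 900.0)],
]

def narrow_filter(parts, min_amount):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[row for row in part if row[1] >= min_amount] for part in parts]

def partition_for(key, num_partitions):
    # Deterministic hash partitioner (crc32 instead of Python's salted hash()).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def wide_group_sum(parts, num_partitions):
    """Wide: rows with the same key must meet in one partition (a shuffle)."""
    shuffled = [defaultdict(float) for _ in range(num_partitions)]
    moved = 0
    for source_index, part in enumerate(parts):
        for key, amount in part:
            target_index = partition_for(key, num_partitions)
            if target_index != source_index:
                moved += 1  # this row would cross the network in a cluster
            shuffled[target_index][key] += amount
    return [dict(p) for p in shuffled], moved

filtered = narrow_filter(sample_partitions, 100.0)
grouped, rows_moved = wide_group_sum(filtered, num_partitions=2)
totals = {key: value for part in grouped for key, value in part.items()}
```

`rows_moved` counts the rows that changed partition. That movement is the cost the narrow filter never pays.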

In [6]:
# LOAD DATA

print("=" * 70)
print("LOAD DATA")
print("=" * 70 + "\n")

customers = spark.read.parquet(customers_path)
products = spark.read.parquet(products_path)
transactions = spark.read.parquet(transactions_path)

print(f"Customers: {customers.count():,} rows")
print(f"Products: {products.count():,} rows")
print(f"Transactions: {transactions.count():,} rows")

LOAD DATA

Customers: 1,000,000 rows
Products: 50,000 rows
Transactions: 100,000,000 rows


In [7]:
# LAZY EVALUATION DEMO

print("=" * 70)
print("LAZY EVALUATION DEMO")
print("=" * 70 + "\n")

# Create transformation (no execution yet)
print("1. Creating transformation (lazy)...")
filtered_transactions = transactions.filter(F.col("final_amount") > 1000)
print("   Transformation defined (no execution yet)")

# Trigger action (execution happens now)
print("\n2. Triggering action (execution happens now)...")
start = time.time()
result_count = filtered_transactions.count()
duration = time.time() - start

print(f"   Executed! Found {result_count:,} high-value transactions")
print(f"   Execution time: {duration:.2f}s")

LAZY EVALUATION DEMO

1. Creating transformation (lazy)...
   Transformation defined (no execution yet)

2. Triggering action (execution happens now)...
   Executed! Found 8,233,328 high-value transactions
   Execution time: 3.79s


In [8]:
# TRANSFORMATION & ACTIONS EXAMPLES

print("=" * 70)
print("TRANSFORMATION EXAMPLES")
print("=" * 70 + "\n")

# 1. Select
print("1. SELECT transformation:")
customer_basics = customers.select("customer_id", "name", "customer_segment")
print("   Selected 3 columns from customers")

# 2. Filter
print("\n2. FILTER transformation:")
vip_customers = customers.filter(F.col("customer_segment") == "VIP")
print("   Filtered VIP customers (lazy)")

# 3. WithColumn
print("\n3. WITHCOLUMN transformation:")
transactions_with_margin = transactions.withColumn(
    "margin_pct",
    F.round((F.col("final_amount") / F.col("total_amount")) * 100, 2)
)
print("   Added margin_pct column (lazy)")

# 4. GroupBy (transformation, not action!)
print("\n4. GROUPBY transformation (with agg):")
segment_stats = customers.groupBy("customer_segment").agg(
    F.count("*").alias("count"),
    F.avg("age").alias("avg_age")
)
print("   Grouped by segment (lazy)")

print("\n" + "=" * 70)
print("ACTIONS EXAMPLES")
print("=" * 70 + "\n")

# 1. Count
print("1. COUNT action:")
start = time.time()
vip_count = vip_customers.count()
duration = time.time() - start
print(f"   VIP customers: {vip_count:,} (executed in {duration:.2f}s)")

# 2. Show
print("\n2. SHOW action:")
print("   First 5 VIP customers:")
vip_customers.select("customer_id", "name", "age").show(5)

# 3. Collect
print("\n3. COLLECT action:")
top_3_vip = vip_customers.select("customer_id", "name").limit(3).collect()
print(f"   Collected {len(top_3_vip)} rows to driver")
for row in top_3_vip:
    print(f"   - {row['customer_id']}: {row['name']}")

TRANSFORMATION EXAMPLES

1. SELECT transformation:
   Selected 3 columns from customers

2. FILTER transformation:
   Filtered VIP customers (lazy)

3. WITHCOLUMN transformation:
   Added margin_pct column (lazy)

4. GROUPBY transformation (with agg):
   Grouped by segment (lazy)

ACTIONS EXAMPLES

1. COUNT action:
   VIP customers: 50,000 (executed in 0.20s)

2. SHOW action:
   First 5 VIP customers:
+-----------+--------------------+---+
|customer_id|                name|age|
+-----------+--------------------+---+
|  C00000017|       Aria Giannini| 26|
|  C00000032|Gelsomina Renzi-P...| 26|
|  C00000042|  Sig.ra Michela Emo| 63|
|  C00000062|  Gustavo Spanevello| 31|
|  C00000066|Sig.ra Lara Randazzo| 67|
+-----------+--------------------+---+
only showing top 5 rows


3. COLLECT action:
   Collected 3 rows to driver
   - C00000017: Aria Giannini
   - C00000032: Gelsomina Renzi-Piane
   - C00000042: Sig.ra Michela Emo


In [9]:
# DATAFRAME OPERATIONS

print("=" * 70)
print("DATAFRAME OPERATIONS")
print("=" * 70 + "\n")

# Basic info
print("1. DataFrame Info:")
print(f"   Columns: {len(customers.columns)}")
print(f"   Column names: {customers.columns}")
print(f"   Partitions: {customers.rdd.getNumPartitions()}")

# Schema
print("\n2. Schema:")
customers.printSchema()

# Describe
print("\n3. Describe (summary statistics):")
customers.select("age").describe().show()

# Distinct
print("\n4. Distinct values:")
segments = customers.select("customer_segment").distinct()
print("   Customer segments:")
segments.show()

DATAFRAME OPERATIONS

1. DataFrame Info:
   Columns: 13
   Column names: ['customer_id', 'name', 'email', 'phone', 'address', 'city', 'region', 'country', 'postal_code', 'registration_date', 'customer_segment', 'age', 'gender']
   Partitions: 12

2. Schema:
root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- customer_segment: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)


3. Describe (summary statistics):
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|           1000000|
|   mean|         46.506383|
| stddev|16.733604371767242|
|    min|    

In [10]:
# QUERY EXECUTION PLAN

print("=" * 70)
print("QUERY EXECUTION PLAN")
print("=" * 70 + "\n")

# Complex chain of transformations
result = (
    transactions
    .filter(F.col("status") == "completed")
    .filter(F.col("final_amount") > 500)
    .select(
        "transaction_id",
        "customer_id",
        "final_amount",
        "transaction_date"
    )
    .withColumn("high_value", F.when(F.col("final_amount") > 1000, "Yes").otherwise("No"))
    .orderBy(F.desc("final_amount"))
    .limit(10)
)

print("Chained operations:")
print("  1. Filter completed transactions")
print("  2. Filter amount > 500")
print("  3. Select specific columns")
print("  4. Add high_value column")
print("  5. Order by amount")
print("  6. Limit to top 10")

print("\nResult:")
result.show()

print("\nSpark optimizes this chain before execution (Catalyst optimizer)")

# Show physical plan only
print("\nPHYSICAL PLAN (how to execute it):")
print("-" * 70)
result.explain(extended=False)

# Show logical plans (parsed, analyzed, optimized) followed by the physical plan
print("\nLOGICAL + PHYSICAL PLANS (what to compute, then how):")
print("-" * 70)
result.explain(extended=True)

QUERY EXECUTION PLAN

Chained operations:
  1. Filter completed transactions
  2. Filter amount > 500
  3. Select specific columns
  4. Add high_value column
  5. Order by amount
  6. Limit to top 10

Result:
+--------------+-----------+------------+----------------+----------+
|transaction_id|customer_id|final_amount|transaction_date|high_value|
+--------------+-----------+------------+----------------+----------+
|   T0098657043|  C00472985|     8997.85|      2024-04-29|       Yes|
|   T0025697854|  C00291574|     8994.05|      2024-09-18|       Yes|
|   T0075933682|  C00968650|     8991.83|      2025-07-31|       Yes|
|   T0088081874|  C00633627|     8982.91|      2025-03-26|       Yes|
|   T0084912906|  C00797292|     8979.11|      2025-10-28|       Yes|
|   T0077918434|  C00794352|     8975.02|      2025-08-03|       Yes|
|   T0057242121|  C00663478|     8974.71|      2024-03-19|       Yes|
|   T0030245510|  C00101347|     8964.92|      2024-12-09|       Yes|
|   T0000997695|  C00

# Spark SQL & Joins

Spark SQL runs distributed SQL queries over DataFrames, with automatic optimization by Catalyst.

**Temporary Views** register a DataFrame as a SQL table:
- **Temp View**: visible only in the current SparkSession and dropped when that session ends
- **Global Temp View**: visible to every SparkSession in the same Spark application (through the `global_temp` database), useful for sharing data between notebooks that run in one application

**JOIN operations**:
- All standard types are supported: `inner`, `left`, `right`, `outer`
- A standard join requires a shuffle (redistributing data across partitions) and is therefore slow

**Broadcast Join Optimization**:
When a table is small (under 10MB by default, controlled by `spark.sql.autoBroadcastJoinThreshold`), Spark sends a full copy of it to every executor. The join then runs locally on each executor without shuffling the large table, which is much faster. Avoid it for tables larger than about 100MB (risk of OOM).
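The mechanics of a broadcast join can be sketched in plain Python. This is a simulation under simplifying assumptions (partitions as lists, the broadcast copy as a dictionary), with hypothetical names and made-up rows. It is not Spark's implementation.

```python
# Small dimension table and a large fact table split into partitions.
# All rows are made-up sample data.
product_rows = [("P1", "Electronics"), ("P2", "Beauty")]
transaction_partitions = [
    [("T1", "P1", 999.0), ("T2", "P2", 25.0)],
    [("T3", "P1", 450.0), ("T4", "P9", 10.0)],  # P9 has no matching product
]

def broadcast_hash_join(fact_partitions, small_table):
    """Inner join: build one lookup from the small table, probe it per partition."""
    lookup = dict(small_table)  # the "broadcast" copy each executor would hold
    joined = []
    for partition in fact_partitions:  # each partition is processed where it lives
        joined.append([
            (tx_id, amount, lookup[product_id])
            for tx_id, product_id, amount in partition
            if product_id in lookup  # inner join drops unmatched rows
        ])
    return joined

joined_partitions = broadcast_hash_join(transaction_partitions, product_rows)
```

The fact rows never change partition. Only the small lookup is copied, which is why the approach stops paying off, and starts risking OOM, once the "small" table is no longer small.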

In [11]:
# SQL QUERIES

print("=" * 70)
print("REGISTERING TEMPORARY VIEWS")
print("=" * 70 + "\n")

transactions.createOrReplaceTempView("transactions")
customers.createOrReplaceTempView("customers")
products.createOrReplaceTempView("products")

print("Registered views:")
print("   - transactions")
print("   - customers")
print("   - products")

print("\nAvailable tables:")
spark.sql("SHOW TABLES").show()

print("\n" + "=" * 70)
print("SIMPLE SQL QUERIES")
print("=" * 70 + "\n")

# Query 1: SELECT with WHERE
print("1. Top 10 high-value transactions:")
query1 = """
SELECT
    transaction_id,
    customer_id,
    final_amount,
    transaction_date
FROM transactions
WHERE status = 'completed' AND final_amount > 1000
ORDER BY final_amount DESC
LIMIT 10
"""
result1 = spark.sql(query1)
result1.show()

# Query 2: Aggregation
print("\n2. Transactions by status:")
query2 = """
SELECT
    status,
    COUNT(*) as count,
    SUM(final_amount) as total_revenue,
    AVG(final_amount) as avg_amount
FROM transactions
GROUP BY status
ORDER BY total_revenue DESC
"""
result2 = spark.sql(query2)
result2.show()


print("\n" + "=" * 70)
print("GROUPBY QUERIES")
print("=" * 70 + "\n")

# Customer segments analysis
print("1. Analysis by customer segment:")
query3 = """
SELECT
    customer_segment,
    COUNT(*) as num_customers,
    AVG(age) as avg_age,
    COUNT(DISTINCT region) as num_regions
FROM customers
GROUP BY customer_segment
ORDER BY num_customers DESC
"""
result3 = spark.sql(query3)
result3.show()

# Product categories
print("\n2. Sales by product category:")
query4 = """
SELECT
    p.category,
    COUNT(DISTINCT t.transaction_id) as num_transactions,
    SUM(t.final_amount) as total_revenue,
    AVG(t.final_amount) as avg_transaction_value
FROM transactions t
JOIN products p ON t.product_id = p.product_id
WHERE t.status = 'completed'
GROUP BY p.category
ORDER BY total_revenue DESC
LIMIT 10
"""
result4 = spark.sql(query4)
result4.show()

REGISTERING TEMPORARY VIEWS

Registered views:
   - transactions
   - customers
   - products

Available tables:
+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|         |   customers|      false|
|         |    products|      false|
|         |transactions|      false|
+---------+------------+-----------+


SIMPLE SQL QUERIES

1. Top 10 high-value transactions:
+--------------+-----------+------------+----------------+
|transaction_id|customer_id|final_amount|transaction_date|
+--------------+-----------+------------+----------------+
|   T0098657043|  C00472985|     8997.85|      2024-04-29|
|   T0025697854|  C00291574|     8994.05|      2024-09-18|
|   T0075933682|  C00968650|     8991.83|      2025-07-31|
|   T0088081874|  C00633627|     8982.91|      2025-03-26|
|   T0084912906|  C00797292|     8979.11|      2025-10-28|
|   T0077918434|  C00794352|     8975.02|      2025-08-03|
|   T0057242121|  C00663478|     8974.71

In [12]:
# JOIN OPERATIONS

# Inner Join
print("=" * 70)
print("INNER JOIN")
print("=" * 70 + "\n")

query5 = """
SELECT
    t.transaction_id,
    t.final_amount,
    c.name as customer_name,
    c.customer_segment,
    c.region
FROM transactions t
INNER JOIN customers c ON t.customer_id = c.customer_id
WHERE t.final_amount > 100
LIMIT 10
"""
result5 = spark.sql(query5)
print("Transactions with customer information:")
result5.show()

# Left Join
print("\n" + "=" * 70)
print("LEFT JOIN")
print("=" * 70 + "\n")

query6 = """
SELECT
    c.customer_id,
    c.name,
    c.customer_segment,
    COUNT(t.transaction_id) as num_transactions,
    COALESCE(SUM(t.final_amount), 0) as total_spent
FROM customers c
LEFT JOIN transactions t ON c.customer_id = t.customer_id
    AND t.status = 'completed'
GROUP BY c.customer_id, c.name, c.customer_segment
ORDER BY total_spent DESC
LIMIT 20
"""
result6 = spark.sql(query6)
print("Top 20 customers by total spent:")
result6.show()

# Broadcast Join Optimization
print("\n" + "=" * 70)
print("BROADCAST JOIN OPTIMIZATION")
print("=" * 70 + "\n")

print(f"Products table size: {products.count():,} rows")
print(f"Transactions table size: {transactions.count():,} rows")

# Standard join (without broadcast)
print("\n1. Standard Join:")
standard_join = transactions.join(
    products,
    transactions.product_id == products.product_id,
    "inner"
).select(
    transactions.transaction_id,
    transactions.final_amount,
    products.product_name,
    products.category
)

start = time.time()
standard_join.show(10)
standard_time = time.time() - start

# Broadcast join
print("\n2. Broadcast Join:")
broadcast_join = transactions.join(
    F.broadcast(products),
    transactions.product_id == products.product_id,
    "inner"
).select(
    transactions.transaction_id,
    transactions.final_amount,
    products.product_name,
    products.category
)

start = time.time()
broadcast_join.show(10)
broadcast_time = time.time() - start

# Compare
speedup = (standard_time / broadcast_time) if broadcast_time > 0 else 1
print(f"\nBroadcast join speedup: {speedup:.2f}x")
print(f"   Standard join: {standard_time:.2f}s")
print(f"   Broadcast join: {broadcast_time:.2f}s")

# Save comparison
join_comparison = pd.DataFrame({
    "Join_Type": ["Standard", "Broadcast"],
    "Time_s": [standard_time, broadcast_time]
})
save_results(join_comparison, "03_join_comparison.csv")

INNER JOIN

Transactions with customer information:
+--------------+------------+--------------------+----------------+-------+
|transaction_id|final_amount|       customer_name|customer_segment| region|
+--------------+------------+--------------------+----------------+-------+
|   T0032651422|      177.37|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0016231588|      182.02|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0090010818|       250.2|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0085437320|      112.35|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0087918645|      329.85|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0010783655|      225.89|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0063166968|      806.97|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0067240728|      279.13|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0018171683|      677.49|Sig.ra Clelia Ian...|      Occasional|Toscana|
|   T0018427078|       344.0|Sig.ra 

# Performance Tuning & Monitoring

Performance optimization in PySpark rests on four main techniques: **partitioning** to distribute data well, **caching** to avoid recomputation, **broadcast joins** to reduce shuffling, and **configuration tuning** to adjust Spark parameters.

## Partitioning

**Partition Size** - as a rule of thumb, about 128MB per partition (the standard Hadoop block size):
- Partitions too small --> excessive scheduling overhead
- Partitions too large --> memory pressure and stragglers

**Partition Count**:
- Minimum: `num_cores * 2`
- Optimal: `num_cores * 4`
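The two rules of thumb can be combined into a quick estimate. The helper below is a hypothetical sketch, not a Spark API. It assumes the dataset size in bytes is known and treats the core-based minimum as a floor.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # ~128MB rule of thumb from the text

def suggest_partitions(dataset_bytes, num_cores, cores_multiplier=2):
    """Hypothetical helper: combine the size rule with the core-count floor."""
    by_size = math.ceil(dataset_bytes / TARGET_PARTITION_BYTES)
    by_cores = num_cores * cores_multiplier
    return max(by_size, by_cores)

GIB = 1024 ** 3
small_job = suggest_partitions(1 * GIB, num_cores=12)   # the core floor dominates
large_job = suggest_partitions(10 * GIB, num_cores=12)  # the size rule dominates
```

For small inputs the core floor keeps every core busy. For large inputs the size rule keeps individual partitions from growing past the target.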

**Repartition vs Coalesce**:
- `repartition(n)`: performs a full shuffle; can increase or decrease the partition count and rebalances the data
- `coalesce(n)`: avoids a full shuffle by merging existing partitions; can only reduce the partition count, and is faster
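The trade-off shows up clearly on skewed data. The plain-Python simulation below models partitions as lists and uses hypothetical functions that only imitate the two behaviors: merging whole partitions versus redistributing every row.

```python
def coalesce(partitions, n):
    """Merge whole partitions into n groups; rows are never split up.

    Assumes n <= len(partitions), since coalescing can only reduce the count.
    """
    merged = [[] for _ in range(n)]
    for index, part in enumerate(partitions):
        merged[index * n // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Redistribute every row round-robin across n partitions (a full shuffle)."""
    reshuffled = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for index, row in enumerate(rows):
        reshuffled[index % n].append(row)
    return reshuffled

# Four skewed partitions: the first one holds most of the rows.
skewed = [list(range(0, 90)), list(range(90, 94)), list(range(94, 97)), list(range(97, 100))]

coalesced_sizes = [len(p) for p in coalesce(skewed, 2)]
repartitioned_sizes = [len(p) for p in repartition(skewed, 2)]
```

In this toy model the coalesced result keeps the skew because the large partition is never split, while the repartitioned result is balanced at the price of moving every row.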

## Caching

**When to cache**:
- A DataFrame used multiple times in the same session
- After expensive operations (joins, complex aggregations)
- Iterative algorithms (machine learning)

**When not to cache**:
- A DataFrame used only once
- Data that is already cheap to re-read (e.g. Parquet with column pruning)
- A dataset too large for the available memory (risk of spilling to disk)

## Spark UI

Available at **http://localhost:4040** while a SparkSession is active.

**Main tabs**:
- **Jobs**: all executed jobs, with duration, stages, tasks, and success/failure status
- **Stages**: detailed breakdown of each job, task-level metrics, shuffle read/write
- **Storage**: cached RDDs/DataFrames, memory usage, fraction cached vs spilled to disk
- **Environment**: Spark properties, system properties, classpath
- **Executors**: per-executor metrics, memory usage, GC time, task distribution
- **SQL**: execution plans (physical/logical), per-operator metrics, DAG visualization

**Spark UI Jobs**

<img src="./screenshots/03_spark_jobs.png" alt="Spark UI Jobs" >

**Spark UI Executors**

<img src="./screenshots/03_spark_executors.png" alt="Spark UI Executors" >

In [13]:
# OPTIMIZATIONS

print("=" * 70)
print("OPTIMIZATIONS")
print("=" * 70 + "\n")

# Clear any previous cache
spark.catalog.clearCache()

# Baseline: Complex aggregation without optimization
print("Running baseline query...")
start = time.time()
result_baseline = (
    transactions
    .filter(F.col("status") == "completed")
    .join(customers, "customer_id")
    .join(products, "product_id")
    .groupBy("customer_segment", "category")
    .agg(
        F.count("*").alias("num_transactions"),
        F.sum("final_amount").alias("total_revenue"),
        F.avg("final_amount").alias("avg_transaction")
    )
    .orderBy(F.col("total_revenue").desc())
)

print("\nResult:")
result_baseline.show(10)
baseline_time = time.time() - start

print(f"Baseline completed:")
print(f"Time: {baseline_time:.2f}s")

print("\n" + "=" * 70)
print("PARTITIONING & CACHING")
print("=" * 70 + "\n")

# Check current partitions
print("Current partitions:")
print(f"  Transactions: {transactions.rdd.getNumPartitions()}")
print(f"  Customers:    {customers.rdd.getNumPartitions()}")
print(f"  Products:     {products.rdd.getNumPartitions()}")

# Optimal partitions: 2x physical cores for local mode
optimal_partitions = 24

print(f"\nRepartitioning transactions to {optimal_partitions} partitions & caching...")
start = time.time()
transactions_optimized = transactions.repartition(optimal_partitions)
transactions_optimized.cache()
transactions_count = transactions_optimized.count()
repart_time = time.time() - start

print(f"  Repartitioned in     {repart_time:.2f}s")
print(f"  New partitions:      {transactions_optimized.rdd.getNumPartitions()}")
print(f"  Transactions cached: {transactions_count:,} rows")

# Cache dimension tables (small, frequently joined)
print("\nCaching Customers & Products tables...")

customers_optimized = customers.cache()
customers_count = customers_optimized.count()
print(f"  Customers cached: {customers_count:,} rows")

products_optimized = products.cache()
products_count = products_optimized.count()
print(f"  Products cached: {products_count:,} rows")

print("\n" + "=" * 70)
print("OPTIMIZED PERFORMANCE")
print("=" * 70 + "\n")

# Same query with cached + repartitioned + broadcast
print("Running optimized query...")
start = time.time()
result_optimized = (
    transactions_optimized
    .filter(F.col("status") == "completed")
    .join(F.broadcast(customers_optimized), "customer_id")
    .join(F.broadcast(products_optimized), "product_id")
    .groupBy("customer_segment", "category")
    .agg(
        F.count("*").alias("num_transactions"),
        F.sum("final_amount").alias("total_revenue"),
        F.avg("final_amount").alias("avg_transaction")
    )
    .orderBy(F.col("total_revenue").desc())
)

print("\nResult:")
result_optimized.show(10)
optimized_time = time.time() - start

# Calculate improvement
speedup = baseline_time / optimized_time if optimized_time > 0 else 1
print(f"\nPerformance Improvement:")
print(f"  Baseline: {baseline_time:.2f}s")
print(f"  Optimized: {optimized_time:.2f}s")
print(f"  Speedup: {speedup:.2f}x")


# Release cached DataFrames
print("\nReleasing cached DataFrames...")
transactions_optimized.unpersist()
spark.catalog.clearCache()
print("  Cache released")

OPTIMIZATIONS

Running baseline query...

Result:
+----------------+--------------+----------------+--------------------+------------------+
|customer_segment|      category|num_transactions|       total_revenue|   avg_transaction|
+----------------+--------------+----------------+--------------------+------------------+
|         Regular|   Electronics|         4547002| 9.862931500629993E9| 2169.106479528708|
|      Occasional|   Electronics|         2595077| 4.460162046650009E9|1718.7012357051483|
|             VIP|   Electronics|         1298917| 3.760925916129992E9|2895.4320531103926|
|         Regular|        Beauty|        10136890|2.6945622244399986E9| 265.8174474064529|
|         Regular|       Fashion|         5985500|2.4465271903900003E9|408.74232568540646|
|         Regular|        Sports|         4307190| 2.095215886780005E9| 486.4461253810501|
|         Regular|        Health|         5477055|1.4617503961900039E9| 266.8862000089471|
|         Regular|Home & Kitchen|       

# ETL Pipeline & Delta Lake

In this section we build two complete ETL pipelines:
1. one that prepares ML-ready data (for Notebook 4)
2. one that converts the transactions to **Avro** format (for Notebook 5)

## Target Output for ML

**ALS Data** (`user_item_interactions`) - for the recommendation system:
- Fields: `customer_id`, `product_id`, `rating`
- Implicit rating derived from purchase behavior:

    ```
    rating = log(1 + purchase_count) * 2 + log(1 + total_spent) / 2
    ```
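A quick worked example of this formula in plain Python, assuming `log` is the natural logarithm (as the single-argument `log` is in Spark SQL). The function name and the sample inputs are hypothetical.

```python
import math

def implicit_rating(purchase_count, total_spent):
    """Rating formula from the text, using the natural logarithm."""
    return round(
        math.log(1 + purchase_count) * 2 + math.log(1 + total_spent) / 2,
        2,
    )

one_cheap_purchase = implicit_rating(1, 20.0)
three_purchases = implicit_rating(3, 250.0)
thirty_purchases = implicit_rating(30, 2500.0)
```

The logarithms compress the scale: ten times the purchases and ten times the spend (`thirty_purchases` against `three_purchases`) roughly doubles the rating instead of multiplying it by ten, so heavy buyers do not dominate the interaction matrix.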

**Random Forest Data** (`customer_features`) - for classification:
1. **Feature Engineering**:
    - **RFM**: Recency, Frequency, Monetary
    - **Behavioral**: avg_transaction, unique_products
    - **Temporal**: days_since_last_purchase
    - **Demographic**: age, region

2. **Labels**:
    - `customer_segment` (multiclass)
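The RFM features can be illustrated on a handful of rows in plain Python. The rows, the reference date, and the `rfm` helper are all made up for the example. The notebook computes the same quantities with Spark aggregations.

```python
from datetime import date

# (customer_id, transaction_date, final_amount); values are made up.
completed = [
    ("C1", date(2025, 1, 10), 100.0),
    ("C1", date(2025, 3, 1), 50.0),
    ("C2", date(2024, 12, 25), 400.0),
]

def rfm(rows, today):
    """Return {customer_id: {"recency", "frequency", "monetary"}}."""
    features = {}
    for customer_id, tx_date, amount in rows:
        entry = features.setdefault(
            customer_id, {"last_purchase": tx_date, "frequency": 0, "monetary": 0.0}
        )
        entry["last_purchase"] = max(entry["last_purchase"], tx_date)
        entry["frequency"] += 1
        entry["monetary"] += amount
    for entry in features.values():
        # Recency: days elapsed since the most recent purchase.
        entry["recency"] = (today - entry.pop("last_purchase")).days
    return features

customer_rfm = rfm(completed, today=date(2025, 3, 11))
```

Recency depends on the reference date. A feature table built with the current date will therefore differ from run to run, whereas a fixed date, as in this sketch, keeps it reproducible.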

## Target Output for Streaming

The pipeline converts the transactions to **Avro**, a row-based data serialization format well suited to streaming pipelines and write-intensive workloads. Its main characteristics:

- **Embedded schema**: the schema is stored in the file itself, so the data is self-describing
- **Row-based**: optimized for sequential writes and streaming, unlike Parquet, which is columnar
- **Schema evolution**: native support for adding and removing fields with backward/forward compatibility
- **Compact and fast**: efficient binary serialization

We read the transactions from the partitioned Parquet dataset on MinIO and rewrite them in Avro format, keeping the `year`/`month` partitioning. The result is the source for the Bronze layer in Notebook 5.
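An Avro schema is itself a JSON document, which is what makes the files self-describing. The sketch below builds one with the standard `json` module. The field subset and types are assumptions about the transactions table, and `channel` is a hypothetical new column used only to show an evolution step.

```python
import json

# Assumed subset of the transaction columns; types are guesses for illustration.
transaction_schema_v1 = {
    "type": "record",
    "name": "Transaction",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "product_id", "type": "string"},
        {"name": "final_amount", "type": "double"},
        {"name": "transaction_date", "type": {"type": "int", "logicalType": "date"}},
    ],
}

def add_optional_field(schema, name, avro_type):
    """Return a new schema with a nullable field whose default is null."""
    evolved = json.loads(json.dumps(schema))  # deep copy via JSON round-trip
    evolved["fields"].append({"name": name, "type": ["null", avro_type], "default": None})
    return evolved

# "channel" is a hypothetical column, not part of the notebook's dataset.
transaction_schema_v2 = add_optional_field(transaction_schema_v1, "channel", "string")
schema_json = json.dumps(transaction_schema_v2, indent=2)
```

Giving the new field a default is what keeps the two versions compatible: a reader using the new schema can fill in the default when it meets records written with the old one.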

## Delta Lake: ACID on the Data Lake

Delta Lake is a **storage layer built on Parquet** that adds ACID transactions, versioning, and schema evolution to data lakes. It also supports batch and streaming workloads in a unified way.

**Problems with traditional data lakes**:
- No ACID transactions
- No data versioning
- Schema inconsistencies
- No way to roll back
- No audit trail

**What Delta Lake provides**:
- **ACID transactions**: guaranteed atomicity and consistency
- **Time Travel**: access to earlier versions of the data
- **Schema enforcement & evolution**: validation and controlled evolution of the schema
- **Audit history**: a complete transaction log of all operations
- **Unified batch & streaming**: the same API for both processing modes
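Time travel and the audit history both follow from the transaction log, which Delta Lake keeps as ordered commit files in a `_delta_log` directory next to the Parquet data. The toy model below is a deliberately simplified sketch with hypothetical function and file names. It keeps the log in memory and tracks only which files each commit adds or removes.

```python
def commit(log, add=(), remove=()):
    """Append one atomic commit (a set of add/remove file actions) to the log."""
    log.append({"add": list(add), "remove": list(remove)})
    return len(log) - 1  # version number of the new commit

def files_at(log, version):
    """Replay commits 0..version to find the data files visible at that version."""
    visible = set()
    for entry in log[: version + 1]:
        visible.difference_update(entry["remove"])
        visible.update(entry["add"])
    return visible

delta_log = []
v0 = commit(delta_log, add=["part-000.parquet", "part-001.parquet"])           # initial write
v1 = commit(delta_log, add=["part-002.parquet"])                               # append
v2 = commit(delta_log, add=["part-003.parquet"], remove=["part-000.parquet"])  # rewrite

latest_files = files_at(delta_log, v2)
files_as_of_v0 = files_at(delta_log, v0)  # "time travel": the old snapshot is still reachable
```

Since a commit only appends to the log and never edits data files in place, readers see either the whole commit or none of it, and older versions remain readable. In Spark, an earlier version is selected with the `versionAsOf` read option.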

In [14]:
# ETL PIPELINE FOR ML

print("=" * 70)
print("ETL PIPELINE FOR ML")
print("=" * 70 + "\n")

print("Pipeline Goal: Prepare data for ML models in Notebook 4")
print("\nTarget Models:")
print("  1. ALS (Recommendation System)")
print("     - Need: user_id, item_id, rating (implicit)")
print("  2. Random Forest (Classification)")
print("     - Need: customer features + label")

print("\nETL Stages:")
print("  EXTRACT: Load raw data from MinIO")
print("  TRANSFORM: Create ML-ready features")
print("  LOAD: Save to Delta Lake tables")

# EXTRACT
print("\n" + "=" * 70)
print("EXTRACT")
print("=" * 70 + "\n")

print("Loading data for ETL pipeline...")
print(f"  Transactions: {transactions_count:,} rows")
print(f"  Customers:    {customers_count:,} rows")
print(f"  Products:     {products_count:,} rows")
print("\nExtract phase completed!")

# TRANSFORM: ALS
print("\n" + "=" * 70)
print("TRANSFORM - ALS DATA PREPROCESSING")
print("=" * 70 + "\n")

print("Creating user-item interaction matrix...")

user_item_interactions = (
    transactions
    .filter(F.col("status") == "completed")
    .groupBy("customer_id", "product_id")
    .agg(
        F.count("*").alias("purchase_count"),
        F.sum("final_amount").alias("total_spent")
    )
    .withColumn(
        "rating",
        F.round(
            F.expr("log(1 + purchase_count) * 2 + log(1 + total_spent) / 2"),
            2
        )
    )
    # Convert string IDs to numeric for ALS compatibility
    .withColumn("user_id", F.regexp_extract(F.col("customer_id"), r"(\d+)", 1).cast("int"))
    .withColumn("item_id", F.regexp_extract(F.col("product_id"), r"(\d+)", 1).cast("int"))
    .select(
        "user_id",
        "item_id",
        "rating",
        "customer_id",
        "product_id",
        "purchase_count",
        "total_spent"
    )
)

print("Schema after conversion:")
user_item_interactions.printSchema()

als_count = user_item_interactions.count()
print(f"Created {als_count:,} user-item interactions")
print("\nSample interactions:")
user_item_interactions.orderBy(F.desc("rating")).show(10)

# TRANSFORM: Random Forest
print("\n" + "=" * 70)
print("TRANSFORM - RANDOM FOREST DATA PREPROCESSING")
print("=" * 70 + "\n")

print("Creating customer features...")

# RFM model + behavioral features
customer_features = (
    transactions_optimized
    .filter(F.col("status") == "completed")
    .groupBy("customer_id")
    .agg(
        F.count("*").alias("frequency"),
        F.sum("final_amount").alias("monetary"),
        F.avg("final_amount").alias("avg_transaction"),
        # Recency is measured against the run date, so it changes from run to run
        F.expr("datediff(current_date(), max(transaction_date))").alias("recency"),
        F.expr("count(distinct product_id)").alias("unique_products")
    )
)

# Join with demographics
customer_features = customer_features.join(
    customers_optimized.select("customer_id", "customer_segment", "age", "region"),
    "customer_id",
    "inner"
)

# Select and cast to correct types
customer_features = (
    customer_features
    .select(
        "customer_id",
        "recency",
        "frequency",
        "monetary",
        "avg_transaction",
        "unique_products",
        "customer_segment",
        "age",
        "region",
    )
    .withColumn("recency", F.col("recency").cast("int"))
    .withColumn("frequency", F.col("frequency").cast("long"))
    .withColumn("monetary", F.col("monetary").cast("double"))
    .withColumn("avg_transaction", F.col("avg_transaction").cast("double"))
    .withColumn("unique_products", F.col("unique_products").cast("long"))
    .withColumn("age", F.col("age").cast("int"))
)

print("\nCustomer features prepared")
print(f"Schema: {customer_features.columns}")
customer_features.printSchema()
print("\nSample data:")
customer_features.show(10, truncate=False)
print("\nClass distribution (customer_segment):")
customer_features.groupBy("customer_segment").count().orderBy("count", ascending=False).show()
print("\nRFM Statistics:")
customer_features.select("recency", "frequency", "monetary").describe().show()

# LOAD
print("\n" + "=" * 70)
print("LOAD")
print("=" * 70 + "\n")

print("Delta Lake paths:")
print(f"  ALS data: {als_delta_path}")
print(f"  RF data: {rf_delta_path}")

# Write ALS data
print("\n1. Writing ALS user-item interactions to Delta...")
start = time.time()
user_item_interactions.write.format("delta").mode("overwrite").save(als_delta_path)
als_write_time = time.time() - start
print(f"  Written in {als_write_time:.2f}s")

# Write RF data
print("\n2. Writing RF customer features to Delta...")
start = time.time()
customer_features.write.format("delta").mode("overwrite").save(rf_delta_path)
rf_write_time = time.time() - start
print(f"  Written in {rf_write_time:.2f}s")

print("\nML data loaded to Delta Lake successfully!")

ETL PIPELINE FOR ML

Pipeline Goal: Prepare data for ML models in Notebook 4

Target Models:
  1. ALS (Recommendation System)
     - Need: user_id, item_id, rating (implicit)
  2. Random Forest (Classification)
     - Need: customer features + label

ETL Stages:
  EXTRACT: Load raw data from MinIO
  TRANSFORM: Create ML-ready features
  LOAD: Save to Delta Lake tables

EXTRACT

Loading data for ETL pipeline...
  Transactions: 100,000,000 rows
  Customers:    1,000,000 rows
  Products:     50,000 rows

Extract phase completed!

TRANSFORM - ALS DATA PREPROCESSING

Creating user-item interaction matrix...
Schema after conversion:
root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- purchase_count: long (nullable = false)
 |-- total_spent: double (nullable = true)

Created 73,874,174 user-item interactions

Sample interactions:
+---
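
The ALS transform in this cell can be checked by hand on single values. The sketch below mirrors the rating expression and the ID conversion in plain Python; it is an approximation for illustration, not the Spark code itself (Spark's `round` uses half-up rounding, while Python's `round` may differ on ties). The function names and the `CUST_000123` ID format are hypothetical.

```python
import math
import re


def implicit_rating(purchase_count, total_spent):
    """Log-damped blend of purchase frequency and spend (natural logs)."""
    score = 2 * math.log(1 + purchase_count) + math.log(1 + total_spent) / 2
    return round(score, 2)


def numeric_id(raw_id):
    """First run of digits as an int, or None when the ID has no digits."""
    match = re.search(r"\d+", raw_id)
    return int(match.group()) if match else None


# Log damping: 20x the activity yields far less than 20x the rating.
print(implicit_rating(1, 20.0), implicit_rating(20, 400.0))

# IDs without digits map to None, the analogue of a null user_id in Spark.
print(numeric_id("CUST_000123"), numeric_id("unknown"))
```

In the Spark version, an ID with no digits makes `regexp_extract` return an empty string, which casts to null under the default (non-ANSI) settings. That is consistent with `user_id` and `item_id` being reported as nullable in the schema above, so it may be worth filtering nulls before training ALS.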

In [15]:
# ETL PIPELINE FOR STREAMING

print("=" * 70)
print("ETL PIPELINE FOR STREAMING")
print("=" * 70 + "\n")

print("Pipeline Goal: Prepare data for Streaming in Notebook 5")

print("\nETL Stages:")
print("  EXTRACT: Load raw data from MinIO")
print("  TRANSFORM: Convert to Avro format")
print("  LOAD: Save as Avro files partitioned by year/month")

# EXTRACT
print("\n" + "=" * 70)
print("EXTRACT")
print("=" * 70 + "\n")

print(f"Source (Parquet): {transactions_path}")
print(f"Output (Avro):    {avro_output}")

# Read transactions from Parquet
print("\nReading transactions from Parquet...")

transactions_for_avro = spark.read.parquet(transactions_path)
total_records = transactions_for_avro.count()
print(f"  Read {total_records:,} records")

# TRANSFORM
print("\n" + "=" * 70)
print("TRANSFORM")
print("=" * 70 + "\n")

# Add partitioning columns
print("Add partitioning columns...")
transactions_for_avro = (
    transactions_for_avro
    .withColumn("year", F.year(F.col("transaction_date")))
    .withColumn("month", F.month(F.col("transaction_date")))
)
print("\nSchema:")
transactions_for_avro.printSchema()

# LOAD
print("\n" + "=" * 70)
print("LOAD")
print("=" * 70 + "\n")

# Write as Avro partitioned by year/month
print("Writing as Avro (partitioned by year/month)...")
start = time.time()
(
    transactions_for_avro
    .write
    .format("avro")
    .mode("overwrite")
    .partitionBy("year", "month")
    .save(avro_output)
)
write_time = time.time() - start
print(f"  Written in {write_time:.1f}s")

# Verify: read the Avro output back, count the records, and show a sample
print("\nVerifying Avro output...")
avro_check = spark.read.format("avro").load(avro_output)
avro_count = avro_check.count()
print(f"Avro records: {avro_count:,}")
print("\nSchema:")
avro_check.printSchema()
print("\nSample data:")
avro_check.show(5)

ETL PIPELINE FOR STREAMING

Pipeline Goal: Prepare data for Streaming in Notebook 5

ETL Stages:
  EXTRACT: Load raw data from MinIO
  TRANSFORM: Convert to Avro format
  LOAD: Save as Avro files partitioned by year/month

EXTRACT

Source (Parquet): s3a://bigdata-ecommerce/raw/transactions
Output (Avro):    s3a://bigdata-ecommerce/streaming/source/transactions_avro

Reading transactions from Parquet...
  Read 100,000,000 records

TRANSFORM

Add partitioning columns...

Schema:
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- discount_pct: double (nullable = true)
 |-- discount_amount: double (nullable = true)
 |-- final_amount: double (nullable = true)
 |-- shipping_cost: double (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- transaction_timestamp: timestamp_ntz (nullable
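
`partitionBy("year", "month")` writes Hive-style directories, one per distinct (year, month) pair. The plain-Python sketch below shows which directory a given transaction date would land in. `partition_dir` is a hypothetical helper, not part of Spark; the base path is the Avro output path printed above, and the date is an arbitrary illustration.

```python
from datetime import date


def partition_dir(base_path, transaction_date):
    """Hive-style partition directory for a dataset partitioned by year/month."""
    return (
        f"{base_path.rstrip('/')}"
        f"/year={transaction_date.year}/month={transaction_date.month}"
    )


base = "s3a://bigdata-ecommerce/streaming/source/transactions_avro"
print(partition_dir(base, date(2024, 3, 15)))
```

A reader that filters on `year` and `month` only has to list the matching directories, which is what makes this layout convenient as a source that can be replayed one month at a time.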

In [16]:
# CLEANUP

# Release all cached DataFrames
print("Releasing cached DataFrames...")
spark.catalog.clearCache()
print("  All cache released")

# Stop Spark
print("\nStopping Spark session...")
spark.stop()
print("  Spark session stopped.")

Releasing cached DataFrames...
  All cache released

Stopping Spark session...
  Spark session stopped.


## MinIO Storage Screenshots

**User-Item Interactions**

<img src="./screenshots/03_minio_delta_als.png" alt="MinIO Delta ALS">

**Customer Features**

<img src="./screenshots/03_minio_delta_rf.png" alt="MinIO Delta RF">

**Streaming Source**

<img src="./screenshots/03_minio_streaming_source.png" alt="MinIO Streaming Source">