# Iceberg ETL Notebook

This notebook demonstrates an ETL pipeline using Apache Spark and Iceberg tables.

## Overview
1. Initialize Spark Session with Iceberg configuration
2. Define the schema and generate mock sales data
3. Write data to Iceberg table
4. Query and analyze the data
5. Explore Iceberg table metadata (snapshots, history, partitions, manifests)
6. Optionally append more data and run a time travel query

## 1. Initialize Spark Session

The Spark session is pre-configured with Iceberg catalog settings via `spark-defaults.conf`:
- Catalog: `demo` (Hive-based Iceberg catalog)
- Warehouse: `s3a://warehouse/` (MinIO)
- Metastore: `thrift://hive-metastore:9083`
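
The contents of `spark-defaults.conf` are not shown in this notebook. As a rough sketch, the three settings listed above would typically map to Iceberg catalog properties like the ones below. The property keys are the standard Iceberg Spark ones, but the exact set used in this environment (including any S3A endpoint or credential settings for MinIO) is an assumption.

```python
# Assumed mapping of the settings above to Iceberg's Spark catalog properties.
# The real spark-defaults.conf is not shown here and may differ.
iceberg_conf = {
    "spark.sql.extensions":
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.type": "hive",
    "spark.sql.catalog.demo.uri": "thrift://hive-metastore:9083",
    "spark.sql.catalog.demo.warehouse": "s3a://warehouse/",
}


def to_spark_defaults(conf):
    """Render a config dict as spark-defaults.conf lines (key, whitespace, value)."""
    width = max(len(key) for key in conf)
    return "\n".join(f"{key.ljust(width)}  {value}" for key, value in conf.items())


print(to_spark_defaults(iceberg_conf))
```

If the defaults file were unavailable, the same pairs could be passed one by one to `SparkSession.builder.config(key, value)` before `getOrCreate()`.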

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    DoubleType,
    TimestampType,
)
from faker import Faker
import random
from datetime import datetime, timedelta

In [None]:
# Create Spark session
# Configuration is loaded from spark-defaults.conf
spark = SparkSession.builder \
    .appName("Iceberg ETL Notebook") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
spark

In [None]:
# Verify Iceberg catalog is configured
spark.sql("SHOW CATALOGS").show()

## 2. Define Schema and Generate Mock Data

In [None]:
# Define schema for sales data
schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_name", StringType(), False),
    StructField("customer_email", StringType(), False),
    StructField("customer_city", StringType(), False),
    StructField("customer_country", StringType(), False),
    StructField("product_name", StringType(), False),
    StructField("product_category", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("order_date", TimestampType(), False),
    StructField("order_status", StringType(), False),
])

print("Schema defined successfully!")

In [None]:
def generate_mock_sales_data(num_records=1000):
    """Generate mock sales rows matching `schema`, using Faker for customer fields.

    Faker is re-seeded on every call, so each call returns the same sequence of
    customer fields. Values drawn from `random` (category, amount, date offset,
    status) are not seeded and change from run to run.
    """
    fake = Faker()
    fake.seed_instance(42)

    # (min, max) price range per product category
    price_ranges = {
        "Electronics": (50, 2000),
        "Clothing": (20, 200),
        "Books": (10, 50),
        "Home": (30, 500),
        "Sports": (15, 300),
    }
    product_categories = list(price_ranges)
    order_statuses = ["pending", "completed", "cancelled", "refunded"]
    start_date = datetime(2025, 1, 1)

    data = []
    for i in range(1, num_records + 1):
        product_category = random.choice(product_categories)
        low, high = price_ranges[product_category]
        amount = round(random.uniform(low, high), 2)

        # Order dates fall between start_date and 365 days after it
        day_offset = random.randint(0, 365)

        data.append((
            i,
            fake.name(),
            fake.email(),
            fake.city(),
            fake.country(),
            fake.catch_phrase(),
            product_category,
            amount,
            start_date + timedelta(days=day_offset),
            random.choice(order_statuses),
        ))
    return data
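
Note that `fake.seed_instance(42)` fixes only the Faker-generated fields. The category, amount, date offset, and status come from the global `random` module, so they differ on every run. One possible way to make those reproducible as well is to draw them from a dedicated `random.Random` instance. The helper name `sample_order_fields` and its trimmed-down field set are hypothetical and only illustrate the idea.

```python
import random
from datetime import datetime, timedelta

STATUSES = ["pending", "completed", "cancelled", "refunded"]


def sample_order_fields(rng, start_date=datetime(2025, 1, 1)):
    """Draw (amount, order_date, status) from the given random.Random instance."""
    amount = round(rng.uniform(10, 50), 2)
    order_date = start_date + timedelta(days=rng.randint(0, 365))
    status = rng.choice(STATUSES)
    return amount, order_date, status


# Two generators built from the same seed yield the same sequence of draws.
rng_a = random.Random(7)
rng_b = random.Random(7)
run_a = [sample_order_fields(rng_a) for _ in range(3)]
run_b = [sample_order_fields(rng_b) for _ in range(3)]
print(run_a == run_b)  # True: same seed, same draws
```

Passing the generator in as an argument also lets a later batch use a different seed, which avoids repeating the first batch's values.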

In [None]:
# Generate mock data
print("--- Generating Mock Sales Data ---")
mock_data = generate_mock_sales_data(1000)
df = spark.createDataFrame(mock_data, schema)

print(f"Generated {df.count()} records")
df.printSchema()

In [None]:
# Preview the data
df.show(10, truncate=False)

## 3. Write Data to Iceberg Table

In [None]:
# Create database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS demo.sales_db")
spark.sql("SHOW DATABASES IN demo").show()

In [None]:
# Write data to Iceberg table
print("--- Writing to Iceberg Table ---")
df.writeTo("demo.sales_db.sales").createOrReplace()

print("Data written successfully!")

## 4. Query and Analyze the Data

In [None]:
# Verify the data
print("--- Total Records ---")
spark.sql("SELECT COUNT(*) as total_records FROM demo.sales_db.sales").show()

In [None]:
# Sales by category
print("--- Sales by Category ---")
spark.sql("""
    SELECT 
        product_category,
        COUNT(*) as num_orders,
        ROUND(SUM(amount), 2) as total_revenue,
        ROUND(AVG(amount), 2) as avg_order_value
    FROM demo.sales_db.sales
    GROUP BY product_category
    ORDER BY total_revenue DESC
""").show()

In [None]:
# Sales by status
print("--- Sales by Status ---")
spark.sql("""
    SELECT 
        order_status,
        COUNT(*) as num_orders,
        ROUND(SUM(amount), 2) as total_amount
    FROM demo.sales_db.sales
    GROUP BY order_status
    ORDER BY num_orders DESC
""").show()

In [None]:
# Top 10 countries by revenue
print("--- Top 10 Countries by Revenue ---")
spark.sql("""
    SELECT 
        customer_country,
        COUNT(*) as num_orders,
        ROUND(SUM(amount), 2) as total_revenue
    FROM demo.sales_db.sales
    GROUP BY customer_country
    ORDER BY total_revenue DESC
    LIMIT 10
""").show()

## 5. Explore Iceberg Table Features

In [None]:
# View Iceberg snapshots
print("--- Iceberg Snapshots ---")
spark.sql("SELECT * FROM demo.sales_db.sales.snapshots").show(truncate=False)

In [None]:
# View table history
print("--- Table History ---")
spark.sql("SELECT * FROM demo.sales_db.sales.history").show(truncate=False)

In [None]:
# View table partitions
print("--- Table Partitions ---")
spark.sql("SELECT * FROM demo.sales_db.sales.partitions").show(truncate=False)
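
The write in section 3 does not specify a partition spec, so the `partitions` metadata table has little to show for this table. As a sketch of what a partitioned variant might look like, the DDL below partitions by day of `order_date` using Iceberg's `days` transform. The table name `demo.sales_db.sales_by_day` is hypothetical, and the column list is abbreviated from the notebook's schema.

```python
# Hypothetical table name; columns abbreviated from the notebook's schema.
PARTITIONED_DDL = """
CREATE TABLE IF NOT EXISTS demo.sales_db.sales_by_day (
    order_id INT,
    product_category STRING,
    amount DOUBLE,
    order_date TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date))
"""

print(PARTITIONED_DDL.strip())
```

In the notebook this string would be run with `spark.sql(PARTITIONED_DDL)`. Once rows are written to such a table, its `partitions` metadata table should list one entry per day that holds data.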

In [None]:
# View table manifest files
print("--- Manifest Files ---")
spark.sql("SELECT * FROM demo.sales_db.sales.manifests").show(truncate=False)

## 6. Additional Operations (Optional)

### Append More Data

In [None]:
# Generate additional data and append
print("--- Appending Additional Data ---")
additional_data = generate_mock_sales_data(500)
# Adjust order_id to avoid duplicates
additional_data = [(i + 1000, *rest) for i, *rest in additional_data]
df_additional = spark.createDataFrame(additional_data, schema)

df_additional.writeTo("demo.sales_db.sales").append()

spark.sql("SELECT COUNT(*) as total_records FROM demo.sales_db.sales").show()
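
The `+ 1000` offset above works because the first batch contains exactly 1000 rows. A minimal sketch of deriving the offset from the existing IDs instead is shown below. The helper `shift_order_ids` and the toy rows are hypothetical.

```python
def shift_order_ids(rows, offset):
    """Return new row tuples with `offset` added to the first field (order_id)."""
    return [(order_id + offset, *rest) for order_id, *rest in rows]


first_batch = [(1, "a"), (2, "b"), (3, "c")]
second_batch = [(1, "d"), (2, "e")]

# Start the new IDs right after the largest ID already in use.
offset = max(row[0] for row in first_batch)
shifted = shift_order_ids(second_batch, offset)
print(shifted)  # [(4, 'd'), (5, 'e')]
```

Against the real table, the offset could be read with `spark.sql("SELECT MAX(order_id) FROM demo.sales_db.sales").collect()[0][0]` before building the second DataFrame.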

In [None]:
# Check snapshots after append
spark.sql("SELECT * FROM demo.sales_db.sales.snapshots").show(truncate=False)

### Time Travel Query

In [None]:
# Get the first snapshot ID for time travel
snapshots = spark.sql(
    "SELECT snapshot_id FROM demo.sales_db.sales.snapshots ORDER BY committed_at"
).collect()
if len(snapshots) > 1:
    first_snapshot_id = snapshots[0]["snapshot_id"]
    print(f"--- Querying data at snapshot {first_snapshot_id} (before append) ---")
    spark.sql(
        "SELECT COUNT(*) AS records_at_first_snapshot "
        f"FROM demo.sales_db.sales VERSION AS OF {first_snapshot_id}"
    ).show()
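
The cell above treats the oldest snapshot as the state just before the append. Iceberg's table replacement keeps earlier history, so if the notebook is run more than once the oldest snapshot may belong to a previous run. A sketch of picking the snapshot immediately before the latest one instead is given below. The helper `previous_snapshot_query` is hypothetical, and the snapshot IDs are made up.

```python
def previous_snapshot_query(snapshot_ids, table="demo.sales_db.sales"):
    """Build a time travel COUNT query for the snapshot before the latest one.

    `snapshot_ids` must be ordered oldest to newest. Returns None when there is
    no earlier snapshot to travel back to.
    """
    if len(snapshot_ids) < 2:
        return None
    target = snapshot_ids[-2]
    return f"SELECT COUNT(*) AS records FROM {table} VERSION AS OF {target}"


# Made-up snapshot IDs, oldest first.
print(previous_snapshot_query([111, 222, 333]))
```

In the notebook, the ID list could be built from the collected rows with `[row["snapshot_id"] for row in snapshots]`, and a non-None result passed to `spark.sql(...)`.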

## Cleanup

In [None]:
# Stop Spark session
spark.stop()
print("Spark session stopped.")