# Apache Iceberg with Spark and Nessie

This notebook demonstrates how to use PySpark with Apache Iceberg and the Nessie catalog.

## 1. Initialize Spark Session with Iceberg and Nessie

The Spark configuration is automatically loaded from `spark-defaults.conf`.
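The actual `spark-defaults.conf` is not shown in this notebook. As a rough sketch of what such a file typically needs for an Iceberg catalog backed by Nessie, the helper below builds the relevant properties and prints them in `spark-defaults.conf` format. The helper names (`iceberg_nessie_conf`, `to_spark_defaults`), the Nessie URI, and the warehouse path are assumptions for illustration; only the catalog name `nessie` is taken from the cells that follow.

```python
# Hypothetical settings: the real spark-defaults.conf for this notebook is not shown.
def iceberg_nessie_conf(catalog="nessie",
                        nessie_uri="http://nessie:19120/api/v1",  # assumed endpoint
                        ref="main",
                        warehouse="s3a://warehouse/"):            # assumed bucket
    """Return Spark properties that register an Iceberg catalog backed by Nessie."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Iceberg and Nessie SQL extensions (UPDATE/DELETE, branch statements)
        "spark.sql.extensions": ",".join([
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
        ]),
        # The catalog itself, delegating to Iceberg's Nessie implementation
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.uri": nessie_uri,
        f"{prefix}.ref": ref,
        f"{prefix}.warehouse": warehouse,
    }


def to_spark_defaults(conf):
    """Render properties as spark-defaults.conf lines (key, whitespace, value)."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


conf = iceberg_nessie_conf()
print(to_spark_defaults(conf))
```

If the defaults file is missing, the same properties could instead be passed to the session builder one at a time with `.config(key, value)` before `.getOrCreate()`. S3/MinIO credentials and endpoint settings are deliberately left out of this sketch.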

In [None]:
from pyspark.sql import SparkSession

# Create Spark session (configuration loaded from spark-defaults.conf)
spark = SparkSession.builder \
    .appName("IcebergNessieDemo") \
    .getOrCreate()

print("Spark version:", spark.version)
print("Spark session created successfully!")

## 2. Verify Catalog Configuration

In [None]:
# Show configured catalogs
spark.sql("SHOW CATALOGS").show()

## 3. Create Namespace (Database)

In [None]:
# Create the namespace if it does not already exist
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.demo")
print("✓ Namespace ready")

# Show namespaces
spark.sql("SHOW NAMESPACES IN nessie").show()

## 4. Create Iceberg Table from Sample Data

In [None]:
# Read sample data from MinIO
df_orders = spark.read \
    .format("parquet") \
    .load("s3a://warehouse/data/orders.parquet")

print(f"Loaded {df_orders.count()} orders")
df_orders.show(5)

In [None]:
# Create Iceberg table
df_orders.writeTo("nessie.demo.orders") \
    .using("iceberg") \
    .createOrReplace()

print("✓ Iceberg table 'nessie.demo.orders' created")

## 5. Query Iceberg Table

In [None]:
# Query the table
spark.sql("SELECT * FROM nessie.demo.orders LIMIT 10").show()

## 6. Perform Analytics

In [None]:
# Orders by status
spark.sql("""
    SELECT status, 
           COUNT(*) as order_count,
           SUM(total_amount) as total_revenue,
           AVG(total_amount) as avg_order_value
    FROM nessie.demo.orders
    GROUP BY status
    ORDER BY total_revenue DESC
""").show()

In [None]:
# Top products by revenue
spark.sql("""
    SELECT product_name,
           COUNT(*) as order_count,
           SUM(total_amount) as total_revenue
    FROM nessie.demo.orders
    GROUP BY product_name
    ORDER BY total_revenue DESC
    LIMIT 10
""").show()

## 7. Create Customers Table

In [None]:
# Read customers JSON data
df_customers = spark.read \
    .format("json") \
    .load("s3a://warehouse/data/customers.json")

print(f"Loaded {df_customers.count()} customers")
df_customers.show(5)

In [None]:
# Create Iceberg table for customers
df_customers.writeTo("nessie.demo.customers") \
    .using("iceberg") \
    .createOrReplace()

print("✓ Iceberg table 'nessie.demo.customers' created")

## 8. Join Orders with Customers

In [None]:
# Join analysis
spark.sql("""
    SELECT c.name,
           c.city,
           COUNT(o.order_id) as order_count,
           SUM(o.total_amount) as total_spent,
           AVG(o.total_amount) as avg_order_value
    FROM nessie.demo.orders o
    JOIN nessie.demo.customers c ON o.customer_id = c.customer_id
    GROUP BY c.name, c.city
    ORDER BY total_spent DESC
""").show()

## 9. View Table Metadata

In [None]:
# Show tables
spark.sql("SHOW TABLES IN nessie.demo").show()

In [None]:
# Describe table
spark.sql("DESCRIBE EXTENDED nessie.demo.orders").show(100, truncate=False)

## 10. Time Travel with Iceberg

In [None]:
# Show table history (which snapshot was current, and when)
spark.sql("SELECT * FROM nessie.demo.orders.history").show(truncate=False)

In [None]:
# Show all snapshots with their IDs, commit timestamps, and operations
spark.sql("SELECT * FROM nessie.demo.orders.snapshots").show(truncate=False)
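The `history` and `snapshots` tables only list past table states. To actually read one, Spark SQL offers `VERSION AS OF` (snapshot ID) and `TIMESTAMP AS OF` clauses for Iceberg tables. The following is a minimal sketch, assuming Spark 3.3 or later; `time_travel_query` is a hypothetical helper that only builds the SQL string, and the snapshot ID used is a placeholder, not a value from this table.

```python
def time_travel_query(table, snapshot_id=None, as_of_timestamp=None, limit=10):
    """Build a SELECT that reads `table` as of a snapshot ID or a timestamp."""
    # Exactly one of the two selectors must be supplied.
    if (snapshot_id is None) == (as_of_timestamp is None):
        raise ValueError("pass exactly one of snapshot_id or as_of_timestamp")
    if snapshot_id is not None:
        clause = f"VERSION AS OF {int(snapshot_id)}"
    else:
        clause = f"TIMESTAMP AS OF '{as_of_timestamp}'"
    return f"SELECT * FROM {table} {clause} LIMIT {int(limit)}"


# 1234567890123456789 is a placeholder, not a real snapshot ID.
by_snapshot = time_travel_query("nessie.demo.orders", snapshot_id=1234567890123456789)
by_time = time_travel_query("nessie.demo.orders", as_of_timestamp="2024-01-15 00:00:00")
print(by_snapshot)
print(by_time)
```

In the notebook, a `snapshot_id` value copied from `nessie.demo.orders.snapshots` would replace the placeholder, and the resulting string would be passed to `spark.sql(...).show()`.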

## 11. Insert New Data

In [None]:
from pyspark.sql import Row
from datetime import date

# Create two new orders
new_orders = [
    Row(order_id=101, customer_id=1, product_name="Tablet", quantity=1, 
        unit_price=499.99, total_amount=499.99, order_date=date(2024, 1, 15), status="pending"),
    Row(order_id=102, customer_id=2, product_name="Smartphone", quantity=2, 
        unit_price=899.99, total_amount=1799.98, order_date=date(2024, 1, 16), status="processing")
]

new_df = spark.createDataFrame(new_orders)

# Append to table
new_df.writeTo("nessie.demo.orders").append()

print("✓ New orders inserted")

In [None]:
# Verify count increased
spark.sql("SELECT COUNT(*) as total_orders FROM nessie.demo.orders").show()

## 12. Update Data

In [None]:
# Update order status
spark.sql("""
    UPDATE nessie.demo.orders
    SET status = 'delivered'
    WHERE order_id = 101
""")

print("✓ Order updated")

In [None]:
# Verify update
spark.sql("SELECT * FROM nessie.demo.orders WHERE order_id = 101").show()

## 13. Delete Data

In [None]:
# Delete cancelled orders
spark.sql("""
    DELETE FROM nessie.demo.orders
    WHERE status = 'cancelled'
""")

print("✓ Cancelled orders deleted")

## 14. Schema Evolution

In [None]:
# Add a new column
spark.sql("""
    ALTER TABLE nessie.demo.orders 
    ADD COLUMN discount DOUBLE
""")

print("✓ Column added")

In [None]:
# Show updated schema
spark.sql("DESCRIBE nessie.demo.orders").show()

## 15. Working with Nessie Branches (Git-like versioning)

In [None]:
from pynessie import init

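# Connect to Nessie. pynessie's init() takes its settings through config_dict;
# a bare URL passed positionally is treated as a config directory, not an endpoint.
# pynessie talks to the v1 REST API, so the endpoint is assumed to end in /api/v1.
nessie_client = init(config_dict={"endpoint": "http://nessie:19120/api/v1"})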

# List branches
branches = nessie_client.list_references()
print("\nNessie branches:")
for branch in branches.references:
    print(f"  - {branch.name} ({branch.hash_[:8]})")

In [None]:
# Create a new branch for testing
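try:
    main_branch = nessie_client.get_reference("main")
    # create_branch(new branch name, source reference, hash on the source reference)
    nessie_client.create_branch("dev", "main", main_branch.hash_)
    print("✓ Created 'dev' branch")
except Exception as e:
    # Any failure lands here, not only "branch already exists"
    print(f"Could not create 'dev' (it may already exist): {e}")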
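Creating a branch does not change which reference Spark reads from. One way to address a table on a specific branch is Nessie's `table@ref` identifier form, which must be quoted with backticks in Spark SQL. The sketch below only builds such an identifier; `table_on_ref` is a hypothetical helper, and it assumes the `dev` branch exists.

```python
def table_on_ref(catalog, namespace, table, ref):
    """Return an identifier that pins `table` to a Nessie branch or tag."""
    # Backticks are required because '@' is not valid in a bare SQL identifier.
    return f"{catalog}.{namespace}.`{table}@{ref}`"


dev_orders = table_on_ref("nessie", "demo", "orders", "dev")
query = f"SELECT COUNT(*) AS total_orders FROM {dev_orders}"
print(query)
```

Passing `query` to `spark.sql(...)` would then count the orders as they exist on `dev`, leaving `main` untouched. If the Nessie SQL extensions are enabled in the Spark configuration, `USE REFERENCE dev IN nessie` is an alternative that switches the whole session to the branch.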

## Summary

This notebook demonstrated:
- Starting a Spark session configured for Iceberg and Nessie
- Creating Iceberg tables from Parquet and JSON data in S3/MinIO
- Querying and analyzing data, including a join across tables
- Inserting, updating, and deleting rows
- Schema evolution
- Inspecting table history and snapshots, the basis for time travel
- Git-like branching with Nessie