# Apache Iceberg Tables in Lakehouse Lab

This notebook demonstrates Apache Iceberg table format features:
- Time travel queries
- Schema evolution
- ACID transactions
- Snapshot management

**Prerequisites:** This notebook requires the Iceberg configuration (`--iceberg` flag during installation).

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Check for Iceberg JAR directory in multiple possible locations
possible_iceberg_dirs = [
    "/home/jovyan/work/iceberg-jars",  # Mounted when using --iceberg flag
    "/opt/bitnami/spark/jars/iceberg", # Alternative mount location
    "/opt/bitnami/spark/jars"          # Check default Spark jars directory
]

all_jars = []
iceberg_dir = None

print("🔍 Searching for Iceberg JARs...")
for check_dir in possible_iceberg_dirs:
    print(f"Checking: {check_dir}")
    if os.path.exists(check_dir):
        jar_files = [f for f in os.listdir(check_dir) if f.endswith('.jar')]
        
        # Look for all required JAR files
        required_jars = [
            'iceberg-spark-runtime',
            'iceberg-aws',
            'hadoop-aws',
            'aws-java-sdk-bundle',  # AWS SDK v1 (for hadoop-aws compatibility)
            'bundle-2.',            # AWS SDK v2 bundle (for iceberg-aws)
            'url-connection-client' # AWS SDK v2 HTTP client
        ]
        
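        found_jars = []
        missing_patterns = []
        for jar_name in required_jars:
            matching_jars = [f for f in jar_files if jar_name in f.lower()]
            if not matching_jars:
                missing_patterns.append(jar_name)
            for jar in matching_jars:
                if jar not in found_jars:  # a JAR may match more than one pattern
                    found_jars.append(jar)
        
        if found_jars:
            iceberg_dir = check_dir
            for jar in found_jars:
                all_jars.append(os.path.join(check_dir, jar))
            print(f"✅ Found {len(found_jars)} required JAR(s) in {check_dir}:")
            for jar in found_jars:
                print(f"   - {jar}")
            if missing_patterns:
                # The directory is still used, but Iceberg may fail later without these
                print(f"   ⚠️ No JAR matching: {', '.join(missing_patterns)}")
            break
        else:
            print(f"   Directory exists but missing required JARs (has {len(jar_files)} total JARs)")
    else:
        print("   Directory not found")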

if not all_jars:
    print("\n❌ No required JAR files found in any location!")
    print("\n🔧 This usually means:")
    print("   1. Installation was not run with --iceberg flag")
    print("   2. Docker Compose not started with iceberg configuration")
    print("   3. init-compute.sh failed to download required JARs")
    print("\n💡 To fix this:")
    print("   1. Stop containers: docker compose down")
    print("   2. Run: ./install.sh --iceberg --fat-server --branch dev")
    print("   3. This will download JARs and configure Iceberg support")
    print("\n⚠️  Iceberg functionality will not be available without these JARs")
    raise FileNotFoundError("No required JAR files found - run installation with --iceberg flag")

print(f"\n✅ Using JAR files from: {iceberg_dir}")
print(f"Total JARs loaded: {len(all_jars)}")

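# Stop any existing Spark session so the new configuration takes effect
try:
    spark.stop()
    print("🔄 Stopped existing Spark session")
except NameError:
    # No `spark` variable is defined yet in this kernel
    pass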

# Configure Spark with Iceberg support and all discovered JARs
spark = SparkSession.builder \
    .appName("Lakehouse Lab - Iceberg Demo") \
    .config("spark.jars", ",".join(all_jars)) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://lakehouse/iceberg-warehouse/") \
    .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get('MINIO_ROOT_USER', 'minio')) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get('MINIO_ROOT_PASSWORD', 'minio123')) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.attempts.maximum", "1") \
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000") \
    .getOrCreate()

print("✅ Spark session with Iceberg support initialized!")
print(f"Spark version: {spark.version}")
print("🧊 JAR files loaded:")
for jar in all_jars:
    print(f"   - {os.path.basename(jar)}")

# Test S3A connectivity
try:
    spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg")
    print("✅ Iceberg namespace ready")
except Exception as e:
    print(f"⚠️ Warning during namespace creation: {e}")
    print("This might be normal for first-time setup")
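
The `iceberg` catalog in the setup cell above is configured entirely through Spark properties that follow the pattern `spark.sql.catalog.<name>` (the catalog class) and `spark.sql.catalog.<name>.<property>` (its options). A minimal sketch of that naming convention as a plain-Python helper; `iceberg_catalog_conf` is a hypothetical name, not part of PySpark or Iceberg, and it covers only three of the properties the setup cell sets for a Hadoop-type catalog.

```python
def iceberg_catalog_conf(name, warehouse, catalog_type="hadoop"):
    """Hypothetical helper: Spark properties for one Iceberg catalog.

    Keys follow Spark's spark.sql.catalog.<name>[.<property>] convention.
    """
    prefix = f"spark.sql.catalog.{name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",  # catalog implementation class
        f"{prefix}.type": catalog_type,                    # e.g. "hadoop" or "hive"
        f"{prefix}.warehouse": warehouse,                  # root path for tables in this catalog
    }


conf = iceberg_catalog_conf("iceberg", "s3a://lakehouse/iceberg-warehouse/")
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

Each pair could then be applied with `SparkSession.builder.config(key, value)`; keeping the properties in a dict makes it easier to register several catalogs with the same shape.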

## 1. Create an Iceberg Table

Let's create a sample Iceberg table with customer data:

In [None]:
# Create sample data

# Initial customer data
customers_data = [
    (1, "Alice Johnson", "alice@email.com", "2023-01-15", "Premium"),
    (2, "Bob Smith", "bob@email.com", "2023-02-20", "Standard"),
    (3, "Carol Davis", "carol@email.com", "2023-03-10", "Premium"),
    (4, "David Wilson", "david@email.com", "2023-04-05", "Standard")
]

schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("email", StringType(), False),
    StructField("signup_date", StringType(), False),
    StructField("tier", StringType(), False)
])

df = spark.createDataFrame(customers_data, schema)

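# Drop any table left over from a previous run, then create the Iceberg table
spark.sql("DROP TABLE IF EXISTS iceberg.customers")
df.writeTo("iceberg.customers").create()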

print("✅ Created Iceberg table 'iceberg.customers'")
spark.sql("SELECT * FROM iceberg.customers").show()

## 2. Time Travel Queries

Iceberg lets you query a table as it existed at any earlier snapshot, as long as that snapshot has not been expired:
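
Conceptually, every commit adds an immutable snapshot, and a time-travel read simply selects an older one. The toy model below is a sketch under simplifying assumptions: `ToyTable` and `Snapshot` are hypothetical, each snapshot copies its full row list (real Iceberg snapshots reference shared data files through manifests), and the snapshot IDs and timestamps are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: int  # commit time in epoch milliseconds
    rows: tuple        # all rows visible in this snapshot (toy simplification)


class ToyTable:
    """Hypothetical in-memory stand-in for an Iceberg table's snapshot list."""

    def __init__(self):
        self.snapshots = []

    def append(self, snapshot_id, committed_at, new_rows):
        # Every write adds a new immutable snapshot; earlier snapshots are never modified
        previous = self.snapshots[-1].rows if self.snapshots else ()
        self.snapshots.append(Snapshot(snapshot_id, committed_at, previous + tuple(new_rows)))

    def version_as_of(self, snapshot_id):
        # Analogue of: SELECT ... VERSION AS OF <snapshot_id>
        for snap in self.snapshots:
            if snap.snapshot_id == snapshot_id:
                return snap.rows
        raise KeyError(f"unknown snapshot {snapshot_id}")

    def timestamp_as_of(self, ts_ms):
        # Analogue of: SELECT ... TIMESTAMP AS OF <ts>, for a linear history:
        # the latest snapshot committed at or before ts_ms
        eligible = [s for s in self.snapshots if s.committed_at <= ts_ms]
        if not eligible:
            raise ValueError("no snapshot existed at that time")
        return max(eligible, key=lambda s: s.committed_at).rows


table = ToyTable()
table.append(101, 1_000, ["Alice", "Bob", "Carol", "David"])
table.append(202, 2_000, ["Eve", "Frank"])
print(len(table.version_as_of(101)))      # 4
print(len(table.snapshots[-1].rows))      # 6
print(len(table.timestamp_as_of(1_500)))  # 4
```

`version_as_of` mirrors the `VERSION AS OF` query used in this section; `timestamp_as_of` mirrors Spark's `TIMESTAMP AS OF` form, assuming a linear history with no rollbacks.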

In [None]:
# Get snapshot information, oldest first (the snapshots table has a committed_at column)
snapshots = spark.sql("SELECT * FROM iceberg.customers.snapshots ORDER BY committed_at")
print("📸 Table snapshots:")
snapshots.select("snapshot_id", "committed_at", "operation").show()

# Store the earliest snapshot ID for time travel
first_snapshot = snapshots.first()["snapshot_id"]
print(f"First snapshot ID: {first_snapshot}")

In [None]:
# Add more data to demonstrate time travel
new_customers = [
    (5, "Eve Brown", "eve@email.com", "2024-01-15", "Premium"),
    (6, "Frank Miller", "frank@email.com", "2024-02-20", "Standard")
]

new_df = spark.createDataFrame(new_customers, schema)
new_df.writeTo("iceberg.customers").append()

print("✅ Added new customers")
print("Current data:")
spark.sql("SELECT COUNT(*) as current_count FROM iceberg.customers").show()

In [None]:
# Query historical data using snapshot ID
print("🕰️ Time travel query - data at first snapshot:")
historical_query = f"SELECT COUNT(*) as historical_count FROM iceberg.customers VERSION AS OF {first_snapshot}"
spark.sql(historical_query).show()

print("Comparison:")
spark.sql("""
SELECT 
    'Current' as timepoint, COUNT(*) as record_count 
FROM iceberg.customers
UNION ALL
SELECT 
    'Historical' as timepoint, COUNT(*) as record_count 
FROM iceberg.customers VERSION AS OF """ + str(first_snapshot) + """
""").show()

## 3. Schema Evolution

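Iceberg supports schema evolution without breaking existing queries. Adding a column only changes table metadata: existing data files are not rewritten, and rows written before the change read the new column as `NULL`: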
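
Iceberg can evolve a schema this way because it tracks columns by a unique field ID rather than by name or position, so a file written before a column existed simply has no value for that ID. The projection below models that rule with plain dictionaries; the dictionaries, IDs, and the `project` helper are illustrative stand-ins, not Iceberg's actual metadata format.

```python
# Toy schemas map column name -> field ID; toy data files store values keyed by field ID.
schema_v1 = {"customer_id": 1, "name": 2, "email": 3, "signup_date": 4, "tier": 5}
schema_v2 = {**schema_v1, "phone": 6}  # ADD COLUMN assigns a fresh ID

# A row written before the column existed, and one written after.
old_file_row = {1: 1, 2: "Alice Johnson", 3: "alice@email.com", 4: "2023-01-15", 5: "Premium"}
new_file_row = {1: 7, 2: "Grace Lee", 3: "grace@email.com", 4: "2024-03-15", 5: "Premium", 6: "+1-555-0123"}


def project(row_by_id, read_schema):
    """Hypothetical reader: resolve columns by field ID; missing IDs read as None (NULL)."""
    return {name: row_by_id.get(field_id) for name, field_id in read_schema.items()}


print(project(old_file_row, schema_v2)["phone"])  # None
print(project(new_file_row, schema_v2)["phone"])  # +1-555-0123

# A rename only changes the name -> ID mapping, so old files still resolve correctly.
schema_v3 = {("contact_email" if k == "email" else k): v for k, v in schema_v2.items()}
print(project(old_file_row, schema_v3)["contact_email"])  # alice@email.com
```

Resolving by ID rather than by name is what keeps a later rename from silently pointing at the wrong data.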

In [None]:
# Add a new column to the table
spark.sql("ALTER TABLE iceberg.customers ADD COLUMN phone STRING")

print("✅ Added 'phone' column to table")
print("Updated schema:")
spark.sql("DESCRIBE iceberg.customers").show()

In [None]:
# Insert data with the new column
evolved_customers = [
    (7, "Grace Lee", "grace@email.com", "2024-03-15", "Premium", "+1-555-0123")
]

# Build a new StructType; schema.add() would mutate the original schema in place
evolved_schema = StructType(schema.fields + [StructField("phone", StringType(), True)])
evolved_df = spark.createDataFrame(evolved_customers, evolved_schema)
evolved_df.writeTo("iceberg.customers").append()

print("✅ Inserted data with new schema")
spark.sql("SELECT * FROM iceberg.customers WHERE phone IS NOT NULL").show()

## 4. Table Maintenance

Iceberg exposes metadata tables (`history`, `snapshots`, `files`) for inspecting a table's versions and data files:
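
The `snapshots` metadata table links each snapshot to its `parent_id`, and following those links gives the table's lineage. Because Spark returns rows in no guaranteed order without an `ORDER BY`, picking "the first snapshot" should sort on `committed_at`. A pure-Python sketch over hypothetical rows that carry a subset of the `snapshots` columns; the IDs and times are made up, and `earliest_snapshot_id` and `ancestry` are illustrative helpers.

```python
# Hypothetical rows with a few columns of the snapshots metadata table, in arbitrary order.
snapshot_rows = [
    {"snapshot_id": 303, "parent_id": 202, "committed_at": 3_000, "operation": "append"},
    {"snapshot_id": 101, "parent_id": None, "committed_at": 1_000, "operation": "append"},
    {"snapshot_id": 202, "parent_id": 101, "committed_at": 2_000, "operation": "append"},
]


def earliest_snapshot_id(rows):
    # Row order is not guaranteed, so choose explicitly by commit time
    return min(rows, key=lambda r: r["committed_at"])["snapshot_id"]


def ancestry(rows, snapshot_id):
    """Follow parent_id links from a snapshot back to the table's first snapshot."""
    by_id = {r["snapshot_id"]: r for r in rows}
    chain = []
    while snapshot_id is not None:
        chain.append(snapshot_id)
        snapshot_id = by_id[snapshot_id]["parent_id"]
    return chain


print(earliest_snapshot_id(snapshot_rows))  # 101
print(ancestry(snapshot_rows, 303))         # [303, 202, 101]
```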

In [None]:
# View table history
print("📋 Table history:")
spark.sql("SELECT * FROM iceberg.customers.history").show()

print("📊 Current snapshots:")
spark.sql("SELECT snapshot_id, committed_at, operation, summary FROM iceberg.customers.snapshots ORDER BY committed_at").show(truncate=False)

In [None]:
# View table files
print("📁 Table files:")
files_df = spark.sql("SELECT file_path, file_format, record_count FROM iceberg.customers.files")
files_df.show(truncate=False)

## 5. Rollback Capability

Iceberg lets you roll back a table to a previous snapshot:
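
Rolling back does not delete data: `rollback_to_snapshot` moves the table's current-snapshot pointer back to an ancestor of the current snapshot, and the newer snapshots stay in the table metadata until they are expired. A toy model of that pointer move; `ToySnapshotLog` is hypothetical and ignores files, schemas, and concurrent writers.

```python
class ToySnapshotLog:
    """Hypothetical model: snapshots, their parents, and a current-snapshot pointer."""

    def __init__(self):
        self.parents = {}    # snapshot_id -> parent snapshot_id (None for the first)
        self.current = None  # id of the current snapshot
        self.history = []    # ids in the order they became current

    def commit(self, snapshot_id):
        self.parents[snapshot_id] = self.current
        self._make_current(snapshot_id)

    def rollback_to_snapshot(self, snapshot_id):
        # Only ancestors of the current snapshot are valid rollback targets
        node = self.current
        while node is not None and node != snapshot_id:
            node = self.parents[node]
        if node is None:
            raise ValueError(f"{snapshot_id} is not an ancestor of {self.current}")
        self._make_current(snapshot_id)  # newer snapshots are kept, just no longer current

    def _make_current(self, snapshot_id):
        self.current = snapshot_id
        self.history.append(snapshot_id)


log = ToySnapshotLog()
for sid in (101, 202, 303):
    log.commit(sid)
log.rollback_to_snapshot(101)
print(log.current)          # 101
print(sorted(log.parents))  # [101, 202, 303]
print(log.history)          # [101, 202, 303, 101]
```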

In [None]:
# Show current count
print("Before rollback:")
spark.sql("SELECT COUNT(*) as count FROM iceberg.customers").show()

# Roll back to the first snapshot
rollback_sql = f"CALL iceberg.system.rollback_to_snapshot('iceberg.customers', {first_snapshot})"
spark.sql(rollback_sql)

print("✅ Rolled back to first snapshot")
print("After rollback:")
spark.sql("SELECT COUNT(*) as count FROM iceberg.customers").show()
spark.sql("SELECT * FROM iceberg.customers").show()

## 🎉 Summary

This notebook demonstrated key Apache Iceberg features:

✅ **ACID Transactions** - Each write commits atomically as a new table snapshot

✅ **Time Travel** - Query data as it existed at any retained snapshot

✅ **Schema Evolution** - Add columns without breaking existing queries

✅ **Snapshot Management** - View and manage table versions

✅ **Rollback Capability** - Easily revert to previous states
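
Iceberg's atomic commits rest on optimistic concurrency: a writer builds new metadata from the version it read, and the catalog swaps the table's current-metadata pointer only if no other writer committed in between; otherwise the writer retries from the new base. A single-process sketch of that compare-and-swap loop; `ToyCatalog`, `next_version`, and `commit` are hypothetical, and a real catalog performs the swap through a metastore, an atomic rename, or a REST service rather than a Python lock.

```python
import threading


class ToyCatalog:
    """Hypothetical catalog holding a table's current metadata version."""

    def __init__(self, version):
        self._version = version
        self._lock = threading.Lock()  # stands in for the catalog's atomic swap

    def current(self):
        return self._version

    def compare_and_swap(self, expected, new):
        # Succeeds only if no other writer committed since `expected` was read
        with self._lock:
            if self._version != expected:
                return False
            self._version = new
            return True


def next_version(base):
    # A real writer would write a new metadata file derived from `base`
    return base + 1


def commit(catalog, make_new_metadata, max_retries=3):
    """Optimistic commit: read the base, build new metadata, swap; retry on conflict."""
    for _ in range(max_retries):
        base = catalog.current()
        new = make_new_metadata(base)
        if catalog.compare_and_swap(base, new):
            return new
    raise RuntimeError("commit failed: too many concurrent conflicts")


catalog = ToyCatalog(1)
stale_base = catalog.current()                  # writer A reads version 1
print(commit(catalog, next_version))            # writer B commits first: 2
print(catalog.compare_and_swap(stale_base, 2))  # False: A's base is stale
print(commit(catalog, next_version))            # A retries from version 2: 3
```

Because readers only ever follow the pointer, they see either the old snapshot or the new one, never a half-written state.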

### Next Steps:
- Explore partition evolution with `ALTER TABLE ... REPLACE PARTITION FIELD`
- Set up branch and tag management for complex workflows
- Integrate with Spark streaming for real-time Iceberg updates
- Use Iceberg tables in your production analytics pipelines

In [None]:
# Cleanup (optional)
# spark.sql("DROP TABLE iceberg.customers")
# print("🧹 Cleaned up demo table")

# Stop Spark session
spark.stop()
print("✅ Spark session closed")