# Iceberg Lakehouse with Spark, MinIO, and Nessie

This notebook demonstrates how to work with Apache Iceberg tables using Spark as the query engine, MinIO for storage, and Nessie as the catalog.


In [29]:
from pyspark.sql import SparkSession
import os

# Set required environment variables for Java 17 compatibility
os.environ['HADOOP_USER_NAME'] = 'jovyan'
os.environ['JAVA_TOOL_OPTIONS'] = '--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'

# Stop any existing Spark session
try:
    if 'spark' in locals() and hasattr(spark, 'sparkContext'):
        print("Stopping existing Spark session...")
        spark.stop()
        print("Spark session stopped.")
except:
    pass

print("Creating new Spark session...")

# Create Spark session with proper configuration for Java 17 and Iceberg
spark = SparkSession.builder \
    .appName("IcebergLakehouseDemo") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.driver.extraJavaOptions", 
            "--add-opens=java.base/java.lang=ALL-UNNAMED "
            "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED "
            "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED "
            "--add-opens=java.base/java.io=ALL-UNNAMED "
            "--add-opens=java.base/java.net=ALL-UNNAMED "
            "--add-opens=java.base/java.nio=ALL-UNNAMED "
            "--add-opens=java.base/java.util=ALL-UNNAMED "
            "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
            "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED "
            "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
            "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED "
            "--add-opens=java.base/sun.security.action=ALL-UNNAMED "
            "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED "
            "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED "
            "-Djavax.security.auth.useSubjectCredsOnly=false") \
    .config("spark.executor.extraJavaOptions", 
            "--add-opens=java.base/java.lang=ALL-UNNAMED "
            "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED "
            "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED "
            "--add-opens=java.base/java.io=ALL-UNNAMED "
            "--add-opens=java.base/java.net=ALL-UNNAMED "
            "--add-opens=java.base/java.nio=ALL-UNNAMED "
            "--add-opens=java.base/java.util=ALL-UNNAMED "
            "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED "
            "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED "
            "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
            "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED "
            "--add-opens=java.base/sun.security.action=ALL-UNNAMED "
            "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED "
            "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED "
            "-Djavax.security.auth.useSubjectCredsOnly=false") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.nessie.type", "rest") \
    .config("spark.sql.catalog.nessie.uri", "http://nessie:19120/api/v1") \
    .config("spark.sql.catalog.nessie.warehouse", "s3a://warehouse/") \
    .config("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.nessie.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.nessie.s3.access-key-id", "admin") \
    .config("spark.sql.catalog.nessie.s3.secret-access-key", "password123") \
    .config("spark.sql.catalog.nessie.s3.path-style-access", "true") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.sql.defaultCatalog", "nessie") \
    .config("spark.jars", 
            "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.4.3.jar,"
            "/opt/spark/jars/iceberg-aws-bundle-1.4.3.jar,"
            "/opt/spark/jars/nessie-iceberg-1.0.1.jar,"
            "/opt/spark/jars/hadoop-aws-3.3.4.jar,"
            "/opt/spark/jars/aws-java-sdk-bundle-1.12.367.jar") \
    .getOrCreate()

print("✅ Spark session created successfully!")
print(f"📊 Spark version: {spark.version}")
print(f"🎯 Spark master: {spark.sparkContext.master}")

# Test basic connectivity
try:
    catalogs = spark.sql("SHOW CATALOGS").collect()
    print(f"📋 Available catalogs: {[row.catalog for row in catalogs]}")
    if 'nessie' in [row.catalog for row in catalogs]:
        print("✅ Nessie catalog is available!")
    else:
        print("⚠️  Nessie catalog not found in catalog list")
except Exception as e:
    print(f"❌ Error listing catalogs: {e}")

Creating new Spark session...


25/06/14 15:59:45 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.ja

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:588)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2446)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2446)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:339)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [16]:
# Test service connectivity first
import requests
from time import sleep

print("🔍 Testing service connectivity...")

# Test MinIO
try:
    response = requests.get("http://localhost:9000/minio/health/live", timeout=5)
    print("✅ MinIO is accessible")
except:
    print("❌ MinIO is not accessible")

# Test Nessie
try:
    response = requests.get("http://localhost:19120/api/v1/config", timeout=5)
    print("✅ Nessie is accessible")
except:
    print("❌ Nessie is not accessible")

# Test Spark Master
try:
    response = requests.get("http://localhost:8080", timeout=5)
    print("✅ Spark Master is accessible")
except:
    print("❌ Spark Master is not accessible")

print("\n🚀 Proceeding with Spark session creation...")

🔍 Testing service connectivity...
✅ MinIO is accessible
✅ Nessie is accessible
❌ Spark Master is not accessible

🚀 Proceeding with Spark session creation...


In [2]:
# Test connection to Nessie catalog
print("📊 Testing Nessie catalog connection...")

try:
    # Show available catalogs
    catalogs_df = spark.sql("SHOW CATALOGS")
    print("Available catalogs:")
    catalogs_df.show()
    
    # Switch to Nessie catalog
    spark.sql("USE CATALOG nessie")
    print("✅ Successfully switched to Nessie catalog")
    
    # Show namespaces
    namespaces_df = spark.sql("SHOW NAMESPACES")
    print("Available namespaces in Nessie:")
    namespaces_df.show()
    
except Exception as e:
    print(f"❌ Error connecting to Nessie catalog: {e}")
    print("Continuing with spark_catalog...")
    spark.sql("USE CATALOG spark_catalog")

NameError: name 'spark' is not defined

In [18]:
# Create a namespace (database) in Nessie
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakehouse")
spark.sql("USE lakehouse")
print("Created and switched to 'lakehouse' namespace")

NameError: name 'spark' is not defined

In [None]:
# Create sample data
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, DateType
from pyspark.sql.functions import col, current_timestamp
from datetime import date

# Define schema
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DecimalType(10, 2), True),
    StructField("launch_date", DateType(), True)
])

# Create sample data
data = [
    (1, "Laptop Pro", "Electronics", 1299.99, date(2024, 1, 15)),
    (2, "Wireless Mouse", "Electronics", 29.99, date(2024, 2, 1)),
    (3, "Coffee Mug", "Kitchen", 12.50, date(2024, 1, 20)),
    (4, "Desk Chair", "Furniture", 249.99, date(2024, 3, 10)),
    (5, "Notebook", "Stationery", 5.99, date(2024, 2, 15))
]

df = spark.createDataFrame(data, schema)
df.show()
print(f"Created DataFrame with {df.count()} rows")

In [None]:
# Create Iceberg table
df.writeTo("nessie.lakehouse.products").using("iceberg").create()
print("Created Iceberg table: nessie.lakehouse.products")

# Verify table creation
spark.sql("SHOW TABLES IN lakehouse").show()

In [None]:
# Query the Iceberg table
result = spark.sql("SELECT * FROM nessie.lakehouse.products ORDER BY price DESC")
result.show()

# Show table metadata
spark.sql("DESCRIBE EXTENDED nessie.lakehouse.products").show(truncate=False)

In [None]:
# Insert more data (demonstrating append operation)
new_data = [
    (6, "Smartphone", "Electronics", 699.99, date(2024, 4, 1)),
    (7, "Table Lamp", "Furniture", 45.00, date(2024, 3, 25))
]

new_df = spark.createDataFrame(new_data, schema)
new_df.writeTo("nessie.lakehouse.products").using("iceberg").append()

print("Appended new data to the table")
spark.sql("SELECT COUNT(*) as total_products FROM nessie.lakehouse.products").show()

In [None]:
# Update existing data (demonstrating merge operation)
spark.sql("""
    MERGE INTO nessie.lakehouse.products AS target
    USING (
        SELECT 1 as product_id, 1199.99 as new_price
    ) AS source
    ON target.product_id = source.product_id
    WHEN MATCHED THEN UPDATE SET price = source.new_price
""")

print("Updated product price")
spark.sql("SELECT * FROM nessie.lakehouse.products WHERE product_id = 1").show()

In [None]:
# Show table history (time travel capability)
spark.sql("SELECT * FROM nessie.lakehouse.products.history").show(truncate=False)

# Show snapshots
spark.sql("SELECT * FROM nessie.lakehouse.products.snapshots").show(truncate=False)

In [None]:
# Demonstrate time travel - query previous version
# Get the first snapshot ID
snapshots = spark.sql("SELECT snapshot_id FROM nessie.lakehouse.products.snapshots ORDER BY committed_at LIMIT 1")
first_snapshot = snapshots.collect()[0]['snapshot_id']

print(f"Querying snapshot: {first_snapshot}")
spark.sql(f"SELECT COUNT(*) as count_at_first_snapshot FROM nessie.lakehouse.products VERSION AS OF {first_snapshot}").show()

In [None]:
# Analytics queries
print("=== Product Analytics ===")

# Products by category
spark.sql("""
    SELECT category, 
           COUNT(*) as product_count,
           AVG(price) as avg_price,
           MIN(price) as min_price,
           MAX(price) as max_price
    FROM nessie.lakehouse.products 
    GROUP BY category
    ORDER BY product_count DESC
""").show()

# Recent launches
spark.sql("""
    SELECT product_name, category, price, launch_date
    FROM nessie.lakehouse.products
    WHERE launch_date >= '2024-03-01'
    ORDER BY launch_date DESC
""").show()

In [None]:
# Clean up (optional)
# spark.sql("DROP TABLE IF EXISTS nessie.lakehouse.products")
# spark.sql("DROP NAMESPACE IF EXISTS lakehouse")

# Stop Spark session
# spark.stop()
print("Notebook completed successfully!")