## 1. Initialize Spark Session with Iceberg

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import os
import boto3
from botocore.client import Config

# glob is required: it discovers the JAR files handed to spark.jars below.
import glob
import sys

# MinIO (S3-compatible) endpoint and credentials used by the S3A connector.
# The defaults are the local demo values; set these environment variables to
# point the notebook elsewhere without editing it.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# Collect the JARs from the jars directory. Sorting makes the classpath order
# deterministic (glob order is arbitrary); an empty list joins to "".
jars_dir = "/opt/spark-apps/jars"
jar_files = sorted(glob.glob(os.path.join(jars_dir, "*.jar")))
jars_str = ",".join(jar_files)

print(f"Found JARs: {jars_str}")

# Create a Spark session with the Iceberg SQL extensions and a Hadoop-type
# Iceberg catalog named "local" whose warehouse lives in the MinIO bucket.
# NOTE: getOrCreate() returns an already-running session unchanged, so edits
# to these settings need a kernel restart to take effect.
spark = (
    SparkSession.builder
    .appName("IcebergDemo")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/")
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    # MinIO serves buckets by path (http://host/bucket), not by virtual host.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.jars", jars_str)
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")
print(f"Spark App ID: {spark.sparkContext.applicationId}")
print(f"Driver Python version: {sys.version}")


## 2. Setup MinIO Bucket

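Before creating Iceberg tables, make sure the `warehouse` bucket exists in MinIO. The cells below first delete the bucket (permanently removing any Iceberg tables already stored in it) and then recreate it; skip the delete cell to keep existing data.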


In [None]:

# boto3 client for MinIO. The defaults are the same endpoint and credentials
# given to Spark's S3A settings in the session setup cell; the environment
# variables allow overriding them without editing the notebook.
s3_client = boto3.client(
    "s3",
    endpoint_url=os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
    aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
    aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
    config=Config(signature_version="s3v4"),
    region_name="us-east-1",
)

# Attempt to list buckets to verify connectivity. Failures are only printed so
# the rest of the notebook can still be run and inspected.
try:
    resp = s3_client.list_buckets()
    buckets = [b["Name"] for b in resp.get("Buckets", [])]
    print(f"S3 connection successful. Buckets: {buckets}")
except Exception as e:
    print(f"Error connecting to S3: {e}")

def delete_s3_bucket(bucket_name):
    """Empty and then delete the bucket ``bucket_name`` using ``s3_client``.

    S3 refuses to delete a non-empty bucket, so every object is removed first.
    Errors are printed rather than raised, so a notebook cell calling this
    keeps running (for example when the bucket does not exist yet).

    Args:
        bucket_name: Name of the bucket to empty and delete.
    """
    try:
        # A single list_objects_v2 call returns at most 1,000 keys. Walk every
        # page so larger buckets are fully emptied; otherwise delete_bucket
        # fails because objects remain.
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get("Contents", []):
                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])

        # Delete the (now empty) bucket
        s3_client.delete_bucket(Bucket=bucket_name)
        print(f"Deleted bucket: {bucket_name}")
    except Exception as e:
        print(f"Error deleting bucket {bucket_name}: {e}")

def create_s3_bucket(bucket_name):
    """Create the bucket ``bucket_name`` using ``s3_client``, printing the outcome.

    If the bucket already exists and is owned by these credentials, a
    notice is printed instead of an error, so re-running the cell is harmless.
    Any other failure is printed rather than raised.

    Args:
        bucket_name: Name of the bucket to create.
    """
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Created bucket: {bucket_name}")
    except s3_client.exceptions.BucketAlreadyOwnedByYou:
        # Expected when the cell is re-run; the bucket is ready to use.
        print(f"Bucket already exists: {bucket_name}")
    except Exception as e:
        print(f"Error creating bucket {bucket_name}: {e}")


In [None]:
# Optional reset: empty and drop the "warehouse" bucket. This removes every
# Iceberg table stored under the s3a://warehouse/ catalog location.
delete_s3_bucket(bucket_name="warehouse")

In [None]:
# (Re)create the bucket backing the Iceberg catalog warehouse (s3a://warehouse/)
create_s3_bucket(bucket_name="warehouse")


## 3. Create an Iceberg Table

In [None]:
# DDL for the demo table in the "local" Iceberg catalog. IF NOT EXISTS leaves
# an existing table untouched, so this cell can be re-run safely.
customers_ddl = """
    CREATE TABLE IF NOT EXISTS local.default.customers (
        customer_id INT,
        name STRING,
        email STRING,
        age INT,
        city STRING
    )
    USING iceberg
"""
spark.sql(customers_ddl)

print("Table 'customers' created successfully!")

## 4. Insert Sample Data

In [None]:
# Five demo customers. NOTE: INSERT INTO appends, so running this cell again
# adds the same rows a second time.
insert_customers_sql = """
    INSERT INTO local.default.customers VALUES
    (1, 'Alice Johnson', 'alice@example.com', 30, 'San Francisco'),
    (2, 'Bob Smith', 'bob@example.com', 25, 'New York'),
    (3, 'Charlie Brown', 'charlie@example.com', 35, 'Boston'),
    (4, 'Diana Prince', 'diana@example.com', 28, 'Seattle'),
    (5, 'Eve Wilson', 'eve@example.com', 32, 'Austin')
"""
spark.sql(insert_customers_sql)

print("Sample data inserted!")

## 5. Query the Table

In [None]:
# Read back every row of the table and print it in Spark's text format
customers_query = "SELECT * FROM local.default.customers"
result = spark.sql(customers_query)
result.show()

# The cell's final bare expression is rendered by Jupyter as a pandas table
result.toPandas()

## 6. Iceberg Features - Time Travel

In [None]:
# Every Iceberg write commits a new snapshot, and older snapshots stay
# queryable. Record the snapshot that is current *before* the update so we can
# read the table as it was afterwards. The history metadata table lists when
# each snapshot became current; the newest entry is the current snapshot.
current_history = spark.sql("""
    SELECT snapshot_id
    FROM local.default.customers.history
    ORDER BY made_current_at DESC
    LIMIT 1
""").collect()
snapshot_before_update = current_history[0]["snapshot_id"] if current_history else None

# Update data (creates a new version)
spark.sql("""
    UPDATE local.default.customers
    SET age = 31
    WHERE customer_id = 1
""")

print("Data updated!")

# Query current version
print("\nCurrent version:")
spark.sql("SELECT * FROM local.default.customers WHERE customer_id = 1").show()

# Time travel: query the pre-update snapshot by ID.
# NOTE: the VERSION AS OF clause requires Spark 3.3+.
if snapshot_before_update is not None:
    print(f"\nVersion before update (snapshot {snapshot_before_update}):")
    spark.sql(f"""
        SELECT * FROM local.default.customers VERSION AS OF {snapshot_before_update}
        WHERE customer_id = 1
    """).show()
else:
    print("\nNo earlier snapshot available for time travel.")

## 7. Table Statistics and Metadata

In [None]:
# Load the table once and reuse it for both statistics and the row count
customers_tbl = spark.table("local.default.customers")

# Per-column summary statistics computed by DataFrame.describe()
customers_tbl.describe().show()

# Total number of rows in the table
count = customers_tbl.count()
print(f"\nTotal records: {count}")

## 8. Analytics Example

In [None]:
# Per-city customer counts and age statistics, most populous cities first
city_stats_sql = """
    SELECT
        city,
        COUNT(*) as customer_count,
        AVG(age) as avg_age,
        MIN(age) as min_age,
        MAX(age) as max_age
    FROM local.default.customers
    GROUP BY city
    ORDER BY customer_count DESC
"""
city_stats = spark.sql(city_stats_sql)
city_stats.show()