# PySpark with Iceberg Tables on Snowflake

Read and write Iceberg tables using your existing PySpark code.

---

## Prerequisites

Before running this notebook, you need an **external volume** configured in Snowflake. This requires:

1. **Cloud storage bucket** (S3, GCS, or Azure Blob)
2. **Cloud identity** with read/write access to the bucket (an IAM role on AWS; a service account on GCS or a service principal on Azure)
3. **External volume** in Snowflake pointing to that storage

If you haven't set this up yet, see: [Configure an external volume for Iceberg tables](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume)

---

**What this notebook demonstrates:**

1. Creating Iceberg tables in Snowflake
2. Reading Iceberg tables with the PySpark DataFrame API
3. Transforming data and writing results back to Iceberg

## Configuration

Update these values for your environment.

In [None]:
# =============================================================================
# CONFIGURATION - Update these values for your environment
# =============================================================================

DATABASE = "SNOWFLAKE_EXAMPLE"              # Database to use
EXTERNAL_VOLUME = "sfe_iceberg_demo_vol"    # Your external volume name
DEMO_SCHEMA = "ICEBERG_SPARK_DEMO"          # Schema to create for this demo
BASE_PATH = "spark_demo"                    # Base path within your external volume
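
The setup and DDL cells in this notebook interpolate these values into SQL strings with f-strings, so a typo or stray character ends up inside the statement itself. A minimal sketch of an up-front check, assuming the three object names are meant to be unquoted Snowflake identifiers (the helper names `is_unquoted_identifier` and `check_identifier` are hypothetical, not part of Snowpark):

```python
import re

# Unquoted Snowflake identifiers start with a letter or underscore and may
# contain letters, digits, underscores, and dollar signs.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def is_unquoted_identifier(name: str) -> bool:
    """Return True if name matches the unquoted identifier pattern."""
    return _UNQUOTED_IDENTIFIER.fullmatch(name) is not None


def check_identifier(name: str) -> str:
    """Return name unchanged if it is a valid unquoted identifier, else raise."""
    if not is_unquoted_identifier(name):
        raise ValueError(f"Not a valid unquoted identifier: {name!r}")
    return name


# Hypothetical usage with the values from the Configuration cell
for value in ("SNOWFLAKE_EXAMPLE", "sfe_iceberg_demo_vol", "ICEBERG_SPARK_DEMO"):
    check_identifier(value)
```

`BASE_PATH` is a storage path rather than an identifier, so it is deliberately left out of this check.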

## Setup

Initialize Snowpark Connect and validate the configuration.

In [None]:
from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session

# Initialize Spark session
spark = snowpark_connect.server.init_spark_session()

# Get Snowpark session for SQL DDL operations
session = get_active_session()

# Import PySpark AFTER initializing the session
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Validate that the external volume exists and is visible to the current role
try:
    session.sql(f"DESC EXTERNAL VOLUME {EXTERNAL_VOLUME}").collect()
    print(f"External volume '{EXTERNAL_VOLUME}' found")
except Exception as exc:
    # Chain the original error so the underlying cause stays visible
    raise ValueError(
        f"\n\nExternal volume '{EXTERNAL_VOLUME}' not found or not accessible.\n"
        f"Update the EXTERNAL_VOLUME variable in the Configuration cell above.\n"
        f"Run: SHOW EXTERNAL VOLUMES to see available volumes."
    ) from exc

print(f"Spark version: {spark.version}")

---

## Create Iceberg Tables

First, we'll create Iceberg tables from Snowflake's sample TPC-H data. This uses SQL via the Snowpark session.

### Configure Schema for Iceberg

In [None]:
# Use the configured database and create schema for this demo
session.sql(f"USE DATABASE {DATABASE}").collect()
session.sql(f"CREATE SCHEMA IF NOT EXISTS {DEMO_SCHEMA}").collect()
session.sql(f"USE SCHEMA {DEMO_SCHEMA}").collect()

# Set the Iceberg catalog and external volume for the schema
session.sql(f"ALTER SCHEMA {DEMO_SCHEMA} SET CATALOG = 'SNOWFLAKE'").collect()
session.sql(f"ALTER SCHEMA {DEMO_SCHEMA} SET EXTERNAL_VOLUME = '{EXTERNAL_VOLUME}'").collect()

print(f"Schema '{DATABASE}.{DEMO_SCHEMA}' configured with external volume '{EXTERNAL_VOLUME}'")

### Create Iceberg Tables from Sample Data

In [None]:
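# Create customers Iceberg table from TPC-H sample data
# Note: LIMIT without ORDER BY returns an arbitrary 10,000 rows, so the sample can differ between runs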
session.sql(f"""
    CREATE OR REPLACE ICEBERG TABLE CUSTOMERS
    BASE_LOCATION = '{BASE_PATH}/customers'
    AS SELECT 
        C_CUSTKEY as customer_id,
        C_NAME as name,
        C_MKTSEGMENT as market_segment,
        C_NATIONKEY as nation_id,
        C_ACCTBAL as account_balance
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER
    LIMIT 10000
""").collect()

# Create nations Iceberg table
session.sql(f"""
    CREATE OR REPLACE ICEBERG TABLE NATIONS
    BASE_LOCATION = '{BASE_PATH}/nations'
    AS SELECT 
        N_NATIONKEY as nation_id,
        N_NAME as nation_name,
        N_REGIONKEY as region_id
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION
""").collect()

print("Iceberg tables created")
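
Both statements above share one shape: a target table, a `BASE_LOCATION` under the base path, a list of column renames, and a source table. A sketch of a helper that assembles that statement from a mapping; `build_iceberg_ctas` is a hypothetical name, it only builds the SQL string, and it does no identifier validation:

```python
from typing import Mapping, Optional


def build_iceberg_ctas(
    table: str,
    base_path: str,
    source: str,
    columns: Mapping[str, str],
    limit: Optional[int] = None,
) -> str:
    """Build a CREATE OR REPLACE ICEBERG TABLE ... AS SELECT statement.

    columns maps source column names to the names used in the new table.
    """
    select_list = ",\n        ".join(
        f"{src} AS {dst}" for src, dst in columns.items()
    )
    statement = (
        f"CREATE OR REPLACE ICEBERG TABLE {table}\n"
        f"    BASE_LOCATION = '{base_path}/{table.lower()}'\n"
        f"    AS SELECT\n"
        f"        {select_list}\n"
        f"    FROM {source}"
    )
    if limit is not None:
        statement += f"\n    LIMIT {limit}"
    return statement


nations_sql = build_iceberg_ctas(
    table="NATIONS",
    base_path="spark_demo",
    source="SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION",
    columns={
        "N_NATIONKEY": "nation_id",
        "N_NAME": "nation_name",
        "N_REGIONKEY": "region_id",
    },
)
print(nations_sql)
```

The resulting string can be passed to `session.sql(...).collect()` in the same way as the hand-written statements.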

---

## Read Iceberg Tables with PySpark

Now the fun part: read those Iceberg tables with the standard PySpark DataFrame API.

In [None]:
# Read Iceberg tables as Spark DataFrames
customers = spark.table("CUSTOMERS")
nations = spark.table("NATIONS")

print(f"Customers: {customers.count()} rows")
print(f"Nations: {nations.count()} rows")

customers.show(5)

In [None]:
nations.show()
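
`spark.table("CUSTOMERS")` resolves the name against the current database and schema, which the earlier `USE DATABASE` and `USE SCHEMA` statements set. A sketch of building a fully qualified name instead, so the read does not depend on session context. It assumes `spark.table` accepts a three-part `database.schema.table` name in this environment, which is not verified here; `qualified_name` is a hypothetical helper:

```python
def qualified_name(*parts: str) -> str:
    """Join non-empty identifier parts with dots, e.g. database.schema.table."""
    if not parts or any(not part for part in parts):
        raise ValueError("Every part of a qualified name must be non-empty")
    return ".".join(parts)


# Values copied from the Configuration cell
customers_fqn = qualified_name("SNOWFLAKE_EXAMPLE", "ICEBERG_SPARK_DEMO", "CUSTOMERS")
print(customers_fqn)  # SNOWFLAKE_EXAMPLE.ICEBERG_SPARK_DEMO.CUSTOMERS

# Assumed usage in the notebook: customers = spark.table(customers_fqn)
```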

---

## Transform Data

Apply standard PySpark transformations: joins, aggregations, and window functions.

In [None]:
# Join customers with nations
# Note: Snowflake stores unquoted column names in uppercase, so reference them in uppercase with F.col()
customers_enriched = (
    customers
    .join(nations, F.col("CUSTOMERS.NATION_ID") == F.col("NATIONS.NATION_ID"), "left")
    .select(
        F.col("CUSTOMERS.CUSTOMER_ID").alias("customer_id"),
        F.col("CUSTOMERS.NAME").alias("name"),
        F.col("CUSTOMERS.MARKET_SEGMENT").alias("market_segment"),
        F.col("CUSTOMERS.ACCOUNT_BALANCE").alias("account_balance"),
        F.col("NATIONS.NATION_NAME").alias("nation_name")
    )
)

customers_enriched.show(10)
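
The uppercase names in the join come from Snowflake's identifier resolution: under default settings, unquoted identifiers such as `customer_id` in the `CREATE ... AS SELECT` statements are folded to uppercase when stored, while double-quoted identifiers keep their exact case. A small pure-Python sketch of that rule, for intuition only (`stored_name` is illustrative and is not a Snowflake or PySpark API):

```python
def stored_name(identifier: str) -> str:
    """Return the name Snowflake stores for an identifier as written in SQL."""
    if len(identifier) >= 2 and identifier.startswith('"') and identifier.endswith('"'):
        # Quoted: case is preserved; a doubled quote is an escaped quote.
        return identifier[1:-1].replace('""', '"')
    # Unquoted: folded to uppercase.
    return identifier.upper()


print(stored_name("customer_id"))    # CUSTOMER_ID
print(stored_name('"customer_id"'))  # customer_id
```

This is why the aliases in the `select` above are what give the resulting DataFrame its lowercase column names.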

### Aggregations

In [None]:
# Aggregate by market segment and nation
segment_analysis = (
    customers_enriched
    .groupBy("market_segment", "nation_name")
    .agg(
        F.count("*").alias("customer_count"),
        F.sum("account_balance").alias("total_balance"),
        F.avg("account_balance").alias("avg_balance")
    )
    .orderBy(F.desc("total_balance"))
)

segment_analysis.show(15)

### Window Functions

In [None]:
# Rank customers within each market segment by account balance
segment_window = Window.partitionBy("market_segment").orderBy(F.desc("account_balance"))

top_customers = (
    customers_enriched
    .withColumn("rank_in_segment", F.row_number().over(segment_window))
    .filter(F.col("rank_in_segment") <= 3)
    .orderBy("market_segment", "rank_in_segment")
)

top_customers.show(15)
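
`row_number()` assigns 1, 2, 3, ... within each partition in the window's order, so rows tied on `account_balance` still receive distinct numbers (unlike `rank()`), and the `<= 3` filter keeps exactly three rows per segment whenever at least three exist. A plain-Python sketch of the same logic on made-up rows, for intuition only:

```python
from collections import defaultdict

# Made-up sample rows: (name, market_segment, account_balance)
rows = [
    ("a", "AUTOMOBILE", 900.0),
    ("b", "AUTOMOBILE", 900.0),
    ("c", "AUTOMOBILE", 500.0),
    ("d", "AUTOMOBILE", 100.0),
    ("e", "BUILDING", 300.0),
]

# Partition by market segment
partitions = defaultdict(list)
for name, segment, balance in rows:
    partitions[segment].append((name, balance))

top_per_segment = []
for segment in sorted(partitions):
    # Order each partition by balance, highest first. Python's stable sort
    # keeps input order among ties; Spark gives no such guarantee.
    ordered = sorted(partitions[segment], key=lambda row: row[1], reverse=True)
    for row_number, (name, balance) in enumerate(ordered, start=1):
        if row_number <= 3:
            top_per_segment.append((segment, row_number, name, balance))

for entry in top_per_segment:
    print(entry)
```

Because the order among tied rows is not guaranteed in Spark, adding a tiebreaker column such as `customer_id` to the window's `orderBy` is one way to make the result reproducible.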

---

## Write Results to Iceberg

Save transformed data back to a new Iceberg table.

In [None]:
# Write aggregated results to a new Iceberg table
segment_analysis.write.mode("overwrite").saveAsTable("SEGMENT_ANALYSIS")

# Verify the write
spark.table("SEGMENT_ANALYSIS").show(10)

---

## Summary

This notebook demonstrated:

| Operation | API |
|-----------|-----|
| Read Iceberg table | `spark.table("TABLE_NAME")` |
| Join tables | `.join()` |
| Aggregate | `.groupBy().agg()` |
| Window functions | `Window.partitionBy().orderBy()` |
| Write to Iceberg | `.write.mode("overwrite").saveAsTable()` |

All operations executed on Snowflake's compute engine, reading from and writing to Iceberg tables stored in your external volume.

---

## Cleanup (Optional)

Run this cell to remove the demo tables and schema.

In [None]:
# Uncomment to clean up
# session.sql("DROP ICEBERG TABLE IF EXISTS CUSTOMERS").collect()
# session.sql("DROP ICEBERG TABLE IF EXISTS NATIONS").collect()
# session.sql("DROP ICEBERG TABLE IF EXISTS SEGMENT_ANALYSIS").collect()
# session.sql(f"DROP SCHEMA IF EXISTS {DEMO_SCHEMA}").collect()
# print("Cleanup complete")