### Import PySpark and Create SparkSession
 - The SparkSession should be automatically configured by PYSPARK_SUBMIT_ARGS
 - defined in docker-compose.yml for the jupyterlab service.

In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
import pyspark

In [3]:
pyspark.__version__

'3.5.3'

### Create SparkSession, manually set configs

In [4]:
spark = (SparkSession.builder \
    .master("spark://spark-master:7077") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg_jdbc", "org.apache.iceberg.spark.SparkCatalog") \
    # .config("spark.sql.catalog.iceberg_jdbc.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog") \
    .config("spark.sql.catalog.iceberg_jdbc.uri", "jdbc:postgresql://postgres_catalog:5432/iceberg_catalog") \
    .config("spark.sql.catalog.iceberg_jdbc.jdbc.user", "iceberg") \
    .config("spark.sql.catalog.iceberg_jdbc.jdbc.password", "icebergpassword") \
    .config("spark.sql.catalog.iceberg_jdbc.warehouse", "s3a://iceberg-warehouse/") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
                     ).getOrCreate()

:: loading settings :: url = jar:file:/opt/conda/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-115d588d-2513-425f-8f2c-f64df4972beb;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
:: resolution report :: resolve 121ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.apache.iceber

In [5]:
print("SparkSession created successfully!")
print(f"Spark version: {spark.version}")
# Verify Iceberg catalog configuration - Should show default once catalog is used
spark.sql("SHOW CURRENT NAMESPACE").show()
# Define the catalog name we configured

SparkSession created successfully!
Spark version: 3.5.3
+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [6]:
iceberg_catalog_name = "iceberg_jdbc" # Must match spark.sql.catalog.iceberg_jdbc in config

Create a Database/Schema in Iceberg using Spark

In [7]:
db_name = "spark_schema"

In [8]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {iceberg_catalog_name}.{db_name}").show()
spark.sql(f"USE {iceberg_catalog_name}.{db_name}")
spark.sql(f"SHOW DATABASES IN {iceberg_catalog_name}").show()
# In Spark, `DATABASE` and `SCHEMA` are often used interchangeably.
# Iceberg uses `NAMESPACE`.

++
||
++
++

+------------+
|   namespace|
+------------+
|   my_schema|
|spark_schema|
+------------+



In [9]:
db_name='my_schema'

 Create an Iceberg Table with Spark SQL

In [10]:
table_name = "spark_orders"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {iceberg_catalog_name}.{db_name}.{table_name} (
    order_id STRING,
    customer_id STRING,
    order_date DATE,
    amount DECIMAL(10, 2),
    category STRING
)
USING iceberg
PARTITIONED BY (category, days(order_date)) -- Column partitioning and hidden partitioning by day
TBLPROPERTIES (
    'write.format.default'='parquet',
    'format-version'='2'
)
""").show()

25/05/14 11:33:52 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


++
||
++
++



In [11]:
spark.sql(f"SHOW TABLES IN {iceberg_catalog_name}.{db_name}").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|my_schema|spark_orders|      false|
+---------+------------+-----------+



Insert Data using Spark SQL

In [12]:
spark.sql(f"""
INSERT INTO {iceberg_catalog_name}.{db_name}.{table_name} VALUES
('ORD001', 'CUST101', DATE '2023-01-15', 100.50, 'electronics'),
('ORD002', 'CUST102', DATE '2023-01-16', 75.20, 'books'),
('ORD003', 'CUST101', DATE '2023-01-16', 250.00, 'electronics'),
('ORD004', 'CUST103', DATE '2023-01-17', 45.99, 'home'),
('ORD005', 'CUST102', DATE '2023-01-18', 120.00, 'books')
""")
print(f"Data inserted into {db_name}.{table_name}")

                                                                                

Data inserted into my_schema.spark_orders


In [13]:
print(f"Querying all data from {db_name}.{table_name}:")
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name} ORDER BY order_date").show()

Querying all data from my_schema.spark_orders:
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|

Select Data using Spark SQL

In [14]:
print(f"Querying all data from {db_name}.{table_name}:")
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name} ORDER BY order_date").show()

Querying all data from my_schema.spark_orders:
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|

### Querying electronics orders (demonstrating partition filter pushdown):

In [15]:
spark.sql(f"""
SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}
WHERE category = 'electronics' AND order_date = DATE '2023-01-16'
""").show()

+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
+--------+-----------+----------+------+-----------+



To see the query plan:

In [21]:
print(spark.sql(f"""EXPLAIN SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name} 
          WHERE category = 'electronics' AND order_date = DATE '2023-01-16'""")\
.collect()[0][0])#.show(truncate=False)

== Physical Plan ==
*(1) Filter (order_date#291 = 2023-01-16)
+- *(1) ColumnarToRow
   +- BatchScan iceberg_jdbc.my_schema.spark_orders[order_id#289, customer_id#290, order_date#291, amount#292, category#293] iceberg_jdbc.my_schema.spark_orders (branch=null) [filters=category IS NOT NULL, order_date IS NOT NULL, category = 'electronics', order_date = 19373, groupedBy=] RuntimeFilters: []




### DataFrame API for Writing and Reading

In [22]:
from pyspark.sql.functions import col, to_date, lit
from pyspark.sql.types import StructType, StructField, StringType, DateType,FloatType

In [23]:
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("amount", FloatType(), True),
    StructField("category", StringType(), True)
])
data = [
    ('ORD006', 'CUST104', '2023-01-19', 300.20, 'electronics'),
    ('ORD007', 'CUST105', '2023-01-19', 22.2, 'books')
]

In [25]:
# Convert string dates to DateType for DataFrame creation
from datetime import datetime
data_typed = [(r[0], r[1], datetime.strptime(r[2], '%Y-%m-%d').date(), r[3], r[4]) for r in data]

In [26]:
data_typed

[('ORD006', 'CUST104', datetime.date(2023, 1, 19), 300.2, 'electronics'),
 ('ORD007', 'CUST105', datetime.date(2023, 1, 19), 22.2, 'books')]

In [27]:
new_orders_df = spark.createDataFrame(data_typed, schema=schema)
print("\nNew orders DataFrame:")
new_orders_df.show()


New orders DataFrame:
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD006|    CUST104|2023-01-19| 300.2|electronics|
|  ORD007|    CUST105|2023-01-19|  22.2|      books|
+--------+-----------+----------+------+-----------+



In [28]:
# Append data using DataFrameWriter
new_orders_df.writeTo(f"{iceberg_catalog_name}.{db_name}.{table_name}").append()

print(f"\nData after DataFrame append:")
spark.sql(f"""SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}
           ORDER BY order_date, order_id""").show()


Data after DataFrame append:
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD006|    CU

#### Iceberg Metadata - Snapshots, History, Manifests, Files

In [29]:
print(f"\nSnapshots for {db_name}.{table_name}:")
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.snapshots").show()


Snapshots for my_schema.spark_orders:
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-05-14 11:24:...|1518423295053039432|               NULL|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:25:...|4748236537466741489|1518423295053039432|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:28:...|4489780900243996563|4748236537466741489|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:34:...|5204727257343880368|4489780900243996563|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:37:...|1428847637346319753|5204727257343880368|   append|s3a://iceberg-war...|{spark.app.id -> ...|
+--------------------+-------------------

In [31]:
print(f"\nManifests for {db_name}.{table_name}:")
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.manifests").toPandas()


Manifests for my_schema.spark_orders:


Unnamed: 0,content,path,length,partition_spec_id,added_snapshot_id,added_data_files_count,existing_data_files_count,deleted_data_files_count,added_delete_files_count,existing_delete_files_count,deleted_delete_files_count,partition_summaries
0,0,s3a://iceberg-warehouse/my_schema/spark_orders...,7829,0,1428847637346319753,2,0,0,0,0,0,"[(False, False, books, electronics), (False, F..."
1,0,s3a://iceberg-warehouse/my_schema/spark_orders...,7946,0,5204727257343880368,5,0,0,0,0,0,"[(False, False, books, home), (False, False, 2..."
2,0,s3a://iceberg-warehouse/my_schema/spark_orders...,7830,0,4489780900243996563,2,0,0,0,0,0,"[(False, False, books, electronics), (False, F..."
3,0,s3a://iceberg-warehouse/my_schema/spark_orders...,7948,0,4748236537466741489,5,0,0,0,0,0,"[(False, False, books, home), (False, False, 2..."
4,0,s3a://iceberg-warehouse/my_schema/spark_orders...,7949,0,1518423295053039432,5,0,0,0,0,0,"[(False, False, books, home), (False, False, 2..."


In [34]:
print(f"\nData Files for {db_name}.{table_name}:")
spark.sql(f"SELECT file_path, record_count, partition FROM {iceberg_catalog_name}.{db_name}.{table_name}.files")\
.show()


Data Files for my_schema.spark_orders:
+--------------------+------------+--------------------+
|           file_path|record_count|           partition|
+--------------------+------------+--------------------+
|s3a://iceberg-war...|           1|{electronics, 202...|
|s3a://iceberg-war...|           1| {books, 2023-01-19}|
|s3a://iceberg-war...|           1| {books, 2023-01-16}|
|s3a://iceberg-war...|           1| {books, 2023-01-18}|
|s3a://iceberg-war...|           1|  {home, 2023-01-17}|
|s3a://iceberg-war...|           1|{electronics, 202...|
|s3a://iceberg-war...|           1|{electronics, 202...|
|s3a://iceberg-war...|           1|{electronics, 202...|
|s3a://iceberg-war...|           1| {books, 2023-01-19}|
|s3a://iceberg-war...|           1| {books, 2023-01-16}|
|s3a://iceberg-war...|           1| {books, 2023-01-18}|
|s3a://iceberg-war...|           1|  {home, 2023-01-17}|
|s3a://iceberg-war...|           1|{electronics, 202...|
|s3a://iceberg-war...|           1|{electronics,

In [36]:
print(f"\nPartitions for {db_name}.{table_name}:")
# This shows how data is partitioned based on `category` and `order_date_day` (hidden transform)
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.partitions")\
    .toPandas()


Partitions for my_schema.spark_orders:


Unnamed: 0,partition,spec_id,record_count,file_count,total_data_file_size_in_bytes,position_delete_record_count,position_delete_file_count,equality_delete_record_count,equality_delete_file_count,last_updated_at,last_updated_snapshot_id
0,"(electronics, 2023-01-19)",0,2,2,3310,0,0,0,0,2025-05-14 11:37:30.245,1428847637346319753
1,"(books, 2023-01-19)",0,2,2,3226,0,0,0,0,2025-05-14 11:37:30.245,1428847637346319753
2,"(books, 2023-01-16)",0,3,3,4689,0,0,0,0,2025-05-14 11:34:04.792,5204727257343880368
3,"(books, 2023-01-18)",0,3,3,4692,0,0,0,0,2025-05-14 11:34:04.792,5204727257343880368
4,"(home, 2023-01-17)",0,3,3,4668,0,0,0,0,2025-05-14 11:34:04.792,5204727257343880368
5,"(electronics, 2023-01-15)",0,3,3,4818,0,0,0,0,2025-05-14 11:34:04.792,5204727257343880368
6,"(electronics, 2023-01-16)",0,3,3,4818,0,0,0,0,2025-05-14 11:34:04.792,5204727257343880368


#### Time Travel Queries

In [41]:
history_df = spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.history ORDER BY made_current_at")
history_list = history_df.collect()

In [42]:
history_df.show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-05-14 11:24:...|1518423295053039432|               NULL|               true|
|2025-05-14 11:25:...|4748236537466741489|1518423295053039432|               true|
|2025-05-14 11:28:...|4489780900243996563|4748236537466741489|               true|
|2025-05-14 11:34:...|5204727257343880368|4489780900243996563|               true|
|2025-05-14 11:37:...|1428847637346319753|5204727257343880368|               true|
+--------------------+-------------------+-------------------+-------------------+



In [43]:
snapshots_df = spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.snapshots ORDER BY committed_at")
snapshot_list = snapshots_df.collect()

In [44]:
if len(snapshot_list) > 1:
    # Get the snapshot ID of the first insert operation (before the DataFrame append)
    # Assuming the first operation was the SQL INSERT and second was DataFrame append
    first_op_snapshot_id = history_list[0]["snapshot_id"] # The very first snapshot after table creation might be empty if no data was inserted then.
                                                            # The first data snapshot is what we usually want.
    
    # Find the snapshot *after* the first batch of INSERTs
    # The first row is the earliest snapshot.
    if len(snapshot_list) >= 2 and snapshot_list[1]["operation"] == "append": # Assuming first user data insert is an append
            target_snapshot_id = snapshot_list[1]["snapshot_id"] # This would be after the first SQL INSERT
            print(f"\nQuerying table state AS OF VERSION {target_snapshot_id} (after initial SQL INSERT):")
            spark.read.option("snapshot-id", target_snapshot_id)\
                .table(f"{iceberg_catalog_name}.{db_name}.{table_name}")\
                .orderBy("order_date", "order_id").show()


Querying table state AS OF VERSION 4748236537466741489 (after initial SQL INSERT):
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
+--------+-----------+----------+------+-----------+



In [45]:
latest_snapshot_id = snapshot_list[-1]["snapshot_id"]
print(f"\nQuerying table state AS OF LATEST SNAPSHOT {latest_snapshot_id} (current state):")
spark.read.table(f"{iceberg_catalog_name}.{db_name}.{table_name}").orderBy("order_date", "order_id").toPandas()


Querying table state AS OF LATEST SNAPSHOT 1428847637346319753 (current state):


Unnamed: 0,order_id,customer_id,order_date,amount,category
0,ORD001,CUST101,2023-01-15,100.5,electronics
1,ORD001,CUST101,2023-01-15,100.5,electronics
2,ORD001,CUST101,2023-01-15,100.5,electronics
3,ORD002,CUST102,2023-01-16,75.2,books
4,ORD002,CUST102,2023-01-16,75.2,books
5,ORD002,CUST102,2023-01-16,75.2,books
6,ORD003,CUST101,2023-01-16,250.0,electronics
7,ORD003,CUST101,2023-01-16,250.0,electronics
8,ORD003,CUST101,2023-01-16,250.0,electronics
9,ORD004,CUST103,2023-01-17,45.99,home


In [48]:
# Time travel using timestamp (requires made_current_at timestamp)
if len(snapshot_list) >= 2:
    timestamp_before_last_append = history_list[-2]["made_current_at"] # Timestamp of the snapshot before the last one
    print(f"\nQuerying table state AS OF TIMESTAMP '{timestamp_before_last_append}':")
    spark.read.option("as-of-timestamp", str(int(timestamp_before_last_append.timestamp() * 1000))) \
         .table(f"{iceberg_catalog_name}.{db_name}.{table_name}") \
         .orderBy("order_date", "order_id").show()


Querying table state AS OF TIMESTAMP '2025-05-14 11:34:04.792000':
+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD002|    CUST102|2023-01-16| 75.20|      books|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD004|    CUST103|2023-01-17| 45.99|       home|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-18|120.00|      books|
|  ORD005|    CUST102|2023-01-1

Schema Evolution (Example: Add a new column)

In [49]:
print(f"\nSchema before evolution:")
spark.sql(f"DESCRIBE {iceberg_catalog_name}.{db_name}.{table_name}").show()


Schema before evolution:
+--------------+----------------+-------+
|      col_name|       data_type|comment|
+--------------+----------------+-------+
|      order_id|          string|   NULL|
|   customer_id|          string|   NULL|
|    order_date|            date|   NULL|
|        amount|   decimal(10,2)|   NULL|
|      category|          string|   NULL|
|              |                |       |
|# Partitioning|                |       |
|        Part 0|        category|       |
|        Part 1|days(order_date)|       |
+--------------+----------------+-------+



In [50]:
# Add a new column 'is_returned'
spark.sql(f"ALTER TABLE {iceberg_catalog_name}.{db_name}.{table_name} ADD COLUMN is_returned BOOLEAN")
print(f"\nSchema after adding 'is_returned' column:")
spark.sql(f"DESCRIBE {iceberg_catalog_name}.{db_name}.{table_name}").show()


Schema after adding 'is_returned' column:
+--------------+----------------+-------+
|      col_name|       data_type|comment|
+--------------+----------------+-------+
|      order_id|          string|   NULL|
|   customer_id|          string|   NULL|
|    order_date|            date|   NULL|
|        amount|   decimal(10,2)|   NULL|
|      category|          string|   NULL|
|   is_returned|         boolean|   NULL|
|              |                |       |
|# Partitioning|                |       |
|        Part 0|        category|       |
|        Part 1|days(order_date)|       |
+--------------+----------------+-------+



In [51]:
# Insert data with the new column
spark.sql(f"""
INSERT INTO {iceberg_catalog_name}.{db_name}.{table_name}
VALUES ('ORD008', 'CUST101', DATE '2023-01-20', 55.00, 'home', true)
""")

print(f"\nData after inserting with new column (old rows will have null for 'is_returned'):")
spark.sql(f"""SELECT order_id, category, order_date, amount, is_returned 
          FROM {iceberg_catalog_name}.{db_name}.{table_name} 
          ORDER BY order_date, order_id""")\
        .show()


Data after inserting with new column (old rows will have null for 'is_returned'):
+--------+-----------+----------+------+-----------+
|order_id|   category|order_date|amount|is_returned|
+--------+-----------+----------+------+-----------+
|  ORD001|electronics|2023-01-15|100.50|       NULL|
|  ORD001|electronics|2023-01-15|100.50|       NULL|
|  ORD001|electronics|2023-01-15|100.50|       NULL|
|  ORD002|      books|2023-01-16| 75.20|       NULL|
|  ORD002|      books|2023-01-16| 75.20|       NULL|
|  ORD002|      books|2023-01-16| 75.20|       NULL|
|  ORD003|electronics|2023-01-16|250.00|       NULL|
|  ORD003|electronics|2023-01-16|250.00|       NULL|
|  ORD003|electronics|2023-01-16|250.00|       NULL|
|  ORD004|       home|2023-01-17| 45.99|       NULL|
|  ORD004|       home|2023-01-17| 45.99|       NULL|
|  ORD004|       home|2023-01-17| 45.99|       NULL|
|  ORD005|      books|2023-01-18|120.00|       NULL|
|  ORD005|      books|2023-01-18|120.00|       NULL|
|  ORD005|      


Data Compaction (Rewrite Data Files - Small File Compaction)
- Iceberg procedures are called using CALL
- Insert some more data to potentially create small files

In [53]:
spark.sql(f"""INSERT INTO {iceberg_catalog_name}.{db_name}.{table_name} 
          VALUES ('ORD009', 'CUST106', DATE '2023-01-21', 10.00, 'books', false)""")
spark.sql(f"""INSERT INTO {iceberg_catalog_name}.{db_name}.{table_name}
          VALUES ('ORD010', 'CUST106', DATE '2023-01-21', 12.00, 'books', false)""")

print(f"\nData Files before compaction for {db_name}.{table_name} (partition: category=books):")
spark.sql(f"""
    SELECT file_path, record_count, file_size_in_bytes
    FROM {iceberg_catalog_name}.{db_name}.{table_name}.files
    WHERE partition.category = 'books'
""").show(truncate=False)


Data Files before compaction for my_schema.spark_orders (partition: category=books):
+----------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|file_path                                                                                                                                                 |record_count|file_size_in_bytes|
+----------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|s3a://iceberg-warehouse/my_schema/spark_orders/data/category=books/order_date_day=2023-01-21/00000-79-4e53f015-5157-4256-9f88-3d64c3ff9e86-0-00001.parquet|1           |1799              |
|s3a://iceberg-warehouse/my_schema/spark_orders/data/category=books/order_date_day=2023-01-21/00000-77-439017ff-e0bc-43c1-83bb-ea2b2eb9e833-0-00001.parquet|1 

Execute rewrite_data_files procedure for compaction
- This will compact small files into larger ones, default strategy is 'sort' which also sorts data within files
- For very small tables, the effect might be limited or group all into one file per partition.

In [54]:
print(f"\nRunning data compaction (rewrite_data_files) for table {db_name}.{table_name}...")
# You can specify sorting options or a where clause to limit compaction scope
result_df = spark.sql(f"""CALL {iceberg_catalog_name}.system.rewrite_data_files(
                      table => '{db_name}.{table_name}', 
                      strategy => 'sort', sort_order => 'order_date ASC, amount DESC', 
                      options => map('min-input-files','1'))""")
# options => map('min-input-files','1') to force compaction even with few files for demo
result_df.show()
print("Compaction procedure finished.")


Running data compaction (rewrite_data_files) for table my_schema.spark_orders...
+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|                        22|                     8|                35616|                      0|
+--------------------------+----------------------+---------------------+-----------------------+

Compaction procedure finished.


In [55]:
print(f"\nData Files after compaction for {db_name}.{table_name} (partition: category=books):")
spark.sql(f"""
    SELECT file_path, record_count, file_size_in_bytes
    FROM {iceberg_catalog_name}.{db_name}.{table_name}.files
    WHERE partition.category = 'books'
""").show(truncate=False)


Data Files after compaction for my_schema.spark_orders (partition: category=books):
+----------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|file_path                                                                                                                                                 |record_count|file_size_in_bytes|
+----------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|s3a://iceberg-warehouse/my_schema/spark_orders/data/category=books/order_date_day=2023-01-19/00000-96-40d32e29-a242-4246-9b55-f3da68637cc8-0-00001.parquet|2           |2065              |
|s3a://iceberg-warehouse/my_schema/spark_orders/data/category=books/order_date_day=2023-01-18/00000-92-dae44c0f-791e-4dcb-b422-c65da4ab80e1-0-00001.parquet|3  

In [57]:
print(f"\nSnapshots after compaction:")
spark.sql(f"SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}.snapshots ORDER BY committed_at DESC").show()


Snapshots after compaction:
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-05-14 11:46:...|6517193885541184603|7894115058018408800|  replace|s3a://iceberg-war...|{added-data-files...|
|2025-05-14 11:46:...|7894115058018408800|3235839622240604194|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:46:...|3235839622240604194|4314309291397521213|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:45:...|4314309291397521213|6075184588584117234|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:45:...|6075184588584117234|1428847637346319753|   append|s3a://iceberg-war...|{spark.app.id -> ...|
|2025-05-14 11:37:...|1428847637346319753|520472725

Filter Pushdown Demonstration
- The table is partitioned by (category, days(order_date))
- Spark should be able to push down filters on `category` and `order_date`.

In [58]:
query_with_filters = f"""
SELECT * FROM {iceberg_catalog_name}.{db_name}.{table_name}
WHERE category = 'electronics' AND order_date >= DATE '2023-01-15' AND order_date < DATE '2023-01-20'
"""
print("\nQuery with partition column filters:")
spark.sql(query_with_filters).show()


Query with partition column filters:
+--------+-----------+----------+------+-----------+-----------+
|order_id|customer_id|order_date|amount|   category|is_returned|
+--------+-----------+----------+------+-----------+-----------+
|  ORD003|    CUST101|2023-01-16|250.00|electronics|       NULL|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|       NULL|
|  ORD003|    CUST101|2023-01-16|250.00|electronics|       NULL|
|  ORD006|    CUST104|2023-01-19|300.20|electronics|       NULL|
|  ORD006|    CUST104|2023-01-19|300.20|electronics|       NULL|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|       NULL|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|       NULL|
|  ORD001|    CUST101|2023-01-15|100.50|electronics|       NULL|
+--------+-----------+----------+------+-----------+-----------+



In [62]:
print("\nEXPLAIN plan for the query (look for PushedFilters in ParquetScan or IcebergScan):")
print(spark.sql(f"EXPLAIN {query_with_filters}").collect()[0][0])


EXPLAIN plan for the query (look for PushedFilters in ParquetScan or IcebergScan):
== Physical Plan ==
*(1) ColumnarToRow
+- BatchScan iceberg_jdbc.my_schema.spark_orders[order_id#1854, customer_id#1855, order_date#1856, amount#1857, category#1858, is_returned#1859] iceberg_jdbc.my_schema.spark_orders (branch=null) [filters=category IS NOT NULL, order_date IS NOT NULL, category = 'electronics', order_date >= 19372, order_date < 19377, groupedBy=] RuntimeFilters: []




Interoperability Check - Read data created by Trino (if my_schema.employees exists)
 - Ensure the Trino notebook (01-trino-iceberg-getting-started.ipynb) has been run to create this table.

In [63]:
spark.conf.set("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog") 
spark.conf.set("spark.sql.catalog.iceberg_catalog.uri", "jdbc:postgresql://postgres_catalog:5432/iceberg_catalog")
spark.conf.set("spark.sql.catalog.iceberg_catalog.jdbc.user", "iceberg") 
spark.conf.set("spark.sql.catalog.iceberg_catalog.jdbc.password", "icebergpassword") 
spark.conf.set("spark.sql.catalog.iceberg_catalog.warehouse", "s3a://iceberg-warehouse/")

In [64]:
spark.catalog.setCurrentCatalog('iceberg_catalog')

In [65]:
spark.catalog.listCatalogs()

[CatalogMetadata(name='default_cache_iceberg', description=None),
 CatalogMetadata(name='iceberg_catalog', description=None),
 CatalogMetadata(name='iceberg_jdbc', description=None),
 CatalogMetadata(name='spark_catalog', description=None)]

In [69]:
iceberg_trino_catalog_name='iceberg_catalog'
trino_table_name = "my_schema.employees" # From Trino notebook

In [70]:
print(f"\nAttempting to read Trino-created table: {iceberg_trino_catalog_name}.{trino_table_name}")
# Spark needs to know the schema. If Trino created it, Spark should discover it via the JDBC catalog.
spark.sql(f"DESCRIBE {iceberg_trino_catalog_name}.{trino_table_name}").show()
spark.sql(f"SELECT * FROM {iceberg_trino_catalog_name}.{trino_table_name} LIMIT 5").show()
print("Successfully read Trino-created table with Spark.")


Attempting to read Trino-created table: iceberg_catalog.my_schema.employees
+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|                  id|          int|   NULL|
|                name|       string|   NULL|
|          department|       string|   NULL|
|              salary|decimal(10,2)|   NULL|
|           hire_date|         date|   NULL|
|# Partition Infor...|             |       |
|          # col_name|    data_type|comment|
|          department|       string|   NULL|
+--------------------+-------------+-------+

+---+-------------+-----------+--------+----------+
| id|         name| department|  salary| hire_date|
+---+-------------+-----------+--------+----------+
|  7|George Yellow|         HR|65000.00|2023-03-15|
|  3|Charlie Brown|         HR|70000.00|2021-03-10|
|  7|George Yellow|         HR|65000.00|2023-03-15|
|  1|  Alice Smith|Engineering|90000.00|2020-01-15|
|  2|  Bob Johnson

In [71]:
# spark.stop() # Commented out to allow re-running cells easily
print("\nSpark Iceberg Datalakehouse Demo (Phase 2) completed.")


Spark Iceberg Datalakehouse Demo (Phase 2) completed.
