# Apache Iceberg Metadata Layers with PySpark
## A Complete Practical Guide

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json

# Initialize Spark with Iceberg configuration
spark = SparkSession.builder \
    .appName("IcebergMetadataExploration") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/jovyan/iceberg/warehouse") \
    .getOrCreate()

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 39780)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =

## 1. Create Sample Iceberg Table

In [6]:
# Create database and partitioned table
spark.sql("CREATE DATABASE IF NOT EXISTS local.db")

spark.sql("DROP TABLE IF EXISTS local.db.sample")
spark.sql("""
CREATE TABLE local.db.sample (
    id bigint,
    data string,
    dt date)
USING iceberg
PARTITIONED BY (months(dt))
""")

DataFrame[]

## 2. Insert Data with Proper Date Formatting

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import datetime

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Explicit schema
schema = StructType([
    StructField("id", LongType()),
    StructField("data", StringType()),
    StructField("dt", DateType())
])

# Convert dt to datetime.date
data = [
    (1, "A", datetime.date(2023, 1, 1)),
    (2, "B", datetime.date(2023, 1, 15)),
    (3, "C", datetime.date(2023, 2, 1))
]

# Create DataFrame directly with DateType
df = spark.createDataFrame(data, schema=schema)

# Now you can write to Iceberg
df.writeTo("local.db.sample").append()


## 3. View Table Metadata Structure

In [8]:
# Get table location
table_info = spark.sql("DESCRIBE EXTENDED local.db.sample").collect()
location = [row.data_type for row in table_info if row.col_name == "Location"][0]
print(f"Table stored at: {location}")

Table stored at: /home/jovyan/iceberg/warehouse/db/sample


## 4. Explore Metadata Files

In [9]:
# View metadata file versions
metadata_versions = spark.sql("SELECT * FROM local.db.sample.metadata_log_entries")
print("Metadata versions:")
metadata_versions.select("timestamp", "file").show(truncate=False)

Metadata versions:
+-----------------------+------------------------------------------------------------------+
|timestamp              |file                                                              |
+-----------------------+------------------------------------------------------------------+
|2025-07-12 06:04:04.675|/home/jovyan/iceberg/warehouse/db/sample/metadata/v1.metadata.json|
|2025-07-12 06:04:11.028|/home/jovyan/iceberg/warehouse/db/sample/metadata/v2.metadata.json|
+-----------------------+------------------------------------------------------------------+



## 5. Examine Snapshots

In [12]:
spark.sql("SHOW TABLES IN local.db").show()


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|       db|   sample|      false|
+---------+---------+-----------+



In [11]:
# Query Iceberg metadata table: snapshots
snapshots = spark.sql("SELECT * FROM local.db.sample.snapshots")

# Display snapshot details
snapshots.select("snapshot_id", "timestamp", "operation", "manifest_list").show(truncate=False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `timestamp` cannot be resolved. Did you mean one of the following? [`operation`, `parent_id`, `summary`, `committed_at`, `manifest_list`].;
'Project [snapshot_id#92L, 'timestamp, operation#94, manifest_list#95]
+- Project [committed_at#91, snapshot_id#92L, parent_id#93L, operation#94, manifest_list#95, summary#96]
   +- SubqueryAlias local.db.sample.snapshots
      +- RelationV2[committed_at#91, snapshot_id#92L, parent_id#93L, operation#94, manifest_list#95, summary#96] local.db.sample.snapshots local.db.sample.snapshots


## 6. Inspect Manifest Lists

In [None]:
# View manifest lists
manifests = spark.sql("SELECT * FROM local.db.sample.manifests")
print("Manifest lists:")
manifests.show(truncate=False)

## 7. Examine Data Files

In [None]:
# View all data files
data_files = spark.sql("SELECT * FROM local.db.sample.files")
print("Data files in manifests:")
data_files.select("file_path", "partition", "record_count").show(truncate=False)

## 8. Schema Evolution Example

In [None]:
# Add new column
spark.sql("ALTER TABLE local.db.sample ADD COLUMN new_col string")

# Insert data with new schema (with proper date handling)
new_data = [
    (4, "D", "2023-02-15", "value1"),
    (5, "E", "2023-03-01", "value2")
]

new_schema = StructType([
    StructField("id", LongType()),
    StructField("data", StringType()),
    StructField("dt", DateType()),
    StructField("new_col", StringType())
])

new_df = spark.createDataFrame(new_data, schema=new_schema)
new_df = new_df.withColumn("dt", to_date(col("dt")))
new_df.writeTo("local.db.sample").append()

# Verify schema change
print("Updated schema:")
spark.sql("DESCRIBE local.db.sample").show()

## 9. Time Travel Demonstration

In [None]:
# Get all snapshot IDs
snapshot_ids = [row.snapshot_id for row in spark.sql("SELECT snapshot_id FROM local.db.sample.snapshots").collect()]

# Query data at each snapshot
for snap_id in snapshot_ids:
    print(f"\nData at snapshot {snap_id}:")
    spark.read \
        .option("snapshot-id", snap_id) \
        .table("local.db.sample") \
        .show()

## 10. Metadata Optimization

In [None]:
# Compact data files
print("Running data file compaction...")
spark.sql("CALL local.system.rewrite_data_files(table => 'local.db.sample')")

# Expire old snapshots
print("\nExpiring old snapshots...")
spark.sql("CALL local.system.expire_snapshots(table => 'local.db.sample', older_than => TIMESTAMP '2023-01-01 00:00:00')")

# View results
print("\nFiles after optimization:")
spark.sql("SELECT * FROM local.db.sample.files").show(truncate=False)

## 11. Clean Up

In [None]:
# Drop the table
spark.sql("DROP TABLE local.db.sample")
spark.sql("DROP DATABASE local.db")

## Key Takeaways

1. **Date Handling Fixes**:
   - Always convert string dates to proper DateType before writing
   - Use explicit schemas when creating DataFrames
   - Prefer `to_date()` function for conversions

2. **Iceberg Metadata Layers**:
   - **Metadata Files**: Versioned JSON with table schema and snapshots
   - **Manifest Lists**: Index of manifest files with statistics
   - **Manifest Files**: List of data files with detailed metrics
   - **Data Files**: Actual Parquet/AVRO/ORC files

3. **Best Practices**:
   - Use DataFrame API with proper schemas for type safety
   - Regularly compact files and expire snapshots
   - Leverage time travel for data auditing