In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# In my local, I've already install pyspark 3.5.3 to test
# So the version of jars will be different from jars to run spark in container
jars_dir = "jars"
jars = [
    os.path.join(jars_dir, "iceberg-spark-runtime-3.5_2.12-1.5.0.jar"),
    os.path.join(jars_dir, "hadoop-aws-3.3.4.jar"),
    os.path.join(jars_dir, "aws-java-sdk-bundle-1.12.262.jar")
]

In [3]:
spark = SparkSession.builder \
    .appName("IcebergDemo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "thrift://localhost:9083") \
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://lakehouse") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.jars", ",".join(jars)) \
    .getOrCreate()

In [4]:
spark.sql("CREATE SCHEMA IF NOT EXISTS iceberg.demo_db")
spark.sql("DROP TABLE IF EXISTS iceberg.demo_db.orders")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.demo_db.orders (
    order_id INT,
    customer STRING,
    total DOUBLE
)
USING ICEBERG
PARTITIONED BY (customer)
""")


DataFrame[]

In [5]:
# Commit 1
spark.sql("""
INSERT INTO iceberg.demo_db.orders VALUES
(101, 'Alice', 150.0),
(102, 'Bob', 200.0)
""")

DataFrame[]

In [6]:
# Commit 2
spark.sql("""
INSERT INTO iceberg.demo_db.orders VALUES
(103, 'Alice', 120.0),
(104, 'Bob', 80.0)
""")

DataFrame[]

In [7]:
# Commit 3
spark.sql("""
INSERT INTO iceberg.demo_db.orders VALUES
(105, 'Charlie', 300.0)
""")

DataFrame[]

In [8]:
# Select data from the latest snapshot
spark.sql("SELECT * FROM iceberg.demo_db.orders").show()


+--------+--------+-----+
|order_id|customer|total|
+--------+--------+-----+
|     105| Charlie|300.0|
|     103|   Alice|120.0|
|     101|   Alice|150.0|
|     102|     Bob|200.0|
|     104|     Bob| 80.0|
+--------+--------+-----+



In [9]:
# View the snapshot history of the table
spark.sql("SELECT * FROM iceberg.demo_db.orders.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-08-18 23:08:...|1300535625166134230|               NULL|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
|2025-08-18 23:08:...|8976814679347534875|1300535625166134230|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
|2025-08-18 23:08:...|6989084205877280051|8976814679347534875|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [11]:
snapshot_id_orders = 1300535625166134230
spark.sql(f"SELECT * FROM iceberg.demo_db.orders FOR VERSION AS OF {snapshot_id_orders}").show()

+--------+--------+-----+
|order_id|customer|total|
+--------+--------+-----+
|     101|   Alice|150.0|
|     102|     Bob|200.0|
+--------+--------+-----+

