In [13]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Endpoint of the Lakekeeper REST catalog. This value targets the "docker compose"
# testing and development environment; replace the 'lakekeeper' host when running
# elsewhere (for example 'localhost' if Lakekeeper is running locally).
CATALOG_URL = "http://lakekeeper:8181/catalog"
WAREHOUSE = "iceberg_warehouse"

# Installed PySpark version and its first two dot-separated components ("major.minor").
SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = ".".join(SPARK_VERSION.split(".", 2)[:2])
ICEBERG_VERSION = "1.6.1"

# Connect with Spark

In [14]:
# Spark settings that register an Iceberg REST catalog named "lakekeeper".
config = {
    # Catalog implementation, type and connection details.
    "spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakekeeper.type": "rest",
    "spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    "spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    # File IO implementation used by the catalog.
    "spark.sql.catalog.lakekeeper.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    # Iceberg's Spark session extensions.
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    # Catalog used when a statement does not name one explicitly.
    "spark.sql.defaultCatalog": "lakekeeper",
    # Comma-separated Maven coordinates of the Iceberg jars, versioned to match the
    # constants defined earlier in the notebook.
    "spark.jars.packages": ",".join(
        [
            f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION}",
            f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
        ]
    ),
}


In [15]:
# Build the Spark configuration: single local master plus every entry of `config`.
spark_config = SparkConf().setMaster("local").setAppName("Iceberg-REST")
spark_config.setAll(list(config.items()))

# Create the session from that configuration, or reuse one that already exists.
spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Kept as the last expression so the cell still displays the statement's result.
spark.sql("USE lakekeeper")

DataFrame[]

## Read and Write Tables

In [16]:
# Catalogs visible to this session.
catalogs = spark.sql("SHOW CATALOGS")
catalogs.show(truncate=False)

# Catalog the session currently resolves names against.
active_catalog = spark.catalog.currentCatalog()
print(f"Current default catalog: {active_catalog}")

# Namespaces of the current catalog, followed by the tables inside "icebergdata".
for statement in ("SHOW DATABASES", "SHOW TABLES in icebergdata"):
    spark.sql(statement).show(truncate=False)

+-------------+
|catalog      |
+-------------+
|lakekeeper   |
|spark_catalog|
+-------------+

Current default catalog: lakekeeper
+-------------------+
|namespace          |
+-------------------+
|icebergdata        |
|pyiceberg_namespace|
|my_namespace       |
+-------------------+

+-----------+-------------------------------------------+-----------+
|namespace  |tableName                                  |isTemporary|
+-----------+-------------------------------------------+-----------+
|icebergdata|debezium_offset_storage_table              |false      |
|icebergdata|debeziumcdc_dbz__inventory_customers       |false      |
|icebergdata|debeziumcdc_dbz__inventory_orders          |false      |
|icebergdata|debeziumcdc_dbz__inventory_products        |false      |
|icebergdata|debeziumcdc_dbz__inventory_products_on_hand|false      |
|icebergdata|debeziumcdc_dbz__inventory_geom            |false      |
+-----------+-------------------------------------------+-----------+



In [18]:
# Load all orders, highest id first, and print them without truncating column values.
orders_query = "select * from icebergdata.debeziumcdc_dbz__inventory_orders order by id desc"
orders = spark.sql(orders_query)
orders.show(truncate=False)


+-----+----------+---------+--------+----------+---------+----+-------+-------------------+--------+
|id   |order_date|purchaser|quantity|product_id|__deleted|__op|__table|__source_ts_ns     |__db    |
+-----+----------+---------+--------+----------+---------+----+-------+-------------------+--------+
|10333|2025-06-17|1231     |8       |105       |false    |c   |orders |1762545825949497000|postgres|
|10332|2025-04-18|1230     |3       |109       |false    |c   |orders |1762545815928545000|postgres|
|10331|2025-07-18|1229     |3       |104       |false    |c   |orders |1762545805910891000|postgres|
|10330|2025-01-07|1229     |6       |102       |false    |c   |orders |1762545805909349000|postgres|
|10329|2025-08-01|1229     |9       |109       |false    |c   |orders |1762545805907815000|postgres|
|10328|2025-03-15|1228     |8       |101       |false    |c   |orders |1762545795885726000|postgres|
|10327|2025-08-16|1228     |9       |104       |false    |c   |orders |1762545795884002000|

In [None]:
# retrieve and print customers data
# The table name follows the "debeziumcdc_dbz__inventory_*" pattern reported by
# SHOW TABLES for the icebergdata namespace; the previous "testc" name is not listed there.
# Alternative query kept for reference (selects id plus input_file_name()):
# customers = spark.sql("select id,input_file_name() as input_file  from icebergdata.debeziumcdc_dbz__inventory_customers")
customers = spark.sql("select *  from icebergdata.debeziumcdc_dbz__inventory_customers order by 1 asc")
# Print only the first 10 rows, without truncating column values.
customers.limit(10).show(truncate=False)


In [None]:
# Print the schema of the `customers` DataFrame.
customers.printSchema()

In [None]:
# Read the whole Debezium offset storage table and print it without truncating values.
offset_query = "select * from icebergdata.debezium_offset_storage_table"
data = spark.sql(offset_query)
data.show(truncate=False)


In [None]:
# retrieve and print the "history" metadata table of the customers table,
# most recent entry first (ordered by made_current_at)
# The table name follows the "debeziumcdc_dbz__inventory_*" pattern reported by SHOW TABLES.
data = spark.sql(
    "select * from icebergdata.debeziumcdc_dbz__inventory_customers.history ORDER BY made_current_at DESC")
data.show(truncate=False)



In [None]:
# retrieve and print the "snapshots" metadata table of the customers table (first 4 rows)
# The table name follows the "debeziumcdc_dbz__inventory_*" pattern reported by SHOW TABLES.
data = spark.sql("select * from icebergdata.debeziumcdc_dbz__inventory_customers.snapshots")
data.limit(4).show(truncate=False)

In [None]:
# Print the schema of the `customers` DataFrame.
# NOTE(review): repeats an earlier cell verbatim; `customers` is not reassigned in the
# visible cells in between, so this looks like a candidate for removal — confirm.
customers.printSchema()

In [None]:
# Print the schema of the `customers` DataFrame.
# NOTE(review): repeats an earlier cell verbatim; `customers` is not reassigned in the
# visible cells in between, so this looks like a candidate for removal — confirm.
customers.printSchema()