In [None]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

# Lakekeeper REST catalog endpoint. The host name "lakekeeper" only resolves inside the
# "docker compose" testing and development environment; replace it when running
# elsewhere (e.g. with 'localhost' if Lakekeeper is running locally).
CATALOG_URL = "http://lakekeeper:8181/catalog"
WAREHOUSE = "demo"  # Lakekeeper warehouse the Spark catalog is pointed at

# "<major>.<minor>" of the installed PySpark (e.g. "3.5.1" -> "3.5"); it selects the
# matching iceberg-spark-runtime artifact in the Spark packages list.
SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = ".".join(SPARK_VERSION.split(".", 2)[:2])
ICEBERG_VERSION = "1.9.1"

# Connect with Spark

In [None]:
# Scala binary version of the installed Spark build, used in the Iceberg runtime
# artifact name. 2.12 is assumed here — change to "2.13" for a Scala 2.13 Spark build.
SCALA_BINARY_VERSION = "2.12"

# Maven coordinates resolved by Spark at session start-up.
# - aws-java-sdk-bundle + hadoop-aws provide the s3a filesystem used for the streaming
#   checkpoint (presumably hadoop-aws must match Spark's bundled Hadoop version — verify).
# - iceberg-spark-runtime / iceberg-aws-bundle provide the Iceberg catalog and S3FileIO.
SPARK_PACKAGES = [
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_{SCALA_BINARY_VERSION}:{ICEBERG_VERSION}",
    f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
]

# Spark settings: register Lakekeeper as an Iceberg REST catalog named "lakekeeper"
# and make it the default catalog so unqualified table names resolve against it.
config = {
    "spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakekeeper.type": "rest",
    "spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    "spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    "spark.sql.catalog.lakekeeper.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "lakekeeper",
    "spark.jars.packages": ",".join(SPARK_PACKAGES),
    # These credentials are used to store the Checkpoint. Table IO is still done via vended-credentials from Lakekeeper.
    # They are the demo MinIO root credentials of the "docker compose" environment — do not reuse elsewhere.
    "spark.hadoop.fs.s3a.access.key": "minio-root-user",
    "spark.hadoop.fs.s3a.secret.key": "minio-root-password",
    # Like CATALOG_URL, the "minio" host name only resolves inside "docker compose".
    "spark.hadoop.fs.s3a.endpoint": "http://minio:9000",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}


In [None]:
# Single local driver named "Iceberg-REST"; every entry of `config` is applied on top,
# in dictionary order (SparkConf.setAll calls .set(key, value) for each pair).
spark_config = (
    SparkConf()
    .setMaster('local')
    .setAppName("Iceberg-REST")
    .setAll(config.items())
)

# getOrCreate returns the already-running session when this cell is re-executed.
spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Select the Lakekeeper catalog explicitly (it is also set as spark.sql.defaultCatalog).
spark.sql("USE lakekeeper")

## Read and Write Tables

In [None]:
# Create the namespace the streaming table will live in (no-op if it already exists),
# then list all namespaces to confirm it is present.
spark.sql("CREATE NAMESPACE IF NOT EXISTS streaming_namespace")
spark.sql("SHOW NAMESPACES").toPandas()

In [None]:
import pyspark.sql.functions as F

# Unbounded demo input: Spark's built-in "rate" source emitting one row per second
# (per the Spark docs it produces `timestamp` and `value` columns), extended with a
# derived column `result = value + 1`.
stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn("result", F.col("value") + F.lit(1))
)

In [None]:
# Continuously append the rate stream to an Iceberg table in the Lakekeeper catalog.
# `toTable` starts the streaming query in the background and returns immediately.
# Keep the returned handle so the query can be monitored (`query.status`) and stopped
# (`query.stop()`). Without it, the query keeps writing until the kernel shuts down.
# The checkpoint goes to s3a using the `spark.hadoop.fs.s3a.*` settings of the Spark config.
query = (
    stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://examples/my-checkpoint/location/")
    .toTable("streaming_namespace.streaming_table")
)
query

In [None]:
# Read back what the streaming query has committed so far. The table lives in
# `streaming_namespace`, which is where the stream writes. Rows only show up once a
# micro-batch has committed, so running this immediately after starting the stream
# may return an empty frame.
spark.sql("SELECT * FROM streaming_namespace.streaming_table").toPandas()