In [1]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import StreamingQuery

# CATALOG_URL defaults to the "docker compose" testing and development environment, where the
# Lakekeeper service is reachable under the hostname 'lakekeeper'. Elsewhere (f. ex. Lakekeeper
# running locally -> 'http://localhost:8181/catalog') set LAKEKEEPER_CATALOG_URL instead of
# editing this cell; LAKEKEEPER_WAREHOUSE likewise overrides the warehouse name.
CATALOG_URL = os.environ.get("LAKEKEEPER_CATALOG_URL", "http://lakekeeper:8181/catalog")
WAREHOUSE = os.environ.get("LAKEKEEPER_WAREHOUSE", "demo")

SPARK_VERSION = pyspark.__version__
# "major.minor" of the running Spark, used to pick the matching iceberg-spark-runtime artifact.
SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])
# Note: The driver memory should be within the bounds of the docker-compose.yaml
# 'services.jupyter.deploy.resources.[limits.memory,reservations.memory]'; the jupyter container
# is given the 'reservations' and may grow up to the limit if possible.
# NOTE(review): spark.driver.memory is only applied when the driver JVM is launched, so changing
# it presumably requires a kernel restart once a SparkSession already exists.
SPARK_DRIVER_MEMORY = '32G'  # rule of thumb: total_docker_ram * .4
HADOOP_AWS_VERSION = "3.4.0"
ICEBERG_VERSION = "1.10.0"

In [2]:
# fix local directory writes
import os
import stat

# Create the local scratch directories referenced by the Spark session config (local dir,
# S3A upload buffer, worker dir, SQL warehouse) and make them world-writable.
# os.makedirs' `mode` is filtered through the process umask, hence the explicit chmod afterwards.
directories = ["/tmp/spark-local", "/tmp/s3a-buffer", "/tmp/spark-worker", "/tmp/spark-warehouse"]
for directory in directories:
    os.makedirs(directory, mode=0o777, exist_ok=True)
    os.chmod(directory, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

In [2]:
# S3 (MinIO) connection settings. The defaults match the "docker compose" environment; override
# them with environment variables instead of editing credentials into the notebook.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ROOT_USER", "minio-root-user")
MINIO_SECRET_KEY = os.environ.get("MINIO_ROOT_PASSWORD", "minio-root-password")

config = {
    # Iceberg REST catalog (Lakekeeper), used as Spark's default catalog
    "spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakekeeper.type": "rest",
    "spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    "spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    # Iceberg table files go through S3FileIO (AWS SDK), not through the Hadoop fs.* settings below
    "spark.sql.catalog.lakekeeper.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "lakekeeper",
    "spark.driver.memory": SPARK_DRIVER_MEMORY,
    "spark.jars.packages": ",".join([
        f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.13:{ICEBERG_VERSION}",
        f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
        f"org.apache.spark:spark-protobuf_2.13:{SPARK_VERSION}",
        f"org.apache.hadoop:hadoop-aws:{HADOOP_AWS_VERSION}",
    ]),
    # zstd compresses well: less size on disk, less IO bandwidth.
    # ... for plain parquet writes
    "spark.sql.parquet.compression.codec": "zstd",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    # ... and for Iceberg data files
    "spark.sql.iceberg.compression-codec": "zstd",
    "spark.sql.iceberg.locality.enabled": "true",
    # note: merge-schema should only be set to true if you "trust" the upstream data producer
    "spark.sql.iceberg.merge-schema": "false",
    # Hadoop S3A filesystem for s3:// and s3a:// paths (e.g. the streaming checkpoint location).
    # NOTE(review): S3AFileSystem reads its options from fs.s3a.* keys, so the fs.s3.* options
    # below are probably ignored — confirm before removing them.
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3.endpoint": MINIO_ENDPOINT,
    "spark.hadoop.fs.s3.access.key": MINIO_ACCESS_KEY,
    "spark.hadoop.fs.s3.secret.key": MINIO_SECRET_KEY,
    "spark.hadoop.fs.s3.path.style.access": "true",
    "spark.hadoop.fs.s3.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.endpoint": MINIO_ENDPOINT,
    "spark.hadoop.fs.s3a.access.key": MINIO_ACCESS_KEY,
    "spark.hadoop.fs.s3a.secret.key": MINIO_SECRET_KEY,
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.committer.name": "magic",
    "spark.hadoop.fs.s3a.committer.magic.enabled": "true",
    # NOTE(review): per-bucket keys are fs.s3a.bucket.<bucket-name>.*, so this targets a bucket
    # literally named "all"; the global key above already enables the magic committer.
    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    # dedicated upload-buffer directory, created (world-writable) in the previous cell
    "spark.hadoop.fs.s3a.buffer.dir": "/tmp/s3a-buffer",
    "spark.hadoop.fs.s3a.fast.upload": "true",
    "spark.hadoop.fs.s3a.fast.upload.buffer": "disk",
    "spark.local.dir": "/tmp/spark-local",
    "spark.worker.dir": "/tmp/spark-worker",
    "spark.sql.warehouse.dir": "/tmp/spark-warehouse",
}

In [3]:
spark_config = SparkConf().setMaster('local[*]').setAppName("Iceberg-REST")
for k, v in config.items():
    spark_config = spark_config.set(k, v)

spark: SparkSession = SparkSession.builder.config(conf=spark_config).getOrCreate()

In [4]:
# Make the Lakekeeper REST catalog the current catalog (it is also configured as spark.sql.defaultCatalog).
spark.sql("USE lakekeeper")

DataFrame[]

In [5]:
# List the namespaces visible in the current catalog.
spark.catalog.listDatabases()

[Database(name='icystreams', catalog='lakekeeper', description=None, locationUri='s3://examples/initial-warehouse/01998cb2-ac2e-7cb0-b341-ae5794445827')]

In [6]:
# Spark's default of 200 shuffle partitions is far more than one local machine needs; clamp it so
# small aggregations don't fan out into hundreds of tiny tasks.
# NOTE: a stateful streaming query records this partition count in its checkpoint on the first run,
# so change it before starting a fresh checkpoint, not afterwards.
shuffle_partitions = int(spark.conf.get("spark.sql.shuffle.partitions"))
if shuffle_partitions > 100:
    print("reducing shuffle partitions to 32")
    spark.conf.set("spark.sql.shuffle.partitions", "32")

reducing shuffle partitions to 32


In [7]:
# With master 'local[*]' (see the SparkConf cell) this is the number of cores available to the driver.
spark.sparkContext.defaultParallelism

16

In [8]:
# if this cell fails, you need to go through the exercises in "first_steps-iceberg.ipynb"
catalog_namespace = 'icystreams'
# use the variable so the namespace name is defined in exactly one place
spark.catalog.setCurrentDatabase(catalog_namespace)
iceberg_table_name = 'ecomm'
iceberg_table_name_hp = "ecomm_hidden"  # presumably the hidden-partitioned copy ("_hp") — verify

In [10]:
# uncomment and run to see what you have to work with. 
# use either of the two tables above for your streaming learning
# spark.sql(f"select count(*) as total from {catalog_namespace}.{iceberg_table_name}").show()

In [9]:
# Find the earliest snapshot
# if you blow away snapshots, then you can't really time travel anywhere :)
from datetime import datetime
from pyspark.sql.functions import asc, desc

def find_streaming_timestamp(namespace: str, table_name: str, from_earliest: bool = True) -> str:
    """
    Find the commit timestamp of the earliest (or latest) snapshot of an Iceberg table, formatted
    for the 'stream-from-timestamp' streaming read option (unix epoch milliseconds as a string).

    The milliseconds are computed by Spark (`unix_millis`) in a single aggregate query instead of
    sorting the snapshots, counting them, and round-tripping through a Python ``datetime``. That
    avoids a second Spark job, float truncation in ``timestamp() * 1000`` (which can land one
    millisecond early), and any dependence on the Python process's local timezone.

    NOTE(review): without 'stream-from-timestamp', Iceberg streaming reads appear to start from the
    table's oldest snapshot — confirm against the Iceberg version in use. Use `from_earliest=False`
    to start from the most recent snapshot instead.

    :param namespace: The namespace of the table
    :param table_name: The name of the table
    :param from_earliest: True -> earliest snapshot's commit time, False -> latest snapshot's
    :return: The commit timestamp in epoch milliseconds, like '1759003232707'
    :raises ValueError: if the table has no snapshots
    """
    from pyspark.sql import functions as F

    commit_millis = F.expr("unix_millis(committed_at)")
    pick = F.min(commit_millis) if from_earliest else F.max(commit_millis)
    # A global aggregate yields exactly one row; its value is NULL when there are no snapshots.
    row = (
        spark.read
        .table(f"{namespace}.{table_name}.snapshots")
        .agg(pick.alias("commit_ms"))
        .head()
    )
    commit_ms = row["commit_ms"] if row is not None else None
    if commit_ms is None:
        raise ValueError(f"No snapshots found for table {namespace}.{table_name}")
    return str(commit_ms)


In [10]:
# Sanity check: the epoch-millisecond value that will be passed as 'stream-from-timestamp'.
print(find_streaming_timestamp(catalog_namespace, iceberg_table_name_hp))

1759003232707


In [11]:
# Build a rate-limited streaming source.
# 'streaming-max-files-per-micro-batch' and 'streaming-max-rows-per-micro-batch' cap how much each
# micro-batch may pull, so a large backlog (~109 million rows here, over 100k rows per "day") is
# consumed in small chunks rather than in one giant batch that invites an OutOfMemoryError.
# We start with a single file and at most 10k rows per trigger; raise the limits to go faster.
# Beware: many tiny batches also mean many tiny commits (lots of metadata) downstream.

# NOTE: this turns adaptive query execution off for the whole session, not just this stream.
spark.conf.set("spark.sql.adaptive.enabled", "false")

# str(long): unix epoch milliseconds of the earliest snapshot of the source table
start_timestamp_ms = find_streaming_timestamp(catalog_namespace, iceberg_table_name_hp)

ecomm_streaming_source_df = (
    spark.readStream
    .format("iceberg")
    .options(**{
        "streaming-max-files-per-micro-batch": "1",
        "streaming-max-rows-per-micro-batch": "10000",
        "streaming-skip-overwrite-snapshots": "true",
        "streaming-skip-delete-snapshots": "true",
        # "split-size": str(2 * 1024 * 1024),  # control how files are split while reading
        "stream-from-timestamp": start_timestamp_ms,
    })
    .load(f"{catalog_namespace}.{iceberg_table_name_hp}")
)

In [12]:
# Schema of the streaming source, i.e. the columns of the source Iceberg table.
ecomm_streaming_source_df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_date: date (nullable = true)


In [13]:
# light transforms, cause why not!
from pyspark.sql.functions import col, window, count

# Count events per (user, product) in 30-minute tumbling event-time windows.
# The 30-minute watermark lets Spark finalize a window (and, in append mode, emit it) once the
# largest event_time seen is more than 30 minutes past the window's end, then drop its state.
# NOTE(review): despite the output name 'session_events', user_session is not part of the grouping.
transformed_df = (
    ecomm_streaming_source_df
    # one projection with just the columns the aggregation needs (the previous version selected
    # twice and carried category_id/price/user_session along for nothing)
    .select("event_time", "event_type", "product_id", "user_id")
    .withWatermark("event_time", "30 minutes")
    .groupBy(window("event_time", "30 minutes"), "user_id", "product_id")
    .agg(count("event_type").alias('session_events'))
)

transformed_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- session_events: long (nullable = false)


In [14]:
from typing import Optional


def table_exists(table_name: str, namespace: Optional[str] = None) -> bool:
    """
    Check whether a table with exactly this name is listed in a namespace.

    :param table_name: unqualified table name, e.g. 'ecomm_hidden'
    :param namespace: namespace to search; None (the default) searches the session's current
        database, which is the only behavior earlier versions of this helper supported
    :return: True if a table named `table_name` is listed, else False
    """
    # listTables(None) is the same as listTables(): the current database.
    tables = spark.catalog.listTables(namespace)
    return any(table.name == table_name for table in tables)

In [15]:
# Tables in the current database (set with setCurrentDatabase earlier in the notebook).
spark.catalog.listTables()

[Table(name='ecomm_hidden', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ecomm', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ecomm_hidden_other', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ecomm_hidden_aggregated', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ecomm_hidden_aggregated_new', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ecomm_hidden_aggregated_test', catalog='lakekeeper', namespace=['icystreams'], description=None, tableType='MANAGED', isTemporary=False)]

In [39]:
# Name of the Iceberg table the streaming aggregation writes to; show whether it already exists.
sink_table_name = f"{iceberg_table_name_hp}_aggregated_other_again"
table_exists(sink_table_name)

False

In [41]:
# Create the sink table from the aggregation's output schema.
# - No hard-coded LOCATION: the previous path was a UUID-named directory under the namespace
#   location, which looks like another table's storage. Let the catalog assign a fresh location.
# - IF NOT EXISTS instead of OR REPLACE: re-running this cell must not swap out a table that an
#   existing streaming checkpoint has already been writing into.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog_namespace}.{sink_table_name} (
        {transformed_df.schema.toDDL()}
    ) USING iceberg
""")

DataFrame[]

In [17]:
def get_table_location_from_describe(catalog_namespace, table_name):
    """
    Return a table's storage location as reported by DESCRIBE EXTENDED.

    The "Location" entry is only looked for after the "# Detailed Table Information" marker row.

    :param catalog_namespace: namespace containing the table
    :param table_name: unqualified table name
    :return: the location string, or None when the section or the entry is absent
    """
    described_rows = spark.sql(f"DESCRIBE EXTENDED {catalog_namespace}.{table_name}").collect()
    remaining = iter(described_rows)

    # Advance past everything up to and including the detailed-section marker.
    for row in remaining:
        if row['col_name'] == "# Detailed Table Information":
            break

    # Whatever is left in the iterator is the detailed section (empty if no marker was found).
    return next(
        (row['data_type'] for row in remaining if row['col_name'] == "Location"),
        None,
    )

In [18]:
#spark.sql(f"""
#DESCRIBE TABLE EXTENDED {catalog_namespace}.{sink_table_name};
#""").show(100, False)

In [19]:
# this is a protected table property, so we can ignore it
#spark.sql(f"""
#ALTER TABLE {catalog_namespace}.{sink_table_name} 
#SET TBLPROPERTIES ('owner' = 'jovyan'); 
#""")

In [20]:
# Paths for the structured streaming application's state.
from pathlib import Path
# Local-disk alternative for checkpoints (see the commented-out line at the end of this cell).
checkpoint_dir = Path('/home/jovyan', 'checkpoints')

app_name = 'streaming_ecomm_aggs_new'
app_version = 'v1'

# One checkpoint directory per (app_name, app_version): bump app_version to start a query from
# scratch. A checkpoint records the query's source offsets, so reusing it for a different sink
# resumes from those offsets instead of re-reading the source.
# Shared storage also makes it easy to overwrite other people's checkpoints — keep them namespaced.
app_checkpoint_dir = "/".join(["s3a://examples/applications", app_name, app_version, "_checkpoints"])
#app_checkpoint_dir = checkpoint_dir.joinpath(app_name, app_version, "_checkpoints")


In [21]:
# Checkpoint location the streaming query is started with (its checkpointLocation option).
app_checkpoint_dir

's3a://examples/applications/streaming_ecomm_aggs_new/v1/_checkpoints'

In [22]:
# NOTE(review): the saved output below ('ecomm_hidden_aggregated_test') does not match the name
# assigned in the sink_table_name cell ('..._aggregated_other_again'); it comes from an earlier,
# out-of-order run. Restart the kernel and run top-to-bottom to refresh it.
sink_table_name

'ecomm_hidden_aggregated_test'

In [25]:
#spark.sql(f"drop table {catalog_namespace}.{sink_table_name}")

In [45]:
# Start the windowed aggregation as a streaming write into the Iceberg sink table.
# NOTE: spark.conf.set is session-wide — merge-schema stays enabled for every later Iceberg write in
# this session, not only this query (compare the "trust the upstream producer" note in the config).
spark.conf.set("spark.sql.iceberg.merge-schema", "true")

sink_writer = (
    transformed_df.writeStream
    .queryName("ecomm_windowed_aggs")
    .format("iceberg")
    .outputMode("append")
    .options(checkpointLocation=app_checkpoint_dir, **{"fanout-enabled": "true"})
    # 🚀 availableNow: goes as fast as possible while respecting the source's per-micro-batch limits
    .trigger(availableNow=True)
)
# toTable() starts the query and returns right away; check streaming_query.status / .recentProgress.
streaming_query: StreamingQuery = sink_writer.toTable(f"lakekeeper.{catalog_namespace}.{sink_table_name}")

In [27]:
# Storage location of the sink table, as reported by DESCRIBE EXTENDED.
get_table_location_from_describe(catalog_namespace, sink_table_name)

's3://examples/initial-warehouse/01998cb2-ac2e-7cb0-b341-ae5794445827/01998d8d-50fa-7160-a3ba-be5698bb7e2b'

In [23]:
def convert_s3_to_s3a(location):
    """
    Rewrite an ``s3://`` URL to the ``s3a://`` scheme used by Hadoop's S3A filesystem.

    Only a leading ``s3://`` is rewritten; anything else (other schemes, None, "") is returned as is.
    """
    scheme = "s3://"
    if not location or not location.startswith(scheme):
        return location
    return "s3a://" + location[len(scheme):]


In [25]:
# Local dataset path corresponding to the sink table name (only printed; nothing is read or written).
print(f'/home/jovyan/datasets/iceberg/{sink_table_name}')

/home/jovyan/datasets/iceberg/ecomm_hidden_aggregated_test


In [44]:
#streaming_query: StreamingQuery = (
#    transformed_df
#    .writeStream
#    .format("iceberg")
#    .queryName("ecomm_windowed_aggs")
#    .option("checkpointLocation", app_checkpoint_dir)
#    .option("fanout-enabled", "true")
#    .option("path", f's3://examples/initial-warehouse/01998cb2-ac2e-7cb0-b341-ae5794445827/01998cb4-24f4-75d2-b67a-f9a39a1e30cc/')
#    .outputMode("complete")
#    .trigger(availableNow=True)
#    .start()
#)

Py4JJavaError: An error occurred while calling o371.start.
: org.apache.iceberg.exceptions.NoSuchTableException: Cannot find table for org.apache.iceberg.spark.PathIdentifier@55f29920.
	at org.apache.iceberg.spark.source.IcebergSource.getTable(IcebergSource.java:121)
	at org.apache.iceberg.spark.source.IcebergSource.inferPartitioning(IcebergSource.java:99)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:88)
	at org.apache.spark.sql.classic.DataStreamWriter.startInternal(DataStreamWriter.scala:280)
	at org.apache.spark.sql.classic.DataStreamWriter.start(DataStreamWriter.scala:136)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `s3://examples/initial-warehouse/01998cb2-ac2e-7cb0-b341-ae5794445827/01998cb4-24f4-75d2-b67a-f9a39a1e30cc`.`` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:173)
	at org.apache.iceberg.spark.source.IcebergSource.getTable(IcebergSource.java:116)
	... 16 more


In [46]:
# Progress reports of the most recent micro-batches (empty if no batch has completed).
streaming_query.recentProgress

[]

In [47]:
# Current query state; after a failure, 'message' carries the exception that terminated the query.
streaming_query.status

{'message': 'Terminated with exception: Forbidden: Table not found or action can_get_metadata forbidden for Anonymous',
 'isDataAvailable': False,
 'isTriggerActive': False}