In [None]:
#
# PREPARATIONS
#

In [None]:
# create the 'warehouse' S3 bucket

import boto3


# Resource handle for the MinIO endpoint (plain HTTP, static access keys).
s3_resource = boto3.resource('s3', 
    endpoint_url='http://minio:9000',
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin',
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False,
)

# Only the "bucket is already there" outcome is expected and safe to ignore.
# Any other failure (endpoint unreachable, bad credentials, ...) is re-raised
# so that it surfaces here instead of in a later cell.
s3_errors = s3_resource.meta.client.exceptions
try:
    s3_resource.Bucket("warehouse").create()
except (s3_errors.BucketAlreadyOwnedByYou, s3_errors.BucketAlreadyExists) as e:
    print(e)

In [None]:
# bootstrap the catalog

import requests


# POST to the Lakekeeper management bootstrap endpoint, accepting the terms of use.
bootstrap_response = requests.post(
    "http://lakekeeper:8181/management/v1/bootstrap",
    json={"accept-terms-of-use": True},
)
# Show the decoded JSON body as the cell output.
bootstrap_response.json()

In [None]:
# initialise the 'iceberg' warehouse in Lakekeeper

import requests


# Where the warehouse keeps its data: the 'warehouse' bucket on MinIO.
storage_profile = {
    "type": "s3",
    "bucket": "warehouse",
    "key-prefix": "iceberg",
    "assume-role-arn": None,
    "endpoint": "http://minio:9000",
    "region": "eu-central-1",
    "path-style-access": True,
    "flavor": "minio",
    "sts-enabled": True,
}

# Static access key sent along with the storage profile.
storage_credential = {
    "type": "s3",
    "credential-type": "access-key",
    "aws-access-key-id": "minioadmin",
    "aws-secret-access-key": "minioadmin",
}

warehouse_request = {
    "warehouse-name": "iceberg",
    "project-id": "00000000-0000-0000-0000-000000000000",
    "storage-profile": storage_profile,
    "storage-credential": storage_credential,
}

warehouse_response = requests.post(
    "http://lakekeeper:8181/management/v1/warehouse",
    json=warehouse_request,
)
warehouse_response.json()

In [None]:
# check the config of the created warehouse

import requests


# GET the catalog config for the 'iceberg' warehouse and display the JSON body.
config_response = requests.get(
    "http://lakekeeper:8181/catalog/v1/config?warehouse=iceberg"
)
config_response.json()

In [None]:
# check that Spark client works

from pyspark.sql import SparkSession


# Session settings, applied to the builder in the order listed.
# NOTE(review): the Nessie extension and the `ref` option look like leftovers
# from a Nessie-based setup; the catalog configured here is of type `rest` -- confirm.
spark_settings = {
    "spark.sql.extensions":
        "org.projectnessie.spark.extensions.NessieSparkSessionExtensions, org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.iceberg.type": "rest",
    "spark.sql.catalog.iceberg.uri": "http://lakekeeper:8181/catalog/",
    "spark.sql.catalog.iceberg.warehouse": "iceberg",
    "spark.sql.catalog.iceberg.ref": "main",
    "spark.sql.catalog.iceberg.cache-enabled": False,
}

session_builder = SparkSession.builder
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# Smoke test: list the namespaces of the 'iceberg' catalog.
namespaces = spark.sql("""
    SHOW NAMESPACES FROM iceberg
""")
namespaces.toPandas()

In [None]:
# check existence of pyiceberg config

# IPython shell escape: print `.pyiceberg.yaml` from the current working directory.
!cat .pyiceberg.yaml

In [None]:
# check if pyiceberg works

from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError


catalog = load_catalog("rest")

# Create the 'workshop' namespace. Tolerate the "already exists" error so the
# cell can be re-run against a catalog that already has it.
try:
    catalog.create_namespace("workshop")
except NamespaceAlreadyExistsError:
    pass

catalog.list_namespaces()

In [None]:
#
# TABLE DEFINITION
#

In [None]:
# cleanup if required

# Drop the workshop table when present, so the cells below start from scratch.
drop_table_sql = """
DROP TABLE IF EXISTS iceberg.workshop.atable
"""
spark.sql(drop_table_sql)

In [None]:
# Create the workshop table (format version 2), partitioned by the year of `timestamp`.
create_table_sql = """
CREATE TABLE IF NOT EXISTS iceberg.workshop.atable (
    id string COMMENT 'random id',
    name string COMMENT 'a name',
    value int COMMENT 'integer with some numerical value',
    timestamp timestamp COMMENT 'a timestamp column'
)
PARTITIONED BY (years(timestamp))
TBLPROPERTIES (
    'format-version'='2'
)
"""
spark.sql(create_table_sql)

In [None]:
# Same CREATE TABLE statement as in the preceding cell. With IF NOT EXISTS it
# leaves an already existing table untouched.
create_table_again_sql = """
CREATE TABLE IF NOT EXISTS iceberg.workshop.atable (
    id string COMMENT 'random id',
    name string COMMENT 'a name',
    value int COMMENT 'integer with some numerical value',
    timestamp timestamp COMMENT 'a timestamp column'
)
PARTITIONED BY (years(timestamp))
TBLPROPERTIES (
    'format-version'='2'
)
"""
spark.sql(create_table_again_sql)

In [None]:
# describe the created table

table_description = spark.sql("""
DESCRIBE EXTENDED iceberg.workshop.atable
""")
# Convert to pandas so the notebook renders it as a table.
table_description.toPandas()

In [None]:
# insert some data

import random
import uuid

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import to_timestamp


def _sample_row(iso_timestamp):
    """Build one Row with a random id, name and value for the given ISO timestamp."""
    return Row(
        id=str(uuid.uuid4()),
        name=random.choice(["Ariel", "Georgina", "Tom", "Ulma"]),
        value=random.randint(100, 999),
        timestamp=datetime.fromisoformat(iso_timestamp),
    )


# Two rows whose timestamps fall in different years (2024 and 2025).
df = spark.createDataFrame(
    [_sample_row(iso) for iso in ("2024-12-01T07:32:18", "2025-01-02T09:11:23")]
)

df.writeTo("iceberg.workshop.atable").append()

In [None]:
# Read the table back, ordered by name.
rows_by_name = spark.sql("""
SELECT *
FROM iceberg.workshop.atable
ORDER BY name
""")
rows_by_name.toPandas()

In [None]:
# Print the `history` metadata table: up to 1000 rows, without truncating values.
history = spark.sql("""
SELECT *
FROM iceberg.workshop.atable.history
""")
history.show(1000, False)

In [None]:
# Data files behind the table: content, path, record count and partition of each file.
data_files = spark.sql("""
--SELECT *
SELECT content, file_path, record_count, partition
FROM iceberg.workshop.atable.data_files
""")
data_files.toPandas()

In [None]:
#
# PARTITIONING
#

In [None]:
# change partition layout

# Step 1: remove the yearly partition field.
drop_years_partition_sql = """
ALTER TABLE iceberg.workshop.atable
DROP PARTITION FIELD years(timestamp)
"""
spark.sql(drop_years_partition_sql)

In [None]:
# Step 2: partition by a 2-way bucket of `id`.
add_bucket_partition_sql = """
ALTER TABLE iceberg.workshop.atable
ADD PARTITION FIELD bucket(2, id)
"""
spark.sql(add_bucket_partition_sql)

In [None]:
# Step 3: additionally partition by `name` truncated to width 5.
add_truncate_partition_sql = """
ALTER TABLE iceberg.workshop.atable
ADD PARTITION FIELD truncate(5, name)
"""
spark.sql(add_truncate_partition_sql)

In [None]:
# Describe the table again after the partition changes.
repartitioned_description = spark.sql("""
DESCRIBE EXTENDED iceberg.workshop.atable
""")
repartitioned_description.toPandas()

In [None]:
# anything changed?

# Same data-file listing as before the partition changes, for comparison.
files_after_repartition = spark.sql("""
--SELECT *
SELECT content, file_path, record_count, partition
FROM iceberg.workshop.atable.data_files
""")
files_after_repartition.toPandas()

In [None]:
# write more data

# Two more random rows with the same timestamps as the first insert.
# Relies on the imports made in the first insert cell.
df = spark.createDataFrame([
    Row(
        id=str(uuid.uuid4()),
        name=random.choice(["Ariel", "Georgina", "Tom", "Ulma"]),
        value=random.randint(100, 999),
        timestamp=datetime.fromisoformat(iso_timestamp),
    )
    for iso_timestamp in ("2024-12-01T07:32:18", "2025-01-02T09:11:23")
])

df.writeTo("iceberg.workshop.atable").append()

In [None]:
# what about now?

# List the data files once more, now that rows were appended after the partition changes.
files_after_second_write = spark.sql("""
--SELECT *
SELECT content, file_path, record_count, partition
FROM iceberg.workshop.atable.data_files
""")
files_after_second_write.toPandas()

In [None]:
# partitions?

# Contents of the `partitions` metadata table.
partitions = spark.sql("""
SELECT *
FROM iceberg.workshop.atable.partitions
""")
partitions.toPandas()

In [None]:
#
# SNAPSHOTS
# 

In [None]:
# list snapshots

# Snapshot ids with their commit time, oldest first; print up to 50 rows untruncated.
snapshots = spark.sql("""
-- SELECT *
SELECT snapshot_id, committed_at
FROM iceberg.workshop.atable.snapshots
ORDER BY committed_at
""")
snapshots.show(50, False)

In [None]:
# query older snapshots

# Resolve the snapshot id at run time. The table is dropped and recreated
# earlier in this notebook, so an id copied from a previous run no longer exists.
oldest_snapshot_id = spark.sql("""
SELECT snapshot_id
FROM iceberg.workshop.atable.snapshots
ORDER BY committed_at
LIMIT 1
""").first()["snapshot_id"]

# Time travel: read the table as it was at the oldest snapshot.
spark.sql(f"""
SELECT *
FROM iceberg.workshop.atable VERSION AS OF {oldest_snapshot_id}
""").toPandas()

In [None]:
# query metadata of older snapshots

# Resolve the snapshot id at run time. The table is dropped and recreated
# earlier in this notebook, so an id copied from a previous run no longer exists.
oldest_snapshot_id = spark.sql("""
SELECT snapshot_id
FROM iceberg.workshop.atable.snapshots
ORDER BY committed_at
LIMIT 1
""").first()["snapshot_id"]

# Time travel on a metadata table: the data files as of the oldest snapshot.
spark.sql(f"""
SELECT *
FROM iceberg.workshop.atable.data_files VERSION AS OF {oldest_snapshot_id}
""").toPandas()

In [None]:
# create tags

# Resolve the snapshot to tag at run time instead of hard-coding an id from a
# previous run (the table is dropped and recreated earlier in this notebook).
# The most recent snapshot is tagged, which leaves older snapshots free of refs.
latest_snapshot_id = spark.sql("""
SELECT snapshot_id
FROM iceberg.workshop.atable.snapshots
ORDER BY committed_at DESC
LIMIT 1
""").first()["snapshot_id"]

spark.sql(f"""
ALTER TABLE iceberg.workshop.atable
CREATE TAG `TAG1`
AS OF VERSION {latest_snapshot_id}
""").toPandas()

In [None]:
# list refs

# Contents of the `refs` metadata table.
table_refs_df = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.refs
""")
table_refs_df.toPandas()

In [None]:
# query a tag

# Read the table through the TAG1 tag, using the `tag_` prefix.
tagged_rows = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.tag_TAG1
""")
tagged_rows.toPandas()

In [None]:
# retire some snapshots

# Pick the snapshot to expire at run time instead of hard-coding an id from a
# previous run. Snapshots listed in the `refs` metadata table (branch heads
# and tags) are skipped; the oldest remaining snapshot is chosen.
expirable_row = spark.sql("""
SELECT s.snapshot_id
FROM iceberg.workshop.atable.snapshots s
LEFT ANTI JOIN iceberg.workshop.atable.refs r
    ON s.snapshot_id = r.snapshot_id
ORDER BY s.committed_at
LIMIT 1
""").first()

if expirable_row is None:
    # Every snapshot is still referenced, so there is nothing to expire.
    expire_result = None
    print("No unreferenced snapshot to expire")
else:
    expirable_snapshot_id = expirable_row["snapshot_id"]
    expire_result = spark.sql(f"""
    CALL iceberg.system.expire_snapshots(
        table => 'iceberg.workshop.atable', snapshot_ids => ARRAY({expirable_snapshot_id})
    )
    """).toPandas()

expire_result

In [None]:
# query snapshots

# Full `snapshots` metadata table, oldest first.
remaining_snapshots = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.snapshots
ORDER BY committed_at
""")
remaining_snapshots.toPandas()

In [None]:
#
# LIBRARIES
#

In [None]:
# perform a scan of the table with Polars

import polars as pl

# S3 settings handed to Polars for reading from MinIO.
storage_options = {
    "s3.endpoint": "http://minio:9000",
    "s3.region": "eu-central-1",
    "s3.access-key-id": "minioadmin",
    "s3.secret-access-key": "minioadmin",
}

# Load the table through the PyIceberg catalog and scan its current metadata file.
table = catalog.load_table("workshop.atable")
pl_table = pl.scan_iceberg(
    table.metadata_location,
    storage_options=storage_options,
)

In [None]:
# query it with Polars

# SQL over the `pl_table` variable defined in the scan cell; collect() runs the query.
polars_select = """
    SELECT * FROM pl_table
    """
pl.sql(polars_select).collect()

In [None]:
# Lazy Frame?

# Same query as above, but only ask for its plan instead of collecting it.
polars_select_plan = """
    SELECT * FROM pl_table
    """
pl.sql(polars_select_plan).explain()

In [None]:
# configure DuckDB to work with Iceberg and MinIO

import duckdb


# Install and load the iceberg and httpfs extensions, then point the S3
# settings at MinIO (no SSL, path-style URLs).
duckdb_setup_sql = """
INSTALL iceberg;
LOAD iceberg;
INSTALL httpfs;
LOAD httpfs;

SET s3_endpoint='minio:9000';
SET s3_access_key_id='minioadmin';
SET s3_secret_access_key='minioadmin';
SET s3_region='eu-central-1';
SET s3_use_ssl=false;
SET s3_url_style='path';
"""
duckdb.sql(duckdb_setup_sql)

In [None]:
# query with DuckDB

# Scan the metadata file of the PyIceberg `table` loaded in the Polars cell.
duckdb_scan_sql = f"SELECT * FROM iceberg_scan('{table.metadata_location}')"
duckdb.sql(duckdb_scan_sql)

In [None]:
# PyIceberg exploration

# List the attribute names available on the table's `inspect` accessor.
dir(table.inspect)

In [None]:
# Refs of the table as reported by PyIceberg, converted to pandas for display.
pyiceberg_refs = table.inspect.refs()
pyiceberg_refs.to_pandas()

In [None]:
#
# DATA FILES
#

In [None]:
# Data files of the current table state, before any rows are deleted.
files_before_delete = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.data_files
""")
files_before_delete.toPandas()

In [None]:
# Delete some data

# Remove every row whose name is 'Ariel'.
delete_result = spark.sql("""
DELETE FROM
iceberg.workshop.atable
WHERE name = 'Ariel'
""")
delete_result.toPandas()

In [None]:
# are files gone?

# Data files of the current table state, after the delete.
files_after_delete = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.data_files
""")
files_after_delete.toPandas()

In [None]:
# yes, but also no

# `all_data_files` instead of `data_files`, to compare with the listing above.
every_data_file = spark.sql("""
SELECT * FROM
iceberg.workshop.atable.all_data_files
""")
every_data_file.toPandas()

In [None]:
#
# METADATA
#

In [None]:
# PyIceberg exploration

# List the attribute names available on the table's `inspect` accessor.
dir(table.inspect)