# Managing Iceberg tables

In this part of the workshop we'll look at the different ways Iceberg enables you to optimize and maintain your tables.

You can learn more in the Iceberg [documentation](https://iceberg.apache.org/docs/latest/spark-procedures/#metadata-management).

### Starting Spark

Start Spark and connect to your Lakekeeper Catalog.


In [None]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

# This CATALOG_URL works for the "docker compose" testing and development environment
# Change 'server' if you are not running on "docker compose" (e.g. 'localhost' if Lakekeeper is running locally).
CATALOG_URL = "http://server:8181/catalog"
WAREHOUSE = "demo"
MY_NAMESPACE = "my_db"

SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])
ICEBERG_VERSION = "1.8.1"

In [None]:
# Configures the Iceberg catalog (Lakekeeper) and loads the Iceberg library
# NOTE: no credentials are passed here. The catalog automatically assigns temporary credentials per session
config = {
    "spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakekeeper.type": "rest",
    "spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    "spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    "spark.sql.catalog.lakekeeper.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "lakekeeper",
    "spark.jars.packages": f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION},org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
}

spark_config = SparkConf().setMaster('local').setAppName("Qlik-Connect-Iceberg-Workshop")
for k, v in config.items():
    spark_config = spark_config.set(k, v)

spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

# Select the "lakekeeper catalog" to use in subsequent SQL operations
spark.sql("USE lakekeeper")

### Create a table and load some data

Next, create a table and load some data into it. We'll then optimize the resulting data files by compacting them.

In [None]:
# Install the requests library to help us download sample data

!pip install -q requests

We'll download some sample data, the Citi Bike station status feed, and use it to create a test table.

In [None]:
import requests
import json

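# Source: Citi Bike System Data, https://data.cityofnewyork.us/NYC-BigApps/Citi-Bike-System-Data/vsnr-94wk
# Fetch the live station status feed and fail fast on HTTP errors
r = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json', timeout=30)
r.raise_for_status()
station_status = r.json()

# Write one JSON object per line (JSON Lines); the `with` block closes the file automatically
with open("/home/jovyan/work/station_status.json", "w") as f:
    for item in station_status['data']['stations']:
        json.dump(item, f)
        f.write('\n')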
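Spark's JSON data source reads JSON Lines by default: each line must hold one complete JSON object (multi-line documents need the `multiLine` option). The plain-Python sketch below shows that layout with a round trip. `to_json_lines` and `from_json_lines` are hypothetical helpers, and the two station records are made up for illustration.

```python
import json


def to_json_lines(records):
    """Serialize a list of dicts as JSON Lines: one JSON object per line."""
    # json.dumps without `indent` never emits raw newlines (newlines inside strings are escaped),
    # so every record stays on a single line
    return "".join(json.dumps(record) + "\n" for record in records)


def from_json_lines(text):
    """Parse JSON Lines text back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Hypothetical records shaped loosely like GBFS station_status entries
stations = [
    {"station_id": "a1", "num_bikes_available": 3, "is_installed": 1},
    {"station_id": "b2", "num_bikes_available": 0, "is_installed": 0},
]

text = to_json_lines(stations)
print(text, end="")
assert from_json_lines(text) == stations  # the round trip preserves every record
```

Because each record sits on its own line, Spark can split the file and parse lines independently.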

Drop the table if it already exists so we start with a clean slate.

Use Spark DataFrames to read the sample data, parse it, and then write it out to an Iceberg table called `stations`.

In [None]:
spark.sql(f'CREATE NAMESPACE IF NOT EXISTS {MY_NAMESPACE}').toPandas()
spark.sql(f'DROP TABLE IF EXISTS {MY_NAMESPACE}.stations PURGE')

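# Spark's JSON reader infers the schema automatically; CSV options such as "header" don't apply here
df = spark.read.format("json").load("/home/jovyan/work/station_status.json")

# Deliberately split the data into 100 partitions so the write produces many small files
df.repartition(100).write.saveAsTable(f'{MY_NAMESPACE}.stations')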

In [None]:
# Query the stations table we just created to ensure data was written

spark.sql(f'SELECT * FROM {MY_NAMESPACE}.stations limit 10').toPandas()

Check how many files were created. In this example we deliberately forced Spark to split the data into 100 files, but in real workloads small files accumulate on their own, for example from frequent small appends or streaming writes.

In [None]:
spark.sql(f'SELECT count(*) FROM lakekeeper.{MY_NAMESPACE}.stations.files').toPandas()

### Rewriting data files (a.k.a. compaction)

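Compaction combines many small files into fewer, larger files. Fewer files means less per-file overhead, such as opening each file and tracking it in metadata, when queries plan and scan the table.

We start by compacting the table with two options: `min-input-files` set to `2`, so any file group with at least two files qualifies for rewriting, and `rewrite-job-order` set to `bytes-asc`, so the smallest file groups are rewritten first.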

In [None]:
ret = spark.sql(f"CALL lakekeeper.system.rewrite_data_files( \
                table => '{MY_NAMESPACE}.stations', \
                options => map('min-input-files','2', 'rewrite-job-order','bytes-asc')) \
            ")
ret.show()
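To build intuition for the two options used above, here is a toy model in plain Python. It is not Iceberg's planner: `FileGroup`, `plan_rewrite`, and `bin_pack` are hypothetical names, the file sizes are made up, only `bytes-asc` and `bytes-desc` ordering are modelled, and a group qualifies only through the file-count rule (Iceberg also considers other criteria, such as total group size). The packing step uses first-fit-decreasing, a classic bin-packing heuristic, to show how small files collapse into fewer outputs under a target size.

```python
from dataclasses import dataclass, field


@dataclass
class FileGroup:
    """Toy stand-in for a group of data files that could be rewritten together."""
    name: str
    file_sizes: list = field(default_factory=list)  # sizes in bytes (made up)

    @property
    def total_bytes(self) -> int:
        return sum(self.file_sizes)


def plan_rewrite(groups, min_input_files=2, job_order="bytes-asc"):
    """Keep groups with at least `min_input_files` files, then order them by total size."""
    if job_order not in ("bytes-asc", "bytes-desc"):
        raise ValueError(f"job order not modelled in this sketch: {job_order}")
    eligible = [g for g in groups if len(g.file_sizes) >= min_input_files]
    eligible.sort(key=lambda g: g.total_bytes, reverse=(job_order == "bytes-desc"))
    return eligible


def bin_pack(file_sizes, target_bytes):
    """First-fit-decreasing: put each file, largest first, into the first output with room left."""
    outputs = []
    for size in sorted(file_sizes, reverse=True):
        for out in outputs:
            if sum(out) + size <= target_bytes:
                out.append(size)
                break
        else:
            outputs.append([size])  # no existing output has room, so start a new one
    return outputs


groups = [
    FileGroup("a", [10, 20, 30]),  # 3 files, 60 bytes
    FileGroup("b", [500]),         # a single file: skipped when min_input_files=2
    FileGroup("c", [5, 5]),        # 2 files, 10 bytes
]

for g in plan_rewrite(groups, min_input_files=2, job_order="bytes-asc"):
    print(g.name, "->", bin_pack(g.file_sizes, target_bytes=64))
```

With `bytes-asc`, group `c` (10 bytes) is handled before group `a` (60 bytes), and group `b` is skipped because it holds a single file.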

Inspect the `files` metadata table again; you should now see a single data file.

In [None]:
spark.sql(f'SELECT count(*) FROM lakekeeper.{MY_NAMESPACE}.stations.files').toPandas()

***Before starting this step, drop and recreate the table as above so there are small files to compact in the following scenarios.***

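In the following compaction scenario we sort the data during compaction. `rewrite_data_files` supports two strategies, bin-packing (the default) and sorting, and sorting can use either a standard sort order or z-order:
- Bin-packing combines small files into fewer, larger files without changing the order of the rows.
- Sorting organizes rows by a sort key so that similar data is colocated in the same files, which lets queries that filter on that key skip more files.
- Z-order interleaves the values of several columns so that rows close together in all of them land in the same files. It helps queries that filter on any of those columns, but clusters each individual column less tightly than a plain sort on that column would.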
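A small experiment makes the sort versus z-order trade-off concrete. The plain-Python sketch below orders a 4x4 grid of hypothetical `(x, y)` key pairs lexicographically and by a Morton (z-order) code, splits each ordering into "files" of four rows, and counts how many files a filter on one column has to touch. Real z-order implementations first map column values of any type to sortable bit patterns; this toy uses small non-negative integers, and `interleave_bits` and `files_touched` are illustrative helpers.

```python
def interleave_bits(x: int, y: int, bits: int = 8) -> int:
    """Morton (z-order) code: bit i of x goes to position 2*i, bit i of y to position 2*i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def files_touched(order, predicate, rows_per_file=4):
    """Split an ordering into fixed-size 'files' and count the files holding at least one match."""
    files = [order[i:i + rows_per_file] for i in range(0, len(order), rows_per_file)]
    return sum(any(predicate(row) for row in f) for f in files)


# Hypothetical 4x4 grid of (x, y) key pairs
points = [(x, y) for x in range(4) for y in range(4)]

# Linear sort: rows are grouped by x first; y only breaks ties
linear = sorted(points)
# Z-order sort: rows that are close in both x and y tend to share a file
zorder = sorted(points, key=lambda p: interleave_bits(*p))

for name, order in [("linear", linear), ("z-order", zorder)]:
    print(name,
          "x == 0 ->", files_touched(order, lambda p: p[0] == 0), "files;",
          "y == 0 ->", files_touched(order, lambda p: p[1] == 0), "files")
```

The linear order serves a filter on `x` perfectly (one file) but scatters `y == 0` across all four files, while z-order touches two files for either column.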

In [None]:
ret = spark.sql(f"CALL lakekeeper.system.rewrite_data_files( \
                table => '{MY_NAMESPACE}.stations', \
                strategy => 'sort', \
                sort_order => 'station_id DESC NULLS LAST, legacy_id DESC NULLS LAST') \
            ")
ret.show()

Another useful optimization is to compact only the files that match a specific filter. This helps when the data is heavily skewed: files holding rarely occurring values are seldom compacted because they never reach the file-count or byte-size thresholds.

In [None]:
ret = spark.sql(f"CALL lakekeeper.system.rewrite_data_files(\
                table => '{MY_NAMESPACE}.stations', \
                where => 'is_installed = 1') \
            ")
ret.show()
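A filter like `is_installed = 1` is useful because Iceberg keeps per-file column statistics, such as lower and upper bounds, in its manifests, so files that cannot contain matching rows can be skipped. The plain-Python sketch below models that pruning idea for an equality filter. `DataFileStats`, `may_contain_equal`, and the file names are hypothetical, and the real planner evaluates filters against manifest metrics and partition data in ways this toy omits.

```python
from dataclasses import dataclass


@dataclass
class DataFileStats:
    """Hypothetical per-file bounds for one column, similar in spirit to Iceberg's lower/upper bounds."""
    path: str
    min_value: int
    max_value: int


def may_contain_equal(stats: DataFileStats, value: int) -> bool:
    """An equality filter can only match inside files whose [min, max] range covers the value."""
    return stats.min_value <= value <= stats.max_value


files = [
    DataFileStats("f1.parquet", 0, 0),  # only rows with is_installed = 0
    DataFileStats("f2.parquet", 1, 1),  # only rows with is_installed = 1
    DataFileStats("f3.parquet", 0, 1),  # a mix of both
]

# Files that survive pruning for `is_installed = 1`; only these would be rewrite candidates
candidates = [f.path for f in files if may_contain_equal(f, 1)]
print(candidates)
```

Note that a candidate file is rewritten as a whole, so `f3.parquet` would be included even though some of its rows do not match.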

### Expiring snapshots

As you may have noticed, Iceberg creates a new snapshot every time the table changes. Each snapshot has a manifest list pointing to manifest files that track its data files, partition values, and column statistics, and the retained snapshots make up the table history that lets you time travel in queries. However, all of this, including data files that only old snapshots still reference, takes up storage and costs money.

It's a good practice to expire old snapshots after some period of time or number of snapshots created.
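Iceberg's `expire_snapshots` procedure can also select snapshots by policy instead of by ID, through its `older_than` and `retain_last` arguments. The function below is a simplified Python model of that kind of age-plus-count policy for a linear history. `snapshots_to_expire` is a hypothetical helper, the timestamps are made up, and it ignores branches, tags, and the ancestry rules the real procedure applies.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, older_than, retain_last=1):
    """Return IDs that a simple age-plus-count retention policy would expire, oldest first.

    snapshots: list of (snapshot_id, committed_at) tuples for a linear history.
    A snapshot is expired only if it is older than `older_than` AND is not one
    of the `retain_last` most recent snapshots.
    """
    newest_first = sorted(snapshots, key=lambda s: s[1], reverse=True)
    protected = {sid for sid, _ in newest_first[:retain_last]}
    return [sid for sid, ts in sorted(snapshots, key=lambda s: s[1])
            if ts < older_than and sid not in protected]


now = datetime(2025, 1, 10)
history = [
    (101, now - timedelta(days=9)),
    (102, now - timedelta(days=7)),
    (103, now - timedelta(days=6)),
    (104, now - timedelta(days=1)),
]

# Keep everything from the last 5 days, plus at least the 2 newest snapshots
print(snapshots_to_expire(history, older_than=now - timedelta(days=5), retain_last=2))
```

Combining an age cutoff with a minimum count keeps a table that is rarely written from losing its entire history.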

First, inspect the `snapshots` metadata table to decide which snapshot to expire.

In [None]:
spark.sql(f'SELECT * FROM lakekeeper.{MY_NAMESPACE}.stations.snapshots').toPandas()

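Update the command below, replacing `CHANGE_ME__SNAPSHOT_ID` with the oldest snapshot ID returned by the previous query. Don't pick the current snapshot: Iceberg won't expire a snapshot that a branch such as `main` still references.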

In [None]:
ret = spark.sql(f"CALL lakekeeper.system.expire_snapshots( \
                table => '{MY_NAMESPACE}.stations', \
                snapshot_ids => ARRAY(CHANGE_ME__SNAPSHOT_ID)) \
            ")
ret.show()
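Instead of pasting the ID by hand, you could pick it programmatically. `build_expire_call` below is a hypothetical helper that takes `(snapshot_id, committed_at)` pairs and builds the same `CALL` statement, using the `lakekeeper` catalog name from the cell above, for the oldest snapshot. It assumes the newest snapshot is the current one, which stops being true after a rollback, so check the result before running it.

```python
def build_expire_call(namespace, table, snapshots):
    """Build an expire_snapshots CALL for the oldest snapshot, never the newest one.

    snapshots: (snapshot_id, committed_at) pairs, e.g. from the `snapshots` metadata table.
    Assumption: the newest snapshot is the table's current snapshot, so at least two are required.
    """
    if len(snapshots) < 2:
        raise ValueError("need at least two snapshots: the current one can't be expired")
    oldest_id = min(snapshots, key=lambda s: s[1])[0]
    return (
        "CALL lakekeeper.system.expire_snapshots("
        f"table => '{namespace}.{table}', "
        f"snapshot_ids => ARRAY({int(oldest_id)}))"
    )


# Made-up snapshot history; ISO-formatted timestamps sort chronologically as strings
example = [
    (7001, "2025-01-02 10:00:00"),
    (6990, "2025-01-01 09:00:00"),
    (7010, "2025-01-03 08:00:00"),
]
print(build_expire_call("my_db", "stations", example))
```

In the notebook you could feed it `[(r.snapshot_id, r.committed_at) for r in spark.sql(f'SELECT snapshot_id, committed_at FROM lakekeeper.{MY_NAMESPACE}.stations.snapshots').collect()]` and pass the returned string to `spark.sql`.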

Inspect the `snapshots` metadata table again; the expired snapshot should no longer be listed.