In [1]:
"""
Compacting Iceberg table with Delta (delta-rs)
* Generate some parquet files
* Create an Iceberg table with parquet files
* Create a Delta table with parquet files
* Compact parquet files with Delta library
* Replace data files in Iceberg
"""

'\nCompacting Iceberg table with Delta (delta-rs)\n* Generate some parquet files\n* Create an Iceberg table with parquet files\n* Create a Delta table with parquet files\n* Compact parquet files with Delta library\n* Replace data files in Iceberg\n'

In [2]:
# install libraries
!pip install deltalake -q
!pip install 'pyiceberg[sql-sqlite]==0.9.0rc2' -q
!pip install duckdb -q
!pip install pyarrow -q


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgr

In [3]:
# configure jupyter
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import duckdb
import os
from deltalake import convert_to_deltalake, DeltaTable
import json
from pprint import pprint

# Lift pandas' display limits so DataFrames are rendered in full:
# every column, every row, and untruncated cell contents.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_colwidth", None)

In [4]:
# use local filesystem as warehouse, can also use object store
warehouse = "./warehouse"
!rm -rf warehouse
!mkdir warehouse
!tree warehouse

[1;36mwarehouse[0m

0 directories, 0 files


In [5]:
%%time
# Create some parquet files
# 100_000 rows per row group, 1 row group per file
# 1_000_000 records == 10 files
duckdb_row_num = 1_000_000
con = duckdb.connect()
duckdb_data_files_df = con.execute(f"""
    COPY (FROM generate_series(1, {duckdb_row_num})) TO 'warehouse'
    (
        FORMAT 'parquet',
        COMPRESSION 'zstd',
        ROW_GROUP_SIZE 100_000,
        ROW_GROUPS_PER_FILE 1,
        OVERWRITE true,
        PER_THREAD_OUTPUT true,
        RETURN_FILES true
    )""").df()

CPU times: user 69.7 ms, sys: 8.47 ms, total: 78.2 ms
Wall time: 80.1 ms


In [6]:
# Show the paths DuckDB reported for the COPY statement.
print("Dump as parquet files:")
# The written paths are stored in the `Files` column of the first result row.
duckdb_data_files = duckdb_data_files_df['Files'].iloc[0].tolist()
for written_file in duckdb_data_files:
    print(written_file)

Dump as parquet files:
warehouse/data_0.parquet
warehouse/data_1.parquet
warehouse/data_2.parquet
warehouse/data_3.parquet
warehouse/data_4.parquet
warehouse/data_5.parquet
warehouse/data_6.parquet
warehouse/data_7.parquet
warehouse/data_8.parquet
warehouse/data_9.parquet


In [7]:
# List the parquet files that were just generated in the warehouse directory
!tree warehouse/

[1;36mwarehouse/[0m
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
└── data_9.parquet

1 directory, 10 files


In [8]:
# Create an Iceberg table on top of the parquet files generated above.
from pyiceberg.catalog import load_catalog

# The catalog is backed by an in-memory SQLite database and uses the local warehouse directory.
catalog = load_catalog("default", uri="sqlite:///:memory:", warehouse=f"file://{warehouse}")

namespace = "foo"
table = "bar"
table_identifier = (namespace, table)
catalog.create_namespace_if_not_exists(namespace)

# The table schema is read from the first generated parquet file.
schema = pq.read_schema(duckdb_data_files[0])

# Create the table and register the existing files in a single transaction.
with catalog.create_table_transaction(identifier=table_identifier, schema=schema) as txn:
    print(f"Adding data files {duckdb_data_files}")
    # add_files is handed absolute paths
    txn.add_files([os.path.abspath(parquet_path) for parquet_path in duckdb_data_files])

Adding data files ['warehouse/data_0.parquet', 'warehouse/data_1.parquet', 'warehouse/data_2.parquet', 'warehouse/data_3.parquet', 'warehouse/data_4.parquet', 'warehouse/data_5.parquet', 'warehouse/data_6.parquet', 'warehouse/data_7.parquet', 'warehouse/data_8.parquet', 'warehouse/data_9.parquet']


In [9]:
# Load the table back from the catalog and make sure it holds
# exactly as many records as DuckDB generated.
iceberg_table = catalog.load_table(table_identifier)
iceberg_row_num = iceberg_table.scan().count()
print(f"iceberg records={iceberg_row_num}")
assert duckdb_row_num == iceberg_row_num

# List the data files the Iceberg table currently tracks.
print("Iceberg data files:")
iceberg_data_files = iceberg_table.inspect.files().column('file_path').to_pylist()
for tracked_file in iceberg_data_files:
    print(tracked_file)

iceberg records=1000000
Iceberg data files:
/Users/kevinliu/repos/iceberg-python/warehouse/data_0.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_1.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_2.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_3.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_4.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_5.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_6.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_7.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_8.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_9.parquet


In [10]:
# Creating the Iceberg table added a metadata directory under foo.db/bar;
# the data_*.parquet files are still listed at the top level of the warehouse
!tree warehouse/

[1;36mwarehouse/[0m
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
├── data_9.parquet
└── [1;36mfoo.db[0m
    └── [1;36mbar[0m
        └── [1;36mmetadata[0m
            ├── 00000-5165b4bf-26e5-408c-8bd5-197f807bd99a.metadata.json
            ├── 611f72cf-6486-430c-bd92-d6691be98c19-m0.avro
            └── snap-1222512222308558805-0-611f72cf-6486-430c-bd92-d6691be98c19.avro

4 directories, 13 files


In [11]:
# Convert the existing parquet dataset in the warehouse into a Delta table.
# The call writes `_delta_log` and returns None.
convert_to_deltalake(uri=warehouse)

In [12]:
# The conversion added `_delta_log` with its first commit file next to the parquet files
!tree warehouse

[1;36mwarehouse[0m
├── [1;36m_delta_log[0m
│   └── 00000000000000000000.json
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
├── data_9.parquet
└── [1;36mfoo.db[0m
    └── [1;36mbar[0m
        └── [1;36mmetadata[0m
            ├── 00000-5165b4bf-26e5-408c-8bd5-197f807bd99a.metadata.json
            ├── 611f72cf-6486-430c-bd92-d6691be98c19-m0.avro
            └── snap-1222512222308558805-0-611f72cf-6486-430c-bd92-d6691be98c19.avro

5 directories, 14 files


In [13]:
# Pretty-print the first Delta commit (protocol and table metaData actions, including the schema)
!jq . warehouse/_delta_log/00000000000000000000.json

[1;37m{
  [0m[34;1m"protocol"[0m[1;37m: [0m[1;37m{
    [0m[34;1m"minReaderVersion"[0m[1;37m: [0m[0;37m1[0m[1;37m,
    [0m[34;1m"minWriterVersion"[0m[1;37m: [0m[0;37m2[0m[1;37m
  [1;37m}[0m[1;37m
[1;37m}[0m
[1;37m{
  [0m[34;1m"metaData"[0m[1;37m: [0m[1;37m{
    [0m[34;1m"id"[0m[1;37m: [0m[0;32m"a704b337-7354-42ac-baa0-7b3f680e3005"[0m[1;37m,
    [0m[34;1m"name"[0m[1;37m: [0m[1;30mnull[0m[1;37m,
    [0m[34;1m"description"[0m[1;37m: [0m[1;30mnull[0m[1;37m,
    [0m[34;1m"format"[0m[1;37m: [0m[1;37m{
      [0m[34;1m"provider"[0m[1;37m: [0m[0;32m"parquet"[0m[1;37m,
      [0m[34;1m"options"[0m[1;37m: [0m[1;37m{}[0m[1;37m
    [1;37m}[0m[1;37m,
    [0m[34;1m"schemaString"[0m[1;37m: [0m[0;32m"{\"type\":\"struct\",\"fields\":[{\"name\":\"generate_series\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}"[0m[1;37m,
    [0m[34;1m"partitionColumns"[0m[1;37m: [0m[1;37m[][0m[1;37m,
    [0m[34;1m

In [14]:
# Open the Delta table that was created over the warehouse directory
delta_table = DeltaTable(warehouse)

# All three views of the data (DuckDB, Iceberg, Delta) must agree on the record count
delta_row_num = delta_table.to_pyarrow_table().num_rows
print(f"delta records={delta_row_num}")
assert delta_row_num == iceberg_row_num and iceberg_row_num == duckdb_row_num

# List the files the Delta table currently references
print("Delta data files:")
for delta_file in delta_table.file_uris():
    print(delta_file)

delta records=1000000
Delta data files:
/Users/kevinliu/repos/iceberg-python/warehouse/data_4.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_5.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_7.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_6.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_3.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_2.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_9.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_0.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_1.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_8.parquet


In [15]:
# Reading the Delta table changed nothing on disk: same listing as after the conversion
!tree warehouse

[1;36mwarehouse[0m
├── [1;36m_delta_log[0m
│   └── 00000000000000000000.json
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
├── data_9.parquet
└── [1;36mfoo.db[0m
    └── [1;36mbar[0m
        └── [1;36mmetadata[0m
            ├── 00000-5165b4bf-26e5-408c-8bd5-197f807bd99a.metadata.json
            ├── 611f72cf-6486-430c-bd92-d6691be98c19-m0.avro
            └── snap-1222512222308558805-0-611f72cf-6486-430c-bd92-d6691be98c19.avro

5 directories, 14 files


In [16]:
%%time
# compact
target_size = 512 * 1024 * 1024 # 512MB
compaction_result = delta_table.optimize.compact(target_size=target_size)
pprint(compaction_result)

{'filesAdded': '{"avg":1324565.0,"max":1324565,"min":1324565,"totalFiles":1,"totalSize":1324565}',
 'filesRemoved': '{"avg":105539.2,"max":139492,"min":98580,"totalFiles":10,"totalSize":1055392}',
 'numBatches': 977,
 'numFilesAdded': 1,
 'numFilesRemoved': 10,
 'partitionsOptimized': 1,
 'preserveInsertionOrder': True,
 'totalConsideredFiles': 10,
 'totalFilesSkipped': 0}
CPU times: user 51.3 ms, sys: 14.9 ms, total: 66.2 ms
Wall time: 63.7 ms


In [17]:
# After compaction the Delta table must still hold the same number of records
delta_row_num = delta_table.to_pyarrow_table().num_rows
print(f"delta records={delta_row_num}")
assert delta_row_num == iceberg_row_num and iceberg_row_num == duckdb_row_num

# List the files the Delta table references now
print("Delta data files:")
for compacted_file in delta_table.file_uris():
    print(compacted_file)

delta records=1000000
Delta data files:
/Users/kevinliu/repos/iceberg-python/warehouse/part-00001-05deccb1-8776-4163-8db3-b12499a73057-c000.zstd.parquet


In [18]:
# Compaction added a second Delta commit and one new part-*.parquet file;
# the original data_*.parquet files are still on disk
!tree warehouse

[1;36mwarehouse[0m
├── [1;36m_delta_log[0m
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
├── data_9.parquet
├── [1;36mfoo.db[0m
│   └── [1;36mbar[0m
│       └── [1;36mmetadata[0m
│           ├── 00000-5165b4bf-26e5-408c-8bd5-197f807bd99a.metadata.json
│           ├── 611f72cf-6486-430c-bd92-d6691be98c19-m0.avro
│           └── snap-1222512222308558805-0-611f72cf-6486-430c-bd92-d6691be98c19.avro
└── part-00001-05deccb1-8776-4163-8db3-b12499a73057-c000.zstd.parquet

5 directories, 16 files


In [19]:
# Pretty-print the compaction commit; it contains `remove` actions for the original files
!jq . warehouse/_delta_log/00000000000000000001.json

[1;37m{
  [0m[34;1m"remove"[0m[1;37m: [0m[1;37m{
    [0m[34;1m"path"[0m[1;37m: [0m[0;32m"data_0.parquet"[0m[1;37m,
    [0m[34;1m"dataChange"[0m[1;37m: [0m[0;37mfalse[0m[1;37m,
    [0m[34;1m"deletionTimestamp"[0m[1;37m: [0m[0;37m1740519686530[0m[1;37m,
    [0m[34;1m"partitionValues"[0m[1;37m: [0m[1;37m{}[0m[1;37m,
    [0m[34;1m"size"[0m[1;37m: [0m[0;37m139492[0m[1;37m
  [1;37m}[0m[1;37m
[1;37m}[0m
[1;37m{
  [0m[34;1m"remove"[0m[1;37m: [0m[1;37m{
    [0m[34;1m"path"[0m[1;37m: [0m[0;32m"data_5.parquet"[0m[1;37m,
    [0m[34;1m"dataChange"[0m[1;37m: [0m[0;37mfalse[0m[1;37m,
    [0m[34;1m"deletionTimestamp"[0m[1;37m: [0m[0;37m1740519686530[0m[1;37m,
    [0m[34;1m"partitionValues"[0m[1;37m: [0m[1;37m{}[0m[1;37m,
    [0m[34;1m"size"[0m[1;37m: [0m[0;37m102170[0m[1;37m
  [1;37m}[0m[1;37m
[1;37m}[0m
[1;37m{
  [0m[34;1m"remove"[0m[1;37m: [0m[1;37m{
    [0m[34;1m"path"[0m[1;37m: [0m[

In [20]:
# The Delta compaction did not touch the Iceberg table: it still lists the original files
for tracked_file in iceberg_table.inspect.files().column('file_path').to_pylist():
    print(tracked_file)

/Users/kevinliu/repos/iceberg-python/warehouse/data_0.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_1.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_2.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_3.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_4.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_5.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_6.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_7.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_8.parquet
/Users/kevinliu/repos/iceberg-python/warehouse/data_9.parquet


In [21]:
# REPLACE isn't implemented yet, so emulate it as a hack:
# delete everything and add the compacted files within one transaction.
from pyiceberg.table import ALWAYS_TRUE

with iceberg_table.transaction() as txn:
    # the always-true filter matches every row, i.e. this deletes everything
    txn.delete(delete_filter=ALWAYS_TRUE)
    # add the new file(s) that the Delta table references after compaction
    txn.add_files([os.path.abspath(delta_uri) for delta_uri in delta_table.file_uris()])

In [22]:
# The Iceberg table is now compacted: verify it rather than just printing it.
iceberg_row_num = iceberg_table.scan().count()
print(f"iceberg records={iceberg_row_num}")
# Swapping the data files must not change the number of records
# (the same invariant the earlier verification cells assert).
assert iceberg_row_num == duckdb_row_num

compacted_data_files = iceberg_table.inspect.files().to_pydict()['file_path']
for data_file in compacted_data_files:
    print(data_file)
# The table must track exactly the files the Delta table references after compaction;
# the paths are normalised the same way as when they were added to the Iceberg table.
assert set(compacted_data_files) == {os.path.abspath(delta_uri) for delta_uri in delta_table.file_uris()}

iceberg records=1000000
/Users/kevinliu/repos/iceberg-python/warehouse/part-00001-05deccb1-8776-4163-8db3-b12499a73057-c000.zstd.parquet


In [23]:
# The Iceberg metadata directory gained a new metadata.json plus new manifest and snapshot files;
# the original data_*.parquet files are still present on disk
!tree warehouse

[1;36mwarehouse[0m
├── [1;36m_delta_log[0m
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── data_0.parquet
├── data_1.parquet
├── data_2.parquet
├── data_3.parquet
├── data_4.parquet
├── data_5.parquet
├── data_6.parquet
├── data_7.parquet
├── data_8.parquet
├── data_9.parquet
├── [1;36mfoo.db[0m
│   └── [1;36mbar[0m
│       └── [1;36mmetadata[0m
│           ├── 00000-5165b4bf-26e5-408c-8bd5-197f807bd99a.metadata.json
│           ├── 00001-adc05917-c993-45e9-aecf-c0f29199985d.metadata.json
│           ├── 611f72cf-6486-430c-bd92-d6691be98c19-m0.avro
│           ├── 9d7dfcdd-9b9c-42bb-bf1c-5e764739d20f-m0.avro
│           ├── f93f88a8-cbc2-4eda-a882-3b5ffd1c7f93-m0.avro
│           ├── snap-1222512222308558805-0-611f72cf-6486-430c-bd92-d6691be98c19.avro
│           ├── snap-4992771804060990823-0-f93f88a8-cbc2-4eda-a882-3b5ffd1c7f93.avro
│           └── snap-6101942373643664436-0-9d7dfcdd-9b9c-42bb-bf1c-5e764739d20f.avro
└── part-00001-05deccb1-8776-4163-

In [24]:
"""
Resources
* Spark rewrite_data_files (https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_data_files)
* PyIceberg docs (https://py.iceberg.apache.org/)
* Delta docs (https://delta-io.github.io/delta-rs/)
* PyIceberg repo (https://github.com/apache/iceberg-python)
* Delta repo (https://github.com/delta-io/delta-rs)

"""

'\nResources\n* Spark rewrite_data_files (https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_data_files)\n* PyIceberg docs (https://py.iceberg.apache.org/)\n* Delta docs (https://delta-io.github.io/delta-rs/)\n* PyIceberg repo (https://github.com/apache/iceberg-python)\n* Delta repo (https://github.com/delta-io/delta-rs)\n\n'