In [90]:
import os
from pprint import pprint

import polars as pl
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    DoubleType,
    IntegerType,
    LongType,
    NestedField,
    StringType,
    StructType,
)

# Set PYICEBERG_HOME to the current working directory before any catalog is loaded.
# NOTE(review): presumably so PyIceberg picks up a .pyiceberg.yaml located next to
# this notebook — confirm.
os.environ["PYICEBERG_HOME"] = os.getcwd()

In [91]:
# Initialize the Iceberg catalog named "local" and print its resolved properties.
catalog = load_catalog(name="local")
print(catalog.properties)

{'type': 'sql', 'uri': 'sqlite:////home/bobo/Desktop/iceberg_demos/iceberg_catalog/catalog.db', 'warehouse': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog'}


In [92]:
from pprint import pprint
def snapshots(table):
    """Pretty-print all snapshots of ``table`` as a list of plain dicts.

    Each snapshot returned by ``table.snapshots()`` is converted with its
    ``model_dump()`` method; the resulting list is written with ``pprint``.
    Nothing is returned.
    """
    dumped = []
    for entry in table.snapshots():
        dumped.append(entry.model_dump())
    pprint(dumped)

def print_snapshots(tbl):
    """Print one compact line per snapshot of ``tbl``.

    Each line shows the snapshot id and its ``timestamp_ms``; snapshots that
    have no ``timestamp_ms`` attribute are shown with ``None``.
    """
    for snap in tbl.snapshots():
        snap_id = snap.snapshot_id
        # Tolerate snapshot objects that lack the attribute.
        stamp = getattr(snap, 'timestamp_ms', None)
        print(f"snapshot_id={snap_id}  timestamp_ms={stamp}")

def print_current(tbl):
    """Print the current contents of ``tbl``.

    Scans the table, converts the scan with ``to_polars()`` and prints the
    converted object. Nothing is returned.
    """
    current_rows = tbl.scan().to_polars()
    print(current_rows)

In [None]:
# Updating and inserting data (upsert): create the demo table.
# All names used here (Schema, NestedField, StringType, IntegerType) are imported
# in the notebook's import cell.
TABLE_IDENTIFIER = "locations.cities"

schema = Schema(
    NestedField(1, "city", StringType(), required=True),
    NestedField(2, "inhabitants", IntegerType(), required=True),
    # Mark city as the identifier field, also known as the primary key.
    identifier_field_ids=[1],
)
catalog.create_namespace_if_not_exists("locations")

# Start from an empty table so the notebook survives "Restart & Run All":
# a plain create_table() raises once the table already exists in the catalog.
# WARNING: this drops any existing locations.cities table in this catalog.
if catalog.table_exists(TABLE_IDENTIFIER):
    catalog.drop_table(TABLE_IDENTIFIER)
tbl = catalog.create_table(TABLE_IDENTIFIER, schema=schema)

# A freshly created table has no snapshots yet, so this prints an empty list.
snapshots(tbl)

[]


In [94]:
# Arrow schema used for every write in this notebook: both columns are non-nullable.
arrow_schema = pa.schema(
    [
        pa.field(column_name, column_type, nullable=False)
        for column_name, column_type in (
            ("city", pa.string()),
            ("inhabitants", pa.int32()),
        )
    ]
)

# Write the initial four rows (built column-wise instead of row-wise).
df = pa.Table.from_pydict(
    {
        "city": ["Amsterdam", "San Francisco", "Drachten", "Paris"],
        "inhabitants": [921402, 808988, 45019, 2103000],
    },
    schema=arrow_schema,
)
tbl.append(df)

# The append shows up as the table's first snapshot.
snapshots(tbl)

[{'manifest-list': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-7990933274021676277-0-122a6649-a95d-480a-a486-e2b3599ae1ef.avro',
  'schema-id': 0,
  'sequence-number': 1,
  'snapshot-id': 7990933274021676277,
  'summary': {'added-data-files': '1',
              'added-files-size': '977',
              'added-records': '4',
              'operation': 'append',
              'total-data-files': '1',
              'total-delete-files': '0',
              'total-equality-deletes': '0',
              'total-files-size': '977',
              'total-position-deletes': '0',
              'total-records': '4'},
  'timestamp-ms': 1768070726588}]


In [95]:
# Next, upsert a batch of rows into the Iceberg table.
upsert_rows = [
    # Will be updated: the inhabitants value differs from the stored row.
    {"city": "Drachten", "inhabitants": 45505},
    # New row: will be inserted.
    {"city": "Berlin", "inhabitants": 3432000},
    # Ignored: an identical row already exists in the table.
    {"city": "Paris", "inhabitants": 2103000},
]
df = pa.Table.from_pylist(upsert_rows, schema=arrow_schema)
upd = tbl.upsert(df)

# Exactly one row was changed and exactly one row was added.
assert upd.rows_updated == 1
assert upd.rows_inserted == 1

snapshots(tbl)

[{'manifest-list': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-7990933274021676277-0-122a6649-a95d-480a-a486-e2b3599ae1ef.avro',
  'schema-id': 0,
  'sequence-number': 1,
  'snapshot-id': 7990933274021676277,
  'summary': {'added-data-files': '1',
              'added-files-size': '977',
              'added-records': '4',
              'operation': 'append',
              'total-data-files': '1',
              'total-delete-files': '0',
              'total-equality-deletes': '0',
              'total-files-size': '977',
              'total-position-deletes': '0',
              'total-records': '4'},
  'timestamp-ms': 1768070726588},
 {'manifest-list': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-3871475769872198349-0-398dd3a5-cd89-491f-b463-90a062689b06.avro',
  'parent-snapshot-id': 7990933274021676277,
  'schema-id': 0,
  'sequence-number': 2,
  'snapshot-id': 3871475769872198349,
  'summary': {

In [None]:
# Load the table into polars and collect it to inspect the rows after the upsert.
df = tbl.to_polars()
df.collect()


city,inhabitants
str,i32
"""Berlin""",3432000
"""Drachten""",45505
"""Amsterdam""",921402
"""San Francisco""",808988
"""Paris""",2103000


In [56]:
# Deleting data: remove every row whose city is Amsterdam.
tbl.delete(delete_filter="city == 'Amsterdam'")

# Re-read the table; the next cell collects and displays it. Collecting here
# mid-cell would only run a scan and throw the result away, because a cell
# displays nothing but its last expression.
df = tbl.to_polars()

# Show the snapshot list after the delete.
snapshots(tbl)

[{'manifest-list': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-253764842000279830-0-cfab87ce-fefd-4440-8b7b-424ae0e63daf.avro',
  'schema-id': 0,
  'sequence-number': 1,
  'snapshot-id': 253764842000279830,
  'summary': {'added-data-files': '1',
              'added-files-size': '977',
              'added-records': '4',
              'operation': 'append',
              'total-data-files': '1',
              'total-delete-files': '0',
              'total-equality-deletes': '0',
              'total-files-size': '977',
              'total-position-deletes': '0',
              'total-records': '4'},
  'timestamp-ms': 1768047871886},
 {'manifest-list': 'file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-3256103562982925249-0-9e2b72c5-c969-4ccf-84b3-d1fff4283f42.avro',
  'parent-snapshot-id': 253764842000279830,
  'schema-id': 0,
  'sequence-number': 2,
  'snapshot-id': 3256103562982925249,
  'summary': {'ad

In [57]:
# Collect the frame assigned to `df` in the previous cell (read after the delete).
df.collect()

city,inhabitants
str,i32
"""San Francisco""",808988
"""Paris""",2103000
"""Berlin""",3432000
"""Drachten""",45505


In [None]:
# Schema evolution
# - add a column
# A ValueError (e.g. when the column name already exists) is printed instead of
# aborting the notebook.
try:
    with tbl.update_schema() as schema_update:
        schema_update.add_column("population", LongType(), doc="info about poulation")
except ValueError as err:
    print(err)


Cannot add column, name already exists: population


In [64]:
# Re-read the table: the new population column is present and empty for existing rows.
df = tbl.to_polars()
df.collect()


city,inhabitants,population
str,i32,i64
"""San Francisco""",808988,
"""Paris""",2103000,
"""Berlin""",3432000,
"""Drachten""",45505,


In [66]:
# - rename a column: "population" becomes "country_code"
with tbl.update_schema() as schema_update:
    schema_update.rename_column("population", "country_code")

In [67]:
# Re-read the table to confirm the column is now called country_code.
df = tbl.to_polars()
df.collect()

city,inhabitants,country_code
str,i32,i64
"""San Francisco""",808988,
"""Paris""",2103000,
"""Berlin""",3432000,
"""Drachten""",45505,


In [71]:
# - update column
with tbl.update_schema(allow_incompatible_changes=True) as update:
    # Change country_code from a long to a string column. This is not a safe
    # type promotion, which is why allow_incompatible_changes=True is passed.
    update.update_column("country_code",field_type=StringType())

In [72]:
# Re-read the table to confirm country_code is now a string column.
df = tbl.to_polars()
df.collect()

city,inhabitants,country_code
str,i32,str
"""San Francisco""",808988,
"""Paris""",2103000,
"""Berlin""",3432000,
"""Drachten""",45505,


In [73]:
# - delete column
with tbl.update_schema(allow_incompatible_changes=True) as update:
    # Remove the country_code column from the table schema.
    update.delete_column("country_code")

In [74]:
# Re-read the table to confirm the country_code column is gone.
df = tbl.to_polars()
df.collect()

city,inhabitants
str,i32
"""San Francisco""",808988
"""Paris""",2103000
"""Berlin""",3432000
"""Drachten""",45505


In [106]:
# Append one more row so the table gains a newer snapshot to time-travel from.
# (polars is already imported in the notebook's import cell and is not needed here.)
new_city = [
    {
        "city": "dublin",
        "inhabitants": 1111111111,
    }
]

# PyIceberg expects a PyArrow table as input for writes, so convert the row
# using the same Arrow schema as the earlier writes.
data_to_insert = pa.Table.from_pylist(new_city, schema=arrow_schema)

tbl.append(data_to_insert)

In [108]:
# List the snapshot log: one entry (snapshot id, timestamp_ms) per snapshot.
tbl.history()

[SnapshotLogEntry(snapshot_id=7990933274021676277, timestamp_ms=1768070726588),
 SnapshotLogEntry(snapshot_id=3871475769872198349, timestamp_ms=1768070728162),
 SnapshotLogEntry(snapshot_id=264687693665447278, timestamp_ms=1768070728173),
 SnapshotLogEntry(snapshot_id=1890383408724786095, timestamp_ms=1768070728187),
 SnapshotLogEntry(snapshot_id=9024544952478952156, timestamp_ms=1768071886954)]

In [110]:
# Scan the current state of the table into polars.
tbl.scan().to_polars()

city,inhabitants
str,i32
"""dublin""",1111111111
"""Berlin""",3432000
"""Drachten""",45505
"""Amsterdam""",921402
"""San Francisco""",808988
"""Paris""",2103000


In [111]:
# Time travel: scan the table as of the snapshot just before the latest one.
# Snapshot ids differ from run to run, so take the id from the history instead
# of hard-coding one copied from an earlier run. Needs at least two snapshots.
previous_snapshot_id = tbl.history()[-2].snapshot_id
tbl.scan(snapshot_id=previous_snapshot_id).to_polars()

city,inhabitants
str,i32
"""Berlin""",3432000
"""Drachten""",45505
"""Amsterdam""",921402
"""San Francisco""",808988
"""Paris""",2103000


In [76]:
# Show the table's current schema.
tbl.schema()

Schema(NestedField(field_id=1, name='city', field_type=StringType(), required=True), NestedField(field_id=2, name='inhabitants', field_type=IntegerType(), required=True), schema_id=0, identifier_field_ids=[1])

In [87]:
# List the snapshot log: one entry (snapshot id, timestamp_ms) per snapshot.
tbl.history()

[SnapshotLogEntry(snapshot_id=253764842000279830, timestamp_ms=1768047871886),
 SnapshotLogEntry(snapshot_id=3256103562982925249, timestamp_ms=1768047895222),
 SnapshotLogEntry(snapshot_id=986478525678206652, timestamp_ms=1768047895234),
 SnapshotLogEntry(snapshot_id=1629192013294756382, timestamp_ms=1768047895247),
 SnapshotLogEntry(snapshot_id=362259212802847366, timestamp_ms=1768048343052)]

In [79]:
# Look up the table's first snapshot by id. The id is taken from the history
# because a literal copied from an earlier run does not exist once the table
# has been recreated.
first_snapshot_id = tbl.history()[0].snapshot_id
tbl.snapshot_by_id(snapshot_id=first_snapshot_id)

Snapshot(snapshot_id=253764842000279830, parent_snapshot_id=None, sequence_number=1, timestamp_ms=1768047871886, manifest_list='file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-253764842000279830-0-cfab87ce-fefd-4440-8b7b-424ae0e63daf.avro', summary=Summary(Operation.APPEND, **{'added-files-size': '977', 'added-data-files': '1', 'added-records': '4', 'total-data-files': '1', 'total-delete-files': '0', 'total-records': '4', 'total-files-size': '977', 'total-position-deletes': '0', 'total-equality-deletes': '0'}), schema_id=0)

In [80]:
# Resolve the snapshot that was current at a given point in time. Use the
# timestamp of the third history entry rather than a literal copied from an
# earlier run. Needs at least three snapshots.
third_entry_timestamp_ms = tbl.history()[2].timestamp_ms
tbl.snapshot_as_of_timestamp(third_entry_timestamp_ms)

Snapshot(snapshot_id=986478525678206652, parent_snapshot_id=3256103562982925249, sequence_number=3, timestamp_ms=1768047895234, manifest_list='file:///home/bobo/Desktop/iceberg_demos/iceberg_catalog/locations/cities/metadata/snap-986478525678206652-0-3c003577-f50f-4093-8ca7-ee92adde56cb.avro', summary=Summary(Operation.APPEND, **{'added-files-size': '911', 'added-data-files': '1', 'added-records': '1', 'total-data-files': '2', 'total-delete-files': '0', 'total-records': '4', 'total-files-size': '1872', 'total-position-deletes': '0', 'total-equality-deletes': '0'}), schema_id=0)

In [96]:
# Scan the current state of the table into polars.
tbl.scan().to_polars()

city,inhabitants
str,i32
"""Berlin""",3432000
"""Drachten""",45505
"""Amsterdam""",921402
"""San Francisco""",808988
"""Paris""",2103000


In [97]:
## Time travel demo: list the snapshot log as it stands before an extra row is appended.
tbl.history()

[SnapshotLogEntry(snapshot_id=7990933274021676277, timestamp_ms=1768070726588),
 SnapshotLogEntry(snapshot_id=3871475769872198349, timestamp_ms=1768070728162),
 SnapshotLogEntry(snapshot_id=264687693665447278, timestamp_ms=1768070728173),
 SnapshotLogEntry(snapshot_id=1890383408724786095, timestamp_ms=1768070728187)]