In [1]:
%load_ext autoreload
%autoreload 2

from kedro.io import AbstractVersionedDataset
import pandas as pd
import rich.pretty
from iceberg_incremental.datasets import PyIcebergDataset
rich.pretty.install()

In [2]:
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 45.4M  100 45.4M    0     0  6196k      0  0:00:07  0:00:07 --:--:-- 9352k


# First create a table 

In [5]:
# Downcast timestamp for compatability https://github.com/apache/iceberg-python/issues/1045#issuecomment-2445205707
import os
os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'

In [8]:
dataset = PyIcebergDataset("taxi_dataset")
df = pd.read_parquet("/tmp/yellow_tripdata_2023-01.parquet")

In [9]:
dataset.save(df)

Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.


If the table does not exists, the dataset will create an Iceberg table automatically. One way to verify this is using the `pyiceberg` CLI.

The uri is hard coded as `sqlite:///tmp/warehouse/pyiceberg_catalog.db". Normally you would not do this but this make the Iceberg table transparent within the directory for inspection.

In [10]:
uri = "sqlite:///tmp/warehouse/pyiceberg_catalog.db"

## Using PyIceberg CLI
```bash
➜  pyiceberg --help
Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...

Options:
--catalog TEXT
--verbose BOOLEAN
--output [text|json]
--ugi TEXT
--uri TEXT
--credential TEXT
--help                Show this message and exit.

Commands:
describe    Describes a namespace xor table
drop        Operations to drop a namespace or table
list        Lists tables or namespaces
location    Returns the location of the table
properties  Properties on tables/namespaces
rename      Renames a table
schema      Gets the schema of the table
spec        Returns the partition spec of the table
uuid        Returns the UUID of the table
```

In [None]:
# List namespace
!pyiceberg --uri $uri list

default


In [None]:
# list table inside default namespace
!pyiceberg --uri $uri list default

default.taxi_dataset


In [15]:
!pyiceberg --uri $uri describe default.taxi_dataset

Table format version  2                                                         
Metadata location     file://tmp/warehouse/default.db/taxi_dataset/metadata/000…
Table UUID            7c224d55-a9ee-4e16-983b-ee3d33b54e19                      
Last Updated          1732274297944                                             
Partition spec        []                                                        
Sort order            []                                                        
Current schema        Schema, id=0                                              
                      ├── 1: VendorID: optional long                            
                      ├── 2: tpep_pickup_datetime: optional timestamp           
                      ├── 3: tpep_dropoff_datetime: optional timestamp          
                      ├── 4: passenger_count: optional double                   
                      ├── 5: trip_distance: optional double                     
                      ├── 6:

## Load Data

In [16]:
data = dataset.load()

In [17]:
data.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


## Overwrite data

In [18]:
dataset._table.last_sequence_number

[1;36m1[0m

In [19]:
dataset.save(data)

Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.


In [20]:
dataset._table.last_sequence_number

[1;36m3[0m

Interestingly, the sequence number increase by 2 instead of 1. After some investigation, I couldn't find any documentation that explains this in details. But there are some hints in this docstring of `overwrite` method.

```md
        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.
```

In this case, it seems to perform two operations `DELETE` and `APPEND`, thus two snapshots. We can inspect the snapshot history.

## Inspecting Snapshots

In [21]:
dataset._table.snapshots()


[1m[[0m
    [1;35mSnapshot[0m[1m([0m
        [33msnapshot_id[0m=[1;36m7841752524588546086[0m,
        [33mparent_snapshot_id[0m=[3;35mNone[0m,
        [33msequence_number[0m=[1;36m1[0m,
        [33mtimestamp_ms[0m=[1;36m1732274297944[0m,
        [33mmanifest_list[0m=[32m'file://tmp/warehouse/default.db/taxi_dataset/metadata/snap-7841752524588546086-0-fecbe5f5-9fbd-4dca-9bcc-a775c57166a1.avro'[0m,
        [33msummary[0m=[1;35mSummary[0m[1m([0m[33moperation[0m=[35mOperation[0m.APPEND[1m)[0m,
        [33mschema_id[0m=[1;36m0[0m
    [1m)[0m,
    [1;35mSnapshot[0m[1m([0m
        [33msnapshot_id[0m=[1;36m651915520631835796[0m,
        [33mparent_snapshot_id[0m=[1;36m7841752524588546086[0m,
        [33msequence_number[0m=[1;36m2[0m,
        [33mtimestamp_ms[0m=[1;36m1732275203862[0m,
        [33mmanifest_list[0m=[32m'file://tmp/warehouse/default.db/taxi_dataset/metadata/snap-651915520631835796-0-2e76d850-273f-4910-a93c-90230b

You can see that there are consecutive snapshots like this. Now let's load the data to confirm this.
```
summary=Summary(operation=Operation.DELETE),
summary=Summary(operation=Operation.APPEND),
```

Let's grab the snapshot_id and load the data.

## Load Iceberg table with snapshot_id

In [30]:
_ = dataset._load(snapshot_id=651915520631835796)
_

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


It looks like an empty table after `DELETE`, let's check the other snapshot.

In [34]:
_ = dataset._load(snapshot_id=dataset._table.snapshots()[-1].snapshot_id)

In [35]:
_.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


## Use Tag/Branch for versioning


In [36]:
t = dataset._table

In [37]:
t.scan?

[0;31mSignature:[0m
[0mt[0m[0;34m.[0m[0mscan[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mrow_filter[0m[0;34m:[0m [0;34m'Union[str, BooleanExpression]'[0m [0;34m=[0m [0mAlwaysTrue[0m[0;34m([0m[0;34m)[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mselected_fields[0m[0;34m:[0m [0;34m'Tuple[str, ...]'[0m [0;34m=[0m [0;34m([0m[0;34m'*'[0m[0;34m,[0m[0;34m)[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcase_sensitive[0m[0;34m:[0m [0;34m'bool'[0m [0;34m=[0m [0;32mTrue[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msnapshot_id[0m[0;34m:[0m [0;34m'Optional[int]'[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0moptions[0m[0;34m:[0m [0;34m'Properties'[0m [0;34m=[0m [0;34m{[0m[0;34m}[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mlimit[0m[0;34m:[0m [0;34m'Optional[int]'[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m [0;34m->[0m [0;34m'DataScan'[0m[0;34m[0m[0;34m[0m[0m
[0;31

In [334]:
# Tags fits Kedro versioning better so I use tags instead of branch here
table = dataset._table

In [344]:
s = table.manage_snapshots()
(
    s.create_tag(
        snapshot_id=table.current_snapshot().snapshot_id, tag_name="2024-11-21"
    ).create_tag(snapshot_id=table.snapshots()[0].snapshot_id, tag_name="2024-11-20")
    .commit()
)
# Use the first snapshot as an "older" version

In [346]:
snapshot = table.snapshot_by_name("2024-11-21")
type(snapshot)

[1m<[0m[1;95mclass[0m[39m [0m[32m'pyiceberg.table.snapshots.Snapshot'[0m[1m>[0m

In [348]:
table.scan(snapshot_id=table.snapshot_by_name("2024-11-21")).to_arrow()

ValueError: Snapshot not found: Operation.APPEND: id=1188167320489258665, parent_id=6828333843454423456, schema_id=0

In [None]:
UPDATE
col_a from orders where x=1

row_filter: = x=1
selected_fields = col_a from orders where x=1


In [343]:
s.create_tag??

[0;31mSignature:[0m
[0ms[0m[0;34m.[0m[0mcreate_tag[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0msnapshot_id[0m[0;34m:[0m [0;34m'int'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mtag_name[0m[0;34m:[0m [0;34m'str'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmax_ref_age_ms[0m[0;34m:[0m [0;34m'Optional[int]'[0m [0;34m=[0m [0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m[0;34m)[0m [0;34m->[0m [0;34m'ManageSnapshots'[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m   
    [0;32mdef[0m [0mcreate_tag[0m[0;34m([0m[0mself[0m[0;34m,[0m [0msnapshot_id[0m[0;34m:[0m [0mint[0m[0;34m,[0m [0mtag_name[0m[0;34m:[0m [0mstr[0m[0;34m,[0m [0mmax_ref_age_ms[0m[0;34m:[0m [0mOptional[0m[0;34m[[0m[0mint[0m[0;34m][0m [0;34m=[0m [0;32mNone[0m[0;34m)[0m [0;34m->[0m [0mManageSnapshots[0m[0;34m:[0m[0;34m[0m
[0;34m[0m        [0;34m"""[0m
[0;34m        Create a new tag pointing to the given snapshot id.[0m
[0;34m[0m
[0;34m    

In [338]:
table.metadata.refs


[1m{[0m
    [32m'main'[0m: [1;35mSnapshotRef[0m[1m([0m
        [33msnapshot_id[0m=[1;36m1188167320489258665[0m,
        [33msnapshot_ref_type[0m=[35mSnapshotRefType[0m.BRANCH,
        [33mmin_snapshots_to_keep[0m=[3;35mNone[0m,
        [33mmax_snapshot_age_ms[0m=[3;35mNone[0m,
        [33mmax_ref_age_ms[0m=[3;35mNone[0m
    [1m)[0m
[1m}[0m