# Minimal PyIceberg example

## Setting up the catalog

Iceberg uses a catalog as the single, central place to organize tables: for each table it records which metadata file is the current one. Since we don't want any dependencies on the Java stack, we use a local `sqlite` catalog. Main learnings from writing this notebook:

- We need `pyiceberg >= 0.7` due to a [known issue in the mapping of data types](https://github.com/apache/iceberg-python/issues/378)
- For ease of use and portability, we use `sqlite` as our catalog. However, `sqlite` allows only one writer at a time, so we need to make sure only one process writes to the catalog at any given time. We may decide to use PostgreSQL as a proper catalog at a later stage (serverless?!)
- You need a catalog to write to Iceberg, but you can read tables without a catalog (see static table below)


Further reading:

- https://tabular.io/apache-iceberg-cookbook/pyiceberg-polars/


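```python
from pathlib import Path

import polars as pl
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError

# Resolve to an absolute path so that the file:// URI below is well-formed
# (with a relative path, "tmp" would end up as the URI's host part).
warehouse_path = Path("./tmp/warehouse").resolve()
warehouse_path.mkdir(parents=True, exist_ok=True)

# The SQLite file holds the catalog; table data and metadata live under the warehouse.
catalog = SqlCatalog(
    "default",
    uri=f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
    warehouse=warehouse_path.as_uri(),
)
```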

## Demonstrate a full round trip: creating, loading and reading a table

We use DIS data as our demonstrator.

### Defining the schema (similar to DDL in SQL)

- Iceberg needs a `pyiceberg.schema.Schema` or `pyarrow.Schema` to define a table.
- Note that `polars`' native parser handles dates differently than `pyarrow` (see [here](https://github.com/pola-rs/polars/issues/1330) and [here](https://github.com/vega/altair/pull/3377)). **Hence `use_pyarrow=True` throughout**.

```python
# Namespaces group tables, similar to a database schema in SQL.
catalog.create_namespace("dis")

# Open DIS data sets, available as CSV downloads.
DIS = {
    "zorgproducten": "https://www.opendisdata.nl/download/csv/01_DBC.csv",
    "zorgactiviteiten": "https://www.opendisdata.nl/download/csv/02_DBC_PROFIEL.csv",
    "ref-zorgproducten": "https://www.opendisdata.nl/download/csv/05_REF_ZPD.csv",
    "ref-zorgactiviteiten": "https://www.opendisdata.nl/download/csv/03_REF_ZAT.csv",
    "ref-specialismen": "https://www.opendisdata.nl/download/csv/06_REF_SPC.csv",
    "ref-diagnoses": "https://www.opendisdata.nl/download/csv/04_REF_DGN.csv",
}

zp = pl.read_csv(DIS["zorgproducten"], try_parse_dates=True)
zp
```

| VERSIE | DATUM_BESTAND | PEILDATUM | JAAR | BEHANDELEND_SPECIALISME_CD | TYPERENDE_DIAGNOSE_CD | ZORGPRODUCT_CD | AANTAL_PAT_PER_ZPD | AANTAL_SUBTRAJECT_PER_ZPD | AANTAL_PAT_PER_DIAG | AANTAL_SUBTRAJECT_PER_DIAG | AANTAL_PAT_PER_SPC | AANTAL_SUBTRAJECT_PER_SPC | GEMIDDELDE_VERKOOPPRIJS |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| *f64* | *date* | *date* | *i64* | *i64* | *str* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 318 | "203" | 119499078 | 14213 | 16904 | 44133 | 58881 | 560226 | 832680 | 280 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2020 | 307 | "B13" | 159899007 | 102 | 102 | 118 | 118 | 677776 | 1128999 | 4455 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 361 | "104" | 990061029 | 1574 | 2000 | 3439 | 5157 | 95133 | 142589 | 195 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2020 | 307 | "G14" | 990003024 | 67 | 67 | 19592 | 20820 | 677776 | 1128999 | 150 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 318 | "605" | 119499075 | 529 | 574 | 5831 | 7354 | 560226 | 832680 | 705 |
| … | … | … | … | … | … | … | … | … | … | … | … | … | … |
| 1.0 | 2024-04-15 | 2024-04-01 | 2016 | 303 | "212" | 199299062 | 1 | 1 | 39064 | 44560 | 1332254 | 1831326 | 5450 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2014 | 313 | "263" | 29699006 | 1 | 1 | 1743 | 2262 | 1037157 | 2061888 | 4865 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2016 | 303 | "237" | 199299063 | 1 | 1 | 1500 | 1744 | 1332254 | 1831326 | 3380 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2014 | 313 | "779" | 979003008 | 1 | 1 | 777 | 1239 | 1037157 | 2061888 | 7560 |


```python
%%time
zorgproducten = catalog.create_table(
    "dis.zorgproducten",
    schema=zp.to_arrow().schema,
)
```

```
CPU times: user 1.64 s, sys: 117 ms, total: 1.75 s
Wall time: 317 ms
```


### Writing data

You can append PyArrow tables to an Iceberg table.

```python
%%time
zorgproducten.append(zp.to_arrow())
```

```
CPU times: user 130 ms, sys: 7.87 ms, total: 138 ms
Wall time: 89.8 ms
```


### Reading data

```python
# Round trip: reading the Iceberg table back yields the original DataFrame.
zp.equals(pl.scan_iceberg(zorgproducten).collect())
```

```
True
```

## Demonstrate with the full DIS dataset

I am being lazy, so through trial and error I override the inferred schema using the `dtypes` keyword argument (see [docs](https://docs.pola.rs/py-polars/html/reference/api/polars.read_csv.html)); newer `polars` versions call this argument `schema_overrides`.

```python
%%time
# Force the diagnosis code to a string; all other column types are inferred.
dtypes = {"TYPERENDE_DIAGNOSE_CD": pl.String}

for table, url in DIS.items():
    if table == "zorgproducten":
        continue  # already created and loaded above
    # Drop any previous version so that the cell can be re-run.
    try:
        catalog.drop_table(f"dis.{table}")
    except NoSuchTableError:
        pass
    print(f"Loading {table} from {url}")
    df = pl.read_csv(url, dtypes=dtypes, try_parse_dates=True)
    table_ = catalog.create_table(f"dis.{table}", schema=df.to_arrow().schema)
    table_.append(df.to_arrow())
```

```
Loading zorgactiviteiten from https://www.opendisdata.nl/download/csv/02_DBC_PROFIEL.csv
Loading ref-zorgproducten from https://www.opendisdata.nl/download/csv/05_REF_ZPD.csv
Loading ref-zorgactiviteiten from https://www.opendisdata.nl/download/csv/03_REF_ZAT.csv
Loading ref-specialismen from https://www.opendisdata.nl/download/csv/06_REF_SPC.csv
Loading ref-diagnoses from https://www.opendisdata.nl/download/csv/04_REF_DGN.csv
CPU times: user 3.92 s, sys: 842 ms, total: 4.77 s
Wall time: 10.4 s
```


## Reading static table with metadata

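Without the catalog, it is not obvious which `*.metadata.json` file is the current one: every commit writes a new metadata file, and it is the catalog that records which one is current. The only option I can think of is to look at the file attributes and take the most recent file: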

```python
def get_latest(path: Path, pattern: str = "*") -> str:
    """Return the POSIX path of the file matching `pattern` with the latest st_ctime."""
    files = path.glob(pattern)
    return max(files, key=lambda x: x.stat().st_ctime).as_posix()


pl.scan_iceberg(
    get_latest(warehouse_path / "dis.db" / "zorgproducten" / "metadata", "*.metadata.json")
).collect()
```

| VERSIE | DATUM_BESTAND | PEILDATUM | JAAR | BEHANDELEND_SPECIALISME_CD | TYPERENDE_DIAGNOSE_CD | ZORGPRODUCT_CD | AANTAL_PAT_PER_ZPD | AANTAL_SUBTRAJECT_PER_ZPD | AANTAL_PAT_PER_DIAG | AANTAL_SUBTRAJECT_PER_DIAG | AANTAL_PAT_PER_SPC | AANTAL_SUBTRAJECT_PER_SPC | GEMIDDELDE_VERKOOPPRIJS |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| *f64* | *date* | *date* | *i64* | *i64* | *str* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* | *i64* |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 318 | "203" | 119499078 | 14213 | 16904 | 44133 | 58881 | 560226 | 832680 | 280 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2020 | 307 | "B13" | 159899007 | 102 | 102 | 118 | 118 | 677776 | 1128999 | 4455 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 361 | "104" | 990061029 | 1574 | 2000 | 3439 | 5157 | 95133 | 142589 | 195 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2020 | 307 | "G14" | 990003024 | 67 | 67 | 19592 | 20820 | 677776 | 1128999 | 150 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2021 | 318 | "605" | 119499075 | 529 | 574 | 5831 | 7354 | 560226 | 832680 | 705 |
| … | … | … | … | … | … | … | … | … | … | … | … | … | … |
| 1.0 | 2024-04-15 | 2024-04-01 | 2016 | 303 | "212" | 199299062 | 1 | 1 | 39064 | 44560 | 1332254 | 1831326 | 5450 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2014 | 313 | "263" | 29699006 | 1 | 1 | 1743 | 2262 | 1037157 | 2061888 | 4865 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2016 | 303 | "237" | 199299063 | 1 | 1 | 1500 | 1744 | 1332254 | 1831326 | 3380 |
| 1.0 | 2024-04-15 | 2024-04-01 | 2014 | 313 | "779" | 979003008 | 1 | 1 | 777 | 1239 | 1037157 | 2061888 | 7560 |
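File timestamps can be unreliable (copies and syncs reset them, and `st_ctime` means different things on Linux and Windows). An alternative sketch relies on file names instead: PyIceberg names metadata files `<version>-<uuid>.metadata.json` with a zero-padded, increasing version prefix, so the highest prefix should be the newest file. That naming convention is the key assumption here; the helper `latest_metadata` and the demo file names are hypothetical. When the catalog is reachable, `catalog.load_table("dis.zorgproducten").metadata_location` gives the authoritative current file, and `pyiceberg.table.StaticTable.from_metadata(...)` can open a table directly from such a path.

```python
import re
import tempfile
from pathlib import Path

# Assumption: "<zero-padded version>-<uuid>.metadata.json", as written by pyiceberg.
VERSION_RE = re.compile(r"^(\d+)-.*\.metadata\.json$")


def latest_metadata(metadata_dir: Path) -> Path:
    """Return the metadata file with the highest version prefix."""
    candidates = []
    for path in metadata_dir.glob("*.metadata.json"):
        match = VERSION_RE.match(path.name)
        if match:
            candidates.append((int(match.group(1)), path))
    if not candidates:
        raise FileNotFoundError(f"no versioned metadata files in {metadata_dir}")
    # Tuples compare on the version number first.
    return max(candidates)[1]


# Usage with hypothetical, empty files:
demo_dir = Path(tempfile.mkdtemp())
for name in ["00000-a1.metadata.json", "00002-c3.metadata.json", "00001-b2.metadata.json"]:
    (demo_dir / name).touch()
print(latest_metadata(demo_dir).name)  # 00002-c3.metadata.json
```

In the notebook this would replace `get_latest(...)` with `latest_metadata(warehouse_path / "dis.db" / "zorgproducten" / "metadata")`.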


## Where do we go from here?

- We can run local catalogs with `sqlite`, but we need to be careful with concurrent writes: only one process should write at a time
- Use a static table via its metadata file for read access
- Option: run a central `PostgreSQL` catalog as a server on Azure
  - [PostgreSQL single server](https://learn.microsoft.com/en-us/azure/postgresql/single-server/overview-single-server), minimal setup starting at [EUR 27 per month](https://azure.microsoft.com/en-us/pricing/details/postgresql/server/). Note that Microsoft has announced the retirement of Single Server in favor of Flexible Server.
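Switching to PostgreSQL should mostly be a matter of catalog configuration, since `SqlCatalog` talks to its database through an SQLAlchemy URL. The sketch below only builds the properties and does not connect; the helper name, server name, credentials and warehouse location are all hypothetical, and the `postgresql+psycopg2` driver is an assumption (it needs `psycopg2` installed).

```python
from urllib.parse import quote_plus


def postgres_catalog_properties(
    host: str, database: str, user: str, password: str, warehouse: str, port: int = 5432
) -> dict[str, str]:
    """Build SqlCatalog properties for a PostgreSQL-backed catalog (sketch)."""
    # SQLAlchemy URL; quote_plus escapes special characters in the credentials.
    uri = (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )
    return {"uri": uri, "warehouse": warehouse}


props = postgres_catalog_properties(
    host="my-server.postgres.database.azure.com",  # hypothetical server name
    database="iceberg_catalog",                   # hypothetical database
    user="iceberg",
    password="s3cret!",
    warehouse="file:///data/warehouse",  # hypothetical; shared storage in practice
)
print(props["uri"])
```

`SqlCatalog("default", **props)` would then connect to the server. For multiple writers, the warehouse would also have to move to storage that every writer can reach (for example Azure storage), which needs additional FileIO properties not shown here.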
