# Experimenting with PyIceberg

This notebook collects experimental snippets that exercise PyIceberg's capabilities. It covers:

- Read operations
- Write operations (MERGE, write Hive partitioned, partition overwrite etc.)

For demo purposes, we'll follow the example provided in the documentation [here](https://py.iceberg.apache.org/api/): we'll create a local catalog and some tables, then perform operations on them.

Every time the notebook is executed, we'll recreate the warehouse folder so that the `load_catalog` function doesn't raise errors (CREATE IF NOT EXISTS does not seem to be supported).
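
Recreating the warehouse folder can be sketched with the standard library alone. The helper name `reset_warehouse` and the temporary path below are hypothetical; they are not part of PyIceberg, and the path should be replaced with wherever the local catalog keeps its files.

```python
import shutil
import tempfile
from pathlib import Path


def reset_warehouse(path: Path) -> Path:
    """Delete `path` if it exists, then recreate it as an empty directory."""
    if path.exists():
        shutil.rmtree(path)
    path.mkdir(parents=True)
    return path


# Hypothetical location, used only for this illustration.
warehouse = reset_warehouse(Path(tempfile.gettempdir()) / "pyiceberg_warehouse_demo")
print(warehouse, list(warehouse.iterdir()))
```

Calling the helper at the top of the notebook makes every run start from an empty folder, at the cost of discarding any tables written by a previous run.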

## Connect to the SQL catalog

In [None]:
from datetime import UTC, datetime

from pyiceberg.catalog import load_catalog

catalog_config = {
    "type": "sql",
    "uri": "postgresql+psycopg2://user:password@postgres_db/catalog_db",
}
catalog = load_catalog("sql_catalog", **catalog_config)

In [None]:
print(catalog.list_namespaces())
if ("default",) not in catalog.list_namespaces():
    catalog.create_namespace("default")

ns = catalog.list_namespaces()

In [None]:
catalog.list_tables("default")

## Create a table

In [None]:
import polars as pl
import pyarrow as pa
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
    DoubleType,
    FloatType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            # Field IDs must be unique across the whole schema; 4 is already used by "ask".
            NestedField(field_id=6, name="created_by", field_type=StringType(), required=False),
        ),
        required=False,
    ),
)


partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day")
)


# Sort on the symbol
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

if not catalog.table_exists("default.bids"):
    catalog.create_table(
        identifier="default.bids",
        schema=schema,
        location="s3://warehouse/bids",
        partition_spec=partition_spec,
        sort_order=sort_order,
    )
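
The partition spec above applies a day transform to the `datetime` column, so rows are grouped by calendar day. The sketch below shows, in plain Python, how such a partition value can be derived as the number of days since 1970-01-01, which is how the Iceberg spec defines the day transform. The function `day_partition_value` is a hypothetical helper for illustration, not a PyIceberg API.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_partition_value(ts: datetime) -> int:
    """Return the number of whole days between 1970-01-01 and the date of `ts`."""
    return (ts.date() - EPOCH).days


# Two timestamps on the same day and one on the next day.
rows = [
    datetime(2023, 1, 1, 12, 0),
    datetime(2023, 1, 1, 18, 30),
    datetime(2023, 1, 2, 12, 0),
]

# Group the timestamps by their day partition value.
partitions: dict[int, list[datetime]] = {}
for ts in rows:
    partitions.setdefault(day_partition_value(ts), []).append(ts)

for day, members in sorted(partitions.items()):
    print(day, members)
```

Rows that share a calendar day end up in the same partition regardless of their time of day, which is why a filter on a single day can skip the files of every other day.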

## Load a table

In [None]:
table = catalog.load_table("default.bids")

## Check if table exists

In [None]:
catalog.table_exists("default.bids")

## Convert to Polars DataFrame

In [None]:
table_df = table.scan().to_polars()
table_df

## Append some data

Note that it is necessary to cast the PyArrow table to the schema of the Iceberg table, not the one inferred by Polars.

This is because Polars does not track non-nullable (required) columns, so Iceberg raises errors due to a schema mismatch.

In [None]:
def cast_to_pyarrow(df: pl.DataFrame, schema: pa.Schema) -> pa.Table:
    """Cast a Polars DataFrame to a PyArrow Table with the given schema."""
    arrow_df = df.to_arrow()
    return arrow_df.cast(schema)


data = pl.DataFrame(
    {
        "datetime": [
            datetime(2023, 1, 1, 12, 0, tzinfo=UTC),
            datetime(2023, 1, 2, 12, 0, tzinfo=UTC),
            datetime(2023, 1, 3, 12, 0, tzinfo=UTC),
        ],
        "symbol": ["AAPL", "GOOGL", "MSFT"],
        "bid": [150.0, 2800.0, 300.0],
        "ask": [151.0, 2805.0, 305.0],
        "details": [
            {"created_by": "user1"},
            {"created_by": "user2"},
            {"created_by": None},
        ],
    },
)

In [None]:
table.append(df=cast_to_pyarrow(data, table.schema().as_arrow()))

In [None]:
table.scan().to_polars()

## Test upsert

In [None]:
df = pa.Table.from_pylist(
    [
        {
            "datetime": datetime(2023, 1, 1, 12, 0, tzinfo=UTC),
            "symbol": "AAPL2",
            "bid": 150.0,
            "ask": 151.0,
            "details": {"created_by": "user1"},
        },
        {
            "datetime": datetime(2023, 1, 4, 12, 0, tzinfo=UTC),
            "symbol": "AMZ",
            "bid": 2800.0,
            "ask": 2805.0,
            "details": {"created_by": "user1"},
        },
    ],
    schema=table.schema().as_arrow(),
)
pl.from_arrow(df)

In [None]:
table.upsert(df, join_cols=["datetime"])

In [None]:
table.scan().to_polars().sort("datetime")
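
To make the expected result of the upsert easier to reason about, here is a minimal pure-Python simulation of upsert semantics on lists of dicts. It is not how PyIceberg implements `upsert`; the helper `upsert_rows` is hypothetical, and it assumes the join key is unique in both the existing and the incoming rows.

```python
from datetime import datetime


def upsert_rows(existing: list[dict], incoming: list[dict], join_cols: list[str]) -> list[dict]:
    """Replace existing rows whose key matches an incoming row; add the rest."""

    def key(row: dict) -> tuple:
        return tuple(row[col] for col in join_cols)

    merged = {key(row): row for row in existing}
    for row in incoming:
        merged[key(row)] = row  # update on match, insert otherwise
    return list(merged.values())


existing = [
    {"datetime": datetime(2023, 1, 1, 12, 0), "symbol": "AAPL"},
    {"datetime": datetime(2023, 1, 2, 12, 0), "symbol": "GOOGL"},
    {"datetime": datetime(2023, 1, 3, 12, 0), "symbol": "MSFT"},
]
incoming = [
    {"datetime": datetime(2023, 1, 1, 12, 0), "symbol": "AAPL2"},
    {"datetime": datetime(2023, 1, 4, 12, 0), "symbol": "AMZ"},
]

result = sorted(upsert_rows(existing, incoming, ["datetime"]), key=lambda r: r["datetime"])
for row in result:
    print(row)
```

With `datetime` as the join column, the row for 2023-01-01 is updated to `AAPL2` and the row for 2023-01-04 is inserted, giving four rows in total.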

## Check partition overwrite

Here we'll check whether a partial overwrite works. We'll use the same dataframe as before for the upsert, but we'll run an `overwrite` operation restricted to the rows matching `datetime = 2023-01-01T12:00:00`.

In [None]:
from pyiceberg.expressions import EqualTo

df = pa.Table.from_pylist(
    [
        {
            "datetime": datetime(2023, 1, 1, 12, 0, tzinfo=UTC),
            "symbol": "AAPL2",
            "bid": 150.0,
            "ask": 151.0,
            "details": {"created_by": "user1"},
        },
        {
            "datetime": datetime(2023, 1, 4, 12, 0, tzinfo=UTC),
            "symbol": "AMZ",
            "bid": 2800.0,
            "ask": 2805.0,
            "details": {"created_by": "user1"},
        },
    ],
    schema=table.schema().as_arrow(),
)
# Show the rows that will be written (a bare expression mid-cell would not be displayed).
print(pl.from_arrow(df))
table.overwrite(df, overwrite_filter=EqualTo("datetime", "2023-01-01T12:00:00"))

In [None]:
table.scan().to_polars()

This is cool: it respected the filter, and it created a duplicate for `2023-01-04 12:00:00`, because that row was not covered by the filter, so the existing copy was kept and the new one was appended.
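
The behaviour described here can be reproduced with a small pure-Python simulation: an overwrite with a filter first deletes the existing rows that match the filter, then appends every incoming row. The helper `overwrite_rows` is hypothetical and only mimics the observed semantics; it is not PyIceberg's implementation.

```python
from datetime import datetime
from typing import Callable


def overwrite_rows(
    existing: list[dict],
    incoming: list[dict],
    overwrite_filter: Callable[[dict], bool],
) -> list[dict]:
    """Drop existing rows matching the filter, then append all incoming rows."""
    kept = [row for row in existing if not overwrite_filter(row)]
    return kept + list(incoming)


# State of the table after the upsert.
existing = [
    {"datetime": datetime(2023, 1, 1, 12, 0), "symbol": "AAPL2"},
    {"datetime": datetime(2023, 1, 2, 12, 0), "symbol": "GOOGL"},
    {"datetime": datetime(2023, 1, 3, 12, 0), "symbol": "MSFT"},
    {"datetime": datetime(2023, 1, 4, 12, 0), "symbol": "AMZ"},
]
incoming = [
    {"datetime": datetime(2023, 1, 1, 12, 0), "symbol": "AAPL2"},
    {"datetime": datetime(2023, 1, 4, 12, 0), "symbol": "AMZ"},
]

target = datetime(2023, 1, 1, 12, 0)
result = overwrite_rows(existing, incoming, lambda row: row["datetime"] == target)
for row in sorted(result, key=lambda r: r["datetime"]):
    print(row)
```

Only the 2023-01-01 row is replaced; the 2023-01-04 row now appears twice. To avoid the duplicate, either widen the filter to cover every key in the incoming data or write only the rows that match the filter.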

## Overwrite from Polars

Polars has `scan_iceberg` and `write_iceberg` methods; let's see if they work.

In [None]:
pl.scan_iceberg(table, reader_override="pyiceberg").collect()

There are still problems with how the schema is evaluated for required fields.

In [None]:
data.write_iceberg(table, "overwrite")

## Partition evolution

In [None]:
from pyiceberg.transforms import IdentityTransform

with table.update_spec() as update:
    # update.add_field("symbol", IdentityTransform(), "symbol")
    update.rename_field("datetime_day", "datetime")

In [None]:
table.inspect.partitions()

## Check History

In [None]:
pl.from_arrow(table.inspect.history())