# Apache Iceberg Workbook

This notebook allows to work with Iceberg tables using PyIceberg lib. This workbook helps you to 
- [x] Connect with Polaris REST Catalog
- [x] Create Iceberg Tables 
- [x] Query Tables
- [ ] Schema Evolution
- [ ] Partition Evolution


## Imports


In [75]:
import os
from pathlib import Path

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import (
    NamespaceAlreadyExistsError,
    NoSuchTableError,
    TableAlreadyExistsError
)

from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    StringType,
    LongType,
    DecimalType,
    TimestamptzType,
)

from pyiceberg.partitioning import (
    PartitionSpec,
    PartitionField,
)
from pyiceberg.transforms import (
    DayTransform,
    IdentityTransform,
    VoidTransform,
    HourTransform,
)

from pyiceberg.table.sorting import (
    SortOrder,
    SortField,
    SortDirection,
    NullOrder,
)

## PyIceberg Version

In [76]:
import pyiceberg

print(f"PyIceberg version: {pyiceberg.__version__}")

PyIceberg version: 0.8.1


## Retrieve Principal Credentials

As part of the catalog setup script, the Principal(`super_user`) credentials are stored in `$PROJECT_HOME/work/principal.txt`, let us retrieve it for further operations.


In [77]:
principal_cred_file = Path(os.getcwd()).parent.joinpath("work", "principal.txt")
with open(principal_cred_file, "r") as file:
    realm, client_id, client_secret = file.readline().split(",")

## Define Variables

Let us define some variables for us across the notebook


In [78]:
POLARIS_BASE_URI="http://localhost:18181"
# IMPORTANT!!! /api/catalog or get the prefix from your OpenCatalog instance
CATALOG_URI = f"{POLARIS_BASE_URI}/api/catalog"
OAUTH2_SERVER_URI= f"{POLARIS_BASE_URI}/api/catalog/v1/oauth/tokens"
catalog_name = "balloon-game"
# database
namespace = "balloon_pops"

## Working with Catalog

Let us retrieve the catalog `polardb` that we created earlier using the `catalog_setup.yml` script.


In [79]:
catalog = RestCatalog(
    name=catalog_name,
    **{
        "uri": CATALOG_URI,
        "credential": f"{client_id}:{client_secret}",
        "header.content-type": "application/vnd.api+json",
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
        "header.Polaris-Realm": realm,
        "oauth2-server-uri": OAUTH2_SERVER_URI,
        "warehouse": catalog_name,
        "scope": "PRINCIPAL_ROLE:ALL",
    },
)

### Create Database(Namespace)

Create a new database named `balllon_pops`


In [49]:
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    print(f"Namespace '{namespace}' already exists")
except Exception as e:
    print(e)

Namespace 'balloon_pops' already exists


## Create Tables


#### Leaderboard


In [80]:
## Create leaderboard Table
leaderboard_table_id = f"{namespace}.leaderboard"
# since its demo, dropping and creating it for sanity
#catalog.drop_table(leaderboard_table_id)
## Schema
leaderboard_schema = Schema(
    NestedField(
        field_id=1,
        name="player",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=2,
        name="total_score",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=3,
        name="bonus_hits",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=4,
        name="event_ts",
        type=TimestamptzType(),
        required=True,
    ),
)
## Partition Specification
partition_fields = [
    PartitionField(
        source_id=4,
        field_id=1001,
        transform=DayTransform(),
        name="datetime_day",
    ),
    PartitionField(
        source_id=1,
        field_id=1002,
        transform=IdentityTransform(),
        name="by_player",
    ),
]
leaderboard_partition_spec = PartitionSpec(*partition_fields)
## Sort Specification
order_fields = [
    SortField(
        source_id=2,
        direction=SortDirection.DESC,
        null_order=NullOrder.NULLS_LAST,
        transform=VoidTransform(),
    ),
    SortField(
        source_id=3,
        direction=SortDirection.DESC,
        null_order=NullOrder.NULLS_LAST,
        transform=VoidTransform(),
    ),
]
sort_order = SortOrder(*order_fields)

leaderboard_table = catalog.create_table(
    identifier=leaderboard_table_id,
    schema=leaderboard_schema,
    partition_spec=leaderboard_partition_spec,
    sort_order=sort_order,
    properties={
        "format-version": "2",  # Required for merge-on-read
        "write.delete.mode": "merge-on-read",  # required for upserts
        "write.update.mode": "merge-on-read",  # required for upserts
        "write.merge.mode": "merge-on-read",  # required for upserts
    },
)
print(f"Table {leaderboard_table_id},\n{leaderboard_table}")

TableAlreadyExistsError: AlreadyExistsException: Table already exists: balloon_pops.leaderboard

#### Balloon Color Stats

In [None]:
## Create balloon_color_stats table
balloon_color_stats_table_id = f"{namespace}.balloon_color_stats"
# since its demo, dropping and creating it for sanity
#catalog.drop_table(balloon_color_stats_table_id)
## Schema
balloon_color_stats_schema = Schema(
    NestedField(
        field_id=1,
        name="player",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=2,
        name="balloon_color",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=3,
        name="balloon_pops",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=4,
        name="points_by_color",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=5,
        name="bonus_hits",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=6,
        name="event_ts",
        type=TimestamptzType(),
        required=True,
    ),
)
## Partition Specification
partition_fields = [
    PartitionField(
        source_id=6,
        field_id=1001,
        transform=DayTransform(),
        name="datetime_day",
    ),
    PartitionField(
        source_id=2,
        field_id=1002,
        transform=IdentityTransform(),
        name="by_balloon_color",
    ),
    PartitionField(
        source_id=1,
        field_id=1003,
        transform=IdentityTransform(),
        name="by_player",
    ),
]
balloon_color_stats_partition_spec = PartitionSpec(*partition_fields)
## Sort Specification
order_fields = [
    SortField(
        source_id=4,
        direction=SortDirection.DESC,
        null_order=NullOrder.NULLS_LAST,
        transform=VoidTransform(),
    ),
    SortField(
        source_id=3,
        direction=SortDirection.DESC,
        null_order=NullOrder.NULLS_LAST,
        transform=VoidTransform(),
    ),
]
sort_order = SortOrder(*order_fields)
## Create Table
balloon_color_stats_table = catalog.create_table(
    identifier=balloon_color_stats_table_id,
    schema=balloon_color_stats_schema,
    partition_spec=balloon_color_stats_partition_spec,
    sort_order=sort_order,
    properties={
        "format-version": "2",  # Required for merge-on-read
        "write.delete.mode": "merge-on-read",  # required for upserts
        "write.update.mode": "merge-on-read",  # required for upserts
        "write.merge.mode": "merge-on-read",  # required for upserts
    },
)
print(f"Table {balloon_color_stats_table_id},\n{balloon_color_stats_table}")

Table balloon_pops.balloon_color_stats,
balloon_color_stats(
  1: player: required string,
  2: balloon_color: required string,
  3: balloon_pops: required long,
  4: points_by_color: required long,
  5: bonus_hits: required long,
  6: event_ts: required timestamptz
),
partition by: [datetime_day, by_balloon_color, by_player],
sort order: [void(4) DESC NULLS LAST, void(3) DESC NULLS LAST],
snapshot: null


### Timseries

#### Realtime Scores

In [None]:
## Create realtime_scores table
realtime_scores_tbl_id = f"{namespace}.realtime_scores"
# since its demo, dropping and creating it for sanity
try:
    catalog.drop_table(realtime_scores_tbl_id)
    ## Schema
    realtime_scores_schema = Schema(
        NestedField(
            field_id=1,
            name="player",
            type=StringType(),
            required=True,
        ),
        NestedField(
            field_id=2,
            name="total_score",
            type=LongType(),
            required=True,
        ),
        NestedField(
            field_id=3,
            name="window_start",
            type=TimestamptzType(),
            required=True,
        ),
        NestedField(
            field_id=4,
            name="window_end",
            type=TimestamptzType(),
            required=True,
        ),
    )

    realtime_scores_table = catalog.create_table(
        identifier=realtime_scores_tbl_id,
        schema=realtime_scores_schema,
        partition_spec=PartitionSpec(
            PartitionField(
                source_id=3,
                field_id=1001,
                name="by_window_start_hour",
                transform=HourTransform(),
            ),
        ),
        properties={
            "format-version": "2",
        },
    )
    print(f"Table {realtime_scores_tbl_id},\n{realtime_scores_table}")
except NoSuchTableError as err:
    print(err)
except TableAlreadyExistsError as err2:
    print(err2)

NoSuchTableException: Table does not exist: balloon_pops.realtime_scores


#### Balloon Colored Pops

In [None]:
## Create balloon_colored_pops table
balloon_colored_pops_tbl_id = f"{namespace}.balloon_colored_pops"
#catalog.drop_table(balloon_colored_pops_tbl_id)
## Schema
balloon_colored_pops_schema = Schema(
    NestedField(
        field_id=1,
        name="player",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=2,
        name="balloon_color",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=3,
        name="balloon_pops",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=4,
        name="points_by_color",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=5,
        name="bonus_hits",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=6,
        name="window_start",
        type=TimestamptzType(),
        required=True,
    ),
    NestedField(
        field_id=7,
        name="window_end",
        type=TimestamptzType(),
        required=True,
    ),
)

partition_fields = [
    PartitionField(
        source_id=6,  # window_start
        field_id=1001,
        name="by_window_start_hour",
        transform=HourTransform(),
    ),
    PartitionField(
        source_id=1,  # player field
        field_id=1002,
        name="by_player",
        transform=IdentityTransform(),
    ),
]
balloon_colored_pops_table = catalog.create_table(
    identifier=balloon_colored_pops_tbl_id,
    schema=balloon_colored_pops_schema,
    partition_spec=PartitionSpec(*partition_fields),
    sort_order=SortOrder(
        SortField(
            source_id=2,  # balloon_color field
            transform=IdentityTransform(),
            direction=SortDirection.ASC,
            null_order=NullOrder.NULLS_LAST,
        ),
    ),
    properties={
        "format-version": "2",
    },
)
print(f"Table {balloon_colored_pops_tbl_id},\n{balloon_colored_pops_table}")

Table balloon_pops.balloon_colored_pops,
balloon_colored_pops(
  1: player: required string,
  2: balloon_color: required string,
  3: balloon_pops: required long,
  4: points_by_color: required long,
  5: bonus_hits: required long,
  6: window_start: required timestamptz,
  7: window_end: required timestamptz
),
partition by: [by_window_start_hour, by_player],
sort order: [2 ASC NULLS LAST],
snapshot: null


#### Color Performance Trends

In [None]:
## Create color_performance_trends table
color_performance_trends_pops_tbl_id = f"{namespace}.color_performance_trends"
# catalog.drop_table(color_performance_trends_pops_tbl_id)
## Schema
color_performance_trends_pops_schema = Schema(
    NestedField(
        field_id=1,
        name="balloon_color",
        type=StringType(),
        required=True,
    ),
    NestedField(
        field_id=2,
        name="avg_score_per_pop",
        type=DecimalType(2,10),
        required=True,
    ),
    NestedField(
        field_id=3,
        name="total_pops",
        type=LongType(),
        required=True,
    ),
    NestedField(
        field_id=4,
        name="window_start",
        type=TimestamptzType(),
        required=True,
    ),
    NestedField(
        field_id=5,
        name="window_end",
        type=TimestamptzType(),
        required=True,
    ),
)

partition_fields = [
    PartitionField(
        source_id=4,  # window_start
        field_id=1001,
        name="by_window_start_hour",
        transform=HourTransform(),
    ),
    PartitionField(
        source_id=1,  # balloon_color field
        field_id=1002,
        name="by_balloon_color",
        transform=IdentityTransform(),
    ),
]
color_performance_trends_pops_table = catalog.create_table(
    identifier=color_performance_trends_pops_tbl_id,
    schema=color_performance_trends_pops_schema,
    partition_spec=PartitionSpec(*partition_fields),
    sort_order=SortOrder(
        SortField(
            source_id=1,  # balloon_color field
            transform=IdentityTransform(),
            direction=SortDirection.ASC,
            null_order=NullOrder.NULLS_LAST,
        ),
    ),
    properties={
        "format-version": "2",
    },
)
print(f"Table {color_performance_trends_pops_tbl_id}, {color_performance_trends_pops_table}")

Table balloon_pops.color_performance_trends, color_performance_trends(
  1: balloon_color: required string,
  2: avg_score_per_pop: required decimal(2, 10),
  3: total_pops: required long,
  4: window_start: required timestamptz,
  5: window_end: required timestamptz
),
partition by: [by_window_start_hour, by_balloon_color],
sort order: [1 ASC NULLS LAST],
snapshot: null


## Query Tables


#### `leaderboard` Table


In [81]:
try:
    table = catalog.load_table(f"{namespace}.leaderboard")
    print(table)
except Exception as e:
    print(e)

leaderboard(
  1: player: required string,
  2: total_score: required long,
  3: bonus_hits: required long,
  4: event_ts: required timestamptz
),
partition by: [datetime_day, by_player],
sort order: [void(2) DESC NULLS LAST, void(3) DESC NULLS LAST],
snapshot: Operation.OVERWRITE: id=19, parent_id=18, schema_id=0


In [82]:
df = table.scan().to_pandas()
df.head()

ValueError: PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568

#### `balloon_color_stats` table


In [None]:
try:
    tbl_balloon_color_stats = catalog.load_table(f"{namespace}.balloon_color_stats")
    print(tbl_balloon_color_stats)
except Exception as e:
    print(e)

balloon_color_stats(
  1: player: required string,
  2: balloon_color: required string,
  3: points_by_color: required int,
  4: bonus_hits: required int,
  5: event_ts: required timestamptz
),
partition by: [datetime_day, by_balloon_color, by_player],
sort order: [void(3) DESC NULLS LAST, void(4) DESC NULLS LAST],
snapshot: null


In [None]:
_balloon_color_stats_df = tbl_balloon_color_stats.scan().to_pandas()
print(_balloon_color_stats_df.head())

### Time Series

#### `realtime_scores` table


In [None]:
try:
    tbl_realtime_scores = catalog.load_table(f"{namespace}.realtime_scores")
    print(tbl_realtime_scores)
except Exception as e:
    print(e)

realtime_scores(
  1: player: required string,
  2: total_score: required int,
  3: window_start: required timestamptz,
  4: window_end: required timestamptz
),
partition by: [by_window],
sort order: [],
snapshot: null


In [None]:
_realtime_scores_df = tbl_realtime_scores.scan().to_pandas()
print(_realtime_scores_df.head())

#### `balloon_colored_pops` table


In [None]:
try:
    tbl_balloon_colored_pops = catalog.load_table(f"{namespace}.balloon_colored_pops")
    print(tbl_balloon_colored_pops)
except Exception as e:
    print(e)

balloon_colored_pops(
  1: player: required string,
  2: balloon_color: required string,
  3: balloon_pops: required int,
  4: points_by_color: required int,
  5: bonus_hits: required int,
  6: window_start: required timestamptz,
  7: window_end: required timestamptz
),
partition by: [by_window_start_hour, by_player],
sort order: [2 ASC NULLS LAST],
snapshot: null


In [None]:
_balloon_colored_pops_df = tbl_balloon_colored_pops.scan().to_pandas()
print(_balloon_colored_pops_df.head())

#### `color_performance_trends` table


In [None]:
try:
    tbl_color_performance_trends = catalog.load_table(
        f"{namespace}.color_performance_trends"
    )
    print(tbl_color_performance_trends)
except Exception as e:
    print(e)

color_performance_trends(
  1: balloon_color: required string,
  2: avg_score_per_pop: required double,
  3: total_pops: required int,
  4: window_start: required timestamptz,
  5: window_end: required timestamptz
),
partition by: [by_window_start_hour, by_balloon_color],
sort order: [1 ASC NULLS LAST],
snapshot: null


In [None]:
_color_performance_trends_df = tbl_color_performance_trends.scan().to_pandas()
print(_color_performance_trends_df.head())

## Schema Evolution

An Optional example of how to do schema evolution with the leaderboard table. Table definition with two columns `player` and `total_score`, but as part of my analytics I thought to add the `bonus_hits`. Now the sink from Rising will not work as the query returns 3 columns where as the target table has two columns.

**Solution** is to evolve the schema to accomodate :)


In [None]:
from pyiceberg.types import LongType  # int64

with table.update_schema() as update:
    update.add_column(
        "bonus_hits", LongType(), "Total number of bonus hits popped by the player"
    )

Now scanning the table and loading again will result in additional column but with Null values.


In [None]:
df = table.scan().to_pandas()
print(df.head(10))

Lets recreate the sink.


In [None]:
table.inspect.snapshots()

In [None]:
try:
    table2 = catalog.load_table(f"{namespace}.realtime_scores")
    print(table2)
except Exception as e:
    print(e)

In [None]:
df = table2.scan().to_pandas()
print(df.head(10))