![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [201]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

## Load One Month of NYC Taxi/Limousine Trip Data

For this notebook, we will use the New York City Taxi and Limousine Commission Trip Record Data that's available on the AWS Open Data Registry. It contains records of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an Iceberg table called `taxis`.

To be able to rerun the notebook several times, let's drop the table if it exists to start fresh.

In [202]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc

In [203]:
%%sql

DROP TABLE IF EXISTS nyc.taxis

In [4]:
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")


In [5]:
%%sql

DESCRIBE EXTENDED nyc.taxis

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [6]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
2171187


## Schema Evolution

Adding, dropping, renaming, or altering columns is easy and safe in Iceberg. In this example, we'll rename `fare_amount` to `fare` and `trip_distance` to `distance`. We'll also add a float column `fare_per_distance_unit` immediately after `distance`.
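
Why a rename is safe can be sketched in plain Python. The toy model below is an assumption made for illustration, not Iceberg's implementation: a schema is a mapping from field ID to column name, and a rename only changes the name stored under an existing ID. The helper `rename_column` is hypothetical.

```python
# Toy model (an assumption for illustration): a schema maps field ID -> column name.
schema_v0 = {5: "trip_distance", 11: "fare_amount"}


def rename_column(schema, old_name, new_name):
    """Return a new schema in which `old_name` is renamed; field IDs are kept."""
    if old_name not in schema.values():
        raise KeyError(f"Missing field {old_name}")
    return {fid: (new_name if name == old_name else name) for fid, name in schema.items()}


schema_v1 = rename_column(schema_v0, "fare_amount", "fare")
schema_v2 = rename_column(schema_v1, "trip_distance", "distance")

print(schema_v2)  # {5: 'distance', 11: 'fare'}

# Renaming a column whose old name is already gone fails, as on a notebook rerun.
try:
    rename_column(schema_v2, "fare_amount", "fare")
except KeyError as err:
    print("rename failed:", err)
```

If the rename was already applied in an earlier run, repeating it fails because the old name no longer exists. That is consistent with the `AnalysisException` in the next cell, whose schema already lists `fare`.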

In [8]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare

AnalysisException: Missing field fare_amount in table demo.nyc.taxis with schema:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
; line 2 pos 0

## Evaluating the Schema After Renaming a Column

In [9]:
%%sql

DESCRIBE EXTENDED nyc.taxis

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [10]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance

23/05/08 01:33:16 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


In [9]:
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

iceberg_catalog = load_catalog('default')
iceberg_catalog

<pyiceberg.catalog.rest.RestCatalog at 0x7f27b30923d0>

In [10]:
iceberg_catalog.list_namespaces()

[('default',), ('nyc',), ('substrait',)]

In [11]:
iceberg_catalog.list_tables("nyc")

[('nyc', 'taxis')]

In [12]:
tbl = iceberg_catalog.load_table("nyc.taxis")

In [13]:
tbl.location()

's3://warehouse/nyc/taxis'

## Evaluate the Schema After Renaming `trip_distance` to `distance`

In [14]:
sc = tbl.scan()
df = sc.to_arrow().to_pandas()
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-04-01 00:00:18+00:00,2021-04-01 00:21:54+00:00,1.0,8.40,1.0,N,79,116,1,25.50,3.0,0.5,5.85,0.0,0.3,35.15,2.5,0.0
1,1,2021-04-01 00:42:37+00:00,2021-04-01 00:46:23+00:00,1.0,0.90,1.0,N,75,236,2,5.00,3.0,0.5,0.00,0.0,0.3,8.80,2.5,0.0
2,1,2021-04-01 00:57:56+00:00,2021-04-01 01:08:22+00:00,1.0,3.40,1.0,N,236,168,2,11.50,3.0,0.5,0.00,0.0,0.3,15.30,2.5,0.0
3,1,2021-04-01 00:01:58+00:00,2021-04-01 00:54:27+00:00,1.0,0.00,1.0,N,47,61,1,44.20,0.0,0.5,0.00,0.0,0.3,45.00,0.0,0.0
4,2,2021-04-01 00:24:55+00:00,2021-04-01 00:34:33+00:00,1.0,1.96,1.0,N,238,152,1,9.00,0.5,0.5,3.09,0.0,0.3,13.39,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2171182,2,2021-04-30 23:39:00+00:00,2021-04-30 23:56:00+00:00,,4.17,,,158,142,0,16.91,0.0,0.5,4.83,0.0,0.3,25.04,,
2171183,1,2021-04-30 23:20:32+00:00,2021-04-30 23:23:05+00:00,,0.90,,,141,229,0,4.50,0.0,0.5,1.56,0.0,0.3,9.36,,
2171184,2,2021-04-30 23:33:00+00:00,2021-04-30 23:55:00+00:00,,6.20,,,90,75,0,21.86,0.0,0.5,6.08,0.0,0.3,31.24,,
2171185,2,2021-04-30 23:31:38+00:00,2021-04-30 23:45:18+00:00,,3.71,,,75,116,0,16.63,0.0,0.5,3.20,0.0,0.3,20.63,,


In [11]:
%%sql

DESCRIBE EXTENDED nyc.taxis

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,double,
distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [15]:
file_paths = []
for file in sc.plan_files():
    print(file.file.file_path)
    file_paths.append(file.file.file_path)

s3://warehouse/nyc/taxis/data/00004-5-676870f6-e22f-4d85-bee5-ee0d95449cd7-00001.parquet


In [16]:
tbl.schema()

Schema(NestedField(field_id=1, name='VendorID', field_type=LongType(), required=False), NestedField(field_id=2, name='tpep_pickup_datetime', field_type=TimestamptzType(), required=False), NestedField(field_id=3, name='tpep_dropoff_datetime', field_type=TimestamptzType(), required=False), NestedField(field_id=4, name='passenger_count', field_type=DoubleType(), required=False), NestedField(field_id=5, name='distance', field_type=DoubleType(), required=False), NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), required=False), NestedField(field_id=7, name='store_and_fwd_flag', field_type=StringType(), required=False), NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False), NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False), NestedField(field_id=10, name='payment_type', field_type=LongType(), required=False), NestedField(field_id=11, name='fare', field_type=DoubleType(), required=False), NestedField(field_id=12,

In [17]:
df.columns

Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge', 'airport_fee'],
      dtype='object')

In [18]:
schema = tbl.metadata.schema()

In [19]:
tbl.metadata.current_schema_id

2

In [20]:
# `schemas` holds one entry per schema version, so its length shows how many
# times the schema has been updated. We have made 2 changes so far, so there
# should be 3 versions (original, change 1, change 2).
schemas = tbl.metadata.schemas

In [21]:
for item in schemas:
    print(item)

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULo

In [22]:
# get the current schema
current_schema_id = tbl.metadata.current_schema_id
current_schema = tbl.metadata.schemas[current_schema_id]
print(current_schema)

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
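
Indexing `tbl.metadata.schemas` with `current_schema_id` relies on each schema sitting at the list position equal to its ID. A more defensive lookup matches on the ID itself. The sketch below uses a hypothetical `ToySchema` stand-in rather than PyIceberg's `Schema` class, and assumes each schema object carries a `schema_id` attribute.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical stand-in for a schema object that carries its own ID."""
    schema_id: int
    columns: tuple


def find_schema(schemas: List[ToySchema], schema_id: int) -> ToySchema:
    """Return the schema whose ID matches, regardless of list position."""
    for schema in schemas:
        if schema.schema_id == schema_id:
            return schema
    raise ValueError(f"No schema with id {schema_id}")


# The list is deliberately out of order to show that position is not used.
schemas = [
    ToySchema(2, ("distance", "fare")),
    ToySchema(0, ("trip_distance", "fare_amount")),
    ToySchema(1, ("trip_distance", "fare")),
]
current = find_schema(schemas, 2)
print(current.columns)  # ('distance', 'fare')
```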


In [176]:
def evaluate_schema(tbl):
    schemas = tbl.metadata.schemas
    print(f"Number of Schemas So far: {len(schemas)}")
    for item in schemas:
        print(item)
    current_schema_id = tbl.metadata.current_schema_id
    current_schema = tbl.metadata.schemas[current_schema_id]
    print("current_schema")
    print("-" * 80)
    print(current_schema)
    return current_schema

current_schema = evaluate_schema(tbl)

Number of Schemas So far: 4
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_fla

### Evaluate the current Schema information and extract required data

In [23]:
for column in current_schema.columns:
    print(column.field_id, column.field_type, column.name)

1 long VendorID
2 timestamptz tpep_pickup_datetime
3 timestamptz tpep_dropoff_datetime
4 double passenger_count
5 double distance
6 double RatecodeID
7 string store_and_fwd_flag
8 long PULocationID
9 long DOLocationID
10 long payment_type
11 double fare
12 double extra
13 double mta_tax
14 double tip_amount
15 double tolls_amount
16 double improvement_surcharge
17 double total_amount
18 double congestion_surcharge
19 double airport_fee


In [24]:
sc = tbl.scan()
df = sc.to_arrow().to_pandas()
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-04-01 00:00:18+00:00,2021-04-01 00:21:54+00:00,1.0,8.4,1.0,N,79,116,1,25.5,3.0,0.5,5.85,0.0,0.3,35.15,2.5,0.0
1,1,2021-04-01 00:42:37+00:00,2021-04-01 00:46:23+00:00,1.0,0.9,1.0,N,75,236,2,5.0,3.0,0.5,0.0,0.0,0.3,8.8,2.5,0.0
2,1,2021-04-01 00:57:56+00:00,2021-04-01 01:08:22+00:00,1.0,3.4,1.0,N,236,168,2,11.5,3.0,0.5,0.0,0.0,0.3,15.3,2.5,0.0
3,1,2021-04-01 00:01:58+00:00,2021-04-01 00:54:27+00:00,1.0,0.0,1.0,N,47,61,1,44.2,0.0,0.5,0.0,0.0,0.3,45.0,0.0,0.0
4,2,2021-04-01 00:24:55+00:00,2021-04-01 00:34:33+00:00,1.0,1.96,1.0,N,238,152,1,9.0,0.5,0.5,3.09,0.0,0.3,13.39,0.0,0.0


In [25]:
iceberg_catalog.properties

{'uri': 'http://rest:8181',
 's3.endpoint': 'http://minio:9000',
 's3.access-key-id': 'admin',
 's3.secret-access-key': 'password'}

In [137]:
import duckdb

def init_duckdb():
    con = duckdb.connect()
    con.install_extension("substrait")
    con.load_extension("substrait")

    con.install_extension("httpfs")
    con.load_extension("httpfs")

    con.execute(query="SET s3_endpoint='minio:9000';")
    con.execute(query="SET s3_region='us-east-1';")
    con.execute(query="SET s3_access_key_id='admin';")
    con.execute(query="SET s3_secret_access_key='password';")
    con.execute(query="SET s3_use_ssl=false;")
    con.execute(query="SET s3_url_style='path';")
    
    return con

con = init_duckdb()

In [138]:
file_path = "s3://warehouse/nyc/taxis/data/00004-5-676870f6-e22f-4d85-bee5-ee0d95449cd7-00001.parquet"
sql_query=f"SELECT * FROM '{file_path}';"
val = con.execute(query=sql_query)

In [140]:
val.df()

In [29]:
sc = tbl.scan()

In [30]:
type(sc)

pyiceberg.table.DataScan

In [132]:
# This is the projected schema for the current snapshot. The Parquet file
# itself, read directly (as Iceberg reads it), may use different column names.
# Compare `trip_distance` vs `distance` and `fare_amount` vs `fare`.
print(sc.projection())

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}


In [32]:
"""
Evaluating DataScan.projection
------------------------------

def projection(self) -> Schema:
        snapshot_schema = self.table.schema()
        if snapshot := self.snapshot():
            if snapshot_schema_id := snapshot.schema_id:
                snapshot_schema = self.table.schemas()[snapshot_schema_id]

        if "*" in self.selected_fields:
            return snapshot_schema

        return snapshot_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
"""
print("Scan Schema")
scan_schema = sc.table.schema()
print(scan_schema)
snapshot = sc.snapshot()
print(snapshot)
snapshot_schema = sc.table.schema()
if snapshot_schema_id := snapshot.schema_id:
    snapshot_schema = sc.table.schemas()[snapshot_schema_id]  # `self` is undefined outside the class; use the scan
    print(snapshot_schema)
else:
    print("No")
print(snapshot_schema_id)
print(snapshot.schema_id)
print("Snapshot Schema")
print(snapshot_schema)
print("-" * 80)
print(sc.selected_fields)

print("Project Schema")
if "*" in sc.selected_fields:
    projected_schema = snapshot_schema
else:
    projected_schema = snapshot_schema.select(*sc.selected_fields, case_sensitive=sc.case_sensitive)
print(projected_schema)

Scan Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
Operation.APPEND: id=1724927924865781280, schema_id=0
No
0
0
Snapshot Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  6: Rateco
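
The output above prints `No` even though the snapshot reports `schema_id=0`. That is consistent with how Python evaluates `if x := value`: the integer `0` is falsy, so the branch is skipped for the first schema. A minimal sketch of the difference follows; the two helper functions are hypothetical and not part of PyIceberg.

```python
from typing import Optional


def resolve_truthy(schema_id: Optional[int]) -> str:
    # Mirrors the `if snapshot_schema_id := snapshot.schema_id:` pattern.
    if found := schema_id:
        return f"use schema {found}"
    return "fall back to table schema"


def resolve_not_none(schema_id: Optional[int]) -> str:
    # Treats 0 as a valid ID and only falls back when the ID is missing.
    if (found := schema_id) is not None:
        return f"use schema {found}"
    return "fall back to table schema"


for sid in (0, 3, None):
    print(sid, "|", resolve_truthy(sid), "|", resolve_not_none(sid))
```

In this notebook the fallback means the current table schema (with `distance`) is reported as the snapshot schema, rather than schema 0 (with `trip_distance`).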

In [33]:
# pyiceberg.io.project_table
table = tbl
from pyiceberg.io.pyarrow import PyArrowFileIO
scheme, _ = PyArrowFileIO.parse_location(tbl.location())
print(scheme, _)
print(isinstance(table.io, PyArrowFileIO))
fs = table.io.get_fs(scheme)
print(fs)

s3 warehouse/nyc/taxis
True
<pyarrow._s3fs.S3FileSystem object at 0x7f27b310d0f0>


In [122]:
def p1(sc):
    tasks = sc.plan_files()
    table = sc.table
    row_filter = sc.filter
    projected_schema = sc.projection()
    case_sensitive = sc.case_sensitive

    print("params to project_table >>")
    print(tasks, table, row_filter, projected_schema, case_sensitive)
    scheme, _ = PyArrowFileIO.parse_location(table.location())
    if isinstance(table.io, PyArrowFileIO):
        fs = table.io.get_fs(scheme)
    print(fs)
    from pyiceberg.expressions.visitors import bind
    print(row_filter)
    #bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
    from pyiceberg.types import (
        MapType,
        ListType
    )
    projected_field_ids = {
        id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }
    print("projected_field_ids")
    print(projected_field_ids)
    return tasks, table, row_filter, projected_schema, case_sensitive, fs, projected_field_ids

tasks, table, row_filter, projected_schema, case_sensitive, fs, projected_field_ids = p1(sc)

params to project_table >>
<itertools.chain object at 0x7f25939d49a0> <pyiceberg.table.Table object at 0x7f25939d9f70> <bound method TableScan.filter of <pyiceberg.table.DataScan object at 0x7f259bb79100>> table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
} True
<pyarrow._s3fs.S3FileSystem object at 0x7f27b30

In [35]:
task = next(tasks)

In [181]:
def p2(task, projected_field_ids):
    _, path = PyArrowFileIO.parse_location(task.file.file_path)
    import pyarrow.dataset as ds
    ONE_MEGABYTE = 1024 * 1024
    ICEBERG_SCHEMA = b"iceberg.schema"
    arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
    pyarrow_filter = None
    with fs.open_input_file(path) as fin:
        fragment = arrow_format.make_fragment(fin)
        physical_schema = fragment.physical_schema
        schema_raw = None
        if metadata := physical_schema.metadata:
            schema_raw = metadata.get(ICEBERG_SCHEMA)
        from pyiceberg.schema import Schema
        file_schema = Schema.parse_raw(schema_raw)
        print("File Schema")
        print(file_schema)
        from pyiceberg.schema import prune_columns
        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
        print("-"*80)
        print("File Project Schema")
        print(file_project_schema)
        print("type")
        print(type(file_project_schema))
        print("-"*80)
        fragment_scanner = ds.Scanner.from_fragment(
            fragment=fragment,
            schema=physical_schema,
            filter=pyarrow_filter,
            columns=[col.name for col in file_project_schema.columns],
        )
        arrow_table = fragment_scanner.to_table()
        df = arrow_table.to_pandas()
        from pyiceberg.io.pyarrow import to_requested_schema
        tb_evolved = to_requested_schema(projected_schema, file_project_schema, arrow_table)
        df_evolved = tb_evolved.to_pandas()
        from pyiceberg.io.pyarrow import to_requested_schema, visit, visit_with_partner, ArrowAccessor, ArrowProjectionVisitor
        struct_array = visit_with_partner(projected_schema, arrow_table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_project_schema))
        # expanding visit_with_partner code
        # visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema))

        # 1. visit_with_partner Schema, Table call
        accessor = ArrowAccessor(file_schema)
        visitor = ArrowProjectionVisitor(file_schema)
        partner = arrow_table
        struct_partner = accessor.schema_partner(partner)
        print(type(struct_partner))
        # 2. visit_with_partner StructType, struct_partner (which could be pa.Table)
        schema_struct = projected_schema.as_struct()
        # res = visit_with_partner(schema_struct, struct_partner, visitor, accessor)
        import pyarrow as pa
        field_results = []
        for field in schema_struct.fields:
            # accessor.field_partner returns the matching Arrow array from the
            # pa.Table: the ArrowAccessor looks up field.field_id in file_schema
            # to get the column name as stored in the file, then combines the
            # chunks and returns the full column as a single pa.Array
            field_partner = accessor.field_partner(partner, field.field_id, field.name)
            visitor.before_field(field, field_partner)
            # field_result = visit_with_partner(field.field_type, field_partner, visitor, accessor)
            # 3. visit_with_partner PrimitiveType, field_partner (which could be pa.Array)
            field_result = visitor.primitive(field.field_type, field_partner) # returns pa.Array
            field_results.append(visitor.field(field, field_partner, field_result))
        visit_with_partner_struct_type = visitor.struct(schema_struct, struct_partner, field_results)
        visit_with_partner_schema_type = visitor.schema(projected_schema, partner, visit_with_partner_struct_type)
        return file_schema
file_schema = p2(task, projected_field_ids)

File Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
--------------------------------------------------------------------------------
File Project Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passen

There are two schemas:

1. File Schema (the schema stored in the Parquet file)
2. Requested Schema (the evolved schema)

We have an `ArrowProjectionVisitor` which is initialized with `file_schema`.
We have an `ArrowAccessor` which is also initialized with `file_schema`.

Both of these therefore work with the actual schema and the unchanged data loaded from the
Parquet file. For each field in the requested schema, we take its `field_id` and check whether it exists in `file_schema`.
To do that, we use the `ArrowAccessor.field_partner` function to retrieve the required column from the `pa.Table` loaded from the Parquet file.

```python
def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: str) -> Optional[pa.Array]:
    if partner_struct:
        # use the field name from the file schema
        try:
            name = self.file_schema.find_field(field_id).name
        except ValueError:
            return None

        if isinstance(partner_struct, pa.StructArray):
            return partner_struct.field(name)
        elif isinstance(partner_struct, pa.Table):
            return partner_struct.column(name).combine_chunks()

    return None
```

For this to work, `file_schema` must contain the `field_id` we are looking for.

Here we expect `field_id` to be unique. A rename does not change the `field_id` of a field; only its name changes. The visitor and accessor above therefore look the column up under its original name in the source data (the Parquet file loaded as a `pa.Table`) and expose it under the new name.

The same lookup applies when a column is added or dropped: the requested schema is matched against `file_schema` by `field_id`. For a column added after the file was written, the `field_id` is missing from `file_schema`, so `field_partner` returns `None`.
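
The rename, add, and drop cases can be sketched with plain Python containers. This is a simplified illustration, not PyIceberg's `ArrowProjectionVisitor`; the function `project_by_field_id` and the toy data are assumptions made for the example.

```python
from typing import Dict, List, Optional

# Schemas as ordered lists of (field_id, name); data as name -> column values.
file_schema = [(5, "trip_distance"), (11, "fare_amount"), (19, "airport_fee")]
file_data = {
    "trip_distance": [8.4, 0.9],
    "fare_amount": [25.5, 5.0],
    "airport_fee": [0.0, 0.0],
}

# Requested schema: IDs 5 and 11 renamed, ID 20 added, ID 19 dropped.
requested_schema = [(5, "distance"), (20, "fare_per_distance_unit"), (11, "fare")]


def project_by_field_id(file_schema, file_data, requested_schema):
    """Build the requested columns by matching on field ID, never on name."""
    id_to_file_name = {fid: name for fid, name in file_schema}
    num_rows = len(next(iter(file_data.values()), []))
    projected: Dict[str, List[Optional[float]]] = {}
    for fid, requested_name in requested_schema:
        file_name = id_to_file_name.get(fid)
        if file_name is None:
            # Column added after the file was written: fill with nulls.
            projected[requested_name] = [None] * num_rows
        else:
            # Same ID, possibly a different name: reuse the stored values.
            projected[requested_name] = file_data[file_name]
    return projected


result = project_by_field_id(file_schema, file_data, requested_schema)
print(result)
```

The dropped column never appears in the result because nothing in the requested schema asks for its ID.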

For our case, we need to write a visitor that updates the `ReadRel` by adding a projection (a `MaskExpression` in Substrait) that meets these requirements.

The plan contains a read operation whose schema is the evolved schema. We need to replace the `ReadRel` `base_schema` with the `file_schema`, and then use `projection` and emit to map the result back to the evolved schema.
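
One way to picture that rewrite is as lists derived from the two schemas: the positions of the needed columns in the file schema (the projection) and the names they should be emitted under. The sketch below works on plain tuples and does not use any Substrait API; `build_projection` is a hypothetical helper, and how added columns are represented in the plan is left open.

```python
def build_projection(file_schema, requested_schema):
    """Return (indices, emit_names, missing) for reading a file under a requested schema.

    indices    -- positions in the file schema to read, in requested order
    emit_names -- requested names for those positions
    missing    -- requested names whose field ID is absent from the file
    """
    position_by_id = {fid: pos for pos, (fid, _) in enumerate(file_schema)}
    indices, emit_names, missing = [], [], []
    for fid, name in requested_schema:
        if fid in position_by_id:
            indices.append(position_by_id[fid])
            emit_names.append(name)
        else:
            missing.append(name)
    return indices, emit_names, missing


file_schema = [(1, "VendorID"), (5, "trip_distance"), (11, "fare_amount")]
requested_schema = [(1, "VendorID"), (11, "fare"), (5, "distance"), (20, "fare_per_distance_unit")]

indices, emit_names, missing = build_projection(file_schema, requested_schema)
print(indices)     # [0, 2, 1]
print(emit_names)  # ['VendorID', 'fare', 'distance']
print(missing)     # ['fare_per_distance_unit']
```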

In [184]:
print(file_schema, projected_schema)

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
} table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: Ratec

### Evaluate a Column `Add` Scenario

In [189]:
tbl_add = iceberg_catalog.load_table("nyc.taxis")
sc_add = tbl_add.scan()

In [108]:
%%sql

ALTER TABLE nyc.taxis
ADD COLUMN fare_per_distance_unit float AFTER distance

23/05/09 08:47:50 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


Let's update the new `fare_per_distance_unit` to equal `fare` divided by `distance`.

In [109]:
%%sql

UPDATE nyc.taxis
SET fare_per_distance_unit = fare/distance
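
The new column is a 32-bit `float`, while `fare` and `distance` are doubles, so the stored quotient carries single-precision rounding. A small standard-library sketch, assuming the quotient is rounded to single precision on write and widened back to double for display, shows the effect on the first two rows of the data (`25.5 / 8.4` and `5.0 / 0.9`):

```python
import struct


def to_float32(value: float) -> float:
    """Round a Python float (double) to the nearest 32-bit float and widen it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


for fare, distance in [(25.5, 8.4), (5.0, 0.9)]:
    exact = fare / distance
    stored = to_float32(exact)
    print(f"double: {exact!r}  float32: {stored!r}")
```

The `float32` values should agree with the `fare_per_distance_unit` values that the final `SELECT` in this notebook returns for those rows.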


In [190]:
tasks, table, row_filter, projected_schema, case_sensitive, fs, projected_field_ids = p1(sc_add)

params to project_table >>
<itertools.chain object at 0x7f1fbfb99820> <pyiceberg.table.Table object at 0x7f1fbfb806a0> <bound method TableScan.filter of <pyiceberg.table.DataScan object at 0x7f260feba880>> table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
} True
<pyarrow._s3fs.S3FileSystem object at 0x7f260fe

In [124]:
task = next(tasks)

In [185]:
file_schema = p2(task, projected_field_ids)

File Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
--------------------------------------------------------------------------------
File Project Schema
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passen

In [188]:
# %%sql

# SELECT * FROM nyc.taxis

In [196]:
iceberg_catalog = load_catalog('default')
tbl_add = iceberg_catalog.load_table("nyc.taxis")
sc_add = tbl_add.scan(snapshot_id=2)

In [197]:
projected_schema = sc_add.projection()
print(projected_schema)

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}


In [198]:
print(file_schema)

table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: distance: optional double
  20: fare_per_distance_unit: optional float
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}


In [169]:
for file in drop_sc.plan_files():
    print(file.file.file_path)

s3://warehouse/nyc/taxis/data/00154-1-897d4c48-da43-4ac6-b5c4-566f3aac107a-00001.parquet


In [170]:
file_path = "s3://warehouse/nyc/taxis/data/00154-1-897d4c48-da43-4ac6-b5c4-566f3aac107a-00001.parquet"
# Read the Parquet data file directly, without going through the Iceberg table metadata
sql_query = f"SELECT * FROM '{file_path}';"
val_drop = con.execute(query=sql_query)

In [171]:
df_drop = val_drop.arrow().to_pandas()

In [178]:
df_drop.shape, drop_sc.to_pandas().shape

((2171187, 20), (2171187, 20))

In [173]:
drop_snapshot = sc.snapshot()

In [179]:
drop_snapshot.schema_id

3

In [177]:
current_schema = evaluate_schema(drop_tbl)

Number of Schemas So far: 4
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_flag: optional string
  8: PULocationID: optional long
  9: DOLocationID: optional long
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
}
table {
  1: VendorID: optional long
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: RatecodeID: optional double
  7: store_and_fwd_fla

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance AFTER fare;

In [None]:
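# Assumption: the dangling assignment was meant to reload the table from the catalog
drop_table = iceberg_catalog.load_table("nyc.taxis")
current_schema = evaluate_schema(drop_table)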

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;

In [199]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,fare,distance,fare_per_distance_unit
1,2021-04-01 00:00:18,2021-04-01 00:21:54,25.5,8.4,3.0357143878936768
1,2021-04-01 00:42:37,2021-04-01 00:46:23,5.0,0.9,5.55555534362793
1,2021-04-01 00:57:56,2021-04-01 01:08:22,11.5,3.4,3.382352828979492
1,2021-04-01 00:01:58,2021-04-01 00:54:27,44.2,0.0,
2,2021-04-01 00:24:55,2021-04-01 00:34:33,9.0,1.96,4.591836929321289
2,2021-04-01 00:19:16,2021-04-01 00:21:46,4.5,0.77,5.844155788421631
2,2021-04-01 00:25:11,2021-04-01 00:31:53,11.5,3.65,3.1506848335266118
1,2021-04-01 00:27:53,2021-04-01 00:47:03,26.5,8.9,2.9775280952453613
2,2021-04-01 00:24:24,2021-04-01 00:37:50,12.0,2.98,4.026845455169678
1,2021-04-01 00:19:18,2021-04-01 00:41:25,28.0,8.9,3.146067380905152


In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'

## Expressive SQL for Row Level Changes
With Iceberg tables, `DELETE` queries can be used to perform row-level deletes. This is as simple as providing the table name and a `WHERE` predicate. If the filter matches an entire partition of the table, Iceberg will intelligently perform a metadata-only operation where it simply deletes the metadata for that partition.

Let's perform a row-level delete for all rows that have a `fare_per_distance_unit` greater than 4 or a `distance` greater than 2. This should leave us with relatively short trips (a `distance` of at most 2) that have a relatively low fare per distance traveled (a `fare_per_distance_unit` of at most 4).

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0

Some rows have a `null` for `fare_per_distance_unit` because their distance is `0`. A comparison against `null` never evaluates to true, so the previous `DELETE` left those rows in place. Let's remove them as well.

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit is null
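
The reason the first `DELETE` skips rows with a `null` ratio is SQL's three-valued logic. The sketch below reproduces both deletes on a few made-up rows. It assumes an in-memory SQLite database as a stand-in for the Spark/Iceberg table, so it only illustrates the predicate semantics, not how Iceberg rewrites or deletes data files.

```python
import sqlite3

# Assumption: SQLite stands in for the Iceberg table; the rows are invented.
con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE taxis (fare REAL, distance REAL, fare_per_distance_unit REAL)"
)
con.executemany(
    "INSERT INTO taxis VALUES (?, ?, ?)",
    [
        (25.5, 8.4, 25.5 / 8.4),  # long trip: matches distance > 2.0
        (5.0, 0.9, 5.0 / 0.9),    # short but pricey: matches ratio > 4.0
        (3.0, 1.0, 3.0),          # short and cheap: matches neither
        (44.2, 0.0, None),        # zero distance: ratio is NULL
    ],
)

select_all = (
    "SELECT fare, distance, fare_per_distance_unit FROM taxis ORDER BY fare"
)

# For the NULL row, `NULL > 4.0` is NULL and `0.0 > 2.0` is false, so the
# whole predicate is NULL rather than true, and the row is not deleted.
con.execute(
    "DELETE FROM taxis WHERE fare_per_distance_unit > 4.0 OR distance > 2.0"
)
after_first = con.execute(select_all).fetchall()

# The explicit IS NULL test is what removes the remaining zero-distance row.
con.execute("DELETE FROM taxis WHERE fare_per_distance_unit IS NULL")
after_second = con.execute(select_all).fetchall()

print(after_first)
print(after_second)
```

Only the short, cheap trip survives both statements, which matches the intent described above.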

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

## Partitioning

A table’s partitioning can be updated in place and applied only to newly written data. Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as *Hidden Partitioning*.

In [None]:
%%sql

ALTER TABLE nyc.taxis
ADD PARTITION FIELD VendorID
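
To make the split query plan more concrete, here is a toy model of file planning after a partition spec change. It is a simplified sketch, not Iceberg's planner: the `DataFile` class, the `plan_files` function, and the file names are all hypothetical, and the old files are modeled as having no partition value at all.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int              # partition spec the file was written under
    vendor_id: Optional[int]  # partition value; None under the old, unpartitioned spec


def plan_files(files: List[DataFile], vendor_id: int) -> List[DataFile]:
    """Return the files a query with `WHERE VendorID = vendor_id` has to read."""
    planned = []
    for f in files:
        if f.vendor_id is None:
            # Old spec: there is no partition value to prune on, so keep the file.
            planned.append(f)
        elif f.vendor_id == vendor_id:
            # New spec: keep only the partition that matches the predicate.
            planned.append(f)
    return planned


files = [
    DataFile("old-00001.parquet", spec_id=0, vendor_id=None),
    DataFile("new-vendor1.parquet", spec_id=1, vendor_id=1),
    DataFile("new-vendor2.parquet", spec_id=1, vendor_id=2),
]

selected = [f.path for f in plan_files(files, vendor_id=2)]
print(selected)
```

The query itself only mentions `VendorID`. Which spec each file was written under is tracked in metadata, which is why readers do not need to know about the split.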

## Metadata Tables

Iceberg tables contain very rich metadata that can be easily queried. For example, you can retrieve the manifest list for any snapshot simply by querying the table's `snapshots` table.

In [None]:
%%sql

SELECT snapshot_id, manifest_list
FROM nyc.taxis.snapshots

The `files` table contains loads of information on data files, including column level statistics such as null counts, lower bounds, and upper bounds.

In [None]:
%%sql

SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
FROM nyc.taxis.files
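
Lower and upper bounds are what allow a reader to skip a data file without opening it. The sketch below shows that idea for a range predicate on a single column. It assumes the bounds have already been decoded into plain floats and uses hypothetical file names; in the real `files` table the bounds are maps keyed by field ID rather than a simple pair per file.

```python
def may_overlap(lower: float, upper: float, lo: float, hi: float) -> bool:
    """True if a file whose values lie in [lower, upper] could hold a value in [lo, hi]."""
    return lower <= hi and upper >= lo


# Hypothetical per-file bounds for the `distance` column.
file_stats = {
    "a.parquet": {"lower": 0.0, "upper": 1.8},
    "b.parquet": {"lower": 0.5, "upper": 9.3},
    "c.parquet": {"lower": 6.0, "upper": 12.0},
}

# Predicate: distance BETWEEN 2.0 AND 5.0
to_scan = [
    path
    for path, stats in file_stats.items()
    if may_overlap(stats["lower"], stats["upper"], 2.0, 5.0)
]
print(to_scan)
```

Bounds can only prove that a file has no matches. A file that passes the check still has to be read and filtered row by row.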

## Time Travel

The `history` table lists all snapshots and which parent snapshot they derive from. The `is_current_ancestor` flag lets you know if a snapshot is part of the linear history of the current snapshot of the table.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history
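
One way to picture `is_current_ancestor` is as a walk along parent pointers, starting at the current snapshot. The sketch below is an illustration with hypothetical snapshot IDs and a made-up `current_ancestors` helper, not Iceberg's implementation.

```python
from typing import Dict, Optional, Set


def current_ancestors(parents: Dict[int, Optional[int]], current_id: int) -> Set[int]:
    """Collect the current snapshot and every snapshot reachable via parent pointers."""
    ancestors = set()
    node: Optional[int] = current_id
    while node is not None:
        ancestors.add(node)
        node = parents.get(node)
    return ancestors


# snapshot_id -> parent_id (hypothetical IDs): a linear chain 100 <- 200 <- 300.
parents: Dict[int, Optional[int]] = {100: None, 200: 100, 300: 200}

# Suppose the table was rolled back to 200 and a new snapshot 400 was then
# committed on top of it. Snapshot 300 still exists but is off the main line.
parents[400] = 200

lineage = current_ancestors(parents, current_id=400)
flags = {snapshot_id: snapshot_id in lineage for snapshot_id in parents}
print(flags)
```

Here snapshot 300 is the only one flagged `False`, because no chain of parents leads from the current snapshot back to it.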

You can time-travel by altering the `current-snapshot-id` property of the table to reference any snapshot in the table's history. Let's revert the table to its original state by traveling to the very first snapshot ID.

In [None]:
%%sql --var df

SELECT *
FROM nyc.taxis.history

In [None]:
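# `df` holds the history query result (captured with `--var df`);
# its first row is assumed to be the oldest snapshot
original_snapshot = df.head().snapshot_id
# Make that snapshot the table's current snapshot again
spark.sql(f"CALL system.rollback_to_snapshot('nyc.taxis', {original_snapshot})")
original_snapshot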

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

Another look at the history table shows that the original state of the table has been added as a new entry with the original snapshot ID.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis