<a href="https://colab.research.google.com/github/lamb-russell/spark-pyiceberg-compare/blob/main/duckdb_merge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# About this Notebook
This notebook covers my exploration of the capabilities of PyIceberg and DuckDB.  

My hope is to reduce the cost of processing and loading data by doing some of that work outside Snowflake.  


## Steps I take in this notebook

1. Create an Iceberg catalog (SQLite / PyIceberg)
2. Write data to an Iceberg table (PyArrow)
3. Read data from the Iceberg table (PyIceberg)
4. Update data in the Iceberg table (DuckDB)
5. Compare the updated data to the original
6. Merge data in DuckDB and write it to Iceberg
7. Explore further ...

After completing steps 1 to 6, I explore further to understand more about Iceberg and how it can be used.  

## What's Iceberg
Apache Iceberg is an open table format. It stores table data in open file formats such as Parquet and tracks those files with metadata (JSON table-metadata files plus Avro manifest lists and manifests), which provides history, versioning, multi-user access, and efficient columnar storage. These are similar to the benefits you might get from a data warehouse like Snowflake, except that the data and metadata are tracked as plain files in a file system.  

See the [Specification](https://iceberg.apache.org/spec/#overview) for more info about Iceberg

To explore further, watch some talks from the [iceberg summit](https://iceberg-summit.org/)


## What's DuckDB?
DuckDB is a free, open-source analytical database management system (DBMS) designed for speed, reliability, and ease of use. It can read, query, and write many data formats.

See the [homepage](https://duckdb.org/) for documentation

See [MotherDuck events](https://www.eventbrite.com/e/motherduck-duckdb-meetup-nyc-edition-tickets-949275387237?aff=oddtdtcreator) for local events about DuckDB. MotherDuck is the commercial cloud offering for DuckDB.

## Why use these tools together?
As of the current version (1.0), DuckDB does not support writing to Iceberg. However, DuckDB makes it easy to convert and query pandas, Parquet, and Arrow datasets. PyIceberg, meanwhile, can read Iceberg tables and writes to them via Apache Arrow. Together, the two tools make it possible to query, transform, and analyze data without the expense and overhead of a data warehouse.  

That's what I want to explore in this notebook.

### Example
Specifically, DuckDB cannot write to Iceberg in the current version, but it can query an Iceberg table and join it to a Parquet or pa  

In [1]:
!pip install --quiet duckdb

In [2]:
!pip install --quiet pyiceberg

In [3]:
!pip install sqlalchemy



# Step 1: Create an Iceberg Catalog
Use PyIceberg's `SqlCatalog` (backed by SQLAlchemy) to create a SQLite catalog for Iceberg. The table is created and loaded with the taxi dataset in Step 2.

In [4]:
!mkdir -p "/tmp/warehouse" # make directory for the catalog's SQLite file (-p: no error if it already exists)


In [5]:
from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

# Step 2: Load Data into a Table
Download the January 2023 yellow taxi trip data with DuckDB and load it into an Iceberg table

In [6]:
import duckdb

arrow_df = duckdb.sql("select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')").arrow()


In [7]:
schema = arrow_df.schema
schema

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double

In [8]:
# create the iceberg table

from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

# Define the table name and namespace
namespace = "taxi_namespace"
table_name = "taxi_trips"

# Create the namespace if it doesn't exist
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    print("Namespace already exists.")

# Create or load the Iceberg table
try:
    table = catalog.create_table(f"{namespace}.{table_name}", schema)
except TableAlreadyExistsError:
    table = catalog.load_table(f"{namespace}.{table_name}")
    print("Iceberg table already exists.")

Namespace already exists.
Iceberg table already exists.


In [9]:
table.overwrite(arrow_df) # write the data frame to the Iceberg table, replacing any existing rows

In [10]:
len(table.scan().to_arrow()) # how many rows are in the new table?


3066766

# Use PyIceberg CLI to read
Use the PyIceberg CLI to describe the newly created table

In [11]:
!pyiceberg --help

Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...

Options:
  --catalog TEXT
  --verbose BOOLEAN
  --output [text|json]
  --ugi TEXT
  --uri TEXT
  --credential TEXT
  --help                Show this message and exit.

Commands:
  create      Operation to create a namespace.
  describe    Describe a namespace or a table.
  drop        Operations to drop a namespace or table.
  files       List all the files of the table.
  list        List tables or namespaces.
  list-refs   List all the refs in the provided table.
  location    Return the location of the table.
  properties  Properties on tables/namespaces.
  rename      Rename a table.
  schema      Get the schema of the table.
  spec        Return the partition spec of the table.
  uuid        Return the UUID of the table.
  version     Print pyiceberg version.


In [12]:
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db list

taxi_namespace


In [13]:
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db describe taxi_namespace.taxi_trips

Table format version  2                                                                             
Metadata location     file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00006-0b478923-31…
Table UUID            4f8bb460-d267-4811-adb0-796f518152c5                                          
Last Updated          1722874741203                                                                 
Partition spec        []                                                                            
Sort order            []                                                                            
Current schema        Schema, id=0                                                                  
                      ├── 1: VendorID: optional long                                                
                      ├── 2: tpep_pickup_datetime: optional timestamp                               
                      ├── 3: tpep_dropoff_datetime: optional timestamp                     

# Step 3: Read from Iceberg
Use PyIceberg to read from the Iceberg table

In [14]:
# create a new connection variable to iceberg catalog
from pyiceberg.catalog import load_catalog

read_catalog=load_catalog( # pass uri to the sqlite database from earlier
    "default",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",

    }
)

read_catalog.list_namespaces() # list all namespaces

[('taxi_namespace',)]

In [15]:
read_catalog.list_tables("taxi_namespace") # list all tables in the namespace

[('taxi_namespace', 'taxi_trips')]

In [16]:
# read table via a separate connection to iceberg
read_table = read_catalog.load_table("taxi_namespace.taxi_trips")
read_table

taxi_trips(
  1: VendorID: optional long,
  2: tpep_pickup_datetime: optional timestamp,
  3: tpep_dropoff_datetime: optional timestamp,
  4: passenger_count: optional double,
  5: trip_distance: optional double,
  6: RatecodeID: optional double,
  7: store_and_fwd_flag: optional string,
  8: PULocationID: optional long,
  9: DOLocationID: optional long,
  10: payment_type: optional long,
  11: fare_amount: optional double,
  12: extra: optional double,
  13: mta_tax: optional double,
  14: tip_amount: optional double,
  15: tolls_amount: optional double,
  16: improvement_surcharge: optional double,
  17: total_amount: optional double,
  18: congestion_surcharge: optional double,
  19: airport_fee: optional double
),
partition by: [],
sort order: [],
snapshot: Operation.APPEND: id=5293420678179375332, parent_id=8688263363512208932, schema_id=0

In [17]:
# scan the Iceberg table into a new in-memory DuckDB connection; the rows are exposed as a view named temp_taxi_trips
read_duck=read_table.scan().to_duckdb("temp_taxi_trips")
read_duck.sql("select * from information_schema.tables")

┌───────────────┬──────────────┬─────────────────┬───┬────────────────────┬──────────┬───────────────┬───────────────┐
│ table_catalog │ table_schema │   table_name    │ … │ is_insertable_into │ is_typed │ commit_action │ TABLE_COMMENT │
│    varchar    │   varchar    │     varchar     │   │      varchar       │ varchar  │    varchar    │    varchar    │
├───────────────┼──────────────┼─────────────────┼───┼────────────────────┼──────────┼───────────────┼───────────────┤
│ temp          │ main         │ temp_taxi_trips │ … │ NO                 │ NO       │ NULL          │ NULL          │
├───────────────┴──────────────┴─────────────────┴───┴────────────────────┴──────────┴───────────────┴───────────────┤
│ 1 rows                                                                                        13 columns (7 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

# Step 4: Update table
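`to_duckdb` registers the scanned data as a read-only view (note `is_insertable_into` is `NO` above), so first save it as a "real" base table in DuckDB to be able to update it. Then modify the table in DuckDB and write it back to Iceberg.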

In [18]:
# load updated_trips table from temp_taxi_trips so it can be updated
read_duck.sql("Create or replace table updated_trips as select * from temp_taxi_trips")
read_duck.sql("select * from updated_trips where VendorID=1").df().head(10) # show records before update


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0.0 | 1.9 | 1.0 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0.0 | 0.0 | 1.0 | 20.85 | 0.0 | 1.25 |
| 1 | 1 | 2023-01-01 00:43:37 | 2023-01-01 01:17:18 | 4.0 | 7.3 | 1.0 | N | 79 | 264 | 1 | 33.8 | 3.5 | 0.5 | 7.75 | 0.0 | 1.0 | 46.55 | 2.5 | 0.0 |
| 2 | 1 | 2023-01-01 00:04:33 | 2023-01-01 00:19:22 | 1.0 | 4.5 | 1.0 | N | 113 | 255 | 1 | 20.5 | 3.5 | 0.5 | 4.0 | 0.0 | 1.0 | 29.5 | 2.5 | 0.0 |
| 3 | 1 | 2023-01-01 00:03:36 | 2023-01-01 00:09:36 | 3.0 | 1.2 | 1.0 | N | 237 | 239 | 2 | 8.6 | 3.5 | 0.5 | 0.0 | 0.0 | 1.0 | 13.6 | 2.5 | 0.0 |
| 4 | 1 | 2023-01-01 00:15:23 | 2023-01-01 00:29:41 | 2.0 | 2.5 | 1.0 | N | 143 | 229 | 2 | 15.6 | 3.5 | 0.5 | 0.0 | 0.0 | 1.0 | 20.6 | 2.5 | 0.0 |
| 5 | 1 | 2023-01-01 00:51:45 | 2023-01-01 00:58:18 | 1.0 | 1.4 | 1.0 | N | 137 | 79 | 1 | 9.3 | 3.5 | 0.5 | 2.85 | 0.0 | 1.0 | 17.15 | 2.5 | 0.0 |
| 6 | 1 | 2023-01-01 00:13:30 | 2023-01-01 00:44:00 | 1.0 | 17.8 | 2.0 | N | 132 | 116 | 1 | 70.0 | 1.25 | 0.5 | 15.85 | 6.55 | 1.0 | 95.15 | 0.0 | 1.25 |
| 7 | 1 | 2023-01-01 00:21:49 | 2023-01-01 00:29:15 | 4.0 | 0.8 | 1.0 | N | 163 | 161 | 4 | 8.6 | 3.5 | 0.5 | 0.0 | 0.0 | 1.0 | 13.6 | 2.5 | 0.0 |
| 8 | 1 | 2023-01-01 00:52:06 | 2023-01-01 01:02:18 | 2.0 | 1.7 | 1.0 | N | 161 | 164 | 4 | 11.4 | 3.5 | 0.5 | 0.0 | 0.0 | 1.0 | 16.4 | 2.5 | 0.0 |
| 9 | 1 | 2023-01-01 00:22:24 | 2023-01-01 00:35:11 | 2.0 | 2.3 | 1.0 | N | 43 | 262 | 1 | 13.5 | 3.5 | 0.5 | 3.7 | 0.0 | 1.0 | 22.2 | 2.5 | 0.0 |


In [19]:
# do the update
read_duck.sql("update updated_trips set trip_distance=trip_distance+1 where VendorID=1")  # only change vendor 1

In [20]:
# create arrow table with the updated data
modified_arrow_table=read_duck.sql("select * from updated_trips").arrow()


In [21]:
# write the updated table to iceberg via original connection
table.overwrite(modified_arrow_table)  # table is the same variable we used when creating the iceberg table

In [22]:
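# read out of a separate connection to iceberg (read_table)
# note: read_table was loaded before the overwrite, so it still points at the old snapshot
# (the VendorID 1 row below still shows trip_distance 1.9); call read_table.refresh() first
# to pick up the latest snapshot
updated_df=read_table.scan().to_pandas() # read the table
updated_df.head(10)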


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2023-01-01 00:32:10 | 2023-01-01 00:40:36 | 1.0 | 0.97 | 1.0 | N | 161 | 141 | 2 | 9.3 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 14.3 | 2.5 | 0.0 |
| 1 | 2 | 2023-01-01 00:55:08 | 2023-01-01 01:01:27 | 1.0 | 1.1 | 1.0 | N | 43 | 237 | 1 | 7.9 | 1.0 | 0.5 | 4.0 | 0.0 | 1.0 | 16.9 | 2.5 | 0.0 |
| 2 | 2 | 2023-01-01 00:25:04 | 2023-01-01 00:37:49 | 1.0 | 2.51 | 1.0 | N | 48 | 238 | 1 | 14.9 | 1.0 | 0.5 | 15.0 | 0.0 | 1.0 | 34.9 | 2.5 | 0.0 |
| 3 | 1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0.0 | 1.9 | 1.0 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0.0 | 0.0 | 1.0 | 20.85 | 0.0 | 1.25 |
| 4 | 2 | 2023-01-01 00:10:29 | 2023-01-01 00:21:19 | 1.0 | 1.43 | 1.0 | N | 107 | 79 | 1 | 11.4 | 1.0 | 0.5 | 3.28 | 0.0 | 1.0 | 19.68 | 2.5 | 0.0 |
| 5 | 2 | 2023-01-01 00:50:34 | 2023-01-01 01:02:52 | 1.0 | 1.84 | 1.0 | N | 161 | 137 | 1 | 12.8 | 1.0 | 0.5 | 10.0 | 0.0 | 1.0 | 27.8 | 2.5 | 0.0 |
| 6 | 2 | 2023-01-01 00:09:22 | 2023-01-01 00:19:49 | 1.0 | 1.66 | 1.0 | N | 239 | 143 | 1 | 12.1 | 1.0 | 0.5 | 3.42 | 0.0 | 1.0 | 20.52 | 2.5 | 0.0 |
| 7 | 2 | 2023-01-01 00:27:12 | 2023-01-01 00:49:56 | 1.0 | 11.7 | 1.0 | N | 142 | 200 | 1 | 45.7 | 1.0 | 0.5 | 10.74 | 3.0 | 1.0 | 64.44 | 2.5 | 0.0 |
| 8 | 2 | 2023-01-01 00:21:44 | 2023-01-01 00:36:40 | 1.0 | 2.95 | 1.0 | N | 164 | 236 | 1 | 17.7 | 1.0 | 0.5 | 5.68 | 0.0 | 1.0 | 28.38 | 2.5 | 0.0 |
| 9 | 2 | 2023-01-01 00:39:42 | 2023-01-01 00:50:36 | 1.0 | 3.01 | 1.0 | N | 141 | 107 | 2 | 14.9 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 19.9 | 2.5 | 0.0 |


# Step 5: Compare to original
Compare the updated data to the original to show the update worked.  

In [23]:
# read from separate connection to catalog (to show the file is really updated)
from pyiceberg.catalog import load_catalog

compare_catalog=load_catalog( # pass uri to the sqlite database from earlier
    "default",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",

    }
)

compare_table = compare_catalog.load_table("taxi_namespace.taxi_trips")
compare_duck = compare_table.scan().to_duckdb("temp_updated_taxi_trips")
compare_duck.sql("select * from information_schema.tables")

┌───────────────┬──────────────┬──────────────────────┬───┬──────────┬───────────────┬───────────────┐
│ table_catalog │ table_schema │      table_name      │ … │ is_typed │ commit_action │ TABLE_COMMENT │
│    varchar    │   varchar    │       varchar        │   │ varchar  │    varchar    │    varchar    │
├───────────────┼──────────────┼──────────────────────┼───┼──────────┼───────────────┼───────────────┤
│ temp          │ main         │ temp_updated_taxi_…  │ … │ NO       │ NULL          │ NULL          │
├───────────────┴──────────────┴──────────────────────┴───┴──────────┴───────────────┴───────────────┤
│ 1 rows                                                                        13 columns (6 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────┘

In [24]:
# read the updated taxi trips from iceberg into a duckdb table so we can modify
compare_duck.sql("CREATE OR REPLACE table updated_taxi_trips as select * from temp_updated_taxi_trips")
compare_duck.sql("select * from information_schema.tables").df()


| | table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action | TABLE_COMMENT |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | memory | main | updated_taxi_trips | BASE TABLE | | | | | | YES | NO | | |
| 1 | temp | main | temp_updated_taxi_trips | VIEW | | | | | | NO | NO | | |


In [25]:
# read the original trips into a duckdb table to compare
compare_duck.sql("CREATE OR REPLACE TABLE original_trips as select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')")
compare_duck.sql("select * from information_schema.tables").df()


| | table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action | TABLE_COMMENT |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | memory | main | original_trips | BASE TABLE | | | | | | YES | NO | | |
| 1 | memory | main | updated_taxi_trips | BASE TABLE | | | | | | YES | NO | | |
| 2 | temp | main | temp_updated_taxi_trips | VIEW | | | | | | NO | NO | | |


In [26]:
# write SQL to do the comparison
compare_duck.sql("""
select
  u.trip_distance as updated_distance,
  o.trip_distance as original_distance,
  u.trip_distance-o.trip_distance as difference,
  o.tpep_pickup_datetime,
  o.VendorID
from original_trips o
inner join updated_taxi_trips u
  on o.VendorID=u.VendorID
  and o.tpep_pickup_Datetime=u.tpep_pickup_datetime
  and u.tpep_dropoff_datetime=o.tpep_dropoff_datetime
where o.VendorID in (1,2)"""
).df().head(20) # note that vendor id 1 is different

| | updated_distance | original_distance | difference | tpep_pickup_datetime | VendorID |
|---|---|---|---|---|---|
| 0 | 0.97 | 0.97 | 0.0 | 2023-01-01 00:32:10 | 2 |
| 1 | 2.9 | 1.9 | 1.0 | 2023-01-01 00:03:48 | 1 |
| 2 | 1.43 | 1.43 | 0.0 | 2023-01-01 00:10:29 | 2 |
| 3 | 1.84 | 1.84 | 0.0 | 2023-01-01 00:50:34 | 2 |
| 4 | 1.66 | 1.66 | 0.0 | 2023-01-01 00:09:22 | 2 |
| 5 | 11.7 | 11.7 | 0.0 | 2023-01-01 00:27:12 | 2 |
| 6 | 2.95 | 2.95 | 0.0 | 2023-01-01 00:21:44 | 2 |
| 7 | 3.01 | 3.01 | 0.0 | 2023-01-01 00:39:42 | 2 |
| 8 | 1.8 | 1.8 | 0.0 | 2023-01-01 00:53:01 | 2 |
| 9 | 8.3 | 7.3 | 1.0 | 2023-01-01 00:43:37 | 1 |


In [27]:
updated_df=compare_duck.sql("select * from updated_taxi_trips").df()


# Step 6: Merge / Upsert data
Simulate a merge / upsert of new data (the February 2023 trips) into the table.  

In [28]:
compare_duck.sql("select * from original_trips where tpep_pickup_Datetime='2023-01-02 19:31:04'").df().head(10)

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2023-01-02 19:31:04 | 2023-01-02 19:44:15 | 1.0 | 8.75 | 1.0 | N | 132 | 39 | 1 | 33.8 | 0.0 | 0.5 | 5.0 | 0.0 | 1.0 | 41.55 | 0.0 | 1.25 |
| 1 | 2 | 2023-01-02 19:31:04 | 2023-01-02 19:58:58 | 1.0 | 7.29 | 1.0 | N | 186 | 62 | 1 | 33.8 | 0.0 | 0.5 | 7.56 | 0.0 | 1.0 | 45.36 | 2.5 | 0.0 |
| 2 | 2 | 2023-01-02 19:31:04 | 2023-01-02 19:52:25 | 1.0 | 7.11 | 1.0 | N | 24 | 79 | 1 | 31.0 | 0.0 | 0.5 | 7.0 | 0.0 | 1.0 | 42.0 | 2.5 | 0.0 |
| 3 | 2 | 2023-01-02 19:31:04 | 2023-01-02 19:45:11 | 2.0 | 3.36 | 1.0 | N | 229 | 114 | 1 | 17.7 | 0.0 | 0.5 | 5.42 | 0.0 | 1.0 | 27.12 | 2.5 | 0.0 |
| 4 | 1 | 2023-01-02 19:31:04 | 2023-01-02 19:33:32 | 2.0 | 0.4 | 1.0 | N | 161 | 237 | 1 | 5.1 | 5.0 | 0.5 | 0.0 | 0.0 | 1.0 | 11.6 | 2.5 | 0.0 |
| 5 | 2 | 2023-01-02 19:31:04 | 2023-01-02 19:36:51 | 2.0 | 0.88 | 1.0 | N | 144 | 158 | 2 | 7.9 | 0.0 | 0.5 | 0.0 | 0.0 | 1.0 | 11.9 | 2.5 | 0.0 |


In [29]:
compare_duck.sql("CREATE OR REPLACE TABLE my_merge as select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')")


In [30]:
#compare_duck.sql("CREATE UNIQUE INDEX my_merge_ux on my_merge (VendorID, tpep_pickup_datetime,tpep_dropoff_datetime )")
compare_duck.sql("DESCRIBE my_merge")


┌───────────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│      column_name      │ column_type │  null   │   key   │ default │  extra  │
│        varchar        │   varchar   │ varchar │ varchar │ varchar │ varchar │
├───────────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ VendorID              │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ tpep_pickup_datetime  │ TIMESTAMP   │ YES     │ NULL    │ NULL    │ NULL    │
│ tpep_dropoff_datetime │ TIMESTAMP   │ YES     │ NULL    │ NULL    │ NULL    │
│ passenger_count       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ trip_distance         │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ RatecodeID            │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ store_and_fwd_flag    │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ PULocationID          │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ DOLocationID          │ BIGINT      │ 

In [31]:
# some trips appear twice, once with negated amounts (e.g. refunded trips),
# so VendorID plus pickup/dropoff times alone is not a unique key
compare_duck.sql("select * from my_merge where tpep_pickup_datetime='2023-01-02 19:57:02'").df()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 2023-01-02 19:57:02 | 2023-01-02 20:09:02 | 2.0 | 2.34 | 1.0 | N | 239 | 229 | 4 | -14.2 | 0.0 | -0.5 | 0.0 | 0.0 | -1.0 | -18.2 | -2.5 | 0.0 |
| 1 | 2 | 2023-01-02 19:57:02 | 2023-01-02 20:09:02 | 2.0 | 2.34 | 1.0 | N | 239 | 229 | 4 | 14.2 | 0.0 | 0.5 | 0.0 | 0.0 | 1.0 | 18.2 | 2.5 | 0.0 |
| 2 | 2 | 2023-01-02 19:57:02 | 2023-01-02 20:07:54 | 1.0 | 2.36 | 1.0 | N | 163 | 262 | 1 | 12.8 | 0.0 | 0.5 | 2.52 | 0.0 | 1.0 | 19.32 | 2.5 | 0.0 |


In [32]:
# check that the candidate key is unique: return every row whose key columns appear more than once
compare_duck.sql("""
select m.* from my_merge m inner join
(
  select VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID, DOLocationID, total_amount, store_and_fwd_flag, count(*) as count
  from my_merge group by VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID ,DOLocationID, total_amount, store_and_fwd_flag
  having count > 1) u
on m.VendorID=u.VendorID
and m.tpep_pickup_datetime=u.tpep_pickup_datetime
and m.tpep_dropoff_datetime=u.tpep_dropoff_datetime
and m.PULocationID=u.PULocationID
and m.DOLocationID=u.DOLocationID
and m.total_amount=u.total_amount
and m.store_and_fwd_flag=u.store_and_fwd_flag
""").df()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|


In [33]:
# add unique index
compare_duck.sql("CREATE UNIQUE INDEX my_merge_ux on my_merge (VendorID, tpep_pickup_datetime,tpep_dropoff_datetime, PULocationID, DOLocationID, total_amount, store_and_fwd_flag)")


In [34]:
compare_duck.sql("select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet')").df().head(10)


| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2023-02-01 00:32:53 | 2023-02-01 00:34:34 | 2.0 | 0.3 | 1.0 | N | 142 | 163 | 2 | 4.4 | 3.5 | 0.5 | 0.0 | 0.0 | 1.0 | 9.4 | 2.5 | 0.0 |
| 1 | 2 | 2023-02-01 00:35:16 | 2023-02-01 00:35:30 | 1.0 | 0.0 | 1.0 | N | 71 | 71 | 4 | -3.0 | -1.0 | -0.5 | 0.0 | 0.0 | -1.0 | -5.5 | 0.0 | 0.0 |
| 2 | 2 | 2023-02-01 00:35:16 | 2023-02-01 00:35:30 | 1.0 | 0.0 | 1.0 | N | 71 | 71 | 4 | 3.0 | 1.0 | 0.5 | 0.0 | 0.0 | 1.0 | 5.5 | 0.0 | 0.0 |
| 3 | 1 | 2023-02-01 00:29:33 | 2023-02-01 01:01:38 | 0.0 | 18.8 | 1.0 | N | 132 | 26 | 1 | 70.9 | 2.25 | 0.5 | 0.0 | 0.0 | 1.0 | 74.65 | 0.0 | 1.25 |
| 4 | 2 | 2023-02-01 00:12:28 | 2023-02-01 00:25:46 | 1.0 | 3.22 | 1.0 | N | 161 | 145 | 1 | 17.0 | 1.0 | 0.5 | 3.3 | 0.0 | 1.0 | 25.3 | 2.5 | 0.0 |
| 5 | 1 | 2023-02-01 00:52:40 | 2023-02-01 01:07:18 | 1.0 | 5.1 | 1.0 | N | 148 | 236 | 1 | 21.9 | 3.5 | 0.5 | 5.35 | 0.0 | 1.0 | 32.25 | 2.5 | 0.0 |
| 6 | 1 | 2023-02-01 00:12:39 | 2023-02-01 00:40:36 | 1.0 | 8.9 | 1.0 | N | 137 | 244 | 1 | 41.5 | 3.5 | 0.5 | 3.5 | 0.0 | 1.0 | 50.0 | 2.5 | 0.0 |
| 7 | 1 | 2023-02-01 00:56:53 | 2023-02-01 01:00:37 | 1.0 | 1.2 | 1.0 | N | 263 | 141 | 1 | 7.2 | 3.5 | 0.5 | 2.44 | 0.0 | 1.0 | 14.64 | 2.5 | 0.0 |
| 8 | 2 | 2023-02-01 00:20:40 | 2023-02-01 00:33:56 | 1.0 | 7.49 | 1.0 | N | 48 | 243 | 1 | 30.3 | 1.0 | 0.5 | 8.82 | 0.0 | 1.0 | 44.12 | 2.5 | 0.0 |
| 9 | 2 | 2023-02-01 00:33:51 | 2023-02-01 00:37:34 | 1.0 | 0.51 | 1.0 | N | 114 | 211 | 1 | 5.8 | 1.0 | 0.5 | 1.62 | 0.0 | 1.0 | 12.42 | 2.5 | 0.0 |


In [35]:
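# add February to the table; rows whose key matches an existing row replace it
# (INSERT ... SELECT maps columns by position, so February's Airport_fee column still lands in airport_fee)
compare_duck.sql("INSERT OR REPLACE INTO my_merge select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet')")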


In [36]:
# write the resulting table to iceberg
merged_arrow=compare_duck.sql("select * from my_merge").arrow()
table.overwrite(merged_arrow) # write the updated table to iceberg

# Step 7: Explore Further
1. Scan table to Pandas
2. Try DuckDB Iceberg extension
3. Check out the metadata file structure

Read data from the Iceberg table into a pandas data frame with a filtered scan and do some experimentation

In [37]:
# find only trips that are longer than 10 miles
from pyiceberg.expressions import GreaterThanOrEqual
scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

[task.file.file_path for task in scan.plan_files()] # show the data files the scan will read (files whose statistics may contain matching rows)


['file:///tmp/warehouse/taxi_namespace.db/taxi_trips/data/00000-0-7f1ae8c4-eb92-4e30-aee1-9511df3a69a4.parquet',
 'file:///tmp/warehouse/taxi_namespace.db/taxi_trips/data/00000-1-7f1ae8c4-eb92-4e30-aee1-9511df3a69a4.parquet']

In [38]:

scan_df = scan.to_pandas() # read the data to a pandas data frame
scan_df

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime |
|---|---|---|---|
| 0 | 2 | 2023-01-03 07:29:25 | 2023-01-03 08:20:21 |
| 1 | 2 | 2023-01-03 07:55:00 | 2023-01-03 08:17:00 |
| 2 | 2 | 2023-01-03 07:07:00 | 2023-01-03 07:45:00 |
| 3 | 2 | 2023-01-03 07:45:26 | 2023-01-03 08:18:23 |
| 4 | 2 | 2023-01-03 07:14:00 | 2023-01-03 07:35:00 |
| ... | ... | ... | ... |
| 95 | 1 | 2023-01-03 19:32:12 | 2023-01-03 20:21:06 |
| 96 | 2 | 2023-01-03 19:36:43 | 2023-01-03 20:04:50 |
| 97 | 1 | 2023-01-03 19:56:09 | 2023-01-03 20:33:26 |
| 98 | 1 | 2023-01-03 19:52:55 | 2023-01-03 20:19:17 |


# Scan into DuckDB
Query the scanned rows using DuckDB and join them with a pandas data frame

In [39]:
scan_duck = scan.to_duckdb(table_name="distant_taxi_trips") # scan into duckdb table
scan_duck.sql("select * from distant_taxi_trips")


┌──────────┬──────────────────────┬───────────────────────┐
│ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │
│  int64   │      timestamp       │       timestamp       │
├──────────┼──────────────────────┼───────────────────────┤
│        2 │ 2023-01-03 07:29:25  │ 2023-01-03 08:20:21   │
│        2 │ 2023-01-03 07:55:00  │ 2023-01-03 08:17:00   │
│        2 │ 2023-01-03 07:07:00  │ 2023-01-03 07:45:00   │
│        2 │ 2023-01-03 07:45:26  │ 2023-01-03 08:18:23   │
│        2 │ 2023-01-03 07:14:00  │ 2023-01-03 07:35:00   │
│        2 │ 2023-01-03 07:01:15  │ 2023-01-03 07:38:28   │
│        2 │ 2023-01-03 07:30:27  │ 2023-01-03 08:04:26   │
│        2 │ 2023-01-03 08:44:00  │ 2023-01-03 09:11:00   │
│        1 │ 2023-01-03 08:49:44  │ 2023-01-03 09:21:12   │
│        2 │ 2023-01-03 08:40:01  │ 2023-01-03 09:10:51   │
│        · │          ·           │          ·            │
│        · │          ·           │          ·            │
│        · │          ·           │     

In [40]:
# prepare a pandas data frame
import pandas as pd
import numpy as np

# Sample data
data = {
    'id': [1, 2, 3, 4, 5],
    'data': ['a', 'b', 'c', 'd', 'e'],
    'timestamp': pd.date_range(start='2023-01-01', periods=5, freq='D')
}

# Create DataFrame
df = pd.DataFrame(data)
df

| | id | data | timestamp |
|---|---|---|---|
| 0 | 1 | a | 2023-01-01 |
| 1 | 2 | b | 2023-01-02 |
| 2 | 3 | c | 2023-01-03 |
| 3 | 4 | d | 2023-01-04 |
| 4 | 5 | e | 2023-01-05 |


In [41]:
scan_duck.register("sample",df)

<duckdb.duckdb.DuckDBPyConnection at 0x79ff39a7aff0>

In [42]:
scan_duck.sql("select * from distant_taxi_trips inner join sample on distant_taxi_trips.VendorID = sample.id")

┌──────────┬──────────────────────┬───────────────────────┬───────┬─────────┬─────────────────────┐
│ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │  id   │  data   │      timestamp      │
│  int64   │      timestamp       │       timestamp       │ int64 │ varchar │    timestamp_ns     │
├──────────┼──────────────────────┼───────────────────────┼───────┼─────────┼─────────────────────┤
│        1 │ 2023-01-03 19:20:13  │ 2023-01-03 19:38:11   │     1 │ a       │ 2023-01-01 00:00:00 │
│        2 │ 2023-01-03 19:36:43  │ 2023-01-03 20:04:50   │     2 │ b       │ 2023-01-02 00:00:00 │
│        1 │ 2023-01-03 19:52:55  │ 2023-01-03 20:19:17   │     1 │ a       │ 2023-01-01 00:00:00 │
│        2 │ 2023-01-03 17:53:14  │ 2023-01-03 18:55:26   │     2 │ b       │ 2023-01-02 00:00:00 │
│        1 │ 2023-01-03 19:56:09  │ 2023-01-03 20:33:26   │     1 │ a       │ 2023-01-01 00:00:00 │
│        2 │ 2023-01-03 17:25:03  │ 2023-01-03 18:21:02   │     2 │ b       │ 2023-01-02 00:00:00 │


# Query Iceberg Table
Use DuckDB and its Iceberg extension to query the Iceberg table directly

In [43]:
duckdb.sql("INSTALL iceberg;")

In [44]:
duckdb.sql("LOAD ICEBERG;")

In [45]:
query=f"""
    SELECT *
    FROM iceberg_scan('{warehouse_path}/{namespace}.db/{table_name}')
"""
query

"\n    SELECT *\n    FROM iceberg_scan('/tmp/warehouse/taxi_namespace.db/taxi_trips')\n"

In [46]:
from duckdb import IOException
try:
  # Query the Iceberg table
  result = duckdb.sql(query)
except IOException as e:
  print("DuckDB iceberg extension expects a version-hint.text file, commonly generated by Spark and other frameworks.")
  print("PyIceberg doesn't generate this, but you can pass the latest metadata directly to duckdb")
  print(e)

DuckDB iceberg extension expects a version-hint.text file, commonly generated by Spark and other frameworks.
PyIceberg doesn't generate this, but you can pass the latest metadata directly to duckdb
IO Error: Cannot open file "/tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/version-hint.text": No such file or directory
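Without a `version-hint.text` file, the current metadata file has to be located another way. The catalog database is the authoritative source (it is queried in the next cells). As a rough fallback for a local warehouse, PyIceberg's metadata file names in this notebook start with a zero-padded version number (for example `00006-...` and `00008-bb02de5e-....metadata.json`), so the highest prefix is normally the latest. A standard-library sketch of that heuristic; the naming pattern is an assumption drawn from these file names rather than a guarantee of the Iceberg spec, and a metadata file left behind by a failed commit could fool it.

```python
import re
import tempfile
from pathlib import Path

# Assumed naming pattern: <zero-padded version>-<uuid>.metadata.json
METADATA_NAME = re.compile(r"^(\d+)-.+\.metadata\.json$")


def latest_metadata_file(metadata_dir):
    """Return the metadata file with the highest version prefix, or None if there is none."""
    candidates = []
    for path in Path(metadata_dir).iterdir():
        match = METADATA_NAME.match(path.name)
        if match:
            candidates.append((int(match.group(1)), path))
    return max(candidates)[1] if candidates else None


# Demo on a scratch directory with made-up file names
demo_dir = Path(tempfile.mkdtemp())
for name in ["00000-aaaa.metadata.json", "00008-bbbb.metadata.json",
             "00007-cccc.metadata.json", "snap-123-0-dddd.avro"]:
    (demo_dir / name).touch()

print(latest_metadata_file(demo_dir).name)  # 00008-bbbb.metadata.json
```

Against this notebook's warehouse, the call would be `latest_metadata_file(f"{warehouse_path}/{namespace}.db/{table_name}/metadata")`.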


In [47]:
# Detach the existing "sqlite_db" if it exists
duckdb.execute("DETACH DATABASE IF EXISTS sqlite_db;")

# Attach the SQLite database to DuckDB
duckdb.execute("INSTALL sqlite_scanner;")
duckdb.execute("LOAD sqlite_scanner;")
duckdb.execute("ATTACH DATABASE '/tmp/warehouse/pyiceberg_catalog.db' AS sqlite_db;")



<duckdb.duckdb.DuckDBPyConnection at 0x7a0021e6c970>

In [48]:
duckdb.sql("select * from main.sqlite_master")

┌─────────┬──────────────────────┬──────────────────────┬──────────┬───────────────────────────────────────────────────┐
│  type   │         name         │       tbl_name       │ rootpage │                        sql                        │
│ varchar │       varchar        │       varchar        │  int32   │                      varchar                      │
├─────────┼──────────────────────┼──────────────────────┼──────────┼───────────────────────────────────────────────────┤
│ table   │ iceberg_tables       │ iceberg_tables       │        0 │ CREATE TABLE iceberg_tables(catalog_name VARCHA…  │
│ table   │ iceberg_namespace_…  │ iceberg_namespace_…  │        0 │ CREATE TABLE iceberg_namespace_properties(catal…  │
│ index   │ sqlite_autoindex_i…  │ iceberg_tables       │        0 │ CREATE INDEX sqlite_autoindex_iceberg_tables_1 …  │
│ index   │ sqlite_autoindex_i…  │ iceberg_namespace_…  │        0 │ CREATE INDEX sqlite_autoindex_iceberg_namespace…  │
└─────────┴─────────────────────

In [61]:
duckdb.sql("select * from sqlite_db.main.iceberg_tables").df()

| | catalog_name | table_namespace | table_name | metadata_location | previous_metadata_location |
|---|---|---|---|---|---|
| 0 | default | taxi_namespace | taxi_trips | file:///tmp/warehouse/taxi_namespace.db/taxi_t... | file:///tmp/warehouse/taxi_namespace.db/taxi_t... |


In [49]:
latest_metadata = duckdb.sql("select * from sqlite_db.main.iceberg_tables").df()["metadata_location"].iloc[0]
latest_metadata

'file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json'
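Attaching the catalog with DuckDB works, but Python's built-in `sqlite3` module can read the same `iceberg_tables` row without any extension. A sketch, assuming the column names shown above (`table_namespace`, `table_name`, `metadata_location`); the helper name is made up, and the demo builds a throwaway SQLite file with those five columns and an invented location so that it runs on its own.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def current_metadata_location(catalog_db, namespace, table):
    """Return metadata_location for one table in a PyIceberg SQLite catalog, or None."""
    with closing(sqlite3.connect(catalog_db)) as conn:
        row = conn.execute(
            "select metadata_location from iceberg_tables "
            "where table_namespace = ? and table_name = ?",
            (namespace, table),
        ).fetchone()
    return row[0] if row else None


# Demo: a throwaway SQLite file with the same columns as the catalog table shown above
demo_db = os.path.join(tempfile.mkdtemp(), "demo_catalog.db")
with closing(sqlite3.connect(demo_db)) as conn:
    conn.execute(
        "create table iceberg_tables (catalog_name text, table_namespace text, table_name text, "
        "metadata_location text, previous_metadata_location text)"
    )
    conn.execute(
        "insert into iceberg_tables values (?, ?, ?, ?, ?)",
        ("default", "taxi_namespace", "taxi_trips",
         "file:///tmp/demo/metadata/00001-demo.metadata.json",
         "file:///tmp/demo/metadata/00000-demo.metadata.json"),
    )
    conn.commit()

print(current_metadata_location(demo_db, "taxi_namespace", "taxi_trips"))
```

Against this notebook's catalog the call would be `current_metadata_location("/tmp/warehouse/pyiceberg_catalog.db", "taxi_namespace", "taxi_trips")`; if several catalogs share one database file, also filter on `catalog_name`.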

In [50]:
# Print the value of latest_metadata to check the file path
print(latest_metadata)

# Check if the file exists
!if [ -f "{latest_metadata.replace('file://', '')}" ]; then echo "File exists"; else echo "File does not exist"; fi

# If the file doesn't exist, you'll need to find the correct file path
# and update the latest_metadata variable accordingly.

# If the file does exist, check the file permissions
!ls -l "{latest_metadata.replace('file://', '')}"

# Ensure the code has read permissions for the file.

file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json
File exists
-rw-r--r-- 1 root root 11204 Aug  5 16:19 /tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json
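The same existence and permission checks can be done in plain Python, which avoids relying on IPython's `{...}` interpolation inside shell commands. This is a minimal sketch, assuming the input is either a `file://` URI like `latest_metadata` above or a bare local path. `metadata_file_status` is a hypothetical helper name, not part of PyIceberg or DuckDB.

```python
import os
from pathlib import Path
from urllib.parse import unquote, urlparse


def metadata_file_status(location: str) -> dict:
    """Report whether a file:// URI or plain local path points to a readable file."""
    parsed = urlparse(location)
    # file:///tmp/x parses to scheme "file" and path "/tmp/x"; bare paths have no scheme
    path = Path(unquote(parsed.path)) if parsed.scheme == "file" else Path(location)
    exists = path.is_file()
    return {
        "path": str(path),
        "exists": exists,
        # os.access checks read permission for the current user
        "readable": exists and os.access(path, os.R_OK),
        "size_bytes": path.stat().st_size if exists else None,
    }
```

Calling `metadata_file_status(latest_metadata)` right after In [49] should report the same path, existence, and size as the `ls -l` output above.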


In [51]:
# Strip the file:// scheme so iceberg_scan receives a plain local path
latest_metadata = latest_metadata.replace('file://', '')
print(latest_metadata)

/tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json


In [52]:
# Scan the latest metadata file directly; fetchone() forces the query to run so the count is printed
try:
  print(duckdb.sql(f"select count(*) from iceberg_scan('{latest_metadata}')").fetchone()[0])
except duckdb.IOException as e:
  print("This should be supported.  See issue https://github.com/duckdb/duckdb_iceberg/issues/23")
  print(e)

This should be supported.  See issue https://github.com/duckdb/duckdb_iceberg/issues/23
IO Error: Cannot open file "file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/snap-1151606007071017395-0-7f1ae8c4-eb92-4e30-aee1-9511df3a69a4.avro": No such file or directory
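The error names a `snap-….avro` file with its `file://` prefix intact, even though the metadata path passed to `iceberg_scan` had already been stripped. That suggests the extension opened the manifest-list location recorded inside the metadata verbatim. The sketch below pulls that location out of `metadata.json` using the `current-snapshot-id`, `snapshots`, and `manifest-list` fields defined in the Iceberg table spec; both helper names are hypothetical.

```python
import json
from urllib.parse import unquote, urlparse


def current_manifest_list(metadata_json: str) -> str:
    """Return the manifest-list location of the table's current snapshot."""
    metadata = json.loads(metadata_json)
    current_id = metadata.get("current-snapshot-id")
    for snapshot in metadata.get("snapshots", []):
        if snapshot["snapshot-id"] == current_id:
            return snapshot["manifest-list"]
    # Covers tables with no snapshots (current-snapshot-id missing or -1)
    raise ValueError("metadata has no current snapshot")


def to_local_path(location: str) -> str:
    """Turn a file:// URI into a plain filesystem path; leave other strings unchanged."""
    parsed = urlparse(location)
    return unquote(parsed.path) if parsed.scheme == "file" else location
```

For example, `current_manifest_list(open(latest_metadata).read())` should return the location of the current snapshot's manifest list, and `to_local_path` converts it to a plain path for comparison with the directory listing.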


In [53]:
!ls /tmp/

dap_multiplexer.319d224cda95.root.log.INFO.20240805-152142.85
dap_multiplexer.INFO
debugger_1kw8f1k2xk
initgoogle_syslog_dir.0
language_service.319d224cda95.root.log.INFO.20240805-152442.849
language_service.319d224cda95.root.log.INFO.20240805-152453.924
language_service.319d224cda95.root.log.INFO.20240805-152507.1008
language_service.319d224cda95.root.log.INFO.20240805-152514.1053
language_service.319d224cda95.root.log.INFO.20240805-152518.1085
language_service.319d224cda95.root.log.INFO.20240805-152617.1338
language_service.319d224cda95.root.log.INFO.20240805-152646.1478
language_service.319d224cda95.root.log.INFO.20240805-152653.1527
language_service.319d224cda95.root.log.INFO.20240805-152659.1567
language_service.319d224cda95.root.log.INFO.20240805-152659.1581
language_service.319d224cda95.root.log.INFO.20240805-161614.13516
language_service.319d224cda95.root.log.INFO.20240805-161619.13557
language_service.319d224cda95.root.log.INFO.20240805-161624.13601
language_service.319d224cda

In [54]:
!ls /tmp/warehouse/

pyiceberg_catalog.db  taxi_namespace.db


In [55]:
!cat /tmp/warehouse/pyiceberg_catalog.db

SQLite format 3   @                                                                     .WJ� � u��                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    

In [56]:
!ls /tmp/warehouse/taxi_namespace.db

taxi_trips


In [57]:
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips

data  metadata


In [58]:
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips/metadata

00000-b65d5b0f-fff9-42c0-b4ec-a56ffac95a36.metadata.json
00000-def0b54a-ef2a-4fe0-9839-e9017cea4353.metadata.json
00000-f423d6e5-c8e1-43fa-ba7d-f545d3a75815.metadata.json
00001-7f3e64bf-14ea-43ae-b7d1-52cf98b17985.metadata.json
00002-2f125da5-883f-437c-ae8b-8c9538b63409.metadata.json
00003-aa62b2a5-de41-4799-9ce2-4eb0154641a3.metadata.json
00004-0568cb74-674a-49b4-9d85-31fc14e7a939.metadata.json
00005-03e70199-a657-426f-9502-c745f2e2cfa6.metadata.json
00006-0b478923-31f5-4f35-8f66-229b6368af2f.metadata.json
00007-754dee8a-4a4a-4278-a617-2326756237c3.metadata.json
00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json
1054b4a0-0e0a-4f9c-9dc7-817abd041670-m0.avro
111b8ec5-c786-459a-bca9-d36547b3c49b-m0.avro
2283b772-8bd7-41e8-aaa8-47aa277bcc3c-m0.avro
4b3724a3-ab5c-40b8-a801-3f43a530965f-m0.avro
55c34539-6837-46d6-b435-4dac407daa23-m0.avro
71351af3-33bd-46f1-8a91-f5f77898e5ba-m0.avro
74560a3e-1d13-42a2-a645-e89d10c795c3-m0.avro
7f1ae8c4-eb92-4e30-aee1-9511df3a69a4-m0.avro
ca8d9f6c-e654
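The listing mixes versioned `NNNNN-<uuid>.metadata.json` files with `<uuid>-m0.avro` files, which appear to be manifests (the `7f1ae8c4-eb92-4e30-aee1-9511df3a69a4` UUID also appears in the `snap-…avro` manifest-list name from the error above). If the catalog were unavailable, one fallback heuristic is to pick the metadata file with the highest version prefix. This is a sketch under that assumption: `highest_version_metadata` is hypothetical, and the catalog's `metadata_location` remains the authoritative pointer. Note the three `00000-…` files, which show that a version number alone does not tell you which table history a file belongs to.

```python
import re
from typing import Iterable

# Matches names like 00008-bb02de5e-8f23-....metadata.json and captures the version prefix
METADATA_NAME = re.compile(r"^(\d+)-[0-9a-f-]+\.metadata\.json$")


def highest_version_metadata(names: Iterable[str]) -> str:
    """Return the metadata file name with the largest numeric version prefix."""
    versioned = [
        (int(m.group(1)), name)
        for name in names
        if (m := METADATA_NAME.match(name))
    ]
    if not versioned:
        raise ValueError("no versioned *.metadata.json files found")
    # Compare on version first; ties (e.g. several 00000 files) fall back to name order
    return max(versioned)[1]
```

Passing `os.listdir('/tmp/warehouse/taxi_namespace.db/taxi_trips/metadata')` to it would pick the `00008-…` file here, which matches the catalog.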

In [64]:
!cat "/tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00008-bb02de5e-8f23-4fa0-b655-a03ef20d3299.metadata.json"

{"location":"file:///tmp/warehouse/taxi_namespace.db/taxi_trips","table-uuid":"4f8bb460-d267-4811-adb0-796f518152c5","last-updated-ms":1722874794901,"last-column-id":19,"schemas":[{"type":"struct","fields":[{"id":1,"name":"VendorID","type":"long","required":false},{"id":2,"name":"tpep_pickup_datetime","type":"timestamp","required":false},{"id":3,"name":"tpep_dropoff_datetime","type":"timestamp","required":false},{"id":4,"name":"passenger_count","type":"double","required":false},{"id":5,"name":"trip_distance","type":"double","required":false},{"id":6,"name":"RatecodeID","type":"double","required":false},{"id":7,"name":"store_and_fwd_flag","type":"string","required":false},{"id":8,"name":"PULocationID","type":"long","required":false},{"id":9,"name":"DOLocationID","type":"long","required":false},{"id":10,"name":"payment_type","type":"long","required":false},{"id":11,"name":"fare_amount","type":"double","required":false},{"id":12,"name":"extra","type":"double","required":false},{"id":13,"n
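The metadata file is one long JSON document, so the `cat` output above is hard to read and gets cut off. Parsing it lists each snapshot that the earlier writes and updates produced. This is a sketch, assuming the `snapshots`, `parent-snapshot-id`, `timestamp-ms`, and `summary.operation` fields from the Iceberg table spec; `snapshot_history` is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone


def snapshot_history(metadata_json: str) -> list[dict]:
    """Summarize each snapshot recorded in an Iceberg metadata.json document."""
    metadata = json.loads(metadata_json)
    current_id = metadata.get("current-snapshot-id")
    history = []
    for snap in metadata.get("snapshots", []):
        history.append({
            "snapshot_id": snap["snapshot-id"],
            # The first snapshot of a table has no parent
            "parent_id": snap.get("parent-snapshot-id"),
            # summary.operation is e.g. "append", "overwrite", or "delete"
            "operation": snap.get("summary", {}).get("operation"),
            "committed_at": datetime.fromtimestamp(snap["timestamp-ms"] / 1000, tz=timezone.utc),
            "is_current": snap["snapshot-id"] == current_id,
        })
    return sorted(history, key=lambda s: s["committed_at"])
```

After In [51], `latest_metadata` is a plain path, so `snapshot_history(open(latest_metadata).read())` can be passed straight to `pandas.DataFrame` for a readable history table.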

In [59]:
!pip freeze

absl-py==1.4.0
accelerate==0.32.1
aiohttp==3.9.5
aiosignal==1.3.1
alabaster==0.7.16
albucore==0.0.12
albumentations==1.4.12
altair==4.2.2
annotated-types==0.7.0
anyio==3.7.1
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
array_record==0.5.1
arviz==0.18.0
asn1crypto==1.5.1
astropy==6.1.2
astropy-iers-data==0.2024.7.29.0.32.7
astunparse==1.6.3
async-timeout==4.0.3
atpublic==4.1.0
attrs==23.2.0
audioread==3.0.1
autograd==1.6.2
Babel==2.15.0
backcall==0.2.0
beautifulsoup4==4.12.3
bidict==0.23.1
bigframes==1.11.1
bleach==6.1.0
blinker==1.4
blis==0.7.11
blosc2==2.0.0
bokeh==3.4.3
bqplot==0.12.43
branca==0.7.2
build==1.2.1
CacheControl==0.14.0
cachetools==5.4.0
catalogue==2.0.10
certifi==2024.7.4
cffi==1.16.0
chardet==5.2.0
charset-normalizer==3.3.2
chex==0.1.86
clarabel==0.9.0
click==8.1.7
click-plugins==1.1.1
cligj==0.7.2
cloudpathlib==0.18.1
cloudpickle==2.2.1
cmake==3.30.1
cmdstanpy==1.2.4
colorcet==3.1.0
colorlover==0.3.0
colour==0.1.5
community==1.0.0b1
confection==0.1.5
cons==0.4.6
c