In [1]:
import os
import shutil
import sqlite3

import duckdb
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

In [2]:
# Download yellow-taxi trip data from April 2023 (start of Q2 2023) through
# April 2024, the latest month available when this notebook was written.
months = [
    '2023-04',
    '2023-05',
    '2023-06',
    '2023-07',
    '2023-08',
    '2023-09',
    '2023-10',
    '2023-11',
    '2023-12',
    '2024-01',
    '2024-02',
    '2024-03',
    '2024-04',
]
trip_data_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month}.parquet'

# One Arrow table per month, read straight from the public parquet files by DuckDB.
trips_ls = [
    duckdb.sql(f"SELECT * FROM '{trip_data_url.format(month=month)}'").arrow()
    for month in months
]

# concatenate all tables; concat_tables requires every month to share the same
# schema and raises if any month differs
trips = pa.concat_tables(trips_ls)
print(trips.num_rows)

41994806


In [3]:
# Arrow schema of the concatenated trips table (displayed as the cell's output).
trips.schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

In [4]:
# Taxi zone lookup table: maps LocationID to Borough / Zone / service_zone.
zones_csv_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
zones = duckdb.sql(f"SELECT * FROM '{zones_csv_url}'").arrow()
print(zones.num_rows)

265


In [5]:
# Arrow schema of the zone lookup table (displayed as the cell's output).
zones.schema

LocationID: int64
Borough: string
Zone: string
service_zone: string

In [6]:
# Configuration for the local Iceberg warehouse.
# Absolute path: it is embedded in the file:// warehouse URI below, which needs one.
warehouse_path = "/duck_iceberg_demo/"
name_space = 'demo_db'

# SQLite cannot create the catalog database inside a directory that does not exist.
os.makedirs(warehouse_path, exist_ok=True)

# create an iceberg catalog whose metadata is stored in a SQLite database
catalog = SqlCatalog(
    name_space,
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

# Create the namespace once; re-running against an existing catalog keeps it
# instead of failing, so the notebook survives Restart & Run All.
try:
    catalog.create_namespace(name_space)
except NamespaceAlreadyExistsError:
    pass

In [7]:
def get_iceberg_tables(database_path, table_namespace=None, table_name=None):
    """
    Read Iceberg table records from a SQLite-backed pyiceberg catalog.
    Optionally filter to a single table by namespace and table name.

    Parameters:
        database_path (str): The path to the SQLite catalog database file.
        table_namespace (str, optional): The namespace of the table to search for.
        table_name (str, optional): The name of the table to search for.

    Returns:
        list: One dictionary per matching row of the "iceberg_tables" table,
            keyed by column name (e.g. 'metadata_location').

    Raises:
        ValueError: If only one of table_namespace or table_name is provided.
        sqlite3.OperationalError: If the database has no "iceberg_tables" table.
            Note that sqlite3.connect creates an empty database file when
            database_path does not exist, so a wrong path surfaces this way.
    """
    # Namespace and table name identify a table only together; reject half a filter.
    if bool(table_namespace) != bool(table_name):
        raise ValueError("Both table_namespace and table_name must be provided together.")

    query = 'SELECT * FROM "iceberg_tables"'
    params = ()
    if table_namespace and table_name:
        # Parameterized to avoid quoting problems with arbitrary names.
        query += ' WHERE "table_namespace" = ? AND "table_name" = ?'
        params = (table_namespace, table_name)

    con_meta = sqlite3.connect(database_path)
    try:
        # sqlite3.Row allows access by column name, so each row converts cleanly to a dict.
        con_meta.row_factory = sqlite3.Row
        rows = con_meta.execute(query, params).fetchall()
    finally:
        # Close even when the query fails so the database file is not left open.
        con_meta.close()

    return [dict(row) for row in rows]



In [8]:
def create_metadata_for_tables(tables):
    """
    Copy each table's current metadata file to v1.metadata.json and write a
    version-hint.text containing "1" in the same directory.

    The target file names are fixed, so calling this again after new commits
    overwrites both files with the latest metadata. Presumably this exists so
    path-based readers that look for version-hint.text (e.g. DuckDB's
    iceberg_scan) can find the current snapshot -- confirm.

    Parameters:
        tables (list): A list of dictionaries, each representing an Iceberg table
            with a 'metadata_location' (a local path, optionally prefixed with
            'file://').

    Raises:
        KeyError: If a table dictionary has no 'metadata_location'.
        OSError: If the metadata file cannot be read or the new files cannot be written.
    """
    file_scheme = 'file://'
    for table in tables:
        metadata_location = table['metadata_location']
        # Strip only a leading scheme; str.replace would also delete a
        # 'file://' sequence occurring later in the path.
        if metadata_location.startswith(file_scheme):
            metadata_location = metadata_location[len(file_scheme):]

        metadata_dir = os.path.dirname(metadata_location)
        new_metadata_file = os.path.join(metadata_dir, 'v1.metadata.json')
        version_hint_file = os.path.join(metadata_dir, 'version-hint.text')

        # Normally a no-op (the directory already holds the source file); skipped
        # for a bare filename, where dirname is '' and makedirs('') would raise.
        if metadata_dir:
            os.makedirs(metadata_dir, exist_ok=True)

        # Copy the metadata file to v1.metadata.json
        shutil.copy(metadata_location, new_metadata_file)
        print(f"Copied metadata file to {new_metadata_file}")

        # Create the version-hint.text file with content "1"
        with open(version_hint_file, 'w') as f:
            f.write('1')
        print(f"Created {version_hint_file} with content '1'")

In [13]:
# add tables to iceberg catalog
for table, table_name in [
    (trips, "trips"),
    (zones, "zones"),
]:
    table_identifier = f"{name_space}.{table_name}"

    # Drop any table left by a previous run so re-running this cell recreates it
    # instead of failing in create_table (or stacking duplicate snapshots).
    # drop_table presumably removes only the catalog entry, leaving old files on disk.
    try:
        catalog.drop_table(table_identifier)
    except NoSuchTableError:
        pass

    # create the iceberg table from the Arrow schema
    iceberg_table = catalog.create_table(
        table_identifier,
        schema=table.schema,
    )

    # add data to iceberg table
    iceberg_table.append(table)

    # copy the latest catalog metadata to v1.metadata.json + version-hint.text
    catalog_records = get_iceberg_tables(f"{warehouse_path}/pyiceberg_catalog.db", name_space, table_name)
    create_metadata_for_tables(catalog_records)

    print(f"Created {table_name}, {table.num_rows} rows")

In [12]:
# Set to True to append the same data again, simulating new data coming in.
# Each run with the flag on adds another full copy of every table, so row
# counts in the queries below grow accordingly.
SIMULATE_NEW_DATA = False

if SIMULATE_NEW_DATA:
    for table, table_name in [
        (trips, "trips"),
        (zones, "zones"),
    ]:
        iceberg_table = catalog.load_table(f"{name_space}.{table_name}")
        # add data to iceberg table as a new snapshot
        iceberg_table.append(table)

        # copy the latest catalog metadata to v1.metadata.json + version-hint.text
        catalog_records = get_iceberg_tables(f"{warehouse_path}/pyiceberg_catalog.db", name_space, table_name)
        create_metadata_for_tables(catalog_records)

        print(f"Appended {table_name}, {table.num_rows} rows")

In [10]:
# initiate an in-memory duckdb connection to act as the query engine for iceberg
con = duckdb.connect(database=':memory:', read_only=False)
con.execute('''
INSTALL iceberg;
LOAD iceberg;
''')

# The catalog writes the tables of namespace `name_space` under
# <warehouse>/<name_space>.db/<table>; derive that path instead of hard-coding it.
database_path = os.path.join(warehouse_path, f'{name_space}.db')

# Expose each iceberg table as a view. allow_moved_paths: per the DuckDB iceberg
# docs, tolerates a table whose location differs from the one in its metadata.
# OR REPLACE keeps the cell re-runnable on the same connection.
create_view_sql = f'''
CREATE SCHEMA IF NOT EXISTS taxi;

CREATE OR REPLACE VIEW taxi.trips AS
SELECT * FROM iceberg_scan('{database_path}/trips', allow_moved_paths = true);

CREATE OR REPLACE VIEW taxi.zones AS
SELECT * FROM iceberg_scan('{database_path}/zones', allow_moved_paths = true);
'''

# Trailing semicolon suppresses the connection repr as cell output.
con.execute(create_view_sql);


<duckdb.duckdb.DuckDBPyConnection at 0x149d79fb0>

In [11]:
# Sanity check: row count of taxi.trips as seen through DuckDB's iceberg_scan view.
sql = '''
select 
    count(*)
from taxi.trips
'''
res = con.execute(sql)
res.fetchdf()

Unnamed: 0,count_star()
0,41994806


In [107]:
sql = f'''
select 
    date_trunc('month', tpep_pickup_datetime) as month,
    avg(passenger_count) as avg_passenger_count,
    avg(trip_distance) as avg_trip_distance,
    sum(trip_distance) as total_trip_distance,
    avg(total_amount) as avg_total_amount,
    sum(total_amount) as total_amount,
    count(*) as total_trips
from taxi.trips
-- some data pre and post our target date range is in the dataset, so we filter it out
where tpep_pickup_datetime between '2023-04-01' and '2024-05-01'
group by 1
order by 1
'''

%time res = con.execute(sql)
res.fetchdf()

CPU times: user 6.63 s, sys: 170 ms, total: 6.8 s
Wall time: 3.59 s


Unnamed: 0,month,avg_passenger_count,avg_trip_distance,total_trip_distance,avg_total_amount,total_amount,total_trips
0,2023-04-01,1.382822,4.09619,26937880.0,28.269478,185909300.0,6576326
1,2023-05-01,1.358801,4.345793,30539310.0,28.962935,203532000.0,7027328
2,2023-06-01,1.369012,4.368754,28897200.0,29.068591,192274700.0,6614518
3,2023-07-01,1.401961,4.489437,26102420.0,28.568068,166100100.0,5814186
4,2023-08-01,1.386979,4.782777,27015050.0,28.62803,161702600.0,5648402
5,2023-09-01,1.356404,4.274258,24335410.0,29.781914,169562800.0,5693482
6,2023-10-01,1.359725,3.926687,27661700.0,29.171275,205498200.0,7044538
7,2023-11-01,1.358013,3.632733,24264700.0,28.695792,191672500.0,6679462
8,2023-12-01,1.40816,3.676252,24826000.0,28.541505,192742900.0,6753074
9,2024-01-01,1.339277,3.652175,21654640.0,26.8016,158913300.0,5929246


In [109]:
sql = f'''
select 
    zones.Borough,
    count(*) as total_trips,
    sum(total_amount) as total_amount
from taxi.zones as zones
left join taxi.trips as trips
    on zones.LocationID = trips.DOLocationID
group by 1 
order by 2 desc
'''

%time res = con.execute(sql)
res.fetchdf()

CPU times: user 5.79 s, sys: 63.4 ms, total: 5.86 s
Wall time: 2.99 s


Unnamed: 0,Borough,total_trips,total_amount
0,Manhattan,148707792,3690928000.0
1,Queens,9018616,485342100.0
2,Brooklyn,6494860,325417300.0
3,Unknown,1469164,42960700.0
4,Bronx,1013362,52875460.0
5,,729832,86719680.0
6,EWR,500656,62501830.0
7,Staten Island,44948,4510061.0


In [111]:
sql = f'''
select 
    starting_zone.Borough as pickup_borough,
    ending_zone.Borough as dropoff_borough,
    count(*) as trip_count
from
taxi.trips as trips
left join taxi.zones as starting_zone
    on trips.PULocationID = starting_zone.LocationID
left join taxi.zones as ending_zone
    on trips.DOLocationID = ending_zone.LocationID
group by 1, 2
order by 1 asc, 3 desc
'''

%time res = con.execute(sql)
res.fetchdf().head(20)

CPU times: user 43.2 s, sys: 9.8 s, total: 53 s
Wall time: 26.2 s


Unnamed: 0,pickup_borough,dropoff_borough,trip_count
0,Bronx,Bronx,311200
1,Bronx,Manhattan,270232
2,Bronx,Queens,57432
3,Bronx,Brooklyn,55544
4,Bronx,,4176
5,Bronx,Unknown,1848
6,Bronx,Staten Island,1304
7,Bronx,EWR,280
8,Brooklyn,Brooklyn,1452112
9,Brooklyn,Manhattan,1045936
