Install the DuckDB package for Python, and boto3 for S3 operations.

In [None]:
!pip install duckdb boto3

Import the packages and load the configuration.

The _creds_ configuration contains your S3 secrets. You must provide them yourself (in ~/.ssh/s3.ini in this example):
```
[default]
key=your S3 keyid
secret=your S3 secret
```

In [None]:
import duckdb
import boto3
import os
import configparser

# S3 credentials live in the user's home directory.  expanduser() resolves '~'
# even when the HOME environment variable is unset, where
# os.getenv('HOME') would return None and the concatenation would raise
# TypeError.
# NOTE: ConfigParser.read() silently skips files it cannot open; a missing
# file only surfaces later as a NoSectionError when a value is looked up.
creds = configparser.ConfigParser()
creds.read(os.path.expanduser('~/.ssh/s3.ini'))

# Notebook settings are read from config.ini, resolved relative to the current
# working directory.  RawConfigParser returns values without interpolation.
config = configparser.RawConfigParser()
config.read('config.ini')

At the moment DuckDB does not have a built-in catalog concept. To read Iceberg tables from S3, it is necessary to manually retrieve the Iceberg metadata. We define a helper function _get_metadata_ to identify the most recent metadata of an Iceberg table stored on S3.

In [None]:
def get_metadata(bucket, prefix, s3endpoint, s3key, s3secret):
    """Return the key of the most recently modified Iceberg metadata file.

    Lists every object under ``<prefix>/metadata/`` in ``bucket`` and selects
    the ``.json`` object with the newest ``LastModified`` timestamp.

    Args:
        bucket: Name of the S3 bucket that holds the table.
        prefix: Key prefix of the table root; leading and trailing slashes
            are ignored.
        s3endpoint: Host of the S3 endpoint without a scheme (``https://`` is
            prepended).
        s3key: S3 access key id.
        s3secret: S3 secret access key.

    Returns:
        The object key (relative to the bucket) of the newest ``.json`` file,
        or ``None`` when no such object exists under the prefix.
    """
    s3 = boto3.client(
        's3',
        endpoint_url='https://' + s3endpoint,
        aws_access_key_id=s3key,
        aws_secret_access_key=s3secret,
    )
    try:
        # A single list_objects_v2 call returns one page only (at most 1000
        # keys), so paginate to be sure the newest metadata file is seen.
        # The trailing slash restricts the listing to the metadata directory
        # itself and excludes sibling prefixes such as 'metadata_old/'.
        pages = s3.get_paginator('list_objects_v2').paginate(
            Bucket=bucket,
            Prefix=prefix.strip('/') + '/metadata/',
        )
        latest_key = None
        latest_time = None
        for page in pages:
            for obj in page.get('Contents', []):
                # Only the JSON metadata files are candidates; skip anything
                # else stored alongside them.
                if not obj['Key'].endswith('.json'):
                    continue
                if latest_time is None or obj['LastModified'] > latest_time:
                    latest_time = obj['LastModified']
                    latest_key = obj['Key']
        return latest_key
    finally:
        # Release the client's connections even if the listing fails.
        s3.close()

Find the metadata paths of all the tables needed for our query, and create a duckdb connection configured for iceberg on S3.
DuckDB does not have a catalog concept like Trino's, but it should in theory(*) be possible to connect to multiple S3 endpoints using [secret scopes](https://duckdb.org/docs/configuration/secrets_manager.html#creating-multiple-secrets-for-the-same-service-type). The downside is that the credentials for each individual source must be managed by the user, which is not easily portable.

(*) As of DuckDB v1.1.3, _SCOPE_ is not recognized by _iceberg_scan_ on S3.

In [None]:
# Key prefixes (inside the bucket) of the Iceberg tables used by the query,
# indexed by a short table name.
s3_prefixes = dict(
    istdaten='com490/data/sbb/parquet/istdaten/',
    shapes='com490/data/geo/parquet/',
    stops='com490/data/sbb/parquet/timetable/stops',
)

# Bucket and endpoint come from config.ini, the key pair from the credentials file.
bucket = config.get('default', 'bucket')
s3endpoint = config.get('default', 'endpoint')
s3key = creds.get('default', 'key')
s3secret = creds.get('default', 'secret')

# Resolve and report the latest metadata file of every table.
metadata = {}
for table_name, table_prefix in s3_prefixes.items():
    metadata_key = get_metadata(bucket, table_prefix, s3endpoint, s3key, s3secret)
    metadata[table_name] = metadata_key
    print(f'{table_name}: s3://{bucket}/{metadata_key}')

# Open a DuckDB connection, then install and load the extensions the query needs.
conn = duckdb.connect()
for extension in ('httpfs', 'iceberg', 'spatial'):
    conn.execute(f"INSTALL '{extension}';")
for extension in ('spatial', 'httpfs', 'iceberg'):
    conn.execute(f"LOAD '{extension}';")

# Register the S3 credentials and endpoint with DuckDB's secrets manager.
create_secret_sql = f"""
CREATE SECRET secret3 (
    TYPE S3,
    KEY_ID '{s3key}',
    SECRET '{s3secret}',
    ENDPOINT '{s3endpoint}',
    URL_STYLE 'path',
    USE_SSL true,
    REGION 'ZH'
)
"""
conn.execute(create_secret_sql)

In [None]:
# Run which_secret on the stops metadata path and print what it returns for
# the 's3' secret type.
stops_uri = f's3://{bucket}/{metadata["stops"]}'
ans = conn.execute(f"""FROM which_secret('{stops_uri}', 's3')""")
print(ans.fetchall())

In [None]:
# Sanity check: count the rows of the stops table read through iceberg_scan.
row_count_sql = f"""SELECT COUNT(*) FROM iceberg_scan('s3://{bucket}/{metadata["stops"]}')"""
ans = conn.execute(row_count_sql)
print(ans.fetchone())

Prepare the SQL query:
- _stop_: names and geolocations of SBB stops
- _shape_: geospatial shapes of the Swiss city boundaries (administrative zones)
- _geo_tagged_stop_: table derived from _stop_ and _shape_ placing stops in their respective cities
- _geo_tagged_istdaten_: actual arrival and departure delays, with information about day of week, hour and city containing the stop (from geo_tagged_stop)

In [None]:
# SQL text of the delay aggregation, built from four CTEs:
#   stop                - stop id (first 7 characters of stop_id cast to an
#                         integer) and coordinates, for the 2024-09-09 partition
#   shape               - city-level geometries decoded from WKB
#   geo_tagged_stop     - stops joined to the city shape that contains them
#   geo_tagged_istdaten - per-row arrival/departure delay in seconds, tagged
#                         with day of week, hour and city name
# The final SELECT keeps rows with day_week in 1..5 (presumably Monday-Friday,
# DuckDB's dayofweek numbering Sunday as 0 - confirm) and aggregates per
# city and hour.
# The istdaten scan carries an explicit alias (ist) instead of relying on the
# implicit alias DuckDB assigns to the table function, which is an
# implementation detail and may differ between versions.
aggregatePercentiles = f"""
WITH
        stop  AS (
                SELECT TRY_CAST(stop_id[:7] as INTEGER) as bpuic, stop_lat, stop_lon
                FROM iceberg_scan('s3://{bucket}/{metadata["stops"]}')
                WHERE year=2024 AND month=9 AND day=9
        ),

        shape AS (
                SELECT ST_GeomFromWKB(wkb_geometry) as geometry, name
                FROM iceberg_scan('s3://{bucket}/{metadata["shapes"]}')
                WHERE level='city'
        ),

        geo_tagged_stop AS (
                SELECT stop.bpuic, stop.stop_lat, stop.stop_lon, shape.name
                FROM stop JOIN shape ON ST_Contains(shape.geometry, ST_Point(stop.stop_lon, stop.stop_lat))
        ),

        geo_tagged_istdaten AS (
                SELECT dayofweek(ist.arr_actual) as day_week, hour(ist.arr_actual) as hour_day,
                       date_diff('second', ist.arr_time, ist.arr_actual) as arr_delay, date_diff('second',ist.dep_time, ist.dep_actual) as dep_delay, geo_tagged_stop.name
                FROM iceberg_scan('s3://{bucket}/{metadata["istdaten"]}') AS ist
                JOIN geo_tagged_stop ON geo_tagged_stop.bpuic = ist.bpuic
        )
SELECT AVG(arr_delay) as arr_delay, AVG(dep_delay) as dep_delay, COUNT(*) as num,
           approx_quantile(arr_delay, 0.25), approx_quantile(arr_delay,0.5), approx_quantile(arr_delay,0.75), hour_day, name
       FROM geo_tagged_istdaten WHERE day_week >= 1 AND day_week <= 5 GROUP BY name,hour_day ORDER BY name,hour_day
"""

Execute the query and get the results.

You can for instance iterate the cursor and write the rows to file or create a pandas DataFrame.

```
%%time
from contextlib import closing
import pandas as pd
with closing(conn.cursor()) as cur:
    cur.execute(aggregatePercentiles)
    columns = [col[0] for col in cur.description]
    df = pd.DataFrame(cur, columns=columns)
```

You can also use pandas.read_sql_query directly. If you choose this second option, you will need to suppress a warning, because pandas does not know that DuckDB is DBAPI2 compliant.

⚠️ because duckdb runs locally, make sure that you have sufficient CPU and memory.

In [None]:
%%time
import pandas as pd
import warnings

# pandas emits a UserWarning for connections that are not SQLAlchemy objects;
# silence it for the duration of this call only.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", UserWarning)
    df = pd.read_sql_query(sql=aggregatePercentiles, con=conn)

In [None]:
# Show the aggregated result; a bare expression at the end of a notebook cell
# is rendered as the cell output.
df

In [None]:
# Close the DuckDB connection opened earlier; the variable is named `conn`
# (`con` is undefined and raised NameError).
conn.close()