In [1]:
import os
from pathlib import Path

AWS_REGION = "us-east-1"
AWS_PROFILE = "duckdb"  # aws configure sso --profile duckdb

os.environ["AWS_PROFILE"] = AWS_PROFILE
os.environ["AWS_REGION"] = AWS_REGION
os.environ["PULUMI_CONFIG_PASSPHRASE"] = ""

S3_BUCKET_NAME = "mlops-club-datalake-stream"
GLUE_DATABASE_NAME = "nyc_taxi"
GREEN_TAXI_TABLE_NAME = "green_taxi_trips"

DATA_DIR = Path("../../src/data/green")

# Create S3 Bucket

Yeah... pulumi is probly overkill. Sorry. I figured it'd help me clean everything up at the end.

Creates

- an S3 bucket for our datalake
- a glue database in the global AWS Glue catalog

In [2]:
# Pulumi Automation API example: create an S3 bucket

import pulumi
import pulumi_aws as aws
from pulumi import automation as auto

def pulumi_program():
    # The first argument is the Pulumi resource name, the second is the bucket name property
    bucket = aws.s3.Bucket(resource_name="datalake-bucket", bucket=S3_BUCKET_NAME, force_destroy=True)
    glue_db = aws.glue.CatalogDatabase(
        resource_name="glue-db",
        name=GLUE_DATABASE_NAME
    )
    pulumi.export("bucket_name", bucket.id)
    pulumi.export("glue_database_name", glue_db.name)

# Set up and run the Pulumi stack
stack_name = "dev"
project_name = "s3-bucket-project"

stack = auto.create_or_select_stack(
    stack_name=stack_name,
    project_name=project_name,
    program=pulumi_program,
)

stack.set_config("aws:region", auto.ConfigValue(value="us-east-1"))
stack.workspace.install_plugin("aws", "v5.0.0")
# up_res = stack.up(on_output=print)
up_res = stack.up()

print(f"Bucket name: {up_res.outputs['bucket_name'].value}")
bucket_url = f"https://s3.console.aws.amazon.com/s3/buckets/{up_res.outputs['bucket_name'].value}?region=us-east-1"
print(f"S3 Console URL: {bucket_url}")
print(f"Glue Database: {up_res.outputs['glue_database_name'].value}")
glue_db_url = f"https://us-east-1.console.aws.amazon.com/glue/home?region=us-east-1#database:name={up_res.outputs['glue_database_name'].value}"
print(f"Glue Database Console URL: {glue_db_url}")

   $ brew update && brew upgrade pulumi
or visit https://pulumi.com/docs/install/ for manual instructions and release notes.
I0000 00:00:1749410305.714508 1290710 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1749410311.064201 1290710 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1749410311.279778 1290710 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1749410311.479644 1290710 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers


Bucket name: mlops-club-datalake-stream
S3 Console URL: https://s3.console.aws.amazon.com/s3/buckets/mlops-club-datalake-stream?region=us-east-1
Glue Database: nyc_taxi
Glue Database Console URL: https://us-east-1.console.aws.amazon.com/glue/home?region=us-east-1#database:name=nyc_taxi


# Define our table in the database

We could do this

- with a `CREATE OR REPLACE TABLE` statement and DDL via athena. But then we would need to know the exact types of each column.
- or we could have the aws data wrangler library infer this for us [(`wr.athena.to_iceberg(df, database, table)`)](https://aws-sdk-pandas.readthedocs.io/en/3.2.1/stubs/awswrangler.athena.to_iceberg.html)
- or `pyiceberg` can create tables, since, after all, tables are the primitive of iceberg. But does this risk inefficiencies such as not declaring the partition?

### Connect to AWS Glue Catalog

We would not have to connect like this if we were using AWS data wrangler's functions for creating tables.

I really do not love that DS would potentially be exposed to all this. 

And I don't want DS thinking about which specific S3 bucket contains the underlying files for their tables 🤔.

I definitely iceberg to be an implementation detail for them, and for them to think about their data simply as tables in a database like in Snowflake.

In [3]:
# connect to glue the catalog/database via pyiceberg and credential vending
from pyiceberg.catalog import load_catalog, Catalog

# Configure catalog connection properties
catalog_properties = {
    "type": "rest",
    "uri": f"https://glue.{AWS_REGION}.amazonaws.com/iceberg",
    "s3.region": AWS_REGION,
    "rest.sigv4-enabled": "true",
    "rest.signing-name": "glue",
    "rest.signing-region": AWS_REGION,
}

# presumably, this uses boto3's credential chain under the hood
iceberg_catalog: Catalog = load_catalog(**catalog_properties)

We can create the table using `pyiceberg`. I like this because `pyiceberg` can get the schema (cols and dtypes) of one of our parquet files from pyarrow.

This saves us from having to hand write a DDL statement ourselves like

```sql
CREATE TABLE AwsDataCatalog.nyc_taxi.green_taxi_trips (
    vendor_id STRING,
    lpep_pickup_datetime TIMESTAMP,
    lpep_dropoff_datetime TIMESTAMP,
    passenger_count INT,
    trip_distance DOUBLE,
    ...
)
PARTITIONED BY (hour(lpep_pickup_datetime))
STORED AS ICEBERG
LOCATION 's3://<bucket>/iceberg/<db>/<table>/';
```

Question for later: the snowflake equivalent of `LOCATION` here is creating a stage. Why would some organizations require you to get approval before creating a stage? Is there messiness that could be caused by letting anyone create any stages they like? 🤔

Question: what is Athena's equivalent of `COPY INTO`? I worry that if DS yeet files into a S3 dir and then run a glue crawler on that, there's a risk of them adding files that do not have valid schemas, therefore corrupting the whole table. Need to see how this would work.

Question: how do you do data governance? How do you restrict who can access certain columns? How do you restrict who can access certain rows?

Question: how do you do data documentation? How do you annotate what each column means with human-readable comments.

Question: how do you see data lineage? If I have a chain of `create table ... as select ...` queries, how do we see the column-level lineage? (how is each column derived from upstream columns?)

Question: how can lineage hook into OpenLineage? Can 

In [4]:
import pandas as pd
import pyarrow as pa
from IPython.display import display
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC

# read one of the parquet files and get its pyarrow schema
data_files: list[str] = list(DATA_DIR.glob("*.parquet"))

# concat each of the df's into one parquet file to help infer the schema
df = pd.concat(
    [pd.read_parquet(file) for file in data_files],
    ignore_index=True,
)
display(df.describe())

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
count,146486.0,146486,146486,138584.0,146486.0,146486.0,138584.0,146486.0,146486.0,146486.0,146486.0,146486.0,146486.0,0.0,146486.0,146486.0,138584.0,138560.0,138584.0,142662.0
mean,1.924423,2025-02-15 23:22:51.800254,2025-02-15 23:43:06.287597,1.255167,95.588363,141.688455,1.281252,17.408229,17.059759,0.89536,0.601063,2.49088,0.202184,,0.979542,23.18034,1.267931,1.041513,0.844509,0.063007
min,1.0,2024-12-25 23:13:15,2024-12-25 23:13:17,1.0,1.0,1.0,0.0,0.0,-470.6,-5.0,-0.5,-20.0,0.0,,-1.0,-473.1,1.0,1.0,-2.75,-0.75
25%,2.0,2025-01-24 20:55:54,2025-01-24 21:12:27.750000,1.0,74.0,74.0,1.0,1.11,9.3,0.0,0.5,0.0,0.0,,1.0,13.8,1.0,1.0,0.0,0.0
50%,2.0,2025-02-15 19:17:49,2025-02-15 19:33:20,1.0,75.0,140.0,1.0,1.8,13.5,0.0,0.5,2.02,0.0,,1.0,18.9,1.0,1.0,0.0,0.0
75%,2.0,2025-03-10 07:29:45.750000,2025-03-10 07:46:14.500000,1.0,97.0,229.0,1.0,3.09,19.8,2.5,0.5,3.7,0.0,,1.0,27.18,2.0,1.0,2.75,0.0
max,6.0,2025-04-01 23:41:29,2025-04-01 23:41:50,99.0,265.0,265.0,9.0,147993.11,633.7,12.5,61.5,252.05,48.94,,1.0,642.14,5.0,2.0,2.75,0.75
std,0.580098,,,2.791136,55.782122,77.184062,0.933006,940.146342,14.264941,1.355397,0.391043,3.278479,1.260738,,0.152176,16.48806,0.474899,0.199474,1.268505,0.208072


In [5]:
arrow_df = pa.Table.from_pandas(df)

# iceberg_catalog.drop_table(
#     identifier=f"{GLUE_DATABASE_NAME}.{GREEN_TAXI_TABLE_NAME}",
# )

# now create an iceberg table using that schema
iceberg_catalog.create_table_if_not_exists(
    identifier=f"{GLUE_DATABASE_NAME}.{GREEN_TAXI_TABLE_NAME}",
    schema=arrow_df.schema,
    # partition by lpep_pickup_datetime at an hourly granularity
    # ...
    # I couldn't figure out how to cleanly partition by hour(lpep_pickup_datetime)
    # the aws data wrangler library would make this much easier
    partition_spec=UNPARTITIONED_PARTITION_SPEC,
    # it's lame that we have to specify the location here since this would be exposed to DS 🤔
    location=f"s3://{S3_BUCKET_NAME}/iceberg/{GLUE_DATABASE_NAME}/{GREEN_TAXI_TABLE_NAME}/",
)

glue_url = f"https://us-east-1.console.aws.amazon.com/glue/home?region=us-east-1#/v2/data-catalog/tables/view/{GREEN_TAXI_TABLE_NAME}?database={GLUE_DATABASE_NAME}&catalogId=847068433460&versionId=latest&mainTab=tab-table-overview"
print(f"Created Iceberg table: '{GREEN_TAXI_TABLE_NAME}'")
print(f"Glue Table Console URL: {glue_url}")

Created Iceberg table: 'green_taxi_trips'
Glue Table Console URL: https://us-east-1.console.aws.amazon.com/glue/home?region=us-east-1#/v2/data-catalog/tables/view/green_taxi_trips?database=nyc_taxi&catalogId=847068433460&versionId=latest&mainTab=tab-table-overview


### Insert some data into the table

At this point the table is empty. Let's write some data to it.

In [7]:
import pyarrow as pa
import pandas as pd

data_file_fpaths: list[Path] = list(DATA_DIR.glob("*.parquet"))

for path in data_file_fpaths[:1]:
    print(f"Inserting data file: {path}")
    df = pd.read_parquet(path)  # drop empty columns
    pyarrow_table = pa.Table.from_pandas(df)
    iceberg_catalog.load_table(f"{GLUE_DATABASE_NAME}.{GREEN_TAXI_TABLE_NAME}").append(pyarrow_table)
    # iceberg_catalog.load_table(f"{GLUE_DATABASE_NAME}.{GREEN_TAXI_TABLE_NAME}").upsert(
    #     pyarrow_table,
    #     join_cols=["lpep_pickup_datetime", "lpep_dropoff_datetime", "passenger_count", "trip_distance", "fare_amount", "tip_amount", "total_amount"],
    #     case_sensitive=False,
    # ) # matt martin's sweet function



Inserting data file: ../../src/data/green/2025-01.parquet


In [8]:
# validate it!
import awswrangler as wr
import boto3

# query = """\
# SELECT VendorID, AVG(trip_distance) AS avg_dist
# FROM AwsDataCatalog.nyc_taxi.green_taxi_trips
# WHERE passenger_count > 1
# GROUP BY VendorID;
# """

query = """\
SELECT *
FROM AwsDataCatalog.nyc_taxi.green_taxi_trips
WHERE lpep_pickup_datetime > TIMESTAMP '2024-12-01 00:00:00'
LIMIT 1000
"""

boto3.setup_default_session(region_name=AWS_REGION)
wr.athena.read_sql_query(
    sql=query,
    database="nyc",
    ctas_approach=False,
    boto3_session=boto3.Session(region_name=AWS_REGION)
)

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2025-01-01 00:03:01,2025-01-01 00:17:12,N,1.0,5.93,24.70,1.0,0.5,6.80,0.00,,1.0,34.00,1.0,1.0,0.00,0.0
1,2025-01-01 00:19:59,2025-01-01 00:25:52,N,1.0,1.32,8.60,1.0,0.5,0.00,0.00,,1.0,11.10,2.0,1.0,0.00,0.0
2,2025-01-01 00:05:29,2025-01-01 00:07:21,N,1.0,0.41,25.55,0.0,0.0,0.00,0.00,,1.0,26.55,2.0,2.0,0.00,0.0
3,2025-01-01 00:52:24,2025-01-01 01:07:52,N,1.0,4.12,21.20,1.0,0.5,6.13,6.94,,1.0,36.77,1.0,1.0,0.00,0.0
4,2025-01-01 00:25:05,2025-01-01 01:01:10,N,1.0,4.71,33.80,1.0,0.5,7.81,0.00,,1.0,46.86,1.0,1.0,2.75,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,2025-01-02 01:14:23,2025-01-02 01:21:46,N,1.0,1.66,30.00,0.0,0.0,1.00,0.00,,1.0,32.00,1.0,2.0,0.00,0.0
996,2025-01-02 02:07:29,2025-01-02 02:07:32,N,1.0,0.00,20.00,0.0,0.0,0.00,0.00,,1.0,21.00,1.0,2.0,0.00,0.0
997,2025-01-02 02:09:21,2025-01-02 02:09:23,N,1.0,0.00,17.00,0.0,0.0,0.00,0.00,,1.0,18.00,1.0,2.0,0.00,0.0
998,2025-01-02 02:27:25,2025-01-02 02:27:28,N,1.0,0.07,10.00,0.0,0.0,2.20,0.00,,1.0,13.20,1.0,2.0,0.00,0.0


## Attempt to connect via duckDB

Using the method documented here: https://duckdb.org/docs/stable/core_extensions/iceberg/amazon_sagemaker_lakehouse

In [13]:
# duckdb

import duckdb
import boto3

# Get AWS account ID for the ATTACH statement
sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]

con = duckdb.connect()

# Attach to AWS Glue Iceberg REST catalog using explicit REST endpoint and SigV4 options
con.sql(f"""
INSTALL iceberg;
LOAD iceberg;
CREATE SECRET glue_iceberg_secret (
    TYPE s3,
    PROVIDER credential_chain,
    CHAIN sts,
    REGION '{AWS_REGION}',
    PROFILE '{AWS_PROFILE}'
);

ATTACH '{account_id}:s3tablescatalog/nyc_taxi' AS glue_catalog (
    TYPE iceberg,
    ENDPOINT_TYPE glue
);
""")

print(con.execute("SHOW ALL TABLES;").fetchdf())

HTTPException: HTTP Error: Failed to query https://glue.us-east-1.amazonaws.com/iceberg/v1/config?warehouse=792808862870%3As3tablescatalog%2Fnyc_taxi, http error 403 thrown. Message: {"message":"The security token included in the request is invalid."}

## Another attempt to connect using the REST Catalog interface

Docs here: https://duckdb.org/docs/stable/core_extensions/iceberg/iceberg_rest_catalogs

In [14]:
# duckdb

import duckdb
import boto3

# Get AWS account ID for the ATTACH statement
sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]

con = duckdb.connect()


# Attach to AWS Glue Iceberg REST catalog using explicit REST endpoint and SigV4 options
con.sql(f"""
INSTALL iceberg;
LOAD iceberg;
CREATE SECRET glue_iceberg_secret (
    TYPE s3,
    PROVIDER credential_chain,
    CHAIN sts,
    REGION '{AWS_REGION}',
    PROFILE '{AWS_PROFILE}'
);

ATTACH '{account_id}:nyc_taxi' AS glue_catalog (
    TYPE iceberg,
    ENDPOINT_TYPE 'GLUE',
    ENDPOINT 'https://glue.{AWS_REGION}.amazonaws.com/iceberg',
    SECRET glue_iceberg_secret
);
""")

print(con.execute("SHOW ALL TABLES;").fetchdf())

HTTPException: HTTP Error: Failed to query https://glue.us-east-1.amazonaws.com/iceberg/v1/config?warehouse=792808862870%3Anyc_taxi, http error 403 thrown. Message: {"message":"The security token included in the request is invalid."}

## Another attempt to connect

Our bucket is not an S3 tables bucket, so possibly we need to specify the database name in the connection string.

In [15]:
import duckdb
import boto3

# Get AWS account ID for the ATTACH statement
sts = boto3.client("sts")
account_id = sts.get_caller_identity()["Account"]

con = duckdb.connect()


# Attach to AWS Glue Iceberg REST catalog using explicit REST endpoint and SigV4 options
con.sql(f"""
INSTALL iceberg;
LOAD iceberg;
CREATE SECRET glue_iceberg_secret (
    TYPE s3,
    PROVIDER credential_chain,
    CHAIN sts,
    REGION '{AWS_REGION}',
    PROFILE '{AWS_PROFILE}'
);

ATTACH '{account_id}:nyc_taxi' AS glue_catalog (
    TYPE iceberg,
    ENDPOINT_TYPE glue
);
""")

print(con.execute("SHOW ALL TABLES;").fetchdf())

HTTPException: HTTP Error: Failed to query https://glue.us-east-1.amazonaws.com/iceberg/v1/config?warehouse=792808862870%3Anyc_taxi, http error 403 thrown. Message: {"message":"The security token included in the request is invalid."}

# Cleanup created resources

See. There was a reason for Pulumi!

In [16]:
# cleanup the stack
stack.destroy()

DestroyResult(stdout='Destroying (dev):\n\n\n\n@ destroying.....\n\n -  aws:glue:CatalogDatabase glue-db deleting (0s) \n\n -  aws:s3:Bucket datalake-bucket deleting (0s) \n\n@ destroying....\n\n -  aws:glue:CatalogDatabase glue-db deleted (1s) \n\n@ destroying.....\n\n -  aws:s3:Bucket datalake-bucket deleted (3s) \n\n -  pulumi:pulumi:Stack s3-bucket-project-dev deleting (0s) \n\n -  pulumi:pulumi:Stack s3-bucket-project-dev deleted (0.00s) \n\nOutputs:\n\n  - bucket_name       : "mlops-club-datalake-stream"\n\n  - glue_database_name: "nyc_taxi"\n\n\n\nResources:\n\n    - 3 deleted\n\n\n\nDuration: 5s\n\n\n\nThe resources in the stack have been deleted, but the history and configuration associated with the stack are still maintained. \n\nIf you want to remove the stack completely, run `pulumi stack rm dev`.\n', stderr='', summary=UpdateSummary(result='succeeded', version=0, start_time=datetime.datetime(2025, 6, 8, 19, 30, 8), end_time=datetime.datetime(2025, 6, 8, 19, 30, 13), kind='

: 

: 

: 

: 

: 

: 

: 

: 