# 📓 Brick E2 – Cloud Object Storage & Cloud-Native Formats

**Part of Modern GIS Bricks**  

**Objective:** In this notebook, you will learn how to create, evolve, and time-travel an Apache Iceberg table using **PyIceberg**, then query its data with **DuckDB**.

## 1️⃣ Setup & Catalog Configuration
Create a local, SQLite-backed Iceberg catalog whose warehouse lives on the local filesystem. (An optional AWS S3 + Glue variant, configured through AWS credentials in environment variables, appears at the end of the notebook.)

In [None]:
import os
from pyiceberg.catalog import load_catalog

# 1. Local warehouse layout: a single folder holds both the SQLite catalog
#    file and the Iceberg metadata/data files.
warehouse_dir = os.path.abspath("iceberg_warehouse")
sqlite_path = os.path.join(warehouse_dir, "catalog.db")
os.makedirs(warehouse_dir, exist_ok=True)

# 2. URIs handed to load_catalog():
#    - db_uri: sqlite:/// URI of the catalog database file
#    - warehouse_uri: file:// URI of the warehouse folder
db_uri = "sqlite:///" + sqlite_path
warehouse_uri = "file://" + warehouse_dir


In [None]:
# 3. Create / load a SQL catalog named "local_sql": the SQLite file at
#    db_uri stores the catalog, table files go under warehouse_uri.
catalog_properties = {
    "type": "sql",
    "uri": db_uri,
    "warehouse": warehouse_uri,
}
catalog = load_catalog("local_sql", **catalog_properties)

In [None]:
# 4. Make sure the "modern_gis" namespace exists; creation is skipped when it
#    is already listed, so the cell can be re-run.
ns = ("modern_gis",)
known_namespaces = catalog.list_namespaces()
if ns not in known_namespaces:
    catalog.create_namespace(ns)

print("✅ Local SQL catalog ready:", db_uri)

## 2️⃣ Create an Iceberg Table
Define a schema for the sales columns (`sale_id`, `price`, `sale_date`) plus a WKB-encoded geometry column (`geom_wkb`), then create the Iceberg table.

In [None]:
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, DateType, StringType, LongType, BinaryType

# Iceberg schema: each field has a stable numeric ID and is optional
# (required=False); the geometry is stored as raw WKB bytes.
schema = Schema(
    NestedField(field_id=1, name="sale_id", field_type=StringType(), required=False),
    NestedField(field_id=2, name="price", field_type=LongType(), required=False),
    NestedField(field_id=3, name="sale_date", field_type=DateType(), required=False),
    NestedField(field_id=4, name="geom_wkb", field_type=BinaryType(), required=False),
)

In [None]:
# Create the table on the first run; on later runs reuse the existing table
# instead of failing with a "table already exists" error.
# NOTE: re-running the ingest cells below against a reused table appends the
# same rows again as additional snapshots.
table_id = ("modern_gis", "kingco_sales")
table = catalog.create_table_if_not_exists(
    identifier=table_id,
    schema=schema,
    location=f"{warehouse_uri}/kingco_sales",
)
print("🆕 Table ready:", table)

In [None]:
# Re-open the same SQLite-backed catalog (same name, URI and warehouse as
# above); this rebinds `catalog` to a freshly loaded instance.
catalog = load_catalog(
    "local_sql",
    type="sql",
    uri=db_uri,
    warehouse=warehouse_uri,
)

print("✅ SQL Catalog ready")
print("Properties:", catalog.properties)

## 3️⃣ Ingest Data by Year (2020–Present)
Read the source Parquet file, generate WKB geometry, and append the data one year at a time so that each year becomes a distinct snapshot.

In [None]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point

# Source data: the original King County sales Parquet file (its longitude /
# latitude columns are used below to build point geometries).
source_path = "data/kingco_sales.parquet"
df = pd.read_parquet(source_path)

In [None]:
# Turn each (longitude, latitude) pair into a shapely Point and wrap the
# frame as a GeoDataFrame declared in WGS84 (EPSG:4326).
point_geoms = [Point(lon, lat) for lon, lat in zip(df.longitude, df.latitude)]
gdf = gpd.GeoDataFrame(df, geometry=point_geoms, crs="EPSG:4326")

In [None]:
# Drop the raw coordinate columns (now captured by `geometry`) and `city`.
# NOTE: `city` is therefore absent from the file written below.
gdf = gdf.drop(columns=["longitude", "latitude", "city"])

# Serialize each shapely geometry to WKB bytes, then remove the geometry
# column so only plain (non-spatial) columns remain.
wkb_values = gdf.geometry.apply(lambda geom: geom.wkb)
gdf["geom_wkb"] = wkb_values
gdf = gdf.drop(columns="geometry")

# Write Parquet with a WKB bytes column. NOTE(review): because the geometry
# column was dropped first, presumably no GeoParquet "geo" metadata is written
# despite the file name; verify if true GeoParquet is required.
gdf.to_parquet("data/kingco_sales_geoparquet.parquet", index=False)

In [None]:
# `pq` has not been imported at this point in the notebook, so import it here
# to avoid a NameError on a fresh top-to-bottom run.
import pyarrow.parquet as pq

# Read the WKB Parquet written above into a PyArrow Table, then to pandas.
arrow_tbl = pq.read_table('data/kingco_sales_geoparquet.parquet')
df = arrow_tbl.to_pandas()

In [None]:
# Parse sale_date as datetime and keep only its calendar year in a helper
# `year` column (used to split the data into per-year appends).
df["year"] = df["sale_date"].pipe(pd.to_datetime).dt.year

In [None]:
# `pa` has not been imported at this point in the notebook, so import it here
# to avoid a NameError on a fresh top-to-bottom run.
import pyarrow as pa

# Columns of the Iceberg table, in schema order.
iceberg_columns = ["sale_id", "price", "sale_date", "geom_wkb"]

# Append one year at a time so that each year is committed as its own snapshot.
for year in sorted(df["year"].unique()):
    # Rows for this year restricted to (and ordered like) the table schema;
    # selecting the columns also leaves out the helper `year` column.
    df_year = df.loc[df["year"] == year, iceberg_columns]

    arrow_year = pa.Table.from_pandas(df_year, preserve_index=False)

    # Append and report the snapshot that is current afterwards.
    table.append(arrow_year)
    snap_id = table.current_snapshot().snapshot_id
    print(f"✅ Appended year {year} → snapshot {snap_id}")

## 4️⃣ Snapshot History & Time Travel
List all snapshots (one per ingested year), then time-travel by scanning each snapshot to see how the row count grows.

In [None]:
def show_snapshots(tbl):
    """Print the ID and local commit time of every snapshot of *tbl*.

    Args:
        tbl: an Iceberg table; ``tbl.snapshots()`` must return objects that
            expose ``snapshot_id`` and ``timestamp_ms`` (epoch milliseconds).
    """
    from datetime import datetime

    print("Snapshots (yearly commits):")
    for snap in tbl.snapshots():
        committed_at = datetime.fromtimestamp(snap.timestamp_ms / 1000)
        print(f"- ID: {snap.snapshot_id}, Time: {committed_at}")

# List every snapshot committed to the local table so far.
show_snapshots(tbl=table)


In [None]:
# 1️⃣ Collect the table's snapshots once; the next cell iterates this list.
snaps = list(table.snapshots())

In [None]:
from datetime import datetime

print("Snapshot history and row counts:")
for snap in snaps:
    # Commit time of this snapshot (epoch milliseconds -> local datetime).
    # Computed here because no `ts` variable exists at notebook scope.
    committed_at = datetime.fromtimestamp(snap.timestamp_ms / 1000)
    # Time travel: scan the table as of this snapshot. Only the row count is
    # needed, so read Arrow and take num_rows instead of building a DataFrame.
    n_rows = table.scan(snapshot_id=snap.snapshot_id).to_arrow().num_rows
    print(f"- Snapshot {snap.snapshot_id} @ {committed_at}: {n_rows} rows")

## 5️⃣ Schema Evolution
Add a new optional `city` column to the table schema, then append rows that include it.

In [None]:
from pyiceberg.types import NestedField, StringType

# 1️⃣ Load the existing Iceberg table
table = catalog.load_table(("modern_gis", "kingco_sales"))

# 2️⃣ Add "city" as an optional string column. Iceberg assigns the new field
# ID itself, so no manual ID bookkeeping is needed. Skip the update when the
# column already exists so that re-running this cell does not fail.
existing_names = {col.name for col in table.schema().columns}
if "city" not in existing_names:
    with table.update_schema() as upd:
        upd.add_column(
            ["city"],               # path to the new field
            StringType(),           # its Iceberg type
            required=False,         # optional column
        )

# 3️⃣ Verify the new schema includes "city" (schema() must be called; the
# bare attribute is a bound method).
print("✅ Schema after evolution:")
print(table.schema())

In [None]:
import pandas as pd
import pyarrow as pa
from shapely.geometry import Point


# 2️⃣ Build the rows to append *with* a `city` column. The WKB Parquet written
# earlier had `city` dropped, so re-derive the rows from the original source
# file, which still has city plus longitude/latitude.
src = pd.read_parquet("data/kingco_sales.parquet")
src["geom_wkb"] = [
    Point(lon, lat).wkb for lon, lat in zip(src.longitude, src.latitude)
]
# Keep exactly the evolved table's columns, in schema order.
df_new = src[["sale_id", "price", "sale_date", "geom_wkb", "city"]]

# 3️⃣ Convert to a PyArrow Table (types inferred from pandas)
arrow_new = pa.Table.from_pandas(df_new, preserve_index=False)

# 4️⃣ Append via PyIceberg. NOTE: this appends the full dataset again, so rows
# ingested by the per-year loop now also appear a second time (with city).
table.append(arrow_new)

# 5️⃣ Check the new snapshot
new_snap = table.current_snapshot().snapshot_id
print(f"✅ Appended new rows with city → snapshot {new_snap}")


## 6️⃣ Query via DuckDB
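Query the sales data with DuckDB's in-process SQL engine. The data is registered with DuckDB as an in-memory table rather than read through DuckDB's Iceberg extension.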

In [None]:
import duckdb
# 4️⃣ Read the current snapshot of the Iceberg table with PyIceberg and
# register the resulting Arrow table in DuckDB as `sales`. This queries the
# Iceberg table's data rather than the intermediate pandas DataFrame.
con = duckdb.connect()
sales_arrow = table.scan().to_arrow()
con.register("sales", sales_arrow)

# Run whatever SQL you like. Examples:

# All rows sold in 2021
df_2021 = con.execute("""
  SELECT *
  FROM sales
  WHERE EXTRACT(YEAR FROM sale_date) = 2021;
""").df()
print(f"Found {len(df_2021)} rows in 2021")

# Row counts per sale year in the current snapshot (per-snapshot counts are
# shown earlier by re-scanning each snapshot with PyIceberg).
df_counts = con.execute("""
  SELECT EXTRACT(YEAR FROM sale_date) AS year, COUNT(*) AS cnt
    FROM sales
   GROUP BY EXTRACT(YEAR FROM sale_date)
   ORDER BY EXTRACT(YEAR FROM sale_date)
""").df()
print(df_counts)

## 7️⃣ Badge Proof
Write a JSON summary of table state for automated badge verification.

In [None]:
import json

# 1️⃣ (Re)load the Iceberg table by its (namespace, table) identifier
kingco_id = ("modern_gis", "kingco_sales")
table = catalog.load_table(kingco_id)

# 2️⃣ Build the dotted table name from the identifier used to load it.
# Unpacking table.name() into exactly two parts is fragile: in some PyIceberg
# versions the returned identifier also carries the catalog name.
table_name = ".".join(kingco_id)

# 3️⃣ Fetch snapshots and schema
snaps = table.snapshots()
schema = table.schema()

# 4️⃣ Build the proof dict
proof = {
    "table": table_name,
    "snapshots": [s.snapshot_id for s in snaps],
    "columns": [col.name for col in schema.columns],
}

# 5️⃣ Write to JSON
with open("badge_proof.json", "w", encoding="utf-8") as f:
    json.dump(proof, f, indent=2)

print("✅ Badge proof written to badge_proof.json")


# Create Iceberg tables on AWS S3 with AWS Glue (Optional)
Use this code to optionally create tables on AWS, storing data in S3 and registering tables in AWS Glue, a catalog service that plays the same role as the SQLite database used in the local version.


## 8️⃣ Create Glue Tables
Create, populate, and clean up an Iceberg table stored on AWS S3 and registered in Glue.

In [None]:
# Read AWS credentials and settings from environment variables.
# Required (KeyError if missing): AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
# YOUR_BUCKET. Optional: AWS_DEFAULT_REGION, falling back to us-east-1.
env = os.environ
AWS_KEY = env["AWS_ACCESS_KEY_ID"]
AWS_SECRET = env["AWS_SECRET_ACCESS_KEY"]
AWS_REGION = env.get("AWS_DEFAULT_REGION", "us-east-1")
BUCKET = env["YOUR_BUCKET"]  # e.g. "my-iceberg-bucket"

In [None]:
# Establish the Glue catalog; Iceberg files live under this S3 prefix.
WAREHOUSE = f"s3://{BUCKET}/iceberg_warehouse"

glue_properties = {
    "type": "glue",
    "warehouse": WAREHOUSE,
    "client.access-key-id": AWS_KEY,
    "client.secret-access-key": AWS_SECRET,
    "client.region": AWS_REGION,
}
glue_cat = load_catalog("glue_cat", **glue_properties)

# Register the "modern_gis" namespace in Glue unless it is already listed.
ns = ("modern_gis",)
known_namespaces = glue_cat.list_namespaces()
if ns not in known_namespaces:
    glue_cat.create_namespace(ns)

In [None]:
# Create the tables

schema = Schema(
    NestedField(1, "sale_id", StringType(), required=False),
    NestedField(2, "price", LongType(),  required=False),
    NestedField(3, "sale_date",  DateType(),  required=False),
    NestedField(4, "geom_wkb",   BinaryType(),  required=False),
)

# Create the table, or load it if it already exists. Unlike a bare
# `except Exception` around load_table, this does not mask credential or
# network errors by falling through to a create attempt.
table_id = ("modern_gis", "sales_data")
table = glue_cat.create_table_if_not_exists(
    identifier=table_id,
    schema=schema,
    location=f"{WAREHOUSE}/modern_gis/sales_data",
)

In [None]:
# Load your data

import pyarrow.parquet as pq
import pandas as pd
import pyarrow as pa

# 1️⃣ Read the WKB Parquet file written earlier into a PyArrow Table
arrow_tbl = pq.read_table("data/kingco_sales_geoparquet.parquet")

# 2️⃣ Keep a small sample (the first rows via .head()) and exactly the columns
# of the Iceberg schema, in schema order. Geometry is already WKB bytes in
# `geom_wkb`, so no geometry conversion happens here.
df = arrow_tbl.to_pandas().head()[["sale_id", "price", "sale_date", "geom_wkb"]]

# 3️⃣ Back to PyArrow (index dropped) and append to the Glue-managed table
arrow_to_append = pa.Table.from_pandas(df, preserve_index=False)
table.append(arrow_to_append)

print("✅ Appended GeoParquet data → snapshot", table.current_snapshot().snapshot_id)

In [None]:
# Remove the modern_gis.sales_data entry from the Glue catalog (`glue_cat`).
# This drops only the catalog entry; the table's S3 files are not touched
# here.
table_id = ("modern_gis", "sales_data")
glue_cat.drop_table(table_id)

In [None]:
# Cleanup: delete every S3 object under the dropped table's prefix

import os

import boto3

bucket = os.environ["YOUR_BUCKET"]
prefix = "iceberg_warehouse/modern_gis/sales_data/"

s3 = boto3.resource("s3")
objs = s3.Bucket(bucket).objects.filter(Prefix=prefix)

# The batch delete returns one DeleteObjects response per page of keys, not
# one entry per object, so count the keys each response reports as deleted.
responses = objs.delete()
n_deleted = sum(len(resp.get("Deleted", [])) for resp in responses)
errors = [err for resp in responses for err in resp.get("Errors", [])]

print(f"Deleted {n_deleted} S3 objects under {prefix}")
if errors:
    print(f"⚠️ {len(errors)} objects could not be deleted, e.g.: {errors[0]}")
