In [None]:
# import duckdb

# # Install extensions globally (only needed once)
# duckdb.sql("INSTALL h3 FROM community")
# duckdb.sql("INSTALL httpfs")
# duckdb.sql("INSTALL spatial")
# duckdb.sql("INSTALL pdal FROM community")


# def get_con():
#     """In-memory connection for workers. LOAD only, no INSTALL."""
#     con = duckdb.connect()
#     con.sql("""
#         SET temp_directory = 'maplibre-gl-usgs-lidar/notebooks/tmp'
#         SET s3_region = 'us-west-2';
#         LOAD h3;
#         LOAD httpfs;
#         LOAD spatial;
#         LOAD pdal;
#         SET enable_progress_bar = true;
#     """)
#     return con

: 

In [None]:
import concurrent.futures
import mercantile
import pyarrow as pa
import time
import duckdb

# Install extensions globally (only needed once)
duckdb.sql("INSTALL h3 FROM community")
duckdb.sql("INSTALL httpfs")
duckdb.sql("INSTALL spatial")
duckdb.sql("INSTALL pdal FROM community")


def get_con():
    """In-memory connection for workers. LOAD only, no INSTALL."""
    con = duckdb.connect()
    con.sql("""
        SET temp_directory = './tmp';
        SET memory_limit = '512';
        SET s3_region = 'us-west-2';
        LOAD h3;
        LOAD httpfs;
        LOAD spatial;
        LOAD pdal;
        SET enable_progress_bar = false;
    """)
    return con

# Config
ept_url = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json"
src_crs = 'EPSG:3857'
dst_crs = 'EPSG:4326'
res = 11

# Full dataset extent from PDAL_Info metadata
bbox_min_x, bbox_min_y = -13638426, 4536715
bbox_max_x, bbox_max_y = -13617318, 4556481

lat = f"ST_Y(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true))"
lng = f"ST_X(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true))"

# Use mercantile to tile the bbox — aligns with how EPT octree is organized
sw = mercantile.lnglat(bbox_min_x, bbox_min_y)
ne = mercantile.lnglat(bbox_max_x, bbox_max_y)
zoom = 18

tiles = list(mercantile.tiles(sw.lng, sw.lat, ne.lng, ne.lat, zooms=zoom))
print(f"{len(tiles)} tiles at z{zoom}")


def process_tile(tile):
    """Process a single tile with its own in-memory DuckDB connection."""
    con = get_con()
    tb = mercantile.xy_bounds(tile)
    tile_bounds = f"([{tb.left},{tb.right}],[{tb.bottom},{tb.top}])"
    result = con.sql(f"""
        SELECT 
            h3_latlng_to_cell({lat}, {lng}, {res}) AS hex,
            AVG(Z) AS avg_elevation,
            MIN(Z) AS min_z,
            MAX(Z) AS max_z,
            MAX(Z) - MIN(Z) AS z_range,
            COUNT(1) AS cnt
        FROM PDAL_Read('{ept_url}', options => MAP {{'bounds': '{tile_bounds}'}})
        GROUP BY 1;
      
    """).fetch_arrow_table()
    con.close()
    return tile, result


max_workers = 4
start = time.time()
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {executor.submit(process_tile, t): t for t in tiles}
    for future in concurrent.futures.as_completed(futures):
        tile = futures[future]
        try:
            _, tbl = future.result()
            results.append(tbl)
            print(f"  z{zoom}/{tile.x}/{tile.y} — {tbl.num_rows} hex ({time.time()-start:.0f}s)")
        except Exception as e:
            print(f"  z{zoom}/{tile.x}/{tile.y} FAILED: {e}")

# Combine Arrow tables
combined = pa.concat_tables(results)
print(f"\n{combined.num_rows} raw hex rows from {len(results)} tiles")

# Re-aggregate hex spanning tile boundaries, write to persistent db
db = duckdb.connect('duckdb/san_fran_ept_lpc.ddb')
db.sql("LOAD h3")
db.sql("CREATE OR REPLACE TABLE san_fran_res_11 AS SELECT * FROM combined")
db.sql("""
    CREATE OR REPLACE TABLE san_fran_res_11 AS
    SELECT hex,
           SUM(avg_elevation * cnt) / SUM(cnt) AS avg_elevation,
           MIN(min_z) AS min_z, MAX(max_z) AS max_z,
           MAX(max_z) - MIN(min_z) AS z_range,
           SUM(cnt) AS cnt
    FROM san_fran_res_11
    GROUP BY 1
""")

elapsed = time.time() - start
df = db.sql("FROM san_fran_res_11").df()
print(f"{len(df)} hex, {df['cnt'].sum():,} points, {elapsed:.1f}s")
# db.close()

18070 tiles at z18


In [2]:

  import duckdb, mercantile

  con = duckdb.connect()
  con.sql("""
      SET s3_region = 'us-west-2';
      LOAD h3; LOAD httpfs; LOAD spatial;INSTALL pdal FROM community; LOAD pdal;
  """)

  ept_url = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json"
  tile = mercantile.Tile(x=41926, y=101398, z=18)
  tb = mercantile.xy_bounds(tile)
  bounds = f"([{tb.left},{tb.right}],[{tb.bottom},{tb.top}])"

  result = con.sql(f"""
      SELECT COUNT(*) FROM PDAL_Read('{ept_url}', options => MAP {{'bounds': '{bounds}'}})
  """).fetchone()
  print(f"Single tile: {result[0]} points")

HTTPException: HTTP Error: Failed to download extension "pdal" at URL "http://community-extensions.duckdb.org/v1.3.2/osx_arm64/pdal.duckdb_extension.gz" (HTTP 404)

Candidate extensions: "delta", "ducklake", "parquet", "spatial", "md"
For more info, visit https://duckdb.org/docs/stable/extensions/troubleshooting/?version=v1.3.2&platform=osx_arm64&extension=pdal

In [4]:
import pdal
pipeline = pdal.Reader.ept(
    "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json",
    bounds="([-13638426, -13617318], [4536715, 4556481])"
)

In [None]:
import pdal
import pyarrow as pa

# 1. Define the pipeline
# We use the pipeline string approach to ensure all stages are connected correctly
pipeline_json = """
{
    "pipeline": [
        {
            "type": "readers.ept",
            "filename": "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json",
            "bounds": "([-13638426, -13617318], [4536715, 4556481])"
        }
    ]
}
"""

# 2. Execute the pipeline
pipeline = pdal.Pipeline(pipeline_json)
count = pipeline.execute()

# 3. Retrieve the result as a NumPy structured array
# PDAL's .arrays property returns a list of arrays (one for each view)
arrays = pipeline.arrays
if len(arrays) > 0:
    data = arrays[0]
    
    # 4. Convert to PyArrow Table
    # pa.Table.from_pydict or from_arrays works well with structured arrays
    table = pa.Table.from_struct_array(pa.array(data))
    
    print(f"Successfully converted {count} points to Arrow.")
    print(table.schema)

In [None]:
import pdal
import pyarrow as pa
import duckdb
import mercantile
import concurrent.futures
import time

# Config
ept_url = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json"
res = 11
src_crs = 'EPSG:3857'
dst_crs = 'EPSG:4326'

# Bounding Box for SF
bbox_min_x, bbox_min_y = -13638426, 4536715
bbox_max_x, bbox_max_y = -13617318, 4556481

sw = mercantile.lnglat(bbox_min_x, bbox_min_y)
ne = mercantile.lnglat(bbox_max_x, bbox_max_y)
tiles = list(mercantile.tiles(sw.lng, sw.lat, ne.lng, ne.lat, zooms=18))

def fetch_tile(tile):
    """Worker: Fetches data via PDAL and returns an Arrow Table."""
    tb = mercantile.xy_bounds(tile)
    bounds = f"([{tb.left},{tb.right}],[{tb.bottom},{tb.top}])"
    
    # PDAL pipeline configuration
    pdal_json = {
        "pipeline": [
            {
                "type": "readers.ept",
                "filename": ept_url,
                "bounds": bounds
            }
        ]
    }
    
    try:
        pipeline = pdal.Pipeline(pdal.Reader.ept(**pdal_json["pipeline"][0]).pipeline)
        pipeline.execute()
        # Convert NumPy structured array to Arrow Table
        return pa.Table.from_struct_array(pa.array(pipeline.arrays[0]))
    except Exception as e:
        return None

# --- Step 1: Parallel Fetch ---
start = time.time()
results = []
print(f"Fetching {len(tiles)} tiles...")

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_tile, t): t for t in tiles}
    for future in concurrent.futures.as_completed(futures):
        tbl = future.result()
        if tbl and tbl.num_rows > 0:
            results.append(tbl)

if not results:
    print("No data fetched. Check your bounds.")
    exit()

# Combine all tile tables into one massive Arrow Table in memory
combined_table = pa.concat_tables(results)
print(f"Fetched {combined_table.num_rows:,} total points in {time.time()-start:.1f}s")

# --- Step 2: Aggregation and Storage in DuckDB ---
# Connect to your persistent database
db = duckdb.connect('duckdb/san_fran_ept_lpc.ddb')
db.sql("INSTALL h3 FROM community; LOAD h3;")
db.sql("INSTALL spatial; LOAD spatial;")

print("Processing H3 bins and writing to disk...")

# DuckDB can query the 'combined_table' variable directly from the local Python scope
db.sql(f"""
    CREATE OR REPLACE TABLE san_fran_res_11 AS
    SELECT 
        h3_latlng_to_cell(
            ST_Y(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true)),
            ST_X(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true)),
            {res}
        ) AS hex,
        AVG(Z) AS avg_elevation,
        MIN(Z) AS min_z,
        MAX(Z) AS max_z,
        MAX(Z) - MIN(Z) AS z_range,
        COUNT(*) AS cnt
    FROM combined_table
    GROUP BY 1
""")

final_count = db.sql("SELECT COUNT(*) FROM san_fran_res_11").fetchone()[0]
print(f"Finished! {final_count:,} unique H3 cells stored in san_fran_res_11.")
# db.close()

Fetching 18070 tiles...
No data fetched. Check your bounds.


ArrowInvalid: Must pass at least one table

: 

In [None]:
import pdal
import pyarrow as pa
import duckdb
import mercantile
import concurrent.futures
import time
import os

# --- Configuration ---
ept_url = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json"
res = 11
src_crs = 'EPSG:3857'
dst_crs = 'EPSG:4326'
db_path = 'duckdb/san_fran_ept_lpc.ddb'

os.makedirs(os.path.dirname(db_path), exist_ok=True)

# Full extent from your metadata
min_x, min_y = -13638426.0, 4536715.0
max_x, max_y = -13617318.0, 4556481.0

sw = mercantile.lnglat(min_x, min_y)
ne = mercantile.lnglat(max_x, max_y)
tiles = list(mercantile.tiles(sw.lng, sw.lat, ne.lng, ne.lat, zooms=18))

# --- Setup DuckDB ---
db = duckdb.connect(db_path)
db.sql("INSTALL h3 FROM community; LOAD h3;")
db.sql("INSTALL spatial; LOAD spatial;")

# Create a staging table to hold raw points temporarily
db.sql("CREATE OR REPLACE TEMP TABLE staging_points (X DOUBLE, Y DOUBLE, Z DOUBLE)")

def fetch_and_insert(tile):
    """Worker: Fetches data via PDAL and returns an Arrow Table to the main thread."""
    tb = mercantile.xy_bounds(tile)
    bounds = f"([{tb.left},{tb.right}],[{tb.bottom},{tb.top}])"
    
    try:
        # Requesting slightly lower resolution or specific dimensions saves bandwidth
        reader = pdal.Reader.ept(filename=ept_url, bounds=bounds, requests=4)
        pipeline = reader.pipeline()
        pipeline.execute()
        
        arr = pipeline.arrays[0]
        if len(arr) > 0:
            # We only need X, Y, Z for your H3 analysis
            # Using only necessary columns keeps the Arrow table lean
            return pa.Table.from_arrays(
                [pa.array(arr['X']), pa.array(arr['Y']), pa.array(arr['Z'])],
                names=['X', 'Y', 'Z']
            )
    except Exception:
        return None

# --- Execution ---
start_time = time.time()
print(f"Streaming {len(tiles)} tiles into DuckDB...")

with concurrent.futures.ThreadPoolExecutor(max_workers=7) as executor:
    future_to_tile = {executor.submit(fetch_and_insert, t): t for t in tiles}
    
    for i, future in enumerate(concurrent.futures.as_completed(future_to_tile)):
        arrow_batch = future.result()
        if arrow_batch:
            # Pushes Arrow data into DuckDB staging table
            db.register('temp_chunk', arrow_batch)
            db.sql("INSERT INTO staging_points SELECT * FROM temp_chunk")
            db.unregister('temp_chunk')
            
        if i % 20 == 0:
            count = db.sql("SELECT count(*) FROM staging_points").fetchone()[0]
            print(f"Tiles: {i}/{len(tiles)} | Points staged: {count:,}")

# --- Final Aggregation ---
print("Calculating H3 Hexagons and final aggregates...")



db.sql(f"""
    CREATE OR REPLACE TABLE san_fran_res_{res} AS
    SELECT 
        h3_latlng_to_cell(
            ST_Y(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true)),
            ST_X(ST_Transform(ST_Point(X, Y), '{src_crs}', '{dst_crs}', always_xy := true)),
            {res}
        ) AS hex,
        AVG(Z) AS avg_elevation,
        MIN(Z) AS min_z,
        MAX(Z) AS max_z,
        COUNT(*) AS cnt
    FROM staging_points
    GROUP BY 1
""")

# Clean up staging table to free disk space
db.sql("DROP TABLE staging_points")

runtime = time.time() - start_time
final_hex_count = db.sql(f"SELECT COUNT(*) FROM san_fran_res_{res}").fetchone()[0]
print(f"Finished in {runtime/60:.2f} minutes.")
print(f"Final Result: {final_hex_count:,} hexagons stored.")
db.close()

Streaming 18070 tiles into DuckDB...
Tiles: 0/18070 | Points staged: 0
Tiles: 20/18070 | Points staged: 0
Tiles: 40/18070 | Points staged: 0


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Tiles: 60/18070 | Points staged: 449,760
Tiles: 80/18070 | Points staged: 449,760
Tiles: 100/18070 | Points staged: 449,760
Tiles: 120/18070 | Points staged: 449,760
Tiles: 140/18070 | Points staged: 449,760
Tiles: 160/18070 | Points staged: 449,760
Tiles: 180/18070 | Points staged: 594,633
Tiles: 200/18070 | Points staged: 7,432,078
Tiles: 220/18070 | Points staged: 7,432,078
Tiles: 240/18070 | Points staged: 7,432,078
Tiles: 260/18070 | Points staged: 7,432,078
Tiles: 280/18070 | Points staged: 7,432,078
Tiles: 300/18070 | Points staged: 7,432,078
Tiles: 320/18070 | Points staged: 20,207,664
Tiles: 340/18070 | Points staged: 27,052,525
Tiles: 360/18070 | Points staged: 27,052,525
Tiles: 380/18070 | Points staged: 27,052,525
Tiles: 400/18070 | Points staged: 27,052,525
Tiles: 420/18070 | Points staged: 27,052,525
Tiles: 440/18070 | Points staged: 29,095,777
Tiles: 460/18070 | Points staged: 52,632,567
Tiles: 480/18070 | Points staged: 56,358,324
Tiles: 500/18070 | Points staged: 56,35

In [2]:
# --- USER PARAMETERS ---
EPT_URL = "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/CA_SanFrancisco_1_B23/ept.json"
H3_RES = 12
SRC_CRS = 'EPSG:3857'
DST_CRS = 'EPSG:4326'
DB_PATH = 'duckdb/san_fran_ept_lpc.ddb'
TILE_ZOOM = 16
MAX_WORKERS = 14
SUB_RESOLUTION = None  # Example: 1.0 (meters) to thin the data as it downloads
BBOX = [-13638426.0, 4536715.0, -13617318.0, 4556481.0] # [min_x, min_y, max_x, max_y]

In [3]:
# # Install extensions globally (only needed once)
import duckdb
duckdb.sql("INSTALL h3 FROM community")
duckdb.sql("INSTALL httpfs")
duckdb.sql("INSTALL spatial")
# duckdb.sql("INSTALL pdal FROM community")
def get_con():
    """In-memory connection for workers. LOAD only, no INSTALL."""
    con = duckdb.connect()
    con.sql("""
        SET temp_directory = './tmp';
        SET memory_limit = '512MB';
     --   SET s3_region = 'us-west-2';
        LOAD h3;
        LOAD httpfs;
        LOAD spatial;
        SET enable_progress_bar = false;
    """)
    return con

In [4]:
import pdal
import pyarrow as pa
import duckdb
import mercantile
import concurrent.futures
import time
import os

def process_tile_to_h3(tile):
    """Worker: PDAL Points -> H3 Aggregates -> Returns Arrow Table"""
    tb = mercantile.xy_bounds(tile)
    bounds = f"([{tb.left},{tb.right}],[{tb.bottom},{tb.top}])"
    
    # Configure PDAL Reader
    reader_opts = {"filename": EPT_URL, "bounds": bounds}
    if SUB_RESOLUTION:
        reader_opts["resolution"] = SUB_RESOLUTION

    try:
        # 1. PDAL Fetch
        pipeline = pdal.Reader.ept(**reader_opts).pipeline()
        count = pipeline.execute()
        if count == 0 or len(pipeline.arrays) == 0:
            return None
        arr = pipeline.arrays[0]
        if len(arr) == 0: return None

        # 2. Bridge to Arrow (Limited columns to save RAM)
        arrow_tbl = pa.Table.from_arrays(
            [pa.array(arr['X']), pa.array(arr['Y']), pa.array(arr['Z'])],
            names=['X', 'Y', 'Z']
        )
        
        # 3. Local In-Memory Aggregation
        con = get_con()
        con.register('tile_data', arrow_tbl)
        hex_summary = con.sql(f"""
            SELECT 
                h3_latlng_to_cell(
                    ST_Y(ST_Transform(ST_Point(X, Y), '{SRC_CRS}', '{DST_CRS}', always_xy := true)), 
                    ST_X(ST_Transform(ST_Point(X, Y), '{SRC_CRS}', '{DST_CRS}', always_xy := true)), 
                    {H3_RES}
                ) AS hex,
                AVG(Z) AS avg_z,
                MIN(Z) AS min_z,
                MAX(Z) AS max_z,
                MAX(Z) - MIN(Z) AS z_range,
                COUNT(*) AS cnt
            FROM tile_data
            GROUP BY 1
        """).fetch_arrow_table()
        con.unregister('tile_data')
        con.close()
        return hex_summary
    except Exception as e:
        print(f"  TILE FAILED {tile}: {e}")
        return None

In [5]:
def run_pipeline():
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    
    # 1. Generate Tile List
    sw = mercantile.lnglat(BBOX[0], BBOX[1])
    ne = mercantile.lnglat(BBOX[2], BBOX[3])
    tiles = list(mercantile.tiles(sw.lng, sw.lat, ne.lng, ne.lat, zooms=[TILE_ZOOM]))
    
    # 2. Initialize Persistent Storage
    main_db = duckdb.connect(DB_PATH)
    main_db.sql(f"""
        CREATE OR REPLACE TABLE raw_hex_batches (
            hex UBIGINT, avg_z DOUBLE, min_z DOUBLE, max_z DOUBLE, z_range DOUBLE, cnt BIGINT
        )
    """)

    print(f"Processing {len(tiles)} tiles with {MAX_WORKERS} workers...")
    start = time.time()

    # 3. Parallel Process
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_tile_to_h3, t): t for t in tiles}
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            pa_temp_table = future.result()
            if pa_temp_table:
                main_db.sql("INSERT INTO raw_hex_batches SELECT * FROM pa_temp_table")
                del pa_temp_table
            
            if i % 100 == 0:
                print(f"Batch {i}/{len(tiles)} | Time: {time.time()-start:.1f}s")

    # 4. Final Global Reduction
    # Merge hex spanning tile boundaries with weighted average
    print("Finalizing global reduction...")
    main_db.sql("LOAD h3")
    main_db.sql(f"""
        CREATE OR REPLACE TABLE san_fran_h3_res_{H3_RES} AS
        SELECT 
            hex,
            SUM(avg_z * cnt) / SUM(cnt) AS avg_z,
            MIN(min_z) AS min_z,
            MAX(max_z) AS max_z,
            MAX(max_z) - MIN(min_z) AS z_range,
            SUM(cnt) AS cnt
        FROM raw_hex_batches
        GROUP BY 1
    """)
    
    print(f"Done! Created table: san_fran_h3_res_{H3_RES}")
    main_db.sql(f"SELECT COUNT(*) as total_hexagons FROM san_fran_h3_res_{H3_RES}").show()
    elapsed = time.time() - start
    print(f"Elapsed: {elapsed/60:.1f} min")

run_pipeline()

Processing 1188 tiles with 14 workers...
Batch 0/1188 | Time: 1.1s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 100/1188 | Time: 75.2s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 200/1188 | Time: 382.4s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 300/1188 | Time: 786.3s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 400/1188 | Time: 1211.2s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 500/1188 | Time: 1612.2s
Batch 600/1188 | Time: 1987.8s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 700/1188 | Time: 2373.0s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 800/1188 | Time: 2616.1s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 900/1188 | Time: 2748.2s


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Batch 1000/1188 | Time: 2798.5s
Batch 1100/1188 | Time: 2807.2s
Finalizing global reduction...
Done! Created table: san_fran_h3_res_12
┌────────────────┐
│ total_hexagons │
│     int64      │
├────────────────┤
│         426731 │
└────────────────┘

Elapsed: 46.9 min


In [6]:
import duckdb
con = duckdb.connect(DB_PATH)
# con.sql("show tables")
con.table(f"raw_hex_batches")
con.sql("select *from  raw_hex_batches")

┌────────────────────┬─────────────────────┬───────────────────────┬─────────────────────┬─────────────────────┬───────┐
│        hex         │        avg_z        │         min_z         │        max_z        │       z_range       │  cnt  │
│       uint64       │       double        │        double         │       double        │       double        │ int64 │
├────────────────────┼─────────────────────┼───────────────────────┼─────────────────────┼─────────────────────┼───────┤
│ 631210973956483583 │  0.1011799999999956 │  -0.11000000000000437 │  0.4999999999999956 │                0.61 │   500 │
│ 631210973956498431 │   0.092147651006707 │  -0.07000000000000438 │ 0.45999999999999563 │                0.53 │   298 │
│ 631210973956497919 │ 0.14130940052928082 │ -0.010000000000004372 │  0.5599999999999956 │                0.57 │  8691 │
│ 631210973958019071 │ 0.14426954732509864 │   0.06999999999999564 │ 0.21999999999999564 │ 0.15000000000000002 │   972 │
│ 631210973956499455 │ 0.1324147

In [17]:
con.sql("select min(max_z), max(max_z) from san_fran_h3_res_12")

┌─────────────────────┬────────────┐
│     min(max_z)      │ max(max_z) │
│       double        │   double   │
├─────────────────────┼────────────┤
│ -0.6900000000000044 │     590.46 │
└─────────────────────┴────────────┘

In [37]:
from lonboard import Map, H3HexagonLayer
from arro3.core import Table
from lonboard.colormap import apply_continuous_cmap
from palettable.matplotlib import Viridis_20
from matplotlib.colors import Normalize, LogNorm
import numpy as np


In [30]:
con = duckdb.connect(DB_PATH)
con.sql("install h3 from community; load h3")
df = con.sql("select h3_h3_to_string(hex) as hex, max_z, avg_z  from san_fran_h3_res_12").fetch_arrow_table()
table = Table.from_arrow(df)

In [39]:
heights = table["max_z"].to_numpy()
heights = np.nan_to_num(heights, nan=0)
normalizer_heights = Normalize(0, 590)
normalizer_range = LogNorm(0, 200)
normalized_heights = normalizer_heights(table['max_z'])
normalized_range = normalizer_heights(table['avg_z'])
colors = apply_continuous_cmap(normalized, Viridis_20)
layer = H3HexagonLayer(
    table,
    get_hexagon=table["hex"],
    get_fill_color=colors,
    extruded=True,
    get_elevation=heights,
    elevation_scale=3,
    high_precision=True,
    stroked=False,
    auto_highlight=False,
    opacity=1,
    coverage=1,
    )

m = Map(layers=layer)
m


NameError: name 'normalized' is not defined

## Visualization options with current schema

Current columns: `hex`, `avg_z`, `min_z`, `max_z`, `z_range`, `cnt`

### What each metric tells you
- **`avg_z`** = terrain elevation — basically a DEM. Shows SF hills but nothing you can't get from a raster DEM
- **`z_range`** = vertical complexity — **this is the interesting one**. Flat ground ≈ 0, buildings/trees/bridges have high z_range. This reveals 3D structures
- **`cnt`** = point density — shows data coverage (water/gaps vs dense urban)
- **`min_z`** = ground surface proxy (not perfect — some noise below ground)
- **`max_z`** = highest return — rooftops, treetops, bridge decks

### Best combos for lonboard H3HexagonLayer
1. **Color by `z_range`, extrude by `z_range`** — structures pop off the map, flat ground stays flat. Most visually striking.
2. **Color by `avg_z`, extrude by `z_range`** — dual encoding: color = terrain elevation, height = structure complexity. Shows WHERE structures are AND what elevation they sit at.
3. **Color by `cnt`, no extrusion** — data coverage / quality map

### For suspended hex (bridges etc.) — current schema is NOT enough
- `H3HexagonLayer` always starts extrusion from z=0 (ground). No base elevation offset.
- Need `deck.gl ColumnLayer` with `diskResolution=6` + `getPosition=[lng, lat, min_z]` to float hex
- Would require adding **centroid lat/lng** to the pipeline output
- Also need **LiDAR classification** to distinguish bridge (class 17) from building (6) from ground (2)

### Next run: richer schema for structures
Add Classification to the worker Arrow table and aggregate per-hex:
```python
# In process_tile_to_h3, grab Classification too:
arrow_tbl = pa.Table.from_arrays(
    [pa.array(arr['X']), pa.array(arr['Y']), pa.array(arr['Z']), pa.array(arr['Classification'])],
    names=['X', 'Y', 'Z', 'Classification']
)

# Then in the DuckDB query, add:
#   COUNT(*) FILTER (WHERE Classification = 2) AS ground_cnt,
#   COUNT(*) FILTER (WHERE Classification = 6) AS building_cnt,
#   COUNT(*) FILTER (WHERE Classification = 17) AS bridge_cnt,
#   AVG(Z) FILTER (WHERE Classification = 2) AS ground_z,
#   h3_cell_to_lat(hex) AS lat,
#   h3_cell_to_lng(hex) AS lng
```
With `ground_z` as base and `max_z - ground_z` as structure height, you could render suspended hex via ColumnLayer.

In [42]:
# 3D elevation map — color + extrude by max_z
from lonboard import Map, H3HexagonLayer
from arro3.core import Table
from lonboard.colormap import apply_continuous_cmap
from palettable.matplotlib import Viridis_20
from matplotlib.colors import Normalize
import numpy as np

con = duckdb.connect(DB_PATH)
con.sql("LOAD h3")
df = con.sql(f"""
    SELECT h3_h3_to_string(hex) AS hex, max_z, avg_z, z_range, cnt
    FROM san_fran_h3_res_{H3_RES}
    WHERE cnt > 10
""").fetch_arrow_table()
table = Table.from_arrow(df)

max_z = np.array(table["max_z"])
max_z = np.nan_to_num(max_z, nan=0)
normalizer = Normalize(vmin=max_z.min(), vmax=np.percentile(max_z, 99), clip=True)
normalized = normalizer(max_z)

colors = apply_continuous_cmap(normalized, Viridis_20)
layer = H3HexagonLayer(
    table,
    get_hexagon=table["hex"],
    get_fill_color=colors,
    extruded=True,
    get_elevation=max_z,
    elevation_scale=3,
    stroked=False,
    opacity=1,
    coverage=1,
)

Map(
    layers=[layer],
    view_state={"longitude": -122.44, "latitude": 37.76, "zoom": 12, "pitch": 60, "bearing": 30},
)

VBox(children=(<lonboard._map.Map object at 0x17c041190>, VBox(children=(ErrorOutput(), ErrorOutput()), layout…