# Multi-Site Canopy Height Model (CHM) Batch Processing

Generate Canopy Height Models from USGS 3DEP lidar data for multiple forest sites.

**Features:**
- Process multiple sites from `forest_sites.yaml` configuration
- Adaptive resolution based on point density
- Direct CHM calculation (DSM minus DTM, with no smoothing or IDW interpolation)
- Automatic metadata and summary generation
- Designed for the CyVerse VICE environment

## Requirements

Create and activate the 3dep conda environment:

```bash
# Create the environment (first time only)
mamba env create -f environments/3dep-environment.yml

# Install the Jupyter kernel
/opt/conda/envs/3dep/bin/python -m ipykernel install --user --name 3dep --display-name "Python (3DEP)"

# Activate the environment
conda activate 3dep
```

Then select the **"Python (3DEP)"** kernel in Jupyter before running this notebook.

## 1. Setup and Configuration

In [4]:
# Environment validation
import importlib
import sys

# Importable module names (not distribution names) of every third-party
# package the notebook imports, so a broken environment is reported here
# in one message instead of failing part-way through the import cell.
REQUIRED = [
    'pdal', 'geopandas', 'rioxarray', 'rasterio', 'pyproj', 'shapely',
    'numpy', 'pandas', 'yaml', 'requests', 'matplotlib', 'tqdm',
]
missing = []
for pkg in REQUIRED:
    try:
        importlib.import_module(pkg)
    except ImportError:
        missing.append(pkg)

if missing:
    raise ImportError(f"Missing: {missing}. Run: conda activate 3dep")

print("Environment OK!")

Environment OK!


In [5]:
# Import libraries
import json
import os
import traceback
from datetime import datetime
from functools import lru_cache
from pathlib import Path

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pdal
import pyproj
import requests
import rioxarray as rio
import yaml
from rasterio.enums import Resampling
from shapely.geometry import box
from shapely.ops import transform
from tqdm.auto import tqdm

# Record when the libraries finished loading (local time, second precision)
loaded_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Loaded at {loaded_at}")

Loaded at 2025-12-19 16:51:20


In [6]:
# Configuration
# Root folder that receives every product written by this notebook
OUTPUT_BASE = Path("/home/jovyan/data-store/data/output/3dep")
# Site definitions, given relative to the notebook's working directory
SITES_CONFIG = Path("../forest_sites.yaml")

# Make sure each product folder exists before anything is written to it
product_dirs = [OUTPUT_BASE / folder for folder in ('chm', 'dtm', 'dsm', 'logs')]
for product_dir in product_dirs:
    product_dir.mkdir(parents=True, exist_ok=True)

print(f"Output: {OUTPUT_BASE.absolute()}")
print(f"Config: {SITES_CONFIG.absolute()}")

Output: /home/jovyan/data-store/data/output/3dep
Config: /home/jovyan/data-store/fractal-notebooks/docs/notebooks/3dep/modified/../forest_sites.yaml


## 2. Load Site Configuration

In [None]:
# Load forest sites configuration
# Read as UTF-8 explicitly so parsing does not depend on the platform's
# default text encoding.
with open(SITES_CONFIG, 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Top-level sections of the YAML file used throughout the notebook
sites = config['sites']
processing_config = config['processing']
groups = config['groups']

print(f"Loaded {len(sites)} forest sites")
print(f"Available groups: {list(groups.keys())}")
print("\nProcessing settings:")
# The unit previously printed as mis-encoded text ("mÂ²"); it is "m²".
print(f"  Density threshold: {processing_config['density_threshold']} pts/m²")
print(f"  High resolution: {processing_config['resolution_high']}m")
print(f"  Standard resolution: {processing_config['resolution_standard']}m")

In [None]:
# Display available sites
# One table row per configured site; optional fields fall back to a default
site_list = [
    {
        'id': key,
        'name': info['name'],
        'state': info['state'],
        'forest_type': info['forest_type'],
        'priority': info.get('priority', 3),
        'expected_height': info.get('expected_max_height_m', 'N/A')
    }
    for key, info in sites.items()
]

# Order by priority, then by state
df_sites = pd.DataFrame(site_list).sort_values(['priority', 'state'])
print("\nAvailable Forest Sites:")
print(df_sites.to_string(index=False))

## 3. Select Sites to Process

In [None]:
# SELECT SITES TO PROCESS
# Option 1: Process specific sites by ID
# SELECTED_SITES = ['sequoia_giant_forest', 'redwood_humboldt', 'great_smoky_cove']

# Option 2: Process a predefined group
# SELECTED_SITES = groups['all_priority_1']['sites']

# Option 3: Process all sites (be careful - this is a lot of data!)
# SELECTED_SITES = list(sites.keys())

# Default: Process priority 1 sites
SELECTED_SITES = groups['all_priority_1']['sites']

print(f"Selected {len(SELECTED_SITES)} sites for processing:")
for site_id in SELECTED_SITES:
    # An ID without an entry under `sites` is skipped by the batch loop, so
    # it is reported here rather than aborting the cell with a KeyError.
    if site_id in sites:
        print(f"  - {site_id}: {sites[site_id]['name']}")
    else:
        print(f"  - {site_id}: [not found in config - will be skipped]")

## 4. Core Processing Functions

In [None]:
@lru_cache(maxsize=None)
def _wgs84_to_web_mercator():
    """Return the shared EPSG:4326 -> EPSG:3857 transformer, built on first use."""
    return pyproj.Transformer.from_crs(
        pyproj.CRS("EPSG:4326"), pyproj.CRS("EPSG:3857"), always_xy=True
    )


def gcs_to_proj(poly):
    """Reproject from EPSG:4326 to EPSG:3857.

    The transformer is created once and reused on later calls, because this
    function is applied to every 3DEP boundary geometry as well as to each
    site's area of interest.

    Args:
        poly: Geometry with EPSG:4326 coordinates in (x, y) order, as
            required by ``always_xy=True``.

    Returns:
        The geometry produced by ``shapely.ops.transform`` with its
        coordinates in EPSG:3857.
    """
    return transform(_wgs84_to_web_mercator().transform, poly)


def bbox_to_polygon(bbox):
    """Build the bounding-box polygon in both coordinate systems.

    Args:
        bbox: Sequence ``[west, south, east, north]``, passed positionally
            to ``shapely.geometry.box``.

    Returns:
        Tuple ``(poly_4326, poly_3857)``: the box as given and the same box
        after ``gcs_to_proj``.
    """
    geographic = box(*bbox)
    projected = gcs_to_proj(geographic)
    return geographic, projected


def get_resolution(density):
    """Pick the raster resolution that matches a point density.

    Reads the module-level ``processing_config``: a density at or above
    ``density_threshold`` selects ``resolution_high``; anything lower
    selects ``resolution_standard``.
    """
    is_dense = density >= processing_config['density_threshold']
    chosen_key = 'resolution_high' if is_dense else 'resolution_standard'
    return processing_config[chosen_key]


def make_dem_pipeline(extent_wkt, dataset_names, pc_res, dem_res, dem_type, out_path, out_crs=3857):
    """Assemble the PDAL pipeline definition for one DEM raster (no IDW).

    Args:
        extent_wkt: WKT polygon handed to every EPT reader as its ``polygon``.
        dataset_names: 3DEP dataset names; one EPT reader is added per name.
        pc_res: Value for each reader's ``resolution`` option.
        dem_res: Output cell size; converted to ``float`` for the writer.
        dem_type: ``'dtm'`` keeps only class-2 points and grids with ``min``;
            any other value grids all remaining points with ``max``.
        out_path: Destination GeoTIFF; stored as ``str(out_path)``.
        out_crs: EPSG code for the reprojection stage (default 3857).

    Returns:
        ``{"pipeline": [...]}`` containing the stages in execution order.
    """
    # One EPT reader per dataset, all clipped to the same extent
    stages = [
        {
            "type": "readers.ept",
            "filename": f"https://s3-us-west-2.amazonaws.com/usgs-lidar-public/{name}/ept.json",
            "polygon": extent_wkt,
            "requests": 3,
            "resolution": pc_res
        }
        for name in dataset_names
    ]

    # Drop noise (classes 7 and 18), then reproject to the output CRS
    stages += [
        {"type": "filters.range", "limits": "Classification![7:7]"},
        {"type": "filters.range", "limits": "Classification![18:18]"},
        {"type": "filters.reprojection", "out_srs": f"EPSG:{out_crs}"},
    ]

    # A DTM is built from ground returns only
    wants_terrain = dem_type == 'dtm'
    if wants_terrain:
        stages.append({"type": "filters.range", "limits": "Classification[2:2]"})

    # Writer - cell statistic only, NO IDW
    stages.append({
        "type": "writers.gdal",
        "filename": str(out_path),
        "gdaldriver": "GTiff",
        "nodata": -9999,
        "output_type": "min" if wants_terrain else "max",
        "resolution": float(dem_res),
        "gdalopts": "COMPRESS=LZW,TILED=YES,BLOCKXSIZE=256,BLOCKYSIZE=256"
    })

    return {"pipeline": stages}

In [None]:
def _find_intersecting_datasets(df_3dep, aoi_3857):
    """Collect name, point count and footprint area of each dataset touching the AOI."""
    intersecting = []
    for _, row in df_3dep.iterrows():
        footprint = row['geometry_3857']
        if footprint.intersects(aoi_3857):
            intersecting.append({
                'name': row['name'],
                'count': row['count'],
                'area': footprint.area
            })
    return intersecting


def _chm_height_statistics(chm):
    """Summarise CHM cells that are not NaN, not nodata and within 0-150 m."""
    values = chm.values.flatten()
    values = values[~np.isnan(values)]
    values = values[values != chm.rio.nodata]
    heights = values[(values >= 0) & (values <= 150)]

    # No usable cell: report nothing rather than statistics of an empty set
    if len(heights) == 0:
        return {}

    return {
        'chm_min_m': float(np.min(heights)),
        'chm_max_m': float(np.max(heights)),
        'chm_mean_m': float(np.mean(heights)),
        'chm_median_m': float(np.median(heights)),
        'chm_std_m': float(np.std(heights)),
        'valid_pixels': int(len(heights)),
    }


def _save_chm_preview(chm, title, preview_path):
    """Render the CHM to a PNG and release the figure even if plotting fails."""
    fig, ax = plt.subplots(figsize=(8, 8))
    try:
        chm.squeeze().plot(ax=ax, cmap='Greens', robust=True)
        ax.set_title(title)
        ax.set_aspect('equal')
        fig.savefig(preview_path, dpi=100, bbox_inches='tight')
    finally:
        # Close this specific figure; a bare plt.close() closes whichever
        # figure happens to be current.
        plt.close(fig)


def process_site(site_id, site_info, df_3dep, output_base):
    """Process a single site and generate CHM.

    Writes a DSM and a DTM with PDAL, subtracts them into a CHM, computes
    height statistics and saves a PNG preview. Exceptions raised inside the
    processing steps are captured in the result instead of propagating.

    Args:
        site_id: Identifier used for the output sub-directories and file names.
        site_info: Site entry; ``'name'`` and ``'bbox'`` are read.
        df_3dep: Table of 3DEP datasets; the ``'name'``, ``'count'`` and
            ``'geometry_3857'`` columns are read.
        output_base: Root directory (path-like supporting ``/``) under which
            ``chm/``, ``dtm/`` and ``dsm/`` sub-directories are created.

    Returns:
        dict with processing results and statistics: ``site_id``, ``name``,
        ``status`` (``'completed'`` or ``'failed'``), ``error``, ``outputs``,
        ``statistics``, ``timing`` and, when an exception was caught,
        ``traceback``. ``timing['total_seconds']`` is set on every path,
        including the early "No 3DEP coverage" return.
    """
    result = {
        'site_id': site_id,
        'name': site_info['name'],
        'status': 'pending',
        'error': None,
        'outputs': {},
        'statistics': {},
        'timing': {}
    }

    start_time = datetime.now()
    # Every raster handle opened below, so all of them are released in
    # `finally` whether the site succeeds or fails.
    opened_rasters = []

    try:
        # Create site output directories
        chm_dir = output_base / 'chm' / site_id
        dtm_dir = output_base / 'dtm' / site_id
        dsm_dir = output_base / 'dsm' / site_id
        for d in [chm_dir, dtm_dir, dsm_dir]:
            d.mkdir(parents=True, exist_ok=True)

        # Only the projected polygon is needed from here on
        _, aoi_3857 = bbox_to_polygon(site_info['bbox'])
        # NOTE(review): this area is measured in EPSG:3857 units, which
        # overstate ground area away from the equator - confirm that is
        # acceptable for the reported area_km2 and the density estimate.
        aoi_area_m2 = aoi_3857.area

        # Find intersecting 3DEP datasets
        intersecting = _find_intersecting_datasets(df_3dep, aoi_3857)
        if not intersecting:
            result['status'] = 'failed'
            result['error'] = 'No 3DEP coverage'
            return result  # `finally` still records the elapsed time

        dataset_names = [ds['name'] for ds in intersecting]

        # Estimate point density: each dataset contributes its point count
        # scaled by the ratio of the AOI area to the dataset's footprint area
        total_points_est = sum(
            (aoi_area_m2 / ds['area']) * ds['count']
            for ds in intersecting
        )
        density_est = total_points_est / aoi_area_m2

        # Select resolution
        resolution = get_resolution(density_est)

        result['statistics']['area_km2'] = aoi_area_m2 / 1e6
        result['statistics']['estimated_points'] = int(total_points_est)
        result['statistics']['estimated_density'] = density_est
        result['statistics']['resolution'] = resolution
        result['statistics']['datasets'] = dataset_names

        # Define output paths
        dsm_path = dsm_dir / f"{site_id}_dsm.tif"
        dtm_path = dtm_dir / f"{site_id}_dtm.tif"
        chm_path = chm_dir / f"{site_id}_chm.tif"

        # Generate DSM
        dsm_start = datetime.now()
        dsm_pipeline = make_dem_pipeline(
            aoi_3857.wkt, dataset_names, 1.0, resolution, 'dsm', dsm_path
        )
        pdal.Pipeline(json.dumps(dsm_pipeline)).execute_streaming(chunk_size=1000000)
        result['timing']['dsm_seconds'] = (datetime.now() - dsm_start).total_seconds()

        # Generate DTM
        dtm_start = datetime.now()
        dtm_pipeline = make_dem_pipeline(
            aoi_3857.wkt, dataset_names, 1.0, resolution, 'dtm', dtm_path
        )
        pdal.Pipeline(json.dumps(dtm_pipeline)).execute_streaming(chunk_size=1000000)
        result['timing']['dtm_seconds'] = (datetime.now() - dtm_start).total_seconds()

        # Calculate CHM
        chm_start = datetime.now()
        dsm = rio.open_rasterio(dsm_path, masked=True)
        opened_rasters.append(dsm)
        dtm = rio.open_rasterio(dtm_path, masked=True)
        opened_rasters.append(dtm)

        # Align if needed.
        # NOTE(review): tuples compare lexicographically, so this is not a
        # strict "larger grid" test - confirm the intended reference grid.
        if dsm.shape != dtm.shape:
            if dsm.shape > dtm.shape:
                dsm = dsm.rio.reproject_match(dtm)
            else:
                dtm = dtm.rio.reproject_match(dsm)

        # Give both rasters identical coordinate labels before subtracting
        dsm = dsm.assign_coords({"x": dtm.x, "y": dtm.y})

        # Direct subtraction - NO smoothing
        chm = dsm - dtm
        chm = chm.compute()
        opened_rasters.append(chm)
        chm.rio.set_nodata(dtm.rio.nodata, inplace=True)
        chm.rio.to_raster(chm_path)

        result['timing']['chm_seconds'] = (datetime.now() - chm_start).total_seconds()

        # Calculate CHM statistics (keys are absent when no cell qualifies)
        result['statistics'].update(_chm_height_statistics(chm))

        # Generate preview
        preview_path = chm_dir / f"{site_id}_preview.png"
        _save_chm_preview(chm, f"CHM: {site_info['name']}", preview_path)

        # Record outputs
        result['outputs'] = {
            'chm': str(chm_path),
            'dsm': str(dsm_path),
            'dtm': str(dtm_path),
            'preview': str(preview_path)
        }

        result['status'] = 'completed'

    except Exception as e:
        # Keep the batch running: report the failure in the result instead
        result['status'] = 'failed'
        result['error'] = str(e)
        result['traceback'] = traceback.format_exc()

    finally:
        result['timing']['total_seconds'] = (datetime.now() - start_time).total_seconds()
        # Cleanup on every exit path, not only after a successful run
        for raster in opened_rasters:
            raster.close()

    return result

## 5. Load 3DEP Boundaries

In [None]:
print("Loading 3DEP dataset boundaries...")

url = 'https://raw.githubusercontent.com/hobuinc/usgs-lidar/master/boundaries/resources.geojson'
# Bound the wait and fail loudly on an HTTP error, so an error page is never
# saved to disk and then handed to the GeoJSON reader.
r = requests.get(url, timeout=60)
r.raise_for_status()

boundaries_file = OUTPUT_BASE / 'resources.geojson'
# Store the payload byte-for-byte instead of decoding it and re-encoding it
# with the platform's default text encoding.
boundaries_file.write_bytes(r.content)

df_3dep = gpd.read_file(boundaries_file)
# Projected copy of each footprint, used for intersection and area checks
df_3dep['geometry_3857'] = df_3dep['geometry'].apply(gcs_to_proj)

print(f"Loaded {len(df_3dep)} 3DEP datasets")

## 6. Process Selected Sites

In [None]:
# Process all selected sites
results = []
batch_start = datetime.now()

print(f"\n{'='*60}")
print(f"BATCH PROCESSING: {len(SELECTED_SITES)} sites")
print(f"Started: {batch_start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*60}\n")

for i, site_id in enumerate(tqdm(SELECTED_SITES, desc="Processing sites"), start=1):
    # IDs with no entry in the config are reported and skipped
    if site_id not in sites:
        print(f"  [SKIP] {site_id}: Not found in config")
        continue

    site_info = sites[site_id]
    print(f"\n[{i}/{len(SELECTED_SITES)}] Processing: {site_info['name']}")

    result = process_site(site_id, site_info, df_3dep, OUTPUT_BASE)
    results.append(result)

    if result['status'] == 'completed':
        print(f"  [OK] Completed in {result['timing']['total_seconds']:.1f}s")
        # chm_max_m is absent when no CHM cell fell inside the valid height
        # range; applying ':.1f' to an 'N/A' placeholder raises ValueError,
        # so the number is formatted only when it exists.
        max_height = result['statistics'].get('chm_max_m')
        max_height_text = f"{max_height:.1f}m" if max_height is not None else "N/A"
        print(f"       Max height: {max_height_text}")
    else:
        print(f"  [FAILED] {result['error']}")

batch_end = datetime.now()
batch_elapsed = (batch_end - batch_start).total_seconds()

print(f"\n{'='*60}")
print("BATCH COMPLETE")
print(f"Total time: {batch_elapsed:.1f}s ({batch_elapsed/60:.1f} min)")
print(f"{'='*60}")

## 7. Generate Summary Report

In [None]:
# Summary statistics
# Split the results by outcome in a single pass
completed, failed = [], []
for r in results:
    if r['status'] == 'completed':
        completed.append(r)
    elif r['status'] == 'failed':
        failed.append(r)

banner = '=' * 60
print(f"\n{banner}")
print("PROCESSING SUMMARY")
print(banner)
print(f"Total sites: {len(results)}")
print(f"Completed:   {len(completed)}")
print(f"Failed:      {len(failed)}")

if completed:
    # A completed site without height statistics counts as 0 here
    site_stats = [r['statistics'] for r in completed]
    max_heights = [stats.get('chm_max_m', 0) for stats in site_stats]
    mean_heights = [stats.get('chm_mean_m', 0) for stats in site_stats]

    print("\nCanopy Height Statistics Across Sites:")
    print(f"  Tallest canopy: {max(max_heights):.1f}m")
    print(f"  Average max height: {np.mean(max_heights):.1f}m")
    print(f"  Average mean height: {np.mean(mean_heights):.1f}m")

if failed:
    print("\nFailed Sites:")
    for r in failed:
        print(f"  - {r['site_id']}: {r['error']}")

In [None]:
# Create results DataFrame
# One flat row per processed site; statistics that were never computed
# (for example on failed sites) come through as None
summary_data = [
    {
        'site_id': r['site_id'],
        'name': r['name'],
        'status': r['status'],
        'area_km2': r['statistics'].get('area_km2'),
        'resolution_m': r['statistics'].get('resolution'),
        'max_height_m': r['statistics'].get('chm_max_m'),
        'mean_height_m': r['statistics'].get('chm_mean_m'),
        'time_seconds': r['timing'].get('total_seconds'),
        'error': r.get('error')
    }
    for r in results
]

df_summary = pd.DataFrame(summary_data)
print("\nResults Summary:")
print(df_summary.to_string(index=False))

In [None]:
# Save detailed results
# The file name carries the time of writing, so reruns never overwrite a report
stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
results_file = OUTPUT_BASE / 'logs' / f"batch_results_{stamp}.json"

batch_info = {
    'started': batch_start.isoformat(),
    'completed': batch_end.isoformat(),
    'total_seconds': batch_elapsed,
    'sites_processed': len(results),
    'sites_completed': len(completed),
    'sites_failed': len(failed)
}
batch_report = {
    'batch_info': batch_info,
    'processing_config': processing_config,
    'results': results
}

# default=str stringifies any value json cannot serialise natively
with results_file.open('w') as f:
    json.dump(batch_report, f, indent=2, default=str)

print(f"\nFull results saved to: {results_file}")

In [None]:
# Save CSV summary
# Timestamped file name, written next to the JSON report in the logs folder
csv_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_file = OUTPUT_BASE / 'logs' / f"batch_summary_{csv_stamp}.csv"
df_summary.to_csv(csv_file, index=False)
print(f"CSV summary saved to: {csv_file}")

## 8. Visualize Results Comparison

In [None]:
# Compare canopy heights across sites
if len(completed) <= 1:
    print("Need at least 2 completed sites for comparison visualization.")
else:
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    ax1, ax2 = axes

    # Left panel: horizontal bars of maximum height, shortest site first
    is_completed = df_summary['status'] == 'completed'
    df_completed = df_summary[is_completed].sort_values('max_height_m', ascending=True)
    ax1.barh(df_completed['name'], df_completed['max_height_m'], color='forestgreen')
    ax1.set_xlabel('Maximum Canopy Height (m)')
    ax1.set_title('Maximum Canopy Heights by Site')

    # Right panel: mean against maximum height, each point labelled by site ID
    mean_heights_col = df_completed['mean_height_m']
    max_heights_col = df_completed['max_height_m']
    ax2.scatter(mean_heights_col, max_heights_col, s=100, c='forestgreen', alpha=0.7)
    for label, mean_h, max_h in zip(df_completed['site_id'], mean_heights_col, max_heights_col):
        ax2.annotate(label, (mean_h, max_h), fontsize=8, ha='left')
    ax2.set_xlabel('Mean Canopy Height (m)')
    ax2.set_ylabel('Maximum Canopy Height (m)')
    ax2.set_title('Mean vs Maximum Canopy Height')

    plt.tight_layout()

    # Save before showing
    comparison_path = OUTPUT_BASE / 'logs' / 'site_comparison.png'
    plt.savefig(comparison_path, dpi=150, bbox_inches='tight')
    print(f"Comparison plot saved to: {comparison_path}")

    plt.show()

## 9. Output Summary

In [None]:
# List the file names produced for every completed site
rule = "=" * 60
print("\n" + rule)
print("OUTPUT FILES")
print(rule)
print(f"\nBase directory: {OUTPUT_BASE.absolute()}")

for r in completed:
    print(f"\n{r['site_id']}:")
    # Show only the file name; the folder layout follows from the base directory
    for output_type, path in r['outputs'].items():
        file_name = Path(path).name
        print(f"  {output_type}: {file_name}")

---

## References

- Original workflows: [OpenTopography OT_3DEP_Workflows](https://github.com/OpenTopography/OT_3DEP_Workflows)
- 3DEP Program: https://www.usgs.gov/3d-elevation-program
- PDAL: https://pdal.io/