In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
!pip install --quiet geopandas rasterio shapely fiona pyproj rtree ruamel.yaml
!pip install arcgis --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/119.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.9/119.9 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/753.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m747.5/753.1 kB[0m [31m24.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m753.1/753.1 kB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [12]:
from arcgis.gis import GIS
from arcgis.features import FeatureLayer
from arcgis.geometry import SpatialReference

import os
import math
import json
import uuid
import glob
import shutil
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import numpy as np
import pandas as pd
from PIL import Image, ImageDraw
import cv2

import geopandas as gpd
from shapely.geometry import Point, box as shp_box
from shapely.ops import transform as shp_transform

import rasterio
from rasterio.windows import Window
from rasterio.transform import Affine

from ruamel.yaml import YAML

# Optional: YOLOv8
# pip install ultralytics
try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except Exception:
    YOLO_AVAILABLE = False

# Optional: DeepForest
# pip install deepforest
try:
    from deepforest import main as deepforest_main
    DEEPFOREST_AVAILABLE = True
except Exception:
    DEEPFOREST_AVAILABLE = False

In [13]:
gis = GIS()

In [14]:
# If you want anonymous access:
# gis = GIS(anonymous=True)

# Or with credentials (works in local Python 3.11 with Esri SDK; in Colab uses REST):
gis = GIS("https://www.arcgis.com", username="stefany.carty", password="xyrMog-1jecbe-kegfif")

In [15]:
city_trees = r"https://maps6.stlouis-mo.gov/arcgis/rest/services/FORESTRY/FORESTRY_TREES/MapServer/0"
forest_park_trees = r"https://maps6.stlouis-mo.gov/arcgis/rest/services/FORESTRY/FORESTRY_TREES/MapServer/1"

In [16]:
class Config:  # Container for all configuration settings used across the pipeline
    def __init__(self, name, overwrite=False):  # Initialize with a run name and whether to overwrite outputs
        # Define run name
        self.name = name  # Store the experiment/run name so outputs are grouped per run

        # Input data
        self.AERIAL_DIR = "/content/drive/MyDrive/GIS/STL_TREES/chunk_732"   # folder of GeoTIFFs (.tif) to tile and label
        self.POINTS_PATH = "https://maps6.stlouis-mo.gov/arcgis/rest/services/FORESTRY/FORESTRY_TREES/MapServer/0"  # source of tree points (Esri Feature Service)

        # Tiling
        self.TILE_SIZE = 640            # pixels (YOLO-friendly) — 640×640 matches YOLOv8 default input size
        self.TILE_OVERLAP = 0.2         # 20% overlap to avoid missing trees on tile edges and boost context for detection
        self.MIN_VALID_PIXELS = 0.8     # discard tiles with >20% nodata — ensures model sees mostly real imagery
        self.RGB_BANDS = (1,2,3)        # RGB band indices (1-based) in TIF — maps to raster bands used to render tiles

        # Geo parameters
        self.TARGET_MPP = None  # target meters-per-pixel for resampling (None = use raster native resolution)
        self.POINT_BUFFER_M = 12.0  # approximate crown width (meters) used for circular/rectangular label width
        self.SHADOW_LENGTH_M = 60.0  # total rectangle length (crown + shadow) for rectangular labels (meters)
        self.SHADOW_ANGLE_DEG = -15.0  # default shadow direction (deg from north, clockwise) if not auto-estimated
        self.USE_CIRCULAR_LABELS = False  # toggle between circular (buffer) vs rectangular (crown+shadow) labels
        self.AUTO_DETECT_SHADOW = False  # if True, estimate dominant shadow angle from image edges/lines

        # YOLO training
        self.MODEL = "yolov8n"  # choose YOLOv8 variant (n = nano, fast/small; l/x = larger, slower, more accurate)
        self.EPOCHS = 80  # number of training epochs (full passes over dataset)
        self.BATCH = 16  # batch size per iteration (set based on GPU memory)
        self.IMG_SIZE = 640  # image size given to YOLO (should match tile size for simplicity)

        # Splits
        self.VAL_RATIO = 0.2  # fraction of data used for validation (model tuning)
        self.TEST_RATIO = 0.1  # fraction of data held out for final evaluation
        self.SEED = 42  # random seed for reproducibility (same splits each run)

        # Overwrite Outputs
        self.OVERWRITE = overwrite  # if True, allow re-creating directories/files; else keep existing outputs

        # Output
        self.OUT_DIR = f"/content/drive/MyDrive/GIS/STL_TREES/model_runs/{self.name}"  # root folder for this run’s artifacts
        self.TILES_DIR = os.path.join(self.OUT_DIR, "tiles")  # where image tiles (.jpg) will be saved
        self.LABELS_DIR = os.path.join(self.OUT_DIR, "labels")  # where YOLO label files (.txt) will be saved
        self.SPLITS_DIR = os.path.join(self.OUT_DIR, "splits")  # lists of train/val/test image paths
        self.VIZ_DIR = os.path.join(self.OUT_DIR, "visualizations")  # quick-look images with boxes drawn for QA
        self.METRICS_DIR = os.path.join(self.OUT_DIR, "metrics")  # place to store metrics, logs, curves
        self.MODELS_DIR = os.path.join(self.OUT_DIR, "models")  # place where trained model weights/checkpoints go

        self.ALL_DIRS = [  # convenient list of directories to create in one shot
            self.OUT_DIR, self.TILES_DIR, self.LABELS_DIR,
            self.SPLITS_DIR, self.VIZ_DIR, self.METRICS_DIR
        ]

        print("Initialization Successful.")  # basic confirmation so the user knows config built correctly
        self.initDirs()  # create all necessary directories (if not already present)

    def initDirs(self):  # utility to create all directories in self.ALL_DIRS
        _ = [os.makedirs(d, exist_ok=self.OVERWRITE) for d in self.ALL_DIRS]  # build each folder; allow overwrite based on flag

    def initChecks(self):  # verify that model directory and expected model name exist before training
        if os.path.exists(self.MODELS_DIR):  # confirm models root exists
            pass  # nothing to do — existence is enough
        else:
            raise Exception("Error: Model directory does not exist.")  # fail early to avoid confusing downstream errors

        if os.path.exists(os.path.join(self.METRICS_DIR, self.MODEL)):  # check presence of a subfolder named after the model
            pass  # OK — found expected model subfolder
        else:
            raise Exception(f"Model {self.MODEL} is missing in the models folder")  # alert user to misconfiguration

        print("Model found. Creating output folders.")  # status message before building folders
        self.initDirs()  # create remaining output folders if needed


# -------------------------------
# 1) UTILITIES
# -------------------------------

def list_tifs(folder: str) -> list[str]:  # return sorted list of all .tif files in a folder
    return sorted(glob.glob(os.path.join(folder, "*.tif")))  # use glob to find GeoTIFFs and sort for deterministic order

def load_points(points_path: str, target_crs=4326) -> gpd.GeoDataFrame:  # load tree points from URL or file, project to target CRS
    ## IF it is an Esri Feature Service or Web Map
    if "HTTPS" in points_path.upper():  # simple heuristic: treat HTTPS path as a web feature service URL
        gis_query = FeatureLayer(points_path).query(where="1=1", out_sr=target_crs.to_wkt())  # query all features and request target spatial ref
        attributes = [dict(x.attributes, LAT = x.geometry['y'], LON = x.geometry['x']) for x in gis_query.features]  # flatten attributes and add XY
        df = pd.DataFrame(attributes)  # convert to pandas DataFrame for easy manipulation
        gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.LON, df.LAT), crs = target_crs.to_wkt())  # build GeoDataFrame with correct CRS
        gdf.head()  # no-op preview in notebooks; safe even if not used programmatically

    else:  # else treat as a local file (e.g., shapefile/geojson)
        gdf = gpd.read_file(points_path)  # read geospatial file from disk into GeoDataFrame
        if gdf.crs is None:  # ensure the dataset has a defined CRS
            raise ValueError("Point dataset lacks a CRS; assign the correct CRS before running.")  # fail early if CRS is missing
        if target_crs and gdf.crs != target_crs:  # if target CRS requested and different from current
            gdf = gdf.to_crs(target_crs)  # reproject to target CRS for consistent math with rasters

    return gdf  # return GeoDataFrame of points in the requested CRS

def read_geotiff_rgb(path: str, bands=(1,2,3)) -> tuple[np.ndarray, dict]:  # read a GeoTIFF and return RGB image + metadata
    with rasterio.open(path) as src:  # open the raster safely using context manager
        count = src.count  # number of bands available in the file
        if count == 1:  # handle single-band grayscale imagery
            img = src.read(1)  # read the single band
            img = np.stack([img, img, img], axis=2)  # replicate into 3 channels to make pseudo-RGB
        else:
            b = [i for i in bands if i <= count]  # clip requested bands to what exists in the file
            img = src.read(b)  # read selected bands as (C, H, W)
            img = np.transpose(img, (1,2,0))  # HWC  # rearrange to height×width×channels for OpenCV/PIL
        meta = {  # capture necessary geospatial metadata for later transformations
            "crs": src.crs,  # coordinate reference system
            "transform": src.transform,  # affine transform from pixel to map coords
            "width": src.width,  # raster width in pixels
            "height": src.height,  # raster height in pixels
            "res": src.res,  # pixel size in map units (xres, yres)
            "nodata": src.nodata  # nodata value if defined, else None
        }
    # Normalize to uint8 if not already
    if img.dtype != np.uint8:  # many GeoTIFFs are 16-bit/float — convert to 8-bit for visualization/YOLO
        imin, imax = np.nanmin(img), np.nanmax(img)  # find min and max ignoring NaNs
        if imax > imin:  # avoid division by zero if image is constant
            img = ((img - imin) / (imax - imin) * 255.0).clip(0,255).astype(np.uint8)  # scale to [0,255] and cast to uint8
        else:
            img = np.zeros_like(img, dtype=np.uint8)  # if flat image, return a black image to keep shape consistent
    return img, meta  # return ready-to-tile RGB array and associated geospatial metadata

def tile_indices(width: int, height: int, tile: int, overlap: float):  # generator yielding (x,y) pixel offsets for tiles with overlap
    step = int(tile * (1.0 - overlap))  # stride between tiles based on overlap fraction
    ys = list(range(0, max(1, height - tile + 1), step))  # starting y positions for tiles
    xs = list(range(0, max(1, width - tile + 1), step))  # starting x positions for tiles
    if ys[-1] + tile < height:  # if last tile doesn't touch bottom edge
        ys.append(height - tile)  # add final row anchored to bottom
    if xs[-1] + tile < width:  # if last tile doesn't touch right edge
        xs.append(width - tile)  # add final column anchored to right
    for y in ys:  # iterate all y starts
        for x in xs:  # iterate all x starts
            yield x, y  # produce top-left pixel for this tile

def compute_tile_bounds(transform: Affine, x0: int, y0: int, tile: int) -> tuple[float,float,float,float]:  # map pixel tile to geographic bbox
    # Rasterio affine: x_geo = a*x + b*y + c ; y_geo = d*x + e*y + f
    # Typically b=d=0; a=resx; e=-resy
    x_left, y_top = transform * (x0, y0)  # transform tile’s top-left pixel to map coordinates
    x_right, y_bottom = transform * (x0 + tile, y0 + tile)  # transform bottom-right pixel of tile to map coordinates
    xmin, xmax = min(x_left, x_right), max(x_left, x_right)  # normalize x min/max in case of negative pixel size
    ymin, ymax = min(y_bottom, y_top), max(y_bottom, y_top)  # normalize y min/max (note y axis may be inverted)
    return xmin, ymin, xmax, ymax  # return geographic bounding box of the tile

def yolo_bbox_from_geo_circle(pt: Point, radius_m: float, transform: Affine, tile_bounds: tuple[float,float,float,float], tile_size: int):  # make YOLO box from a circle buffer
    xmin, ymin, xmax, ymax = tile_bounds  # unpack geographic tile bounds
    cx, cy = pt.x, pt.y  # center point of tree in map units
    # clamp circle to tile bbox
    bxmin = max(xmin, cx - radius_m)  # clamp left
    bymin = max(ymin, cy - radius_m)  # clamp bottom
    bxmax = min(xmax, cx + radius_m)  # clamp right
    bymax = min(ymax, cy + radius_m)  # clamp top
    if bxmin >= bxmax or bymin >= bymax:  # if circle doesn’t intersect tile
        return None  # no label for this tile

    # Convert geo -> pixel within tile coordinates
    # inverse affine
    inv = ~transform  # invert transform to go from map coords to pixel coords
    # Tile origin (x0, y0) in pixels
    x0_pix, y0_pix = inv * (xmin, ymax)  # note ymax used for y due to affine sign  # compute tile’s pixel origin
    # Corners in pixels in full image coords
    xmin_pix, ymin_pix = inv * (bxmin, bymin)  # convert clamped bbox min to pixel
    xmax_pix, ymax_pix = inv * (bxmax, bymax)  # convert clamped bbox max to pixel
    # Convert to tile-local pixel coords
    x_min_local = xmin_pix - x0_pix  # shift to tile’s local x origin
    x_max_local = xmax_pix - x0_pix  # shift to tile’s local x origin
    y_min_local = ymax_pix - y0_pix  # note y inversion  # adjust for raster coordinate direction
    y_max_local = ymin_pix - y0_pix  # adjust for raster coordinate direction

    # Clamp to tile size
    x_min_local = np.clip(x_min_local, 0, tile_size)  # keep inside tile bounds
    x_max_local = np.clip(x_max_local, 0, tile_size)  # keep inside tile bounds
    y_min_local = np.clip(y_min_local, 0, tile_size)  # keep inside tile bounds
    y_max_local = np.clip(y_max_local, 0, tile_size)  # keep inside tile bounds

    if x_max_local - x_min_local <= 1 or y_max_local - y_min_local <= 1:  # ignore boxes that are basically zero-sized
        return None  # too small to be useful

    # Convert to YOLO normalized cx, cy, w, h in [0,1]
    cx_local = (x_min_local + x_max_local) / 2.0  # center x in pixels
    cy_local = (y_min_local + y_max_local) / 2.0  # center y in pixels
    w_local = (x_max_local - x_min_local)  # width in pixels
    h_local = (y_max_local - y_min_local)  # height in pixels

    return (
        (cx_local / tile_size),  # normalize center x to [0,1]
        (cy_local / tile_size),  # normalize center y to [0,1]
        (w_local / tile_size),   # normalize width to [0,1]
        (h_local / tile_size)    # normalize height to [0,1]
    )

def yolo_bbox_from_geo_rectangle(pt: Point, crown_width_m: float, shadow_length_m: float,
                                shadow_angle_deg: float, transform: Affine,
                                tile_bounds: tuple[float,float,float,float], tile_size: int):  # make YOLO box from oriented rectangle (crown+shadow)
    """
    Create rectangular bounding box for tree with shadow

    Args:
        pt: Tree point location
        crown_width_m: Crown width (perpendicular to shadow)
        shadow_length_m: Total length including shadow
        shadow_angle_deg: Shadow direction (degrees from north, clockwise)
        transform: Rasterio transform
        tile_bounds: Tile geographic bounds
        tile_size: Tile size in pixels

    Returns:
        YOLO format bbox (cx, cy, w, h) normalized [0,1] or None
    """
    import math  # local import to keep function self-contained

    xmin, ymin, xmax, ymax = tile_bounds  # unpack tile bounds in map coordinates
    cx, cy = pt.x, pt.y  # center at tree point location (map units)

    # Convert angle to radians (0° = North, clockwise positive)
    angle_rad = math.radians(shadow_angle_deg)  # convert degrees to radians for trig

    # Calculate rectangle corners
    # Half dimensions
    half_width = crown_width_m / 2.0  # half of rectangle width (across shadow)
    half_length = shadow_length_m / 2.0  # half of rectangle length (along shadow)

    # Rectangle corners in local coordinate system (before rotation)
    corners_local = [
        (-half_width, -half_length),  # Bottom-left relative to center
        (half_width, -half_length),   # Bottom-right relative to center
        (half_width, half_length),    # Top-right relative to center
        (-half_width, half_length)    # Top-left relative to center
    ]

    # Rotate corners around center point
    cos_a = math.cos(angle_rad)  # precompute cosine for rotation
    sin_a = math.sin(angle_rad)  # precompute sine for rotation

    corners_geo = []  # will hold rotated corners in map coordinates
    for x_local, y_local in corners_local:  # rotate each local corner around the center
        # Rotate point
        x_rot = x_local * cos_a - y_local * sin_a  # x rotation formula
        y_rot = x_local * sin_a + y_local * cos_a  # y rotation formula

        # Translate to world coordinates
        x_world = cx + x_rot  # shift rotated x to map x
        y_world = cy + y_rot  # shift rotated y to map y
        corners_geo.append((x_world, y_world))  # collect rotated corner

    # Find bounding box of rotated rectangle
    x_coords = [c[0] for c in corners_geo]  # list of x coords
    y_coords = [c[1] for c in corners_geo]  # list of y coords

    bbox_xmin = max(min(x_coords), xmin)  # clamp bbox to tile left
    bbox_xmax = min(max(x_coords), xmax)  # clamp bbox to tile right
    bbox_ymin = max(min(y_coords), ymin)  # clamp bbox to tile bottom
    bbox_ymax = min(max(y_coords), ymax)  # clamp bbox to tile top

    # Check if bbox is within tile bounds
    if bbox_xmin >= bbox_xmax or bbox_ymin >= bbox_ymax:  # if box is empty after clamping
        return None  # nothing to label

    # Convert to pixel coordinates
    inv_transform = ~transform  # invert affine to map → pixel

    # Convert bbox corners to pixels
    xmin_pix, ymin_pix = inv_transform * (bbox_xmin, bbox_ymin)  # min corner in pixels
    xmax_pix, ymax_pix = inv_transform * (bbox_xmax, bbox_ymax)  # max corner in pixels

    # Convert to tile-local coordinates
    tile_x0, tile_y0 = inv_transform * (xmin, ymax)  # Tile origin  # get pixel origin for this tile

    x_min_local = xmin_pix - tile_x0  # shift to tile coordinates (x min)
    x_max_local = xmax_pix - tile_x0  # shift to tile coordinates (x max)
    y_min_local = ymax_pix - tile_y0  # Note: y-axis flip  # adjust for raster y direction
    y_max_local = ymin_pix - tile_y0  # adjust for raster y direction

    # Ensure correct ordering
    if x_min_local > x_max_local:  # if swapped due to transform peculiarities
        x_min_local, x_max_local = x_max_local, x_min_local  # swap so min < max
    if y_min_local > y_max_local:  # if swapped due to y inversion
        y_min_local, y_max_local = y_max_local, y_min_local  # swap so min < max

    # Clamp to tile size
    x_min_local = max(0, min(x_min_local, tile_size))  # clamp to [0, tile_size]
    x_max_local = max(0, min(x_max_local, tile_size))  # clamp to [0, tile_size]
    y_min_local = max(0, min(y_min_local, tile_size))  # clamp to [0, tile_size]
    y_max_local = max(0, min(y_max_local, tile_size))  # clamp to [0, tile_size]

    # Check minimum size
    if x_max_local - x_min_local <= 2 or y_max_local - y_min_local <= 2:  # avoid tiny boxes that don’t help training
        return None  # skip unusable boxes

    # Convert to YOLO format (center_x, center_y, width, height) normalized [0,1]
    center_x = (x_min_local + x_max_local) / 2.0 / tile_size  # normalize center x
    center_y = (y_min_local + y_max_local) / 2.0 / tile_size  # normalize center y
    width = (x_max_local - x_min_local) / tile_size  # normalize width
    height = (y_max_local - y_max_local + y_min_local) / tile_size if False else (y_max_local - y_min_local) / tile_size  # normalize height (explicit to show intent)

    return (center_x, center_y, width, height)  # final YOLO box tuple

def estimate_shadow_angle_from_image(image: np.ndarray, tree_points: list) -> float:  # estimate dominant shadow direction from edges/lines
    """
    Estimate shadow direction from image analysis

    Args:
        image: RGB image array
        tree_points: List of tree point locations in pixel coordinates

    Returns:
        Shadow angle in degrees from north (clockwise)
    """
    import cv2  # use OpenCV for edge detection and Hough transform
    from scipy import ndimage  # available for additional filtering if needed

    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)  # shadows pop more clearly in grayscale

    # Apply edge detection to find shadow edges
    edges = cv2.Canny(gray, 50, 150)  # Canny emphasizes edges that Hough uses to detect lines

    # Find lines using Hough transform
    lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100)  # detect straight lines and their angles

    if lines is not None:  # if we detected any candidate lines
        # Analyze line angles
        angles = []  # will collect each line’s angle
        for rho, theta in lines[:, 0]:  # iterate Hough results
            angle_deg = np.degrees(theta) - 90  # Convert to standard angle (0 = vertical) to align with “from north”
            angles.append(angle_deg)  # store for histogram analysis

        # Find most common angle (shadow direction)
        angles = np.array(angles)  # numpy array for efficient histogramming

        # Use histogram to find dominant angle
        hist, bins = np.histogram(angles, bins=36, range=(-180, 180))  # coarse binning to get dominant orientation
        dominant_angle_idx = np.argmax(hist)  # pick bin with most votes
        shadow_angle = (bins[dominant_angle_idx] + bins[dominant_angle_idx + 1]) / 2  # center of dominant bin

        return shadow_angle % 360  # normalize to [0, 360)

    # Default angle if detection fails
    return 45.0  # Assume northeast shadow directio  # fallback that still produces rectangular labels

def estimate_tile_nodata_fraction(img: np.ndarray, nodata: Optional[float]) -> float:  # compute proportion of nodata pixels in a tile
    if nodata is None:  # if raster has no nodata defined, assume fully valid
        return 0.0  # zero nodata fraction
    mask = (img[:,:,0] == nodata) & (img[:,:,1] == nodata) & (img[:,:,2] == nodata)  # treat pixel as nodata only if all channels equal nodata
    return float(mask.mean())  # return fraction of nodata pixels as a float

def draw_yolo_boxes_on_image(image: np.ndarray, labels_file: str, color=(0,255,0)) -> np.ndarray:  # overlay YOLO txt boxes onto an image (for QA)
    h, w = image.shape[:2]  # get image height and width
    if not os.path.exists(labels_file):  # if there is no label file for this tile
        return image  # return the image unchanged
    out = image.copy()  # copy so we don’t modify the input array
    with open(labels_file, "r") as f:  # open YOLO label file
        for line in f:  # parse each label line
            parts = line.strip().split()  # split by whitespace
            if len(parts) < 5:  # ensure line has class + 4 box values
                continue  # skip malformed lines
            _, cx, cy, bw, bh = map(float, parts[:5])  # YOLO format: class cx cy w h (normalized)
            cxp, cyp = int(cx * w), int(cy * h)  # denormalize center to pixel coords
            wp, hp = int(bw * w), int(bh * h)  # denormalize width/height to pixels
            x1, y1 = int(cxp - wp/2), int(cyp - hp/2)  # compute top-left pixel
            x2, y2 = x1 + wp, y1 + hp  # compute bottom-right pixel
            cv2.rectangle(out, (x1,y1), (x2,y2), color, 2)  # draw rectangle for the label
    return out  # return image with boxes rendered

# -------------------------------
# 2) PIPELINE: TILING + PSEUDO-LABELS FROM POINTS
# -------------------------------

def process_imagery_and_generate_labels(cfg):  # master function: tile rasters and create YOLO label files from point data
    """
    Updated processing function with rectangular bounding boxes
    """
    tif_paths = list_tifs(cfg.AERIAL_DIR)  # gather all GeoTIFFs from the input directory
    if not tif_paths:  # if no imagery found, fail loudly with a clear message
        raise FileNotFoundError(f"No .tif found in {cfg.AERIAL_DIR}")  # help user fix paths

    label_count = 0  # running total of labels created
    tile_count = 0  # running total of tiles generated
    yolo_items = []  # list of (image_path, label_path) for later splitting

    for tif in tif_paths:  # process each image one by one
        img, meta = read_geotiff_rgb(tif, bands=cfg.RGB_BANDS)  # read RGB array and metadata for this raster
        transform = meta["transform"]  # affine used to convert between pixels and map coords
        crs = meta["crs"]  # coordinate reference system of the raster

        # Load points in the image CRS
        points_gdf = load_points(cfg.POINTS_PATH, crs)  # fetch trees points and project to raster CRS for accurate geometry tests

        # Estimate shadow angle if auto-detection is enabled
        shadow_angle = cfg.SHADOW_ANGLE_DEG  # start with default shadow angle
        if cfg.AUTO_DETECT_SHADOW:  # only compute if user requested
            # Convert some tree points to pixel coordinates for shadow analysis
            tree_pixels = []  # will store point locations in pixel space
            inv_transform = ~transform  # invert transform for map→pixel conversion
            for idx, row in points_gdf.iterrows():  # iterate over points
                if not row.geometry or row.geometry.is_empty:  # skip invalid geometries
                        continue  # move on if no usable geometry
                # Get coordinates
                x_coord = row.geometry.x  # map x of tree
                y_coord = row.geometry.y  # map y of tree

                if np.isnan(x_coord) or np.isnan(y_coord):  # guard against NaN coords
                    print(f"Warning: NaN coordinates found in point {idx}: ({x_coord}, {y_coord})")  # warn but continue
                    continue  # skip bad point

                px, py = inv_transform * (x_coord, y_coord)  # convert to pixel coordinates
                tree_pixels.append((int(px), int(py)))  # keep integer pixel positions for analysis

                if np.isnan(px) or np.isnan(py):  # extra guard after conversion
                    print(f"Warning: NaN in transformed coordinates: ({px}, {py})")  # warn if conversion failed oddly
                    continue  # skip if invalid

            if tree_pixels:  # only attempt estimation if we have some points
                estimated_angle = estimate_shadow_angle_from_image(img, tree_pixels)  # detect dominant line angle
                shadow_angle = estimated_angle  # use the estimated angle for rectangular labels
                print(f"Estimated shadow angle: {shadow_angle:.1f}°")  # show value for transparency

        H, W = img.shape[:2]  # image height and width in pixels
        for x0, y0 in tile_indices(W, H, cfg.TILE_SIZE, cfg.TILE_OVERLAP):  # loop over all tile start positions
            tile = img[y0:y0+cfg.TILE_SIZE, x0:x0+cfg.TILE_SIZE, :]  # crop tile from full image
            if tile.shape[0] < cfg.TILE_SIZE or tile.shape[1] < cfg.TILE_SIZE:  # skip partial tiles at borders
                continue  # only keep exact-size tiles

            # Compute tile bounds in geo coords
            xmin, ymin, xmax, ymax = compute_tile_bounds(transform, x0, y0, cfg.TILE_SIZE)  # geographic extent of this tile
            tile_poly = shp_box(xmin, ymin, xmax, ymax)  # shapely polygon for fast point-in-polygon tests

            # Skip nodata-heavy tiles
            nodata_frac = estimate_tile_nodata_fraction(tile, meta["nodata"])  # measure how much nodata this tile has
            if nodata_frac > (1 - cfg.MIN_VALID_PIXELS):  # if too much nodata, this tile is low quality
                continue  # skip to keep dataset clean

            # Filter points inside tile bounds
            pts_in = points_gdf[points_gdf.geometry.within(tile_poly)]  # select only trees that fall inside this tile

            # Build YOLO labels
            labels = []  # will store YOLO strings (class cx cy w h)
            for _, row in pts_in.iterrows():  # for each tree in the tile
                pt: Point = row.geometry  # get the shapely point

                if cfg.USE_CIRCULAR_LABELS:  # choose circular (buffer) style boxes
                    # Use original circular approach
                    yolo_box = yolo_bbox_from_geo_circle(
                        pt=pt,  # tree location
                        radius_m=cfg.POINT_BUFFER_M,  # circular radius ~crown width/2
                        transform=transform,  # affine to convert geo→pixel
                        tile_bounds=(xmin, ymin, xmax, ymax),  # geographic bounds of current tile
                        tile_size=cfg.TILE_SIZE  # size of tile in pixels for normalization
                    )
                else:  # choose rectangular (crown+shadow) style boxes
                    # Use new rectangular approach
                    yolo_box = yolo_bbox_from_geo_rectangle(
                        pt=pt,  # tree location
                        crown_width_m=cfg.POINT_BUFFER_M,  # rectangle width across shadow direction
                        shadow_length_m=cfg.SHADOW_LENGTH_M,  # rectangle length along shadow direction
                        shadow_angle_deg=shadow_angle,  # orientation of the rectangle
                        transform=transform,  # affine for geo→pixel
                        tile_bounds=(xmin, ymin, xmax, ymax),  # geographic tile bbox
                        tile_size=cfg.TILE_SIZE  # used to normalize to YOLO space
                    )

                if yolo_box is None:  # skip trees that couldn’t form a valid box (e.g., off-edge)
                    continue  # move to next point

                cx, cy, w, h = yolo_box  # unpack YOLO-format values
                if w <= 0 or h <= 0:  # ensure positive box dimensions
                    continue  # skip non-sensical boxes

                labels.append(f"0 {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}")  # write class 0 (tree) with normalized box values

            # Save tile and labels
            base = f"{Path(tif).stem}_{x0}_{y0}"  # create a unique basename per tile using raster name and tile offset
            img_out = os.path.join(cfg.TILES_DIR, base + ".jpg")  # output path for the tile image
            lbl_out = os.path.join(cfg.LABELS_DIR, base + ".txt")  # output path for the label file

            Image.fromarray(tile).save(img_out, quality=95)  # save the tile as a JPEG for YOLO
            with open(lbl_out, "w") as f:  # write labels to text file
                f.write("\n".join(labels))  # one YOLO line per detected tree

            yolo_items.append((img_out, lbl_out))  # remember pairing for dataset splitting
            tile_count += 1  # increment number of tiles created
            label_count += len(labels)  # accumulate number of labels generated

    print(f"Tiled images: {tile_count}, rectangular labels: {label_count}")  # quick summary of how much was produced
    return yolo_items  # return list of (image_path, label_path) to feed the splitter

# -------------------------------
# 3) DATA SPLIT
# -------------------------------

def split_data(yolo_items: List[tuple[str,str]], cfg):  # split (img,label) pairs into train/val/test lists
    rng = np.random.default_rng(cfg.SEED)  # seeded RNG for reproducible shuffling
    items = np.array(yolo_items, dtype=object)  # convert to numpy array for easy shuffling/slicing
    rng.shuffle(items)  # shuffle in place to randomize order

    n = len(items)  # total number of samples
    n_test = int(n * cfg.TEST_RATIO)  # size of test set
    n_val = int(n * cfg.VAL_RATIO)  # size of validation set
    n_train = n - n_val - n_test  # remaining goes to training

    train, val, test = items[:n_train], items[n_train:n_train+n_val], items[n_train+n_val:]  # slice into splits

    # Save split lists
    def save_split(split, name):  # helper to write list of image paths to a text file
        img_list = [i[0] for i in split]  # only save image paths (YOLO finds labels by stem)
        with open(os.path.join(cfg.SPLITS_DIR, f"{name}.txt"), "w") as f:  # open split file for writing
            f.write("\n".join(img_list))  # write one path per line

    save_split(train, "train")  # write train.txt
    save_split(val, "val")  # write val.txt
    save_split(test, "test")  # write test.txt

    print(f"Split: train={len(train)}, val={len(val)}, test={len(test)}")  # report split sizes for sanity
    return train, val, test  # return split arrays (each element is (img_path, label_path))

def restructure_splits_for_yolo(cfg, train_list, val_list, test_list):  # copy/symlink split files into YOLO folder structure
    """
    Restructure dataset to YOLO format using split lists from split_data()

    Args:
        cfg: Configuration object
        train_list: List of tuples (image_path, label_path) from split_data()
        val_list: List of tuples (image_path, label_path) from split_data()
        test_list: List of tuples (image_path, label_path) from split_data()

    Returns:
        yolo_root: Path to restructured YOLO dataset
    """
    import shutil  # copying files across directories
    from pathlib import Path  # robust path handling

    # Create YOLO structure
    yolo_root = cfg.OUT_DIR  # use run’s OUT_DIR as the YOLO dataset root
    for split in ['train', 'val', 'test']:  # make images/labels directories for each split
        os.makedirs(os.path.join(yolo_root, 'images', split), exist_ok=cfg.OVERWRITE)  # ensure images split dir exists
        os.makedirs(os.path.join(yolo_root, 'labels', split), exist_ok=cfg.OVERWRITE)  # ensure labels split dir exists

    # Process each split
    splits_data = {  # map split name to its list
        'train': train_list,
        'val': val_list,
        'test': test_list
    }

    stats = {'train': 0, 'val': 0, 'test': 0}  # track how many images actually have labels

    for split_name, split_items in splits_data.items():  # iterate over each split
        print(f"Processing {split_name} split: {len(split_items)} items...")  # progress message

        for img_path, label_path in split_items:  # for each (image, label) pair
            if not os.path.exists(img_path):  # guard against missing images
                print(f"Warning: Image not found: {img_path}")  # warn and skip
                continue  # move to next item

            # Get file names
            img_name = Path(img_path).name  # extract just the image file name
            label_name = Path(img_path).stem + '.txt'  # label file name must match image stem

            # Destination paths
            dst_img = os.path.join(yolo_root, 'images', split_name, img_name)  # where the image should live in YOLO tree
            dst_label = os.path.join(yolo_root, 'labels', split_name, label_name)  # where the label should live

            # Copy image
            try:
                shutil.copy2(img_path, dst_img)  # copy with metadata preserved
            except Exception as e:
                print(f"Error copying image {img_path}: {e}")  # report copy issues
                continue  # skip if we can’t copy

            # Copy label (if exists and has content)
            if os.path.exists(label_path):  # if the label file exists
                try:
                    with open(label_path, 'r') as f:  # read the label
                        label_content = f.read().strip()  # trim whitespace to see if it’s empty

                    if label_content:  # if the label has at least one bbox
                        # Copy non-empty label
                        shutil.copy2(label_path, dst_label)  # copy label beside image
                        stats[split_name] += 1  # count as labeled
                    else:
                        # Create empty label file for negative examples
                        open(dst_label, 'w').close()  # empty txt still valid — tells YOLO “no objects”
                except Exception as e:
                    print(f"Error processing label {label_path}: {e}")  # show why label failed
                    # Create empty label file as fallback
                    open(dst_label, 'w').close()  # still create an empty label so the image is included
            else:
                # Create empty label file for missing labels
                open(dst_label, 'w').close()  # negative example: no boxes in this image
                print(f"Warning: No label file found for {img_path}")  # notify about missing label

    # Print statistics
    print(f"\n=== YOLO Dataset Creation Summary ===")  # section header for clarity
    print(f"Dataset root: {yolo_root}")  # remind path where dataset lives
    for split_name, count in stats.items():  # report per split
        total_imgs = len(splits_data[split_name])  # total images in this split
        print(f"{split_name.capitalize()}: {total_imgs} images, {count} with labels ({count/total_imgs*100:.1f}%)")  # percent labeled

# -------------------------------
# 4) YOLO DATA.YAML AND TRAIN
# -------------------------------

def write_yolo_data_yaml(cfg):  # create a minimal data.yaml that points YOLO to images/labels
    yaml_path = os.path.join(cfg.OUT_DIR, f"{cfg.name}.yaml")  # name the YAML after the run for clarity

    ## Initialize YAML
    yaml = YAML()  # ruamel YAML instance for robust dumping

    yaml.preserve_quotes = True  # keep any quotes if present (style preference)

    data_config = {  # structure expected by Ultralytics YOLO
    'path': os.path.abspath(cfg.OUT_DIR),  # Absolute path to your dataset root  # ensures YOLO can resolve relative dirs
    'train': 'images/train',  # Relative path to training images from 'path'  # keeps file tree portable
    'val': 'images/val',      # Relative path to validation images from 'path'  # standard YOLO layout
    'test': 'images/test',    # Relative path to test images from 'path' (optional)  # included for completeness
    'names': {0: 'tree'}  # class index mapping — single class “tree”
    }

    # Write the configuration to a YAML file
    with open(yaml_path, 'w') as f:  # open target YAML path for writing
        yaml.dump(data_config, f)  # serialize the configuration to disk

    print("YOLO YAML created successfully.")  # status update for the user

    return yaml_path  # return path so caller can pass it to YOLO train

def train_yolov8(cfg, train, val, test):  # orchestrate restructuring and invoke Ultralytics training
    if not YOLO_AVAILABLE:  # guard if ultralytics isn’t installed
        print("Ultralytics YOLO not installed; skipping training step.")  # tell user why training is skipped
        return None  # nothing to return in this case

    restructure_splits_for_yolo(cfg, train, val, test)  # lay out images/labels into YOLO directory tree
    print("Data splits to moved to YOLO format")  # confirmation that data is ready for training

    data_yaml = write_yolo_data_yaml(cfg)  # generate data.yaml and get its path

    # Ultralytics expects images and labels in the same root; labels use .txt alongside tiles in labels dir (auto-detected)
    # To ensure compatibility, symlink or ensure directory


In [None]:
# --- keep your imports, Config, utilities, and pipeline code above ---

def visualize_samples(cfg, n=12):  # randomly render a few tiles with their labels for a quick QA pass
    # Ensure viz dir exists (usually created by cfg.initDirs)
    os.makedirs(cfg.VIZ_DIR, exist_ok=True)

    # Clean the viz folder first so you only see fresh outputs
    for f in os.listdir(cfg.VIZ_DIR):
        try:
            os.remove(os.path.join(cfg.VIZ_DIR, f))
        except Exception as e:
            print(f"Warning: could not remove {f}: {e}")

    img_paths = sorted(glob.glob(os.path.join(cfg.TILES_DIR, "*.jpg")))  # all generated tile images
    if not img_paths:
        print(f"No images found in {cfg.TILES_DIR}. Did tiling run and save .jpg tiles?")
        return

    rng = np.random.default_rng(cfg.SEED)  # seeded RNG for reproducibility
    picks = rng.choice(img_paths, size=min(n, len(img_paths)), replace=False)  # pick up to n images
    for p in picks:
        base = Path(p).stem
        lbl = os.path.join(cfg.LABELS_DIR, base + ".txt")
        img = cv2.imread(p)
        if img is None:
            print(f"Warning: failed to read image {p}")
            continue
        img = img[:, :, ::-1]  # BGR->RGB
        drawn = draw_yolo_boxes_on_image(img, lbl)  # overlay boxes if label file exists
        out_path = os.path.join(cfg.VIZ_DIR, base + "_viz.jpg")
        try:
            Image.fromarray(drawn).save(out_path, quality=95)
        except Exception as e:
            print(f"Warning: failed to save {out_path}: {e}")
    print(f"Saved visualizations to: {cfg.VIZ_DIR}")

# ---------- RUN SECTION (execute after the definitions above) ----------
cfg = Config(name="debug", overwrite=True)

# Step 1–2: Tiling + initial pseudo-labels
yolo_items = process_imagery_and_generate_labels(cfg)

# Quick QA thumbnails
visualize_samples(cfg, n=10)


Initialization Successful.


In [None]:
%matplotlib inline
visualize_samples(cfg,n=10)

In [None]:
# Step 3: Split
train, val, test = split_data(yolo_items, cfg)

In [None]:
# Step 4: Option A: Train YOLOv8 (object detection baseline)
train_yolov8(cfg, train, val, test)

In [None]:
# Optional: Step 4b: DeepForest baseline (uncomment if desired)
# if DEEPFOREST_AVAILABLE:
#     m = deepforest_main.deepforest()
#     m.use_release()
#     # DeepForest expects CSV annotations; convert labels if needed.

# Step 5: Quick visual checks
visualize_samples(cfg,n=10)
print("Phase 1 completed. Check output/ for tiles, labels, splits, metrics, and visualizations.")

In [None]:
def dataset_health_report(cfg, list_examples=True, max_examples=8):
    import glob
    from pathlib import Path

    img_paths = sorted(glob.glob(os.path.join(cfg.TILES_DIR, "*.jpg")))
    lbl_paths = sorted(glob.glob(os.path.join(cfg.LABELS_DIR, "*.txt")))

    imgs_by_stem = {Path(p).stem: p for p in img_paths}
    lbls_by_stem = {Path(p).stem: p for p in lbl_paths}

    total_imgs = len(img_paths)
    total_lbls = len(lbl_paths)

    labeled_imgs, empty_label_imgs, missing_label_imgs = [], [], []
    total_boxes = 0

    for stem, imgp in imgs_by_stem.items():
        lblp = lbls_by_stem.get(stem)
        if lblp is None:
            missing_label_imgs.append(imgp)
            continue
        try:
            with open(lblp, "r") as f:
                lines = [ln.strip() for ln in f.readlines()]
            valid_lines = []
            for ln in lines:
                if not ln:
                    continue
                parts = ln.split()
                if len(parts) >= 5:
                    try:
                        float(parts[1]); float(parts[2]); float(parts[3]); float(parts[4])
                        valid_lines.append(ln)
                    except Exception:
                        pass
            if valid_lines:
                labeled_imgs.append(imgp)
                total_boxes += len(valid_lines)
            else:
                empty_label_imgs.append(imgp)
        except Exception as e:
            print(f"Warning: could not read label file for {imgp}: {e}", flush=True)
            empty_label_imgs.append(imgp)

    n_labeled = len(labeled_imgs)
    n_empty = len(empty_label_imgs)
    n_missing = len(missing_label_imgs)
    coverage_pct = ( (n_labeled + n_empty) / total_imgs * 100.0 ) if total_imgs else 0.0
    labeled_pct  = ( n_labeled / total_imgs * 100.0 ) if total_imgs else 0.0
    avg_boxes = (total_boxes / n_labeled) if n_labeled else 0.0

    # PRINT (force flush so it shows in Colab)
    print("\n=== DATASET HEALTH REPORT ===", flush=True)
    print(f"Run name:         {cfg.name}", flush=True)
    print(f"Tiles dir:        {cfg.TILES_DIR}", flush=True)
    print(f"Labels dir:       {cfg.LABELS_DIR}\n", flush=True)

    print(f"Total images:                 {total_imgs}", flush=True)
    print(f"Total label files:            {total_lbls}", flush=True)
    print(f"Images with ≥1 box:           {n_labeled}", flush=True)
    print(f"Images with empty label file: {n_empty}", flush=True)
    print(f"Images missing label file:    {n_missing}", flush=True)
    print(f"Image-label coverage:         {coverage_pct:.1f}%", flush=True)
    print(f"Labeled image rate:           {labeled_pct:.1f}%", flush=True)
    print(f"Total boxes:                  {total_boxes}", flush=True)
    print(f"Avg boxes per labeled image:  {avg_boxes:.2f}", flush=True)

    if list_examples:
        def show_examples(title, items):
            if not items:
                return
            print(f"\n{title} (showing up to {max_examples}):", flush=True)
            for p in items[:max_examples]:
                print("  -", p, flush=True)

        show_examples("Examples: images with ≥1 box", labeled_imgs)
        show_examples("Examples: images with empty label file", empty_label_imgs)
        show_examples("Examples: images missing label file", missing_label_imgs)

    print("=== END REPORT ===\n", flush=True)

    # Also return a dict you can display or log
    return {
        "run": cfg.name,
        "tiles_dir": cfg.TILES_DIR,
        "labels_dir": cfg.LABELS_DIR,
        "total_images": total_imgs,
        "total_label_files": total_lbls,
        "images_with_boxes": n_labeled,
        "images_empty_label": n_empty,
        "images_missing_label": n_missing,
        "coverage_pct": coverage_pct,
        "labeled_pct": labeled_pct,
        "total_boxes": total_boxes,
        "avg_boxes_per_labeled": avg_boxes,
    }

report = dataset_health_report(cfg)  # <-- this line actually runs & prints
report  # also display the returned dict as cell output
