# Cool Roofs Detection Demonstration

## 1. Import packages

In [None]:
from pathlib import Path
import random
import shutil
import rasterio
import shapefile
from shapely.geometry import shape, Polygon, MultiPolygon
import yaml
from ultralytics import YOLO

## 2. Define functions for creating a custom dataset
These functions create a custom dataset that follows the format required for training a YOLO segmentation model.
```
dataset
└───images
│   └───test
│       │   img1.jpg
│       │   img2.jpg
│       │   ...
│   └───train
│       │   ...
│   └───val
│       │   ...
│
└───labels
    └───test
        │   img1.txt
        │   img2.txt
        │   ...
    └───train
        │   ...
    └───val
        │   ...
```
In the example dataset above, the test image `img1.jpg` could contain any number of roofs. The extents of those roofs are stored in `img1.txt` in the corresponding folder for test labels, as polygon coordinates normalized to the range 0–1 relative to the image's width and height. A single row in the label file represents one roof and is formatted as `<class-index> <x1> <y1> <x2> <y2> ... <xn> <yn>`, where `<class-index>` is either 0 for warm roofs or 1 for cool roofs and is followed by the roof's coordinates.

In [None]:
def create_dataset_structure(output_folder):
    """
    Build the folder tree and YAML config file for the segmentation dataset.

    Creates ``<output_folder>/segment/<images|labels>/<train|val|test>``
    (folders that already exist are left as they are) and writes
    ``<output_folder>/segment/segment.yaml`` describing the split folders and
    the two class names.

    Returns: the list of split names, in the order ['train', 'val', 'test']
    """
    split_names = ['train', 'val', 'test']
    segment_root = Path(output_folder) / 'segment'

    # One folder per (content kind, split) pair
    for kind in ('images', 'labels'):
        for split_name in split_names:
            (segment_root / kind / split_name).mkdir(parents=True, exist_ok=True)

    # Dataset description consumed by the training step
    dataset_config = {
        # Left empty; presumably the trainer then resolves the split folders
        # relative to the YAML file's own location -- verify against Ultralytics
        'path': '',
        'train': 'images/train',
        'val': 'images/val',
        'test': 'images/test',
        'names': {0: 'warm_roof', 1: 'cool_roof'},
    }

    # sort_keys=False keeps the keys in the order written above
    with open(segment_root / 'segment.yaml', 'w') as config_file:
        yaml.safe_dump(dataset_config, config_file, sort_keys=False)

    return split_names

def assign_split(splits, split_ratios):
    """
    Pick one split name at random, weighted by the given ratios.

    A single value is drawn from ``random.random()`` and compared against the
    running total of ``split_ratios``; the first split whose running total
    reaches the draw is returned. If no running total reaches the draw, the
    last entry of ``splits`` is returned.
    """
    draw = random.random()
    upper_bound = 0
    for name, share in zip(splits, split_ratios):
        upper_bound = upper_bound + share
        if draw <= upper_bound:
            return name

    # No running total reached the draw (e.g. float rounding left the sum just under 1)
    return splits[-1]

def coords_to_pixel(x, y, transform, img_width, img_height):
    """
    Convert a coordinate to a normalized (col, row) image position.

    The point is mapped through the inverse of ``transform`` to get a column
    and row, each of which is divided by the image size and clamped to [0, 1].

    Returns: (col, row) as fractions of the image width and height
    """
    def _clamp_to_unit(value):
        # Values below 0 become 0, values above 1 become 1
        return max(0, min(1, value))

    # The inverted transform maps (x, y) to (column, row)
    to_pixel = ~transform
    px_col, px_row = to_pixel * (x, y)

    norm_col = _clamp_to_unit(px_col / img_width)
    norm_row = _clamp_to_unit(px_row / img_height)
    return norm_col, norm_row

def validate_polygon(points, img_width, img_height):
    """
    Check that a list of normalized (x, y) points forms a usable polygon.

    A polygon is rejected when it has fewer than 3 points, when any coordinate
    lies outside [0, 1], or when its area in pixels is below 1.

    Returns: (is_valid, message)
    """
    # Need at least 3 points for a polygon
    if not points or len(points) < 3:
        return False, "Too few points for polygon"

    # Report the first point that falls outside the normalized [0, 1] range
    offender = next(
        ((x, y) for x, y in points if not (0 <= x <= 1 and 0 <= y <= 1)),
        None,
    )
    if offender is not None:
        return False, f"Coordinates out of bounds: ({offender[0]}, {offender[1]})"

    # Scale back to pixel units so the area threshold is expressed in pixels
    scaled = [(x * img_width, y * img_height) for x, y in points]
    if Polygon(scaled).area < 1:  # Area less than 1 pixel
        return False, "Polygon area too small"

    return True, "Valid polygon"

def process_geometry(geom, transform, img_width, img_height):
    """
    Turn a Polygon or MultiPolygon into a list of label coordinate strings.

    Each polygon's exterior ring is converted point by point with
    ``coords_to_pixel`` and checked with ``validate_polygon``; rings that pass
    become one space-separated "x y x y ..." string. A MultiPolygon
    contributes one string per valid part. Any other geometry type yields an
    empty list.
    """
    def _exterior_to_string(polygon):
        # Returns the coordinate string for one polygon, or None if it fails validation
        ring = list(polygon.exterior.coords)
        normalized = [
            coords_to_pixel(x, y, transform, img_width, img_height)
            for x, y in ring
        ]
        passed, _ = validate_polygon(normalized, img_width, img_height)
        if not passed:
            return None
        return ' '.join(f"{x} {y}" for x, y in normalized)

    # Collect the individual polygons to convert
    if isinstance(geom, MultiPolygon):
        candidates = [part for part in geom.geoms if isinstance(part, Polygon)]
    elif isinstance(geom, Polygon):
        candidates = [geom]
    else:
        candidates = []

    coord_strings = []
    for polygon in candidates:
        as_text = _exterior_to_string(polygon)
        if as_text is not None:
            coord_strings.append(as_text)

    return coord_strings

def create_dataset(image_folder, shapefile_folder, output_folder, split_ratios=(0.7, 0.2, 0.1)):
    """
    Create a custom dataset by converting shapefiles to YOLO segmentation format using 'cool_roof'
    field as class_id and split into train/val/test sets.

    Every ``*.shp`` file in ``shapefile_folder`` is paired with the ``.jpg`` of the same base
    name found anywhere under ``image_folder``. Each shape's polygons are converted with
    ``process_geometry`` and written, one per row, to a label file; the image is copied next to
    it in the matching ``images/<split>`` folder. Shapefiles that yield no valid polygon produce
    neither a label file nor an image copy. A distribution and statistics summary is printed at
    the end.

    Args:
        image_folder: folder searched recursively for ``<name>.jpg`` images.
        shapefile_folder: folder containing the ``*.shp`` files (not searched recursively).
        output_folder: folder under which the ``segment`` dataset tree is created.
        split_ratios: three numbers (train, val, test) that sum to 1.0.

    Raises:
        ValueError: if ``split_ratios`` does not have 3 entries or does not sum to 1.0.
    """
    # Validate split ratios
    if len(split_ratios) != 3 or abs(sum(split_ratios) - 1.0) > 1e-9:
        raise ValueError("Split ratios must be a tuple of 3 numbers that sum to 1.0")

    # Set random seed for reproducibility
    random.seed(42)

    # Create dataset directory structure
    splits = create_dataset_structure(output_folder)

    # Convert paths to Path objects
    image_folder = Path(image_folder)
    shapefile_folder = Path(shapefile_folder)
    output_folder = Path(output_folder)
    segment_root = output_folder / 'segment'

    # Track statistics
    stats = {
        'processed': 0,
        'skipped': 0,
        'invalid_polygons': 0,
        'empty_labels': 0,
        'multipart_processed': 0
    }

    # Process each shapefile. Sorted because glob order is not guaranteed, and the seeded
    # split assignment is only reproducible if files are visited in a stable order.
    for shp_file in sorted(shapefile_folder.glob("*.shp")):
        # Use shapefile name to find corresponding image
        base_name = shp_file.stem

        # Search recursively through image_folder and its subdirectories
        matching_images = sorted(image_folder.rglob(f"{base_name}.jpg"))

        if not matching_images:
            stats['skipped'] += 1
            print(f"Warning: No matching image for {shp_file}")
            continue

        # Use the first matching image if multiple are found
        if len(matching_images) > 1:
            print(f"Warning: Multiple matching images found for {shp_file}. Using {matching_images[0]}")

        img_path = matching_images[0]

        with rasterio.open(img_path) as src:
            width, height = src.width, src.height
            transform = src.transform

        # Assign to a split
        split = assign_split(splits, split_ratios)

        # Define paths for label and image in new structure
        label_path = segment_root / 'labels' / split / f"{base_name}.txt"
        new_img_path = segment_root / 'images' / split / f"{base_name}.jpg"

        # Collect the label rows in memory so that nothing is written to disk unless the
        # shapefile produces at least one valid polygon.
        label_lines = []

        # The context manager closes the reader's underlying files once the records are read
        with shapefile.Reader(str(shp_file)) as sf:
            for shape_record in sf.shapeRecords():
                label = int(shape_record.record['cool_roof']) # Assuming the label is stored in the attribute table
                geom = shape(shape_record.shape.__geo_interface__)

                # Process the geometry and get valid coordinate strings
                valid_coord_strings = process_geometry(geom, transform, width, height)
                label_lines.extend(f"{label} {coord_str}\n" for coord_str in valid_coord_strings)

                # process_geometry returns at most one string per polygon part, so the
                # shortfall against the number of parts is the number rejected as invalid.
                if isinstance(geom, MultiPolygon):
                    candidate_count = len(geom.geoms)
                elif isinstance(geom, Polygon):
                    candidate_count = 1
                else:
                    candidate_count = 0
                stats['invalid_polygons'] += candidate_count - len(valid_coord_strings)

                if isinstance(geom, MultiPolygon) and valid_coord_strings:
                    stats['multipart_processed'] += 1

        # Only keep files with valid polygons
        if label_lines:
            # Opening with 'w' overwrites a label left by an earlier run
            with open(label_path, 'w') as f:
                f.writelines(label_lines)
            shutil.copy2(img_path, new_img_path)
            stats['processed'] += 1
            print(f"Processed {base_name} -> {split}")
        else:
            stats['empty_labels'] += 1
            print(f"Skipping {base_name}: No valid polygons")

    # Print distribution and statistics summary
    print("\nDataset Distribution:")
    for split in splits:
        label_files = list((segment_root / 'labels' / split).glob('*.txt'))
        image_files = list((segment_root / 'images' / split).glob('*.jpg'))
        print(f"{split}: {len(label_files)} labels, {len(image_files)} images")

    print("\nProcessing Statistics:")
    print(f"Successfully processed: {stats['processed']} files")
    print(f"Skipped (no matching image): {stats['skipped']} files")
    print(f"Invalid polygons encountered: {stats['invalid_polygons']}")
    print(f"Files with no valid polygons: {stats['empty_labels']}")
    print(f"Multipart polygons successfully processed: {stats['multipart_processed']}")

## 3. Create the custom dataset
Using orthoimages and their corresponding shapefiles, create the custom dataset and split the images between training, validation, and testing sets.

In [None]:
# Input orthoimages, their matching shapefiles, and the folder the dataset is written to
image_folder = "C:/Users/jdhoc/Desktop/DOT Volunteer Project/data/demo/imgs"
shapefile_folder = "C:/Users/jdhoc/Desktop/DOT Volunteer Project/data/demo/shps"
output_folder = "C:/Users/jdhoc/Desktop/DOT Volunteer Project/data/demo/dataset"

# Custom split ratios (must sum to 1.0)
split_ratios = (0.7, 0.2, 0.1)  # train, val, test

# Pass the ratios explicitly; otherwise create_dataset silently falls back to its
# own default and edits to split_ratios above have no effect.
create_dataset(image_folder, shapefile_folder, output_folder, split_ratios=split_ratios)

## 4. Train the YOLO segmentation model
Using the custom dataset, train the model to identify individual instances of roofs and classify their type (cool or warm). 

In [None]:
# Pretrained segmentation weights to start from, and the dataset YAML written by
# create_dataset_structure
PRETRAINED_MODEL = "yolo11n-seg.pt"
CUSTOM_DATASET = "C:/Users/jdhoc/Desktop/DOT Volunteer Project/data/demo/dataset/segment/segment.yaml"

# Load a model
model = YOLO(PRETRAINED_MODEL)

# Train the model (setting meanings below follow the Ultralytics train-settings docs)
results = model.train(
    data=CUSTOM_DATASET,
    batch=-1,        # automatic batch size
    imgsz=1024,      # training image size
    cache="disk",    # cache dataset images on disk
    device=0,        # GPU index 0
    degrees=90.0,    # rotation augmentation range
    scale=0.5,       # scale augmentation gain
    flipud=0.25,     # probability of a vertical flip
)