# Notebook cell In [1]:
import os
import sys
import json
import math
import glob
import yaml
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
import random
import argparse
from typing import List, Dict, Optional
# Detectron2 from Meta (built on PyTorch) is used for detection / instance segmentation.
from detectron2.config import get_cfg
from detectron2 import model_zoo
from detectron2.engine import DefaultTrainer, DefaultPredictor
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.data.datasets import register_coco_instances
from detectron2.evaluation import COCOEvaluator, inference_on_dataset
from detectron2.data import build_detection_test_loader

#Utility functions
def set_seeds(seed: int = 42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Value passed to ``random.seed``, ``np.random.seed``,
            ``torch.manual_seed`` and ``torch.cuda.manual_seed_all``.
    """
    import torch  # imported inside the function, as in the rest of this file

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def ensure_dir(p: str):
    """Create directory ``p`` together with any missing parent directories.

    An already existing directory is not treated as an error.
    """
    os.makedirs(name=p, exist_ok=True)

#Register a COCO-format dataset to Detectron2 by name, with custom class names.

def register_coco(name: str, json_path: str, img_root: str, classes: List[str]):
    """Register a COCO-format dataset under ``name`` and attach its class names.

    If ``name`` is already present in ``DatasetCatalog`` the registration step
    is skipped; ``thing_classes`` is assigned in both cases.

    Args:
        name: Dataset name used in the Detectron2 catalogs.
        json_path: Path to the COCO annotation JSON.
        img_root: Directory holding the images referenced by the JSON.
        classes: Class names stored as the dataset's ``thing_classes``.
    """
    already_registered = name in DatasetCatalog.list()
    if not already_registered:
        register_coco_instances(name, {}, json_path, img_root)
    MetadataCatalog.get(name).thing_classes = classes


def build_cfg(
    classes: List[str],
    output_dir: str,
    ims_per_batch: int = 8,
    base_lr: float = 2.5e-4,
    max_iter: int = 50000,
    batch_size_per_image: int = 256,
    img_size_train: int = 1024,
    img_size_test: int = 1024,
    score_thresh_test: float = 0.25,
) -> "CfgNode":
    """Build a Detectron2 config based on the model-zoo Mask R-CNN R50-FPN 3x entry.

    Args:
        classes: Class names; only their count is used (``ROI_HEADS.NUM_CLASSES``).
        output_dir: Stored as ``cfg.OUTPUT_DIR`` and created if missing.
        ims_per_batch: ``SOLVER.IMS_PER_BATCH``.
        base_lr: ``SOLVER.BASE_LR``.
        max_iter: ``SOLVER.MAX_ITER``.
        batch_size_per_image: ``MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE``.
        img_size_train: Used for both ``INPUT.MIN_SIZE_TRAIN`` and ``INPUT.MAX_SIZE_TRAIN``.
        img_size_test: Used for both ``INPUT.MIN_SIZE_TEST`` and ``INPUT.MAX_SIZE_TEST``.
        score_thresh_test: ``MODEL.ROI_HEADS.SCORE_THRESH_TEST``.

    Returns:
        The populated config node.
    """
    zoo_entry = "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"

    cfg = get_cfg()
    cfg.merge_from_file(model_zoo.get_config_file(zoo_entry))

    # Model: start from the zoo checkpoint and size the ROI heads for our classes.
    cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(zoo_entry)
    cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(classes)
    cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = batch_size_per_image
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = score_thresh_test

    # Solver
    cfg.SOLVER.IMS_PER_BATCH = ims_per_batch
    cfg.SOLVER.BASE_LR = base_lr
    cfg.SOLVER.MAX_ITER = max_iter
    cfg.SOLVER.STEPS = []  # no LR decay by default

    # Input sizes (min and max set to the same value)
    cfg.INPUT.MIN_SIZE_TRAIN = (img_size_train,)
    cfg.INPUT.MAX_SIZE_TRAIN = img_size_train
    cfg.INPUT.MIN_SIZE_TEST = img_size_test
    cfg.INPUT.MAX_SIZE_TEST = img_size_test

    # Output location
    cfg.OUTPUT_DIR = output_dir
    ensure_dir(cfg.OUTPUT_DIR)
    return cfg


class Trainer(DefaultTrainer):
    """``DefaultTrainer`` subclass that evaluates with ``COCOEvaluator``."""

    @classmethod
    def build_evaluator(cls, cfg, dataset_name, output_folder=None):
        """Create a ``COCOEvaluator`` for ``dataset_name``.

        Args:
            cfg: Detectron2 config; ``cfg.OUTPUT_DIR`` supplies the default
                output location.
            dataset_name: Name of the registered dataset to evaluate.
            output_folder: Directory for evaluation artifacts. Defaults to
                ``<cfg.OUTPUT_DIR>/inference``. The directory is created.

        Returns:
            A non-distributed ``COCOEvaluator`` writing to ``output_folder``.
        """
        if output_folder is None:
            output_folder = os.path.join(cfg.OUTPUT_DIR, "inference")
        ensure_dir(output_folder)
        # COCOEvaluator's keyword is ``output_dir``; passing ``output_folder=``
        # raises TypeError (unexpected keyword argument).
        return COCOEvaluator(dataset_name, cfg, False, output_dir=output_folder)


#Convert Detectron2 predictor outputs to a list of {mask (H,W bool), class_id, score}.
def coco_instances_to_masks(outputs, score_thresh: float = 0.25):
    """Turn predictor outputs into a list of per-instance dictionaries.

    Args:
        outputs: Mapping with an ``"instances"`` entry exposing ``scores``,
            ``pred_classes`` and (optionally) ``pred_masks``.
        score_thresh: Instances scoring below this value are dropped.

    Returns:
        A list of ``{"mask", "class_id", "score"}`` dicts, one per kept
        instance; empty when the instances carry no ``pred_masks`` field.
    """
    instances = outputs["instances"].to("cpu")
    instances = instances[instances.scores >= score_thresh]

    if not instances.has("pred_masks"):
        return []

    mask_stack = instances.pred_masks.numpy()  # presumably (N, H, W) bool
    class_ids = instances.pred_classes.numpy().tolist()
    confidences = instances.scores.numpy().tolist()
    return [
        {"mask": mask, "class_id": class_id, "score": float(confidence)}
        for mask, class_id, confidence in zip(mask_stack, class_ids, confidences)
    ]

#Compute dominant roof azimuth (0°=North, 90°=East) from a binary mask via PCA on pixel coords.
def mask_to_azimuth_deg(mask_bool: np.ndarray) -> Optional[float]:
    """Return the compass bearing of a mask's principal axis, in degrees.

    The principal axis is the first right-singular vector of the centred
    pixel coordinates (PCA via SVD). An axis has no direction, so the result
    is folded into ``[0, 180)`` with 0 = North and 90 = East.

    Assumes the image is north-up, i.e. +x is East and +y (downwards in image
    coordinates) is South.

    Args:
        mask_bool: 2-D mask; non-zero entries are the object's pixels.

    Returns:
        The bearing in degrees, or ``None`` when the mask has fewer than
        10 set pixels.
    """
    ys, xs = np.nonzero(mask_bool)
    if xs.size < 10:
        return None

    # float64 keeps the SVD accurate for large masks.
    pts = np.column_stack((xs, ys)).astype(np.float64)
    pts -= pts.mean(axis=0, keepdims=True)

    # PCA via SVD: vt[0] is the direction of largest variance.
    _, _, vt = np.linalg.svd(pts, full_matrices=False)
    vx, vy = vt[0, 0], vt[0, 1]  # image coords: x right, y down

    # Image y grows downwards, so the northward component is -vy. The compass
    # bearing is measured clockwise from North: atan2(east, north).
    azimuth = float(np.degrees(np.arctan2(vx, -vy))) % 180.0
    if azimuth >= 180.0:  # a tiny negative angle can round up to exactly 180.0
        azimuth = 0.0
    return azimuth


def aggregate_tile_row(
    image_path: str,
    masks: List[Dict],
    class_names: List[str],
    aggregate_azimuth: str = "median"
) -> Dict:
    """Summarise one tile's predicted instances as a single table row.

    Args:
        image_path: Path of the tile; only its basename is stored.
        masks: Instance dicts with ``"mask"`` and ``"class_id"`` entries.
        class_names: Maps ``class_id`` to a class name; ``"roof"`` and
            ``"pv"`` are the names this function looks for.
        aggregate_azimuth: ``"mean"`` averages the roof azimuths; any other
            value uses the median.

    Returns:
        A dict with keys ``"image path"``, ``"has_roofs"``, ``"PV"``,
        ``"num_roofs"``, ``"R/C"`` (always empty here) and ``"Azimuth"``
        (empty string when no roof azimuth could be computed).
    """
    labels = [class_names[entry["class_id"]] for entry in masks]
    roof_entries = [entry for entry, label in zip(masks, labels) if label == "roof"]
    roof_count = len(roof_entries)

    # Azimuth of every roof mask that is large enough to yield one.
    candidate_azimuths = (mask_to_azimuth_deg(entry["mask"]) for entry in roof_entries)
    azimuths = [value for value in candidate_azimuths if value is not None]

    if not azimuths:
        tile_azimuth = ""
    else:
        reducer = np.mean if aggregate_azimuth == "mean" else np.median
        tile_azimuth = float(reducer(azimuths))

    return {
        "image path": os.path.basename(image_path),
        "has_roofs": int(roof_count > 0),
        "PV": int("pv" in labels),
        "num_roofs": int(roof_count),
        "R/C": "",
        "Azimuth": tile_azimuth
    }


def add_rc_from_counts(
    df: pd.DataFrame,
    count_col: str = "num_roofs",
    r_min_count: int = 4,
) -> pd.DataFrame:
    """
    Simple, tunable heuristic to fill R/C:
      - if no roofs: "0"
      - if many small roofs likely (count >= ``r_min_count``): "R"
      - else: "C"

    Values that cannot be read as an integer (missing column, NaN, blank or
    non-numeric text) count as 0 for both ``count_col`` and ``"has_roofs"``,
    so such rows get "0" instead of raising.

    Args:
        df: Table with a ``"has_roofs"`` column and a roof-count column.
        count_col: Name of the roof-count column.
        r_min_count: Smallest roof count labelled "R".

    Returns:
        The same ``df`` object, with its ``"R/C"`` column set in place.
    """

    def _as_int(value) -> int:
        # NaN raises ValueError, None raises TypeError, inf raises OverflowError.
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return 0

    rc_vals = []
    for _, row in df.iterrows():
        n_roofs = _as_int(row.get(count_col, 0))
        has_roofs = _as_int(row.get("has_roofs", 0))
        if has_roofs == 0 or n_roofs == 0:
            rc_vals.append("0")
        else:
            rc_vals.append("R" if n_roofs >= r_min_count else "C")
    df["R/C"] = rc_vals
    return df

#subcommands

# Training mask R-CNN on COCO-format datasets (train/val).
def cmd_train(args):
    """Train Mask R-CNN on the train split, then evaluate on the val split.

    Reads dataset names/paths, class names and hyperparameters from ``args``,
    registers both datasets, trains from the configured starting weights
    (no resume), and prints the validation metrics and the output directory.
    """
    set_seeds(args.seed)

    class_names = args.classes
    if len(class_names) == 0:
        class_names = ["roof"]

    # Register the train and val datasets under their catalog names.
    register_coco(args.train_name, args.train_json, args.train_root, class_names)
    register_coco(args.val_name, args.val_json, args.val_root, class_names)

    cfg = build_cfg(
        classes=class_names,
        output_dir=args.output_dir,
        ims_per_batch=args.ims_per_batch,
        base_lr=args.base_lr,
        max_iter=args.max_iter,
        batch_size_per_image=args.batch_size_per_image,
        img_size_train=args.img_size_train,
        img_size_test=args.img_size_test,
        score_thresh_test=args.score_thresh_test,
    )
    cfg.DATASETS.TRAIN = (args.train_name,)
    cfg.DATASETS.TEST = (args.val_name,)

    trainer = Trainer(cfg)
    trainer.resume_or_load(resume=False)
    trainer.train()

    # Final evaluation of the trained model on the validation dataset.
    val_dataset = cfg.DATASETS.TEST[0]
    evaluator = COCOEvaluator(val_dataset, cfg, False, output_dir=cfg.OUTPUT_DIR)
    val_loader = build_detection_test_loader(cfg, val_dataset)
    metrics = inference_on_dataset(trainer.model, val_loader, evaluator)

    print("Validation metrics:", metrics)
    print("Weights saved under:", cfg.OUTPUT_DIR)

#Run inference on a folder of PNG/JPEG tiles and write a CSV (optionally XLSX).
def cmd_infer(args):
    """Run inference on every image tile in ``args.img_dir`` and write a table.

    One row per readable image is produced by ``aggregate_tile_row``. The
    table is always written to ``args.out_csv`` and additionally to
    ``args.out_xlsx`` when that path is non-empty. Images that OpenCV cannot
    read are skipped and listed on stderr after the run.

    Exits with status 1 when no image matches the supported extensions.
    """
    classes = args.classes
    if not classes:
        classes = ["roof"]
    cfg = build_cfg(
        classes=classes,
        output_dir=os.path.join(os.path.dirname(args.weights), "TMP_infer"),
        img_size_test=args.img_size_test,
        score_thresh_test=args.score_thresh_test,
    )
    cfg.MODEL.WEIGHTS = args.weights
    predictor = DefaultPredictor(cfg)

    exts = ("*.png", "*.jpg", "*.jpeg", "*.tif", "*.tiff")
    imgs = sorted(
        path
        for pattern in exts
        for path in glob.glob(os.path.join(args.img_dir, pattern))
    )
    if not imgs:
        print("No images found in:", args.img_dir, file=sys.stderr)
        sys.exit(1)

    rows = []
    unreadable = []
    for p in tqdm(imgs, desc="Infer"):
        im = cv2.imread(p, cv2.IMREAD_COLOR)
        if im is None:
            # Remember the failure instead of dropping the tile silently.
            unreadable.append(p)
            continue
        outputs = predictor(im)
        masks = coco_instances_to_masks(outputs, score_thresh=args.score_thresh_test)
        rows.append(aggregate_tile_row(p, masks, classes, aggregate_azimuth=args.az_agg))

    if unreadable:
        # Reported after the loop so the messages do not break the progress bar.
        print(f"Skipped {len(unreadable)} unreadable image(s):", file=sys.stderr)
        for p in unreadable:
            print("  " + p, file=sys.stderr)

    df = pd.DataFrame(rows)
    if args.autofill_rc:
        df = add_rc_from_counts(df)

    out_csv = args.out_csv
    csv_dir = os.path.dirname(out_csv)
    if csv_dir:  # a bare filename has no directory part to create
        ensure_dir(csv_dir)
    df.to_csv(out_csv, index=False)
    print("Wrote CSV:", out_csv)

    if args.out_xlsx:
        xlsx_dir = os.path.dirname(args.out_xlsx)
        if xlsx_dir:
            ensure_dir(xlsx_dir)
        with pd.ExcelWriter(args.out_xlsx) as w:
            df.to_excel(w, index=False, sheet_name=args.sheet_name)
        print("Wrote Excel:", args.out_xlsx)


def cmd_export(args):
    """
    Read the CSV at ``args.in_csv`` and write it to the Excel file
    ``args.out_xlsx`` (sheet ``args.sheet_name``), filling the R/C column
    first when ``args.autofill_rc`` is set.
    """
    table = pd.read_csv(args.in_csv)
    if args.autofill_rc:
        table = add_rc_from_counts(table)

    # Create the destination folder only when the path has a directory part.
    xlsx_dir = os.path.dirname(args.out_xlsx)
    if xlsx_dir:
        ensure_dir(xlsx_dir)

    with pd.ExcelWriter(args.out_xlsx) as writer:
        table.to_excel(writer, index=False, sheet_name=args.sheet_name)
    print("Wrote Excel:", args.out_xlsx)


def build_parser():
    """Build the command-line parser with ``train``, ``infer`` and ``export`` subcommands.

    Each subparser stores its handler in ``func`` via ``set_defaults``.
    """
    parser = argparse.ArgumentParser(
        description="DeepRoof-style roof/PV segmentation + azimuth (single-file)."
    )
    commands = parser.add_subparsers(dest="cmd", required=True)

    # Training
    train = commands.add_parser("train", help="Train Mask R-CNN on COCO-format datasets.")
    train.add_argument("--train-name", default="roofs_train", help="Detectron2 name for train dataset")
    train.add_argument("--val-name", default="roofs_val", help="Detectron2 name for val dataset")
    required_paths = (
        ("--train-json", "COCO JSON for train"),
        ("--train-root", "Image root for train"),
        ("--val-json", "COCO JSON for val"),
        ("--val-root", "Image root for val"),
    )
    for flag, help_text in required_paths:
        train.add_argument(flag, required=True, help=help_text)
    train.add_argument("--classes", nargs="*", default=["roof", "pv"], help="Class names in order")
    train.add_argument("--output-dir", default="runs/seg_maskrcnn", help="where to save weights/metrics")
    hyperparameters = (
        ("--ims-per-batch", int, 8),
        ("--base-lr", float, 2.5e-4),
        ("--max-iter", int, 50000),
        ("--batch-size-per-image", int, 256),
        ("--img-size-train", int, 1024),
        ("--img-size-test", int, 1024),
        ("--score-thresh-test", float, 0.25),
        ("--seed", int, 42),
    )
    for flag, value_type, default_value in hyperparameters:
        train.add_argument(flag, type=value_type, default=default_value)
    train.set_defaults(func=cmd_train)

    # Inference
    infer = commands.add_parser(
        "infer", help="Infer on a folder of tiles and write CSV/XLSX with azimuth."
    )
    infer.add_argument("--weights", required=True, help="Path to trained .pth/.pkl weights")
    infer.add_argument("--img-dir", required=True, help="Folder of PNG/JPG/TIF tiles")
    infer.add_argument(
        "--classes", nargs="*", default=["roof", "pv"],
        help="Class names (order must match training)",
    )
    infer.add_argument("--img-size-test", type=int, default=1024)
    infer.add_argument("--score-thresh-test", type=float, default=0.25)
    infer.add_argument(
        "--az-agg", choices=["median", "mean"], default="median",
        help="Aggregate roof azimuth per tile",
    )
    infer.add_argument(
        "--autofill-rc", action="store_true",
        help="Fill R/C via simple heuristic from counts",
    )
    infer.add_argument("--out-csv", default="outputs/infer_table.csv")
    infer.add_argument("--out-xlsx", default="", help="Optional Excel path")
    infer.add_argument("--sheet-name", default="L")
    infer.set_defaults(func=cmd_infer)

    # Exporting results
    export = commands.add_parser(
        "export", help="Convert a CSV to Excel and optionally add R/C heuristic."
    )
    export.add_argument("--in-csv", required=True)
    export.add_argument("--out-xlsx", required=True)
    export.add_argument("--sheet-name", default="L")
    export.add_argument("--autofill-rc", action="store_true")
    export.set_defaults(func=cmd_export)

    return parser


def main():
    """Parse the command line and run the handler of the chosen subcommand."""
    cli_args = build_parser().parse_args()
    cli_args.func(cli_args)


# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()

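# Notebook output when this cell was run:
#   ModuleNotFoundError: No module named 'detectron2'
# (detectron2 must be installed in the environment before running this script.)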