In [None]:
# Install Dependencies
!pip install -q pycocotools
!pip install -q 'git+https://github.com/facebookresearch/detectron2.git'
!pip install -q pandas geopandas osmnx folium tqdm

# Imports
import glob
import json
import os
import random
import zipfile

import cv2
import numpy as np
import pandas as pd
import geopandas as gpd
import osmnx as ox
import folium
from folium.plugins import HeatMap
from shapely.geometry import Polygon, Point
from tqdm import tqdm
import torch
from PIL import Image
from pycocotools.coco import COCO

# Detectron2 Imports
from detectron2.config import get_cfg
from detectron2 import model_zoo
from detectron2.engine import DefaultTrainer, DefaultPredictor, HookBase
from detectron2.data import MetadataCatalog, DatasetCatalog, build_detection_test_loader
from detectron2.data.datasets import register_coco_instances
from detectron2.evaluation import COCOEvaluator, SemSegEvaluator, inference_on_dataset
from detectron2.utils.visualizer import Visualizer, ColorMode

# Mount Drive
# The dataset archive extracted below is read from the mounted Google Drive.
from google.colab import drive
drive.mount('/content/drive')

# Unzip Data (Adjust path as needed)
# Extraction is skipped when /content/train already exists, so re-running this
# cell does not unpack the archive a second time.
if not os.path.exists("/content/train"):
    !unzip -q "/content/drive/My Drive/data.zip" -d /content/

In [None]:
# --- Configuration & Registration ---
# Register the COCO-format train/valid splits. Registering a name that is already
# in the DatasetCatalog raises an AssertionError, so names that are already present
# are skipped; this keeps the cell re-runnable without restarting the kernel.
for _split in ("train", "valid"):
    if _split not in DatasetCatalog.list():
        register_coco_instances(
            _split,
            {},
            f"/content/{_split}/_annotations.coco.json",
            f"/content/{_split}",
        )

# Define Semantic Segmentation Helper for Custom Evaluator
def load_semseg_dicts(image_dir, mask_dir):
    """Build one dataset record per image file found in ``image_dir``.

    Only entries whose name ends with ``.png`` or ``.jpg`` (case-sensitive) are
    kept, in sorted name order. Each record pairs the image path with a mask path
    of the same file name under ``mask_dir``; the mask file is not checked for
    existence here.

    Args:
        image_dir: Directory that is listed for image files.
        mask_dir: Directory assumed to hold a same-named mask for every image.

    Returns:
        A list of dicts with the keys ``file_name``, ``sem_seg_file_name`` and
        ``image_id`` (the bare file name).
    """
    image_suffixes = (".png", ".jpg")
    image_names = [name for name in os.listdir(image_dir) if name.endswith(image_suffixes)]
    image_names.sort()
    return [
        {
            "file_name": os.path.join(image_dir, name),
            "sem_seg_file_name": os.path.join(mask_dir, name),
            "image_id": name,
        }
        for name in image_names
    ]

# --- Custom Classes for Training Monitoring ---
class IOUEvaluator(SemSegEvaluator):
    """Evaluator reporting the mean per-image IoU of merged instance masks vs. ground truth.

    For every image, all predicted instance masks are merged into a single
    foreground mask and compared with the ``sem_seg`` array of the input; any
    non-zero ground-truth value is treated as foreground.
    """

    def __init__(self, dataset_name, distributed=True, output_dir=None):
        super().__init__(dataset_name, distributed, output_dir)
        self.reset()

    def reset(self):
        """Drop every per-image IoU collected so far."""
        self._scores = []

    def process(self, inputs, outputs):
        """Append one IoU value per (input, output) pair to the running list.

        Args:
            inputs: Iterable of input dicts; each must carry a ``sem_seg`` tensor.
            outputs: Iterable of model outputs; each must carry ``instances``
                with ``pred_masks``.
        """
        for sample, prediction in zip(inputs, outputs):
            # Collapse the per-instance masks into one "any instance here" mask.
            merged_pred = prediction['instances'].pred_masks.float().sum(dim=0) > 0
            gt_np = sample['sem_seg'].cpu().numpy()
            pred_np = merged_pred.cpu().numpy()

            # Simple IoU calculation
            overlap = np.logical_and(pred_np, gt_np).sum()
            combined = np.logical_or(pred_np, gt_np).sum()
            if combined > 0:
                score = overlap / combined
            else:
                # Nothing predicted and nothing labelled: scored as 0.0.
                score = 0.0
            self._scores.append(score)

    def evaluate(self):
        """Return ``{"sem_seg": {"IoU": mean}}``; the mean is 0.0 when nothing was processed."""
        mean_iou = np.mean(self._scores) if self._scores else 0.0
        return {"sem_seg": {"IoU": mean_iou}}

class CocoTrainer(DefaultTrainer):
    """DefaultTrainer subclass whose evaluator is a COCOEvaluator."""

    @classmethod
    def build_evaluator(cls, cfg, dataset_name, output_folder=None):
        """Create the COCO evaluator for ``dataset_name``.

        Args:
            cfg: Config node; only ``cfg.OUTPUT_DIR`` is read, to derive the
                default output folder.
            dataset_name: Name of the registered dataset to evaluate.
            output_folder: Directory for evaluation artifacts. Defaults to
                ``<cfg.OUTPUT_DIR>/coco_eval``.

        Returns:
            A non-distributed ``COCOEvaluator`` writing to ``output_folder``.
        """
        if output_folder is None:
            output_folder = os.path.join(cfg.OUTPUT_DIR, "coco_eval")
        # The config is intentionally not passed as the second positional argument:
        # that slot is `tasks`, and handing it a config is the deprecated calling
        # convention. This also matches how the evaluator is built for the test set.
        return COCOEvaluator(dataset_name, distributed=False, output_dir=output_folder)

class IOUHook(HookBase):
    """Hook that periodically evaluates validation IoU and saves the best weights.

    Every ``cfg.TEST.EVAL_PERIOD`` iterations the model is run over
    ``val_loader`` with an ``IOUEvaluator``; whenever the IoU improves on the best
    value seen so far, the model's ``state_dict`` is written to
    ``<cfg.OUTPUT_DIR>/best_model.pth``.

    Args:
        trainer: Trainer exposing ``iter``, ``cfg`` and ``model``.
        val_loader: Data loader passed to ``inference_on_dataset``.
        dataset_name: Registered dataset name handed to ``IOUEvaluator``.
    """

    def __init__(self, trainer, val_loader, dataset_name="validsemseg"):
        self.trainer = trainer
        self.val_loader = val_loader
        self.dataset_name = dataset_name
        self.best_metric = -1  # below any possible IoU, so the first evaluation always saves

    def after_step(self):
        """Run the evaluation when the just-finished iteration is a multiple of the period."""
        period = self.trainer.cfg.TEST.EVAL_PERIOD
        # A period of 0 would raise ZeroDivisionError in the modulo below; treat any
        # non-positive period as "no periodic evaluation".
        if period <= 0:
            return
        if (self.trainer.iter + 1) % period != 0:
            return

        evaluator = IOUEvaluator(self.dataset_name)
        metrics = inference_on_dataset(self.trainer.model, self.val_loader, evaluator)
        val_iou = metrics["sem_seg"]["IoU"]
        print(f"[Iter {self.trainer.iter}] Validation IoU: {val_iou:.4f}")

        if val_iou > self.best_metric:
            self.best_metric = val_iou
            best_path = os.path.join(self.trainer.cfg.OUTPUT_DIR, "best_model.pth")
            torch.save(self.trainer.model.state_dict(), best_path)

# --- Run Training ---
# Start from the COCO-pretrained Mask R-CNN R50-FPN (3x schedule) config and weights.
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")
cfg.DATASETS.TRAIN = ("train",)
cfg.DATASETS.TEST = ("valid",)
cfg.DATALOADER.NUM_WORKERS = 2
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
cfg.SOLVER.IMS_PER_BATCH = 4
cfg.SOLVER.BASE_LR = 0.00025
cfg.SOLVER.MAX_ITER = 4000
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 256
cfg.TEST.EVAL_PERIOD = 500
cfg.OUTPUT_DIR = "/content/output/"
os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)

# Register dummy semseg datasets for the hook
# Guarded so that re-running this cell does not fail on an already-registered name.
if "validsemseg" not in DatasetCatalog.list():
    DatasetCatalog.register("validsemseg", lambda: load_semseg_dicts("/content/valid", "/content/validmask"))  # Ensure masks exist or skip hook
MetadataCatalog.get("validsemseg").set(evaluator_type="sem_seg", ignore_label=255)
# NOTE(review): SemSegEvaluator appears to read `stuff_classes` from this metadata when
# constructed — confirm and set it here before enabling the IoU hook below.

trainer = CocoTrainer(cfg)
trainer.resume_or_load(resume=False)
# Note: Ensure valid_loader is built if using IOUHook, otherwise remove hook
# While the hook stays disabled, no best_model.pth is written to cfg.OUTPUT_DIR.
# val_loader = build_detection_test_loader(cfg, "validsemseg")
# trainer.register_hooks([IOUHook(trainer, val_loader)])
trainer.train()

In [None]:
# Setup Predictor with Best Weights
# best_model.pth is only written by IOUHook. When the hook was not registered the
# file does not exist, so fall back to the final checkpoint saved by the trainer.
best_weights = os.path.join(cfg.OUTPUT_DIR, "best_model.pth")
final_weights = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
cfg.MODEL.WEIGHTS = best_weights if os.path.exists(best_weights) else final_weights
print(f"Loading weights from {cfg.MODEL.WEIGHTS}")
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
predictor = DefaultPredictor(cfg)

# Register Test Set
# Explicit membership check instead of swallowing AssertionError, so only the
# "already registered" case is skipped and genuine registration errors surface.
if "test_dataset" not in DatasetCatalog.list():
    register_coco_instances("test_dataset", {}, "/content/test/_annotations.coco.json", "/content/test")

# Evaluate
evaluator = COCOEvaluator("test_dataset", output_dir="./output/")
val_loader = build_detection_test_loader(cfg, "test_dataset")
print(inference_on_dataset(predictor.model, val_loader, evaluator))

In [None]:
def _build_tile_coord_map(city_name, radius_km, zoom, tile_size_px):
    """Return ``{tile_index: (lat, lon)}`` for the grid cells that touch the area of interest."""
    if radius_km:
        # Radius Buffer Method (e.g. Lucknow): circle of radius_km around the geocoded point,
        # buffered in EPSG:3857 and converted back to lat/lon.
        center_lat, center_lon = ox.geocode(city_name)
        df_point = gpd.GeoDataFrame(geometry=[Point(center_lon, center_lat)], crs="EPSG:4326")
        polygon = df_point.to_crs(epsg=3857).buffer(radius_km * 1000).to_crs(epsg=4326).geometry.iloc[0]
    else:
        # Boundary Method (e.g. Jaipur, Chandigarh): geocoded boundary polygon(s) merged into one.
        gdf_city = ox.geocode_to_gdf(city_name).to_crs(epsg=4326)
        polygon = gdf_city.union_all()

    minx, miny, maxx, maxy = polygon.bounds
    avg_lat_rad = np.radians((miny + maxy) / 2)
    # NOTE(review): 156543.03 looks like the Web Mercator metres-per-pixel at zoom 0 and
    # 111320 like metres per degree — confirm. The same step is used for latitude and longitude.
    meters_per_pixel = (156543.03 * np.cos(avg_lat_rad)) / (2**zoom)
    tile_size_deg = (tile_size_px * meters_per_pixel) / 111320.0

    tile_coord_map = {}
    current_index = 0
    for lat in np.arange(miny, maxy, tile_size_deg):
        for lon in np.arange(minx, maxx, tile_size_deg):
            # Approximate check if tile is within city/circle
            if polygon.intersects(Point(lon, lat).buffer(tile_size_deg / 2)):
                tile_coord_map[current_index] = (lat, lon)
            # The index advances for every grid cell, including skipped ones.
            # NOTE(review): presumably this must match the numbering used when the
            # tiles were downloaded — do not change the grid without confirming.
            current_index += 1
    return tile_coord_map


def _tile_id_from_path(img_path):
    """Parse the integer index from a ``tile_<n>.png`` file name; return None if it is not one."""
    stem = os.path.basename(img_path).replace("tile_", "").replace(".png", "")
    try:
        return int(stem)
    except ValueError:
        return None


def _find_tile_images(img_folder):
    """Return the sorted ``*.png`` paths of the first directory under ``img_folder`` that has any."""
    target_folder = img_folder
    for root, _dirs, files in os.walk(img_folder):
        if any(f.endswith('.png') for f in files):
            target_folder = root
            break
    # escape() keeps glob metacharacters in the directory name from being read as a pattern
    return sorted(glob.glob(os.path.join(glob.escape(target_folder), "*.png")))


def generate_city_heatmap(city_name, zip_path, output_csv_name, radius_km=None, zoom=19, tile_size_px=640):
    """
    Generates a solar panel heatmap for a specific city.
    1. Downloads/Generates grid coordinates.
    2. Unzips image tiles.
    3. Runs inference.
    4. Saves data and creates a Folium map.

    Relies on the notebook-level ``predictor`` created in the evaluation cell.

    Args:
        city_name: Place name passed to the OSMnx geocoder; the part before the first
            comma also names the local image folder.
        zip_path: Zip archive containing the ``tile_<n>.png`` images.
        output_csv_name: Destination CSV (columns ``lat``, ``lon``, ``count``); the HTML
            map is written next to it with the extension swapped for ``.html``.
        radius_km: If given, use a circle of this radius around the geocoded point
            instead of the geocoded boundary.
        zoom: Zoom level used to derive the ground size of a tile.
        tile_size_px: Tile edge length in pixels.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    print(f"\n--- Processing {city_name} ---")

    # 1. Generate Coordinate Map
    print("🌍 Generating Tile-to-Coordinate map...")
    try:
        tile_coord_map = _build_tile_coord_map(city_name, radius_km, zoom, tile_size_px)
        print(f"✅ Mapped {len(tile_coord_map)} potential tile locations.")
    except Exception as e:
        print(f"❌ Error generating map: {e}")
        return

    # 2. Unzip Images
    img_folder = f"/content/{city_name.split(',')[0].replace(' ', '_')}_images"
    if not os.path.exists(img_folder):
        print(f"⏳ Unzipping {zip_path}...")
        # Pure-Python extraction: no shell involved, so the path is never shell-interpolated.
        try:
            with zipfile.ZipFile(zip_path) as archive:
                archive.extractall(img_folder)
        except (OSError, zipfile.BadZipFile) as e:
            print(f"❌ Error unzipping {zip_path}: {e}")
            return

    image_files = _find_tile_images(img_folder)
    print(f"✅ Found {len(image_files)} images.")

    # 3. Run Inference
    print("🚀 Starting Inference...")
    results = []
    for img_path in tqdm(image_files):
        # Extract ID from filename (assuming tile_123.png format)
        tile_id = _tile_id_from_path(img_path)
        if tile_id is None:
            continue

        # Tiles without a mapped coordinate can never be recorded, so skip them
        # before paying for image decoding and inference.
        coords = tile_coord_map.get(tile_id)
        if coords is None:
            continue

        img = cv2.imread(img_path)
        if img is None:
            continue

        num_panels = len(predictor(img)["instances"])
        if num_panels > 0:
            lat, lon = coords
            results.append([lat, lon, num_panels])

    # 4. Save & Plot
    if not results:
        print("⚠️ No panels detected or coordinate mapping failed.")
        return

    df = pd.DataFrame(results, columns=['lat', 'lon', 'count'])
    df.to_csv(output_csv_name, index=False)
    print(f"💾 Saved CSV to {output_csv_name}")

    # Generate Map, centred on the mean detection location; `count` is the heat weight.
    center_lat = df['lat'].mean()
    center_lon = df['lon'].mean()
    m = folium.Map(location=[center_lat, center_lon], zoom_start=13, tiles="CartoDB dark_matter")
    HeatMap(df.values.tolist(), radius=15, blur=10).add_to(m)

    # splitext only swaps the final extension, so a name without ".csv" cannot make
    # the map overwrite the CSV that was just written.
    map_path = os.path.splitext(output_csv_name)[0] + ".html"
    m.save(map_path)
    print(f"✅ Heatmap saved to {map_path}")

# --- Example Usage for Cities mentioned in notebook ---

# 1. Jaipur
# generate_city_heatmap("Jaipur Municipal Corporation, India",
#                       "/content/drive/My Drive/Jaipur_Correct_City_Tiles.zip",
#                       "/content/drive/My Drive/Jaipur_Summary.csv")

# 2. Chandigarh
# generate_city_heatmap("Chandigarh, India",
#                       "/content/drive/My Drive/Chandigarh_Correct_City_Tiles.zip",
#                       "/content/drive/My Drive/Chandigarh_Summary.csv")

# 3. Lucknow (Uses Radius)
# generate_city_heatmap("Lucknow, India",
#                       "/content/drive/My Drive/Lucknow_City_Radius_Tiles.zip",
#                       "/content/drive/My Drive/Lucknow_Summary.csv",
#                       radius_km=14)