<a href="https://colab.research.google.com/github/rtRitesh-rgb/colab-push/blob/Flood-Predection-project/FloodDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================
#  SIMPLE FLOOD MAPPING (U-NET) - GOOGLE COLAB
#  optical + SAR 2-channel input
#  Training + Prediction + Polygon Export
#  All in one notebook
# ============================================

!pip install rasterio geopandas shapely opencv-python torch torchvision --quiet

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import rasterio
import cv2
import json
from rasterio.features import shapes
from google.colab import files
import os
import zipfile

print("‚úî All libraries loaded!")


# ============================================
# 1. SIMPLE U-NET MODEL
# ============================================
class SimpleUNet(nn.Module):
    """Small two-level U-Net style network.

    One encoder stage at full resolution, one at half resolution, a
    transposed-convolution upsampling step, a skip connection from the first
    encoder stage, and a 1x1 convolution that emits ``out_ch`` logit maps.

    Args:
        in_ch: Number of channels of the input tensor.
        out_ch: Number of channels of the returned logit tensor.
    """

    def __init__(self, in_ch=2, out_ch=1):
        super().__init__()

        def double_conv(c_in, c_out):
            # Two 3x3 convolutions (padding keeps the spatial size), each
            # followed by a ReLU.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.Conv2d(c_out, c_out, kernel_size=3, padding=1),
                nn.ReLU(),
            )

        # Attribute names and creation order are part of the checkpoint
        # format (state_dict keys), so they are kept stable.
        self.enc1 = double_conv(in_ch, 16)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.enc2 = double_conv(16, 32)
        self.up = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2)
        self.dec1 = double_conv(32, 16)
        self.final = nn.Conv2d(16, out_ch, kernel_size=1)

    def forward(self, x):
        """Return raw logits with the same height and width as ``x``."""
        skip = self.enc1(x)
        bottleneck = self.enc2(self.pool(skip))
        upsampled = self.up(bottleneck)

        # Pooling floors odd sizes, so the upsampled map can come back
        # smaller than the skip tensor; resize it before concatenating.
        if upsampled.shape != skip.shape:
            upsampled = F.interpolate(upsampled, size=skip.shape[-2:])

        merged = torch.cat([skip, upsampled], dim=1)
        return self.final(self.dec1(merged))


# ============================================
# 2. SIMPLE TIFF LOADER
# ============================================
def read_tiff(path):
    """Read band 1 of a raster and min-max scale it to roughly [0, 1].

    Scaling uses only finite pixels. NaN/Inf pixels (e.g. nodata written as
    NaN) would otherwise make ``min``/``max`` NaN and turn the whole band into
    NaN; they are set to 0 in the result instead.

    Args:
        path: Path of a raster file readable by rasterio.

    Returns:
        2-D float32 array. A band with no finite pixel yields all zeros.
    """
    with rasterio.open(path) as src:
        img = src.read(1).astype("float32")

    finite = np.isfinite(img)
    if not finite.any():
        return np.zeros_like(img)

    lo = img[finite].min()
    hi = img[finite].max()
    # The small epsilon avoids division by zero on constant bands.
    img = (img - lo) / (hi - lo + 1e-6)
    return np.nan_to_num(img, nan=0.0, posinf=0.0, neginf=0.0)


# ============================================
# 3. TRAINING SECTION
# ============================================
from torch.utils.data import Dataset, DataLoader

class SimpleDataset(Dataset):
    """Dataset of (optical, SAR, mask) raster triplets.

    Args:
        samples: Sequence of ``(optical_path, sar_path, mask_path)`` tuples.

    Each item is ``(image, mask)``: ``image`` is a float tensor of shape
    ``(2, H, W)`` (optical first, SAR second) and ``mask`` a float tensor of
    shape ``(H, W)`` holding 0.0 / 1.0 labels. ``H`` and ``W`` are the smaller
    of the optical and SAR dimensions.
    """

    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    @staticmethod
    def _read_mask(path):
        """Read band 1 of a label raster as a binary float32 array.

        The mask is deliberately NOT min-max normalised like the imagery:
        normalising a constant all-flood mask would turn every label into 0.
        NOTE(review): assumes every non-zero pixel marks flood (0/1 or 0/255
        encodings) -- confirm against the label files in use.
        """
        with rasterio.open(path) as src:
            raw = src.read(1)
        return (raw > 0).astype("float32")

    def __getitem__(self, idx):
        o, s, m = self.samples[idx]

        opt = read_tiff(o)
        sar = read_tiff(s)
        mask = self._read_mask(m)

        # Bring all three rasters onto the smaller optical/SAR grid.
        H = min(opt.shape[0], sar.shape[0])
        W = min(opt.shape[1], sar.shape[1])

        # cv2.resize takes (width, height).
        opt = cv2.resize(opt, (W, H))
        sar = cv2.resize(sar, (W, H))
        # Nearest-neighbour keeps labels strictly 0/1; the default bilinear
        # interpolation would invent fractional labels along flood edges.
        mask = cv2.resize(mask, (W, H), interpolation=cv2.INTER_NEAREST)

        img = np.stack([opt, sar], axis=0)
        return torch.from_numpy(img).float(), torch.from_numpy(mask).float()


# ============================================
# UPLOAD TRAINING DATA
# ============================================
print("\nüì§ Upload training images (optical, SAR, mask TIFFs)")
print("Upload format: opticalX.tif, sarX.tif, maskX.tif. Or a single .zip file containing them.\n")

uploaded = files.upload()

# Use the first uploaded archive, if any (extension matched case-insensitively).
zip_file_name = next(
    (name for name in uploaded if name.lower().endswith(".zip")), None
)

if zip_file_name:
    print(f"Detected zip file: {zip_file_name}. Extracting...")
    extract_dir = "training_data"
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(zip_file_name, 'r') as zip_ref:
        # extract() returns the path it actually wrote, so files inside
        # nested archive folders are found, and leftovers from an earlier
        # run in extract_dir are not mistaken for part of this upload.
        extracted_paths = [
            zip_ref.extract(member, extract_dir)
            for member in zip_ref.infolist()
            if not member.is_dir()
        ]
    candidate_paths = extracted_paths
else:
    # Individual file uploads land in the working directory.
    candidate_paths = list(uploaded)

# Group by file-name prefix; sorting keeps the three lists aligned as long as
# the files share the same numbering (optical1/sar1/mask1, ...).
optical_files, sar_files, mask_files = (
    sorted(
        p for p in candidate_paths
        if os.path.basename(p).lower().startswith(prefix)
    )
    for prefix in ("optical", "sar", "mask")
)

# zip() would silently drop unmatched files and mis-pair the rest, so fail
# loudly instead of training on wrong triplets.
if not (len(optical_files) == len(sar_files) == len(mask_files)):
    raise ValueError(
        "Mismatched training files: "
        f"{len(optical_files)} optical, {len(sar_files)} SAR, "
        f"{len(mask_files)} mask. Each sample needs all three."
    )
if not optical_files:
    raise ValueError(
        "No training samples found. Expected files named "
        "opticalX.tif, sarX.tif and maskX.tif."
    )

samples = list(zip(optical_files, sar_files, mask_files))
print("Detected training samples:", samples)

# BUILD DATASET
# batch_size=1 lets samples of different sizes be used without padding.
ds = SimpleDataset(samples)
dl = DataLoader(ds, batch_size=1, shuffle=True)

# TRAIN
model = SimpleUNet(2)
optim = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.BCEWithLogitsLoss()

NUM_EPOCHS = 10  # small epochs (Colab friendly)

print("\nüöÄ Training starting...\n")
model.train()
for epoch in range(NUM_EPOCHS):
    epoch_loss = 0.0
    for img, mask in dl:
        logits = model(img)
        # logits is (B, 1, H, W); drop the channel axis to match the mask.
        loss = loss_fn(logits[:, 0], mask)

        optim.zero_grad()
        loss.backward()
        optim.step()

        epoch_loss += loss.item()

    # Report the mean over all batches rather than only the last batch.
    print(f"Epoch {epoch+1}/{NUM_EPOCHS} ‚Äî Loss: {epoch_loss / len(dl):.4f}")

torch.save(model.state_dict(), "simple_unet.pth")
print("\n‚úî Model saved as simple_unet.pth")
files.download("simple_unet.pth")


# ============================================
# 4. PREDICTION SECTION
# ============================================
print("\nüì§ Upload optical + SAR for prediction")
uploaded_pred = files.upload()

# First uploaded file whose name contains the keyword (case-insensitive).
optical_path = next((f for f in uploaded_pred if "optical" in f.lower()), None)
sar_path = next((f for f in uploaded_pred if "sar" in f.lower()), None)
if optical_path is None or sar_path is None:
    raise ValueError(
        "Prediction needs one file with 'optical' and one with 'sar' in its "
        f"name; got: {sorted(uploaded_pred)}"
    )

opt = read_tiff(optical_path)
sar = read_tiff(sar_path)

# Bring both bands onto the smaller of the two grids.
H = min(opt.shape[0], sar.shape[0])
W = min(opt.shape[1], sar.shape[1])

# cv2.resize takes (width, height).
opt = cv2.resize(opt, (W, H))
sar = cv2.resize(sar, (W, H))

# Shape (1, 2, H, W): batch of one, optical first, SAR second.
img = torch.from_numpy(np.stack([opt, sar], axis=0)).float().unsqueeze(0)

model.eval()
with torch.no_grad():
    mask = torch.sigmoid(model(img))[0, 0].numpy()

flood = (mask > 0.5).astype("uint8")

# save mask
with rasterio.open(optical_path) as src:
    meta = src.meta.copy()
    src_height, src_width = src.height, src.width
    out_transform = src.transform

# If the optical band was resized, the output covers the same footprint with
# a different pixel size, so the affine transform must be rescaled to match.
if (src_height, src_width) != (H, W):
    out_transform = out_transform * out_transform.scale(
        src_width / W, src_height / H
    )

meta.update({
    "driver": "GTiff",   # the output file is always written as .tif
    "count": 1,
    "dtype": "uint8",
    "height": H,         # must match the array actually written
    "width": W,
    "transform": out_transform,
    "nodata": None,      # an inherited nodata value may not fit in uint8
})

with rasterio.open("flood_mask.tif", "w", **meta) as dst:
    dst.write(flood, 1)

print("\n‚úî Saved flood_mask.tif")
files.download("flood_mask.tif")


# ============================================
# 5. CONVERT MASK TO POLYGONS
# ============================================
print("\nüîÑ Converting flood mask to polygons...")

def mask_to_geojson(mask_tif, out_geojson):
    """Vectorise the value-1 regions of a mask raster into a GeoJSON file.

    Each connected region of pixels equal to 1 in band 1 becomes one polygon
    feature with a ``value`` property. GeoJSON readers assume WGS84
    longitude/latitude, so when the raster declares a different CRS the
    polygons are reprojected to EPSG:4326 before writing. A raster without a
    CRS is written with its coordinates unchanged.

    Args:
        mask_tif: Path of the mask raster to read.
        out_geojson: Path of the GeoJSON FeatureCollection to write.
    """
    # Local import so the function does not depend on extra module-level imports.
    from rasterio.warp import transform_geom

    with rasterio.open(mask_tif) as src:
        mask = src.read(1)
        transform = src.transform
        src_crs = src.crs

    needs_reprojection = src_crs is not None and src_crs.to_epsg() != 4326

    features = []
    # `mask > 0` restricts vectorisation to non-zero pixels.
    for geom, v in shapes(mask.astype(np.int16), mask > 0, transform=transform):
        if v != 1:
            continue
        if needs_reprojection:
            geom = transform_geom(src_crs, "EPSG:4326", geom)
        features.append({
            "type": "Feature",  # required on every feature by the GeoJSON spec
            "geometry": geom,
            "properties": {"value": v},
        })

    fc = {"type": "FeatureCollection", "features": features}

    with open(out_geojson, "w") as f:
        json.dump(fc, f)

# Vectorise the saved mask, then hand the GeoJSON to the browser for download.
polygons_path = "flood_polygons.geojson"
mask_to_geojson("flood_mask.tif", polygons_path)
print("‚úî Saved flood_polygons.geojson")
files.download(polygons_path)

print("\nüéâ All steps completed! Training ‚Üí Prediction ‚Üí GeoJSON done!")


‚úî All libraries loaded!

üì§ Upload training images (optical, SAR, mask TIFFs)
Upload format: opticalX.tif, sarX.tif, maskX.tif. Or a single .zip file containing them.



Saving flood_training_sample.zip to flood_training_sample (3).zip
Detected zip file: flood_training_sample (3).zip. Extracting...
Detected training samples: [('training_data/optical1.tif', 'training_data/sar1.tif', 'training_data/mask1.tif')]

üöÄ Training starting...

Epoch 1/10 ‚Äî Loss: 0.7524
Epoch 2/10 ‚Äî Loss: 0.7488
Epoch 3/10 ‚Äî Loss: 0.7452
Epoch 4/10 ‚Äî Loss: 0.7415
Epoch 5/10 ‚Äî Loss: 0.7378
Epoch 6/10 ‚Äî Loss: 0.7335
Epoch 7/10 ‚Äî Loss: 0.7313
Epoch 8/10 ‚Äî Loss: 0.7290
Epoch 9/10 ‚Äî Loss: 0.7263
Epoch 10/10 ‚Äî Loss: 0.7231

‚úî Model saved as simple_unet.pth


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üì§ Upload optical + SAR for prediction


Saving optical1.tif to optical1 (1).tif
Saving sar1.tif to sar1 (1).tif

‚úî Saved flood_mask.tif


  dataset = writer(


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üîÑ Converting flood mask to polygons...
‚úî Saved flood_polygons.geojson


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


üéâ All steps completed! Training ‚Üí Prediction ‚Üí GeoJSON done!


In [None]:
# =======================================================
# SIMPLE IMPACT ASSESSMENT
#   - Flood area
#   - Buildings affected
#   - Population exposed
# =======================================================

!pip install geopandas rasterstats shapely --quiet

import geopandas as gpd
import rasterio
import numpy as np
from rasterstats import zonal_stats
import json
from rasterio.features import shapes
from google.colab import files

# Define mask_to_geojson here to make the cell self-contained for polygon generation
def mask_to_geojson(mask_tif, out_geojson):
    """Vectorise the value-1 regions of a mask raster into a GeoJSON file.

    Each connected region of pixels equal to 1 in band 1 becomes one polygon
    feature with a ``value`` property. GeoJSON readers assume WGS84
    longitude/latitude, so when the raster declares a different CRS the
    polygons are reprojected to EPSG:4326 before writing. A raster without a
    CRS is written with its coordinates unchanged.

    Args:
        mask_tif: Path of the mask raster to read.
        out_geojson: Path of the GeoJSON FeatureCollection to write.
    """
    # Local import so the function does not depend on extra module-level imports.
    from rasterio.warp import transform_geom

    with rasterio.open(mask_tif) as src:
        mask = src.read(1)
        transform = src.transform
        src_crs = src.crs

    needs_reprojection = src_crs is not None and src_crs.to_epsg() != 4326

    features = []
    # `mask > 0` restricts vectorisation to non-zero pixels.
    for geom, v in shapes(mask.astype(np.int16), mask > 0, transform=transform):
        if v != 1:
            continue
        if needs_reprojection:
            geom = transform_geom(src_crs, "EPSG:4326", geom)
        features.append({
            "type": "Feature",  # required on every feature by the GeoJSON spec
            "geometry": geom,
            "properties": {"value": v},
        })

    fc = {"type": "FeatureCollection", "features": features}

    with open(out_geojson, "w") as f:
        json.dump(fc, f)


# ------------------------
# Ensure flood polygons are up-to-date with flood_mask.tif
# ------------------------
print("\nüîÑ Regenerating flood polygons from flood_mask.tif...")
mask_to_geojson("flood_mask.tif", "flood_polygons.geojson")
print("‚úî Saved updated flood_polygons.geojson")

# -----------------------
# 1. Load flood polygons
# -----------------------
flood_gdf = gpd.read_file("flood_polygons.geojson")
print("Loaded polygons:", len(flood_gdf))

# With no flood polygons every impact figure is zero; the spatial operations
# below are skipped in that case instead of being run on an empty layer.
has_flood = not flood_gdf.empty

# ----------------------------------------------
# 2. (OPTIONAL) Upload building footprints layer
# ----------------------------------------------

print("\nüì§ Upload building footprints (GeoJSON / Shapefile)")
uploaded_bld = files.upload()
if not uploaded_bld:
    raise ValueError("No building footprint file was uploaded.")

# A shapefile arrives as several files (.shp/.shx/.dbf/...); open the main
# vector file rather than whichever sidecar happens to be listed first.
vector_suffixes = (".shp", ".geojson", ".json", ".gpkg", ".zip")
bld_path = next(
    (name for name in uploaded_bld if name.lower().endswith(vector_suffixes)),
    list(uploaded_bld.keys())[0],
)
buildings = gpd.read_file(bld_path)

# Reproject to flood CRS (only possible when both layers declare one; a
# layer without CRS metadata is used with its coordinates as-is).
if (
    buildings.crs is not None
    and flood_gdf.crs is not None
    and buildings.crs != flood_gdf.crs
):
    buildings = buildings.to_crs(flood_gdf.crs)

# Tag every footprint so a building that intersects several flood polygons
# is still counted once after the overlay splits it into pieces.
buildings = buildings.reset_index(drop=True)
buildings["_bld_id"] = buildings.index

# Buildings intersecting flood area
if has_flood:
    buildings_impacted = gpd.overlay(buildings, flood_gdf, how="intersection")
else:
    buildings_impacted = buildings.iloc[0:0]

# -----------------------------------
# Count buildings
# -----------------------------------
total_buildings_impacted = buildings_impacted["_bld_id"].nunique()
print("üè¢ Buildings impacted:", total_buildings_impacted)


# ---------------------------------------------------------
# 3. (OPTIONAL) Upload population raster for population hit
# ---------------------------------------------------------
print("\nüì§ Upload population raster (GeoTIFF)")
uploaded_pop = files.upload()
if not uploaded_pop:
    raise ValueError("No population raster was uploaded.")

pop_path = list(uploaded_pop.keys())[0]

if has_flood:
    # Merge all flood polygons into one geometry first: with all_touched=True
    # a population cell touched by several polygons would otherwise be added
    # once per polygon.
    flood_union = flood_gdf.dissolve()

    # zonal_stats does not reproject, so put the zones in the raster's CRS.
    with rasterio.open(pop_path) as pop_src:
        pop_crs = pop_src.crs
    if pop_crs is not None and flood_union.crs is not None:
        flood_union = flood_union.to_crs(pop_crs)

    # Zonal statistics: sum of population values inside flood area
    stats = zonal_stats(
        flood_union,
        pop_path,
        stats=["sum"],
        all_touched=True
    )
else:
    stats = []

total_population = sum(s["sum"] for s in stats if s["sum"] is not None)
print("üë• Estimated population affected:", total_population)


# --------------------
# 4. Flood area
# --------------------
if has_flood:
    # EPSG:6933 is an equal-area projection; Web Mercator (EPSG:3857)
    # inflates areas increasingly with distance from the equator.
    flood_gdf_area = flood_gdf.to_crs(epsg=6933)
    area_km2 = flood_gdf_area.geometry.area.sum() / 1_000_000
else:
    flood_gdf_area = flood_gdf
    area_km2 = 0.0
print("üåä Flooded area:", round(area_km2, 3), "km¬≤")


# --------------------
# 5. Save report
# --------------------
report = {
    "buildings_impacted": int(total_buildings_impacted),
    "population_affected": float(total_population),
    "flooded_area_km2": float(area_km2)
}

with open("impact_report.json", "w") as f:
    json.dump(report, f, indent=2)

print("\n‚úî Impact report saved as impact_report.json")

files.download("impact_report.json")

In [None]:
import rasterio
import numpy as np

# Debug helper: paint a square of flood pixels into the saved mask so the
# downstream polygon / impact cells have something to work with.
flood_mask_path = "flood_mask.tif"

with rasterio.open(flood_mask_path) as src:
    meta = src.meta
    mask = src.read(1)

# create artificial flood area (rows and columns 50..149; slicing clips
# silently if the raster is smaller than that)
patch = slice(50, 150)
mask[patch, patch] = 1

# Rewrite the file in place, reusing the metadata it was read with.
with rasterio.open(flood_mask_path, "w", **meta) as dst:
    dst.write(mask, 1)

print("‚úî Added artificial flood to mask")


‚úî Added artificial flood to mask


  dataset = writer(
