In [26]:
import rasterio
import numpy as np
from rasterio import Affine

from pathlib import Path
import os


In [27]:
# Paths to the datasets
ghs_tif_path = r'Cropped GHS BUILT C\ghs_matched_to_sentinel.tif'
sentinel_b02_path = r'Sentinel2_cropped\GRANULE\L2A_T43SCT_A018349_20181227T055233\IMG_DATA\R10m\T43SCT_20181227T055231_B02_10m.jp2'


In [28]:
# Function to check if two rasters are spatially aligned
def check_raster_alignment(raster1_path, raster2_path, tolerance=1e-6):
    with rasterio.open(raster1_path) as src1, rasterio.open(raster2_path) as src2:
        # Check CRS
        if src1.crs != src2.crs:
            print(f"CRS mismatch: {src1.crs} vs {src2.crs}")
            return False
        
        # Check resolution (pixel size)
        res1 = src1.res
        res2 = src2.res
        if not (abs(res1[0] - res2[0]) < tolerance and abs(res1[1] - res2[1]) < tolerance):
            print(f"Resolution mismatch: {res1} vs {res2}")
            return False
        
        # Check bounds
        bounds1 = src1.bounds
        bounds2 = src2.bounds
        if not all(abs(bounds1[i] - bounds2[i]) < tolerance for i in range(4)):
            print(f"Bounds mismatch: {bounds1} vs {bounds2}")
            return False
        
        # Check dimensions
        if src1.width != src2.width or src1.height != src2.height:
            print(f"Dimensions mismatch: ({src1.width}, {src1.height}) vs ({src2.width}, {src2.height})")
            return False
        
        print("Rasters are spatially aligned!")
        return True

In [29]:
# Verify alignment between GHS and Sentinel-2
print("Checking alignment between GHS BUILT C and Sentinel-2 B02...")
if not check_raster_alignment(ghs_tif_path, sentinel_b02_path):
    print("Warning: Rasters are not aligned. The RES+NRES mask may not correspond correctly to Sentinel-2 data.")
else:
    print("Proceeding with mask creation.")

Checking alignment between GHS BUILT C and Sentinel-2 B02...
Rasters are spatially aligned!
Proceeding with mask creation.


In [30]:

# Create RES+NRES mask from GHS BUILT C
with rasterio.open(ghs_tif_path) as src:
    smod = src.read(1)  # Read SMOD band (1=RES, 2=NRES, 3=Open Space, etc.)
    profile = src.profile  # Get metadata for output

# Verify unique SMOD values
print("Unique SMOD values:", np.unique(smod))

# Create masks
res_mask = (smod == 1).astype(np.uint8)      # Residential areas
nres_mask = (smod == 2).astype(np.uint8)     # Non-residential areas
res_nres_mask = np.isin(smod, [1, 2]).astype(np.float32)  # Combined RES+NRES mask

# Save the RES+NRES mask
output_path = 'res_nres_islamabad.tif'
with rasterio.open(output_path, 'w', **profile) as dst:
    dst.write(res_nres_mask, 1)

print(f"RES+NRES mask saved as '{output_path}'")

Unique SMOD values: [0 1 2]
RES+NRES mask saved as 'res_nres_islamabad.tif'


In [42]:
import rasterio
import numpy as np
from pathlib import Path
from sklearn.model_selection import train_test_split
import os
import glob

In [47]:
# Paths to the datasets
ghs_path = 'Cropped GHS BUILT C/ghs_matched_to_sentinel.tif'
sentinel_bands = {
    'B02': 'Sentinel2_cropped/GRANULE/L2A_T43SCT_A018349_20181227T055233/IMG_DATA/R10m/T43SCT_20181227T055231_B02_10m.jp2',
    'B03': 'Sentinel2_cropped/GRANULE/L2A_T43SCT_A018349_20181227T055233/IMG_DATA/R10m/T43SCT_20181227T055231_B03_10m.jp2',
    'B04': 'Sentinel2_cropped/GRANULE/L2A_T43SCT_A018349_20181227T055233/IMG_DATA/R10m/T43SCT_20181227T055231_B04_10m.jp2',
    'B08': 'Sentinel2_cropped/GRANULE/L2A_T43SCT_A018349_20181227T055233/IMG_DATA/R10m/T43SCT_20181227T055231_B08_10m.jp2'
}

In [48]:
# Verify file existence
for band_name, band_path in sentinel_bands.items():
    if not os.path.exists(band_path):
        raise FileNotFoundError(f"Sentinel-2 band {band_name} not found at {band_path}")
if not os.path.exists(ghs_path):
    raise FileNotFoundError(f"GHS file not found at {ghs_path}")

In [49]:
# Output directories for patches
patch_size = 256  # Reduced to ensure patches are generated
output_dir = Path('training_patches')
output_dir.mkdir(exist_ok=True)
for subset in ['train', 'val', 'test']:
    (output_dir / subset / 'inputs').mkdir(exist_ok=True, parents=True)
    (output_dir / subset / 'labels').mkdir(exist_ok=True, parents=True)

In [52]:
# Load and stack Sentinel-2 bands
bands = []
with rasterio.open(list(sentinel_bands.values())[0]) as src:
    profile = src.profile
for band_path in sentinel_bands.values():
    with rasterio.open(band_path) as src:
        band = src.read(1).astype(np.float32)
        band = band / 10000.0  # Normalize to [0, 1]
        bands.append(band)
input_stack = np.stack(bands, axis=-1)  # Shape: (height, width, 4)
print(f"Input stack shape: {input_stack.shape}")

Input stack shape: (3660, 3660, 4)


In [53]:
# Load the GHS SMOD band
with rasterio.open(ghs_path) as src:
    smod = src.read(1)  # SMOD values: 0 (non-built), 1 (RES), 2 (NRES)
    print("Unique SMOD values:", np.unique(smod))
print(f"SMOD shape: {smod.shape}")

Unique SMOD values: [0 1 2]
SMOD shape: (3660, 3660)


In [54]:
# Convert SMOD to one-hot encoded labels
label_stack = np.stack([
    (smod == 0).astype(np.float32),  # Non-built
    (smod == 1).astype(np.float32),  # RES
    (smod == 2).astype(np.float32)   # NRES
], axis=-1)  # Shape: (height, width, 3)


In [55]:
# Verify dimensions
if input_stack.shape[:2] != smod.shape:
    raise ValueError(f"Input and label dimensions mismatch: {input_stack.shape[:2]} vs {smod.shape}")


In [56]:

# Function to extract patches
def extract_patches(image, label, patch_size, stride=None):
    if stride is None:
        stride = patch_size // 2
    height, width = image.shape[:2]
    patches = []
    labels = []
    discarded = 0
    for y in range(0, height - patch_size + 1, stride):
        for x in range(0, width - patch_size + 1, stride):
            patch = image[y:y+patch_size, x:x+patch_size]
            patch_label = label[y:y+patch_size, x:x+patch_size]
            patches.append(patch)
            labels.append(patch_label)
        else:
            discarded += 1
    print(f"Kept {len(patches)} patches, discarded {discarded} patches")
    return patches, labels


In [57]:
# Extract patches
input_patches, label_patches = extract_patches(input_stack, label_stack, patch_size)
print(f"Total patches generated: {len(input_patches)}")

# Split into train (60%), validation (20%), test (20%)
indices = list(range(len(input_patches)))
if len(input_patches) < 5:
    train_idx = indices
    val_idx = []
    test_idx = []
    print("Too few patches, assigning all to training")
else:
    train_idx, temp_idx = train_test_split(indices, test_size=0.4, random_state=42)
    val_idx, test_idx = train_test_split(temp_idx, test_size=0.5, random_state=42)
print(f"Train: {len(train_idx)}, Val: {len(val_idx)}, Test: {len(test_idx)}")

Kept 729 patches, discarded 27 patches
Total patches generated: 729
Train: 437, Val: 146, Test: 146


In [58]:
# Save patches
for subset, idx in [('train', train_idx), ('val', val_idx), ('test', test_idx)]:
    for i, patch_idx in enumerate(idx):
        np.save(output_dir / subset / 'inputs' / f'patch_{i}.npy', input_patches[patch_idx])
        np.save(output_dir / subset / 'labels' / f'patch_{i}.npy', label_patches[patch_idx])


In [59]:
# Verify saved files
for subset in ['train', 'val', 'test']:
    inputs = glob.glob(str(output_dir / subset / 'inputs' / '*.npy'))
    labels = glob.glob(str(output_dir / subset / 'labels' / '*.npy'))
    print(f"{subset} inputs: {len(inputs)}, labels: {len(labels)}")

train inputs: 437, labels: 437
val inputs: 146, labels: 146
test inputs: 146, labels: 146


In [60]:
import torch
import torch.nn as nn
import torch.nn.functional as F

    
class AttentionGate(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(AttentionGate, self).__init__()
        self.theta_x = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.phi_g = nn.Conv2d(out_channels, out_channels, kernel_size=1)
        self.psi = nn.Conv2d(out_channels, 1, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, g):
        theta_x = self.theta_x(x)
        phi_g = self.phi_g(g)
        add_xg = theta_x + phi_g
        act_xg = F.relu(add_xg, inplace=True)
        psi = self.psi(act_xg)
        sigmoid_xg = self.sigmoid(psi)
        return x * sigmoid_xg
    

In [61]:

class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ConvBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.conv(x)
    

In [62]:
class AttentionUNet(nn.Module):
    def __init__(self, in_channels=4, out_channels=3):
        super(AttentionUNet, self).__init__()
        # Encoder
        self.c1 = ConvBlock(in_channels, 64)
        self.pool1 = nn.MaxPool2d(2)
        self.c2 = ConvBlock(64, 128)
        self.pool2 = nn.MaxPool2d(2)
        # Bottleneck
        self.c3 = ConvBlock(128, 256)
        # Decoder
        self.up4 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.att4 = AttentionGate(128, 128)
        self.c4 = ConvBlock(256, 128)
        self.up5 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.att5 = AttentionGate(64, 64)
        self.c5 = ConvBlock(128, 64)
        # Output
        self.out = nn.Conv2d(64, out_channels, kernel_size=1)
        self.sigmoid = nn.Sigmoid()  # Fractional output [0, 1] per class

    def forward(self, x):
        # Encoder
        c1 = self.c1(x)
        p1 = self.pool1(c1)
        c2 = self.c2(p1)
        p2 = self.pool2(c2)
        # Bottleneck
        c3 = self.c3(p2)
        # Decoder
        u4 = self.up4(c3)
        att4 = self.att4(c2, u4)
        c4 = self.c4(torch.cat([u4, att4], dim=1))
        u5 = self.up5(c4)
        att5 = self.att5(c1, u5)
        c5 = self.c5(torch.cat([u5, att5], dim=1))
        # Output
        out = self.out(c5)
        return self.sigmoid(out)

In [63]:
# Huber Loss implementation
class HuberLoss(nn.Module):
    def __init__(self, delta=24):
        super(HuberLoss, self).__init__()
        self.delta = delta

    def forward(self, y_pred, y_true):
        residual = torch.abs(y_pred - y_true)
        condition = residual <= self.delta
        small_error = 0.5 * residual ** 2
        large_error = self.delta * residual - 0.5 * self.delta ** 2
        return torch.mean(torch.where(condition, small_error, large_error))

In [64]:
model = AttentionUNet(in_channels=4, out_channels=3)
x = torch.randn(1, 4, 512, 512)
print(model(x).shape)

torch.Size([1, 3, 512, 512])


In [65]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from pathlib import Path
from sklearn.metrics import r2_score

# Custom dataset
class SegmentationDataset(Dataset):
    def __init__(self, input_dir, label_dir):
        self.input_files = sorted(Path(input_dir).glob('patch_*.npy'))
        self.label_files = sorted(Path(label_dir).glob('patch_*.npy'))
        assert len(self.input_files) == len(self.label_files), "Mismatch in input/label counts"

    def __len__(self):
        return len(self.input_files)

    def __getitem__(self, idx):
        input_patch = np.load(self.input_files[idx])  # Shape: (512, 512, 4)
        label_patch = np.load(self.label_files[idx])  # Shape: (512, 512, 3)
        input_patch = torch.from_numpy(input_patch).permute(2, 0, 1).float()  # (4, 512, 512)
        label_patch = torch.from_numpy(label_patch).permute(2, 0, 1).float()  # (3, 512, 512)
        return input_patch, label_patch


In [66]:
# Paths
base_dir = Path('training_patches')
train_dataset = SegmentationDataset(base_dir / 'train' / 'inputs', base_dir / 'train' / 'labels')
val_dataset = SegmentationDataset(base_dir / 'val' / 'inputs', base_dir / 'val' / 'labels')
test_dataset = SegmentationDataset(base_dir / 'test' / 'inputs', base_dir / 'test' / 'labels')

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)


In [67]:
# Model, loss, optimizer
device = torch.device('cpu')  # Use CPU for now
model = AttentionUNet(in_channels=4, out_channels=3).to(device)
criterion = HuberLoss(delta=0.5)  # Huber Loss with delta=0.5
optimizer = optim.RMSprop(model.parameters(), lr=0.01)

# Training loop with early stopping
num_epochs = 200
patience = 20
best_val_loss = float('inf')
epochs_no_improve = 0
best_model_path = 'attention_unet_best.pth'

In [None]:
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * inputs.size(0)
    train_loss /= len(train_dataset)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
    val_loss /= len(val_dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping triggered")
            break


In [None]:

# Evaluate on test set
model.load_state_dict(torch.load(best_model_path))
model.eval()
test_loss = 0.0
preds, trues = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * inputs.size(0)
        preds.append(outputs.cpu().numpy().flatten())
        trues.append(labels.cpu().numpy().flatten())
test_loss /= len(test_dataset)

# Compute R² and RMSE
preds = np.concatenate(preds)
trues = np.concatenate(trues)
r2 = r2_score(trues, preds)
rmse = np.sqrt(np.mean((preds - trues) ** 2))

print(f"Test Loss: {test_loss:.4f}, R²: {r2:.4f}, RMSE: {rmse:.4f}")
print(f"Best model saved as '{best_model_path}'")