In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms
from tqdm import tqdm
import os
from PIL import Image

In [18]:
# U-Net model definition
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()

        # Encoder
        self.encoder1 = nn.Conv2d(in_channels, 64, kernel_size=3, padding=1)
        self.encoder2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.encoder3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.encoder4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2)

        # Decoder
        self.upconv4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)  # Up-conv for decoder
        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)  # Up-conv for decoder
        self.upconv2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)   # Up-conv for decoder
        self.upconv1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2)    # Up-conv for decoder

        # Final layer
        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        # Encoder path
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool(enc1))
        enc3 = self.encoder3(self.pool(enc2))
        enc4 = self.encoder4(self.pool(enc3))
        
        # Bottleneck (no pooling)
        mid = self.pool(enc4)

        # Decoder path (upsampling and concatenation)
        dec4 = self.upconv4(mid)
        dec4 = torch.cat([dec4, enc4], dim=1)  # Concatenate encoder and decoder
        dec3 = self.upconv3(dec4)
        dec3 = torch.cat([dec3, enc3], dim=1)  # Concatenate encoder and decoder
        dec2 = self.upconv2(dec3)
        dec2 = torch.cat([dec2, enc2], dim=1)  # Concatenate encoder and decoder
        dec1 = self.upconv1(dec2)
        dec1 = torch.cat([dec1, enc1], dim=1)  # Concatenate encoder and decoder

        # Final output
        return self.final(dec1)

In [19]:
# Custom dataset class
class GlacierDataset(Dataset):
    def __init__(self, images_dir, masks_dir, transform=None):
        self.images_dir = images_dir
        self.masks_dir = masks_dir
        self.transform = transform
        self.image_paths = sorted(os.listdir(images_dir))
        self.mask_paths = sorted(os.listdir(masks_dir))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(os.path.join(self.images_dir, self.image_paths[idx]))
        mask = Image.open(os.path.join(self.masks_dir, self.mask_paths[idx]))

        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask

In [20]:
# Helper function to calculate IOU
def calculate_iou(pred_mask, true_mask):
    intersection = np.logical_and(pred_mask, true_mask)
    union = np.logical_or(pred_mask, true_mask)
    return np.sum(intersection) / np.sum(union)

In [21]:
# Training function
def train_model(model, train_loader, device, num_epochs=10, lr=0.001):
    criterion = nn.BCEWithLogitsLoss()  # Binary cross entropy loss for segmentation
    optimizer = optim.Adam(model.parameters(), lr=lr)
    model.train()

    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, masks in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            images, masks = images.to(device), masks.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, masks.unsqueeze(1).float())  # Ensure mask has correct shape
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")


In [22]:
# Visualization and Calculation Function
def visualize_and_calculate(before_image_path, after_image_path, model, device):
    before_image = np.array(plt.imread(before_image_path))
    after_image = np.array(plt.imread(after_image_path))

    # Preprocessing and transforming images to tensor
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    
    before_image = transform(before_image).unsqueeze(0).to(device)
    after_image = transform(after_image).unsqueeze(0).to(device)

    model.eval()

    with torch.no_grad():
        # Predicting masks for both images
        before_output = model(before_image)
        after_output = model(after_image)
        
        before_mask = torch.sigmoid(before_output).squeeze(0).cpu().numpy()
        after_mask = torch.sigmoid(after_output).squeeze(0).cpu().numpy()

        before_mask = (before_mask > 0.5).astype(np.uint8)
        after_mask = (after_mask > 0.5).astype(np.uint8)

        # Calculate IOU
        iou_before_after = calculate_iou(before_mask, after_mask)
        print(f"IoU between Before and After Masks: {iou_before_after:.4f}")

        # Visualize before and after masks
        fig, ax = plt.subplots(1, 3, figsize=(12, 6))
        ax[0].imshow(before_mask, cmap='gray')
        ax[0].set_title("Before Mask")
        ax[1].imshow(after_mask, cmap='gray')
        ax[1].set_title("After Mask")
        ax[2].imshow(np.abs(before_mask - after_mask), cmap='hot')
        ax[2].set_title("Change Mask")
        plt.show()

In [23]:
# Main Script
if __name__ == "__main__":
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Paths to the dataset (replace with your actual paths)
    images_dir = 'data/images'  # Your images path
    masks_dir = 'data/masks'  # Your masks path
    
    # Dataset and DataLoader setup
    transform = transforms.Compose([
        transforms.Resize((400, 400)),  # Ensure images are of the same size (adjust as needed)
        transforms.ToTensor(),
    ])
    
    dataset = GlacierDataset(images_dir=images_dir, masks_dir=masks_dir, transform=transform)
    train_loader = DataLoader(dataset, batch_size=4, shuffle=True)

    # Initialize the U-Net model
    model = UNet(in_channels=3, out_channels=1).to(device)

    # Train the model
    train_model(model, train_loader, device, num_epochs=10, lr=0.001)

    # After training, visualize and calculate
    before_image_path = "before.png"  # Example image path
    after_image_path = "after.png"  # Example image path
    visualize_and_calculate(before_image_path, after_image_path, model, device)

Epoch 1/10:   0%|          | 0/103 [00:01<?, ?it/s]


RuntimeError: Given transposed=1, weight of size [256, 128, 2, 2], expected input[4, 768, 50, 50] to have 256 channels, but got 768 channels instead