In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from PIL import Image, ImageOps
from skimage.util import random_noise
from skimage import exposure
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, jaccard_score

`Helper functions`

In [None]:
def add_gaussian_noise(image, mean=0, std=0.05):
    """Return a copy of ``image`` corrupted with additive Gaussian noise.

    Args:
        image: PIL image (or array-like); values are divided by 255 before
            the noise is applied, so 0-255 intensities are assumed.
        mean: Mean of the noise, forwarded to ``skimage.util.random_noise``.
        std: Standard deviation of the noise; forwarded as ``var=std**2``.

    Returns:
        A ``PIL.Image`` built from the noisy array rescaled to uint8.
    """
    scaled = np.array(image) / 255.0
    perturbed = random_noise(scaled, mode='gaussian', mean=mean, var=std ** 2)
    # Back to the 0-255 range; clip guards against values outside [0, 255].
    as_bytes = np.clip(perturbed * 255, 0, 255).astype(np.uint8)
    return Image.fromarray(as_bytes)

def add_salt_pepper_noise(image, amount=0.02):
    """Return a copy of ``image`` corrupted with salt-and-pepper noise.

    Args:
        image: PIL image (or array-like); values are divided by 255 before
            the noise is applied, so 0-255 intensities are assumed.
        amount: Forwarded to ``skimage.util.random_noise`` as the ``amount``
            argument of its ``'s&p'`` mode.

    Returns:
        A ``PIL.Image`` built from the noisy array rescaled to uint8.
    """
    scaled = np.array(image) / 255.0
    speckled = random_noise(scaled, mode='s&p', amount=amount)
    # Back to the 0-255 range; clip guards against values outside [0, 255].
    as_bytes = np.clip(speckled * 255, 0, 255).astype(np.uint8)
    return Image.fromarray(as_bytes)

def histogram_equalization(image):
    """Return a histogram-equalized copy of ``image``.

    The image is converted to an array, passed through
    ``skimage.exposure.equalize_hist``, multiplied by 255 and cast to uint8.

    Args:
        image: PIL image (or array-like) to equalize.

    Returns:
        A ``PIL.Image`` holding the equalized uint8 data.
    """
    pixels = np.array(image)
    stretched = exposure.equalize_hist(pixels) * 255
    return Image.fromarray(stretched.astype(np.uint8))

def augment_image(image, dem):
    """Apply one randomly chosen augmentation to an image/DEM pair.

    One entry is drawn uniformly from the candidate list below. The
    ``'original'`` entry returns both inputs untouched; every other entry
    applies the same callable first to ``image`` and then to ``dem``.

    Args:
        image: PIL image of the scene.
        dem: PIL image of the matching elevation model.

    Returns:
        Tuple ``(augmented_image, augmented_dem)``.
    """
    candidates = [
        ('original', None),
        ('horizontal_flip', ImageOps.mirror),
        ('vertical_flip', ImageOps.flip),
        ('rotate_90', lambda img: img.rotate(90)),
        ('gaussian_noise', add_gaussian_noise),
        ('salt_pepper_noise', add_salt_pepper_noise),
        ('hist_eq', histogram_equalization),
    ]

    _, operation = random.choice(candidates)

    # Only the 'original' entry carries no callable: pass the pair through.
    if operation is None:
        return image, dem

    # Image first, then DEM, so random noise draws happen in a fixed order.
    return operation(image), operation(dem)

def load_image(image_path, target_size):
    """Read an image file, convert its colour mode and resize it.

    Args:
        image_path: Path of the image file to open.
        target_size: Sequence ``(height, width, channels)``. A channel count
            of 3 selects mode ``'RGB'``; any other value selects ``'L'``.

    Returns:
        The converted ``PIL.Image`` resized to ``target_size[:2]``.
    """
    mode = 'RGB' if target_size[2] == 3 else 'L'

    # The context manager releases the file handle deterministically;
    # convert() returns a fully loaded image that stays valid afterwards.
    with Image.open(image_path) as source:
        image = source.convert(mode)

    return transforms.Resize(target_size[:2])(image)

def load_real_data(data_dir, target_size=(256, 256)):
    """Load image/DEM pairs for both classes and add one augmented copy each.

    Expected layout under ``data_dir``::

        landslide/image/*.png        landslide/dem/*.png         (label 1)
        non-landslide/image/*.png    non-landslide/dem/*.png     (label 0)

    The DEM of a sample is looked up under the same file name as its image.
    For every ``.png`` the original pair is appended first, immediately
    followed by one randomly augmented pair, so samples ``2k`` and ``2k + 1``
    always come from the same source file.

    Args:
        data_dir: Root directory of the dataset.
        target_size: ``(height, width)`` every image and DEM is resized to.

    Returns:
        Tuple ``(images, dems, labels)`` of stacked tensors; ``labels`` is
        float32 with 1 for landslide and 0 for non-landslide.

    Raises:
        RuntimeError: If no ``.png`` files are found.
    """
    size = tuple(target_size)  # also accept a list for target_size
    to_tensor = transforms.ToTensor()  # built once and reused for every sample

    images = []
    dems = []
    labels = []

    def process_and_augment(image_path, dem_path, label):
        """Append the original pair and one augmented pair for a sample."""
        image = load_image(image_path, size + (3,))
        dem = load_image(dem_path, size + (1,))
        augmented_image, augmented_dem = augment_image(image, dem)

        for sample_image, sample_dem in ((image, dem), (augmented_image, augmented_dem)):
            images.append(to_tensor(sample_image))
            dems.append(to_tensor(sample_dem))
            labels.append(label)

    for class_name, label in (('landslide', 1), ('non-landslide', 0)):
        class_dir = os.path.join(data_dir, class_name)
        # os.listdir order is arbitrary; sorting keeps the sample order (and
        # therefore any seeded split of the result) reproducible.
        for filename in sorted(os.listdir(os.path.join(class_dir, 'image'))):
            if filename.endswith(".png"):
                process_and_augment(
                    os.path.join(class_dir, 'image', filename),
                    os.path.join(class_dir, 'dem', filename),
                    label=label,
                )

    if not images:
        # torch.stack would fail on an empty list with a less helpful message.
        raise RuntimeError(f"No .png samples found under {data_dir!r}")

    images = torch.stack(images)
    dems = torch.stack(dems)
    labels = torch.tensor(labels, dtype=torch.float32)

    return images, dems, labels

`Network components`

In [2]:
class ConvBlock(nn.Module):
    """Two stacked stages of 3x3 convolution -> batch norm -> ReLU.

    With ``padding=1`` the spatial size of the input is preserved; only the
    channel count changes from ``in_channels`` to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # First stage maps in -> out channels, second stage out -> out.
        for stage_in in (in_channels, out_channels):
            stages.extend([
                nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ])
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both convolution stages."""
        return self.conv(x)
        
class Encoder(nn.Module):
    """Two-input CNN that maps an RGB image and a DEM to one probability.

    The RGB input (3 channels) and the DEM input (1 channel) are each passed
    through their own ``ConvBlock`` (16 and 8 output channels) and
    concatenated along the channel axis. Three further blocks, each followed
    by 2x2 max pooling, and a bottleneck block produce 256 feature maps,
    which are globally average-pooled and fed to a linear layer with a
    sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.conv1_rgb = ConvBlock(3, 16)
        self.conv1_dem = ConvBlock(1, 8)
        self.conv2 = ConvBlock(24, 32)  # 16 RGB + 8 DEM feature maps
        self.conv3 = ConvBlock(32, 64)
        self.conv4 = ConvBlock(64, 128)
        self.bottleneck = ConvBlock(128, 256)

        # Built once and shared by all three down-sampling steps instead of
        # constructing a new module on every forward call. MaxPool2d holds no
        # parameters or buffers, so the state_dict keys are unchanged.
        self.pool = nn.MaxPool2d(kernel_size=2)

        self.global_pool = nn.AdaptiveAvgPool2d(1)

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x_rgb, x_dem):
        """Return a ``(batch, 1)`` tensor of sigmoid outputs.

        Args:
            x_rgb: Tensor with 3 channels on dim 1.
            x_dem: Tensor with 1 channel on dim 1; it is concatenated with
                the RGB features on dim 1, so its other dims must match.
        """
        c1_rgb = self.conv1_rgb(x_rgb)
        c1_dem = self.conv1_dem(x_dem)
        combined = torch.cat([c1_rgb, c1_dem], dim=1)

        p2 = self.pool(self.conv2(combined))
        p3 = self.pool(self.conv3(p2))
        p4 = self.pool(self.conv4(p3))

        bn = self.bottleneck(p4)
        pooled = self.global_pool(bn)
        return self.fc(pooled)

In [None]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
# Root of the dataset; load_real_data reads the 'landslide' and
# 'non-landslide' sub-directories below it.
data_dir = '/kaggle/input/bijie/Bijie_dataset'
# Returns stacked tensors; every source file contributes the original sample
# plus one randomly augmented copy.
images, dems, labels = load_real_data(data_dir, target_size=(256, 256))

In [None]:
# Number of DEM samples loaded (original and augmented copies together).
len(dems)

In [None]:
from sklearn.model_selection import GroupShuffleSplit, train_test_split

# load_real_data stores each source file as two consecutive samples: the
# original at index 2k and its augmented copy at 2k + 1 (the copy is an exact
# duplicate whenever the 'original' augmentation is drawn). A plain random
# split over samples would put the two halves of a pair on opposite sides and
# leak training data into the evaluation set, so the split is done per pair.
pair_ids = np.arange(len(labels)) // 2
splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(splitter.split(np.zeros(len(labels)), groups=pair_ids))
train_idx = torch.as_tensor(train_idx, dtype=torch.long)
test_idx = torch.as_tensor(test_idx, dtype=torch.long)

X_train_img, X_test_img = images[train_idx], images[test_idx]
X_train_dem, X_test_dem = dems[train_idx], dems[test_idx]
y_train, y_test = labels[train_idx], labels[test_idx]

print(X_train_img.shape, X_train_dem[0].shape, y_train[0].shape)
print("Training data size:", len(X_train_img))
print("Testing data size:", len(X_test_img))

In [None]:
class CustomDataset(Dataset):
    """Index-aligned dataset of images, DEMs and their targets.

    Args:
        images: Indexable collection of image samples.
        dems: Indexable collection of DEM samples, aligned with ``images``.
        masks: Indexable collection of targets, aligned with ``images``;
            returned under the ``'label'`` key.
    """

    def __init__(self, images, dems, masks):
        self.images, self.dems, self.masks = images, dems, masks

    def __len__(self):
        """Number of samples, taken from ``images``."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict with keys image / dem / label."""
        sample = dict(
            image=self.images[idx],
            dem=self.dems[idx],
            label=self.masks[idx],
        )
        return sample

# Wrap the two splits; the held-out split doubles as the validation set.
train_dataset = CustomDataset(images=X_train_img, dems=X_train_dem, masks=y_train)
val_dataset = CustomDataset(images=X_test_img, dems=X_test_dem, masks=y_test)

# Only the training loader reshuffles between epochs.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=False)

In [4]:
# The training loop moves every batch to `device`, so the model parameters
# have to live there too; otherwise the forward pass fails on a CUDA machine.
model = Encoder().to(device)
epochs = 200
# Lowest validation loss seen so far; the checkpoint is rewritten whenever
# an epoch improves on it.
best_val_loss = float('inf')
best_model_path = 'best_unet_model.pth'

In [5]:
# Sum the element counts of all parameter tensors of the model.
total_params = sum(map(torch.numel, model.parameters()))
print(f"Total parameters: {total_params}")

Total parameters: 1184009


In [None]:
# The network ends in a Sigmoid, and its outputs are regressed against the
# 0/1 labels with mean squared error.
criterion = nn.MSELoss()

# Batches are moved to `device` below, so the parameters must be there as
# well. nn.Module.to works in place and is harmless if already applied.
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
# The scheduler's `verbose` flag is deprecated in recent PyTorch releases;
# learning-rate reductions are reported explicitly after scheduler.step().
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    train_loss = 0.0

    for batch in train_loader:
        # Batch-local names: do not overwrite the dataset-level
        # `images` / `dems` / `labels` tensors created by the loading cell.
        batch_images = batch['image'].to(device)
        batch_dems = batch['dem'].to(device)
        batch_labels = batch['label'].to(device)

        optimizer.zero_grad()
        # (batch, 1) -> (batch,) so the shape matches the label vector.
        outputs = model(batch_images, batch_dems).squeeze(1)

        loss = criterion(outputs, batch_labels)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    # Mean of the per-batch losses.
    train_loss /= len(train_loader)

    # ---- validation pass ----
    model.eval()
    val_loss = 0.0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for batch in val_loader:
            batch_images = batch['image'].to(device)
            batch_dems = batch['dem'].to(device)
            batch_labels = batch['label'].to(device)

            outputs = model(batch_images, batch_dems).squeeze(1)

            # Threshold the sigmoid outputs at 0.5 to obtain class decisions.
            binary_preds = (outputs > 0.5).float()

            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()

            all_preds.append(binary_preds.cpu().numpy().astype(int))
            all_targets.append(batch_labels.cpu().numpy().astype(int))

    val_loss /= len(val_loader)

    all_preds = np.concatenate([pred.flatten() for pred in all_preds])
    all_targets = np.concatenate([target.flatten() for target in all_targets])

    # zero_division=0 on every ratio metric: an epoch without positive
    # predictions yields 0 instead of a warning.
    accuracy = accuracy_score(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, zero_division=0)
    recall = recall_score(all_targets, all_preds, zero_division=0)
    f1 = f1_score(all_targets, all_preds, zero_division=0)
    iou = jaccard_score(all_targets, all_preds, zero_division=0)

    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss}, Val Loss: {val_loss}, Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, IoU: {iou}, F1: {f1}")

    # Halve the learning rate when the validation loss stops improving, and
    # report the change.
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(val_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after < lr_before:
        print(f"Reducing learning rate from {lr_before} to {lr_after}")

    # Keep only the weights of the best epoch so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), best_model_path)
        print(f"Saved best model with val loss: {best_val_loss}")

print("Training complete. Best model saved to", best_model_path)

In [None]:
from torchinfo import summary

# Summarise a fresh instance under its own name: rebinding `model` here would
# replace the trained network with randomly initialised weights.
summary_model = Encoder().to(device)
summary(summary_model, input_size=[(1, 3, 256, 256), (1, 1, 256, 256)])