In [None]:
import os
import glob
import numpy as np
from PIL import Image
import rasterio
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import matplotlib.pyplot as plt


In [None]:
# model
class WaterMaskModel(nn.Module):
    def __init__(self):
        super(WaterMaskModel, self).__init__()
        self.conv1 = nn.Conv2d(6, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(64, 32, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(32, 1, kernel_size=3, padding=1)

    def forward(self, x):
        x = F.relu(self.conv1(x.float())) 
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = torch.sigmoid(self.conv6(x))
        return x

In [None]:
# params

batch_size = 4

num_epochs = 4
model = WaterMaskModel().to('cuda')
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
accumulation_steps = 1


In [None]:
#DataLoader
class datasetloader(Dataset):
    def __init__(self, sar_paths, mask_paths, transform=None):
        self.sar_paths = sar_paths
        self.mask_paths = mask_paths
        self.transform = transform

    def __len__(self):
        return len(self.sar_paths)

    def __getitem__(self, idx):
        sar_path = self.sar_paths[idx]

        # Load all bands from the image using rasterio
        sar_image = np.zeros((6, 512, 512), dtype=np.float32)
        with rasterio.open(sar_path) as src:
            sar_image = np.stack([src.read(band + 1) for band in range(6)])

        if self.mask_paths is not None:
            mask_path = self.mask_paths[idx]
            if mask_path is not None:
                # Load mask if available
                mask = np.array(Image.open(mask_path))
            else:
                mask = None
        else:
            mask = None

        return sar_image, mask


In [None]:
# set paths and load the images and masks filenames for training
path_SAR = "/home/mdhia/DFC2024/DATA/Data/Track1/train/images/"
path_masks = "/home/mdhia/DFC2024/DATA/Data/Track1/train/labels/"
img_files = sorted(glob.glob(os.path.join(path_SAR, '*.tif')))
msk_files = sorted(glob.glob(os.path.join(path_masks, '*.png')))

print("Number of images =", len(img_files))
print("Number of labels =", len(msk_files))

# Create a DataLoader instance for training

data_transform = transforms.Compose([
    transforms.ToTensor(), 
    transforms.Normalize(0.5, 0.5) #mean , #std #maybe should be adapted
 
])
dataset = datasetloader(img_files, msk_files, transform=data_transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=True)


# Example: Iterate through the first batch
for sar_images, masks in dataloader:
    print("SAR Image shape:", sar_images.shape)
    print("Mask shape:", masks.shape)
    break

In [None]:
def load_single_example(image_path, mask_path):
    # Load the image
    sar_image = np.zeros((6, 512, 512), dtype=np.float32)
    with rasterio.open(image_path) as src:
        sar_image = np.stack([src.read(band + 1) for band in range(6)])

    # Load mask if available
    ground_truth_mask = None
    if mask_path is not None:
        try:
            ground_truth_mask = np.array(Image.open(mask_path))
        except Exception as e:
            print(f"Error loading mask: {e}")

    return sar_image, ground_truth_mask


---
## ONLY training 

In [None]:
# training

# store training metrics
losses = []
iterations = []

for epoch in range(num_epochs):
    total_loss = 0.0
    for i, (sar_images, masks) in enumerate(dataloader):
        sar_images, masks = sar_images.to('cuda'), masks.to('cuda')

        # forward pass
        outputs = model(sar_images)

        #ground truth masks
        targets = masks.unsqueeze(1).float()
        
        # compute the loss
        loss = criterion(outputs, targets)

        # gradients accumulation
        loss = loss / accumulation_steps
        loss.backward()

        if (i + 1) % accumulation_steps == 0:
            # Perform optimization step only after accumulating gradients for accumulation_steps batches
            optimizer.step()
            optimizer.zero_grad()

        total_loss += loss.item()

        # print information every 10 iterations
        if (i + 1) % 10 == 0:
            print(f'Epoch {epoch + 1}/{num_epochs}, Batch {i + 1}/{len(dataloader)}, Loss: {loss.item()}')

        # Store metrics for plotting
        iterations.append(epoch * len(dataloader) + i)
        losses.append(loss.item())

    # Print average loss for the epoch
    average_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss}')

# Plot the training loss over iterations
plt.plot(iterations, losses, label='Training Loss')
plt.title('Training Loss Over Iterations')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.legend()
plt.show()


In [None]:
# Set the model to evaluation mode
model.eval()

# index an image and its corresponding mask for comparison
example_index = 795
example_image_path = img_files[example_index]
example_mask_path = msk_files[example_index]

# load indexed image and mask
sar_image, ground_truth_mask = load_single_example(example_image_path, example_mask_path)

# convert to tensor and add batch dimension
sar_image = torch.from_numpy(sar_image).unsqueeze(0).to('cuda')
ground_truth_mask = torch.from_numpy(ground_truth_mask).unsqueeze(0).to('cuda')

# prediction using the trained model
predicted_mask = model(sar_image)

# threshold the predicted mask to have binary values
threshold = 0.35 # we can test and change this value
binary_mask = (predicted_mask > threshold).float()

# convert the binary mask to a numpy array
predicted_mask_np = binary_mask.squeeze().detach().cpu().numpy()

# Plot image, mask, predicted mask
plt.figure(figsize=(12, 4))

# image
plt.subplot(1, 3, 1)
plt.imshow((sar_image.squeeze().cpu().numpy()[5]))
plt.title('SAR Image')
plt.axis('off')

# ground truth mask
plt.subplot(1, 3, 2)
plt.imshow(ground_truth_mask.squeeze().cpu().numpy())
plt.title('Ground Truth Mask')
plt.axis('off')

# predicted mask
plt.subplot(1, 3, 3)
plt.imshow(predicted_mask_np)
plt.title('Predicted Mask')
plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
import os
import cv2
import numpy as np
import torch
import glob

# Set the model to evaluation mode
model.eval()

# Specify the path to the images folder
image_folder = "/home/mdhia/DFC2024/DATA/Data/Track1/val/images/"
img_files = sorted(glob.glob(os.path.join(image_folder, '*.tif')))

# Create a folder to save the predicted masks
output_folder = "/home/mdhia/DFC2024/wab"
os.makedirs(output_folder, exist_ok=True)

# Iterate through all images in the folder and save the predicted masks
for i, image_path in enumerate(img_files):
    # Load the image
    sar_image, _ = load_single_example(image_path, None)  # Set the mask to None or use a placeholder

    # Convert to tensor and add batch dimension
    sar_image = torch.from_numpy(sar_image).unsqueeze(0).to('cuda')

    # Prediction using the trained model
    predicted_mask = model(sar_image)

    # Threshold the predicted mask to have binary values
    threshold = 0.5  # You can test and change this value
    binary_mask = (predicted_mask > threshold).float()

    # Convert the binary mask to a numpy array
    binary_mask_np = binary_mask.squeeze().detach().cpu().numpy().astype(np.uint8)

    # Extract the filename without extension
    file_name = os.path.splitext(os.path.basename(image_path))[0]

    # Save the generated mask with the same name as the image
    save_path = os.path.join(output_folder, f'{file_name}_mask.png')
    cv2.imwrite(save_path, binary_mask_np)

print("Generated masks saved in:", output_folder)


---
## Train & test

In [None]:
# params
num_epochs = 2
model = WaterMaskModel().to('cuda')
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
accumulation_steps = 1

In [None]:
torch.cuda.empty_cache()

In [None]:
# Lists to store training and test metrics for plotting
train_losses = []
test_losses = []
test_accuracies = []
test_f1_scores = []

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for i, (sar_images, masks) in enumerate(dataloader):
        sar_images, masks = sar_images.to('cuda'), masks.to('cuda')

        # Forward pass
        outputs = model(sar_images)

        # Assuming you have a binary water mask ground truth
        targets = masks.unsqueeze(1).float()
        
        # Compute the loss
        loss = criterion(outputs, targets)

        # Accumulate gradients
        loss = loss / accumulation_steps
        loss.backward()

        if (i + 1) % accumulation_steps == 0:
            # Perform optimization step only after accumulating gradients for accumulation_steps batches
            optimizer.step()
            optimizer.zero_grad()

        total_loss += loss.item()

        # Print detailed information every few iterations
        if (i + 1) % 10 == 0:
            print(f'Epoch {epoch + 1}/{num_epochs}, Batch {i + 1}/{len(dataloader)}, Loss: {loss.item()}')

    # Store training loss for plotting
    average_loss = total_loss / len(dataloader)
    train_losses.append(average_loss)

    # Validation loop (on the test set)
    model.eval()
    total_correct = 0
    total_samples = 0
    f1_scores = []
    test_loss = 0.0

    with torch.no_grad():
        for i, (sar_images, masks) in enumerate(dataloader_test):
            sar_images, masks = sar_images.to('cuda'), masks.to('cuda')

            # Forward pass
            outputs = model(sar_images)

            # Assuming you have a binary water mask ground truth
            targets = masks.unsqueeze(1).float()

            # Compute accuracy
            predicted_labels = (outputs > 0.5).float()
            correct = (predicted_labels == targets).sum().item()
            total_correct += correct
            total_samples += targets.numel()

            # Compute F1 score
            tp = (predicted_labels * targets).sum().item()
            fp = ((predicted_labels == 1) & (targets == 0)).sum().item()
            fn = ((predicted_labels == 0) & (targets == 1)).sum().item()

            precision = tp / (tp + fp + 1e-8)
            recall = tp / (tp + fn + 1e-8)
            f1 = 2 * (precision * recall) / (precision + recall + 1e-8)
            f1_scores.append(f1)

            # Compute test loss
            test_loss += criterion(outputs, targets).item()

    # Store test loss, accuracy, and F1 score for plotting
    average_test_loss = test_loss / len(dataloader_test)
    test_losses.append(average_test_loss)

    test_accuracy = total_correct / total_samples
    test_accuracies.append(test_accuracy)

    average_f1_score = sum(f1_scores) / len(f1_scores)
    test_f1_scores.append(average_f1_score)

    print(f'Epoch {epoch + 1}/{num_epochs}, Average Training Loss: {average_loss}, Average Test Loss: {average_test_loss}, Test Accuracy: {test_accuracy * 100:.2f}%, Average F1 Score: {average_f1_score:.4f}')




In [None]:
# Plot training and test loss over epochs
plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, num_epochs + 1), test_losses, label='Test Loss')
plt.title('Training and Test Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot test accuracy and F1 score over epochs
plt.plot(range(1, num_epochs + 1), test_accuracies, label='Test Accuracy')
plt.plot(range(1, num_epochs + 1), test_f1_scores, label='Test F1 Score')
plt.title('Test Accuracy and F1 Score Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Metric Value')
plt.legend()
plt.show()

In [None]:
# Set the model to evaluation mode
model.eval()

# index an image and its corresponding mask for comparison
example_index = 45 
example_image_path = img_files[example_index]
example_mask_path = msk_files[example_index]

# load indexed image and mask
sar_image, ground_truth_mask = load_single_example(example_image_path, example_mask_path)

# convert to tensor and add batch dimension
sar_image = torch.from_numpy(sar_image).unsqueeze(0).to('cuda')
ground_truth_mask = torch.from_numpy(ground_truth_mask).unsqueeze(0).to('cuda')

# prediction using the trained model
predicted_mask = model(sar_image)

# threshold the predicted mask to have binary values
threshold = 0.4 # we can test and change this value
binary_mask = (predicted_mask > threshold).float()

# convert the binary mask to a numpy array
predicted_mask_np = binary_mask.squeeze().detach().cpu().numpy()

# Plot image, mask, predicted mask
plt.figure(figsize=(12, 4))

# image
plt.subplot(1, 3, 1)
plt.imshow((sar_image.squeeze().cpu().numpy()[5]))
plt.title('SAR Image')
plt.axis('off')

# ground truth mask
plt.subplot(1, 3, 2)
plt.imshow(ground_truth_mask.squeeze().cpu().numpy())
plt.title('Ground Truth Mask')
plt.axis('off')

# predicted mask
plt.subplot(1, 3, 3)
plt.imshow(predicted_mask_np)
plt.title('Predicted Mask')
plt.axis('off')

plt.tight_layout()
plt.show()

---
## Playground

In [None]:

# Specify the path to your raster image
image_path = '/home/mdhia/DFC2024/DATA/subset/train/images/0.tif'

for i in range (6):
    with rasterio.open(image_path) as src:
        bands = src.read(i+1)  # reading band
        metadata = src.meta

metadata

In [None]:
image_path = '/home/mdhia/DFC2024/DATA/subset/train/images/0.tif'

bands_list = []

# Open the raster image and read each band
for i in range(6):
    with rasterio.open(image_path) as src:
        band = src.read(i + 1)
        bands_list.append(band)

# Stack the bands into a single NumPy array
sar_image = np.stack(bands_list)


In [None]:
with rasterio.open(image_path) as src:
    sar_image = np.stack([src.read(band + 1) for band in range(6)])

In [None]:
sar_image.shape

In [None]:
mask = np.array(Image.open("/home/mdhia/DFC2024/DATA/subset/train/labels/0.png"))
mask.shape
