In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
import shutil
import os
import re

def generate_label_mapping(root_dir, other_dir, input_subdir, output_csv):
    """
    Generate a CSV mapping input chips to their corresponding segmentation maps.

    Every entry found in ``<root_dir>/<input_subdir>/chips`` produces one row.
    Paths are written relative to ``root_dir``; for example the chip
    ``s2_train/s2_train/chips/chip_20170701_S2A_MSIL2A_T42RWQ_20170526T054641_2_20.tif``
    is paired with
    ``s2_train/s2_train/seg_maps/seg_map_20170701_S2A_MSIL2A_T42RWQ_20170526T054641_2_20.tif``.
    The label path is derived from the chip name only; its existence is not checked.

    Args:
        root_dir (str or Path): Root directory containing the chip sub-directories.
        other_dir (str or Path): Directory in which the CSV file is written.
        input_subdir (str): Sub-directory (relative to ``root_dir``) that holds ``chips``.
        output_csv (str or Path): File name of the generated CSV inside ``other_dir``.
    """
    root_dir = Path(root_dir)
    output_path = Path(other_dir) / output_csv

    # os.listdir returns entries in arbitrary order; sort so the CSV (and any
    # consumer that relies on row order) is reproducible between runs.
    chip_names = sorted(os.listdir(root_dir / input_subdir / "chips"))

    chips = [f"{input_subdir}/chips/{name}" for name in chip_names]
    # Only the first "chip" is swapped so a name that contains the word again
    # further along is not mangled.
    seg_maps = [
        f"{input_subdir}/seg_maps/{name.replace('chip', 'seg_map', 1)}"
        for name in chip_names
    ]

    df = pd.DataFrame({"Input": chips, "Label": seg_maps})
    df.to_csv(output_path, index=False)

    print(f"Number of rows is: {df.shape[0]}")
    # Report the location the file was actually written to.
    print(f"CSV generated and saved to: {output_path}")
    

In [None]:
# Write the train and test chip/label mappings into /kaggle/working/.
generate_label_mapping('/kaggle/input/geo-ai-hack/', '/kaggle/working/',"s2_train/s2_train", "s2_train_ds.csv")
generate_label_mapping('/kaggle/input/geo-ai-hack/', '/kaggle/working/', "s2_test/s2_test", "s2_test_ds.csv")

In [None]:
!pip install rasterio
import os
import torch
import numpy as np
import rasterio
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from transformers import ViTModel, ViTConfig
import torch.optim as optim
import torch.nn as nn
import pandas as pd
from tqdm import tqdm

# Custom Dataset Class with 3 Time-Step Stacking
class CustomSegmentationDataset(Dataset):
    """Dataset that stacks three consecutive chips along the channel axis.

    Sample ``i`` is built from ``image_paths[i]``, ``image_paths[i + 1]`` and
    ``image_paths[i + 2]``; in training mode it is paired with ``mask_paths[i]``.
    Windows for which any required file is missing are reported and skipped.

    Args:
        image_paths (list[str]): Chip paths, in the order they should be windowed.
        mask_paths (list[str], optional): Mask paths aligned with ``image_paths``.
            Required unless ``is_test`` is True.
        transform (callable, optional): Stored on the instance. NOTE(review): it is
            not applied anywhere in this class.
        is_test (bool): When True no mask is loaded and ``__getitem__`` returns an
            ``{"image_id": ...}`` dict in place of the mask.
        max_samples (int, optional): Keep at most this many valid windows (debugging
            aid). ``None`` (default) keeps every window.

    Raises:
        ValueError: If ``mask_paths`` is missing while ``is_test`` is False.
    """

    def __init__(self, image_paths, mask_paths=None, transform=None, is_test=False, max_samples=None):
        if not is_test and mask_paths is None:
            raise ValueError("mask_paths is required when is_test is False")

        self.image_paths = image_paths
        self.mask_paths = mask_paths if not is_test else None
        self.transform = transform
        self.is_test = is_test
        self.samples = []

        # Slide a window of three consecutive images over the path list.
        for i in range(len(image_paths) - 2):
            # The cap is applied while building the list; truncating before the
            # list is populated (as the old debug line did) has no effect.
            if max_samples is not None and len(self.samples) >= max_samples:
                break

            window = [image_paths[i], image_paths[i + 1], image_paths[i + 2]]
            required = window if is_test else window + [mask_paths[i]]

            if all(os.path.exists(path) for path in required):
                mask_path = None if is_test else mask_paths[i]
                self.samples.append((window[0], window[1], window[2], mask_path))
            else:
                print(f"Missing files for index {i}: {required}")

    def __len__(self):
        """Return the number of valid three-image windows."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load one window.

        Returns:
            ``(image, mask)`` in training mode or ``(image, {"image_id": tensor})``
            in test mode. ``image`` is a float32 tensor laid out channels-first.
        """
        img1_path, img2_path, img3_path, mask_path = self.samples[idx]
        preprocessing = Preprocessing()
        frames = [
            preprocessing.preprocess_image(path)
            for path in (img1_path, img2_path, img3_path)
        ]

        # Stack the three images as extra channels: 3 x the per-image channel
        # count (30 in total with the 10-channel Preprocessing output).
        image = np.concatenate(frames, axis=-1)
        image = torch.tensor(image.transpose(2, 0, 1), dtype=torch.float32)

        if self.is_test:
            return image, {"image_id": torch.tensor([idx])}

        # NOTE(review): the mask is used at its native resolution and with its raw
        # pixel values - confirm both match what the model/loss expect.
        mask = plt.imread(mask_path)
        if mask.ndim == 3:
            mask = mask[..., 0]  # keep a single channel for class indices
        mask = torch.tensor(mask, dtype=torch.long)
        return image, mask

# Preprocessing class: 6 raster bands + 4 spectral indices = 10 channels per image
import cv2  # OpenCV for resizing

class Preprocessing:
    """Turn a multi-band raster chip into a per-pixel feature stack.

    The output has 10 channels: the 6 raster bands (each min-max normalised)
    followed by NDVI, EVI, NDWI and NBR.

    NOTE(review): the index helpers are called as if the bands were ordered
    blue, green, red, NIR, SWIR1, SWIR2 - confirm against the dataset.
    """

    NUM_BANDS = 6             # raster bands read from each chip (1-based indices 1..6)
    TARGET_SIZE = (256, 256)  # size handed to cv2.resize

    def preprocess_image(self, image_path):
        """Read ``image_path`` and return the resized feature stack.

        Returns:
            np.ndarray: Array of shape (256, 256, 10).
        """
        with rasterio.open(image_path) as src:
            bands = [src.read(i) for i in range(1, self.NUM_BANDS + 1)]

        image = self._build_feature_stack(bands)  # Shape: (H, W, 10)

        # Resize to 256x256 (nearest neighbour, so values are not blended).
        return cv2.resize(image, self.TARGET_SIZE, interpolation=cv2.INTER_NEAREST)

    def _build_feature_stack(self, bands):
        """Stack the normalised bands and the four spectral indices channel-last."""
        bands = [self._as_float(band) for band in bands]

        ndvi = self.compute_ndvi(bands[2], bands[3])
        evi = self.compute_evi(bands[3], bands[2], bands[0])
        ndwi = self.compute_ndwi(bands[1], bands[3])
        nbr = self.compute_nbr(bands[3], bands[5])
        normalized_bands = [self.normalize_band(band) for band in bands]
        return np.stack(normalized_bands + [ndvi, evi, ndwi, nbr], axis=-1)

    @staticmethod
    def _as_float(band):
        """Return ``band`` as a floating-point array.

        Integer rasters are converted to float32 so that differences such as
        ``nir - red`` cannot wrap around (unsigned) or overflow; arrays that are
        already floating point are returned unchanged.
        """
        band = np.asarray(band)
        if np.issubdtype(band.dtype, np.floating):
            return band
        return band.astype(np.float32)

    @staticmethod
    def _safe_divide(numerator, denominator, epsilon):
        """Divide, substituting ``epsilon`` wherever the denominator is exactly zero."""
        denominator = np.where(denominator == 0, epsilon, denominator)
        return numerator / denominator

    def normalize_band(self, band):
        """Min-max scale ``band`` to roughly [0, 1]; a constant band maps to 0."""
        band = self._as_float(band)
        min_val, max_val = np.min(band), np.max(band)
        return (band - min_val) / (max_val - min_val + 1e-6)

    def compute_ndvi(self, red, nir):
        """Normalised Difference Vegetation Index: (nir - red) / (nir + red)."""
        red, nir = self._as_float(red), self._as_float(nir)
        return (nir - red) / (nir + red + 1e-6)

    def compute_evi(self, nir, red, blue, g=2.5, c1=6, c2=7.5, l=1, epsilon=1e-6):
        """Enhanced Vegetation Index, clipped to [0, 1].

        NOTE(review): the constant ``l=1`` is on the scale of reflectance in
        [0, 1]; confirm the raster values are scaled accordingly.
        """
        nir, red, blue = self._as_float(nir), self._as_float(red), self._as_float(blue)
        denominator = nir + c1 * red - c2 * blue + l
        return np.clip(self._safe_divide(g * (nir - red), denominator, epsilon), 0, 1)

    def compute_ndwi(self, green, nir, epsilon=1e-6):
        """Normalised Difference Water Index: (green - nir) / (green + nir)."""
        green, nir = self._as_float(green), self._as_float(nir)
        return self._safe_divide(green - nir, green + nir, epsilon)

    def compute_nbr(self, nir, swir2, epsilon=1e-6):
        """Normalised Burn Ratio: (nir - swir2) / (nir + swir2)."""
        nir, swir2 = self._as_float(nir), self._as_float(swir2)
        return self._safe_divide(nir - swir2, nir + swir2, epsilon)


# Vision Transformer for segmentation (30 input channels = 3 stacked images x 10 channels each)
import torch.nn.functional as F

class ViTSegmentationModel(nn.Module):
    """ViT encoder followed by a small transposed-convolution decoder.

    The encoder turns the image into one token per patch; the tokens are laid
    back out on the patch grid, upsampled 4x by the decoder and finally
    resized bilinearly to ``image_size`` x ``image_size``.

    Args:
        num_classes (int): Number of output classes (channels of the logits).
        in_channels (int): Number of input channels. The default of 30 matches
            three stacked 10-channel images.
        image_size (int): Height/width of the (square) input and of the output.
        patch_size (int): Side of a ViT patch; must divide ``image_size``.

    Raises:
        ValueError: If ``image_size`` is not a multiple of ``patch_size``.
    """

    def __init__(self, num_classes=2, in_channels=30, image_size=256, patch_size=16):
        super().__init__()

        if image_size % patch_size != 0:
            raise ValueError(
                f"image_size ({image_size}) must be a multiple of patch_size ({patch_size})"
            )

        self.image_size = image_size
        self.grid_size = image_size // patch_size  # patches per side (16 by default)
        self.hidden_size = 384

        config = ViTConfig(
            image_size=image_size,
            patch_size=patch_size,
            num_channels=in_channels,
            hidden_size=self.hidden_size,
            num_attention_heads=6,
            num_hidden_layers=12,
            intermediate_size=768,
            # NOTE(review): kept from the original call; ViTConfig documents
            # `num_labels` rather than `num_classes` - verify this is needed.
            num_classes=num_classes,
        )

        self.vit = ViTModel(config)

        # Segmentation decoder head: two stride-2 transposed convolutions (4x upsampling).
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(self.hidden_size, 128, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(128, num_classes, kernel_size=2, stride=2),
        )

        # Final resize so the logits match the input resolution.
        self.upsample = nn.Upsample(
            size=(image_size, image_size), mode='bilinear', align_corners=False
        )

    def forward(self, x):
        """Return per-pixel class logits of shape (batch, num_classes, image_size, image_size)."""
        tokens = self.vit(x).last_hidden_state  # (batch, 1 + num_patches, hidden)

        # Drop the CLS token; only the patch tokens carry spatial information.
        patches = tokens[:, 1:, :]
        batch_size, _, hidden = patches.shape

        # Lay the patch tokens back out on the patch grid: (batch, hidden, grid, grid).
        feature_map = patches.permute(0, 2, 1).reshape(
            batch_size, hidden, self.grid_size, self.grid_size
        )

        logits = self.decoder(feature_map)  # (batch, num_classes, 4 * grid, 4 * grid)
        return self.upsample(logits)

# Update the input image size and patch size based on your dataset dimensions

# Training Setup
# Load the chip/label mappings previously written to /kaggle/working/.
train_csv = pd.read_csv("/kaggle/working/s2_train_ds.csv")
test_csv = pd.read_csv("/kaggle/working/s2_test_ds.csv")
# "Input" holds the chip paths and "Label" the matching segmentation-map paths;
# only the chip paths are read from the test CSV.
train_image_paths = train_csv["Input"].tolist()
train_mask_paths = train_csv["Label"].tolist()
test_image_paths = test_csv["Input"].tolist()

%cd /kaggle/input/geo-ai-hack

# Build the datasets. The dataset checks each path with os.path.exists, so
# relative paths resolve against the current working directory.
train_dataset = CustomSegmentationDataset(image_paths=train_image_paths, mask_paths=train_mask_paths)
test_dataset = CustomSegmentationDataset(image_paths=test_image_paths, is_test=True)
# Training batches are shuffled; test batches keep the dataset order.
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False)

# Model Training
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ViTSegmentationModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Cross-entropy between the per-pixel logits and the integer class masks.
criterion = nn.CrossEntropyLoss()

# Two passes over the training loader; no validation split is used here.
for epoch in range(2):
    model.train()
    pbar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}")
    for images, masks in pbar:
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()
        # Show the loss of the latest batch on the progress bar.
        pbar.set_postfix({"Loss": loss.item()})
    print(f"Epoch {epoch+1} complete")

# Save Model
# Only the weights (state_dict) are stored, not the model class itself.
torch.save(model.state_dict(), "/kaggle/working/vit_segmentation.pth")


In [None]:
import torch
import numpy as np
from sklearn.metrics import confusion_matrix
from tqdm import tqdm
import matplotlib.pyplot as plt

# Helper that computes the mean IoU
def compute_iou(preds, targets, num_classes):
    """Return the mean intersection-over-union across classes.

    Classes that appear in neither ``preds`` nor ``targets`` are left out of
    the mean instead of being counted as an IoU of 0, so a perfect prediction
    scores exactly 1.0 even when some classes are absent.

    Args:
        preds: Array-like of predicted class indices (any shape).
        targets: Array-like of ground-truth class indices, same number of elements.
        num_classes (int): Classes ``0 .. num_classes - 1`` are evaluated.

    Returns:
        float: Mean IoU over the classes that are present; 0.0 if none is present.

    Raises:
        ValueError: If ``preds`` and ``targets`` differ in number of elements.
    """
    preds = np.asarray(preds).ravel()
    targets = np.asarray(targets).ravel()
    if preds.shape != targets.shape:
        raise ValueError(
            f"preds and targets must have the same number of elements, "
            f"got {preds.size} and {targets.size}"
        )

    per_class_iou = []
    for cls in range(num_classes):
        pred_is_cls = preds == cls
        target_is_cls = targets == cls
        union = np.count_nonzero(pred_is_cls | target_is_cls)
        if union == 0:
            # Class absent from both arrays: undefined IoU, skip it.
            continue
        intersection = np.count_nonzero(pred_is_cls & target_is_cls)
        per_class_iou.append(intersection / union)

    return float(np.mean(per_class_iou)) if per_class_iou else 0.0

def evaluate(model, dataloader, device, num_classes, criterion=None):
    """Run ``model`` over ``dataloader`` and collect predictions and metrics.

    Each batch must be an ``(images, second)`` pair. When ``second`` is a
    tensor it is treated as the ground-truth masks; anything else (for example
    the ``{"image_id": ...}`` dict produced in test mode) means the batch is
    unlabeled and only predictions are collected.

    Args:
        model: Segmentation model returning logits of shape (batch, classes, H, W).
        dataloader: Iterable of batches.
        device: Device the batches are moved to.
        num_classes (int): Number of classes used for the metrics.
        criterion (callable, optional): Loss applied to ``(outputs, masks)``.
            Defaults to ``torch.nn.CrossEntropyLoss()``.

    Returns:
        With masks: ``(mean_loss, mean_iou, precision, recall, f1)`` where the last
        three are per-class arrays. Without masks: the stacked predictions array.

    Raises:
        ValueError: If the dataloader yields no batches.
    """
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    model.eval()
    batch_preds = []
    batch_targets = []
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for images, masks in tqdm(dataloader, desc="Evaluating"):
            images = images.to(device)
            outputs = model(images)
            batch_preds.append(torch.argmax(outputs, dim=1).cpu().numpy())
            num_batches += 1

            # Unlabeled batches carry a dict, not a tensor, so an
            # `is not None` check would wrongly send them down the labeled path.
            if isinstance(masks, torch.Tensor):
                masks = masks.to(device)
                total_loss += criterion(outputs, masks).item()
                batch_targets.append(masks.cpu().numpy())

    if not batch_preds:
        raise ValueError("dataloader yielded no batches; nothing to evaluate")

    all_preds = np.concatenate(batch_preds, axis=0)
    if not batch_targets:
        # No ground truth available: return the predictions only.
        return all_preds

    all_targets = np.concatenate(batch_targets, axis=0)
    iou = compute_iou(all_preds, all_targets, num_classes)

    # Rows of the confusion matrix are true classes, columns are predictions.
    cm = confusion_matrix(all_targets.flatten(), all_preds.flatten(), labels=np.arange(num_classes))
    precision = cm.diagonal() / (cm.sum(axis=0) + 1e-6)
    recall = cm.diagonal() / (cm.sum(axis=1) + 1e-6)
    f1 = 2 * (precision * recall) / (precision + recall + 1e-6)
    return total_loss / num_batches, iou, precision, recall, f1


# Evaluation parameters
num_classes = 2  # adjust to match the task
model = ViTSegmentationModel().to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only session
# (and vice versa) instead of failing while deserialising the tensors.
model.load_state_dict(torch.load("/kaggle/working/vit_segmentation.pth", map_location=device))
model.eval()

# NOTE(review): the metrics below are computed on train_dataloader, so the
# "Test" figures describe performance on the training data.
test_loss, iou, precision, recall, f1 = evaluate(model, train_dataloader, device, num_classes)

# Print the results
print(f"Test Loss: {test_loss:.4f}")
print(f"Mean IoU: {iou:.4f}")
print(f"Precision per class: {precision}")
print(f"Recall per class: {recall}")
print(f"F1 score per class: {f1}")

# Optional: display a sample image alongside its true and predicted masks
def show_sample(model, dataloader, device, rgb_channels=(2, 1, 0)):
    """Plot the first sample of the first batch with its true and predicted mask.

    Args:
        model: Segmentation model returning logits of shape (batch, classes, H, W).
        dataloader: Loader yielding ``(images, masks)`` tensor pairs.
        device: Device the batch is moved to.
        rgb_channels (tuple[int, int, int]): Channel indices shown as red, green
            and blue. The default assumes channels 0/1/2 of the first stacked
            image are blue/green/red - NOTE(review): confirm the band order.
    """
    model.eval()
    images, masks = next(iter(dataloader))
    images, masks = images.to(device), masks.to(device)

    with torch.no_grad():
        preds = torch.argmax(model(images), dim=1)

    stacked = images[0].cpu().numpy().transpose(1, 2, 0)  # (H, W, C)
    mask = masks[0].cpu().numpy()
    pred = preds[0].cpu().numpy()

    # imshow only accepts (H, W), (H, W, 3) or (H, W, 4) arrays, so the full
    # multi-channel stack cannot be displayed directly.
    if stacked.shape[-1] > max(rgb_channels):
        display_image = np.clip(stacked[..., list(rgb_channels)], 0.0, 1.0)
    else:
        # Too few channels for a colour composite: show the first one.
        display_image = stacked[..., 0]

    fig, ax = plt.subplots(1, 3, figsize=(12, 4))
    ax[0].imshow(display_image)
    ax[0].set_title("Image")
    ax[1].imshow(mask)
    ax[1].set_title("True Mask")
    ax[2].imshow(pred)
    ax[2].set_title("Predicted Mask")
    plt.show()

# Visualise the first sample of one batch drawn from the training loader.
show_sample(model, train_dataloader, device)
