#### Task: Change detection based on semantic segmentation

#### Dataset: LEVIR-CD+ [LEarning Vision and Remote sensing laboratory]

* Type: non-geospatial
* Source: Google Earth
* Samples: 985
* Classes: 2 (change = 255, no change = 0)
* Image size: 1024x1024 pixels
* Resolution: 50 cm
* 3 spectral bands (RGB)
* 20 different urban regions in Texas, USA
* Bi-temporal images with a time span of 5 years (between 2002 and 2020)
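
The image size and resolution listed above determine how much ground each sample covers. The following is a small worked calculation, assuming that "50 cm" means 50 cm per pixel; the variable names are illustrative only.

```python
# Values taken from the dataset summary above.
image_size_px = 1024      # image side length, in pixels
resolution_m = 0.5        # assumed ground sampling distance: 50 cm per pixel

side_m = image_size_px * resolution_m      # side length on the ground, in metres
area_km2 = (side_m / 1000) ** 2            # area covered by one image, in km^2

print(f"Each image covers {side_m:.0f} m x {side_m:.0f} m ({area_km2:.3f} km^2)")
```

Under that assumption, each 1024x1024 image spans roughly half a kilometre per side.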

https://justchenhao.github.io/LEVIR/

https://arxiv.org/abs/2107.09244

Model used: U-Net

(date: 3-2-2023)

***
#### Import the required libraries

In [1]:
import torchgeo
from torchgeo.datasets import LEVIRCDPlus
from torchgeo.datasets.utils import unbind_samples
from torchgeo.trainers import SemanticSegmentationTask
from torchgeo.datamodules.utils import dataset_split

import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

import torch
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

import torchvision
from torchvision.transforms import Compose

import kornia.augmentation as K
import matplotlib.pyplot as plt
import numpy as np
import os


Check whether a GPU is available

In [2]:
print(torch.cuda.is_available())
torch.cuda.empty_cache()

True
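
The check above only prints a boolean. A minimal sketch of how the result could be turned into a device choice with a CPU fallback is given here; `pick_device` is a hypothetical helper, not part of the notebook or of PyTorch.

```python
import torch

def pick_device(gpu_id: int = 0) -> torch.device:
    """Return the requested CUDA device if it exists, otherwise the CPU."""
    if torch.cuda.is_available() and gpu_id < torch.cuda.device_count():
        return torch.device(f"cuda:{gpu_id}")
    return torch.device("cpu")

device = pick_device(0)
print(device)
```

This keeps the rest of a script runnable on machines without a GPU, at the cost of much slower training.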


***
#### Create a directory to store the training checkpoints (`*.ckpt`) of our neural network (U-Net).

In [3]:
exp_name = "exp_1"
exp_dir = f"Checkpoint/{exp_name}"
os.makedirs(exp_dir, exist_ok=True)
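
Once training has run, this directory is expected to hold the `*.ckpt` files. A small sketch for listing them, assuming the same `Checkpoint/exp_1` path; `list_checkpoints` is a hypothetical helper.

```python
from pathlib import Path

def list_checkpoints(directory):
    """Return the *.ckpt files found in `directory`, sorted by file name."""
    return sorted(Path(directory).glob("*.ckpt"))

# Prints nothing if the directory is empty or training has not run yet.
for ckpt in list_checkpoints("Checkpoint/exp_1"):
    print(ckpt.name)
```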

***
#### Set parameters and hyperparameters

In [4]:
batch_size = 8
lr = 0.0001  #learning rate
gpu_id = 0   #if several GPUs are available, select the one to use
# device = torch.device(f"cuda:{gpu_id}")
num_workers = 12
patch_size = 256  #crop size (256x256 pixels)
val_split_pct = 0.2  #20% of the training set is held out for validation
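
To put `patch_size` in context, here is a short calculation of how a 256x256 crop relates to a full 1024x1024 image. It is plain arithmetic on the values stated in this notebook.

```python
image_size = 1024   # LEVIR-CD+ image side, in pixels
patch_size = 256    # crop side, as set above

patches_per_side = image_size // patch_size      # non-overlapping crops along one side
patches_per_image = patches_per_side ** 2        # non-overlapping crops per image
crop_fraction = (patch_size / image_size) ** 2   # share of the image seen by one crop

print(patches_per_side, patches_per_image, crop_fraction)
```

A single random crop therefore covers one sixteenth of an image, so the model sees a different small region of each training image on every epoch.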


***
#### Download the dataset (zip file), extract it, and split it into two sets:

* train: 65% --> 637 images
* test: 35%  --> 348 images

In [5]:
#the data will be stored in the "LEVIRCDPlus" folder
train_dataset = LEVIRCDPlus(root="LEVIRCDPlus", split="train", download=True, checksum=True)
test_dataset = LEVIRCDPlus(root="LEVIRCDPlus", split="test", download=True, checksum=True)

print(f'train: {len(train_dataset)} images')
print(f'test: {len(test_dataset)} images')

Files already downloaded and verified
Files already downloaded and verified
train: 637 images
test: 348 images
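
The printed sizes can be checked against the stated percentages, and combined with the hyperparameters set earlier to estimate the validation split and the number of training steps per epoch. This is a sketch only: the rounding used for the validation size is an assumption and may differ by one sample from what the splitting helper does.

```python
import math

n_train, n_test = 637, 348      # sizes printed above
val_split_pct = 0.2             # validation fraction set earlier
batch_size = 8                  # batch size set earlier

total = n_train + n_test
train_share = n_train / total
test_share = n_test / total

# Assumption: validation size is the rounded product of size and fraction.
n_val = round(n_train * val_split_pct)
n_fit = n_train - n_val
steps_per_epoch = math.ceil(n_fit / batch_size)

print(total, f"{train_share:.1%}", f"{test_share:.1%}", n_fit, n_val, steps_per_epoch)
```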


***
#### Extend TorchGeo's `SemanticSegmentationTask`.

In [6]:
class CustomSemanticSegmentationTask(SemanticSegmentationTask):
    
    def plot(self, sample):
        # sample["image"] is a PyTorch tensor of shape (6, H, W)
        # each sample yields 4 images: 2 RGB (the image pair) and 2 single-channel (mask and prediction)
        image1 = sample["image"][:3]
        image2 = sample["image"][3:]
        mask = sample["mask"]
        prediction = sample["prediction"]

        #plot the 4 images; the figure is later shown in TensorBoard
        fig, axs = plt.subplots(nrows=1, ncols=4, figsize=(4*5, 5))
        axs[0].imshow(image1.permute(1,2,0)) #(3, H, W) --> (H, W, 3)
        axs[0].axis("off")
        axs[1].imshow(image2.permute(1,2,0))
        axs[1].axis("off")
        axs[2].imshow(mask)  #(H, W)
        axs[2].axis("off")
        axs[3].imshow(prediction) #(H, W)
        axs[3].axis("off")


        axs[0].set_title("image 1")
        axs[1].set_title("image 2")
        axs[2].set_title("mask")
        axs[3].set_title("prediction")

        plt.tight_layout()
        return fig

    def training_step(self, *args, **kwargs):
        """Computes and returns the training loss.
        The difference between this code and the original SemanticSegmentationTask
        is the use of the plotting function.

        Args:
            batch: the output of the DataLoader

        Returns:
            training loss
        """

        batch = args[0]
        batch_idx = args[1]

        x = batch["image"]
        y = batch["mask"]

        y_hat = self.forward(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        # While the model trains, we log the training results.
        # TensorBoard will show how the training loss evolves.
        self.log("train_loss", loss, on_step=True, on_epoch=False)
        self.train_metrics(y_hat_hard , y)

        if batch_idx < 10:
            batch["prediction"] = y_hat_hard
            
            for key in ["image", "mask", "prediction"]:
                batch[key] = batch[key].cpu()
            
            sample = unbind_samples(batch)[0]

            fig = self.plot(sample)
            
            summary_writer = self.logger.experiment
            summary_writer.add_figure(f"image/train/{batch_idx}", fig, global_step = self.global_step)
            # TensorBoard will show the 4-image tuples
            # (image1, image2, mask, prediction) from the first 10 batches.
            # This shows how the model's predictions evolve over the epochs.
                        
            plt.close()

        return loss

    def validation_step(self, *args, **kwargs):

        """Computes the loss on the validation set
        and logs example predictions.

        Args:
            batch: the output of the DataLoader
            batch_idx: the index of this batch
        """

        batch = args[0]
        batch_idx = args[1]

        x = batch["image"]
        y = batch["mask"]

        y_hat = self.forward(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        self.log("val_loss", loss, on_step=False, on_epoch=True)
        self.val_metrics(y_hat_hard, y)

        #same as in training
        if batch_idx < 10:
            batch["prediction"] = y_hat_hard
            for key in ["image", "mask", "prediction"]:
                batch[key] = batch[key].cpu()

            sample = unbind_samples(batch)[0]
            fig = self.plot(sample)
            summary_writer = self.logger.experiment
            summary_writer.add_figure(f"image/val/{batch_idx}", fig, global_step = self.global_step)
            plt.close()
            
    def test_step(self, *args, **kwargs): #NEW from original
        """Computes the loss on the test set.

        Args:
            batch: the output of the DataLoader
        """
        batch = args[0]
        x = batch["image"]
        y = batch["mask"]
        y_hat = self(x)
        y_hat_hard = y_hat.argmax(dim=1)

        loss = self.loss(y_hat, y)

        # The test and validation steps only log once per epoch
        self.log("test_loss", loss, on_step=False, on_epoch=True)
        self.test_metrics(y_hat_hard, y)
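
The steps above rely on two tensor manipulations: turning per-class logits into hard predictions with `argmax`, and splitting the stacked 6-channel input back into its two RGB images for plotting. A minimal sketch with small random stand-in tensors (not real model outputs or dataset samples):

```python
import torch

torch.manual_seed(0)

# Stand-in for a model output: batch of 2, 2 classes, 4x4 pixels (random logits).
y_hat = torch.randn(2, 2, 4, 4)
y_hat_hard = y_hat.argmax(dim=1)        # (B, C, H, W) -> (B, H, W), values in {0, 1}

# Stand-in for one stacked bi-temporal sample: 6 channels = 2 x RGB.
image = torch.rand(6, 4, 4)
image1, image2 = image[:3], image[3:]   # first and second acquisition
image1_hwc = image1.permute(1, 2, 0)    # (3, H, W) -> (H, W, 3), the layout imshow expects

print(y_hat_hard.shape, image1_hwc.shape)
```

The loss is computed on the raw logits `y_hat`, while the metrics and the plots use the hard predictions.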

***
#### Create a new `LightningDataModule` for the LEVIR-CD+ dataset

In [7]:
class LEVIRCDPlusDataModule(pl.LightningDataModule):

    def __init__(
        self,
        batch_size=12,
        num_workers=0,
        val_split_pct=0.2,
        patch_size=(256,256),
        **kwargs,
    ):
        super().__init__()

        self.batch_size = batch_size
        self.num_workers = num_workers
        self.val_split_pct = val_split_pct
        self.patch_size = patch_size
        self.kwargs = kwargs
    
    def on_after_batch_transfer(self, batch, batch_idx):
        if (
            hasattr(self, "trainer")
            and self.trainer is not None
            and hasattr(self.trainer, "training")
            and self.trainer.training
        ):

            #Kornia expects masks to be of type "float" and to have a channel dimension
            x = batch["image"]  #[B, 6, 1024, 1024]
            y = batch["mask"].float().unsqueeze(1)  #[B, 1024, 1024] --> [B, 1, 1024, 1024]

            #Apply augmentations to our data using the Kornia library.
            train_augmentations = K.AugmentationSequential(
                K.RandomRotation(p=0.5, degrees=90),
                K.RandomHorizontalFlip(p=0.5),
                K.RandomVerticalFlip(p=0.5),
                K.RandomCrop(self.patch_size),
                K.RandomSharpness(p=0.5),
                data_keys=["input", "mask"],
            )

            x, y = train_augmentations(x, y)

            #torchmetrics expects masks to be of type "long" without a channel dimension
            batch["image"] = x
            batch["mask"] = y.squeeze(1).long()
        
        return batch
    
    def preprocess(self, sample):  
        #normalize the image data, uint8 --> float [0, 1]
        sample["image"] = (sample["image"]/255.0).float() #[2, 3, 1024, 1024]
        sample["image"] = torch.flatten(sample["image"], 0, 1) #[6, 1024, 1024]
        sample["mask"] = sample["mask"].long() #[1024, 1024]
        
        return sample
        
    def setup(self, stage=None):
        #Define the transforms for each dataset
        train_transforms = Compose([self.preprocess])
        test_transforms = Compose([self.preprocess])

        #Apply the transforms (preprocessing) to the datasets (train, val and test)
        train_dataset = LEVIRCDPlus(
            split="train", transforms=train_transforms, **self.kwargs
        )

        if self.val_split_pct > 0.0:
            #Split the training data
            #80% train, 20% val
            self.train_dataset, self.val_dataset, _ = dataset_split(
                train_dataset, val_pct=self.val_split_pct, test_pct=0.0
            )
        else:
            self.train_dataset = train_dataset
            self.val_dataset = train_dataset
        
        self.test_dataset = LEVIRCDPlus(
            split="test", transforms=test_transforms, **self.kwargs
        )
    
    def train_dataloader(self):
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=True,
        )

    def val_dataloader(self):
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False,
        )
    
    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=False,
        )
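
The shape and dtype changes performed by `preprocess`, and the mask round trip done around the augmentations, can be traced on a small stand-in sample. This sketch uses random 8x8 tensors instead of real 1024x1024 images and leaves out the Kornia augmentations themselves.

```python
import torch

# Stand-in sample: an image pair of shape (2, 3, H, W) and a mask of shape (H, W).
sample = {
    "image": torch.randint(0, 256, (2, 3, 8, 8), dtype=torch.uint8),
    "mask": torch.randint(0, 2, (8, 8), dtype=torch.uint8),
}

image = (sample["image"] / 255.0).float()   # uint8 -> float32 in [0, 1]
image = torch.flatten(image, 0, 1)          # (2, 3, H, W) -> (6, H, W)
mask = sample["mask"].long()                # (H, W), int64

# Round trip applied around the augmentations, shown on a batch of one.
mask_for_aug = mask.unsqueeze(0).float().unsqueeze(1)   # (1, 1, H, W), float
mask_restored = mask_for_aug.squeeze(1).long()          # (1, H, W), int64

print(image.shape, mask_for_aug.shape, mask_restored.shape)
```

Flattening the first two dimensions is what lets a standard segmentation network consume the image pair as a single 6-channel input.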

***
#### Configure the training:

* Instantiate the `datamodule`

In [8]:
datamodule = LEVIRCDPlusDataModule(
    root = "LEVIRCDPlus",
    batch_size = batch_size,
    num_workers = num_workers,
    val_split_pct = val_split_pct,
    patch_size = (patch_size, patch_size),
)

* Instantiate the custom task: `CustomSemanticSegmentationTask`

In [9]:
task = CustomSemanticSegmentationTask(
    model="unet",
    backbone="resnet18",
    weights="imagenet",
    in_channels=6,
    num_classes=2,
    loss="ce",
    ignore_index=None,
    learning_rate=lr,
    learning_rate_schedule_patience=10
)

Note: the model is `unet` with a `resnet18` backbone pre-trained on `imagenet`, and cross-entropy (ce) is used as the loss function.
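
To make the cross-entropy loss concrete, here is a small pure-Python sketch of its value for a single pixel with two class logits (no change, change). `pixel_cross_entropy` is a hypothetical helper written for illustration; the task itself uses PyTorch's `CrossEntropyLoss`, which averages this quantity over all pixels by default.

```python
import math

def pixel_cross_entropy(logits, target):
    """Cross-entropy for one pixel: -log(softmax(logits)[target])."""
    m = max(logits)  # subtract the max for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

print(pixel_cross_entropy([0.0, 0.0], 1))    # undecided model
print(pixel_cross_entropy([2.0, -2.0], 0))   # confident and correct
print(pixel_cross_entropy([2.0, -2.0], 1))   # confident and wrong
```

The loss is small when the logit of the true class dominates and grows roughly linearly with the logit gap when the model is confidently wrong.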

* Set up the callbacks and the logger

In [10]:
checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",
    dirpath=exp_dir,
    save_top_k=1,
    save_last=True,
)

early_stopping_callback = EarlyStopping(
    monitor="val_loss",
    min_delta=0.00,
    patience=10,
)

#The logs/<name> folder will hold the results to be displayed in TensorBoard
tb_logger = TensorBoardLogger(
    save_dir="logs/",
    name=exp_name
)
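
The early-stopping rule configured above (`patience=10`, `min_delta=0.00`, monitoring `val_loss`) can be summarised with a simplified pure-Python sketch. This is an illustration of the idea, not Lightning's implementation; `stopping_epoch` and the loss values are made up for the example.

```python
def stopping_epoch(val_losses, patience=10, min_delta=0.0):
    """Return the index of the epoch at which training would stop, or None."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:   # counts as an improvement
            best = loss
            wait = 0
        else:                         # no improvement in this epoch
            wait += 1
            if wait >= patience:
                return epoch
    return None

# Three improving epochs followed by a plateau.
losses = [0.9, 0.7, 0.6] + [0.65] * 12
print(stopping_epoch(losses, patience=10))
```

With `max_epochs=100`, a rule of this kind is what ends training early once the validation loss stops improving.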

* Set the parameters for the trainer (`Trainer`)

In [11]:
trainer = pl.Trainer(
    callbacks=[checkpoint_callback, early_stopping_callback],
    logger=[tb_logger],
    default_root_dir=exp_dir,
    min_epochs=1,
    max_epochs=100,
    accelerator="gpu",
    devices=[gpu_id]    
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


***
#### Train the model

In [12]:
# torch.set_float32_matmul_precision('high')

_ = trainer.fit(model=task, datamodule=datamodule)

You are using a CUDA device ('NVIDIA GeForce RTX 3070') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
Missing logger folder: logs/exp_1
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2]

  | Name          | Type             | Params
---------------------------------------------------
0 | model         | Unet             | 14.3 M
1 | loss          | CrossEntropyLoss | 0     
2 | train_metrics | MetricCollection | 0     
3 | val_metrics   | MetricCollection | 0     
4 | test_metrics  | MetricCollection | 0     
---------------------------------------------------
14.3 M    Trainable params
0         Non-trainable params
14.3 M    Total params
57.351    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]
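
The model summary printed at the start of training reports both a parameter count and an estimated size in MB. The two figures are consistent if each parameter is stored as a 32-bit float, which is an assumption here rather than something stated in the log.

```python
size_mb = 57.351          # "Total estimated model params size (MB)" from the summary
bytes_per_param = 4       # assumption: 32-bit floating-point parameters

n_params = size_mb * 1e6 / bytes_per_param
print(f"{n_params / 1e6:.1f} M parameters")
```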

In [None]:
trainer.test(model=task, datamodule=datamodule)
