<a href="https://colab.research.google.com/github/institutohumai/cursos-python/blob/master/CV/6_Segmentacion/ejercicios/ejercicios_solucion.ipynb"> <img src='https://colab.research.google.com/assets/colab-badge.svg' /> </a>

# Ejercicios Clase 6

En este notebook tendrán que implementar una nueva arquitectura de segmentación semántica llamada U-Net. Esta arquitectura, a diferencia de las Fully Convolutional vistas en clase, está diseñada para retener la información espacial que se pierde durante el downsampling. De esta manera, los feature maps generados durante el upsampling pueden tener un mejor contexto acerca de la posición de los píxeles originales.

# Seccion 1: Dataset

## Ejercicio 1 

Descargar el Dataset Pascal VOC2012 y realizarle image augmentation como se vio en clase. 

Nota: cambiar el crop size por (316,316) para que coincida con los shapes internos de U-Net

In [None]:
#@markdown Descarga de Datos
import collections
import hashlib
import inspect
import math
import os
import random
import re
import shutil
import sys
import tarfile
import time
import zipfile
from collections import defaultdict
import pandas as pd
import requests
from IPython import display
from matplotlib import pyplot as plt


import numpy as np
import torch
import torchvision
from PIL import Image
from torch import nn
from torch.nn import functional as F
from torch.utils import data
from torchvision import transforms

def download(url, folder='data', sha1_hash=None):
    """Download a file to folder and return the local filepath.

    Defined in :numref:`sec_utils`"""
    if not url.startswith('http'):
        # For back compatability
        url, sha1_hash = DATA_HUB[url]
    os.makedirs(folder, exist_ok=True)
    fname = os.path.join(folder, url.split('/')[-1])
    # Check if hit cache
    if os.path.exists(fname) and sha1_hash:
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname
    # Download
    print(f'Downloading {fname} from {url}...')
    r = requests.get(url, stream=True, verify=True)
    with open(fname, 'wb') as f:
        f.write(r.content)
    return fname

def extract(filename, folder=None):
    """Extract a zip/tar file into folder.

    Defined in :numref:`sec_utils`"""
    base_dir = os.path.dirname(filename)
    _, ext = os.path.splitext(filename)
    assert ext in ('.zip', '.tar', '.gz'), 'Only support zip/tar files.'
    if ext == '.zip':
        fp = zipfile.ZipFile(filename, 'r')
    else:
        fp = tarfile.open(filename, 'r')
    if folder is None:
        folder = base_dir
    fp.extractall(folder)

def download_extract(name, folder=None):
    """Download and extract a zip/tar file.

    Defined in :numref:`sec_utils`"""
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        fp = zipfile.ZipFile(fname, 'r')
    elif ext in ('.tar', '.gz'):
        fp = tarfile.open(fname, 'r')
    else:
        assert False, 'Only zip/tar files can be extracted.'
    fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir





DATA_HUB = dict()
DATA_URL = 'http://d2l-data.s3-accelerate.amazonaws.com/'
DATA_HUB['voc2012'] = (DATA_URL + 'VOCtrainval_11-May-2012.tar',
                           '4e443f8a2eca6b1dac8a6c57641b67dd40621a49')

voc_dir = download_extract('voc2012', 'VOCdevkit/VOC2012')

In [None]:
#@markdown Definición del Dataset
def read_voc_images(voc_dir, n=-1,  is_train=True):
    """Read all VOC feature and label images."""
    txt_fname = os.path.join(voc_dir, 'ImageSets', 'Segmentation',
                             'train.txt' if is_train else 'val.txt')
    mode = torchvision.io.image.ImageReadMode.RGB
    with open(txt_fname, 'r') as f:
        images = f.read().split()
    features, labels = [], []
    for i, fname in enumerate(images):
        if i==n: break
        features.append(torchvision.io.read_image(os.path.join(
            voc_dir, 'JPEGImages', f'{fname}.jpg')))
        labels.append(torchvision.io.read_image(os.path.join(
            voc_dir, 'SegmentationClass' ,f'{fname}.png'), mode))
        
    return features, labels

VOC_COLORMAP = [[0, 0, 0], [128, 0, 0], [0, 128, 0], [128, 128, 0],
                [0, 0, 128], [128, 0, 128], [0, 128, 128], [128, 128, 128],
                [64, 0, 0], [192, 0, 0], [64, 128, 0], [192, 128, 0],
                [64, 0, 128], [192, 0, 128], [64, 128, 128], [192, 128, 128],
                [0, 64, 0], [128, 64, 0], [0, 192, 0], [128, 192, 0],
                [0, 64, 128]]

VOC_CLASSES = ['background', 'aeroplane', 'bicycle', 'bird', 'boat',
               'bottle', 'bus', 'car', 'cat', 'chair', 'cow',
               'diningtable', 'dog', 'horse', 'motorbike', 'person',
               'potted plant', 'sheep', 'sofa', 'train', 'tv/monitor']

def voc_colormap2label():
    """Build the mapping from RGB to class indices for VOC labels."""
    colormap2label = torch.zeros(256 ** 3, dtype=torch.long)
    for i, colormap in enumerate(VOC_COLORMAP):
        colormap2label[
            (colormap[0] * 256 + colormap[1]) * 256 + colormap[2]] = i
    return colormap2label

def voc_label_indices(colormap, colormap2label):
    """Map any RGB values in VOC labels to their class indices."""
    colormap = colormap.permute(1, 2, 0).numpy().astype('int32')
    idx = ((colormap[:, :, 0] * 256 + colormap[:, :, 1]) * 256
           + colormap[:, :, 2])
    return colormap2label[idx]

def voc_rand_crop(feature, label, height, width):
    """Randomly crop both feature and label images."""
    rect = torchvision.transforms.RandomCrop.get_params(
        feature, (height, width))
    feature = torchvision.transforms.functional.crop(feature, *rect)
    label = torchvision.transforms.functional.crop(label, *rect)
    return feature, label

class VOCSegDataset(torch.utils.data.Dataset):
    """A customized dataset to load the VOC dataset."""

    def __init__(self, is_train, crop_size, voc_dir):
        self.transform = torchvision.transforms.Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.crop_size = crop_size
        features, labels = read_voc_images(voc_dir, is_train=is_train)
        self.features = [self.normalize_image(feature)
                         for feature in self.filter(features)]
        self.labels = self.filter(labels)
        self.colormap2label = voc_colormap2label()
        print('read ' + str(len(self.features)) + ' examples')

    def normalize_image(self, img):
        return self.transform(img.float() / 255)

    def filter(self, imgs):
        return [img for img in imgs if (
            img.shape[1] >= self.crop_size[0] and
            img.shape[2] >= self.crop_size[1])]

    def __getitem__(self, idx):
        feature, label = voc_rand_crop(self.features[idx], self.labels[idx],
                                       *self.crop_size)
        return (feature, voc_label_indices(label, self.colormap2label))

    def __len__(self):
        return len(self.features)



In [None]:
#@markdown Creación del Dataset
crop_size = (316, 316)
voc_train = VOCSegDataset(True, crop_size, voc_dir)
voc_test = VOCSegDataset(False, crop_size, voc_dir)

read 1380 examples
read 1362 examples


In [None]:
#@markdown Creación de los DataLoaders
batch_size = 16
train_iter = torch.utils.data.DataLoader(voc_train, batch_size, shuffle=True,
                                    drop_last=True,
                                    num_workers=4)

test_iter = torch.utils.data.DataLoader(voc_test, batch_size, shuffle=True,
                                    drop_last=True,
                                    num_workers=4)


  cpuset_checked))


# Sección 2: Modelo

El modelo que usaremos en este notebook es la U-Net definida en el paper [U-Net: Convolutional Networks for Biomedical Image Segmentation](https://arxiv.org/pdf/1505.04597.pdf).

![](https://miro.medium.com/max/720/1*YaLdptIoloK184uJQTH1HA.png)

La arquitectura original tomaba como entrada una Imagen (572x572 según el paper) con dimensión de canal "1" para la imagen en escala de grises y generaba un mapa de segmentación de tamaño (388x388) con dimensión de canal equivalente a 2 que era la cantidad de clases que se entrenó para identificar. 

Sin embargo, en este notebook trabajaremos con imágenes a color y clasificaremos cada pixel en una de 21 clases por lo que los canales de entrada y salida deberán ser 3 y 21 respectivamente.

Unet contiene principalmente tres partes:

1. **El camino de contracción**: este es el lado descendente (lado izquierdo) de la "U", ayuda a obtener las features necesarias para la clasificación a medida que va reduciendo la dimensionalidad. .

2. **El camino de expansión**: este es el lado ascendente (lado derecho) de la "U", ayuda a recuperar la dimensionalidad de las imágenes para poder hacer una clasificación a nivel de píxeles.

3. **Conexiones de Salto**: las flechas grises largas desde el lado de contracción hasta el lado de expansión son las conexiones de salto. Estas se utilizan para "retener la información espacial" perdida durante la reducción de resolución de la imagen. De modo que, el mapa de características de la ruta de expansión pueda obtener un mejor contexto de la posición de los píxeles originales. Es decir las mismas features que se generaron al reducir la dimensionalidad son utilizadas para aumentarla.

## Implementación de la Arquitectura

A continuación deberá implementar la arquitectura de U-Net en PyTorch

### Bloque simple de doble convolución
Una imagen de entrada se pasa a través de un par de convolución con tamaño de kernel de 3x3 y una activación de ReLU sobre ella. La dimensión del canal aumenta de “1” a “64”. Luego se pasa a través de otra convolución exactamente igual y otra ReLU, pero esta vez manteniendo la cantidad de canales. Por último, se aplica dropout con probabilidad 0.2

![](https://miro.medium.com/max/640/1*Uan1yYCi3ZO1xrtLohyWzg.png)

Recuerde que, en nuestro caso, los canales entrantes serán 3 porque son imágenes a color.

### Ejercicio 2 




Complete la implementación de la clase SimpleConvolution para que imite el comportamiento del bloque simple de doble convolución

In [None]:
import torch
import torch.nn as nn

class SimpleConvolution(nn.Module):
    def __init__(self,input_channel,output_channel):
        ## Complete la función init
        super(SimpleConvolution, self).__init__()
        self.conv1 = nn.Conv2d(input_channel,output_channel,(3,3))
        self.conv2 = nn.Conv2d(output_channel,output_channel,(3,3))
        self.Relu = nn.ReLU()
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x):
        ## Complete la función forward
        x = self.conv1(x)
        x = self.Relu(x)
        x = self.conv2(x)
        x = self.Relu(x)
        x = self.dropout(x)

        return x

In [None]:
#@markdown Test SimpleConvolution
block = SimpleConvolution(1,64)
inp = torch.rand(4,1,572,572)
out = block(inp)
assert out.shape==(4, 64, 568, 568), "El test no fue superado"
print("El test fue superado.")

El test fue superado.


### Bloque Downsampling de Doble convolución

Este bloque es exactamente igual al anterior sólo que antes de pasar las entradas por las capas convolucionales, se les aplica un max-pooling de kernel $2 \times 2$. 

![](https://miro.medium.com/max/640/1*9zoULdYOeKQsLWQGExhVlQ.png)

Este bloque se aplica 3 veces consecutivas en U-Net con diferentes cantidades de canales.

### Ejercicio 3


Complete la implementación de la clase DownConvolution para que imite el comportamiento del bloque downsampling de doble convolución.

In [None]:
class DownConvolution(nn.Module):
    def __init__(self,input_channel,output_channel):
        ## Complete la función init
        super(DownConvolution, self).__init__()
        self.conv1 = nn.Conv2d(input_channel,output_channel,(3,3))
        self.conv2 = nn.Conv2d(output_channel,output_channel,(3,3))
        self.maxpool = nn.MaxPool2d(2,2)
        self.Relu = nn.ReLU()
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x):
        ## Complete la función forward
        x = self.maxpool(x)
        x = self.conv1(x)
        x = self.Relu(x)
        x = self.conv2(x)
        x = self.Relu(x)
        x = self.dropout(x)
        return x

In [None]:
#@markdown Test DownConvolution
block = DownConvolution(64,128)
inp = torch.rand(4,64,568,568)
out = block(inp)
assert out.shape==(4, 128, 280, 280), "El test no fue superado"
print("El test fue superado.")

El test fue superado.


### Bloque Upsampling de Doble Convolución

Este bloque es exactamente igual al bloque simple de doble convolución, solo que al final se añade una convolución transpuesta para hacer upsampling.

Esta convolución transpuesta tiene un kernel de 2x2 y con un stride igual a 2. La dimensión del canal de salida en la convolución transpuesta se reduce a la mitad, ya que estaremos concatenando los feature maps provenientes del camino de contracción.

![](https://miro.medium.com/max/640/1*nmfwdmaW5A7_zxI0BcPcGQ.png)

### Ejercicio 4


Complete la implementación de la clase UpConvolution para que imite el comportamiento del bloque upsampling de doble convolución.

In [None]:
class UpConvolution(nn.Module):
    def __init__(self,input_channel,output_channel):
        super(UpConvolution, self).__init__()
        self.conv1 = nn.Conv2d(input_channel,output_channel,(3,3))
        self.conv2 = nn.Conv2d(output_channel,output_channel,(3,3))
        self.convtranspose = nn.ConvTranspose2d(output_channel,output_channel//2,(2,2),(2,2))
        self.Relu = nn.ReLU()
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.Relu(x)
        x = self.conv2(x)
        x = self.Relu(x)
        x = self.dropout(x)
        x = self.convtranspose(x)

        return x

In [None]:
#@markdown Test UpConvolution
block = UpConvolution(512,256)
inp = torch.rand(4,512,104,104)
out = block(inp)
assert out.shape==(4, 128, 200, 200), "El test no fue superado"
print("El test fue superado.")

El test fue superado.


### Bloque Final

Este bloque es exactamente igual al bloque simple de doble convolución, solo que al final se añade una convolución $1 \times 1$ para mapear los 64 canales que le llegan a la cantidad de clases deseadas.

![](https://miro.medium.com/max/720/1*cqs5XJRsBXS0RAkdIl_wUQ.png)

Recuerda que nuestro dataset tiene 21 clases, no 2 como el paper original.

### Ejercicio 5


Complete la implementación de la clase LastConvolution para que imite el comportamiento del bloque final.

In [None]:
class LastConvolution(nn.Module):
    def __init__(self,input_channel,output_channel,num_classes):
        super(LastConvolution, self).__init__()
        self.conv1 = nn.Conv2d(input_channel,output_channel,(3,3))
        self.conv2 = nn.Conv2d(output_channel,output_channel,(3,3))
        self.conv1d = nn.Conv2d(output_channel,num_classes,(1,1))
        self.Relu = nn.ReLU()
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.Relu(x)
        x = self.conv2(x)
        x = self.Relu(x)
        x = self.dropout(x)
        x = self.conv1d(x)

        return x

In [None]:
#@markdown Test LastConvolution
block = LastConvolution(128,64,2)
inp = torch.rand(4,128,392,392)
out = block(inp)
assert out.shape==(4, 2, 388, 388), "El test no fue superado"
print("El test fue superado.")

El test fue superado.


### Conexiones de Salto

Al concatenar los features map del camino de contracción con los del camino de expansión, los primeros deben recortarse para que coincidan con la dimensión de los segundos.

![](https://miro.medium.com/max/720/1*2XyH7YGv7MuJWPycqx7hew.png)

### Ejercicio 6

Implemente la función crop_img para que el source_tensor se recorte a la dimensión del target_tensor.

In [None]:
def crop_img(source_tensor, target_tensor):
    source_tensor_size = source_tensor.size()[2]
    target_tensor_size = target_tensor.size()[2]
    diff = (source_tensor_size - target_tensor_size) // 2 # up and down strip --> so //2 takes only one side
    return source_tensor[:,:,diff:-diff,diff:-diff]

In [None]:
#@markdown Test crop_img
src = torch.rand(4,128,280,280)
target = torch.rand(4,256,200,200)
crop_tensor = crop_img(src,target)
assert crop_tensor.shape==(4, 128, 200, 200), "El test no fue superado"
print("El test fue superado.")

El test fue superado.


### Modelo Completo

Ahora deberá utilizar los bloques generados para ensamblar la U-Net

![](https://miro.medium.com/max/720/1*YaLdptIoloK184uJQTH1HA.png)

### Ejercicio 7

Implemente la clase UNet a partir de los bloques anteriores para que contenga a la red completa.

In [None]:
class UNet(nn.Module):
    def __init__(self, input_channel, num_classes):
        super(UNet, self).__init__()
        self.simpleConv = SimpleConvolution(input_channel, 64)
        self.downConvBock1 = DownConvolution(64, 128)
        self.downConvBock2 = DownConvolution(128, 256)
        self.downConvBock3 = DownConvolution(256, 512)
        self.midMaxpool = nn.MaxPool2d(2, 2)
        self.upConvBlock0 = UpConvolution(512, 1024)
        self.upConvBlock1 = UpConvolution(1024, 512)
        self.upConvBlock2 = UpConvolution(512, 256)
        self.upConvBlock3 = UpConvolution(256, 128)
        self.lastConv = LastConvolution(128, 64, num_classes)

    def forward(self, x):
        x_1 = self.simpleConv(x)  # crop_x_1
        x_2 = self.downConvBock1(x_1)  # crop_x_2
        x_3 = self.downConvBock2(x_2)  # crop_x_3
        x_4 = self.downConvBock3(x_3)  # crop_x_4
        x_5 = self.midMaxpool(x_4)
        x_6 = self.upConvBlock0(x_5)
        crop_x_4 = crop_img(x_4, x_6)
        concat_x_4_6 = torch.cat((crop_x_4, x_6), 1)
        x_7 = self.upConvBlock1(concat_x_4_6)
        crop_x_3 = crop_img(x_3, x_7)
        concat_x_3_7 = torch.cat((crop_x_3, x_7), 1)
        x_8 = self.upConvBlock2(concat_x_3_7)
        crop_x_2 = crop_img(x_2, x_8)
        concat_x_2_8 = torch.cat((crop_x_2, x_8), 1)
        x_9 = self.upConvBlock3(concat_x_2_8)
        crop_x_1 = crop_img(x_1, x_9)
        concat_x_1_9 = torch.cat((crop_x_1, x_9), 1)
        out = self.lastConv(concat_x_1_9)

        return out

In [None]:
#@markdown Test UNet
block = UNet(1, 2)
inp = torch.rand(4, 1, 572, 572)
out = block(inp)
print(out.size())
assert out.shape==(4, 2, 388, 388), "El test no fue superado"
print("El test fue superado.")

torch.Size([4, 2, 388, 388])
El test fue superado.


# Entrenamiento

Nota: debido a las limitaciones de RAM de GPU provistas por Colab, no podrá ejecutar el entrenamiento si ya ha corrido los tests. Los test crean variables que ocupan memoria y se termina acabando y rompiendo el kernel de Colab. Para poder ejecutar el entrenamiento, deberá reiniciar el entorno de ejecución y ejecutar las siguientes celdas sin haber ejecutado los tests.



In [None]:
def loss(inputs, targets):
    return F.cross_entropy(inputs, targets, reduction='none').mean(1).mean(1)
    
def accuracy(y_hat, y):
    """Compute the number of correct predictions."""
    if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype) == y
    return float(cmp.type(y.dtype).sum())

In [None]:
num_epochs, lr, wd, device = 5, 0.003, 1e-3, torch.device('cuda' if torch.cuda.is_available() else 'cpu')
unet = UNet(3, 21)
trainer = torch.optim.SGD(unet.parameters(), lr=lr, weight_decay=wd)
model = unet.to(device)

In [None]:
for epoch in range(num_epochs):
    L = 0.0
    N = 0
    Acc = 0.0
    Acc_N = 0
    TestAcc = 0.0
    TestN = 0
    for X, y in train_iter:
        X, y = X.to(device), y.to(device)
        y_hat = model(X)
        y = crop_img(y.unsqueeze(1),y_hat)
        y = y.squeeze(1)
        l = loss(y_hat,y)
        trainer.zero_grad()
        l.mean().backward()
        trainer.step()
        L += l.sum()
        N += l.numel()
        Acc += accuracy(y_hat,y)
        Acc_N += y.numel()
    for X, y in test_iter:
        X, y = X.to(device), y.to(device)
        y_hat = model(X)
        y = crop_img(y.unsqueeze(1),y_hat)
        y = y.squeeze(1)
        TestN += y.numel()
        TestAcc += accuracy(y_hat, y)
    print(f'epoch {epoch + 1}, loss {(L/N):f}\
          , train accuracy  {(Acc/Acc_N):f}, test accuracy {(TestAcc/TestN):f}')

epoch 1, loss 2.976859          , train accuracy  0.325101, test accuracy 0.496166
epoch 2, loss 2.764970          , train accuracy  0.501332, test accuracy 0.499807
epoch 3, loss 2.394452          , train accuracy  0.500350, test accuracy 0.492830
epoch 4, loss 2.330368          , train accuracy  0.499674, test accuracy 0.491304
epoch 5, loss 2.295999          , train accuracy  0.495698, test accuracy 0.491183
epoch 6, loss 2.261164          , train accuracy  0.499677, test accuracy 0.500968
epoch 7, loss 2.232270          , train accuracy  0.497774, test accuracy 0.496788
epoch 8, loss 2.194594          , train accuracy  0.501336, test accuracy 0.498291
epoch 9, loss 2.193033          , train accuracy  0.496406, test accuracy 0.500412
epoch 10, loss 2.154083          , train accuracy  0.501587, test accuracy 0.500842
