### IMPORTANDO AS BILIOTECAS NECESSÁRIAS

In [None]:
pip install segmentation_models_pytorch



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
#Controlando o comportamento da biblioteca CUDA em Python
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import   numpy   as   np
import   cv2
import   matplotlib.pyplot   as   plt
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as BaseDataset
import segmentation_models_pytorch as smp
import segmentation_models_pytorch.utils.metrics

### CHECANDO A UTILIZAÇÃO DA PLACA DE VÍDEO

In [None]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('\nDevice: {0}'.format(DEVICE))


Device: cuda


### LIMITANDO A GERAÇÃO DE NÚMEROS ALEATÓRIOS NOS TESTES

In [None]:
SEED = 42

np.random.seed(SEED)

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

### DEFININDO OS DIRETÓRIOS DO DATASET

In [None]:
#DATA_DIR = '/home/jocsan/SegNet-Tutorial/CamVid'
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive

DATA_DIR = ("/gdrive/MyDrive/CamVid/class_dict.csv")
x_train_dir = '/content/gdrive/MyDrive/CamVid/train2'
y_train_dir = '/content/gdrive/MyDrive/CamVid/train_labels'


Mounted at /gdrive
/gdrive


In [None]:
x_train_dir = os.path.join(DATA_DIR, 'train2')
y_train_dir = os.path.join(DATA_DIR, 'trainannot')

x_valid_dir = os.path.join(DATA_DIR, 'val')
y_valid_dir = os.path.join(DATA_DIR, 'valannot')

x_test_dir = os.path.join(DATA_DIR, 'test')
y_test_dir = os.path.join(DATA_DIR, 'testannot')

### DEFININDO A FUNÇÃO AUXILIAR PARA VIZUALIZAÇÃO DE DADOS

In [None]:
#Função auxiliar para visualização de dados
def visualize(**images):
    n = len(images)
    plt.figure(figsize=(16, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()

### CRIANDO A CLASSE "DATASET" PARA ORGANIZAR OS DADOS

In [None]:
class Dataset(BaseDataset):

    CLASSES = ['sky', 'building', 'pole', 'road', 'pavement',
               'tree', 'signsymbol', 'fence', 'car',
               'pedestrian', 'bicyclist', 'unlabelled']

    def __init__(
            self,
            images_dir,
            masks_dir,
            classes=None,
            augmentation=None, #Expansão de dados
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]

        ##Converte nomes de str em valores de classe em máscaras
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]

        self.augmentation = augmentation
        self.preprocessing = preprocessing


    def __getitem__(self, i):

        ##Lendo dados
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], 0)

        ##Extrai certas classes da máscara (por exemplo, carros)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')


        #Aplicar aumentos
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']


        #Aplicar pré-processamento
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']


        return image, mask

    def __len__(self):
        return len(self.ids)


### VISUALIZANDO OS DADOS QUE TEMOS ATÉ O MOMENTO

In [None]:
CLASSES = ['sky', 'building', 'pole', 'road', 'pavement',
               'tree', 'signsymbol', 'fence', 'car',
               'pedestrian', 'bicyclist', 'unlabelled']

In [None]:
for teste in CLASSES:
    i = 1
    dataset = Dataset(x_train_dir, y_train_dir, classes=[teste])
    image, mask = dataset[i]

    linha = '-'*50
    print(linha, teste, linha)

    visualize(
        Image=image,
        Mask=mask.squeeze(),
    )


FileNotFoundError: ignored

### AUMENTO DE DADOS COM A BIBLIOTECA "ALBUMENTATIONS"

In [None]:
import albumentations as albu

In [None]:
def get_training_augmentation():
    train_transform = [

        albu.HorizontalFlip(p=0.5),

        albu.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),

        albu.PadIfNeeded(min_height=320, min_width=320, always_apply=True, border_mode=0),
        albu.RandomCrop(height=320, width=320, always_apply=True),

        albu.IAAAdditiveGaussianNoise(p=0.2),
        albu.IAAPerspective(p=0.5),

        albu.OneOf(
            [
                albu.CLAHE(p=1),
                albu.RandomBrightness(p=1),
                albu.RandomGamma(p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.IAASharpen(p=1),
                albu.Blur(blur_limit=3, p=1),
                albu.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.RandomContrast(p=1),
                albu.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
    return albu.Compose(train_transform)


def get_validation_augmentation():
    """"Adicione preenchimentos para tornar a forma da imagem divisível por 32"""
    test_transform = [
        albu.PadIfNeeded(384, 480)
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')


def get_preprocessing(preprocessing_fn):
    """Construir transformação de pré-processamento

    Argumentos:
        preprocessing_fn (callbale): função de normalização de dados
            (pode ser específico para cada rede neural pré-treinada)
    Retornar:
        transform: albumentations.Compose

    """

    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return albu.Compose(_transform)

In [None]:
#Visualizar imagens e máscaras aumentadas

augmented_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=get_training_augmentation(),
    classes=['pedestrian'],
)

#Mesma imagem com diferentes transformações aleatórias
for i in range(3):
    image, mask = augmented_dataset[1]
    visualize(image=image, mask=mask.squeeze(-1))

### Resolvendo o erro "[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed"

In [None]:
import urllib.request
import ssl

def main():
    ssl._create_default_https_context = ssl._create_unverified_context
    r = urllib.request.urlopen('http://data.lip6.fr/cadene/pretrainedmodels/se_resnext50_32x4d-a260b3a4.pth')
    print(r.status)
    print(r)

if __name__ == '__main__':
    main()

### CRIANDO O MODELO E TREINANDO

In [None]:
ENCODER = 'se_resnext50_32x4d'#Modelo de rede neural convolucional (CNN)
ENCODER_WEIGHTS = 'imagenet'  #Parâmetro para especificar os pesos pré-treinados a serem usados no codificador
CLASSES = ['car']             #Classe escolhida para ser segmentada na imagem
ACTIVATION = 'sigmoid'        #Pode ser None para logits ou 'softmax2d' para segmentação multiclasse
DEVICE = 'cuda'               #Dispositivo CUDA em utilização

#Criar modelo de segmentação com codificador pré-treinado
model = smp.FPN(
    encoder_name=ENCODER,                    #Codificador, por exemplo mobilenet_v2 ou eficientenet-b7
    encoder_weights=ENCODER_WEIGHTS,         #Pesos pré-treinados `imagenet` para inicialização do codificador
    in_channels=3,                           #Canais de entrada do modelo (1 para imagens em escala de cinza, 3 para RGB, etc.)
    classes=len(CLASSES),                    #Canais de saída do modelo (número de classes em seu conjunto de dados)
    activation=ACTIVATION,
)
#Obtendo a função de pré-processamento adequada para codificador
preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

In [None]:
from segmentation_models_pytorch.encoders import get_preprocessing_fn

preprocess_input = get_preprocessing_fn('resnet18', pretrained='imagenet')

In [None]:
train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=get_training_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

valid_dataset = Dataset(
    x_valid_dir,
    y_valid_dir,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=4)

In [None]:
loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
]

optimizer = torch.optim.Adam([
    dict(params=model.parameters(), lr=0.0001),
])

In [None]:
# criar corredores de época
# é um loop simples de iteração sobre as amostras do carregador de dados

train_epoch = smp.utils.train.TrainEpoch(
    model,
    loss=loss,
    metrics=metrics,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = smp.utils.train.ValidEpoch(
    model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
    verbose=True,
)

### CRIANDO O MODELO DE TREINO PARA N ÉPOCAS

In [None]:
max_score = 0

for i in range(0, 1):

    print('\nEpoch: {}'.format(i))
    train_logs = train_epoch.run(train_loader)
    valid_logs = valid_epoch.run(valid_loader)

    #Salvar modelo, alterar lr, etc.
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model, './best_model.pth')
        print('Model saved!')

    if i == 25:
        optimizer.param_groups[0]['lr'] = 1e-5
        print('Decrease decoder learning rate to 1e-5!')

In [None]:
iou_scores=[]
iou_scores.append(valid_logs['iou_score'])
accuracy = sum(iou_scores) / len(iou_scores)
print(f'Acurácia média (iou_score): {accuracy}')

### CARREGANDO O MELHOR MODELO SALVO

In [None]:
best_model = torch.load('./best_model.pth')

### CRIANDO O TESTE E AVALIANDO O MODELO

In [None]:
test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=get_validation_augmentation(),
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,
)

test_dataloader = DataLoader(test_dataset)

In [None]:
test_epoch = smp.utils.train.ValidEpoch(
    model=best_model,
    loss=loss,
    metrics=metrics,
    device=DEVICE,
)

logs = test_epoch.run(test_dataloader)

### CONJUNTO DE DADOS DE TESTE SEM TRANSFORMAÇÕES, PARA VISUALIZAÇÃO DAS IMAGENS

In [None]:
test_dataset_vis = Dataset(
    x_test_dir, y_test_dir,
    classes=CLASSES,
)

In [None]:
for i in range(5):
    n = np.random.choice(len(test_dataset))

    image_vis = test_dataset_vis[n][0].astype('uint8')
    image, gt_mask = test_dataset[n]

    gt_mask = gt_mask.squeeze()

    x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
    pr_mask = best_model.predict(x_tensor)
    pr_mask = (pr_mask.squeeze().cpu().numpy().round())

    visualize(
        image=image_vis,
        ground_truth_mask=gt_mask,
        predicted_mask=pr_mask
    )