Caricamento del dataset da Google Drive

In [7]:
# Mount the user's Google Drive inside the Colab runtime at /content/gdrive/.
from google.colab import drive
drive.mount('/content/gdrive/')
# Make the dataset folder on the mounted drive the current working directory.
%cd /content/gdrive/My Drive/DataSet/DatasetTiles

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).
/content/gdrive/My Drive/DataSet/DatasetTiles


Librerie principali da importare nel progetto:

In [8]:
import glob
import os
import random
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch import autograd, optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset, Subset

**Costruzione della rete neurale U-Net**
---



Funzioni di utilità della rete

In [9]:
def get_convolutional_layers(in_channel: int,
                             out_channel: int,
                             kernel_size: tuple = (3, 3),
                             strides: tuple = (1, 1),
                             padding: tuple = (1, 1)):
    r"""
    Build a 2D convolution layer.

    :param in_channel: number of channels of the incoming feature map.
    :param out_channel: number of channels produced by the convolution.
    :param kernel_size: size of the convolution kernel.
    :param strides: stride of the convolution.
    :param padding: padding applied to the input.
    :return: the configured ``nn.Conv2d`` module.
    """
    # Collect the settings first so the mapping from this helper's argument
    # names to the nn.Conv2d keyword names is visible in one place.
    conv_settings = {
        "in_channels": in_channel,
        "out_channels": out_channel,
        "kernel_size": kernel_size,
        "stride": strides,
        "padding": padding,
    }
    return nn.Conv2d(**conv_settings)


def get_relu():
    r"""
    Build the ReLU activation used by the network blocks.

    :return: an ``nn.ReLU`` module operating in place on its input.
    """
    activation = nn.ReLU(inplace=True)
    return activation


def get_sigmoid():
    r"""
    Build the sigmoid activation.

    :return: an ``nn.Sigmoid`` module.
    """
    activation = nn.Sigmoid()
    return activation


def get_max_pooling(kernel_size: tuple = (2, 2),
                    strides: tuple = (2, 2)):
    r"""
    Build a 2D max pooling layer.

    :param kernel_size: size of the pooling window.
    :param strides: stride of the pooling window.
    :return: the configured ``nn.MaxPool2d`` module.
    """
    pooling = nn.MaxPool2d(kernel_size=kernel_size, stride=strides)
    return pooling


def get_up_sample(scale_factor: int = 2):
    r"""
    Build an up-sampling layer.

    :param scale_factor: multiplier applied to the spatial size of the input.
    :return: the configured ``nn.Upsample`` module.
    """
    up_sampler = nn.Upsample(scale_factor=scale_factor)
    return up_sampler

Blocchi della rete: blocco di down-sampling, bottleneck, blocco di up-sampling e di output.

In [10]:
class Down(nn.Module):
    r"""
    Encoder stage: two convolution + ReLU steps followed by max pooling.
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int):
        r"""
        :param in_channel: number of channels entering the block.
        :param out_channel: number of channels produced by both convolutions.
        """
        super().__init__()

        # The two convolutions are created in this order on purpose: it fixes
        # the order of the block's parameters.
        self.conv_1 = get_convolutional_layers(in_channel, out_channel)
        self.conv_2 = get_convolutional_layers(out_channel, out_channel)
        self.activation = get_relu()
        self.max_pooling = get_max_pooling()

    def forward(self, x):
        r"""
        Run the block.

        :param x: input feature map.
        :return: tuple ``(features, pooled)`` where ``features`` is the
            activated output of the second convolution (used as skip
            connection) and ``pooled`` is its max-pooled version.
        """
        features = self.activation(self.conv_1(x))
        features = self.activation(self.conv_2(features))
        pooled = self.max_pooling(features)

        return features, pooled


class Bottleneck(nn.Module):
    r"""
    Bottleneck stage: two convolution + ReLU steps, without pooling.
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int):
        r"""
        :param in_channel: number of channels entering the block.
        :param out_channel: number of channels produced by both convolutions.
        """
        super().__init__()

        self.conv_1 = get_convolutional_layers(in_channel, out_channel)
        self.conv_2 = get_convolutional_layers(out_channel, out_channel)
        self.activation = get_relu()

    def forward(self, x):
        r"""
        Run the block.

        :param x: input feature map.
        :return: activated output of the second convolution.
        """
        hidden = self.activation(self.conv_1(x))
        return self.activation(self.conv_2(hidden))


class Up(nn.Module):
    r"""
    Decoder stage: up-sample, concatenate the skip connection, then apply
    two convolution + ReLU steps.
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int):
        r"""
        :param in_channel: number of channels of the concatenated tensor fed
            to the first convolution.
        :param out_channel: number of channels produced by both convolutions.
        """
        super().__init__()

        self.up_sample = get_up_sample()
        self.concat = Concat()
        self.activation = get_relu()
        self.conv_1 = get_convolutional_layers(in_channel=in_channel, out_channel=out_channel)
        self.conv_2 = get_convolutional_layers(out_channel, out_channel)

    def forward(self, x, skip):
        r"""
        Run the block.

        :param x: feature map coming from the deeper level.
        :param skip: skip connection tensor to concatenate after up-sampling.
        :return: activated output of the second convolution.
        """
        merged = self.concat(self.up_sample(x), skip)
        hidden = self.activation(self.conv_1(merged))
        result = self.activation(self.conv_2(hidden))

        return result


class Concat(nn.Module):
    r"""
    Concatenation module.
    Joins two tensors of the network along dimension 1.
    """

    def __init__(self):
        super().__init__()

    def forward(self, layer_1, layer_2):
        r"""
        :param layer_1: first tensor (placed first in the result).
        :param layer_2: second tensor (placed after ``layer_1``).
        :return: concatenation of the two tensors along dimension 1.
        """
        return torch.cat([layer_1, layer_2], dim=1)


class Out(nn.Module):
    r"""
    Output stage: a convolution without padding followed by a sigmoid.
    """

    def __init__(self,
                 in_channel: int,
                 out_channel: int,
                 kernel_size: tuple):
        r"""
        :param in_channel: number of channels entering the block.
        :param out_channel: number of channels produced by the convolution.
        :param kernel_size: kernel size of the convolution.
        """
        super().__init__()

        self.conv = get_convolutional_layers(
            in_channel=in_channel,
            out_channel=out_channel,
            kernel_size=kernel_size,
            padding=(0, 0),
        )
        self.sigmoid = get_sigmoid()

    def forward(self, x: torch.Tensor):
        r"""
        Run the block.

        :param x: input feature map.
        :return: sigmoid of the convolution output.
        """
        return self.sigmoid(self.conv(x))

Corpo della rete

In [18]:
class Unet(nn.Module):
    r"""
    U-Net: a stack of ``Down`` blocks, one ``Bottleneck``, a mirrored stack of
    ``Up`` blocks fed with the skip connections, and a final ``Out`` block.
    """

    def __init__(self,
                 n_classes_out: int,
                 block_filter_count=(3, 64, 128, 256, 512, 1024)):
        r"""
        Builder of the class.

        :param n_classes_out: number of output channels of the final layer.
        :param block_filter_count: channel counts ``[input, level_1, ...,
            level_n, bottleneck]``. The number of down/up blocks is
            ``len(block_filter_count) - 2`` (4 with the default value).
        :raises ValueError: if fewer than three channel counts are given.
        """
        super().__init__()

        # Work on a private copy so the caller's sequence (or the default) is
        # never shared with the instance.
        filters = list(block_filter_count)
        if len(filters) < 3:
            raise ValueError(
                "block_filter_count needs at least 3 entries "
                "(input, one level, bottleneck); got " + str(len(filters)))
        depth = len(filters) - 2

        # Down sampling path
        print("** Block downs **")
        blocks_down = []
        for i in range(depth):
            block = Down(in_channel=filters[i], out_channel=filters[i + 1])
            print(f"Block down {i + 1}-layer: in: {filters[i]}, out: {filters[i + 1]}")
            blocks_down.append(block)

        # Bottleneck
        print("\n** Bottleneck **")
        bottleneck = Bottleneck(in_channel=filters[-2], out_channel=filters[-1])
        print(f"Bottleneck layer: in: {filters[-2]}, out: {filters[-1]}")

        # Up sampling path: each block receives the up-sampled deeper features
        # concatenated with the skip connection of the matching level.
        print("\n** Block ups **")
        blocks_up = []
        for i in range(depth):
            deeper, shallower = filters[-1 - i], filters[-2 - i]
            block = Up(in_channel=deeper + shallower, out_channel=shallower)
            print(
                f"Block ups {i + 1}-layer: in: {deeper + shallower}, out: {shallower}")
            blocks_up.append(block)

        # Output layer
        print("\n** Out net **")
        out_block = Out(in_channel=filters[1], out_channel=n_classes_out, kernel_size=(1, 1))
        print(f"Out layer: in: {filters[1]}, out: {n_classes_out}\n")

        # Registration order (bottleneck, out, blocks_down, blocks_up) and the
        # single-element ModuleList around the bottleneck are kept on purpose:
        # they determine the parameter order and the state_dict keys
        # ("bottleneck.0...."), which previously saved checkpoints rely on.
        self.bottleneck = nn.ModuleList([bottleneck])
        self.out = out_block
        self.blocks_down = nn.ModuleList(blocks_down)
        self.blocks_up = nn.ModuleList(blocks_up)

    def forward(self, x: torch.Tensor):
        r"""
        Forward of the network.

        :param x: input net.
        :return: output net.
        """
        skips = []

        # Down sampling path: keep the pre-pooling features of every level.
        for block in self.blocks_down:
            skip, x = block(x)
            skips.append(skip)

        # Bottleneck
        x = self.bottleneck[0](x)

        # Up sampling path: consume the skip connections deepest-first.
        for block, skip in zip(self.blocks_up, reversed(skips)):
            x = block(x, skip)

        # Output layer
        return self.out(x)


Classe per caricamento del dataset nella rete con successiva pre-elaborazione delle immagini

In [19]:
# Target size, in pixels, that preprocessing() resizes every loaded image to.
WIDTH = 256
HEIGHT = 256


class DatasetTiles(Dataset):
    r"""
    Dataset of the tiles of one specific defect: each item is an image
    together with its mask.
    """

    def __init__(self, parent_dir, image_dir):
        r"""
        Collect the paths of the images and of the masks.

        :param parent_dir: root folder.
        :param image_dir: directory of the defect.

        Image format (both are looked up in ``<parent_dir>/<image_dir>/Imgs``):
            - .jpg: image
            - .png: binary mask
        """
        images_dir = parent_dir + '/' + image_dir + '/Imgs'

        # glob returns paths in arbitrary order, so both lists are sorted to
        # make index i refer to the same tile in each of them.
        # NOTE(review): this assumes an image and its mask share the same file
        # name apart from the extension - confirm against the dataset layout.
        self.img_list_path = sorted(glob.glob(images_dir + '/*.jpg'))
        self.img_mask_list_path = sorted(glob.glob(images_dir + '/*.png'))
        print(f"{image_dir} loaded!")

    def __getitem__(self, index):
        r"""
        Get the image and its mask.

        :param index: index of the specific image.
        :return: tuple ``(x, y)`` of float32 arrays: the RGB image with the
            channel axis first and the grayscale mask with a leading
            channel axis of size 1.
        """

        x = preprocessing(self.img_list_path[index], False)
        # Move the channel axis to the front:
        # [height, width, n_channels] -> [n_channels, height, width]
        x = np.moveaxis(x, 2, 0)

        y = preprocessing(self.img_mask_list_path[index], True)
        # Add a leading channel axis: [height, width] -> [1, height, width]
        y = np.expand_dims(y, axis=0)

        return x, y

    def __len__(self):
        r"""
        :return: number of images found for this defect.
        """
        return len(self.img_list_path)


def preprocessing(img, convert_to_gray):
    r"""
    Load an image from disk and prepare it for the network.

    The image is resized to ``(WIDTH, HEIGHT)``, converted from OpenCV's BGR
    layout to grayscale or RGB, and divided by 255.

    :param img: path of the image to be processed.
    :param convert_to_gray: true to convert to grayscale, false to convert
        to RGB.
    :return: the processed image as a ``float32`` array.
    :raises OSError: if the file is missing or cannot be decoded.
    """
    # The notebook imports OpenCV as `cv2`; there is no `cv` alias in scope.
    image = cv2.imread(img)
    if image is None:
        # imread signals failure by returning None instead of raising.
        raise OSError(f"Unable to read image: {img}")

    image = cv2.resize(image, (WIDTH, HEIGHT))

    color_code = cv2.COLOR_BGR2GRAY if convert_to_gray else cv2.COLOR_BGR2RGB
    image = cv2.cvtColor(image, color_code)

    return (image / 255).astype(np.float32)


def train_test_split(dataset):
    r"""
    Split dataset in training, validation and test set, in order and without
    shuffling:
        - 70% training set;
        - 20% validation set;
        - remaining samples (about 10%) test set.

    :param dataset: dataset to split.
    :return: training, validation and test set, as ``Subset`` objects.
    """
    length_dataset = len(dataset)

    # Integer arithmetic avoids float truncation errors: for example
    # 90 * 0.7 evaluates to 62.99999999999999, which truncates to 62.
    length_train = (length_dataset * 7) // 10
    length_validate = (length_dataset * 2) // 10
    validation_end = length_train + length_validate

    training_dataset = Subset(dataset, range(0, length_train))
    validation_dataset = Subset(dataset, range(length_train, validation_end))
    test_dataset = Subset(dataset, range(validation_end, length_dataset))

    return training_dataset, validation_dataset, test_dataset
