In [None]:
# Mount Google Drive at /content/drive. The dataset and the results CSV used
# further down are addressed through the drive/MyDrive folder of this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Imported here because this cell runs before the notebook's main import cell;
# without it a fresh-kernel "Run All" stops with a NameError on `os`.
import os

# Show the working directory: dataset paths below are built relative to it.
os.getcwd()

'/content'

In [None]:
# Imported here because this cell runs before the cell that imports torch;
# without it a fresh-kernel "Run All" stops with a NameError on `torch`.
import torch

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Ask the NVIDIA driver for GPU status; the command fails when the runtime has no usable GPU driver.
!nvidia-smi

NVIDIA-SMI has failed because it couldn't communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.



In [None]:
# Imported here because this cell runs before the cell that imports torch;
# without it a fresh-kernel "Run All" stops with a NameError on `torch`.
import torch

# Release cached GPU memory that PyTorch's allocator is no longer using.
torch.cuda.empty_cache()

In [None]:
import os
import PIL
import random
import torchvision

from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import functional


def get_train_test_paths(test_ratio: float = 0.2, seed=None):
    """Collect the dataset image paths and split them into train and test lists.

    Every ``*.jpg`` file found recursively under
    ``<cwd>/drive/MyDrive/Water Bodies Dataset/Images`` is collected, the list
    is shuffled, and the first ``int(test_ratio * n)`` paths form the test set.

    Args:
        test_ratio: Fraction of the files assigned to the test split; must be
            within [0, 1].
        seed: Optional seed for the shuffle. With the default ``None`` the
            module-level ``random`` generator is used (a new split on every
            call); with an integer the same files always yield the same split.

    Returns:
        ``(train_path, test_path)``: two lists of ``pathlib.Path`` objects.

    Raises:
        ValueError: If ``test_ratio`` is outside [0, 1].
    """
    if not 0.0 <= test_ratio <= 1.0:
        raise ValueError(f'test_ratio must be within [0, 1], got {test_ratio}')

    images_dir = Path(os.getcwd()) / 'drive' / 'MyDrive' / 'Water Bodies Dataset' / 'Images'
    # rglob yields files in filesystem order, which is not stable across
    # machines; sorting first lets a seed fully determine the split.
    files = sorted(images_dir.rglob("*.jpg"))

    # randomize the order of the data
    shuffler = random if seed is None else random.Random(seed)
    shuffler.shuffle(files)

    # the first `test_count` shuffled files are held out for testing
    test_count = int(test_ratio * len(files))
    test_path = files[:test_count]
    train_path = files[test_count:]
    return train_path, test_path


def get_mask_path(file_path):
    """Return the mask path that corresponds to a source image path.

    The innermost directory named exactly ``Images`` is swapped for ``Masks``.
    Only directory components are considered, so an ``Images`` substring in
    the file name or in an unrelated parent folder is left alone.

    Args:
        file_path: Source image path (``str`` or ``pathlib.Path``).

    Returns:
        The mask path as a ``str``. If no directory component is named exactly
        ``Images``, every ``Images`` substring is replaced instead (the
        previous behaviour of this function).
    """
    parts = list(Path(file_path).parts)
    # Walk from the innermost directory outwards; the last part is the file
    # name and is deliberately skipped.
    for position in range(len(parts) - 2, -1, -1):
        if parts[position] == 'Images':
            parts[position] = 'Masks'
            return str(Path(*parts))
    # No directory is named exactly 'Images': keep the legacy replacement.
    return str(file_path).replace('Images', 'Masks')


class WaterDataset(Dataset):
    """Dataset of (source image, binary mask) tensor pairs.

    Each item is loaded from an image path in ``path_list``; the matching mask
    path is derived with ``get_mask_path``.

    Args:
        path_list: Sequence of source image paths.
        transform_source: Optional callable applied to the source tensor only.
        transform_both: Optional callable applied to the source tensor and to
            the mask tensor (e.g. a resize).
    """

    def __init__(self, path_list, transform_source=None, transform_both=None):
        self.sources = path_list
        self.transform_source = transform_source
        self.transform_both = transform_both

    def __len__(self):
        """Return the number of source images."""
        return len(self.sources)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            ``(source, label)``: ``source`` is the image as a 3-channel float
            tensor; ``label`` is a single-channel float tensor holding 1.0
            where the (transformed) mask value is below 0.5 and 0.0 elsewhere.
        """
        img_path = self.sources[index]
        # Context managers close the files as soon as the pixels are copied
        # into tensors. Forcing RGB keeps the channel count at 3 even for
        # grayscale or CMYK JPEGs.
        with PIL.Image.open(img_path) as image:
            source = functional.to_tensor(image.convert('RGB'))
        with PIL.Image.open(get_mask_path(img_path)) as mask:
            label = functional.to_tensor(mask.convert('L'))

        if self.transform_source:
            source = self.transform_source(source)
        if self.transform_both:
            source = self.transform_both(source)
            label = self.transform_both(label)

        # Binarise unconditionally: masks hold intermediate grey values even
        # without a resize, and the training loop casts labels to integers.
        # NOTE(review): pixels darker than 0.5 become class 1 - confirm this
        # polarity matches the dataset's mask convention.
        label = (label < 0.5).float()

        return source, label


def get_train_test_loaders(batch_size, length):
    """Build the training and test DataLoaders over a fresh train/test split.

    Args:
        batch_size: Batch size used by both loaders.
        length: Side length; images and masks are resized to
            ``(length, length)``.

    Returns:
        ``(train_loader, test_loader)``. Only the training loader shuffles.
    """
    train_files, test_files = get_train_test_paths()
    target_size = (length, length)

    train_set = WaterDataset(train_files,
                             transform_both=torchvision.transforms.Resize(target_size))
    test_set = WaterDataset(test_files,
                            transform_both=torchvision.transforms.Resize(target_size))

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(dataset=test_set, batch_size=batch_size)
    return train_loader, test_loader


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as f


class Hidden1(nn.Module):
    """Fully connected per-pixel classifier with one hidden layer.

    Each sample is flattened to ``3 * length * length`` values, passed through
    one hidden layer, and mapped to two logits per pixel.

    Args:
        length: Side length of the square input image.
        hidden_size: Width of the hidden layer.
        activation: Callable applied after the hidden layer.
    """

    def __init__(self, length, hidden_size, activation):
        super().__init__()
        self.activation = activation
        self.length = length

        self.flat = nn.Flatten()
        self.fc1 = nn.Linear(length * length * 3, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 2 * length * length)  # 2 logits per pixel

    def forward(self, x):
        """Return logits of shape ``(batch * length**2, 2)``."""
        x = self.flat(x)
        x = self.activation(self.fc1(x))
        # The output layer stays linear: CrossEntropyLoss expects raw logits,
        # and ReLU/sigmoid here would clip or squash them. This matches the
        # Conv* models, whose last layer also has no activation.
        x = self.fc2(x)
        # One row per pixel, one column per class.
        return x.reshape(x.size(0) * self.length ** 2, 2)


class Hidden2(nn.Module):
    """Fully connected per-pixel classifier with two hidden layers.

    Each sample is flattened to ``3 * length * length`` values, passed through
    two hidden layers of equal width, and mapped to two logits per pixel.

    Args:
        length: Side length of the square input image.
        hidden_size: Width of both hidden layers.
        activation: Callable applied after each hidden layer.
    """

    def __init__(self, length, hidden_size, activation):
        super().__init__()
        self.activation = activation
        self.length = length

        self.flat = nn.Flatten()
        self.fc1 = nn.Linear(length * length * 3, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 2 * length * length)  # 2 logits per pixel

    def forward(self, x):
        """Return logits of shape ``(batch * length**2, 2)``."""
        x = self.flat(x)
        x = self.activation(self.fc1(x))
        x = self.activation(self.fc2(x))
        # The output layer stays linear: CrossEntropyLoss expects raw logits,
        # and ReLU/sigmoid here would clip or squash them. This matches the
        # Conv* models, whose last layer also has no activation.
        x = self.fc3(x)
        # One row per pixel, one column per class.
        return x.reshape(x.size(0) * self.length ** 2, 2)


class Conv1(nn.Module):
    """One convolution + stride-1 max-pool stage followed by three linear layers.

    The convolution maps 3 channels to 3 channels and is padded with
    ``(kernel_size - 1) // 2``, so an odd ``kernel_size`` keeps the spatial
    size at ``length x length``. The output holds two logits per pixel.

    Args:
        length: Side length of the square input image.
        hidden_size: Width of the two hidden linear layers.
        activation: Callable applied after the convolution and the first two
            linear layers (not after the output layer).
        kernel_size: Kernel size for both convolution and pooling; expected odd.
    """

    def __init__(self, length, hidden_size, activation, kernel_size: int = 3):
        super().__init__()
        self.activation = activation
        self.length = length
        self.kernel_size = kernel_size
        # NOTE(review): an earlier comment asked for hidden_size to be 3 times
        # a perfect square; nothing in this class depends on that - confirm.
        self.hidden_size = hidden_size

        same_padding = (kernel_size - 1) // 2
        flat_features = 3 * length * length
        self.conv1 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.fc1 = nn.Linear(flat_features, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 2 * length * length)

    def forward(self, x):
        """Return logits of shape ``(batch * length**2, 2)``."""
        same_padding = (self.kernel_size - 1) // 2
        feature_map = self.activation(self.conv1(x))
        feature_map = f.max_pool2d(feature_map, kernel_size=self.kernel_size,
                                   stride=1, padding=same_padding)
        hidden = torch.flatten(feature_map, 1)
        for layer in (self.fc1, self.fc2):
            hidden = self.activation(layer(hidden))
        logits = self.fc3(hidden)
        # One row per pixel, one column per class.
        return logits.reshape(logits.size(0) * self.length ** 2, 2)


class Conv2(nn.Module):
    """Two convolution + stride-1 max-pool stages followed by three linear layers.

    Each convolution maps 3 channels to 3 channels and is padded with
    ``(kernel_size - 1) // 2``, so an odd ``kernel_size`` keeps the spatial
    size at ``length x length``. The output holds two logits per pixel.

    Args:
        length: Side length of the square input image.
        hidden_size: Width of the two hidden linear layers.
        activation: Callable applied after every convolution and the first two
            linear layers (not after the output layer).
        kernel_size: Kernel size for both convolution and pooling; expected odd.
    """

    def __init__(self, length, hidden_size, activation, kernel_size: int = 3):
        super().__init__()
        self.activation = activation
        self.length = length
        self.kernel_size = kernel_size
        # NOTE(review): an earlier comment asked for hidden_size to be 3 times
        # a perfect square; nothing in this class depends on that - confirm.
        self.hidden_size = hidden_size

        same_padding = (kernel_size - 1) // 2
        flat_features = 3 * length * length
        self.conv1 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.conv2 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.fc1 = nn.Linear(flat_features, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 2 * length * length)

    def forward(self, x):
        """Return logits of shape ``(batch * length**2, 2)``."""
        same_padding = (self.kernel_size - 1) // 2
        feature_map = x
        for conv in (self.conv1, self.conv2):
            feature_map = self.activation(conv(feature_map))
            feature_map = f.max_pool2d(feature_map, kernel_size=self.kernel_size,
                                       stride=1, padding=same_padding)
        hidden = torch.flatten(feature_map, 1)
        for layer in (self.fc1, self.fc2):
            hidden = self.activation(layer(hidden))
        logits = self.fc3(hidden)
        # One row per pixel, one column per class.
        return logits.reshape(logits.size(0) * self.length ** 2, 2)


class Conv3(nn.Module):
    """Three convolution + stride-1 max-pool stages followed by three linear layers.

    Each convolution maps 3 channels to 3 channels and is padded with
    ``(kernel_size - 1) // 2``, so an odd ``kernel_size`` keeps the spatial
    size at ``length x length``. The output holds two logits per pixel.

    Args:
        length: Side length of the square input image.
        hidden_size: Width of the two hidden linear layers.
        activation: Callable applied after every convolution and the first two
            linear layers (not after the output layer).
        kernel_size: Kernel size for both convolution and pooling; expected odd.
    """

    def __init__(self, length, hidden_size, activation, kernel_size: int = 3):
        super().__init__()
        self.activation = activation
        self.length = length
        self.kernel_size = kernel_size
        # NOTE(review): an earlier comment asked for hidden_size to be 3 times
        # a perfect square; nothing in this class depends on that - confirm.
        self.hidden_size = hidden_size

        same_padding = (kernel_size - 1) // 2
        flat_features = 3 * length * length
        self.conv1 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.conv2 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.conv3 = nn.Conv2d(3, 3, kernel_size, padding=same_padding)
        self.fc1 = nn.Linear(flat_features, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 2 * length * length)

    def forward(self, x):
        """Return logits of shape ``(batch * length**2, 2)``."""
        same_padding = (self.kernel_size - 1) // 2
        feature_map = x
        for conv in (self.conv1, self.conv2, self.conv3):
            feature_map = self.activation(conv(feature_map))
            feature_map = f.max_pool2d(feature_map, kernel_size=self.kernel_size,
                                       stride=1, padding=same_padding)
        hidden = torch.flatten(feature_map, 1)
        for layer in (self.fc1, self.fc2):
            hidden = self.activation(layer(hidden))
        logits = self.fc3(hidden)
        # One row per pixel, one column per class.
        return logits.reshape(logits.size(0) * self.length ** 2, 2)


In [None]:
import os
import cv2
import pandas as pd
import torch.nn.functional as f

from tqdm import tqdm
from torch import optim
from datetime import datetime
from torchvision.utils import save_image
from torchvision.transforms import Resize
from sklearn.metrics import accuracy_score, f1_score


def get_img_index(path):
    """Extract the index token from an image path such as ``water_body_12.jpg``.

    Returns the text between the last underscore in ``str(path)`` and the
    first dot that follows it (``'12'`` in the example), as a string.
    """
    after_last_underscore = str(path).rpartition('_')[2]
    return after_last_underscore.partition('.')[0]


def fit_model(model, model_parameters, loss_function, optimizer, batch_size, image_normalized_length, num_of_epochs):
    """Train and evaluate one model configuration, logging one CSV row per epoch.

    Args:
        model: Model class (not an instance); built as ``model(*model_parameters)``.
        model_parameters: Positional constructor arguments. Element 1 is
            logged as the hidden layer size and element 2 (the activation)
            must expose ``__name__``.
        loss_function: Loss class, instantiated with no arguments.
        optimizer: Optimizer class, built as
            ``optimizer(model.parameters(), lr=3e-4)``.
        batch_size: Batch size for both data loaders.
        image_normalized_length: Side length the images are resized to.
        num_of_epochs: Number of training epochs.

    Side effects:
        Appends one row per epoch to ``drive/MyDrive/Water_Bodies_Results.csv``
        (writing the header first when the file is missing or empty) and
        prints that row.
    """
    results_path = 'drive/MyDrive/Water_Bodies_Results.csv'

    # retrieve train and test files
    train_loader, test_loader = get_train_test_loaders(batch_size, image_normalized_length)
    # if GPU is available, use it for the heavy calculations
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    network = model(*model_parameters).to(device)
    criterion = loss_function()
    optimizer_instance = optimizer(network.parameters(), lr=3e-4)

    for epoch in range(1, num_of_epochs + 1):
        # MODEL TRAINING
        network.train()
        epoch_start = datetime.now()
        # sum of the per-batch losses over the epoch
        epoch_loss = 0
        for image, mask in tqdm(train_loader):
            x = image.float().to(device)
            # one integer class label per pixel, in the same order as the
            # rows of the model output
            tag = mask.flatten().long().to(device)
            optimizer_instance.zero_grad()
            prediction = network(x)
            loss = criterion(prediction, tag)
            loss.backward()
            optimizer_instance.step()
            epoch_loss += loss.item()
        epoch_seconds = (datetime.now() - epoch_start).total_seconds()

        # MODEL EVALUATION
        network.eval()
        predicted, real = [], []
        # no_grad: evaluation needs no autograd graph, which would otherwise
        # be built and held in memory for every test batch
        with torch.no_grad():
            for x, y in tqdm(test_loader):
                logits = network(x.float().to(device))
                predicted.append(torch.argmax(logits, dim=1).cpu())
                real.append(y.reshape(-1).long())
        real = torch.cat(real).numpy()
        predicted = torch.cat(predicted).numpy()
        accuracy = accuracy_score(real, predicted)
        f1 = f1_score(real, predicted)

        # append results to csv file
        df = pd.DataFrame({'Model Name': [network.__class__.__name__],
                           'Iteration': [epoch],
                           'Input Image Length': [image_normalized_length],
                           # read from the arguments, not from a notebook
                           # global, so the function works outside the sweep
                           'Hidden Layer Size': [model_parameters[1]],
                           'Batch Size': [batch_size],
                           'Activation Function': [str(model_parameters[2].__name__)],
                           'Optimizer': [str(type(optimizer_instance))],
                           'Loss Function': [str(loss_function)],
                           'Loss': [epoch_loss],
                           'Accuracy': [accuracy],
                           'F1': [f1],
                           'Iteration Training Seconds': [epoch_seconds]})
        # Write the header only when starting a new (or empty) file, so the
        # CSV stays well-formed whether or not it was created beforehand.
        needs_header = not os.path.exists(results_path) or os.path.getsize(results_path) == 0
        df.to_csv(results_path, index=False, mode='a', header=needs_header)
        print(df)

    # TODO: optionally save sample predictions with save_prediction.
    torch.cuda.empty_cache()


def save_prediction(prediction, index, name):
    """Save one image's predicted mask at the source image's original size.

    Args:
        prediction: Model output for a single image: two class scores per
            pixel, shaped ``(side * side, 2)`` for a square image.
        index: Image index; the source is read from
            ``<cwd>/Water Bodies Dataset/Images/water_body_{index}.jpg``.
        name: Prefix of the output file, written to
            ``<cwd>/Deep Images/{name}_{index}.jpg``.

    Raises:
        ValueError: If the number of predicted pixels is not a perfect square.
        FileNotFoundError: If the source image cannot be read.
    """
    # convert two-valued pixels to a single class index per pixel
    class_map = torch.argmax(prediction, dim=1)
    # infer the side length instead of assuming a 100x100 prediction
    pixel_count = class_map.numel()
    side = int(round(pixel_count ** 0.5))
    if side * side != pixel_count:
        raise ValueError(f'prediction has {pixel_count} pixels, which is not a square image')
    class_map = class_map.reshape(side, side)

    # NOTE(review): this looks under <cwd>/Water Bodies Dataset, while
    # get_train_test_paths reads <cwd>/drive/MyDrive/Water Bodies Dataset -
    # confirm which location is intended.
    source_path = f'{os.getcwd()}{os.sep}Water Bodies Dataset{os.sep}Images{os.sep}water_body_{index}.jpg'
    source_image = cv2.imread(source_path)
    # cv2.imread returns None instead of raising when the file is unreadable
    if source_image is None:
        raise FileNotFoundError(f'could not read source image: {source_path}')
    # OpenCV arrays are (height, width, channels)
    height, width = source_image.shape[:2]

    # Resize to the source resolution. Nearest-neighbour keeps the mask
    # strictly binary; interpolate needs a float (batch, channel, H, W) input.
    resized = f.interpolate(class_map[None, None].float(), size=(height, width), mode='nearest')

    prediction_path = f'{os.getcwd()}{os.sep}Deep Images{os.sep}{name}_{index}.jpg'
    # save the prediction as a single-channel (1, H, W) float image
    save_image(resized[0], prediction_path)


# Hyperparameter sweep: trains every combination of model, activation, hidden
# size, batch size and optimizer listed below (2 * 3 * 3 * 2 * 2 = 72 runs of
# `num_of_epochs` epochs each). Names assigned here are notebook-level globals.
if __name__ == '__main__':
    # Only the fully connected models are swept here.
    models = (Hidden1, Hidden2)
    activation_funcs = (f.relu, f.leaky_relu, f.sigmoid)
    hidden_layer_sizes = (2_500, 10_000, 40_000)
    batch_sizes = (4, 16)
    optimizers = (optim.Adam, optim.SGD)

    image_normalized_length = 100
    num_of_epochs = 10
    # Passed as a class; fit_model instantiates it.
    loss_func = nn.CrossEntropyLoss
    # Assigned but not used by the sweep below (model_parameters has no kernel size).
    kernel_size = 3

    # train models with varying hyperparameters
    for model in models:
        for activation_func in activation_funcs:
            for hidden_layer_size in hidden_layer_sizes:
                for batch_size in batch_sizes:
                    for optimizer in optimizers:
                        # positional constructor arguments: (length, hidden_size, activation)
                        model_parameters = (image_normalized_length, hidden_layer_size, activation_func)
                        fit_model(model, model_parameters, loss_func, optimizer,
                                  batch_size, image_normalized_length, num_of_epochs)


 45%|████▌     | 258/569 [16:03<18:04,  3.49s/it]

In [None]:
# Create (or overwrite) the results CSV so that it holds only the header row;
# fit_model appends one row per epoch to this same file.
df = pd.DataFrame({column_name: [] for column_name in (
    'Model Name',
    'Iteration',
    'Input Image Length',
    'Hidden Layer Size',
    'Batch Size',
    'Activation Function',
    'Optimizer',
    'Loss Function',
    'Loss',
    'Accuracy',
    'F1',
    'Iteration Training Seconds',
)})
df.to_csv('drive/MyDrive/Water_Bodies_Results.csv', index=False, header=True)