# Task and Setup

Mini-project in PyTorch for gaining experience.

Loads images, blacks out a specific part of each one, and trains a model to predict the blacked-out part.


In [None]:
# Imports
import numpy as np
import pandas as pd
from os import mkdir
from zipfile import ZipFile
import urllib
import glob
import os
import random
from os.path import exists, join
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from copy import copy
import torch
from datetime import datetime
from torch.utils.tensorboard import SummaryWriter
import time
from torch.optim.lr_scheduler import StepLR
from statistics import mean 
import tqdm.notebook as tq

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
###--- Get Data---#
# Explicit submodule import: a bare `import urllib` does not guarantee that
# `urllib.request` is loaded.
import urllib.request

working_path = os.getcwd()
print("Start path:", working_path)

dataURL = "https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB-Training_fixed.zip"
fileName = "GTSRB-Training_fixed.zip"
path = "./pytorch_project"
path_data = "./data"
millis = int(round(time.time() * 1000))

# True when the project folder name already occurs in the current working
# directory, e.g. when this cell is re-run after an earlier chdir.
already_in_project = str(path)[2:] in working_path

# Create the project folder in the current path unless it already exists or
# we are already working inside it.
try:
    if not os.path.exists(path) and not already_in_project:
        os.mkdir(path)
        print("Successfully created %s " % path)
    else:
        print("The directory %s already exists" % path)
except OSError:
    print("Creation of the directory %s failed" % path)


# Switch into the project folder (skipped when we are already inside it)
if not already_in_project:
    os.chdir(path)
working_path = os.getcwd()
print(f"Working now in: {working_path}")

# Init TensorBoard writer; the millisecond timestamp gives every run its own log dir
writer = SummaryWriter(f"runs/write_{millis}")

# Download and unzip the data, skipped when the data folder is already present
try:
    if not os.path.exists(path_data):
        # Copy a network object to a local file
        print("Start download to:", fileName)
        urllib.request.urlretrieve(dataURL, fileName)
        print("End download, start to unzipn")
        # The context manager closes the archive; no explicit close() needed
        with ZipFile("./" + fileName, 'r') as archive:
            archive.extractall(path_data)
        print("End unzipped, data now useable under ./data")
    else:
        print("Data folder already exists in:", path + "/data", " -> no new data loaded.")
except Exception as e:
    # Best-effort at notebook top level: report the problem and let later cells decide
    print("Failed with Exception:", e)

In [None]:
###--- Display Data---#

images = glob.glob(os.path.join("./data/GTSRB/Training", "*/*.ppm"), recursive=True)
if not images:
    print("No images found under ./data/GTSRB/Training")

# Show five random samples, each in its original form and in the
# resized grayscale form. Nothing is shown when no image was found.
for _ in range(5 if images else 0):
    print("Open random orignal Foro:")
    # random.choice never indexes past the end; random.randint(0, len(images))
    # includes the upper bound and could raise IndexError.
    img = Image.open(random.choice(images))
    display(img)
    print("Formatted Version")
    img = img.resize((200,200))
    img = img.convert('LA')
    display(img)

In [None]:
###--- Create Dataset ---###
class Dataset(Dataset):
    """Image dataset yielding a formatted image and a copy with a blacked-out window.

    Every ``*.ppm`` file found one directory level below ``dataset_folder`` is
    one sample. Each sample is resized, converted to grayscale and turned
    into a tensor.

    Args:
        dataset_folder: Root folder that contains one sub-folder per class.
        xDim: ``[start, stop]`` slice bounds applied to the first spatial axis
            of the image tensor (axis 1 of the (C, H, W) tensor, i.e. rows).
        yDim: ``[start, stop]`` slice bounds applied to the second spatial axis
            of the image tensor (axis 2, i.e. columns).
        resize: ``[height, width]`` every image is resized to.
    """

    def __init__(self, dataset_folder, xDim, yDim, resize):
        self.dataset_folder = dataset_folder
        self.image_filenames = self._find_images(self.dataset_folder)
        # Bounds of the blacked window along tensor axis 1
        self.xDim = xDim
        # Bounds of the blacked window along tensor axis 2
        self.yDim = yDim
        self.resize = resize
        # Module-level device chosen at notebook start
        self.device = device

    @staticmethod
    def _find_images(dataset_folder):
        """Return the sorted paths of all ``<class>/<name>.ppm`` files."""
        # Sorted so that an index maps to the same file on every filesystem
        return sorted(glob.glob(os.path.join(dataset_folder, "*/*.ppm"), recursive=True))

    def __len__(self):
        return len(self.image_filenames)

    # Returns the original image formatted, and formatted with blacked part
    def __getitem__(self, idx):
        """Return ``(image, masked_image)`` for sample ``idx``."""
        # Resize produces a new image, so the source file can be closed right away
        with Image.open(self.image_filenames[idx]) as raw:
            img = transforms.Resize((self.resize[0], self.resize[1]))(raw)
        img = transforms.functional.to_grayscale(img)
        img = transforms.ToTensor()(img)
        img_out = img.clone().detach()
        img_out[:, self.xDim[0]:self.xDim[1], self.yDim[0]:self.yDim[1]] = 0
        return img, img_out
    

# Build the dataset; the blacked window and the target image size are the parameters
dataset = Dataset(
    dataset_folder="./data/GTSRB/Training",
    xDim=[70, 100],
    yDim=[70, 100],
    resize=[200, 200],
)

# Validation share as used in the pytorch.org documentation
val_ratio = 0.2
n_samples = len(dataset)
shuffled_indices = np.random.permutation(n_samples)

# The first `n_val` shuffled indices go to validation, the remainder to training
n_val = int(n_samples * val_ratio)
validationset_inds, trainingset_inds = shuffled_indices[:n_val], shuffled_indices[n_val:]


###--- Data Loader ---#
batch_size = 8

train_dataset = torch.utils.data.Subset(dataset, indices=trainingset_inds)
val_dataset = torch.utils.data.Subset(dataset, indices=validationset_inds)

# Options shared by both loaders; only the shuffling differs between them
loader_options = dict(batch_size=batch_size, num_workers=0, sampler=None, collate_fn=None)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, shuffle=True, **loader_options)
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, shuffle=False, **loader_options)

In [None]:
### Idea for the network structure:
### Paper on inpainting: http://iizuka.cs.tsukuba.ac.jp/projects/completion/data/completion_sig2017.pdf
### Source code of the model referenced by the paper: https://github.com/otenim/GLCIC-PyTorch

class CompletionNetwork(nn.Module):
    """Small convolutional encoder/decoder producing a one-channel image.

    Two stride-2 convolutions halve the spatial size twice and two stride-2
    transposed convolutions double it twice. Every stage applies
    layer -> ReLU -> BatchNorm; the final convolution is followed by a sigmoid.

    Args:
        in_channels: Number of channels of the input image tensor.
    """

    # One row per stage: (layer kind, stage number, out_channels, kernel_size, stride, padding)
    _STAGES = (
        ("conv", 1, 4, 5, 1, 2),
        ("conv", 2, 8, 3, 2, 1),
        ("conv", 3, 8, 3, 1, 1),
        ("conv", 4, 8, 3, 2, 1),
        ("conv", 5, 8, 3, 1, 1),
        ("conv", 6, 8, 3, 1, 1),
        ("deconv", 13, 8, 4, 2, 1),
        ("conv", 14, 8, 3, 1, 1),
        ("deconv", 15, 4, 4, 2, 1),
        ("conv", 16, 2, 3, 1, 1),
    )

    def __init__(self, in_channels):
        super().__init__()
        channels = in_channels
        # Modules are registered as conv<N>/deconv<N>, bn<N>, act<N>, in this order
        for kind, stage, out_channels, kernel, stride, pad in self._STAGES:
            layer_type = nn.ConvTranspose2d if kind == "deconv" else nn.Conv2d
            setattr(
                self,
                f"{kind}{stage}",
                layer_type(channels, out_channels, kernel_size=kernel, stride=stride, padding=pad),
            )
            setattr(self, f"bn{stage}", nn.BatchNorm2d(out_channels))
            setattr(self, f"act{stage}", nn.ReLU())
            channels = out_channels

        # Output head: single channel squashed by a sigmoid
        self.conv17 = nn.Conv2d(channels, 1, kernel_size=3, stride=1, padding=1)
        self.act17 = nn.Sigmoid()

    def forward(self, x):
        """Run ``x`` through all stages and return the sigmoid output of the head."""
        for kind, stage, *_ in self._STAGES:
            layer = getattr(self, f"{kind}{stage}")
            activation = getattr(self, f"act{stage}")
            norm = getattr(self, f"bn{stage}")
            x = norm(activation(layer(x)))
        return self.act17(self.conv17(x))

completionNetwork = CompletionNetwork(in_channels=1)
completionNetwork.to(device)

# Smoke test: push one training image through the untrained network.
# The builtin next() is used because loader iterators in newer PyTorch
# no longer provide a .next() method.
img, img_out = next(iter(train_loader))
img = img[0].unsqueeze(0).to(device)

completionNetwork(img)

In [None]:
total_loss = 0

# Mean squared error averaged over all elements
loss_function = nn.MSELoss(reduction='mean')

# Network that gets trained, placed on the selected device
model = CompletionNetwork(in_channels=1).to(device)

# Plain SGD with momentum over all network parameters
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001, momentum=0.9)

class ModelTainer:
    """Train and validate an inpainting model.

    The loaders must yield ``(original, masked)`` batches. The masked image is
    the network input and the original image is the regression target, which
    matches how the trained model is called at inference time.

    Uses the module-level TensorBoard ``writer`` and the ``tq`` progress bar.

    Args:
        model: Network to optimise.
        train_loader: Loader yielding training batches.
        val_loader: Loader yielding validation batches.
        device: Device the batches are moved to.
        optimizer: Optimizer already bound to ``model``'s parameters.
        loss_function: Callable ``loss_function(output, target)``.
        optimizer_args: Optional dict stored on the instance (not used by the
            trainer itself).
    """

    # The running training loss is written to TensorBoard every LOG_EVERY batches
    LOG_EVERY = 100
    # Maximum absolute error for a predicted pixel to count as correct
    PIXEL_TOLERANCE = 0.05

    def __init__(self, model, train_loader, val_loader, device, optimizer, loss_function, optimizer_args=None):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.device = device
        # A fresh dict per instance; a `{}` default would be shared by all instances
        self.optimizer_args = {} if optimizer_args is None else optimizer_args
        self.optimizer = optimizer
        self.loss_function = loss_function

    @staticmethod
    def _count_close_pixels(output, target, tolerance):
        """Return how many elements of ``output`` lie within ``tolerance`` of ``target``."""
        return int(((output - target).abs() <= tolerance).sum().item())

    def train_epoch(self, running_loss, epoch, n_epochs):
        """Run one optimisation pass over the training loader.

        Args:
            running_loss: Loss accumulated since the last TensorBoard write.
            epoch: Current epoch number (used for display and the global step).
            n_epochs: Total number of epochs (display only).

        Returns:
            Tuple ``(mean batch loss of this epoch, running loss carried over)``.
        """
        self.running_loss = running_loss
        self.epoch = epoch
        self.n_epochs = n_epochs
        # BatchNorm must use batch statistics while training
        self.model.train()
        total_loss = 0
        n_batches = len(self.train_loader)
        loop = tq.tqdm(enumerate(self.train_loader), total=n_batches, leave=False)
        for i, data in loop:
            original, masked = data
            original, masked = original.to(self.device), masked.to(self.device)
            self.optimizer.zero_grad()
            # Reconstruct the original image from its masked version
            output = self.model(masked)
            loss = self.loss_function(output, original)

            batch_loss = loss.item()
            total_loss += batch_loss
            loss.backward()
            self.optimizer.step()

            loop.set_description(f"Epoch [{self.epoch}/{self.n_epochs}] | Training")
            loop.set_postfix(loss=batch_loss, total_loss=total_loss)

            ###--- Tensorboard ---#
            self.running_loss += batch_loss
            if i % self.LOG_EVERY == self.LOG_EVERY - 1:
                # Average over exactly the batches accumulated since the last write
                writer.add_scalar('training loss',
                                  self.running_loss / self.LOG_EVERY,
                                  self.epoch * n_batches + i)
                self.running_loss = 0.0

        # max(..., 1) avoids a ZeroDivisionError for an empty loader
        return total_loss / max(n_batches, 1), self.running_loss

    def evaluate(self, epoch, n_epochs):
        """Compute loss and pixel accuracy on the validation loader.

        Args:
            epoch: Current epoch number (display only).
            n_epochs: Total number of epochs (display only).

        Returns:
            Tuple ``(mean batch loss, fraction of pixels predicted within
            PIXEL_TOLERANCE of the original image)``.
        """
        self.epoch = epoch
        self.n_epochs = n_epochs
        # eval() makes BatchNorm use, and not update, its running statistics
        self.model.eval()
        total_loss = 0
        correct = 0
        n_pixels = 0
        n_batches = len(self.val_loader)
        loop = tq.tqdm(enumerate(self.val_loader), total=n_batches, leave=False)
        with torch.no_grad():
            for i, data in loop:
                original, masked = data
                original, masked = original.to(self.device), masked.to(self.device)
                output = self.model(masked)
                correct += self._count_close_pixels(output, original, self.PIXEL_TOLERANCE)
                n_pixels += original.numel()
                loss = self.loss_function(output, original)
                total_loss += loss.item()
                loop.set_description(f"Epoch [{self.epoch}/{self.n_epochs}] | Validation")
                loop.set_postfix(loss=loss.item(), total_loss=total_loss)
        return total_loss / max(n_batches, 1), correct / max(n_pixels, 1)

    def train(self, n_epochs):
        """Train for ``n_epochs`` epochs, validate after each one and plot both loss curves."""
        print(f"Model training starts: {datetime.now()} | working with: {self.device} | Amount of epochs: {n_epochs}")
        # Storing Results of Loss for plot
        loss_values_trainingX = []
        loss_values_trainingY = []
        loss_values_evalutationX = []
        loss_values_evalutationY = []
        running_loss = 0.0

        # Adjust learning rate
        # NOTE(review): this divides the learning rate by 10 every 2 epochs - confirm that is intended for long runs
        scheduler = StepLR(self.optimizer, step_size=2, gamma=0.1)

        for e in range(1, n_epochs+1):
            print(f"Start epoch: {e} | Time: {datetime.now()}")
            start = datetime.now().replace(microsecond=0)
            loss_train, running_loss = self.train_epoch(running_loss=running_loss, epoch=e, n_epochs=n_epochs)
            loss_values_trainingX.append(e)
            loss_values_trainingY.append(loss_train)
            duration = datetime.now().replace(microsecond=0) - start
            print(f"Training epoch: {e} | Duration: {duration} | Loss: {loss_train}")
            start = datetime.now().replace(microsecond=0)
            loss_eval, correct = self.evaluate(epoch=e, n_epochs=n_epochs)
            loss_values_evalutationX.append(e)
            loss_values_evalutationY.append(loss_eval)
            duration = datetime.now().replace(microsecond=0) - start
            print(f"Validation epoch: {e} | Duration: {duration} | Loss: {loss_eval} | Correct: {correct}")
            print(f"End epoch: {e} | Time: {datetime.now()}")
            scheduler.step()

        # Loss plotted
        plt.plot(loss_values_trainingX, loss_values_trainingY, label = "Training Loss")
        plt.plot(loss_values_evalutationX, loss_values_evalutationY, label = "Validation Loss")
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title('Plotting Loss for Train and Validation')
        plt.legend()
        plt.show()
        print(f"Model training end: {datetime.now()}")

    def inference(self, loader=None):
        """Placeholder; not implemented."""
        pass
    
trainer = ModelTainer(model=model, train_loader=train_loader, val_loader=val_loader, device=device, optimizer=optimizer, loss_function=loss_function)
epochs = 15
trainer.train(n_epochs=epochs)

# ':' is not allowed in file names on Windows, so the time part is separated by '-'
date = datetime.now().strftime("%Y%m%d_%H-%M-%S")
name = f"MODEL_WATZELT_EPOCHS_{epochs}_DATE_{date}.pt"

###--- Save Model ---###
# Only the weights (state_dict) are stored; loading needs a CompletionNetwork instance
torch.save(model.state_dict(), name)

### Uncomment if you want to use TensorBoard :)

In [None]:
# Sometimes TensorBoard is only visible in the browser and does not render inside the Jupyter notebook (a known issue discussed on Stack Overflow)
#%load_ext tensorboard
# IF reload required
# %reload_ext tensorboard
# NOTE: if you want to use it, replace PATH with the path of the log directory, which should look like .../runs/write_x
#%tensorboard --logdir PATH

# Get access to tensorboard: http://localhost:6006/

In [None]:
# Load the weights saved by the training cell into a fresh network
print(f"Working for model: {name}")
model = CompletionNetwork(in_channels=1)
# map_location lets a checkpoint written on a GPU be restored on a CPU-only machine
model.load_state_dict(torch.load(name, map_location=device))
model.to(device)
# Inference mode: BatchNorm uses its running statistics
model.eval()

# helpfer function
# https://www.pyimagesearch.com/2014/09/15/python-compare-two-images/
def __mse__(imageA, imageB):
    # the 'Mean Squared Error' between the two images is the
    # sum of the squared difference between the two images;
    # NOTE: the two images must have the same dimension
    err = np.sum((imageA.astype("float") - imageB.astype("float")) ** 2)
    err /= float(imageA.shape[0] * imageA.shape[1])
    
    # return the MSE, the lower the error, the more "similar"
    # the two images are
    return err

# Per-image MSE between each original validation image and the model's
# reconstruction of its masked version
mse_all = []
with torch.no_grad():
    for originals, masked in val_loader:
        # Only the masked batch goes to the device; the originals are compared on the CPU
        predictions = model(masked.to(device))
        for original_img, predicted_img in zip(originals, predictions):
            original = original_img.squeeze(0).cpu().numpy()
            _prediction = predicted_img.squeeze(0).cpu().numpy()
            mse_all.append(__mse__(original, _prediction))

if mse_all:
    print("Average Mean Squared Error:", mean(mse_all))
else:
    # statistics.mean raises on an empty list
    print("Average Mean Squared Error: no validation samples available")

In [None]:
###--- Random Testing of Model ---###

print(f"Working for model: {name}")
model = CompletionNetwork(in_channels=1)
# map_location lets a checkpoint written on a GPU be restored on a CPU-only machine
model.load_state_dict(torch.load(name, map_location=device))
model.to(device)
# Inference mode: BatchNorm uses its running statistics
model.eval()

# Helper functions
# Loads an image from a path and formats it the same way the dataset does
def __formatRandomPicture__(_img,  xDim, yDim, resize):
    """Return ``(image, masked_image)`` tensors for the image file ``_img``.

    The image is resized to ``(resize[0], resize[1])``, converted to grayscale
    and turned into a tensor. The second tensor is a copy in which the window
    ``[xDim[0]:xDim[1], yDim[0]:yDim[1]]`` of the last two axes is set to 0.
    """
    picture = Image.open(_img)
    target_size = (resize[0], resize[1])
    picture = transforms.Resize(target_size)(picture)
    gray = transforms.functional.to_grayscale(picture)
    formatted = transforms.ToTensor()(gray)

    # Independent copy, so blacking the window leaves `formatted` untouched
    masked = formatted.clone().detach()
    first_start, first_stop = xDim[0], xDim[1]
    second_start, second_stop = yDim[0], yDim[1]
    masked[:, first_start:first_stop, second_start:second_stop] = 0
    return formatted, masked

# Shows the blacked input, the original picture and the model output side by side
def __plotter__(blacked, original, title='Overview'):
    """Plot ``blacked``, ``original`` and the model's prediction for ``blacked``.

    ``blacked`` is given a batch and a channel axis before it is passed to the
    module-level ``model``; both axes are removed again from the prediction.
    """
    model_input = blacked.unsqueeze(0).unsqueeze(0).to(device)
    prediction = model(model_input).detach().cpu().clone().numpy()
    model_pred = prediction.squeeze(0).squeeze(0)

    fig, axes = plt.subplots(1, 3)
    fig.suptitle(title, fontsize=14)

    # One (caption, picture) pair per panel, left to right
    panels = (('Blacked', blacked), ('Orginal', original), ('Model', model_pred))
    for axis, (caption, _) in zip(axes, panels):
        axis.set_title(caption)
    for axis, (_, picture) in zip(axes, panels):
        axis.imshow(picture, cmap="gray")

    plt.tight_layout()
    plt.show()


# Take the first batch of each loader. The builtin next() is used because
# loader iterators in newer PyTorch no longer provide a .next() method.
data_iterator_train = iter(train_loader)
imgs_train, targets_train = next(data_iterator_train)

data_iterator_val = iter(val_loader)
imgs_val, targets_val = next(data_iterator_val)

# Samples 4..6 of each batch, clamped in case a batch holds fewer than 7 images
last_index = min(7, len(imgs_train), len(imgs_val))
for i in range(4, last_index):
    __plotter__(original = imgs_train[i].squeeze(0), blacked = targets_train[i].squeeze(0), title= "Overview foto: training set")
    print()
    __plotter__(original = imgs_val[i].squeeze(0), blacked = targets_val[i].squeeze(0), title= "Overview foto: validation set")
    print()


# Enter URL of jpg, png, ppm
# Should have at least .xxx as file ending
# Explicit submodule import: a bare `import urllib` does not guarantee that
# `urllib.request` is loaded.
import urllib.request

photoArray=["https://enter_picture_url_example.jpg"]

# enumerate gives every download its own file-name prefix; the former manual
# counter was never incremented, so every file was named with the prefix 0.
for x, i in enumerate(photoArray):
    local_file = f"{str(x)}.{i[-3:]}"
    try:
        urllib.request.urlretrieve(str(i), local_file)
    except (OSError, ValueError) as err:
        # Unreachable or malformed URL (e.g. the placeholder above): report and go on
        print(f"Could not download {i}: {err}")
        continue
    img, img_out = __formatRandomPicture__(_img=local_file, xDim=[60, 100], yDim=[60, 100], resize=[256, 256])
    original = img.squeeze(0)
    blacked = img_out.squeeze(0)
    __plotter__(original=original, blacked=blacked, title=f"Over foto: {i}")