In [1]:
%load_ext autoreload
%autoreload 2

In [1]:
# This cell should be runned. Otherwise, the following cells will not work properly.
batch_size = 16
max_num_epoch = 100

# --- imports ---
import torch
import os
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import hw3utils

# ---- options ----
DEVICE_ID = "cuda" if torch.cuda.is_available() else 'cpu' # set to 'cpu' for cpu, 'cuda' / 'cuda:0' or similar for gpu.
LOG_DIR = 'checkpoints'
VISUALIZE = False # set True to visualize input, prediction and the output from the last batch

torch.multiprocessing.set_start_method('spawn', force=True)
torch.manual_seed(483) #can be removed if seed is not needed
np.random.seed(483) #can be removed if seed is not needed
# ---- utility functions -----
def get_loaders(batch_size,device):
    data_root = 'dataset/ceng483-hw3-dataset'
    train_set = hw3utils.HW3ImageFolder(root=os.path.join(data_root,'train'),device=device)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
    val_set = hw3utils.HW3ImageFolder(root=os.path.join(data_root,'val'),device=device)
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=0)
    # Note: you may later add test_loader to here.
    return train_loader, val_loader

# ---- ConvNet -----
class Net(nn.Module):
    kernel_size = 3

    def __init__(self,num_of_layer,num_of_kernel,batch_norm=False,use_tanh=False):
        super(Net, self).__init__()
        self.num_of_layer = num_of_layer
        self.num_of_kernel = num_of_kernel if num_of_layer != 1 else 3
        self.batch_norm = batch_norm
        self.use_tanh = use_tanh
        
        self.conv_first = nn.Conv2d(1, self.num_of_kernel, self.kernel_size, padding=1)
        self.conv_inter = nn.Conv2d(self.num_of_kernel, self.num_of_kernel, self.kernel_size, padding=1)
        self.conv_last = nn.Conv2d(self.num_of_kernel, 3, self.kernel_size, padding=1)
        self.batch_norm_inter = nn.BatchNorm2d(self.num_of_kernel)
        self.batch_norm_out = nn.BatchNorm2d(3)
        self.tanh = nn.Tanh()
        self.relu = nn.ReLU()

    def forward(self, grayscale_image):
        # apply your network's layers in the following lines:
        x = None
        for layer_i in range(self.num_of_layer):
            if layer_i == 0:
                x = self.conv_first(grayscale_image)
                if self.batch_norm: x = self.batch_norm_inter(x)
                if layer_i != self.num_of_layer-1:  x = self.relu(x)
                
            elif layer_i == self.num_of_layer-1:
                x = self.conv_last(x)
                if self.batch_norm: x = self.batch_norm_out(x)
            else:
                x = self.conv_inter(x)
                if self.batch_norm: x = self.batch_norm_inter(x)
                x = self.relu(x)
        if self.use_tanh: x = self.tanh(x)
        return x

# ---- MarginLoss -----
class MarginLoss(nn.Module):
    def __init__(self, margin):
        super(MarginLoss, self).__init__()
        self.margin = margin
        
    def forward(self, predictions, true_values):
        
        loss = torch.abs(true_values-predictions)
        loss = torch.maximum(loss - self.margin,torch.zeros_like(loss))         
        loss = torch.mean(torch.square(loss))
        
        return loss

# ---- training code -----
device = torch.device(DEVICE_ID)
print('device: ' + str(device))

criterion_mse = nn.MSELoss()
criterion_margin = MarginLoss(24/255)
train_loader, val_loader = get_loaders(batch_size,device)

  if save_path is not '':


device: cuda


In [2]:
# This functions are called inside the train() function, but they should be uncommented and runned if you want to get the results.
import matplotlib.pyplot as plt
def plot_losses(train_losses, val_losses,title):
    """Plots the losses for training and validation sets with respect to epochs."""
    plt.close()
    last_point_tr = train_losses[-1]
    last_point_val = val_losses[-1]
    last_point_x = len(train_losses)-1
    
    plt.scatter(last_point_x, last_point_tr, color='blue', label=f"Loss:{last_point_tr:.4f} ")
    plt.scatter(last_point_x, last_point_val, color='red', label=f"Loss:{last_point_val:.4f} ")
    
    plt.plot(train_losses, 'b', label='Training loss')
    plt.plot( val_losses, 'r', label='Validation loss')
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.savefig(LOG_DIR+"/"+title+"_plot.jpg")
    plt.close()

def visualize_same_batch(net,title):
    """Visualizes the input, prediction and the output from the first 6 batches.

    Args:
        net: model
        title (str): name of the saved file 
    """
    for iteri, data in enumerate(val_loader, 0):
        if iteri == 6:
            break
        inputs, targets = data
        preds = net(inputs)
        hw3utils.visualize_batch(inputs[:8],preds[:8],targets[:8],os.path.join(LOG_DIR,f'{title}_img'+str(iteri)+'.png'))

In [3]:
def calculate_loss(loader,model,loss_func):
    """returns the average loss on the given data set and over the given model.

    Args:
        loader: data loader
        model: trained model
        loss_func (function): function that calculates the loss

    Returns:
        float: average loss
    """
    total_loss = 0
    for i, data in enumerate(loader, 0):
        inputs, labels = data
        outputs = model(inputs)
        loss = loss_func(outputs, labels)
        total_loss += loss.item()
    return total_loss/len(loader)   

In [5]:
#This cell should be runned if you want to train the model. Otherwise, no need.
early_stop_delta = 0.0001
early_stop_counter_max = 3


def train(num_of_layer,num_of_kernel,lr,batch_norm,use_tanh):
    """Trains the model with the given parameters.
        Used loss function in training is MSE.
        Used loss function in validation is 12-Margin Error.
    Args:
        num_of_layer (int): number of layers
        num_of_kernel (int): number of kernels
        lr (float): learning rate
        batch_norm (bool): whether to use batch normalization
        use_tanh (bool): whether to use tanh 
    """
    title = f"Layer{num_of_layer}_Kernel{num_of_kernel}_LR{lr}_BN{batch_norm}_TANH{use_tanh}"
    net = Net(num_of_layer=num_of_layer,num_of_kernel = num_of_kernel,batch_norm=batch_norm,use_tanh=use_tanh).to(device=device)
    optimizer = optim.SGD(net.parameters(), lr=lr)
    print(f'training begins --> {num_of_layer} layers | {num_of_kernel} kernels | lr = {lr} | batch_norm = {batch_norm} | use_tanh = {use_tanh}')
    loss_lst_valid = []
    loss_lst_train = []
    early_stop_counter = 0
    min_validation_loss = float('inf')
    for epoch in range(max_num_epoch):
        for iteri, data in enumerate(train_loader, 0):
            inputs, targets = data
            optimizer.zero_grad() # zero the parameter gradients
            # do forward, backward, SGD step
            preds = net(inputs)
            loss = criterion_mse(preds, targets)
            # loss = criterion_margin(preds, targets)
            loss.backward()
            optimizer.step()  
            print(f"Batch: {iteri}/{len(train_loader)}", end='\r')
        
        avg_loss_train = calculate_loss(train_loader,net,criterion_mse)
        avg_loss_val = calculate_loss(val_loader,net,criterion_margin)
        
        

        print('Epoch %d training loss: %.5f' %
                (epoch + 1, avg_loss_train))
        print('Epoch %d validation loss: %.5f' %
                (epoch + 1, avg_loss_val))
        
        #Early stop
        if (min_validation_loss - avg_loss_val ) < early_stop_delta :
            early_stop_counter += 1
            print(f"Early_stop counter {early_stop_counter}")
            if early_stop_counter >= early_stop_counter_max:
                print(f"Early_stop with last epoch {avg_loss_val}")
                break
        else:
            early_stop_counter = 0
        min_validation_loss = min(avg_loss_val,min_validation_loss)
                
        loss_lst_valid.append(avg_loss_val)
        loss_lst_train.append(avg_loss_train)
                
        print('Saving the model, end of epoch %d' % (epoch+1))
        if not os.path.exists(LOG_DIR):
            os.makedirs(LOG_DIR)    
        torch.save(net.state_dict(), os.path.join(LOG_DIR,f'{title}.pt'))
        # visualize_same_batch(net, title) #uncomment if you want to visualize and save the input, prediction and the output from the first 6 batches.
        # plot_losses(loss_lst_train,loss_lst_valid,title) #uncomment if you want to plot the losses.
    print('Finished Training')
    

In [None]:
#THIS CELL SHOULD NOT BE RUNNED UNLESS YOU WANT TO GRID SEARCH THE HYPERPARAMETERS
#Used for baseline model Part 1
num_of_layers = [1,2,4]
num_of_kernels = [2,4,8]
learning_rates = [0.0001,0.001,0.01,0.1]

for num_of_layer in num_of_layers:
        for num_of_kernel in num_of_kernels:
            for lr in learning_rates:
                train(num_of_layer,num_of_kernel,lr,False,False)

In [6]:
#This cell should be runned if you want to create the npy file.
#It will directly create a npy file when runned.
import hw3utils
saved_model_file_name = "Layer2_Kernel8_LR0.001_BNFalse_TANHTrue" #change this to the name of the model you want to use

def get_test_loader():
    """Returns the test loader for test images"""
    test_root = 'test' # change to the directory of test images
    test_set = hw3utils.HW3ImageFolder(root=os.path.join(test_root),device=device)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=1, shuffle=False, num_workers=0)
    return test_loader

    
def create_npy(title):
    """Creates an estimation npy file for test images

    Args:
        title (str): name of the pt file to be loaded as model withot .pt
    """
    directory = LOG_DIR # change to the directory of the model
    test_loader = get_test_loader()
    # _,val_load =  get_loaders(1,device) #used for calculating accuracy on validation set
    
    results_array = np.zeros((100, 80, 80, 3), dtype=np.float32)
    saved_state_dict = torch.load(directory+"/"+title+'.pt')
    
    #change the parameters according to the loaded model
    net2 = Net(num_of_layer=2,num_of_kernel = 8,batch_norm=False,use_tanh=True).to(device=device)  #Parameters of my best model
    net2.load_state_dict(saved_state_dict)
    myfile = open('test_images.txt', 'w')
    myfile.write("") #clearing inside if it is not empty
    # Iterate through the test set
    for iteri, data in enumerate(test_loader, 0):
        
        inputs, _ = data
        preds = net2(inputs)
        preds_array = preds[0].cpu().detach().numpy().transpose(1, 2, 0)
        
        results_array[iteri] = preds_array
        if iteri == 99:
            break

    # Convert the results array to [0,255] from [-1,1]
    results_array = ((results_array+1) * (255/2)).astype(np.uint8)
    print(len(results_array)) #should be 100
    
    # Save the results array to a file
    np.save('estimations_test.npy', results_array)

create_npy(saved_model_file_name) 

100


In [None]:
!python evaluate.py estimations.npy img_names.txt