# Importing all the libraries needed

In [1]:
import torch
from torch import nn, optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

import glob
import numpy as np
from PIL import Image
from tqdm.notebook import tqdm
from skimage.color import rgb2lab, lab2rgb



# Run on the GPU (CUDA) whenever one is available; otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Get the data in order to create the dataset

In [1]:
#Fastai library to get images required for training
!pip install fastai==2.4

Collecting fastai==2.4
  Downloading fastai-2.4-py3-none-any.whl (187 kB)
[?25l[K     |█▊                              | 10 kB 25.6 MB/s eta 0:00:01[K     |███▌                            | 20 kB 31.8 MB/s eta 0:00:01[K     |█████▎                          | 30 kB 27.1 MB/s eta 0:00:01[K     |███████                         | 40 kB 15.5 MB/s eta 0:00:01[K     |████████▊                       | 51 kB 12.8 MB/s eta 0:00:01[K     |██████████▌                     | 61 kB 14.9 MB/s eta 0:00:01[K     |████████████▏                   | 71 kB 14.9 MB/s eta 0:00:01[K     |██████████████                  | 81 kB 13.1 MB/s eta 0:00:01[K     |███████████████▊                | 92 kB 14.5 MB/s eta 0:00:01[K     |█████████████████▍              | 102 kB 14.4 MB/s eta 0:00:01[K     |███████████████████▏            | 112 kB 14.4 MB/s eta 0:00:01[K     |█████████████████████           | 122 kB 14.4 MB/s eta 0:00:01[K     |██████████████████████▊         | 133 kB 14.4 MB/s eta 

In [None]:
# Alternative data source: the celebA images.
# When running this cell instead of the COCO one, change `paths` to `coco_image_paths`.
path = "/content/drive/MyDrive/celebAimages/img_align_celeba"
# Collect the file name of every .jpg image in that folder
paths = glob.glob(f"{path}/*.jpg")

In [2]:
# Run this cell in order to get the COCO images
from fastai.data.external import untar_data, URLs
# Use a sample of around 20,000 pictures of the COCO dataset (object detection);
# the images live in the "train_sample" sub-folder of the extracted archive
coco_images_path = f"{untar_data(URLs.COCO_SAMPLE)}/train_sample"
# Use the glob module to list all the .jpg files (the images needed for training)
coco_image_paths = glob.glob(f"{coco_images_path}/*.jpg")

In [3]:
# Fix NumPy's global random seed. Most experiments used seed 25;
# they were replicated with other random seeds as well.
np.random.seed(seed=25)

In [4]:
# The image counts in this cell were varied between runs to create multiple test situations.
# Draw 21000 distinct image paths at random from the COCO sample
coco_paths_subset = np.random.choice(coco_image_paths, 21000, replace=False)
print(len(coco_paths_subset))
# Shuffled positions into the random subset
random_idxs = np.random.permutation(21000)
# The first 20000 shuffled positions form the training pool
training_idxs = random_idxs[:20000]
# The remaining 1000 shuffled positions form the validation set
validation_idxs = random_idxs[20000:]
# Train on the first 8000 entries of the training pool.
# The paths are taken from the random subset through the shuffled indexes (rather than
# from slices of the raw glob listing) so that the random seed actually determines the
# split; .tolist() keeps the paths as a plain list of str.
training_paths = coco_paths_subset[training_idxs[:8000]].tolist()
# Validate on the 1000 held-out entries (disjoint from the training pool)
validation_paths = coco_paths_subset[validation_idxs].tolist()

print(len(training_paths))
print(len(validation_paths))


21000
8000
1000


# Create the dataset

In [5]:
# Run this cell in order to create the datasets.
# During training this was modified to add data augmentation for training images by using
# self.transforms = transforms.Compose([transforms.Resize((SIZE, SIZE), Image.BICUBIC), transforms.RandomHorizontalFlip()])
# when the image was a training image.

SIZE = 256


class ColorizationDataset(Dataset):
    """Dataset that turns image files into normalized Lab tensors for colorization.

    Each item is a dict with two float32 tensors:
      'L'  -- the lightness channel, shape (1, SIZE, SIZE), computed as L / 50 - 1
      'ab' -- the two color channels, shape (2, SIZE, SIZE), computed as ab / 110

    Args:
        paths: indexable collection of image file paths.
    """

    def __init__(self, paths):
        # Pass the InterpolationMode enum rather than the PIL integer constant:
        # torchvision warns that "Argument interpolation should be of type
        # InterpolationMode instead of int" when given Image.BICUBIC.
        self.transforms = transforms.Resize((SIZE, SIZE), transforms.InterpolationMode.BICUBIC)
        # Build the array-to-tensor converter once instead of once per item
        self.to_tensor = transforms.ToTensor()
        self.size = SIZE
        self.paths = paths

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Load image `idx`, resize it and return its normalized L and ab channels."""
        # Open the image and force three RGB channels
        img = Image.open(self.paths[idx]).convert("RGB")
        # Resize to SIZE x SIZE and convert to an np array
        img = np.array(self.transforms(img))
        # Convert from RGB to Lab, then to a channels-first tensor
        img_lab = self.to_tensor(rgb2lab(img).astype("float32"))
        # Normalize the values; indexing with lists keeps the channel dimension
        L = img_lab[[0], ...] / 50. - 1.
        ab = img_lab[[1, 2], ...] / 110.

        return {'L': L, 'ab': ab}

# Create Dataloaders

In [6]:
# Wrap the training and validation image paths in colorization datasets
training_dataset, validation_dataset = (
    ColorizationDataset(split_paths)
    for split_paths in (training_paths, validation_paths)
)

  "Argument interpolation should be of type InterpolationMode instead of int. "


In [7]:
# Both loaders use the same batching settings
loader_options = dict(batch_size=16, num_workers=2)
training_dataloader = DataLoader(training_dataset, **loader_options)
validation_dataloader = DataLoader(validation_dataset, **loader_options)

# ECCV GENERATOR

In [8]:
import torch
from torch import nn

class BaseColor(nn.Module):
    """Base module holding the Lab scaling constants and the matching (un)normalizers.

    L is centered on `l_cent` and divided by `l_norm`; ab is divided by `ab_norm`.
    """

    def __init__(self):
        super().__init__()
        self.l_cent, self.l_norm, self.ab_norm = 50., 100., 110.

    def normalize_l(self, in_l):
        """Center L on l_cent and scale it by 1 / l_norm."""
        centered = in_l - self.l_cent
        return centered / self.l_norm

    def unnormalize_l(self, in_l):
        """Invert normalize_l: scale by l_norm, then add l_cent back."""
        scaled = in_l * self.l_norm
        return scaled + self.l_cent

    def normalize_ab(self, in_ab):
        """Scale ab by 1 / ab_norm."""
        return in_ab / self.ab_norm

    def unnormalize_ab(self, in_ab):
        """Invert normalize_ab: scale ab by ab_norm."""
        return in_ab * self.ab_norm

In [9]:
import torch
import torch.nn as nn
import numpy as np
from IPython import embed


class ECCVGenerator(BaseColor):
    """Convolutional colorization generator built from eight sequential stages.

    Stages model1-model3 each end with a stride-2 convolution, model5 and
    model6 use dilated convolutions, and model8 starts with a stride-2
    transposed convolution and ends with a 1x1 convolution to 313 channels.
    A channel-wise softmax over those 313 channels is mapped to 2 channels by
    a bias-free 1x1 convolution, upsampled by a factor of 4 (bilinear) and
    scaled with `unnormalize_ab`.

    Args:
        norm_layer: callable taking a channel count and returning the
            normalization module appended at the end of stages 1-7.
    """

    def __init__(self, norm_layer=nn.BatchNorm2d):
        super().__init__()

        def conv_relu(c_in, c_out, stride=1, dilation=1):
            # 3x3 convolution followed by an in-place ReLU; the padding always
            # equals the dilation (1 for plain convs, 2 for the dilated ones).
            return [
                nn.Conv2d(c_in, c_out, kernel_size=3, dilation=dilation,
                          stride=stride, padding=dilation, bias=True),
                nn.ReLU(True),
            ]

        # Layers are created in the same order as they are applied, and the
        # stages are registered in order model1 ... model8.
        self.model1 = nn.Sequential(
            *(conv_relu(1, 64) + conv_relu(64, 64, stride=2) + [norm_layer(64)])
        )
        self.model2 = nn.Sequential(
            *(conv_relu(64, 128) + conv_relu(128, 128, stride=2) + [norm_layer(128)])
        )
        self.model3 = nn.Sequential(
            *(conv_relu(128, 256) + conv_relu(256, 256)
              + conv_relu(256, 256, stride=2) + [norm_layer(256)])
        )
        self.model4 = nn.Sequential(
            *(conv_relu(256, 512) + conv_relu(512, 512)
              + conv_relu(512, 512) + [norm_layer(512)])
        )
        self.model5 = nn.Sequential(
            *(conv_relu(512, 512, dilation=2) + conv_relu(512, 512, dilation=2)
              + conv_relu(512, 512, dilation=2) + [norm_layer(512)])
        )
        self.model6 = nn.Sequential(
            *(conv_relu(512, 512, dilation=2) + conv_relu(512, 512, dilation=2)
              + conv_relu(512, 512, dilation=2) + [norm_layer(512)])
        )
        self.model7 = nn.Sequential(
            *(conv_relu(512, 512) + conv_relu(512, 512)
              + conv_relu(512, 512) + [norm_layer(512)])
        )
        # Stage 8 has no normalization layer: transposed conv, two conv+ReLU
        # pairs, then a 1x1 convolution producing 313 channels.
        self.model8 = nn.Sequential(
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=True),
            nn.ReLU(True),
            *conv_relu(256, 256),
            *conv_relu(256, 256),
            nn.Conv2d(256, 313, kernel_size=1, stride=1, padding=0, bias=True),
        )

        self.softmax = nn.Softmax(dim=1)
        self.model_out = nn.Conv2d(313, 2, kernel_size=1, padding=0, dilation=1, stride=1, bias=False)
        self.upsample4 = nn.Upsample(scale_factor=4, mode='bilinear')

    def forward(self, input_l):
        """Predict ab channels from an L-channel batch.

        The input is passed through `normalize_l`, the eight stages, the
        softmax + 1x1 output convolution, the 4x upsampling and finally
        `unnormalize_ab`.
        """
        features = self.normalize_l(input_l)
        stages = (self.model1, self.model2, self.model3, self.model4,
                  self.model5, self.model6, self.model7, self.model8)
        for stage in stages:
            features = stage(features)
        out_reg = self.model_out(self.softmax(features))
        upsampled = self.upsample4(out_reg)
        return self.unnormalize_ab(upsampled)


# Train ECCV Model

In [None]:
# Import SummaryWriter
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter instance
# SummaryWriter writes event files to log_dir
log_dir = "/content/drive/MyDrive/Graphs/ECCV"
writer = SummaryWriter(log_dir)

num_epochs = 2
# Build the model on the selected device (GPU when available), like the other
# training loops in this notebook
model = ECCVGenerator().to(device)
# Uncomment to resume from previously saved weights
#PATH = "/content/drive/MyDrive/weights/EGGV2weights.pth"
#model.load_state_dict(torch.load(PATH))
# Loss function and optimizer
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr = 0.001)
# One validation batch; it is not used by the loop below
# (presumably consumed by later inspection cells -- verify before removing)
data1 = next(iter(validation_dataloader))
# Number of batches per epoch, used to build a continuous TensorBoard step
# (replaces the hard-coded 500, which only matches 8000 images at batch size 16)
steps_per_epoch = len(training_dataloader)
# NOTE(review): ColorizationDataset yields L scaled to [-1, 1] and ab divided by 110,
# while ECCVGenerator.forward applies normalize_l to its input and unnormalize_ab to
# its output. Confirm that this double scaling is intended.
model.train()
for epoch in range(num_epochs):
  running_loss = 0.0
  for i, data in enumerate(tqdm(training_dataloader), start=1):
    # Move the batch to the same device as the model
    input_l = data['L'].to(device)
    real_ab_images = data['ab'].to(device)
    optimizer.zero_grad()
    # Get the predicted ab channels; calling the module (not .forward) keeps hooks working
    ab_out_images = model(input_l)
    loss = criterion(ab_out_images, real_ab_images)
    # Backward pass and optimizer step
    loss.backward()
    optimizer.step()
    writer.add_scalar("Loss/train", loss.item(), i + (epoch * steps_per_epoch))
    running_loss += loss.item()
  # Report the mean loss of the epoch (guarding against an empty dataloader)
  print(f"Epoch {epoch + 1}/{num_epochs} - mean training loss: {running_loss / max(steps_per_epoch, 1):.4f}")
# Make sure every logged scalar reaches the event file
writer.flush()
print("finished training")
FILE = "ECCV_Weights.pth"
torch.save(model.state_dict(), FILE)

# Pix2PixModel

In [11]:
# Code from the Pix2Pix implementation, better explained in the report
# U-Net with skip connections
class UnetBlock(nn.Module):
    """One level of the U-Net: downsample, optional inner block, upsample.

    Args:
        nf: number of channels produced by the up-convolution (and, unless
            `input_c` is given, consumed by the down-convolution).
        ni: number of channels produced by the down-convolution.
        submodule: inner block placed between the down and up paths
            (not used by the innermost block).
        input_c: input channel count; defaults to `nf`.
        dropout: append Dropout(0.5) to the up path of an intermediate block.
        innermost: build the bottleneck block (no submodule, no down norm).
        outermost: build the outer block (no norms, Tanh output, no skip
            connection).
    """

    def __init__(self, nf, ni, submodule=None, input_c=None, dropout=False,
                 innermost=False, outermost=False):
        super().__init__()
        self.outermost = outermost
        in_channels = nf if input_c is None else input_c
        # The down-convolution is created before the up-convolution in every variant
        downconv = nn.Conv2d(in_channels, ni, kernel_size=4,
                             stride=2, padding=1, bias=False)

        if outermost:
            # The submodule returns ni * 2 channels because of its skip connection
            upconv = nn.ConvTranspose2d(ni * 2, nf, kernel_size=4,
                                        stride=2, padding=1)
            layers = [downconv, submodule, nn.ReLU(True), upconv, nn.Tanh()]
        elif innermost:
            upconv = nn.ConvTranspose2d(ni, nf, kernel_size=4,
                                        stride=2, padding=1, bias=False)
            layers = [nn.LeakyReLU(0.2, True), downconv,
                      nn.ReLU(True), upconv, nn.BatchNorm2d(nf)]
        else:
            upconv = nn.ConvTranspose2d(ni * 2, nf, kernel_size=4,
                                        stride=2, padding=1, bias=False)
            layers = [nn.LeakyReLU(0.2, True), downconv, nn.BatchNorm2d(ni),
                      submodule,
                      nn.ReLU(True), upconv, nn.BatchNorm2d(nf)]
            if dropout:
                layers.append(nn.Dropout(0.5))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run the block; every block but the outermost concatenates its input
        with its output along the channel dimension (skip connection)."""
        out = self.model(x)
        return out if self.outermost else torch.cat([x, out], 1)

class Unet(nn.Module):
    """U-Net assembled from nested UnetBlock instances, innermost block first.

    Args:
        input_c: number of input channels of the outermost block.
        output_c: number of output channels of the outermost block.
        n_down: total number of downsampling levels; `n_down - 5` intermediate
            blocks with dropout are inserted at the widest channel count.
        num_filters: base channel count; the widest blocks use 8x this value.
    """

    def __init__(self, input_c=1, output_c=2, n_down=8, num_filters=64):
        super().__init__()
        widest = num_filters * 8
        # Bottleneck
        block = UnetBlock(widest, widest, innermost=True)
        # Constant-width levels with dropout
        for _ in range(n_down - 5):
            block = UnetBlock(widest, widest, submodule=block, dropout=True)
        # Three levels that halve the channel count each time
        channels = widest
        for _ in range(3):
            block = UnetBlock(channels // 2, channels, submodule=block)
            channels //= 2
        # Outer level mapping input_c channels in and output_c channels out
        self.model = UnetBlock(output_c, channels, input_c=input_c,
                               submodule=block, outermost=True)

    def forward(self, x):
        """Apply the nested U-Net to `x`."""
        return self.model(x)

In [12]:
# PatchGAN discriminator
class PatchDiscriminator(nn.Module):
    """Stack of strided convolution blocks ending in a 1-channel prediction map.

    Args:
        input_c: number of input channels.
        num_filters: channel count of the first block; doubled by each of the
            following `n_down` blocks.
        n_down: number of channel-doubling blocks; the last of them uses
            stride 1 instead of 2.
    """

    def __init__(self, input_c, num_filters=64, n_down=3):
        super().__init__()
        # First block: no normalization
        blocks = [self.get_layers(input_c, num_filters, norm=False)]
        # Channel-doubling blocks; only the last one keeps the resolution (stride 1)
        for i in range(n_down):
            stride = 1 if i == (n_down - 1) else 2
            blocks.append(
                self.get_layers(num_filters * 2 ** i, num_filters * 2 ** (i + 1), s=stride)
            )
        # Output block: a bare convolution down to one channel
        blocks.append(
            self.get_layers(num_filters * 2 ** n_down, 1, s=1, norm=False, act=False)
        )
        self.model = nn.Sequential(*blocks)

    def get_layers(self, ni, nf, k=4, s=2, p=1, norm=True, act=True):
        """Build Conv2d [+ BatchNorm2d] [+ LeakyReLU(0.2)] as one Sequential.

        The convolution only carries a bias when no normalization follows it.
        """
        layers = [nn.Conv2d(ni, nf, k, s, p, bias=not norm)]
        if norm:
            layers.append(nn.BatchNorm2d(nf))
        if act:
            layers.append(nn.LeakyReLU(0.2, True))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the discriminator's prediction map for `x`."""
        return self.model(x)

In [13]:
# A module that computes the loss of the GAN
class GANLoss(nn.Module):
    """Adversarial loss that builds the target labels internally.

    Args:
        gan_mode: 'vanilla' uses BCEWithLogitsLoss, 'lsgan' uses MSELoss.
        real_label: target value for real samples.
        fake_label: target value for fake samples.

    Raises:
        ValueError: if `gan_mode` is neither 'vanilla' nor 'lsgan'.
    """

    def __init__(self, gan_mode='vanilla', real_label=1.0, fake_label=0.0):
        super().__init__()
        # Buffers (not parameters) so that .to(device) moves the labels with the module
        self.register_buffer('real_label', torch.tensor(real_label))
        self.register_buffer('fake_label', torch.tensor(fake_label))
        if gan_mode == 'vanilla':
            self.loss = nn.BCEWithLogitsLoss()
        elif gan_mode == 'lsgan':
            self.loss = nn.MSELoss()
        else:
            # Fail at construction instead of with an AttributeError on the first call
            raise ValueError(
                f"unsupported gan_mode {gan_mode!r}; expected 'vanilla' or 'lsgan'"
            )

    def get_labels(self, preds, target_is_real):
        """Return the real or fake label expanded to the shape of `preds`."""
        labels = self.real_label if target_is_real else self.fake_label
        return labels.expand_as(preds)

    def forward(self, preds, target_is_real):
        """Compute the loss of `preds` against all-real or all-fake targets.

        Implemented as `forward` (rather than overriding `__call__`) so that
        calling the module goes through the standard nn.Module call path.
        """
        labels = self.get_labels(preds, target_is_real)
        return self.loss(preds, labels)

In [14]:
# A function to initialise the weights
def init_weights(net, init='norm', gain=0.02):
    """Re-initialise the convolution and BatchNorm2d layers of `net` in place.

    Convolution weights are drawn according to `init` ('norm', 'xavier' or
    'kaiming'; any other value leaves them untouched) and convolution biases
    are set to 0. BatchNorm2d weights are drawn from N(1, gain) and their
    biases set to 0.

    Args:
        net: module whose sub-modules are initialised.
        init: name of the convolution weight initialisation.
        gain: std for 'norm' and BatchNorm2d, gain for 'xavier'.

    Returns:
        The same `net` object.
    """
    # Dispatch table: initialisation name -> in-place initialiser for conv weights
    conv_initialisers = {
        'norm': lambda w: nn.init.normal_(w, mean=0.0, std=gain),
        'xavier': lambda w: nn.init.xavier_normal_(w, gain=gain),
        'kaiming': lambda w: nn.init.kaiming_normal_(w, a=0, mode='fan_in'),
    }

    def init_func(m):
        layer_name = m.__class__.__name__
        if hasattr(m, 'weight') and 'Conv' in layer_name:
            initialise = conv_initialisers.get(init)
            if initialise is not None:
                initialise(m.weight.data)

            # Biases are zeroed whatever the chosen weight initialisation
            if hasattr(m, 'bias') and m.bias is not None:
                nn.init.constant_(m.bias.data, 0.0)
        elif 'BatchNorm2d' in layer_name:
            nn.init.normal_(m.weight.data, 1., gain)
            nn.init.constant_(m.bias.data, 0.)

    net.apply(init_func)
    print(f"model initialized with {init} initialization")
    return net

def init_model(model, device):
    """Move `model` to `device`, then initialise its weights with `init_weights`.

    Returns the initialised model.
    """
    return init_weights(model.to(device))

In [15]:
# The main model from the pix2pix paper
class MainModel(nn.Module):
    """Generator + PatchGAN discriminator trained with a GAN loss and an L1 loss.

    Args:
        net_G: optional generator; when None a freshly initialised Unet
            (1 input channel, 2 output channels) is built.
        lr_G: learning rate of the generator's Adam optimizer.
        lr_D: learning rate of the discriminator's Adam optimizer.
        beta1, beta2: Adam betas shared by both optimizers.
        lambda_L1: weight of the L1 term in the generator loss.
    """

    def __init__(self, net_G=None, lr_G=2e-4, lr_D=2e-4,
                 beta1=0.5, beta2=0.999, lambda_L1=100.):
        super().__init__()

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.lambda_L1 = lambda_L1

        # Generator: build a default Unet, or move the supplied one to the device
        self.net_G = (
            init_model(Unet(input_c=1, output_c=2, n_down=8, num_filters=64), self.device)
            if net_G is None
            else net_G.to(self.device)
        )
        # Discriminator sees L (1 channel) concatenated with ab (2 channels)
        self.net_D = init_model(PatchDiscriminator(input_c=3, n_down=3, num_filters=64), self.device)
        self.GANcriterion = GANLoss(gan_mode='vanilla').to(self.device)
        self.L1criterion = nn.L1Loss()
        adam_betas = (beta1, beta2)
        self.opt_G = optim.Adam(self.net_G.parameters(), lr=lr_G, betas=adam_betas)
        self.opt_D = optim.Adam(self.net_D.parameters(), lr=lr_D, betas=adam_betas)

    def set_requires_grad(self, model, requires_grad=True):
        """Set `requires_grad` on every parameter of `model`."""
        for param in model.parameters():
            param.requires_grad = requires_grad

    def setup_input(self, data):
        """Store the 'L' and 'ab' tensors of a batch on the model's device."""
        self.L, self.ab = (data[key].to(self.device) for key in ('L', 'ab'))

    def forward(self):
        """Run the generator on the stored L channel and keep the predicted ab."""
        self.fake_color = self.net_G(self.L)

    def backward_D(self):
        """Compute the discriminator loss on fake and real pairs and backpropagate."""
        # Detach the fake pair so no gradient flows back into the generator
        fake_pair = torch.cat([self.L, self.fake_color], dim=1).detach()
        self.loss_D_fake = self.GANcriterion(self.net_D(fake_pair), False)
        real_pair = torch.cat([self.L, self.ab], dim=1)
        self.loss_D_real = self.GANcriterion(self.net_D(real_pair), True)
        # Multiply by 0.5 to slow down the rate at which D learns relative to G
        self.loss_D = (self.loss_D_fake + self.loss_D_real) * 0.5
        self.loss_D.backward()

    def backward_G(self):
        """Compute the generator loss (GAN term + weighted L1 term) and backpropagate."""
        fake_pair = torch.cat([self.L, self.fake_color], dim=1)
        # The generator wants the discriminator to label its output as real
        self.loss_G_GAN = self.GANcriterion(self.net_D(fake_pair), True)
        self.loss_G_L1 = self.L1criterion(self.fake_color, self.ab) * self.lambda_L1
        self.loss_G = self.loss_G_GAN + self.loss_G_L1
        self.loss_G.backward()

    def optimize(self):
        """Perform one training iteration: one step on D, then one step on G."""
        self.forward()

        # One gradient descent step on D
        self.net_D.train()
        self.set_requires_grad(self.net_D, True)
        self.opt_D.zero_grad()
        self.backward_D()
        self.opt_D.step()

        # And then one step on G, with D frozen
        self.net_G.train()
        self.set_requires_grad(self.net_D, False)
        self.opt_G.zero_grad()
        self.backward_G()
        self.opt_G.step()

# Transfer Learning

In [16]:
# The same approach was also used to transfer the weights from ResNet-34;
# this is done simply by replacing resnet18 with resnet34 in the code below
from fastai.vision.learner import create_body
from torchvision.models.resnet import resnet18
from fastai.vision.models.unet import DynamicUnet


def build_res_unet(n_input=1, n_output=2, size=256):
    """Build a DynamicUnet generator on top of a pretrained resnet18 body.

    Args:
        n_input: number of input channels passed to `create_body` as `n_in`.
        n_output: number of output channels of the DynamicUnet.
        size: side length; the DynamicUnet is built for (size, size) images.

    Returns:
        The DynamicUnet, moved to CUDA when available and to the CPU otherwise.
    """
    backbone = create_body(resnet18, pretrained=True, n_in=n_input, cut=-2)
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return DynamicUnet(backbone, n_output, (size, size)).to(target_device)

# Train the Pix2Pix Model

In [None]:
# The commented code below enables transfer learning: it loads a pretrained
# generator and hands it to MainModel instead of the default Unet
#net_G_loaded = build_res_unet(n_input=1, n_output=2, size=256)
#net_G_loaded.load_state_dict(torch.load("/content/drive/MyDrive/FinalWeights/Resnet-18-pretrainedbyme8000pictures.pth", map_location=device))
#model = MainModel(net_G=net_G_loaded)

model = MainModel()
# Load TensorBoard notebook extension
#%load_ext tensorboard

# Import SummaryWriter
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter instance
# SummaryWriter writes event files to log_dir
log_dir = "/content/drive/MyDrive/Graphs/Modelgraph"
writer = SummaryWriter(log_dir)
epochs = 100
# Number of batches per epoch, used to build a continuous TensorBoard step.
# The previous hard-coded offset of 1000 makes steps of consecutive epochs
# overlap as soon as an epoch has more than 1000 batches.
steps_per_epoch = len(training_dataloader)
for e in range(epochs):
  for i, data in enumerate(tqdm(training_dataloader), start=1):
    model.setup_input(data)
    model.optimize()
    global_step = i + (e * steps_per_epoch)
    # Losses stored on the model by optimize(), keyed by their TensorBoard tag
    logged_losses = {
      'Loss/loss_D_train': model.loss_D,
      'Loss/loss_D_fake_train': model.loss_D_fake,
      'Loss/loss_D_real_train': model.loss_D_real,
      'Loss/loss_G': model.loss_G,
      'Loss/loss_G_GAN': model.loss_G_GAN,
      'Loss/loss_G_L1': model.loss_G_L1,
    }
    for tag, loss_value in logged_losses.items():
      # .item() logs a plain Python float instead of a tensor attached to the graph
      writer.add_scalar(tag, loss_value.item(), global_step)

  print(f"\nEpoch {e+1}/{epochs}")
  # Overwrite the checkpoint at the end of every epoch
  model_save_name = 'ModelName.pth'
  path = f"/content/drive/MyDrive/FinalWeights/{model_save_name}"
  torch.save(model.state_dict(), path)

# Pretrain the generator

In [None]:
# Import SummaryWriter
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter instance
# SummaryWriter writes event files to log_dir
log_dir = "/content/drive/MyDrive/Graphs/NameOfTheGraph"
writer = SummaryWriter(log_dir)

# Build the net
net_G = build_res_unet(n_input=1, n_output=2, size=256)
# Choose the optimizer
opt = optim.Adam(net_G.parameters(), lr=1e-4)
# Choose the loss
criterion = nn.L1Loss()
# Choose the number of epochs
epochs = 100
# Number of batches per epoch, used to build a continuous TensorBoard step
# (replaces the hard-coded 500, which only matches 8000 images at batch size 16)
steps_per_epoch = len(training_dataloader)
for e in range(epochs):
  for i, data in enumerate(tqdm(training_dataloader), start=1):
    # Move the batch to the device the generator lives on
    L, ab = data['L'].to(device), data['ab'].to(device)
    preds = net_G(L)
    loss = criterion(preds, ab)
    opt.zero_grad()
    loss.backward()
    opt.step()
    writer.add_scalar('Loss/loss_G', loss.item(), i + (e * steps_per_epoch))
  print(f"Epoch {e + 1}/{epochs}")
  # Overwrite the checkpoint at the end of every epoch
  model_save_name = 'Model_save_name.pth'
  path = f"/content/drive/MyDrive/FinalWeights/{model_save_name}"
  torch.save(net_G.state_dict(), path)
