# Download the Dataset

In [None]:
!pip install opendatasets

import opendatasets as od
import pandas

# Fetch the images256 dataset from Kaggle via opendatasets.
# NOTE(review): later cells read from /content/images256, which presumes the
# download lands there (i.e. the working directory is /content) — confirm.
od.download("https://www.kaggle.com/datasets/mittalshubham/images256")

# Create Directories

In [None]:
import os

# Split the downloaded images into a validation and a training folder.
# The extra 'class' level is there because the folders are later read through
# an ImageFolder subclass (presumably one sub-directory per class is required
# — see the torchvision ImageFolder docs).
os.makedirs('images/train/class/', exist_ok=True)
os.makedirs('images/val/class/', exist_ok=True)

# Read the file listing; the context manager closes the handle once the
# lines are in memory.
with open('/content/images256/files.csv', 'r') as test_file:
  file_arr = test_file.readlines()

# Drop the CSV header row.
file_arr.pop(0)

# Rows 0-599 go to validation, rows 600-5999 to training; anything beyond
# that is left where it is, so there is no need to scan past row 6000.
for i, file in enumerate(file_arr[:6000]):
  # The first CSV column is the image path relative to the dataset root.
  # strip() removes the trailing newline when a row has no comma.
  filepath = file.split(',', 1)[0].strip()
  if not filepath:
    # Skip blank rows: an empty path would make os.rename target the
    # dataset directory itself.
    continue

  dest_dir = 'images/val/class/' if i < 600 else 'images/train/class/'
  # basename() also copes with paths that contain no '/' at all.
  os.rename('/content/images256/' + filepath, dest_dir + os.path.basename(filepath))

# Import Tools


In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from skimage.color import lab2rgb, rgb2lab, rgb2gray
from skimage import io

import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.models as models
from torchvision import datasets, transforms

import shutil, time

In [None]:
# Check whether a CUDA-capable GPU is available; other cells branch on this flag.
use_gpu = torch.cuda.is_available()

# Build the Model

In [None]:
from torch.nn.modules.batchnorm import BatchNorm2d
# Start with the second half of the model: convolution and upsampling
class ColorizationNet(nn.Module):
  """Predict two colour channels from a single-channel (grayscale) image.

  The network has two stages:
    * ``midlevel_resenet`` - the first six child modules of a ResNet-18 whose
      first convolution has been collapsed to accept one input channel.
    * ``upsample`` - a stack of 3x3 convolutions with three 2x upsampling
      steps that ends in a 2-channel output.

  Args:
    input_size: accepted for API compatibility; it is not used by this
      implementation.
  """

  def __init__(self, input_size=128):
    super().__init__()
    midlevel_channels = 128

    # Feature extractor: ResNet-18 with its RGB stem summed over the input
    # channel axis so it takes a single grayscale channel instead.
    backbone = models.resnet18(num_classes=365)
    gray_kernel = backbone.conv1.weight.sum(dim=1).unsqueeze(1)
    backbone.conv1.weight = nn.Parameter(gray_kernel)
    # Keep only the first six children as the mid-level feature extractor.
    self.midlevel_resenet = nn.Sequential(*list(backbone.children())[:6])

    def conv_bn_relu(in_channels, out_channels):
      # 3x3 same-padding convolution followed by batch norm and ReLU.
      return [
          nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
          nn.BatchNorm2d(out_channels),
          nn.ReLU(),
      ]

    # Decoder: layers are listed in the exact order they are applied, so the
    # positions inside the Sequential (and therefore state_dict keys) are fixed.
    decoder_layers = [
        *conv_bn_relu(midlevel_channels, 128),
        nn.Upsample(scale_factor=2),
        *conv_bn_relu(128, 64),
        *conv_bn_relu(64, 64),
        nn.Upsample(scale_factor=2),
        *conv_bn_relu(64, 32),
        # Final projection to the two predicted colour channels.
        nn.Conv2d(32, 2, kernel_size=3, stride=1, padding=1),
        nn.Upsample(scale_factor=2),
    ]
    self.upsample = nn.Sequential(*decoder_layers)

  def forward(self, input):
    """Extract mid-level features from ``input`` and upsample them to colours."""
    return self.upsample(self.midlevel_resenet(input))

In [None]:
# Instantiate the model with its default constructor arguments
model = ColorizationNet()

In [None]:
# Colorization is treated as a regression problem, so mean squared error is the loss function:
# it minimizes the squared Euclidean distance between the predicted and the true color values.
# Known side effect: the model tends to choose desaturated colors, since under this loss they
# are less likely to be badly wrong than bright colors.

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, weight_decay=0.0)

# Convert Images to LAB

In [None]:
class GrayscaleImageFolder(datasets.ImageFolder):
  """ImageFolder variant that yields a grayscale input and its colour target.

  Each item is a ``(grayscale, ab, target)`` triple:
    * ``grayscale`` - float tensor with a leading channel axis of size 1,
      produced by ``rgb2gray``.
    * ``ab`` - float tensor holding the a and b channels of the Lab image,
      channel-first, rescaled with ``(x + 128) / 255``.
    * ``target`` - the class index from the underlying ImageFolder.
  """

  def __getitem__(self, index):
    """Load image ``index`` and split it into grayscale input and ab target.

    The optional ``transform`` is applied to the loaded image first. The
    colour-space conversion then runs whether or not a transform is set, so
    a dataset built without one no longer fails with an unbound local.
    """
    path, target = self.imgs[index]
    img = self.loader(path)

    # The transform is optional; the conversion below is not.
    if self.transform is not None:
      img = self.transform(img)

    # NOTE(review): assumes the loader/transform yields an RGB image that
    # np.asarray turns into an (H, W, 3) array — confirm for custom loaders.
    img_rgb = np.asarray(img)

    # Convert to Lab and rescale; only the a/b channels (indices 1 and 2)
    # are kept as the regression target.
    img_lab = rgb2lab(img_rgb)
    img_lab = (img_lab + 128) / 255
    img_ab = img_lab[:, :, 1:3]
    # Channel-last -> channel-first for PyTorch.
    img_ab = torch.from_numpy(img_ab.transpose((2, 0, 1))).float()

    # The network input is the grayscale version with an explicit channel axis.
    img_gray = rgb2gray(img_rgb)
    img_gray = torch.from_numpy(img_gray).unsqueeze(0).float()

    if self.target_transform is not None:
      target = self.target_transform(target)

    return img_gray, img_ab, target

# Load the Data

In [None]:
# Training pipeline: random 224-pixel resized crops plus random horizontal
# flips, shuffled and served in batches of 64.
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
])
train_imagefolder = GrayscaleImageFolder('images/train', train_transforms)
train_loader = torch.utils.data.DataLoader(
    train_imagefolder,
    batch_size=64,
    shuffle=True,
)

# Validation pipeline: deterministic resize to 256 followed by a 224-pixel
# centre crop, served in order (no shuffling).
val_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
])
val_imagefolder = GrayscaleImageFolder('images/val', val_transforms)
val_loader = torch.utils.data.DataLoader(
    val_imagefolder,
    batch_size=64,
    shuffle=False,
)

# Helper Functions

In [None]:
class AverageMeter(object):
  """Keep the latest value and the running weighted average of a metric.

  Attributes:
    val: the most recent value passed to ``update``.
    sum: the weighted sum of all values seen since the last ``reset``.
    count: the total weight (number of samples) seen since the last ``reset``.
    avg: ``sum / count``.
  """

  def __init__(self):
    self.reset()

  def reset(self):
    """Zero every statistic."""
    self.val = self.avg = self.sum = self.count = 0

  def update(self, val, n=1):
    """Record ``val`` as observed ``n`` times and refresh the average."""
    self.val = val
    self.sum += val * n
    self.count += n
    self.avg = self.sum / self.count
  
def to_rgb(grayscale_input, ab_input, save_path=None, save_name=None):
  """Recombine a grayscale channel with ab channels and optionally save to disk.

  Args:
    grayscale_input: tensor holding the lightness channel (channel-first);
      it is multiplied by 100 before the Lab -> RGB conversion.
    ab_input: tensor holding the two colour channels (channel-first); the
      ``x * 255 - 128`` rescaling is applied before conversion.
    save_path: dict with ``'grayscale'`` and ``'colorized'`` output prefixes.
    save_name: file name appended to each prefix.

  Nothing is written unless both ``save_path`` and ``save_name`` are given.
  """
  plt.clf()

  # Stack the channels along the first axis, then scale while still channel-first.
  lab = torch.cat((grayscale_input, ab_input), 0).numpy()
  lab[0:1] = lab[0:1] * 100
  lab[1:3] = lab[1:3] * 255 - 128

  # Move channels last for skimage/matplotlib and convert Lab -> RGB.
  rgb = lab2rgb(lab.transpose((1, 2, 0)).astype(np.float64))
  gray = grayscale_input.squeeze().numpy()

  if save_path is None or save_name is None:
    return

  plt.imsave(arr=gray, fname='{}{}'.format(save_path['grayscale'], save_name), cmap='gray')
  plt.imsave(arr=rgb, fname='{}{}'.format(save_path['colorized'], save_name))

# Validation

In [None]:
def validate(val_loader, model, loss_fn, save_images, epoch):
  """Evaluate the model on the whole validation loader and return the mean loss.

  Args:
    val_loader: loader yielding ``(input_gray, input_ab, target)`` batches.
    model: network mapping the grayscale input to predicted ab channels.
    loss_fn: loss comparing predicted and true ab channels.
    save_images: when true, up to 10 colorized samples from the first batch
      are written under ``outputs/gray/`` and ``outputs/color/``.
    epoch: epoch number, used only in the saved file names.

  Returns:
    The sample-weighted average loss over all validation batches.

  Gradient tracking is not disabled here; the caller is expected to wrap the
  call in ``torch.no_grad()``.
  """
  model.eval()

  # Prepare value counters and timers
  batch_time, data_time, losses = AverageMeter(), AverageMeter(), AverageMeter()

  end = time.time()
  already_saved_images = False

  for i, (input_gray, input_ab, target) in enumerate(val_loader):
    data_time.update(time.time() - end)

    if use_gpu: input_gray, input_ab, target = input_gray.cuda(), input_ab.cuda(), target.cuda()

    # Run the model and record loss
    output_ab = model(input_gray)
    loss = loss_fn(output_ab, input_ab)
    losses.update(loss.item(), input_gray.size(0))

    # Save a handful of sample images, once per validation run
    if save_images and not already_saved_images:
      already_saved_images = True
      for j in range(min(len(output_ab), 10)):
        save_path = {'grayscale': 'outputs/gray/', 'colorized': 'outputs/color/'}
        save_name = 'img-{}-epoch-{}.jpg'.format(i * val_loader.batch_size + j, epoch)
        to_rgb(input_gray[j].cpu(), ab_input=output_ab[j].detach().cpu(), save_path=save_path, save_name=save_name)

    batch_time.update(time.time() - end)
    end = time.time()

    # Periodically print timing and the current / running validation loss
    if i % 25 == 0:
      print('Validate: [{0}/{1}]\t'
            'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
            'Loss {loss.val:.4f} ({loss.avg:.4f})\t'.format(
             i, len(val_loader), batch_time=batch_time, loss=losses))

  # Report and return only after every batch has been processed. Doing this
  # inside the loop would stop validation after the first batch.
  print('Finished validation.')
  return losses.avg

    

# Training

In [None]:
def train(train_loader, model, loss_fn, optimizer, epoch):
  """Run one training epoch over ``train_loader``.

  For every batch the grayscale input is pushed through ``model``, the loss
  against the true ab channels is computed, and one optimizer step is taken.
  Timing and loss statistics are printed every 25 batches.

  Args:
    train_loader: loader yielding ``(input_gray, input_ab, target)`` batches.
    model: network being trained.
    loss_fn: loss comparing predicted and true ab channels.
    optimizer: optimizer updating ``model``'s parameters.
    epoch: epoch number, used only for logging.
  """
  print('Starting Epoch #{} ...'.format(epoch))
  model.train()

  # Running statistics: per-batch wall time, data-loading time and loss.
  batch_time = AverageMeter()
  data_time = AverageMeter()
  losses = AverageMeter()

  # In the template, "val" is the latest value of a meter, not "validation".
  log_template = ('Epoch: [{0}][{1}/{2}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t')

  end = time.time()
  for step, (gray_batch, ab_batch, target) in enumerate(train_loader):

    # Move the batch to the GPU when one is available.
    if use_gpu:
      gray_batch = gray_batch.cuda()
      ab_batch = ab_batch.cuda()
      target = target.cuda()

    # Time spent obtaining the batch (including the device transfer above).
    data_time.update(time.time() - end)

    # Forward pass and loss bookkeeping.
    predicted_ab = model(gray_batch)
    loss = loss_fn(predicted_ab, ab_batch)
    losses.update(loss.item(), gray_batch.size(0))

    # Backward pass and parameter update.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Time for the full iteration: data loading, forward and backward passes.
    batch_time.update(time.time() - end)
    end = time.time()

    # Only log every 25th batch.
    if step % 25 != 0:
      continue
    print(log_template.format(
        epoch, step, len(train_loader),
        batch_time=batch_time, data_time=data_time, loss=losses))

  print('Finished training epoch #{}'.format(epoch))

In [None]:
# Move the model and loss function to the GPU when one is available.
# NOTE(review): the optimizer was built from model.parameters() in an earlier
# cell, before this move; nn.Module.cuda() is documented to move parameters
# in place, so the optimizer should still reference them — confirm.
if use_gpu:
  loss_fn = loss_fn.cuda()
  model = model.cuda()

In [None]:
# Create output directories: colorized and grayscale samples written during
# validation, plus model checkpoints.
os.makedirs('outputs/color', exist_ok=True)
os.makedirs('outputs/gray', exist_ok=True)
os.makedirs('checkpoints', exist_ok=True)
# Run configuration for the training loop.
save_images = True   # passed to validate() to enable writing sample images
best_losses = 1e10   # large sentinel so the first validation loss counts as an improvement
epochs = 100         # number of train/validate rounds

In [None]:
# Train the model: one training pass followed by one validation pass per epoch.
for epoch in range(epochs):
  train(train_loader, model, loss_fn, optimizer, epoch)
  # Validation needs no gradients, so tracking is switched off around it.
  with torch.no_grad():
    losses = validate(val_loader, model, loss_fn, save_images, epoch)
  # Checkpoint only when the validation loss improves on the best seen so far.
  # The file name uses epoch+1 (1-based) while the log output uses the 0-based epoch.
  if losses < best_losses:
    best_losses = losses
    torch.save(model.state_dict(), 'checkpoints/model-epoch-{}-losses-{:.3f}.pth'.format(epoch+1,losses))