In [None]:
!pip3 install torch
!pip3 install torchvision
!pip3 install tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
from torchvision import transforms, utils, datasets
from tqdm import tqdm
import tqdm.notebook as tq
from torch.nn.parameter import Parameter
import pdb
import torchvision
import torchvision.transforms.functional as TF

from IPython.core.ultratb import AutoFormattedTB
__ITB__ = AutoFormattedTB(mode = 'Verbose',color_scheme='LightBg', tb_offset = 1)

# File Operations
from google.colab import drive
import zipfile
import shutil
import os
import gzip
import tarfile
import gc
import sys

# Image Operations
import cv2 as cv 
from PIL import Image
from google.colab.patches import cv2_imshow
from matplotlib.pyplot import imshow

drive.mount('/drive')

Drive already mounted at /drive; to attempt to forcibly remount, call drive.mount("/drive", force_remount=True).


In [None]:
class CelebASuperResolutionDataset(Dataset):
  def __init__(self, root, download_root, download=False, train=True):
    # Directory Names
    folder_dir = "celeba"
    train_dir = "train"
    validation_dir = "validation"

    # Paths
    folder_path = os.path.join(root, folder_dir)
    
    # Download
    if download and not os.path.exists(download_root):
      datasets.utils.download_url('https://s3-us-west-1.amazonaws.com/udacity-dlnfd/datasets/celeba.zip', download_root, folder_dir + '.zip', None)

    # Unzip Dataset
    if download and not os.path.exists(folder_path):
      os.mkdir(folder_path)
      with zipfile.ZipFile(os.path.join(download_root, folder_dir + '.zip'), 'r') as zip_ref:
        zip_ref.extractall(folder_path)

    # Create Train Path
    train_path = os.path.join(folder_path, train_dir)
    if not os.path.exists(train_path):
      os.mkdir(train_path)
      os.mkdir(os.path.join(train_path, 'train'))

    # Create Validation Path
    validation_path = os.path.join(folder_path, validation_dir)
    if not os.path.exists(validation_path):
      os.mkdir(validation_path)
      os.mkdir(os.path.join(validation_path, 'validation'))

    # Split Training and Validation
    if download:
      validation_ratio = 0.05
      file_names = os.listdir(os.path.join(folder_path, 'img_align_celeba'))
      
      # Move Last 20% of Image to Validation Folder
      # last_n = int(len(file_names) - len(file_names) * validation_ratio)
      last_n = len(file_names) - 500
      for i, file_name in enumerate(file_names):
        old_file_path = os.path.join(folder_path, 'img_align_celeba', file_name)
        new_file_path = old_file_path
        if i < last_n:
          new_file_path = os.path.join(train_path, 'train', file_name)
        else:
          new_file_path = os.path.join(validation_path, 'validation', file_name)

        shutil.move(old_file_path, new_file_path)

    h = 218
    w = 178
    # Image Folder
    source_path = train_path if train else validation_path
    self.blurred_folder = torchvision.datasets.ImageFolder(source_path,
                                                           transform=transforms.Compose([transforms.Resize((h//4, w//4)),
                                                                                         transforms.Resize((h, w)),
                                                                                         transforms.ToTensor()]))
    self.original_folder = torchvision.datasets.ImageFolder(source_path,
                                                            transform=transforms.Compose([transforms.ToTensor()]))

  def __getitem__(self, index):
    blurred = self.blurred_folder[index]
    original = self.original_folder[index]
    return blurred[0], original[0]

  def __len__(self):
    return len(self.blurred_folder)

In [None]:
class SRCNN(nn.Module):
  def __init__(self):
    super(SRCNN, self).__init__()
    self.net = nn.Sequential(
        nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(9,9), padding=9//2),
        nn.ReLU(),
        nn.Conv2d(in_channels=64, out_channels=32, kernel_size=(3, 3), padding=3//2),
        nn.ReLU(),
        nn.Conv2d(in_channels=32, out_channels=3, kernel_size=(5,5), padding=5//2)
    )
  
  def forward(self, x):
    return self.net(self.net(x).squeeze(2).squeeze(2))

In [None]:
# Hyperparameters
LEARNING_RATE = 1e-4
EPOCHS = 2
BATCH_SIZE = 50
NUM_WORKERS = 2

In [None]:
# Image Paths
root = '/content'
download_root = '/drive/MyDrive/CS-474-Final-Project'

# Datasets
train_dataset = CelebASuperResolutionDataset(root, download_root, train=True, download=False)
val_dataset = CelebASuperResolutionDataset(root, download_root, train=False)

# Dataloaders
train_loader = DataLoader(train_dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=NUM_WORKERS,
                          pin_memory=True)

val_loader = DataLoader(val_dataset,
                        batch_size=BATCH_SIZE,
                        shuffle=True,
                        num_workers=NUM_WORKERS,
                        pin_memory=True)

In [None]:
# Network Details
def train():
  gc.collect()
  # model = torch.hub.load('pytorch/vision:v0.9.0', 'resnet50', pretrained=False).cuda()
  model = SRCNN().cuda()
  # objective = nn.CrossEntropyLoss()
  objective = nn.MSELoss()
  optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

  # Train
  losses = []
  validations = []
  accuracies = []
  predictions = []
  accuracy = 0

  for epoch in range(EPOCHS):
    loop = tqdm(total=len(train_loader), position=0, leave=False)
    for batch_num, (x, y_truth) in enumerate(train_loader):
      gc.collect()
      # Reset Gradients
      optimizer.zero_grad()

      # Pass Blurred Image Through Model
      x, y_truth = x.cuda(non_blocking=True), y_truth.cuda(non_blocking=True)
      y_hat = model(x)

      # Calculate Loss
      loss = objective(y_hat, y_truth)
      loss.backward()

      # Optimize
      optimizer.step()

      losses.append(loss.item())

      loop.set_description('epoch:{}, loss:{:.4f}, accuracy:{:.3f}'.format(epoch, loss, accuracy))
      loop.update(1)

      if (batch_num + 1) % 200 == 0:
        for x, y in val_loader:
          # Validations
          val = np.mean([objective(model(x.cuda()), y.cuda()).item() for x, y in val_loader])
          validations.append((len(losses), val))

    # Prediction
    val_72 = val_dataset[172][0].unsqueeze(0).cuda()
    prediction = model(val_72).squeeze(0)
    predictions.append(prediction)

  loop.close()
  gc.collect()
  return model, losses, validations, predictions

In [None]:
gc.collect()
trained_model, losses, validations, predictions = train()
torch.save(trained_model, "/drive/MyDrive/CS-474-Final-Project/sr2.pt")



In [None]:
# print(losses)
# print(validations)
# trained_model = torch.load("/drive/MyDrive/CS-474-Final-Project/sr2.pt")

In [None]:
num = 160

# Truth Image
truth_72 = val_dataset[num][1]
truth_72 = truth_72.swapaxes(0, 2).swapaxes(0, 1)
plt.subplot(1,2,2)
plt.title(f'Original Image')
plt.imshow(truth_72.cpu().numpy(), cmap = 'gray')
plt.axis('off')
plt.show()

# Blurred Image
blur_72 = val_dataset[num][0]
blur_72 = blur_72.swapaxes(0, 2).swapaxes(0, 1)
plt.subplot(1,2,2)
plt.title(f'Blurred Image')
plt.imshow(blur_72.cpu().numpy(), cmap = 'gray')
plt.axis('off')
plt.show()

# Predicted Image
pred_72_input = val_dataset[num][0].unsqueeze(0).cuda()
pred_72 = trained_model(pred_72_input).squeeze(0).detach()
pred_72 = pred_72.cpu().swapaxes(0, 2).swapaxes(0, 1)
pred = np.clip(pred_72, 0, 1)
plt.subplot(1,2,2)
plt.title(f'Predicted Image')
plt.imshow(pred, cmap='gray')
plt.axis('off')
plt.show()