In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!nvidia-smi
import tensorflow as tf

physical_devices = tf.config.list_physical_devices('GPU')

if len(physical_devices) > 0:
    device = physical_devices[0]

    memory_limit = 80 * 1024  # 80 GB
    tf.config.experimental.set_memory_growth(device, True)
    tf.config.set_logical_device_configuration(device, [
        tf.config.LogicalDeviceConfiguration(memory_limit=memory_limit)
    ])

# Print the device name.
# print("Selected device:", device.name)

/bin/bash: line 1: nvidia-smi: command not found


In [3]:
!pip install torchvision==0.16
!pip show torchvision
!pip install realesrgan
!pip install basicsr

Name: torchvision
Version: 0.16.0
Summary: image and video datasets and models for torch deep learning
Home-page: https://github.com/pytorch/vision
Author: PyTorch Core Team
Author-email: soumith@pytorch.org
License: BSD
Location: /usr/local/lib/python3.10/dist-packages
Requires: numpy, pillow, requests, torch
Required-by: basicsr, facexlib, fastai, gfpgan, realesrgan


In [5]:
import os, gc

os.environ['CUDA_VISIBLE_DEVICES'] ='0'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ["PYTORCH_USE_CUDA_DSA"] = '1'
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = 'max_split_size_mb:128'

import torch

print(torch.__version__)
print(torch.version.cuda)
# Print initial memory usage
print("Initial memory usage:")
print(torch.cuda.memory_summary())

gc.collect()
torch.cuda.empty_cache()

print("Memory usage after gc.collect() and torch.cuda.empty_cache():")
print(torch.cuda.memory_summary())


2.1.0+cu121
12.1
Initial memory usage:


KeyError: 'allocated_bytes.all.current'

In [6]:
import os
import sys

import numpy as np
from PIL import Image

def read_image(path):
    img = Image.open(path)
    img = np.array(img) / 255.

    return img * 255


def psnr(img1, img2):
    mse_value = np.mean((img1 - img2)**2)

    return 20. * np.log10(255. / np.sqrt(mse_value))


input_dir = sys.argv[1]
output_dir = sys.argv[2]

submit_dir = os.path.join(input_dir, 'res')
truth_dir = os.path.join(input_dir, 'ref')

if not os.path.isdir(submit_dir):
    print("%s doesn't exist" % submit_dir)

if os.path.isdir(submit_dir) and os.path.isdir(truth_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    submit_dir_list = os.listdir(submit_dir)
    if len(submit_dir_list) == 1:
        submit_dir = os.path.join(submit_dir, "%s" % submit_dir_list[0])
        assert os.path.isdir(submit_dir)

    psnr_list = []
    for idx in range(400):
        pred_img = read_image(os.path.join(submit_dir, "%05d.png" % idx))
        gt_img = read_image(os.path.join(truth_dir, "%05d.png" % idx))
        psnr_list.append(psnr(pred_img, gt_img))

    mean_psnr = np.mean(psnr_list)

    # Create the evaluation score path
    output_filename = os.path.join(output_dir, 'scores.txt')

    with open(output_filename, 'w') as f3:
        f3.write('PSNR: {}'.format(mean_psnr))

-f/res doesn't exist


In [7]:
import cv2
import math
import numpy as np
import os
import os.path as osp
import random
import time
import torch
from basicsr.data.degradations import circular_lowpass_kernel, random_mixed_kernels
from basicsr.data.transforms import augment
from basicsr.utils import FileClient, get_root_logger, imfrombytes, img2tensor
from basicsr.utils.registry import DATASET_REGISTRY
from torch.utils import data as data
from torchvision.transforms import functional as TF

# Remove the registered class from the registry
if 'FFHQsubDataset' in DATASET_REGISTRY._obj_map:
    del DATASET_REGISTRY._obj_map['FFHQsubDataset']

@DATASET_REGISTRY.register()
class FFHQsubDataset(data.Dataset):
    """Modified from Dataset used for Real-ESRGAN model:
    Real-ESRGAN: Training Real-World Blind Super-Resolution with Pure Synthetic Data.

    It loads gt (Ground-Truth) images, and augments them.
    It also generates blur kernels and sinc kernels for generating low-quality images.
    Note that the low-quality images are processed in tensors on GPUS for faster processing.

    Args:
        opt (dict): Config for train datasets. It contains the following keys:
            dataroot_gt (str): Data root path for gt.
            meta_info (str): Path for meta information file.
            io_backend (dict): IO backend type and other kwarg.
            use_hflip (bool): Use horizontal flips.
            use_rot (bool): Use rotation (use vertical flip and transposing h and w for implementation).
            Please see more options in the codes.
    """

    def __init__(self, opt):
        super(FFHQsubDataset, self).__init__()
        self.opt = opt
        self.file_client = None
        self.io_backend_opt = opt['io_backend']
        self.gt_folder = opt['dataroot_gt']
        self.lq_folder = opt.get('dataroot_lq', None)

        # file client (lmdb io backend)
        if self.io_backend_opt['type'] == 'lmdb':
            self.io_backend_opt['db_paths'] = [self.gt_folder]
            self.io_backend_opt['client_keys'] = ['gt']
            if not self.gt_folder.endswith('.lmdb'):
                raise ValueError(f"'dataroot_gt' should end with '.lmdb', but received {self.gt_folder}")
            with open(osp.join(self.gt_folder, 'meta_info_FFHQ5000sub_GT.txt')) as fin:
                self.paths = [line.split('.')[0] for line in fin]
        else:
            # disk backend with meta_info
            # Each line in the meta_info describes the relative path to an image
            with open(self.opt['meta_info']) as fin:
                paths = [line.strip().split(' ')[0] for line in fin]
                self.paths = [os.path.join(self.gt_folder, v) for v in paths]
            self.gt_paths = [osp.join(self.gt_folder, p) for p in self.paths]
            if self.lq_folder:
                self.lq_paths = [osp.join(self.lq_folder, p) for p in self.paths]
            else:
                self.lq_paths = None

        # blur settings for the first degradation
        self.blur_kernel_size = opt['blur_kernel_size']
        self.kernel_list = opt['kernel_list']
        self.kernel_prob = opt['kernel_prob']  # a list for each kernel probability
        self.blur_sigma = opt['blur_sigma']
        self.betag_range = opt['betag_range']  # betag used in generalized Gaussian blur kernels
        self.betap_range = opt['betap_range']  # betap used in plateau blur kernels
        self.sinc_prob = opt['sinc_prob']  # the probability for sinc filters

        # blur settings for the second degradation
        self.blur_kernel_size2 = opt['blur_kernel_size2']
        self.kernel_list2 = opt['kernel_list2']
        self.kernel_prob2 = opt['kernel_prob2']
        self.blur_sigma2 = opt['blur_sigma2']
        self.betag_range2 = opt['betag_range2']
        self.betap_range2 = opt['betap_range2']
        self.sinc_prob2 = opt['sinc_prob2']

        # a final sinc filter
        self.final_sinc_prob = opt['final_sinc_prob']

        self.kernel_range = [2 * v + 1 for v in range(3, 11)]  # kernel size ranges from 7 to 21
        # TODO: kernel range is now hard-coded, should be in the configure file
        self.pulse_tensor = torch.zeros(21, 21).float()  # convolving with pulse tensor brings no blurry effect
        self.pulse_tensor[10, 10] = 1

    def __getitem__(self, index):
        # -------------------------------- Load gt images -------------------------------- #
        # Shape: (h, w, c); channel order: BGR; image range: [0, 1], float32.
        gt_path = self.paths[index]

        # Read the image file directly from Google Drive
        try:
            with open(gt_path, 'rb') as f:
                img_bytes = f.read()
            img_gt = imfrombytes(img_bytes, float32=True)
        except (IOError, OSError) as e:
            logger = get_root_logger()
            logger.warning(f'Failed to load image: {gt_path}. Error: {e}')
            # Skip this image and move to the next one
            return self.__getitem__(random.randint(0, self.__len__() - 1))

        # -------------------- Do augmentation for training: flip, rotation -------------------- #
        img_gt = augment(img_gt, self.opt['use_hflip'], self.opt['use_rot'])

        # ------------------------ Generate kernels (used in the first degradation) ------------------------ #
        kernel_size = random.choice(self.kernel_range)
        if np.random.uniform() < self.opt['sinc_prob']:
            # this sinc filter setting is for kernels ranging from [7, 21]
            if kernel_size < 13:
                omega_c = np.random.uniform(np.pi / 3, np.pi)
            else:
                omega_c = np.random.uniform(np.pi / 5, np.pi)
            kernel = circular_lowpass_kernel(omega_c, kernel_size, pad_to=False)
        else:
            kernel = random_mixed_kernels(
                self.kernel_list,
                self.kernel_prob,
                kernel_size,
                self.blur_sigma,
                self.blur_sigma, [-math.pi, math.pi],
                self.betag_range,
                self.betap_range,
                noise_range=None)
        # pad kernel
        pad_size = (21 - kernel_size) // 2
        kernel = np.pad(kernel, ((pad_size, pad_size), (pad_size, pad_size)))

        # ------------------------ Generate kernels (used in the second degradation) ------------------------ #
        kernel_size = random.choice(self.kernel_range)
        if np.random.uniform() < self.opt['sinc_prob2']:
            if kernel_size < 13:
                omega_c = np.random.uniform(np.pi / 3, np.pi)
            else:
                omega_c = np.random.uniform(np.pi / 5, np.pi)
            kernel2 = circular_lowpass_kernel(omega_c, kernel_size, pad_to=False)
        else:
            kernel2 = random_mixed_kernels(
                self.kernel_list2,
                self.kernel_prob2,
                kernel_size,
                self.blur_sigma2,
                self.blur_sigma2, [-math.pi, math.pi],
                self.betag_range2,
                self.betap_range2,
                noise_range=None)

        # pad kernel
        pad_size = (21 - kernel_size) // 2
        kernel2 = np.pad(kernel2, ((pad_size, pad_size), (pad_size, pad_size)))

        # ------------------------------------- the final sinc kernel ------------------------------------- #
        if np.random.uniform() < self.opt['final_sinc_prob']:
            kernel_size = random.choice(self.kernel_range)
            omega_c = np.random.uniform(np.pi / 3, np.pi)
            sinc_kernel = circular_lowpass_kernel(omega_c, kernel_size, pad_to=21)
            sinc_kernel = torch.FloatTensor(sinc_kernel)
        else:
            sinc_kernel = self.pulse_tensor

        # BGR to RGB, HWC to CHW, numpy to tensor
        img_gt = img2tensor([img_gt], bgr2rgb=True, float32=True)[0]
        kernel = torch.FloatTensor(kernel)
        kernel2 = torch.FloatTensor(kernel2)

        return_d = {'gt': img_gt, 'kernel1': kernel, 'kernel2': kernel2, 'sinc_kernel': sinc_kernel, 'gt_path': gt_path}

        # Load low-quality image if available
        if self.lq_paths:
            lq_path = self.lq_paths[index]
            try:
                with open(lq_path, 'rb') as f:
                    img_bytes = f.read()
                img_lq = imfrombytes(img_bytes, float32=True)
                img_lq = img2tensor([img_lq], bgr2rgb=True, float32=True)[0]
            except (IOError, OSError) as e:
                logger = get_root_logger()
                logger.warning(f'Failed to load image: {lq_path}. Error: {e}')
                img_lq = img_gt  # Use ground-truth image if low-quality image fails to load
        else:
            img_lq = img_gt  # Use ground-truth image if low-quality paths are not provided

        return_d['lq'] = img_lq
        return return_d

    def __len__(self):
        return len(self.paths)



In [9]:
import os
import yaml
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torch.utils.data import DataLoader, ConcatDataset, random_split
from torchvision.transforms import GaussianBlur

# Set the paths to the configuration file and dataset file
config_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/train_SRResNet_x4_FFHQ_300k.yml'
dataset_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/ffhqsub_dataset.py'

# Set the paths to your train and validation data
train_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/datasets/train'
val_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/datasets/val'
test_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/datasets/test'

# Load the dataset module
# import importlib.util
# spec = importlib.util.spec_from_file_location("ffhqsub_dataset", dataset_path)
# dataset_module = importlib.util.module_from_spec(spec)
# spec.loader.exec_module(dataset_module)
# FFHQSubDataset = dataset_module.FFHQsubDataset

# Define the SRResNet model architecture
class ResidualBlock(nn.Module):
    def __init__(self, num_feat=64):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1, bias=True)

    def forward(self, x):
        identity = x
        out = self.conv2(self.relu(self.conv1(x)))
        return identity + out

class SRResNet(nn.Module):
    def __init__(self, num_in_ch=3, num_out_ch=3, num_feat=64, num_block=16, upscale=4):
        super(SRResNet, self).__init__()
        self.conv_first = nn.Conv2d(num_in_ch, num_feat, 3, 1, 1)
        self.body = self.make_layer(ResidualBlock, num_block, num_feat)
        self.conv_after_body = nn.Conv2d(num_feat, num_feat, 3, 1, 1)
        self.upsample = self.make_upsample_layer(upscale, num_feat)
        self.conv_last = nn.Conv2d(num_feat, num_out_ch, 3, 1, 1)

    def make_layer(self, block, num_blocks, num_feat):
        layers = []
        for _ in range(num_blocks):
            layers.append(block(num_feat))
        return nn.Sequential(*layers)

    def make_upsample_layer(self, upscale, num_feat):
        layers = []
        for _ in range(int(torch.log(torch.tensor(upscale)).item() / torch.log(torch.tensor(2)).item())):
            layers.append(nn.Conv2d(num_feat, 4 * num_feat, 3, 1, 1))
            layers.append(nn.PixelShuffle(2))
        return nn.Sequential(*layers)

    def forward(self, x):
        feat = self.conv_first(x)
        feat = self.body(feat)
        feat = self.conv_after_body(feat)
        feat = self.upsample(feat)
        out = self.conv_last(feat)
        return out

# no mat mul issue
class Discriminator(nn.Module):
    def __init__(self, num_in_ch=3, num_feat=64):
        super(Discriminator, self).__init__()
        self.conv1 = nn.Conv2d(num_in_ch, num_feat, 3, 1, 1)
        self.conv2 = nn.Conv2d(num_feat, num_feat * 2, 3, 2, 1)
        self.conv3 = nn.Conv2d(num_feat * 2, num_feat * 4, 3, 2, 1)
        self.conv4 = nn.Conv2d(num_feat * 4, num_feat * 8, 3, 2, 1)
        self.conv5 = nn.Conv2d(num_feat * 8, 1, 3, 1, 1)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.2, inplace=True)
        self.fc = nn.Linear(65536, 1)  # Update the input size to match the largest flattened size

#     def forward(self, x):
#         x = self.leaky_relu(self.conv1(x))
#         print("After Conv1:", x.shape)
#         x = self.leaky_relu(self.conv2(x))
#         print("After Conv2:", x.shape)
#         x = self.leaky_relu(self.conv3(x))
#         print("After Conv3:", x.shape)
#         x = self.leaky_relu(self.conv4(x))
#         print("After Conv4:", x.shape)
#         x = self.conv5(x)
#         print("After Conv5:", x.shape)
#         x = x.view(x.size(0), -1)  # Flatten the tensor
#         print("After Flatten:", x.shape)

#         if x.size(1) != self.fc.in_features:
#             self.fc = nn.Linear(x.size(1), 1)  # Dynamically adjust the input size of the fully connected layer

#         x = self.fc(x)
#         return torch.sigmoid(x)  # Apply sigmoid activation to the output

    def forward(self, x):
        x = self.leaky_relu(self.conv1(x))
        print("After Conv1:", x.shape)
        x = self.leaky_relu(self.conv2(x))
        print("After Conv2:", x.shape)
        x = self.leaky_relu(self.conv3(x))
        print("After Conv3:", x.shape)
        x = self.leaky_relu(self.conv4(x))
        print("After Conv4:", x.shape)
        x = self.conv5(x)
        print("After Conv5:", x.shape)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        print("After Flatten:", x.shape)

        # Dynamically adjust the input size of the fully connected layer
        if x.size(1) != self.fc.in_features:
            self.fc = nn.Linear(x.size(1), 1)

        x = self.fc(x)
        return torch.sigmoid(x)  # Apply sigmoid activation to the output

# Load the model configuration
with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

default_config = {
    'io_backend': {'type': 'disk'},
    'batch_size': 8,
    'num_workers': 4,
    'use_hflip': True,  # Typically, you might want to disable this for validation
    'use_rot': True,    # Same as above
    'blur_kernel_size': 21,
    'kernel_list': ['iso', 'aniso'],
    'kernel_prob': [0.5, 0.5],
    'blur_sigma': [0.2, 3],
    'betag_range': [0.5, 4],
    'betap_range': [1, 2],
    'sinc_prob': 0.1,
    'blur_kernel_size2': 21,
    'kernel_list2': ['iso', 'aniso'],
    'kernel_prob2': [0.5, 0.5],
    'blur_sigma2': [0.2, 1.5],
    'betag_range2': [0.5, 4],
    'betap_range2': [1, 2],
    'sinc_prob2': 0.1,
    'final_sinc_prob': 0.8,
    'lr': 1e-4,
    'num_epochs': 100
}

train_config = {
    **default_config,
    'dataroot_gt': f"{train_path}/GT",
    'meta_info': f"{train_path}/meta_info_FFHQ5000sub_GT.txt",
}

val_config = {
    **default_config,
    'dataroot_gt': f"{val_path}/GT",
    'dataroot_lq': f"{val_path}/LQ",
    'meta_info': "/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/val_meta_info_FFHQ5000sub_GT.txt",
    'use_hflip': False,  # Typically disable augmentation for validation
    'use_rot': False
}

test_config = {
    **default_config,
    'dataroot_gt': f"{test_path}/LQ",
    'meta_info': "/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/test_meta_info_FFHQ5000sub_GT.txt",
    'use_hflip': False,  # Typically disable augmentation for testing
    'use_rot': False
}

train_dataset = FFHQsubDataset(train_config)
val_dataset = FFHQsubDataset(val_config)
test_dataset = FFHQsubDataset(test_config)

train_loader = DataLoader(train_dataset, batch_size=train_config['batch_size'], shuffle=True, num_workers=train_config['num_workers'])
val_loader = DataLoader(val_dataset, batch_size=val_config['batch_size'], shuffle=False, num_workers=val_config['num_workers'])
# test_real_loader = DataLoader(test_dataset, batch_size=test_config['batch_size'], shuffle=False, num_workers=test_config['num_workers'])
test_loader = DataLoader(test_dataset, batch_size=test_config['batch_size'], shuffle=False, num_workers=test_config['num_workers'])

model = SRResNet(num_in_ch=3, num_out_ch=3, num_feat=32, num_block=8, upscale=4)
discriminator = Discriminator(num_in_ch=3, num_feat=64)

criterion_l1 = nn.L1Loss()
criterion_gan = nn.BCEWithLogitsLoss()

optimizer_g = optim.Adam(model.parameters(), lr=1e-4)
optimizer_d = optim.Adam(discriminator.parameters(), lr=1e-4)

def degrade_image(image, degradation_scale=0.5):
    """
    Degrade the input image by downscaling and then upscaling, with Gaussian blurring.
    Args:
        image (torch.Tensor): The input image tensor.
        degradation_scale (float): The factor to scale down and up the image.

    Returns:
        torch.Tensor: The degraded image tensor.
    """
    # Get the original dimensions
    original_size = (image.size(2), image.size(3))

    # # Calculate new size based on degradation scale
    # new_size = (int(original_size[0] * degradation_scale), int(original_size[1] * degradation_scale))

    # # Downscale and upscale to create a simple low-resolution version of the image
    # downsampled = F.interpolate(image, size=new_size, mode='bilinear', align_corners=False)
    # upsampled = F.interpolate(downsampled, size=original_size, mode='bilinear', align_corners=False)

    # Apply Gaussian blur
    blurred = GaussianBlur(kernel_size=(5, 5))(image)

    return blurred


In [10]:
def clean_metadata_file(file_path):
    # Open the original file in binary mode to check for BOM
    with open(file_path, 'rb') as file:
        content = file.read()

    # Remove the BOM (if any)
    # BOM for UTF-8 encoded files
    bom_utf8 = b'\xef\xbb\xbf'
    if content.startswith(bom_utf8):
        content = content[len(bom_utf8):]

    # Write the clean content back to the file
    with open(file_path, 'wb') as file:
        file.write(content)

    print(f"Cleaned metadata file: {file_path}")

# Clean the metadata files for train, val, and test datasets
clean_metadata_file(val_config['meta_info'])

Cleaned metadata file: /content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/val_meta_info_FFHQ5000sub_GT.txt


In [None]:
import torch
import gc
import matplotlib.pyplot as plt

torch.backends.cudnn.benchmark = True
torch.cuda.set_per_process_memory_fraction(0.9, 0)

num_epochs = 200
device = torch.device('cuda')
model.to(device)

accumulation_steps = 2
early_stopping_patience = 10
early_stopping_counter = 0
best_loss = float('inf')

train_losses = []
val_losses = []

optimizer_g = torch.optim.Adam(model.parameters(), lr=0.001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer_g, factor=0.1, patience=5, verbose=True)

gc.collect()
torch.cuda.empty_cache()

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for batch_idx, batch_data in enumerate(train_loader):
        gt_imgs = batch_data['gt'].to(device)
        lq_imgs = degrade_image(gt_imgs).to(device)

        optimizer_g.zero_grad()

        sr_imgs = model(lq_imgs)
        sr_imgs_resized = F.interpolate(sr_imgs, size=(gt_imgs.size(2), gt_imgs.size(3)), mode='bilinear', align_corners=False)
        g_loss = criterion_l1(sr_imgs_resized, gt_imgs)

        g_loss = g_loss / accumulation_steps
        g_loss.backward()

        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer_g.step()
            optimizer_g.zero_grad()

        train_loss += g_loss.item() * accumulation_steps

        gc.collect()
        torch.cuda.empty_cache()

    train_loss /= len(train_loader)
    train_losses.append(train_loss)

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch_data in val_loader:
            gt_imgs = batch_data['gt'].to(device)
            lq_imgs = batch_data['lq'].to(device)
            sr_imgs = model(lq_imgs)
            sr_imgs_resized = F.interpolate(sr_imgs, size=(gt_imgs.size(2), gt_imgs.size(3)), mode='bilinear', align_corners=False)
            val_loss += criterion_l1(sr_imgs_resized, gt_imgs).item()

    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    scheduler.step(val_loss)

    if val_loss < best_loss:
        best_loss = val_loss
        early_stopping_counter = 0
        torch.save(model.state_dict(), f"/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/model_2_{val_loss:.4f}.pth")
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print("Stopping early due to lack of improvement in validation loss.")
            break

plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curves')
plt.legend()
plt.show()

torch.save(model.state_dict(), f"/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/model_final.pth")

In [11]:
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Curves')
plt.legend()
plt.show()


NameError: name 'plt' is not defined

In [None]:
# for epoch in range(num_epochs):
#     model.train()
#     train_loss = 0
#     for batch_idx, batch_data in enumerate(train_loader):
#         gt_imgs = batch_data['gt'].to(device)
#         lq_imgs = degrade_image(gt_imgs)  # Generate LQ images on-the-fly

#         optimizer.zero_grad()
#         sr_imgs = model(lq_imgs)  # Super-resolve the LQ images
#         sr_imgs_resized = F.interpolate(sr_imgs, size=(gt_imgs.size(2), gt_imgs.size(3)), mode='bilinear', align_corners=False)

#         loss = criterion(sr_imgs_resized, gt_imgs)
#         loss = loss / accumulation_steps  # Normalize the loss for gradient accumulation
#         loss.backward()

#         if (batch_idx + 1) % accumulation_steps == 0:
#             optimizer.step()
#             optimizer.zero_grad()

#         train_loss += loss.item()

#     # After each epoch, check validation loss
#     model.eval()
#     val_loss = 0
#     with torch.no_grad():
#       for batch_data in val_loader:
#           gt_imgs = batch_data['gt'].to(device)
#           lq_imgs = batch_data['lq'].to(device)
#           sr_imgs = model(lq_imgs)  # Super-resolve the low-quality images
#           sr_imgs_resized = F.interpolate(sr_imgs, size=(gt_imgs.size(2), gt_imgs.size(3)), mode='bilinear', align_corners=False)
#           val_loss += criterion(sr_imgs_resized, gt_imgs).item()

#     val_loss /= len(val_loader)

#     print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss/len(train_loader):.4f}, Validation Loss: {val_loss:.4f}")

#     # Early stopping logic
#     if val_loss < best_loss:
#         best_loss = val_loss
#         early_stopping_counter = 0  # Reset counter
#         # Save the model if it's the best so far
#         torch.save(model.state_dict(), f"/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/model_2_{val_loss}.pth")
#     else:
#         early_stopping_counter += 1
#         print(f"No improvement in validation loss for {early_stopping_counter} epochs.")
#         if early_stopping_counter >= early_stopping_patience:
#             print("Stopping early due to lack of improvement in validation loss.")
#             break  # Break out of the training loop

# # Save the trained model
# torch.save(model.state_dict(), f"/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/model_2_{val_loss}.pth")

In [None]:
print(sum(p.numel() for p in model.parameters()))
# torch.save(model.state_dict(), f"/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/model_2_{val_loss}.pth")

In [None]:
first_item = next(iter(test_loader))
print(first_item)

In [12]:
import torch
from torch.utils.data import DataLoader
from torchvision.utils import save_image
import os
import torchvision.transforms.functional as TF

model_path = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/best_model_0.0034990164777264.pth'
model = SRResNet(num_in_ch=3, num_out_ch=3, num_feat=32, num_block=8, upscale=4)  # Adjust parameters as needed
model.load_state_dict(torch.load(model_path))
model.eval()

device = torch.device('cuda')
model.to(device)

output_dir = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/outputs'
os.makedirs(output_dir, exist_ok=True)

brightness_factor = 1.5

with torch.no_grad():
    for i, data in enumerate(test_loader):
        input_tensor = data['gt'].to(device)
        image_paths = data['gt_path']
        print(f"Processing images: {image_paths}")

        output_tensor = model(input_tensor)

        for j, (single_img_tensor, image_path) in enumerate(zip(output_tensor, image_paths)):
            single_img = single_img_tensor.cuda().squeeze(0)
            # single_img = TF.adjust_brightness(single_img, brightness_factor)
            base_name = os.path.basename(image_path).split('.')[0]
            print(f"Saving image: {base_name}.png")
            save_image(single_img, os.path.join(output_dir, f'{base_name}.png'))

print(f"Super-resolution images are saved in {output_dir}")


RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:
import os
import zipfile

output_dir = '/content/drive/MyDrive/NTU 2023-2024/Advanced Computer Vision Projects/SuperResolution/outputs'
base_name = 'submission'

zip_filename = f"{base_name}.zip"
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for root, dirs, files in os.walk(output_dir):
        for file in files:
            file_path = os.path.join(root, file)
            zipf.write(file_path, arcname=os.path.basename(file_path))

print(f"Images are successfully compressed into {zip_filename}. You can download it for submission.")