In [6]:
import copy
import os
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
from PIL import Image
from pytorch_model_summary import summary
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import transforms

torch.manual_seed(0)
np.random.seed(0)
random.seed(0)
torch.cuda.manual_seed_all(0)
torch.backends.cudnn.benchmark = True  # speed


In [7]:
train_df = pd.read_csv("dataset/train.csv")
test_df = pd.read_csv("dataset/test_input.csv")
validation_df = pd.read_csv("dataset/validation.csv")

In [8]:
class SupersamplingDataset(Dataset):
    def __init__(self, df, dataset_path, stage="train", augment=True):
        self.df = df.reset_index(drop=True)
        self.dataset_path = dataset_path
        self.stage = stage  # 'train', 'val', 'test'
        self.augment = augment
        self.to_tensor = transforms.ToTensor()

    def __len__(self):
        return len(self.df)

    def _apply_geometric_augment(self, lr_img, hr_img):
        if random.random() < 0.5:
            lr_img = TF.hflip(lr_img); hr_img = TF.hflip(hr_img)
        if random.random() < 0.5:
            lr_img = TF.vflip(lr_img); hr_img = TF.vflip(hr_img)

        return lr_img, hr_img


    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_id = row["id"]

        # Low-res image
        small_image_path = os.path.join(self.dataset_path, row["input_image"])
        small_image = Image.open(small_image_path).convert("RGB")

        # TRAIN or VAL: return LR + HR
        if self.stage in ("train", "val"):
            big_image_path = os.path.join(self.dataset_path, row["target_image"])
            big_image = Image.open(big_image_path).convert("RGB")

            # Apply joint augmentations (train only)
            if self.augment:
                small_image, big_image = self._apply_geometric_augment(small_image, big_image)

            # To tensor
            small_tensor = self.to_tensor(small_image)  # [3, 32, 32]
            big_tensor = self.to_tensor(big_image)      # [3, 128, 128]

            return img_id, small_tensor, big_tensor

        # TEST: only LR
        small_tensor = self.to_tensor(small_image)
        return img_id, small_tensor

In [9]:
batch_size = 64

In [11]:
train_dataset = SupersamplingDataset(
    df=train_df,
    dataset_path="dataset/train/",
    stage="train",
    augment=True    # use flips + rotations
)

validation_dataset = SupersamplingDataset(
    df=validation_df,
    dataset_path="dataset/validation/",
    stage="val",
    augment=False
)

test_dataset = SupersamplingDataset(
    df=test_df,
    dataset_path="dataset/test_input/",
    stage="test",
    augment=False
)


train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=0, pin_memory=True, persistent_workers=False
                              )
validation_dataloader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False,
                                   num_workers=0, pin_memory=True, persistent_workers=False
                                   )
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
                             num_workers=0, pin_memory=True, persistent_workers=False
                             )

In [12]:
class ResidualBlock(nn.Module):
    def __init__(self, channels=64):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, 3, 1, 1)
        self.act   = nn.LeakyReLU(0.2, inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, 3, 1, 1)

    def forward(self, x):
        return x + self.conv2(self.act(self.conv1(x)))

class UpsampleBlock(nn.Module):
    """2x upsample using PixelShuffle"""
    def __init__(self, in_channels=64):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, in_channels * 4, 3, 1, 1)
        self.ps   = nn.PixelShuffle(2)
        self.act  = nn.LeakyReLU(0.2, inplace=True)

    def forward(self, x):
        return self.act(self.ps(self.conv(x)))

class SRResNet4x(nn.Module):
    def __init__(self, num_blocks=12, channels=64, clamp_output=False):
        super().__init__()
        self.clamp_output = clamp_output

        self.conv_in = nn.Conv2d(3, channels, 3, 1, 1)
        self.act     = nn.LeakyReLU(0.2, inplace=True)

        self.blocks  = nn.Sequential(*[ResidualBlock(channels) for _ in range(num_blocks)])
        self.conv_mid = nn.Conv2d(channels, channels, 3, 1, 1)

        self.up1 = UpsampleBlock(channels)  # 2x
        self.up2 = UpsampleBlock(channels)  # 4x total

        self.conv_out = nn.Conv2d(channels, 3, 3, 1, 1)

    def forward(self, lr):
        # bicubic skip
        base = F.interpolate(lr, scale_factor=4, mode="bicubic", align_corners=False)

        x = self.act(self.conv_in(lr))
        res = x
        x = self.blocks(x)
        x = self.conv_mid(x) + res

        x = self.up1(x)
        x = self.up2(x)
        x = self.conv_out(x)

        out = base + x  # residual learning

        if self.clamp_output:
            out = out.clamp(0.0, 1.0)
        return out


In [13]:
model = SRResNet4x(num_blocks=12, channels=64, clamp_output=False).cuda()
nn.init.zeros_(model.conv_out.weight)
if model.conv_out.bias is not None:
    nn.init.zeros_(model.conv_out.bias)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer, milestones=[60, 120, 160], gamma=0.5
)

scaler = torch.cuda.amp.GradScaler()
print(summary(model, torch.rand(size=(16, 3, 32, 32)).cuda(), show_input=True))

  scaler = torch.cuda.amp.GradScaler()


---------------------------------------------------------------------------
       Layer (type)            Input Shape         Param #     Tr. Param #
           Conv2d-1        [16, 3, 32, 32]           1,792           1,792
        LeakyReLU-2       [16, 64, 32, 32]               0               0
    ResidualBlock-3       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-4       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-5       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-6       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-7       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-8       [16, 64, 32, 32]          73,856          73,856
    ResidualBlock-9       [16, 64, 32, 32]          73,856          73,856
   ResidualBlock-10       [16, 64, 32, 32]          73,856          73,856
   ResidualBlock-11       [16, 64, 32, 32]          73,856          73,856
   ResidualBlock-12     

In [14]:
for epoch in range(5):
    start_time = time.time()
    model.train()
    for ids, lr, hr in train_dataloader:
        lr = lr.cuda(non_blocking=True)   # lr in [0,1]
        hr = hr.cuda(non_blocking=True)   # hr in [0,1]

        optimizer.zero_grad(set_to_none=True)

        with torch.cuda.amp.autocast():
            pred = model(lr)
            loss = criterion(pred, hr)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

    scheduler.step()

    # validation
    val_loss = 0.0
    n = 0
    with torch.no_grad():
        for ids, lr, hr in validation_dataloader:
            lr = lr.cuda(non_blocking=True)
            hr = hr.cuda(non_blocking=True)
            pred = model(lr).clamp(0, 1)
            bs = lr.size(0)
            val_loss += criterion(pred, hr).item() * bs
            n += bs
    val_loss /= n
    end_time = time.time()
    print(f"epoch {epoch + 1}, "
          f"loss {val_loss:.6f}, "
          f"time {end_time - start_time:1f} s")

  with torch.cuda.amp.autocast():


epoch 1, loss 0.00827, time 26.432177 seconds
epoch 2, loss 0.00795, time 11.289188 seconds
epoch 3, loss 0.00754, time 11.360390 seconds
epoch 4, loss 0.00730, time 11.599068 seconds
epoch 5, loss 0.00719, time 11.638368 seconds


In [None]:
def show_test_samples(
    model,
    test_loader,
    device="cuda",
    num_samples=4,
    upscale_factor=4,
    up_mode="bicubic",
):
    model.eval()
    model.to(device)

    to_pil = transforms.ToPILImage()
    shown = 0

    with torch.no_grad():
        for batch in test_loader:
            if not isinstance(batch, (tuple, list)) or len(batch) < 2:
                raise ValueError("Expected batch like (ids, lr_imgs, [hr_imgs], ...).")

            ids = batch[0]
            lr_imgs = batch[1]
            hr_imgs = batch[2] if len(batch) >= 3 and torch.is_tensor(batch[2]) else None

            lr_imgs = lr_imgs.to(device)  # (B,3,32,32)
            sr_imgs = model(lr_imgs)      # (B,3,128,128) expected

            # clamp for visualization
            lr_vis = lr_imgs.clamp(0.0, 1.0)
            sr_vis = sr_imgs.clamp(0.0, 1.0)

            # baseline upsample for comparison
            lr_up = F.interpolate(
                lr_vis,
                scale_factor=upscale_factor,
                mode=up_mode,
                align_corners=False if up_mode in ("bilinear", "bicubic") else None
            ).clamp(0.0, 1.0)

            B = lr_imgs.size(0)
            for i in range(B):
                if shown >= num_samples:
                    return

                sample_id = ids[i].item() if torch.is_tensor(ids[i]) else ids[i]

                lr_up_pil = to_pil(lr_up[i].cpu())
                sr_pil = to_pil(sr_vis[i].cpu())

                # If HR exists, show it. Otherwise show the raw LR 32x32.
                if hr_imgs is not None:
                    hr_pil = to_pil(hr_imgs[i].clamp(0.0, 1.0).cpu())
                    titles = ["LR (upsampled)", "SR (model)", "HR (ground truth)"]
                    images = [lr_up_pil, sr_pil, hr_pil]
                else:
                    lr_small_pil = to_pil(lr_vis[i].cpu())
                    titles = ["LR (upsampled)", "SR (model)", "LR (32x32)"]
                    images = [lr_up_pil, sr_pil, lr_small_pil]

                fig, axes = plt.subplots(1, 3, figsize=(12, 4))
                fig.suptitle(f"ID: {sample_id}")

                for ax, img, title in zip(axes, images, titles):
                    ax.imshow(img)
                    ax.set_title(title)
                    ax.axis("off")

                plt.tight_layout()
                plt.show()

                shown += 1


show_test_samples(
    model=model,
    test_loader=validation_dataloader,
    device="cuda",
    num_samples=4
)

In [56]:
def create_submission_csv(
    model,
    test_loader,
    out_csv_path="submissions/submission.csv",
    device="cuda"
):
    model.eval()
    model.to(device)

    H, W, C = 128, 128, 3
    num_pixels = H * W * C

    all_rows = []

    with torch.no_grad():
        for batch in test_loader:
            # Handle (ids, lr) or (ids, lr, something_else)
            if len(batch) == 2:
                ids, lr_imgs = batch
            elif len(batch) == 3:
                ids, lr_imgs, _ = batch
            else:
                raise ValueError("Unexpected batch format from test_loader")

            lr_imgs = lr_imgs.to(device)           # [B, 3, 32, 32]

            # Forward pass: super-res prediction
            sr_imgs = model(lr_imgs)               # [B, 3, 128, 128]

            # Clamp to [0,1] before converting to [0,255]
            sr_imgs = sr_imgs.clamp(0.0, 1.0)

            # Move to CPU, (B, H, W, C)
            sr_np = sr_imgs.permute(0, 2, 3, 1).cpu().numpy()

            # Scale to [0,255], round and convert to uint8
            sr_np = (sr_np * 255.0).round().clip(0, 255).astype(np.uint8)

            B = sr_np.shape[0]
            for i in range(B):
                img_id = int(ids[i])
                # Flatten in row-major order -> (H * W * C,)
                flat_pixels = sr_np[i].reshape(-1)
                # Build row: [id, pixel_0, ..., pixel_49151]
                row = [img_id] + flat_pixels.tolist()
                all_rows.append(row)

    # Build DataFrame
    columns = ["id"] + [f"pixel_{i}" for i in range(num_pixels)]
    df = pd.DataFrame(all_rows, columns=columns)

    # Ensure sorted by id (just to be safe)
    df = df.sort_values("id").reset_index(drop=True)

    # Save
    df.to_csv(out_csv_path, index=False)
    print(f"Saved submission to {out_csv_path} with shape {df.shape}")


In [57]:
create_submission_csv(
    model=model,
    test_loader=test_dataloader,
    out_csv_path="submissions/submission4.csv",
    device="cuda"
)

Saved submission to submissions/submission4.csv with shape (500, 49153)
