In [1]:
import numpy as np
import os
import tqdm
import torch
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from datasets import Dataset
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [3]:
# Folders holding the training inputs and their ground-truth (gt) counterparts.
image_dir = '/kaggle/input/task-4-1/train/train'
gt_dir = '/kaggle/input/task-4-1/train/gt'

# Both listings are sorted so that inputs and targets are paired by position.
image_files = sorted(os.listdir(image_dir))
gt_files = sorted(os.listdir(gt_dir))

# Positional pairing is only meaningful when both folders hold the same number of files.
if len(image_files) != len(gt_files):
    raise ValueError(
        f"Found {len(image_files)} files in {image_dir} but {len(gt_files)} files in {gt_dir}"
    )

imgs = []
gt = []

for image_name, gt_name in zip(image_files, gt_files):
    image_path = os.path.join(image_dir, image_name)
    gt_path = os.path.join(gt_dir, gt_name)
    image = cv2.imread(image_path)
    target = cv2.imread(gt_path)
    # cv2.imread returns None for a missing or undecodable file instead of raising,
    # so fail here with the offending path rather than later with an obscure error.
    if image is None:
        raise IOError(f"Could not read image: {image_path}")
    if target is None:
        raise IOError(f"Could not read image: {gt_path}")
    imgs.append(image)
    gt.append(target)

In [4]:
# Folders holding the validation inputs and their ground-truth (gt) counterparts.
imgs_val_dir = '/kaggle/input/task-4-1/val/val'
gt_val_dir = '/kaggle/input/task-4-1/val/gt'

# Both listings are sorted so that inputs and targets are paired by position.
image_files_val = sorted(os.listdir(imgs_val_dir))
gt_files_val = sorted(os.listdir(gt_val_dir))

# Positional pairing is only meaningful when both folders hold the same number of files.
if len(image_files_val) != len(gt_files_val):
    raise ValueError(
        f"Found {len(image_files_val)} files in {imgs_val_dir} "
        f"but {len(gt_files_val)} files in {gt_val_dir}"
    )

imgs_val = []
gt_val = []

for val_image_name, val_gt_name in zip(image_files_val, gt_files_val):
    val_image_path = os.path.join(imgs_val_dir, val_image_name)
    val_gt_path = os.path.join(gt_val_dir, val_gt_name)
    val_image = cv2.imread(val_image_path)
    val_target = cv2.imread(val_gt_path)
    # cv2.imread returns None for a missing or undecodable file instead of raising,
    # so fail here with the offending path rather than later with an obscure error.
    if val_image is None:
        raise IOError(f"Could not read image: {val_image_path}")
    if val_target is None:
        raise IOError(f"Could not read image: {val_gt_path}")
    imgs_val.append(val_image)
    gt_val.append(val_target)

In [5]:
# Convert the lists of training images and targets into NumPy arrays.
imgs, gt = np.array(imgs), np.array(gt)

In [6]:
# Convert the lists of validation images and targets into NumPy arrays.
imgs_val, gt_val = np.array(imgs_val), np.array(gt_val)

In [7]:
# Display the shape of the training image array as a quick sanity check.
imgs.shape

(1105, 160, 256, 3)

In [8]:
# One transform instance is reused for every conversion instead of building a new one per image.
to_tensor = transforms.ToTensor()

# Pair each input image with the target stored at the same index, both converted to tensors.
data = [(to_tensor(imgs[i]), to_tensor(gt[i])) for i in range(len(imgs))]
data_val = [(to_tensor(imgs_val[i]), to_tensor(gt_val[i])) for i in range(len(imgs_val))]

# Only the training batches are shuffled; validation is evaluation-only, so a fixed
# order keeps its batches identical from epoch to epoch.
train = DataLoader(data, batch_size=8, shuffle=True)
val = DataLoader(data_val, batch_size=2, shuffle=False)

In [9]:
class ChannelAttention(nn.Module):
    """Scale every channel of a feature map by a learned sigmoid gate.

    The input is reduced to one value per channel twice (global average and
    global max pooling). Both summaries pass through the same 1x1 bottleneck
    convolution ``fc1``, are summed, rectified, expanded back by ``fc2`` and
    squashed with a sigmoid. The resulting per-channel gate multiplies ``x``.

    Args:
        in_channels: Number of channels of the incoming feature map.
        reduction: Divisor used for the bottleneck width (``in_channels // reduction``).
    """

    def __init__(self, in_channels, reduction=16):
        super().__init__()
        hidden_channels = in_channels // reduction
        self.fc1 = nn.Conv2d(in_channels, hidden_channels, kernel_size=1, bias=True)
        self.fc2 = nn.Conv2d(hidden_channels, in_channels, kernel_size=1, bias=True)

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel attention gate."""
        squeezed_avg = self.fc1(F.adaptive_avg_pool2d(x, (1, 1)))
        squeezed_max = self.fc1(F.adaptive_max_pool2d(x, (1, 1)))
        # The two descriptors are merged before the non-linearity and the expansion.
        gate = torch.sigmoid(self.fc2(F.relu(squeezed_avg + squeezed_max)))
        return x * gate

class SpatialAttention(nn.Module):
    """Scale every spatial position of a feature map by a learned sigmoid gate.

    The channel-wise mean and channel-wise max of the input are stacked into a
    two-channel map, convolved down to one channel and passed through a
    sigmoid. The resulting map multiplies ``x`` (broadcast over channels).

    Args:
        kernel_size: Side length of the convolution kernel; padding is ``kernel_size // 2``.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        half_kernel = kernel_size // 2
        self.conv = nn.Conv2d(2, 1, kernel_size=kernel_size, padding=half_kernel, bias=False)

    def forward(self, x):
        """Return ``x`` multiplied by its spatial attention gate."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        descriptor = torch.cat((channel_mean, channel_max), dim=1)
        gate = torch.sigmoid(self.conv(descriptor))
        return x * gate

class InceptionModule(nn.Module):
    """Run four parallel branches over the input and concatenate them on the channel axis.

    Branches, in concatenation order: a 1x1 convolution, a 3x3 convolution, a
    dilated 3x3 convolution and a 3x3 stride-1 average pooling. Every branch
    keeps the spatial size. The three convolutions emit ``out_channels``
    channels each, while the pooling branch keeps the input's channel count.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by each convolution branch.
        dilation: Dilation (and matching padding) of the dilated branch.
    """

    def __init__(self, in_channels, out_channels, dilation=2):
        super().__init__()
        self.conv1x1 = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.conv3x3 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv_dilated = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, padding=dilation, dilation=dilation
        )
        self.pool = nn.AvgPool2d(kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        """Return the channel-wise concatenation of all four branch outputs."""
        branches = (self.conv1x1, self.conv3x3, self.conv_dilated, self.pool)
        return torch.cat([branch(x) for branch in branches], dim=1)

class ResidualBlock(nn.Module):
    """Inception features, projection and attention wrapped in a skip connection.

    The input goes through an ``InceptionModule`` whose branches are all
    ``in_channels`` wide, a 3x3 convolution that projects the concatenation
    back to ``in_channels``, then channel attention followed by spatial
    attention. The block input is added to the result.

    Args:
        in_channels: Number of channels of both the input and the output.
    """

    def __init__(self, in_channels):
        super().__init__()
        self.inception = InceptionModule(in_channels, in_channels)
        # Four branches of in_channels each are concatenated by the inception module.
        self.project = nn.Conv2d(4 * in_channels, in_channels, kernel_size=3, padding=1, bias=True)
        self.ca = ChannelAttention(in_channels)
        self.sa = SpatialAttention()

    def forward(self, x):
        """Return the attended features plus the unchanged block input."""
        projected = self.project(self.inception(x))
        attended = self.sa(self.ca(projected))
        return attended + x

class UpscaleBlock(nn.Module):
    """Enlarge a feature map by ``scale_factor`` using a convolution and pixel shuffle.

    A 3x3 convolution expands the channels by ``scale_factor ** 2``;
    ``nn.PixelShuffle`` then rearranges them into a map that is
    ``scale_factor`` times larger in height and width with the original
    channel count. A ReLU is applied to the result.

    Args:
        in_channels: Number of channels of the input (and of the output).
        scale_factor: Integer upscaling factor passed to ``nn.PixelShuffle``.
    """

    def __init__(self, in_channels, scale_factor):
        super().__init__()
        expanded_channels = in_channels * (scale_factor * scale_factor)
        self.conv = nn.Conv2d(in_channels, expanded_channels, kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(scale_factor)

    def forward(self, x):
        """Return the rectified, spatially enlarged feature map."""
        shuffled = self.pixel_shuffle(self.conv(x))
        return F.relu(shuffled)

class SISRModel(nn.Module):
    """Image-to-image network: feature extraction, residual trunk, upscaling, reconstruction.

    Pipeline: ``conv1`` + ReLU produce the initial features; a stack of
    ``ResidualBlock`` modules followed by ``conv2`` refines them and the
    initial features are added back; ``UpscaleBlock`` enlarges the map by
    ``scale_factor``; ``conv3`` maps the features back to ``in_channels``.

    Args:
        in_channels: Channels of the input image (and of the output image).
        num_features: Width of the internal feature maps.
        num_res_blocks: Number of ``ResidualBlock`` modules in the trunk.
        scale_factor: Upscaling factor handed to ``UpscaleBlock``.
    """

    def __init__(self, in_channels=3, num_features=64, num_res_blocks=8, scale_factor=4):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, num_features, kernel_size=3, padding=1)
        trunk = [ResidualBlock(num_features) for _ in range(num_res_blocks)]
        self.res_blocks = nn.Sequential(*trunk)
        self.conv2 = nn.Conv2d(num_features, num_features, kernel_size=3, padding=1)
        self.upscale = UpscaleBlock(num_features, scale_factor)
        self.conv3 = nn.Conv2d(num_features, in_channels, kernel_size=3, padding=1)

    def forward(self, x):
        """Return the reconstructed, upscaled image for the batch ``x``."""
        shallow = F.relu(self.conv1(x))
        # Long skip connection: the trunk output is added to the initial features.
        deep = self.conv2(self.res_blocks(shallow)) + shallow
        return self.conv3(self.upscale(deep))


In [10]:
# Build the network and move its parameters to the selected device in one step
# (Module.to returns the module itself).
model = SISRModel(in_channels=3, num_features=64, num_res_blocks=8, scale_factor=4).to(device)
# Leave the module as the final expression so the cell displays its layer summary.
model

SISRModel(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (res_blocks): Sequential(
    (0): ResidualBlock(
      (inception): InceptionModule(
        (conv1x1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
        (conv3x3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (conv_dilated): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2), dilation=(2, 2))
        (pool): AvgPool2d(kernel_size=3, stride=1, padding=1)
      )
      (project): Conv2d(256, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (ca): ChannelAttention(
        (fc1): Conv2d(64, 4, kernel_size=(1, 1), stride=(1, 1))
        (fc2): Conv2d(4, 64, kernel_size=(1, 1), stride=(1, 1))
      )
      (sa): SpatialAttention(
        (conv): Conv2d(2, 1, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3), bias=False)
      )
    )
    (1): ResidualBlock(
      (inception): InceptionModule(
        (conv1x1): Conv2d(64, 64, kernel_size=(1

In [11]:
# Add up the element counts of every parameter tensor registered on the model.
total_params = 0
for param in model.parameters():
    total_params += param.numel()
print(f"Total parameters: {total_params}")

Total parameters: 2441011


In [12]:
# Mean squared error between the network output and the target image.
criterion = nn.MSELoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(params=model.parameters(), lr=7.5e-4)
# Multiply the learning rate by 0.75 after every 10 scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.75)

In [13]:
import copy
import torch.optim as optim

# Training schedule: at most `num_epochs` epochs, stopping early once the validation
# loss has failed to improve for `patience` consecutive epochs.
num_epochs = 50
best_val_loss = float('inf')
patience = 5
patience_counter = 0
# Snapshot of the weights with the lowest validation loss seen so far.
best_model_weights = copy.deepcopy(model.state_dict())

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_loss = 0.0

    # The batch variables deliberately do not reuse the names `imgs` / `gt`: those hold
    # the full dataset arrays, and rebinding them here would break re-running earlier cells.
    for batch_inputs, batch_targets in train:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Average of the per-batch losses.
    train_loss /= len(train)

    # ---- validation pass (no gradients, no weight updates) ----
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_inputs, batch_targets in val:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)
            outputs = model(batch_inputs)
            loss = criterion(outputs, batch_targets)
            val_loss += loss.item()
    val_loss /= len(val)

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}, Validation Loss: {val_loss}")

    # Check for improvement
    if val_loss < best_val_loss:
        print(f"Validation loss improved from {best_val_loss} to {val_loss}. Saving best weights.")
        best_val_loss = val_loss
        patience_counter = 0
        # Deep copy so later optimizer steps do not modify the saved snapshot.
        best_model_weights = copy.deepcopy(model.state_dict())
    else:
        patience_counter += 1
        print(f"No improvement for {patience_counter} epochs.")

    if patience_counter >= patience:
        print("Early stopping due to no improvement in validation loss.")
        break

    scheduler.step()

# Restore the best snapshot; the model is left in eval mode by the last validation pass.
model.load_state_dict(best_model_weights)

Epoch 1/50, Train Loss: 0.002050137425782896, Validation Loss: 0.00027552406108545476
Validation loss improved from inf to 0.00027552406108545476. Saving best weights.
Epoch 2/50, Train Loss: 0.0004093280099139703, Validation Loss: 0.00023047564726726
Validation loss improved from 0.00027552406108545476 to 0.00023047564726726. Saving best weights.
Epoch 3/50, Train Loss: 0.0003482635576226045, Validation Loss: 0.0002085324984418277
Validation loss improved from 0.00023047564726726 to 0.0002085324984418277. Saving best weights.
Epoch 4/50, Train Loss: 0.00032217115787336965, Validation Loss: 0.0001953703835875894
Validation loss improved from 0.0002085324984418277 to 0.0001953703835875894. Saving best weights.
Epoch 5/50, Train Loss: 0.00030865683073760846, Validation Loss: 0.00019177512757466244
Validation loss improved from 0.0001953703835875894 to 0.00019177512757466244. Saving best weights.
Epoch 6/50, Train Loss: 0.0002970209762097249, Validation Loss: 0.0001810241560429967
Validat

<All keys matched successfully>

In [14]:
# Serialize the whole `model` object (not only its state_dict) to "model.pth".
torch.save(model, "model.pth")

In [15]:
# Folder with the test inputs; the sorted listing fixes the order of the predictions.
image_dir_test = '/kaggle/input/task-4-1/test'
image_files_test = sorted(os.listdir(image_dir_test))

imgs_test = []

for test_name in image_files_test:
    test_path = os.path.join(image_dir_test, test_name)
    test_image = cv2.imread(test_path)
    # cv2.imread returns None for a missing or undecodable file instead of raising,
    # so fail here with the offending path rather than later with an obscure error.
    if test_image is None:
        raise IOError(f"Could not read image: {test_path}")
    imgs_test.append(test_image)

imgs_test = np.array(imgs_test)

In [16]:
# Display the shape of the test image array as a quick sanity check.
imgs_test.shape

(60, 160, 256, 3)

In [17]:
# One transform instance is reused for every conversion.
to_tensor = transforms.ToTensor()
# Stored under its own name so the training sample list `data` is not overwritten.
data_test = [to_tensor(image) for image in imgs_test]
# No shuffling: predictions must stay in the same order as the test images.
test = DataLoader(data_test, batch_size=4, shuffle=False)

In [18]:
preds = []
# Make inference mode explicit instead of relying on the state left by the training cell.
model.eval()
with torch.no_grad():
    # The batch variable does not reuse the name `imgs`, which holds the training array.
    for test_batch in test:
        test_batch = test_batch.to(device)
        outputs = model(test_batch)
        # Extending with the batch array appends one prediction per image.
        preds.extend(outputs.cpu().numpy())

In [19]:
preds = np.array(preds)
# The final layer of the network has no output activation, so predictions can fall
# outside [0, 1]. Scale to the 8-bit range, round, and clip before casting: a bare
# astype(np.uint8) would wrap out-of-range values around instead of saturating them.
# The dtype guard keeps the cell idempotent when it is run more than once.
if preds.dtype != np.uint8:
    preds = np.clip(np.rint(preds * 255.0), 0, 255).astype(np.uint8)

In [20]:
output_dir = "output_images"
os.makedirs(output_dir, exist_ok=True)

for idx, img in enumerate(preds):
    # Move the leading (channel) axis last, the layout cv2.imwrite expects.
    img = img.transpose(1, 2, 0)
    filename = os.path.join(output_dir, f"gt_{idx:05d}.png")
    # cv2.imwrite reports failure through its return value rather than an exception;
    # surface it so a partially written submission is not zipped silently.
    if not cv2.imwrite(filename, img):
        raise IOError(f"Failed to write image: {filename}")

In [21]:
import zipfile

folder_to_zip = "output_images"
zip_filename = "zipped_folder.zip"

# Add every file found under the output folder to a deflate-compressed archive.
# Entries are stored under their path relative to that folder, so the archive
# does not contain a leading "output_images/" component.
with zipfile.ZipFile(zip_filename, mode='w', compression=zipfile.ZIP_DEFLATED) as zipf:
    for dirpath, _dirnames, filenames in os.walk(folder_to_zip):
        for name in filenames:
            source_path = os.path.join(dirpath, name)
            zipf.write(source_path, arcname=os.path.relpath(source_path, start=folder_to_zip))