In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.utils import save_image
import os
import numpy as np
from PIL import Image

In [2]:
class ResidualBlock(nn.Module):
    """Residual unit: conv -> batch-norm -> ReLU -> conv -> batch-norm, added to the input.

    Both convolutions are 3x3 with padding 1 and keep the channel count, so
    the output has the same shape as the input.

    Args:
        channels: Number of input (and output) feature channels.
    """

    def __init__(self, channels):
        super().__init__()
        layers = [
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels),
        ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Return ``x`` plus the output of the convolutional branch."""
        residual = self.block(x)
        # Skip connection
        return x + residual


In [3]:
class LAB(nn.Module):
    """Lightweight attention block that rescales each channel of its input.

    The input is global-average-pooled to one value per channel, passed
    through a two-layer bottleneck (Linear -> ReLU -> Linear -> Sigmoid) and
    the resulting per-channel weights in (0, 1) multiply the input.

    Args:
        channels: Number of channels of the input feature map.
        reduction: Bottleneck reduction ratio. The hidden width is
            ``channels // reduction`` but never less than 1, so a ratio larger
            than ``channels`` no longer produces an empty hidden layer (which
            would make every attention weight a constant 0.5).
    """

    def __init__(self, channels, reduction=16):
        super(LAB, self).__init__()
        # Guard against channels < reduction collapsing the bottleneck to width 0.
        hidden = max(1, channels // reduction)
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned attention weights.

        Expects a 4-D tensor; the size is unpacked as (batch, channels, _, _).
        """
        batch, channels, _, _ = x.size()
        y = self.global_pool(x).view(batch, channels)
        y = self.fc(y).view(batch, channels, 1, 1)
        return x * y  # Scale the input with attention weights


In [4]:
class LightweightDenoiser(nn.Module):
    """Small encoder / residual / decoder network that maps a 3-channel image to a 3-channel image.

    The encoder halves the spatial resolution with a stride-2 convolution,
    the residual blocks work at 128 channels, and the decoder restores the
    resolution with a stride-2 transposed convolution.

    Args:
        num_blocks: Number of ``ResidualBlock(128)`` modules between encoder
            and decoder. Defaults to 4, the original fixed value.
    """

    def __init__(self, num_blocks=4):
        super(LightweightDenoiser, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.ReLU(inplace=True)
        )
        self.residual_blocks = nn.Sequential(
            *[ResidualBlock(128) for _ in range(num_blocks)]  # Fewer residual blocks for lightweight design
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x):
        """Denoise ``x`` and return a tensor with the same spatial size as the input."""
        height, width = x.shape[-2:]
        x = self.encoder(x)
        x = self.residual_blocks(x)
        x = self.decoder(x)
        # The stride-2 down/up pair yields 2 * ceil(size / 2), i.e. one extra
        # row/column for odd input sizes; crop so output matches the input.
        if x.shape[-2:] != (height, width):
            x = x[..., :height, :width]
        return x


In [5]:
class SuperResolutionNetLAKD(nn.Module):
    """Super-resolution network: conv stem, attention blocks, pixel-shuffle upsampling, conv head.

    Args:
        num_channels: Channels of the input and output images.
        num_features: Width of the internal feature maps.
        num_blocks: Number of ``LAB`` attention blocks applied after the stem.
        scale: Integer upscaling factor (>= 1). The upsampler is built from one
            conv + PixelShuffle + ReLU stage per prime factor of ``scale``, so
            the output is exactly ``scale`` times larger in each spatial
            dimension. ``scale=4`` yields the same two 2x stages (and the same
            parameter names) as the original hard-wired layout.

    Raises:
        ValueError: If ``scale`` is not a positive integer.
    """

    def __init__(self, num_channels=3, num_features=64, num_blocks=8, scale=4):
        super(SuperResolutionNetLAKD, self).__init__()
        if not isinstance(scale, int) or scale < 1:
            raise ValueError(f"scale must be a positive integer, got {scale!r}")

        self.input = nn.Conv2d(num_channels, num_features, kernel_size=3, padding=1)

        # Use Lightweight Attention Blocks
        self.blocks = nn.Sequential(
            *[LAB(num_features) for _ in range(num_blocks)]
        )

        # Upsampling: each stage expands channels by factor**2 and PixelShuffle
        # folds them back into a factor-times larger feature map.
        stages = []
        for factor in self._stage_factors(scale):
            stages.extend([
                nn.Conv2d(num_features, num_features * factor ** 2, kernel_size=3, padding=1),
                nn.PixelShuffle(factor),
                nn.ReLU(inplace=True),
            ])
        # Final refinement conv at num_features channels
        stages.append(nn.Conv2d(num_features, num_features, kernel_size=3, padding=1))
        self.upsample = nn.Sequential(*stages)

        self.output = nn.Conv2d(num_features, num_channels, kernel_size=3, padding=1)

    @staticmethod
    def _stage_factors(scale):
        """Return the prime factors of ``scale`` in ascending order (empty for 1)."""
        factors = []
        remaining = scale
        candidate = 2
        while remaining > 1:
            while remaining % candidate == 0:
                factors.append(candidate)
                remaining //= candidate
            candidate += 1
        return factors

    def forward(self, x):
        """Return ``x`` upscaled by ``scale`` with ``num_channels`` output channels."""
        x = self.input(x)
        x = self.blocks(x)
        x = self.upsample(x)
        return self.output(x)


In [6]:
class CombinedModel(nn.Module):
    """Two-stage pipeline: a ``LightweightDenoiser`` followed by a ``SuperResolutionNetLAKD``.

    Both sub-networks are built with their default arguments.
    """

    def __init__(self):
        super().__init__()
        self.denoiser = LightweightDenoiser()
        self.super_resolver = SuperResolutionNetLAKD()

    def forward(self, x):
        """Denoise ``x`` and return the super-resolved result."""
        denoised = self.denoiser(x)
        return self.super_resolver(denoised)


In [7]:
# Custom Dataset
class LowLightDataset(Dataset):
    """Paired dataset of clean (ground-truth) and noisy images.

    Every file in ``noisy_dir`` named ``<prefix>_<suffix>`` is paired with the
    file ``gt_<suffix>`` in ``clean_dir``.

    Args:
        clean_dir: Directory containing the ``gt_*`` target images.
        noisy_dir: Directory containing the noisy input images.
        transform: Optional callable applied to both images of a pair.
    """

    def __init__(self, clean_dir, noisy_dir, transform=None):
        self.noisy_dir = noisy_dir
        self.transform = transform
        self.clean_dir = clean_dir
        # os.listdir() returns entries in arbitrary order; sort so that an
        # index always refers to the same file across runs and machines.
        self.image_list = sorted(os.listdir(noisy_dir))

    def __len__(self):
        return len(self.image_list)

    @staticmethod
    def _clean_name(noisy_name):
        """Map a noisy file name to its ground-truth file name.

        Splits on the LAST underscore so prefixes that themselves contain
        underscores are accepted.

        Raises:
            ValueError: If ``noisy_name`` contains no underscore.
        """
        _, separator, img_no = noisy_name.rpartition("_")
        if not separator:
            raise ValueError(
                f"Cannot derive ground-truth name from {noisy_name!r}: "
                "expected '<prefix>_<suffix>'"
            )
        return "gt_" + img_no

    def __getitem__(self, idx):
        """Return ``(clean_image, noisy_image)`` for the ``idx``-th noisy file."""
        image_name = self.image_list[idx]

        noise_path = os.path.join(self.noisy_dir, image_name)
        clean_path = os.path.join(self.clean_dir, self._clean_name(image_name))

        noise_img = Image.open(noise_path).convert("RGB")
        clean_image = Image.open(clean_path).convert("RGB")

        if self.transform:
            clean_image = self.transform(clean_image)
            noise_img = self.transform(noise_img)
        return clean_image, noise_img


In [8]:
# Custom Dataset
class TestLowLightDataset(Dataset):
    """Unpaired dataset that serves every file of one directory together with its file name.

    Args:
        image_dir: Directory whose entries are loaded as RGB images.
        transform: Optional callable applied to each loaded image.
    """

    def __init__(self, image_dir, transform=None):
        self.image_list = os.listdir(image_dir)
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, idx):
        """Return ``(image, file_name)`` for the ``idx``-th directory entry."""
        file_name = self.image_list[idx]
        image = Image.open(os.path.join(self.image_dir, file_name)).convert("RGB")
        # Apply the transform only when one was supplied.
        image = self.transform(image) if self.transform else image
        return image, file_name

In [9]:
# Shared preprocessing: the only step is converting each PIL image to a tensor.
transform = transforms.Compose([transforms.ToTensor()])

In [10]:
# Training pairs: ground-truth images and their noisy counterparts.
train_clean = "/kaggle/input/enhance-the-dark-world/archive/train/gt"
train_noisy = "/kaggle/input/enhance-the-dark-world/archive/train/train"

# Validation pairs, laid out the same way.
val_clean = "/kaggle/input/enhance-the-dark-world/archive/val/gt"
val_noisy = "/kaggle/input/enhance-the-dark-world/archive/val/val"

# Test inputs (a single directory, no ground-truth path).
test = "/kaggle/input/enhance-the-dark-world/archive/test"

In [11]:
# Paired datasets for training/validation and the unpaired test set, all
# sharing the same tensor-conversion transform.
train_dataset = LowLightDataset(clean_dir=train_clean, noisy_dir=train_noisy, transform=transform)
val_dataset = LowLightDataset(clean_dir=val_clean, noisy_dir=val_noisy, transform=transform)
test_dataset = TestLowLightDataset(image_dir=test, transform=transform)

In [12]:
# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=4,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
)

In [13]:
# Training Function
def train_model(model, dataloader, optimizer, criterion, scaler, device):
    """Run one training epoch with gradient scaling and return the mean batch loss.

    Args:
        model: Network to train; switched to training mode.
        dataloader: Iterable yielding ``(clean_images, noisy_images)`` batches.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        criterion: Loss called as ``criterion(outputs, clean_images)``.
        scaler: Gradient scaler providing ``scale``, ``step`` and ``update``.
        device: Device (``torch.device`` or string) the batches are moved to.

    Returns:
        The sum of per-batch losses divided by the number of batches.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    # Mixed precision is only meaningful on CUDA; passing `enabled` explicitly
    # avoids the "CUDA is not available" warning on CPU and replaces the
    # deprecated torch.cuda.amp.autocast() spelling.
    use_amp = torch.device(device).type == "cuda"

    model.train()
    running_loss = 0.0
    num_batches = 0
    for clean_images, noisy_images in dataloader:
        clean_images = clean_images.to(device)
        noisy_images = noisy_images.to(device)

        optimizer.zero_grad()
        with torch.autocast(device_type="cuda", enabled=use_amp):
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)

        # Scale the loss before backward, then let the scaler drive the step.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("train_model received a dataloader that yielded no batches")
    return running_loss / num_batches

In [14]:
# Validation Function
def validate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without gradients and return the mean batch loss.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(clean_images, noisy_images)`` batches.
        criterion: Loss called as ``criterion(outputs, clean_images)``.
        device: Device the batches are moved to.

    Returns:
        The sum of per-batch losses divided by the number of batches.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.eval()
    running_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for clean_images, noisy_images in dataloader:
            clean_images = clean_images.to(device)
            noisy_images = noisy_images.to(device)
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)
            running_loss += loss.item()
            num_batches += 1

    # An empty loader used to surface as a bare ZeroDivisionError.
    if num_batches == 0:
        raise ValueError("validate_model received a dataloader that yielded no batches")
    return running_loss / num_batches

In [15]:
# Device selection: first CUDA GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

def initialize_weights(m):
    """Kaiming-normal initialise Conv2d/Linear weights and zero their biases.

    Intended for ``Module.apply``; any other module type is left untouched.
    """
    if not isinstance(m, (nn.Conv2d, nn.Linear)):
        return
    nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
    bias = m.bias
    if bias is not None:
        nn.init.constant_(bias, 0)
# Build the network, wrap it for data-parallel execution, apply the custom
# weight initialisation, and finally move it to the selected device.
model = (
    nn.DataParallel(CombinedModel())
    .apply(initialize_weights)
    .to(device)
)

# Pixel-wise MSE objective, Adam optimiser, and a gradient scaler for AMP.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scaler = torch.amp.GradScaler()

In [16]:
num_epochs = 100
best_val_loss = float("inf")

for epoch in range(num_epochs):
    train_loss = train_model(
        model, train_loader, optimizer, criterion, scaler, device
    )
    val_loss = validate_model(model, val_loader, criterion, device)

    # Checkpoint whenever the validation loss improves on the best seen so far.
    improved = val_loss < best_val_loss
    if improved:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "best_model.pth")

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.9f}, Val Loss: {val_loss:.9f}")

  with torch.cuda.amp.autocast():
  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch 1/100, Train Loss: 0.008404980, Val Loss: 0.000565794
Epoch 2/100, Train Loss: 0.000918865, Val Loss: 0.000467923
Epoch 3/100, Train Loss: 0.000840137, Val Loss: 0.000507194
Epoch 4/100, Train Loss: 0.000787239, Val Loss: 0.000407043
Epoch 5/100, Train Loss: 0.000673114, Val Loss: 0.000380729
Epoch 6/100, Train Loss: 0.000657342, Val Loss: 0.000427366
Epoch 7/100, Train Loss: 0.000617800, Val Loss: 0.000465952
Epoch 8/100, Train Loss: 0.000613486, Val Loss: 0.000323718
Epoch 9/100, Train Loss: 0.000537856, Val Loss: 0.000309895
Epoch 10/100, Train Loss: 0.000547556, Val Loss: 0.000292735
Epoch 11/100, Train Loss: 0.000561846, Val Loss: 0.000307456
Epoch 12/100, Train Loss: 0.000547525, Val Loss: 0.000264051
Epoch 13/100, Train Loss: 0.000500008, Val Loss: 0.000295059
Epoch 14/100, Train Loss: 0.000508573, Val Loss: 0.000252993
Epoch 15/100, Train Loss: 0.000471223, Val Loss: 0.000261777
Epoch 16/100, Train Loss: 0.000464571, Val Loss: 0.000261682
Epoch 17/100, Train Loss: 0.00045

In [17]:
# Directory that receives the predicted test images; created if missing.
final_path = "/kaggle/working/test_final"
os.makedirs(final_path, exist_ok=True)

In [18]:
def test_prediction(model, test_loader, device, path):
    """Run ``model`` over ``test_loader`` and save each prediction under its input file name.

    Args:
        model: Network used for inference; switched to evaluation mode.
        test_loader: Loader yielding ``(images, file_names)`` batches.
        device: Device the input batch is moved to.
        path: Directory the output images are written into.
    """
    model.eval()
    progress = tqdm(test_loader, total = len(test_loader), desc = 'Testing', unit = 'batch')
    with torch.no_grad():
        for noise_imgs, img_name in progress:
            outputs = model(noise_imgs.to(device))

            # One output image per input; each is written under the name of
            # the input file it came from.
            for prediction, file_name in zip(outputs, img_name):
                predicted_img = prediction.squeeze(0).cpu()
                save_image(predicted_img, os.path.join(path, f"{file_name}"))

In [19]:
from tqdm import tqdm

## Loading Best Model

In [20]:
# Restore the best checkpoint. `weights_only=True` restricts unpickling to
# tensors/containers (the checkpoint is a plain state_dict) and silences the
# torch.load FutureWarning; `map_location` lets the file load on whichever
# device this session selected.
model.load_state_dict(
    torch.load("best_model.pth", map_location=device, weights_only=True)
)

  model.load_state_dict(torch.load("best_model.pth"))


<All keys matched successfully>

In [21]:
# Write the model's predictions for every test image into `final_path`.
test_prediction(model, test_loader, device, path=final_path)

Testing: 100%|██████████| 4/4 [00:23<00:00,  5.83s/batch]


In [22]:
import os
import numpy as np
import pandas as pd
from PIL import Image

def images_to_csv(folder_path, output_csv, stride=8):
    """Write a sub-sampled grayscale pixel table for every image in a folder.

    Each image is converted to 8-bit grayscale, flattened, and every
    ``stride``-th pixel is kept. The CSV has an ``ID`` column (file name
    without extension, with ``test_`` replaced by ``gt_``) followed by
    ``pixel_0 ... pixel_{k-1}``.

    Args:
        folder_path: Directory containing the images.
        output_csv: Path of the CSV file to write.
        stride: Keep every ``stride``-th flattened pixel (default 8).

    Raises:
        ValueError: If the folder contains no image files, or if the images
            do not all produce the same number of sampled pixels.
    """
    image_exts = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')
    # Sort for a reproducible row order and match extensions case-insensitively.
    filenames = sorted(
        name for name in os.listdir(folder_path)
        if name.lower().endswith(image_exts)
    )
    if not filenames:
        raise ValueError(f"No image files found in {folder_path!r}")

    data_rows = []
    for filename in filenames:
        image_path = os.path.join(folder_path, filename)
        # The context manager closes the underlying file handle promptly.
        with Image.open(image_path) as image:
            image_array = np.array(image.convert('L')).flatten()[::stride]
        # Replace 'test_' with 'gt_' in the ID; splitext keeps dots inside the stem.
        image_id = os.path.splitext(filename)[0].replace('test_', 'gt_')
        data_rows.append([image_id, *image_array])

    row_length = len(data_rows[0])
    for row in data_rows:
        if len(row) != row_length:
            raise ValueError(
                f"Image {row[0]!r} yields {len(row) - 1} sampled pixels, "
                f"expected {row_length - 1}; all images must have the same size"
            )

    column_names = ['ID'] + [f'pixel_{i}' for i in range(row_length - 1)]
    df = pd.DataFrame(data_rows, columns=column_names)
    df.to_csv(output_csv, index=False)
    print(f'Successfully saved to {output_csv}')

# Export the saved predictions as the submission CSV.
output_csv = "21f1000641.csv"
folder_path = final_path
images_to_csv(folder_path=folder_path, output_csv=output_csv)

Successfully saved to 21f1000641.csv
