In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_236.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_345.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_692.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_8.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_507.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_660.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_410.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_508.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_725.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_614.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_595.png
/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy/Image_Test_280.png
/kaggle/input/180-

In [13]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms
from torch.optim import lr_scheduler
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from PIL import Image
import pandas as pd
import torch.nn.functional as F
import copy

# --- Configuration ---
# Set the paths to your dataset directories
TRAIN_CLEAN_DIR = '/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/train/clean'
TRAIN_NOISY_DIR = '/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/train/noisy'
TEST_NOISY_DIR = '/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy'
DENOISED_OUTPUT_DIR = 'Denoised_Images'
SUBMISSION_FILE = 'test_labels.csv'

# Image dimensions
IMG_WIDTH = 128 
IMG_HEIGHT = 128
IMG_CHANNELS = 3

# Class mapping
CLASS_MAP = {
    'Daisy': 1,
    'Dandelion': 2,
    'Roses': 3,
    'Sunflowers': 4,
    'Tulips': 5
}
INV_CLASS_MAP = {v: k for k, v in CLASS_MAP.items()}

In [3]:
class FlowerDataset(Dataset):
    """Custom Dataset for flower images."""
    def __init__(self, clean_dir, noisy_dir, transform=None):
        self.clean_dir = clean_dir
        self.noisy_dir = noisy_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        for class_name in os.listdir(clean_dir):
            class_clean_path = os.path.join(clean_dir, class_name)
            class_noisy_path = os.path.join(noisy_dir, class_name)
            if os.path.isdir(class_clean_path):
                for img_name in os.listdir(class_clean_path):
                    self.image_paths.append((
                        os.path.join(class_clean_path, img_name),
                        os.path.join(class_noisy_path, img_name)
                    ))
                    self.labels.append(CLASS_MAP[class_name.capitalize()])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        clean_path, noisy_path = self.image_paths[idx]
        clean_image = Image.open(clean_path).convert('RGB')
        noisy_image = Image.open(noisy_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            seed = np.random.randint(2147483647)
            torch.manual_seed(seed)
            clean_image = self.transform(clean_image)
            torch.manual_seed(seed)
            noisy_image = self.transform(noisy_image)

        return noisy_image, clean_image, label

In [4]:
class UNet(nn.Module):
    """U-Net architecture for image denoising."""
    def __init__(self, in_channels=3, out_channels=3):
        super(UNet, self).__init__()
        def CBR(in_c, out_c):
            return nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=3, padding=1), nn.BatchNorm2d(out_c), nn.ReLU(inplace=True),
                nn.Conv2d(out_c, out_c, kernel_size=3, padding=1), nn.BatchNorm2d(out_c), nn.ReLU(inplace=True)
            )
        self.enc1, self.enc2, self.enc3 = CBR(in_channels, 64), CBR(64, 128), CBR(128, 256)
        self.pool = nn.MaxPool2d(2)
        self.bottleneck = CBR(256, 512)
        self.up3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = CBR(512, 256)
        self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = CBR(256, 128)
        self.up1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = CBR(128, 64)
        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        e1 = self.enc1(x); e2 = self.enc2(self.pool(e1)); e3 = self.enc3(self.pool(e2))
        b = self.bottleneck(self.pool(e3))
        d3 = self.dec3(torch.cat((self.up3(b), e3), dim=1))
        d2 = self.dec2(torch.cat((self.up2(d3), e2), dim=1))
        d1 = self.dec1(torch.cat((self.up1(d2), e1), dim=1))
        return self.final_conv(d1)


In [5]:
def train_denoiser(dataloaders, device):
    """Trains the U-Net denoiser model."""
    model = UNet().to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    num_epochs = 25
    best_model_wts, best_psnr = copy.deepcopy(model.state_dict()), 0.0

    print("--- Training Denoising Model (U-Net) ---")
    for epoch in range(num_epochs):
        for phase in ['train', 'val']:
            model.train() if phase == 'train' else model.eval()
            running_loss, epoch_psnr_scores = 0.0, []
            for noisy_imgs, clean_imgs, _ in dataloaders[phase]:
                noisy_imgs, clean_imgs = noisy_imgs.to(device), clean_imgs.to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(noisy_imgs)
                    loss = criterion(outputs, clean_imgs)
                    if phase == 'train':
                        loss.backward(); optimizer.step()
                running_loss += loss.item() * noisy_imgs.size(0)
                if phase == 'val':
                    outputs = torch.clamp(outputs, 0, 1)
                    clean_imgs_np = np.transpose(clean_imgs.cpu().numpy(), (0, 2, 3, 1))
                    outputs_np = np.transpose(outputs.cpu().numpy(), (0, 2, 3, 1))
                    for i in range(clean_imgs_np.shape[0]):
                        epoch_psnr_scores.append(psnr(clean_imgs_np[i], outputs_np[i], data_range=1.0))
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            if phase == 'val':
                epoch_psnr = np.mean(epoch_psnr_scores)
                print(f"Epoch {epoch+1}/{num_epochs} | {phase.capitalize()} Loss: {epoch_loss:.4f} PSNR: {epoch_psnr:.4f}")
                if epoch_psnr > best_psnr:
                    best_psnr, best_model_wts = epoch_psnr, copy.deepcopy(model.state_dict())
            else:
                print(f"Epoch {epoch+1}/{num_epochs} | {phase.capitalize()} Loss: {epoch_loss:.4f}")
    print(f"Best validation PSNR for denoiser: {best_psnr:.4f}")
    model.load_state_dict(best_model_wts)
    return model

In [6]:
class ResidualBlock(nn.Module):
    """A standard residual block."""
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        return F.relu(out)

In [7]:
class ResNetClassifier(nn.Module):
    """A custom ResNet-style classifier built from scratch."""
    def __init__(self, block, num_blocks, num_classes=5):
        super(ResNetClassifier, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.linear = nn.Linear(512, num_classes)
    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for s in strides:
            layers.append(block(self.in_channels, out_channels, s)); self.in_channels = out_channels
        return nn.Sequential(*layers)
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out); out = self.layer2(out); out = self.layer3(out); out = self.layer4(out)
        out = self.avg_pool(out); out = out.view(out.size(0), -1)
        return self.linear(out)


In [8]:
def train_classifier(dataloaders, num_classes, device):
    """Trains the simplified ResNet-style classifier."""
    model = ResNetClassifier(ResidualBlock, [2, 2, 2, 2], num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    num_epochs = 30
    best_model_wts, best_acc = copy.deepcopy(model.state_dict()), 0.0

    print("\n--- Training Classifier (Simplified ResNet) ---")
    for epoch in range(num_epochs):
        for phase in ['train', 'val']:
            model.train() if phase == 'train' else model.eval()
            running_loss, running_corrects = 0.0, 0
            for _, clean_imgs, labels in dataloaders[phase]:
                clean_imgs, labels = clean_imgs.to(device), (labels - 1).to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(clean_imgs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if phase == 'train':
                        loss.backward(); optimizer.step()
                running_loss += loss.item() * clean_imgs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            print(f"Epoch {epoch+1}/{num_epochs} | {phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
            if phase == 'val' and epoch_acc > best_acc:
                best_acc, best_model_wts = epoch_acc, copy.deepcopy(model.state_dict())
    print(f"Best validation accuracy for classifier: {best_acc:.4f}")
    model.load_state_dict(best_model_wts)
    return model

In [9]:
def denoise_test_images(denoiser, test_dir, output_dir, device):
    if not os.path.exists(output_dir): os.makedirs(output_dir)
    denoiser.eval()
    test_images, test_image_names = [], []
    transform = transforms.Compose([transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.ToTensor()])
    print("\n--- Denoising Test Images ---")
    with torch.no_grad():
        for img_name in sorted(os.listdir(test_dir)):
            if img_name.endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(test_dir, img_name)
                noisy_image = Image.open(img_path).convert('RGB')
                noisy_tensor = transform(noisy_image).unsqueeze(0).to(device)
                denoised_tensor = torch.clamp(denoiser(noisy_tensor).squeeze(0), 0, 1)
                save_path = os.path.join(output_dir, img_name)
                transforms.ToPILImage()(denoised_tensor.cpu()).save(save_path)
                test_images.append(denoised_tensor.cpu().permute(1, 2, 0).numpy())
                test_image_names.append(img_name)
    return np.array(test_images), test_image_names

In [10]:
def classify_denoised_images(classifier, denoised_images, test_transform, device):
    classifier.eval()
    predicted_labels = []
    print("\n--- Classifying Denoised Images ---")
    with torch.no_grad():
        for img_array in denoised_images:
            img_pil = Image.fromarray((img_array * 255).astype(np.uint8))
            img_tensor = test_transform(img_pil).unsqueeze(0).to(device)
            outputs = classifier(img_tensor)
            _, predicted = torch.max(outputs.data, 1)
            predicted_labels.append(predicted.item() + 1)
    return predicted_labels

In [11]:
def evaluate_denoising(original_noisy_dir, denoised_dir):
    psnr_scores, ssim_scores = [], []
    print("\n--- Evaluating Denoising Performance ---")
    for img_name in sorted(os.listdir(denoised_dir)):
        if img_name.endswith(('.png', '.jpg', '.jpeg')):
            original_noisy_path = os.path.join(original_noisy_dir, img_name)
            denoised_path = os.path.join(denoised_dir, img_name)
            original_noisy_img = np.array(Image.open(original_noisy_path).resize((IMG_WIDTH, IMG_HEIGHT))) / 255.0
            denoised_img = np.array(Image.open(denoised_path).resize((IMG_WIDTH, IMG_HEIGHT))) / 255.0
            psnr_scores.append(psnr(original_noisy_img, denoised_img, data_range=1.0))
            ssim_scores.append(ssim(original_noisy_img, denoised_img, multichannel=True, data_range=1.0, channel_axis=2))
    print(f"Average PSNR of noisy images (vs denoised): {np.mean(psnr_scores):.4f}")
    print(f"Average SSIM of noisy images (vs denoised): {np.mean(ssim_scores):.4f}")

In [14]:
if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    train_transform = transforms.Compose([
        transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15), transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(), transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    val_test_transform = transforms.Compose([
        transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    denoise_transform = transforms.Compose([transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.ToTensor()])

    full_denoise_dataset = FlowerDataset(TRAIN_CLEAN_DIR, TRAIN_NOISY_DIR, transform=denoise_transform)
    full_classify_dataset = FlowerDataset(TRAIN_CLEAN_DIR, TRAIN_NOISY_DIR)
    
    val_split = 0.2
    train_size = int((1 - val_split) * len(full_denoise_dataset))
    val_size = len(full_denoise_dataset) - train_size
    
    train_denoise_ds, val_denoise_ds = random_split(full_denoise_dataset, [train_size, val_size])
    train_classify_ds, val_classify_ds = random_split(full_classify_dataset, [train_size, val_size])
    train_classify_ds.dataset.transform = train_transform
    val_classify_ds.dataset.transform = val_test_transform

    denoise_loaders = {'train': DataLoader(train_denoise_ds, batch_size=16, shuffle=True), 'val': DataLoader(val_denoise_ds, batch_size=16)}
    classify_loaders = {'train': DataLoader(train_classify_ds, batch_size=32, shuffle=True), 'val': DataLoader(val_classify_ds, batch_size=32)}

    denoiser_model = train_denoiser(denoise_loaders, device)
    denoised_test_images, test_image_names = denoise_test_images(denoiser_model, TEST_NOISY_DIR, DENOISED_OUTPUT_DIR, device)

    classifier_model = train_classifier(classify_loaders, len(CLASS_MAP), device)
    predicted_labels = classify_denoised_images(classifier_model, denoised_test_images, val_test_transform, device)

    evaluate_denoising(TEST_NOISY_DIR, DENOISED_OUTPUT_DIR)

    submission_df = pd.DataFrame({'Images': test_image_names, 'Predicted_Classes': predicted_labels})
    submission_df.to_csv(SUBMISSION_FILE, index=False)
    print(f"\nSubmission file '{SUBMISSION_FILE}' created successfully.")


Using device: cuda
--- Training Denoising Model (U-Net) ---
Epoch 1/25 | Train Loss: 0.0445
Epoch 1/25 | Val Loss: 0.0067 PSNR: 22.0516
Epoch 2/25 | Train Loss: 0.0090
Epoch 2/25 | Val Loss: 0.0058 PSNR: 22.6389
Epoch 3/25 | Train Loss: 0.0083
Epoch 3/25 | Val Loss: 0.0057 PSNR: 22.6857
Epoch 4/25 | Train Loss: 0.0073
Epoch 4/25 | Val Loss: 0.0057 PSNR: 22.7566
Epoch 5/25 | Train Loss: 0.0070
Epoch 5/25 | Val Loss: 0.0054 PSNR: 22.9190
Epoch 6/25 | Train Loss: 0.0072
Epoch 6/25 | Val Loss: 0.0059 PSNR: 22.5725
Epoch 7/25 | Train Loss: 0.0067
Epoch 7/25 | Val Loss: 0.0050 PSNR: 23.2271
Epoch 8/25 | Train Loss: 0.0065
Epoch 8/25 | Val Loss: 0.0051 PSNR: 23.2583
Epoch 9/25 | Train Loss: 0.0059
Epoch 9/25 | Val Loss: 0.0049 PSNR: 23.3363
Epoch 10/25 | Train Loss: 0.0062
Epoch 10/25 | Val Loss: 0.0059 PSNR: 22.7703
Epoch 11/25 | Train Loss: 0.0060
Epoch 11/25 | Val Loss: 0.0048 PSNR: 23.4108
Epoch 12/25 | Train Loss: 0.0060
Epoch 12/25 | Val Loss: 0.0064 PSNR: 22.9412
Epoch 13/25 | Train Lo