# <ins>CNN Homework</ins>
### David Fan 范庭維 312831023


In [1]:
from __future__ import print_function, division
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import torch

### First, let's begin with creating the U-Net architecture!

In [2]:
#helper functions
class conv_block(nn.Module):
    """
    Convolution Block 
    """
    def __init__(self, in_ch, out_ch):
        super(conv_block, self).__init__()
        
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True))

    def forward(self, x):

        x = self.conv(x)
        return x

class up_conv(nn.Module):
    """
    Up Convolution Block
    """
    def __init__(self, in_ch, out_ch):
        super(up_conv, self).__init__()
        self.up = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.up(x)
        return x

In [3]:
class U_Net(nn.Module):
    def __init__(self, in_ch=3, out_ch=1):
        super(U_Net, self).__init__()

        n1 = 64
        filters = [n1, n1 * 2, n1 * 4, n1 * 8, n1 * 16]
        
        self.Maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.Maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.Maxpool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.Maxpool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.Conv1 = conv_block(in_ch, filters[0])
        self.Conv2 = conv_block(filters[0], filters[1])
        self.Conv3 = conv_block(filters[1], filters[2])
        self.Conv4 = conv_block(filters[2], filters[3])
        self.Conv5 = conv_block(filters[3], filters[4])

        self.Up5 = up_conv(filters[4], filters[3])
        self.Up_conv5 = conv_block(filters[4], filters[3])

        self.Up4 = up_conv(filters[3], filters[2])
        self.Up_conv4 = conv_block(filters[3], filters[2])

        self.Up3 = up_conv(filters[2], filters[1])
        self.Up_conv3 = conv_block(filters[2], filters[1])

        self.Up2 = up_conv(filters[1], filters[0])
        self.Up_conv2 = conv_block(filters[1], filters[0])

        self.Conv = nn.Conv2d(filters[0], out_ch, kernel_size=1, stride=1, padding=0)

       # self.active = torch.nn.Sigmoid()

    def forward(self, x):

        e1 = self.Conv1(x)

        e2 = self.Maxpool1(e1)
        e2 = self.Conv2(e2)

        e3 = self.Maxpool2(e2)
        e3 = self.Conv3(e3)

        e4 = self.Maxpool3(e3)
        e4 = self.Conv4(e4)

        e5 = self.Maxpool4(e4)
        e5 = self.Conv5(e5)

        d5 = self.Up5(e5)
        d5 = torch.cat((e4, d5), dim=1)

        d5 = self.Up_conv5(d5)

        d4 = self.Up4(d5)
        d4 = torch.cat((e3, d4), dim=1)
        d4 = self.Up_conv4(d4)

        d3 = self.Up3(d4)
        d3 = torch.cat((e2, d3), dim=1)
        d3 = self.Up_conv3(d3)

        d2 = self.Up2(d3)
        d2 = torch.cat((e1, d2), dim=1)
        d2 = self.Up_conv2(d2)

        out = self.Conv(d2)

        #d1 = self.active(out)

        return out

### Now it's time to import the underwater images for our model!
### I decided to merge the underwater_imagenet and underwater_scenes image datasets into one and use a 80-20 train-test split on it.

In [4]:
import torch
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
from PIL import Image  # Make sure to import Image from the PIL library
import os
import numpy as np

class CustomDataset(Dataset):
    def __init__(self, root_dir, transform=None, target_size=(256, 256), file_list=None):
        self.root_dir = root_dir
        self.transform = transform
        self.target_size = target_size  # Set the target size for resizing

        # Use the provided file list or list all the image files in the folder
        self.file_list = file_list if file_list is not None else sorted(os.listdir(root_dir))

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_filename = self.file_list[idx]
        img_path = os.path.join(self.root_dir, img_filename)

        img = Image.open(img_path).convert('RGB')

        # Resize the image to the target size
        img = img.resize(self.target_size, Image.BICUBIC)

        if self.transform:
            img = self.transform(img)

        return img


# Set the path to your dataset
dataset_path_A = r'./underwater/trainA'
dataset_path_B = r'./underwater/trainB'

# Define data transformations (including resizing and normalization)
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images to the target size
    transforms.ToTensor(),           # Convert to tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
])

from sklearn.model_selection import train_test_split

# Set the seed for reproducibility
np.random.seed(42)

# List all the image files in the folder
all_image_files = sorted(os.listdir(dataset_path_A))

# Split the dataset into training and test sets (80-20%)
train_files_A, test_files_A = train_test_split(all_image_files, test_size=0.2, random_state=42)

# Create custom datasets for training and test
trainA_dataset = CustomDataset(dataset_path_A, transform=transform, file_list=train_files_A)
testA_dataset = CustomDataset(dataset_path_A, transform=transform, file_list=test_files_A)

trainB_dataset = CustomDataset(dataset_path_B, transform=transform, file_list=train_files_A)
testB_dataset = CustomDataset(dataset_path_B, transform=transform, file_list=test_files_A)

# Create data loaders
BATCH_SIZE = 32
trainA_loader = DataLoader(trainA_dataset, batch_size=BATCH_SIZE, shuffle=False)
testA_loader = DataLoader(testA_dataset, batch_size=BATCH_SIZE, shuffle=False)

trainB_loader = DataLoader(trainB_dataset, batch_size=BATCH_SIZE, shuffle=False)
testB_loader = DataLoader(testB_dataset, batch_size=BATCH_SIZE, shuffle=False)


#### Ignore this code below, it is just to check my dataset ordering.

In [5]:
# import matplotlib.pyplot as plt
# import numpy as np

# def show_image(dataset, index):
#     img = dataset[index]
#     img_np = img.permute(1, 2, 0).numpy()  # Convert tensor to NumPy array
#     plt.imshow(img_np)
#     plt.title(f'Dataset: {dataset.root_dir}, Image {index}')
#     plt.show()

# # Show an image from trainA_dataset
# show_image(trainA_dataset, 2)

# # Show an image from trainB_dataset
# show_image(trainB_dataset, 2)

# # Show an image from valA_dataset
# show_image(valA_dataset, 2)

# # Show an image from valB_dataset
# show_image(valB_dataset, 2)

### Defining some required and helpful functions...

In [6]:
from torchvision.utils import make_grid
import matplotlib.pyplot as plt

num_samples=1

def plot_images(input_images, output_images, target_images, epoch):
    input_grid = make_grid(input_images.cpu(), nrow=num_samples, normalize=True)
    output_grid = make_grid(output_images.cpu(), nrow=num_samples, normalize=True)
    target_grid = make_grid(target_images.cpu(), nrow=num_samples, normalize=True)

    plt.figure(figsize=(18, 6))
    plt.subplot(131)
    plt.imshow(input_grid.permute(1, 2, 0).numpy())
    plt.title(f'Epoch {epoch + 1} - Input Images')

    plt.subplot(132)
    plt.imshow(output_grid.permute(1, 2, 0).numpy())
    plt.title(f'Epoch {epoch + 1} - Output Images')

    plt.subplot(133)
    plt.imshow(target_grid.permute(1, 2, 0).numpy())
    plt.title(f'Epoch {epoch + 1} - Target Images')

    plt.show()

def PSNR(img,target): # tensor / tensor
    mse = torch.mean((img - target) ** 2)
    return 10 * torch.log10(1 / mse)

### Setting up our U-Net model...

In [7]:
# Import the learning rate scheduler
from torch.optim.lr_scheduler import StepLR

# Define hyperparameters
batch_size = 16
learning_rate = 1e-4
epochs = 50

# Initialize model, loss function, optimizer, and learning rate scheduler
model = U_Net(in_ch=3, out_ch=3).cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=20, gamma=0.5)  # Adjust parameters as needed


### Training and saving our U-Net model.

In [None]:
from tqdm import tqdm
import numpy as np
from pytorch_msssim import ssim

# Lists to store training losses for plotting
train_losses = []
print_interval=10
num_samples=1

# Training loop
for epoch in range(epochs):
    model.train()

    with tqdm(zip(trainA_loader, trainB_loader), total=len(trainA_loader)) as t:
        for batch_A, batch_B in t:
            input_images = batch_A.cuda()
            target_images = batch_B.cuda()

            optimizer.zero_grad()
            outputs = model(input_images)
            loss = criterion(outputs, target_images)
            # Compute PSNR and SSIM
            psnrV = PSNR(outputs, target_images)
            ssimV = ssim(outputs, target_images, data_range=1.0, size_average=True)

            # Add PSNR and SSIM to the progress bar
            t.set_postfix({'Loss': loss.item(), 'PSNR': psnrV.item(), 'SSIM': ssimV.item()})

            # Backpropagation and optimization steps
            loss.backward()
            optimizer.step()
        # Print input and output images after each epoch
        if (epoch % print_interval == 0):
            with torch.no_grad():
                # Take a few samples from the batch
                sample_indices = np.random.choice(len(input_images), size=num_samples, replace=False)
                sample_input_images = input_images[sample_indices]
                sample_target_images = target_images[sample_indices]
                sample_output_images = outputs[sample_indices]

                # Display the images
                plot_images(sample_input_images, sample_output_images, sample_target_images, epoch)

    scheduler.step()  # Step the learning rate scheduler

    print(f'Epoch {epoch + 1}/{epochs}, Training Loss: {loss} , PSNR: {psnrV}, SSIM: {ssimV}')
    train_losses.append(loss.item())

# Print final PSNR and SSIM values
print(f'Final PSNR: {psnrV.item()}')
print(f'Final SSIM: {ssimV.item()}')

# Save the trained model
torch.save(model.state_dict(), './trained_model.pth')


  0%|          | 0/148 [01:26<?, ?it/s, Loss=0.567, PSNR=2.46, SSIM=0.0132]

In [None]:
# Plot the training and validation losses
plt.figure(figsize=(10, 5))
plt.plot(range(1, epochs + 1), train_losses, label='Training Loss')
plt.title('Training Losses')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

### Looking at the downward training loss curve, it seems like our model has been able to learn something. Let's use the test dataset from earlier to verify this...

In [None]:
# Load the trained model
model = U_Net(in_ch=3, out_ch=3).cuda()
model.load_state_dict(torch.load("./trained_model.pth"))
model.eval()

# Lists to store test PSNR scores
img_psnr = []
out_psnr = []

# Testing loop
with torch.no_grad():
    for idx, (testA_batch, testB_batch) in tqdm(enumerate(zip(testA_loader, testB_loader)), total=len(testA_loader), desc='Testing Progress'):
        input_images = testA_batch.cuda()
        target_images = testB_batch.cuda()

        # Apply the model to input images
        outputs = model(input_images)

        # Compute PSNR scores
        psnr_input = PSNR(input_images, target_images)
        psnr_output = PSNR(outputs, target_images)

        # Append PSNR scores to the lists
        img_psnr.append(psnr_input.item())
        out_psnr.append(psnr_output.item())

        # Display images (you can modify this part as needed)
        with torch.no_grad():
            # Take a few samples from the batch
            sample_indices = np.random.choice(len(input_images), size=num_samples, replace=False)
            sample_input_images = input_images[sample_indices]
            sample_target_images = target_images[sample_indices]
            sample_output_images = outputs[sample_indices]

            # Display the images
            plot_images(sample_input_images, sample_output_images, sample_target_images, epoch)

# Compute and print the average PSNR for input and output
avg_psnr_input = np.mean(img_psnr)
avg_psnr_output = np.mean(out_psnr)
print(f'Average PSNR for Input Images: {avg_psnr_input}')
print(f'Average PSNR for Output Images: {avg_psnr_output}')

### Great! Let's check it's performance on the underwater_dark dataset now.

In [None]:
# Set the path to your dark dataset
dark_dataset_path_A = r'./underwater_dark/trainA'
dark_dataset_path_B = r'./underwater_dark/trainB'

# Create custom datasets for dark dataset
dark_dataset_A = CustomDataset(dark_dataset_path_A, transform=transform)
dark_dataset_B = CustomDataset(dark_dataset_path_B, transform=transform)

# Create data loaders for dark dataset
dark_loader_A = DataLoader(dark_dataset_A, batch_size=BATCH_SIZE, shuffle=False)
dark_loader_B = DataLoader(dark_dataset_B, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Load the trained model
model = U_Net(in_ch=3, out_ch=3).cuda()
model.load_state_dict(torch.load("./trained_model.pth"))
model.eval()

# Lists to store validation PSNR scores
dark_img_psnr = []
dark_out_psnr = []

# Testing loop on the dark dataset
with torch.no_grad():
    for idx, (darkA_batch, darkB_batch) in tqdm(enumerate(zip(dark_loader_A, dark_loader_B)), total=len(dark_loader_A), desc='Testing on Dark Dataset'):
        dark_input_images = darkA_batch.cuda()
        dark_target_images = darkB_batch.cuda()

        # Apply the model to input images
        dark_outputs = model(dark_input_images)

        # Compute PSNR scores
        dark_psnr_input = PSNR(dark_input_images, dark_target_images)
        dark_psnr_output = PSNR(dark_outputs, dark_target_images)

        # Append PSNR scores to the lists
        dark_img_psnr.append(dark_psnr_input.item())
        dark_out_psnr.append(dark_psnr_output.item())

        with torch.no_grad():
            # Take a few samples from the batch
            sample_indices = np.random.choice(len(dark_input_images), size=num_samples, replace=False)
            sample_dark_input_images = dark_input_images[sample_indices]
            sample_dark_target_images = dark_target_images[sample_indices]
            sample_dark_output_images = dark_outputs[sample_indices]

            # Display the images
            plot_images(sample_dark_input_images, sample_dark_output_images, sample_dark_target_images, epoch)

# Compute and print the average PSNR for input and output on the dark dataset
avg_dark_psnr_input = np.mean(dark_img_psnr)
avg_dark_psnr_output = np.mean(dark_out_psnr)
print(f'Average PSNR for Dark Input Images: {avg_dark_psnr_input}')
print(f'Average PSNR for Dark Output Images: {avg_dark_psnr_output}')

### Would the PSNR improve if I added a transformation for brightness?

In [None]:
from torchvision import transforms

# Define data transformations including ColorJitter
transform = transforms.Compose([
    transforms.ColorJitter(brightness=0.2),  # Adjust brightness with a random factor up to 0.2
    transforms.Resize((256, 256)),            # Resize images to the target size
    transforms.ToTensor(),                   # Convert to tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
])

# Create custom datasets for dark dataset
dark_dataset_A = CustomDataset(dark_dataset_path_A, transform=transform)
dark_dataset_B = CustomDataset(dark_dataset_path_B, transform=transform)

# Create data loaders for dark dataset
dark_loader_A = DataLoader(dark_dataset_A, batch_size=BATCH_SIZE, shuffle=False)
dark_loader_B = DataLoader(dark_dataset_B, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Load the trained model
model = U_Net(in_ch=3, out_ch=3).cuda()
model.load_state_dict(torch.load("./trained_model.pth"))
model.eval()

# Lists to store validation PSNR scores
dark_img_psnr = []
dark_out_psnr = []

# Testing loop on the dark dataset
with torch.no_grad():
    for idx, (darkA_batch, darkB_batch) in tqdm(enumerate(zip(dark_loader_A, dark_loader_B)), total=len(dark_loader_A), desc='Testing on Dark Dataset'):
        dark_input_images = darkA_batch.cuda()
        dark_target_images = darkB_batch.cuda()

        # Apply the model to input images
        dark_outputs = model(dark_input_images)

        # Compute PSNR scores
        dark_psnr_input = PSNR(dark_input_images, dark_target_images)
        dark_psnr_output = PSNR(dark_outputs, dark_target_images)

        # Append PSNR scores to the lists
        dark_img_psnr.append(dark_psnr_input.item())
        dark_out_psnr.append(dark_psnr_output.item())

        with torch.no_grad():
            # Take a few samples from the batch
            sample_indices = np.random.choice(len(dark_input_images), size=num_samples, replace=False)
            sample_dark_input_images = dark_input_images[sample_indices]
            sample_dark_target_images = dark_target_images[sample_indices]
            sample_dark_output_images = dark_outputs[sample_indices]

            # Display the images
            plot_images(sample_dark_input_images, sample_dark_output_images, sample_dark_target_images, epoch)

# Compute and print the average PSNR for input and output on the dark dataset
avg_dark_psnr_input = np.mean(dark_img_psnr)
avg_dark_psnr_output = np.mean(dark_out_psnr)
print(f'Average PSNR for transformed Dark Input Images: {avg_dark_psnr_input}')
print(f'Average PSNR for transformed Dark Output Images: {avg_dark_psnr_output}')