In [None]:
# Mount Google Drive at /content/drive so the notebook can read and write files
# stored there. The google.colab package is provided by the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd drive/My \Drive/Acad/ADS/Project2/

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define DenseLayer class, which represents a single dense layer
class DenseLayer(nn.Module):
    """One densely connected layer: 3x3 convolution + ReLU, concatenated onto its input.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of new feature channels produced by the convolution.

    The output therefore has ``in_channels + out_channels`` channels; the
    spatial size is unchanged because the 3x3 convolution is padded by 1.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        # The in-place ReLU only touches the freshly computed conv output, never x.
        new_features = self.relu(self.conv(x))
        # Stack along the channel axis: original input first, new features after.
        return torch.cat((x, new_features), dim=1)

# Define RDB (Residual Dense Block) class
class RDB(nn.Module):
    """Residual Dense Block: a chain of dense layers, a 1x1 fusion conv, and a skip connection.

    Args:
        in_channels: channels of the feature map entering the block.
        growth_rate: channels added by every dense layer, and also the number
            of channels produced by the 1x1 fusion convolution.
        num_layers: how many dense layers are chained.

    Note:
        ``forward`` adds the block input (``in_channels`` channels) to the fused
        features (``growth_rate`` channels), so the two channel counts must be
        equal or broadcast-compatible for the sum to succeed.
    """

    def __init__(self, in_channels, growth_rate, num_layers):
        super().__init__()

        # Each dense layer sees everything produced before it, so the input
        # width grows by `growth_rate` per layer.
        dense_stack = []
        width = in_channels
        for _ in range(num_layers):
            dense_stack.append(DenseLayer(width, growth_rate))
            width += growth_rate
        self.layers = nn.Sequential(*dense_stack)

        # local feature fusion: squeeze the concatenated features with a 1x1 conv
        self.lff = nn.Conv2d(width, growth_rate, kernel_size=1)

    def forward(self, x):
        fused = self.lff(self.layers(x))
        # local residual learning
        return x + fused

# Define RDN (Residual Dense Network) class
class RDN(nn.Module):
    """Residual Dense Network producing a single-channel map squashed by a sigmoid.

    Args:
        num_channels: channels of the input image.
        num_features: width of the shallow features (stored as ``G0``).
        growth_rate: growth rate / output width of every RDB (stored as ``G``).
        num_blocks: number of residual dense blocks (stored as ``D``).
        num_layers: dense layers inside each block (stored as ``C``).

    Every convolution is either 1x1 or a 3x3 padded by 1, so the output keeps
    the spatial size of the input. The output has one channel with values
    in (0, 1).
    """

    def __init__(self, num_channels=3, num_features=32, growth_rate=32, num_blocks=8, num_layers=6):
        super().__init__()
        self.G0, self.G = num_features, growth_rate
        self.D, self.C = num_blocks, num_layers

        # Shared settings for the size-preserving 3x3 convolutions.
        conv3x3 = dict(kernel_size=3, padding=1)

        # shallow feature extraction
        self.sfe1 = nn.Conv2d(num_channels, self.G0, **conv3x3)
        self.sfe2 = nn.Conv2d(self.G0, self.G0, **conv3x3)

        # residual dense blocks: the first consumes G0 channels, the rest G
        block_inputs = [self.G0] + [self.G] * (self.D - 1)
        self.rdbs = nn.ModuleList([RDB(width, self.G, self.C) for width in block_inputs])

        # global feature fusion over the concatenated outputs of all blocks
        self.gff = nn.Sequential(
            nn.Conv2d(self.G * self.D, self.G0, kernel_size=1),
            nn.Conv2d(self.G0, self.G0, **conv3x3),
        )

        # projection to a single output channel, followed by a sigmoid
        self.output = nn.Conv2d(self.G0, 1, **conv3x3)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        shallow = self.sfe1(x)
        feats = self.sfe2(shallow)

        # Run the blocks in sequence, keeping every intermediate output.
        block_outputs = []
        for idx in range(self.D):
            feats = self.rdbs[idx](feats)
            block_outputs.append(feats)

        # global residual learning: fused block features plus the first shallow features
        fused = self.gff(torch.cat(block_outputs, dim=1))
        logits = self.output(fused + shallow)
        return self.sigmoid(logits)

# Run on the GPU when PyTorch can see one, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Build the network with its default hyper-parameters and place it on that device
model = RDN().to(device)

In [None]:
'''
Notebook to train Residual Dense Net Model
'''

# Import required libraries
import os
import numpy as np
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from tqdm import tqdm
from torch import optim
import torchvision
from PIL import Image
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

def dice_loss(pred, target, smooth = 1e-6):
    """Soft Dice loss computed over every element of ``pred`` and ``target`` at once.

    Args:
        pred: predicted values.
        target: reference values, multiplied element-wise with ``pred``.
        smooth: constant added to numerator and denominator so the ratio stays
            defined when both inputs sum to zero.

    Returns:
        ``1 - (2 * sum(pred * target) + smooth) / (sum(pred) + sum(target) + smooth)``.
    """
    overlap = (pred * target).sum()
    numerator = 2. * overlap + smooth
    denominator = pred.sum() + target.sum() + smooth
    return 1 - numerator / denominator

class CombinedLoss(nn.Module):
    """Weighted sum of binary cross-entropy and Dice loss.

    Args:
        alpha: weight of the BCE term; the Dice term is weighted ``1 - alpha``.
    """

    def __init__(self, alpha=0.5):
        super().__init__()
        self.alpha = alpha

    def forward(self, pred, target):
        """Return ``alpha * BCE(pred, target) + (1 - alpha) * dice_loss(pred, target)``.

        ``pred`` is passed straight to ``F.binary_cross_entropy``, so it must
        already be probabilities rather than logits.
        """
        weight = self.alpha
        bce_term = F.binary_cross_entropy(pred, target, reduction='mean')
        dice_term = dice_loss(pred, target)
        return weight * bce_term + (1 - weight) * dice_term

# Load training data
# Labels (Outputs): the masks the network should reproduce. The HR/LR variable
# names are kept as they were so other cells that use them keep working.
x_trainHR = np.load('./Data/train_mask.npy').astype(np.float32)
# Images (Conditions): the network inputs
x_trainLR = np.load('./Data/train_img.npy').astype(np.float32)
x_trainHR = torch.Tensor(x_trainHR)
x_trainLR = torch.Tensor(x_trainLR)
# Print data dimensions
# NOTE(review): the loss compares the model output with the masks directly, so
# the masks are assumed to have the output's shape and values in [0, 1] - confirm.
print(x_trainHR.shape)
print(x_trainLR.shape)

# Create dataset and dataloader for efficient data loading and batching.
# Each item is a (mask, image) pair, in that order.
# NOTE(review): batches are drawn in a fixed order (no shuffle=True) - confirm intended.
dataset = TensorDataset(x_trainHR,x_trainLR)
dataloader = DataLoader(dataset, batch_size=5)

# Number of batches, i.e. optimizer/scheduler steps, per epoch
l = len(dataloader)
# Detect the device instead of assuming CUDA, and make sure the model lives on
# the same device as the batches that are moved to it below.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=3e-4)
loss_function = CombinedLoss(alpha=0.5)
n_epochs = 500
# scheduler.step() runs once per batch, so the one-cycle schedule must be sized
# in batches per epoch (not samples per epoch); otherwise training stops while
# the learning rate is still in the early part of the cycle.
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, 2e-3, epochs=n_epochs, steps_per_epoch=l)

# Make sure the checkpoint directory exists before the first save.
os.makedirs("Weights", exist_ok=True)

model.train()
for epoch in range(n_epochs):
    print(f"Starting epoch {epoch + 1}:")

    epoch_loss = 0
    pbar = tqdm(dataloader)

    for labels, images in pbar:
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = loss_function(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Update the learning rate (once per batch, matching steps_per_epoch above)
        scheduler.step()

        epoch_loss += loss.item()
        pbar.set_postfix(Loss=loss.item())

    avg_loss = epoch_loss / l
    print(f'Average Loss for Epoch {epoch + 1}: {avg_loss:.5f}\n')

    # Save model weights after every epoch, overwriting the previous checkpoint.
    # The whole module object is saved (not just its state_dict), so loading it
    # later requires the model class definitions to be available.
    torch.save(model, os.path.join("Weights", "RDN_ckpt_1.pt"))