In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from tqdm import tqdm

# Folder holding the original high-resolution training images.
train_dir = "/kaggle/input/augmented-hardhat-vest-dataset/DIV2K_train_HR/"
# Writable folder; the resized HR/LR copies are stored in sub-folders of it.
output_dir = '/kaggle/working/'
# Create both output sub-folders up front (no error if they already exist).
for _subdir in ("hr_images/", "lr_images/"):
    os.makedirs(output_dir + _subdir, exist_ok=True)

def is_image_file(filename):
    """Return True if *filename* ends with a PNG or JPEG extension.

    The comparison is case-insensitive, so mixed-case spellings such as
    ``.Png`` or ``.Jpg`` are accepted in addition to the all-lower and
    all-upper forms.
    """
    # str.endswith accepts a tuple, so one call covers every extension.
    return filename.lower().endswith(('.png', '.jpg', '.jpeg'))

# ---------------------------------------------------------------------------
# Dataset preparation
#   1. Write a 128x128 "HR" copy and a 32x32 "LR" copy of every training image.
#   2. Reload the pairs as RGB arrays scaled to [0, 1].
#   3. Split into train/test sets and convert to N C H W float tensors.
# ---------------------------------------------------------------------------
for img in sorted(os.listdir(train_dir)):
    if not is_image_file(img):
        continue
    img_array = cv2.imread(train_dir + img)
    if img_array is None:
        # cv2.imread returns None (it does not raise) for unreadable files.
        print(f"Skipping unreadable image: {train_dir + img}")
        continue
    # cv2.resize defaults to bilinear interpolation, so bicubic has to be
    # requested explicitly.
    img_array = cv2.resize(img_array, (128, 128), interpolation=cv2.INTER_CUBIC)
    lr_img_array = cv2.resize(img_array, (32, 32), interpolation=cv2.INTER_CUBIC)
    cv2.imwrite(output_dir + "hr_images/" + img, img_array)
    cv2.imwrite(output_dir + "lr_images/" + img, lr_img_array)

# os.listdir gives no ordering guarantee, so pair LR and HR files by name
# (only names present in both folders) instead of by listing position.
_hr_names = set(os.listdir(output_dir + "hr_images"))
lr_list = sorted(name for name in os.listdir(output_dir + "lr_images") if name in _hr_names)
hr_list = list(lr_list)

lr_images = []
hr_images = []
for lr_img, hr_img in zip(lr_list, hr_list):
    img_lr = cv2.imread(output_dir + "lr_images/" + lr_img)
    img_hr = cv2.imread(output_dir + "hr_images/" + hr_img)
    if img_lr is None or img_hr is None:
        print(f"Skipping unreadable image pair: {lr_img}")
        continue
    # OpenCV loads images as BGR; the rest of the pipeline works in RGB.
    lr_images.append(cv2.cvtColor(img_lr, cv2.COLOR_BGR2RGB))
    hr_images.append(cv2.cvtColor(img_hr, cv2.COLOR_BGR2RGB))

if not lr_images:
    raise RuntimeError(f"No usable images were found in {train_dir}")

# 8-bit RGB values -> floats in [0, 1]. Building the arrays as float32 avoids
# a float64 intermediate that would be cast down to float32 later anyway.
lr_images = np.array(lr_images, dtype=np.float32) / 255.
hr_images = np.array(hr_images, dtype=np.float32) / 255.

lr_train, lr_test, hr_train, hr_test = train_test_split(lr_images, hr_images, test_size=0.33, random_state=42)


def _to_nchw_tensor(array):
    """Convert an N H W C numpy array into an N C H W float32 torch tensor."""
    return torch.from_numpy(array).permute(0, 3, 1, 2).float()


lr_train = _to_nchw_tensor(lr_train)
hr_train = _to_nchw_tensor(hr_train)
lr_test = _to_nchw_tensor(lr_test)
hr_test = _to_nchw_tensor(hr_test)


class ResBlock(nn.Module):
    """Residual block: conv -> BN -> PReLU -> conv -> BN, plus a skip connection.

    Args:
        channels: Number of input and output feature channels. Defaults to 64,
            the width that was previously hard-coded.

    The 3x3 convolutions use padding=1, so the spatial size is preserved and
    the block output has exactly the same shape as its input.
    """

    def __init__(self, channels=64):
        super().__init__()
        # Layer names and creation order are kept stable so that saved
        # state_dict keys remain compatible.
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)  # normalise over the mini-batch
        self.prelu = nn.PReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        """Return ``x`` plus the output of the two-convolution branch."""
        out = self.prelu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        return out + x  # residual connection


class UpscaleBlock(nn.Module):
    """Sub-pixel upsampling stage that doubles the spatial resolution.

    A 3x3 convolution expands 64 channels to 256, PixelShuffle(2) rearranges
    them into 64 channels at twice the height and width, and a PReLU follows.
    """

    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(64, 256, kernel_size=3, padding=1)
        self.pixel_shuffle = nn.PixelShuffle(2)  # 256 ch -> 64 ch at 2x size
        self.prelu = nn.PReLU()

    def forward(self, x):
        """Upsample a 64-channel feature map by a factor of two."""
        expanded = self.conv(x)
        shuffled = self.pixel_shuffle(expanded)
        return self.prelu(shuffled)


class Generator(nn.Module):
    """Super-resolution generator: 3-channel input -> 3-channel output at 4x size.

    Pipeline: 9x9 conv + PReLU, a stack of ``num_res_blocks`` residual blocks,
    a 3x3 conv + BatchNorm joined to the first-stage features by a long skip
    connection, two UpscaleBlock stages, and a final 9x9 conv. No activation
    is applied to the output.

    Args:
        num_res_blocks: How many ResBlock modules to chain (default 16).
    """

    def __init__(self, num_res_blocks=16):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=9, padding=4)
        self.prelu = nn.PReLU()
        residual_stack = [ResBlock() for _ in range(num_res_blocks)]
        self.res_blocks = nn.Sequential(*residual_stack)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn = nn.BatchNorm2d(64)
        self.upscale1 = UpscaleBlock()
        self.upscale2 = UpscaleBlock()
        self.conv3 = nn.Conv2d(64, 3, kernel_size=9, padding=4)

    def forward(self, x):
        """Map a low-resolution batch (N C H W) to its upscaled counterpart."""
        shallow = self.prelu(self.conv1(x))
        deep = self.bn(self.conv2(self.res_blocks(shallow)))
        merged = deep + shallow  # long skip around the residual stack
        upscaled = self.upscale2(self.upscale1(merged))
        return self.conv3(upscaled)



class DiscriminatorBlock(nn.Module):
    """3x3 convolution, optional BatchNorm, then LeakyReLU(0.2).

    Args:
        in_filters: Number of input channels.
        out_filters: Number of output channels.
        strides: Convolution stride; 2 halves the spatial size.
        bn: When False the BatchNorm layer is omitted and ``self.bn`` is None.
    """

    def __init__(self, in_filters, out_filters, strides=1, bn=True):
        super().__init__()
        self.conv = nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=strides, padding=1)
        if bn:
            self.bn = nn.BatchNorm2d(out_filters)
        else:
            self.bn = None
        self.lrelu = nn.LeakyReLU(0.2)

    def forward(self, x):
        """Apply conv -> (BatchNorm) -> LeakyReLU to ``x``."""
        features = self.conv(x)
        if self.bn is not None:
            features = self.bn(features)
        return self.lrelu(features)


class Discriminator(nn.Module):
    """Convolutional classifier that scores an image with a single value.

    Eight DiscriminatorBlock stages widen the channels 3 -> 64 -> 128 -> 256
    -> 512, with every second stage using stride 2. The feature map is then
    average-pooled to 6x6, flattened, and passed through two linear layers.
    The value returned is the output of the last linear layer with no
    activation applied.
    """

    def __init__(self):
        super().__init__()
        self.block1 = DiscriminatorBlock(3, 64, bn=False)
        self.block2 = DiscriminatorBlock(64, 64, strides=2)
        self.block3 = DiscriminatorBlock(64, 128)
        self.block4 = DiscriminatorBlock(128, 128, strides=2)
        self.block5 = DiscriminatorBlock(128, 256)
        self.block6 = DiscriminatorBlock(256, 256, strides=2)
        self.block7 = DiscriminatorBlock(256, 512)
        self.block8 = DiscriminatorBlock(512, 512, strides=2)

        # Pool to a fixed 6x6 map so the linear layer's input size is constant.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((6, 6))
        self.flatten = nn.Flatten()  # (N, 512, 6, 6) -> (N, 512 * 6 * 6)
        self.linear1 = nn.Linear(512 * 6 * 6, 1024)
        self.lrelu = nn.LeakyReLU(0.2)
        self.linear2 = nn.Linear(1024, 1)

    def forward(self, x):
        """Return an (N, 1) tensor of scores for the image batch ``x``."""
        conv_stages = (
            self.block1, self.block2, self.block3, self.block4,
            self.block5, self.block6, self.block7, self.block8,
        )
        features = x
        for stage in conv_stages:
            features = stage(features)

        flat = self.flatten(self.adaptive_pool(features))
        hidden = self.lrelu(self.linear1(flat))
        return self.linear2(hidden)



class VGGFeatureExtractor(nn.Module):
    """Frozen prefix of VGG19's convolutional layers used as a feature extractor.

    Args:
        vgg_weights_path: Path to a VGG19 ``state_dict`` checkpoint. If falsy,
            the network keeps whatever weights ``models.vgg19()`` creates.
        num_layers: How many leading modules of ``vgg19.features`` to keep.
            Defaults to 11, the value that was previously hard-coded.

    All kept parameters have ``requires_grad`` set to False, so an optimizer
    never updates them.
    """

    def __init__(self, vgg_weights_path, num_layers=11):
        super().__init__()
        vgg19 = models.vgg19()
        if vgg_weights_path:
            # map_location="cpu" lets the checkpoint load whatever device it
            # was saved from; the caller moves the module with .to(device).
            # NOTE: torch.load unpickles the file - only load trusted checkpoints.
            state_dict = torch.load(vgg_weights_path, map_location="cpu")
            load_result = vgg19.load_state_dict(state_dict, strict=False)
            # strict=False tolerates key mismatches, so make it visible when
            # the feature layers used below did not receive any weights.
            missing_features = [
                key for key in load_result.missing_keys if key.startswith("features.")
            ]
            if missing_features:
                import warnings

                warnings.warn(
                    f"{len(missing_features)} VGG19 feature weights were not found in "
                    f"{vgg_weights_path}; those layers keep their initial values."
                )

        self.features = nn.Sequential(*list(vgg19.features.children())[:num_layers])
        for param in self.features.parameters():
            param.requires_grad = False  # frozen: never updated by an optimizer

    def forward(self, x):
        """Return the feature maps produced by the kept VGG19 layers."""
        # NOTE(review): x is passed through unchanged; confirm whether the
        # checkpoint expects ImageNet mean/std normalised input.
        return self.features(x)


# ---------------------------------------------------------------------------
# Training: alternate one discriminator step and one generator step per batch.
# ---------------------------------------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
generator = Generator().to(device)  # move parameters to the selected device
discriminator = Discriminator().to(device)


vgg_weights_path = '/kaggle/input/vgg19-weights-pth/vgg19-dcbb9e9d.pth'
vgg = VGGFeatureExtractor(vgg_weights_path).to(device)


# The discriminator outputs raw scores, hence the "with logits" BCE variant.
criterion_gan = nn.BCEWithLogitsLoss()
# Content loss: MSE between VGG feature maps of the SR and HR images.
criterion_content = nn.MSELoss()

optimizer_g = optim.Adam(generator.parameters(), lr=0.0001)
optimizer_d = optim.Adam(discriminator.parameters(), lr=0.0001)

batch_size = 8
epochs = 1000

for epoch in range(epochs):
    g_losses = []
    d_losses = []
    # Draw a fresh sample order every epoch so the mini-batches (and the
    # BatchNorm statistics computed from them) are not identical each time.
    permutation = torch.randperm(len(lr_train))
    for i in tqdm(range(0, len(lr_train), batch_size)):
        batch_indices = permutation[i:i + batch_size]
        lr_batch = lr_train[batch_indices].to(device)
        hr_batch = hr_train[batch_indices].to(device)

        # ---- Discriminator step ----
        optimizer_d.zero_grad()
        fake_images = generator(lr_batch)
        # Create the labels directly on the device (no CPU -> device copy).
        real_labels = torch.ones(lr_batch.size(0), 1, device=device)  # HR -> 1
        fake_labels = torch.zeros(lr_batch.size(0), 1, device=device)  # SR -> 0

        d_loss_real = criterion_gan(discriminator(hr_batch), real_labels)
        # detach(): the discriminator update must not backpropagate into G.
        d_loss_fake = criterion_gan(discriminator(fake_images.detach()), fake_labels)
        d_loss = (d_loss_real + d_loss_fake) / 2
        d_loss.backward()
        optimizer_d.step()

        # ---- Generator step ----
        optimizer_g.zero_grad()
        # The generator is rewarded when D labels its output as real.
        g_loss_gan = criterion_gan(discriminator(fake_images), real_labels)
        with torch.no_grad():
            # Target features are constants; no graph is needed for them.
            vgg_features_real = vgg(hr_batch)
        vgg_features_fake = vgg(fake_images)
        g_loss_content = criterion_content(vgg_features_fake, vgg_features_real)
        g_loss = 0.001 * g_loss_gan + g_loss_content
        g_loss.backward()
        optimizer_g.step()

        g_losses.append(g_loss.item())  # .item(): tensor -> Python float
        d_losses.append(d_loss.item())

    print(f"Epoch {epoch+1}/{epochs}, G Loss: {np.mean(g_losses):.4f}, D Loss: {np.mean(d_losses):.4f}")
torch.save(generator.state_dict(), "final_generator.pt")




In [None]:

# ---------------------------------------------------------------------------
# Evaluation: super-resolve one random test image and show LR / SR / HR.
# ---------------------------------------------------------------------------
# map_location lets a checkpoint saved on GPU load on a CPU-only machine too.
generator.load_state_dict(torch.load("final_generator.pt", map_location=device))
generator.eval()  # eval mode: BatchNorm uses its running statistics

ix = np.random.randint(0, len(lr_test), 1)  # one random test index, shape (1,)
lr_image = lr_test[ix].to(device)
hr_image = hr_test[ix].to(device)

with torch.no_grad():  # inference only, no gradients needed
    sr_image = generator(lr_image)
    vgg_features_real = vgg(hr_image)
    vgg_features_fake = vgg(sr_image)
    content_loss = criterion_content(vgg_features_fake, vgg_features_real).item()

# device -> CPU, drop the batch axis (N=1), C H W -> H W C, then to numpy so
# matplotlib's imshow can display the images.
lr_image = lr_image.cpu().squeeze(0).permute(1, 2, 0).numpy()
hr_image = hr_image.cpu().squeeze(0).permute(1, 2, 0).numpy()
sr_image = sr_image.cpu().squeeze(0).permute(1, 2, 0).numpy()
# The generator ends in a plain convolution, so its output is not bounded to
# [0, 1]; clip it to the valid range for float RGB data before plotting.
sr_image = np.clip(sr_image, 0.0, 1.0)


plt.figure(figsize=(12, 6))

panels = (("LR Image", lr_image), ("SR Image", sr_image), ("HR Image", hr_image))
for position, (title, image) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.title(title)
    plt.imshow(image)
    plt.axis("off")

plt.show()


print(f"Content Loss for SR Image: {content_loss}")