## Imports

In [None]:
import argparse
import os
import copy
import numpy as np
import PIL.Image as pil_image
from PIL import Image
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.data import Dataset
import time

## Dataset

In [None]:
# Locations of the DIV2K high-resolution training and validation images.
train_dir, val_dir = (
    '/DIV2K/DIV2K_train_HR/DIV2K_train_HR',
    '/DIV2K/DIV2K_valid_HR/DIV2K_valid_HR',
)

## Model

In [None]:
class FSRCNN(keras.Model):
    """FSRCNN-style super-resolution network.

    Pipeline: feature extraction (5x5 conv, 56 filters) -> shrinking
    (1x1, 12) -> five 3x3 mapping convolutions (12) -> expanding (1x1, 56)
    -> 9x9 transposed convolution with stride ``scale_factor``.

    Each convolution is linear and followed by a PReLU. Applying ReLU
    before PReLU would make the PReLU an identity (its input would never be
    negative), so the learnable slope would never be used. The PReLU slopes
    are shared across the spatial axes so the layers can be built for
    inputs whose height and width are unknown (``None``).

    The number of output channels matches the number of input channels and
    is resolved the first time the model is built.

    Args:
        scale_factor: Integer upscaling factor, used as the stride of the
            final transposed convolution.
    """

    def __init__(self, scale_factor):
        super(FSRCNN, self).__init__()
        self.scale_factor = scale_factor

        self.feature_extraction = keras.Sequential(self._conv_prelu(56, 5))
        self.shrinking = keras.Sequential(self._conv_prelu(12, 1))

        mapping_layers = []
        for _ in range(5):
            mapping_layers.extend(self._conv_prelu(12, 3))
        self.non_linear_mapping = keras.Sequential(mapping_layers)

        self.expanding = keras.Sequential(self._conv_prelu(56, 1))

        # Created in build(), once the channel count of the input is known.
        self.deconvolution = None

    @staticmethod
    def _conv_prelu(filters, kernel_size):
        """Return a linear 'same' convolution followed by a per-channel PReLU."""
        return [
            layers.Conv2D(filters, kernel_size=kernel_size, padding='same'),
            layers.PReLU(shared_axes=[1, 2]),
        ]

    def build(self, input_shape):
        """Create the upsampling layer with as many filters as input channels.

        Raises:
            ValueError: If the channel dimension of the input is unknown.
        """
        channels = input_shape[-1]
        if channels is None:
            raise ValueError(
                'FSRCNN needs inputs with a known channel dimension.')
        self.deconvolution = layers.Conv2DTranspose(
            int(channels),
            kernel_size=9,
            strides=self.scale_factor,
            padding='same',
        )
        self.built = True

    def call(self, x):
        """Upscale a batch of channels-last images by ``scale_factor``."""
        x = self.feature_extraction(x)
        x = self.shrinking(x)
        x = self.non_linear_mapping(x)
        x = self.expanding(x)
        return self.deconvolution(x)

## Dataloader / preprocessing

In [None]:
class DIV2KDataset:
    """Indexable collection of (HR, LR) grayscale image pairs from a folder.

    This is a plain Python sequence (``__getitem__`` / ``__len__``) rather
    than a subclass of ``tf.data.Dataset``: that class is abstract and is
    meant to be created through factories such as
    ``tf.data.Dataset.from_generator``, which can consume this object.

    Args:
        img_dir: Directory whose entries are all treated as image files.
        scale_factor: Integer downscaling factor between HR and LR images.
        desired_height: Target height of the HR image, in pixels.
        desired_width: Target width of the HR image, in pixels.
    """

    def __init__(self, img_dir, scale_factor, desired_height, desired_width):
        self.img_dir = img_dir
        self.scale_factor = scale_factor
        self.desired_height = desired_height
        self.desired_width = desired_width
        # Sorted so that indices map to the same files on every run.
        self.img_list = sorted(os.listdir(self.img_dir))

    @staticmethod
    def _to_gray_tensor(img):
        """Convert an RGB PIL image to a float32 (H, W, 1) tensor in [0, 1]."""
        pixels = np.asarray(img, dtype=np.float32) / 255.0
        return tf.image.rgb_to_grayscale(tf.convert_to_tensor(pixels))

    def __getitem__(self, index):
        """Return the ``(hr, lr)`` tensors for the image at ``index``.

        Raises:
            IndexError: If ``index`` is outside the file list.
        """
        path = os.path.join(self.img_dir, self.img_list[index])

        lr_width = self.desired_width // self.scale_factor
        lr_height = self.desired_height // self.scale_factor
        # Keep the HR size an exact multiple of the LR size so that an
        # upscaled LR image has the same shape as the HR target.
        hr_size = (lr_width * self.scale_factor, lr_height * self.scale_factor)

        with Image.open(path) as img:
            # Force three channels: rgb_to_grayscale rejects other layouts.
            img_hr = img.convert('RGB').resize(hr_size, Image.BICUBIC)
        img_lr = img_hr.resize((lr_width, lr_height), Image.BICUBIC)

        return self._to_gray_tensor(img_hr), self._to_gray_tensor(img_lr)

    def __len__(self):
        """Return the number of entries found in ``img_dir``."""
        return len(self.img_list)


## Train and Val

In [None]:
class TrainFSRCNN(object):
    """Eager training and validation loop for a Keras super-resolution model.

    Args:
        model: Callable Keras model mapping LR batches to HR predictions.
        criterion: Loss callable invoked as ``criterion(target, prediction)``.
        optimizer: Keras optimizer used to apply the gradients.
        train_loader: Iterable yielding ``(hr, lr)`` training batches.
        val_loader: Iterable yielding ``(hr, lr)`` validation batches.
        n_epochs: Number of passes over ``train_loader``.
        device: Stored for reference only; it is not used by the loops.
    """

    def __init__(self, model, criterion, optimizer, train_loader, val_loader, n_epochs, device):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.n_epochs = n_epochs
        self.device = device

    def train(self):
        """Train for ``n_epochs`` epochs, printing metrics after each one."""
        start_time = time.time()

        for epoch in range(self.n_epochs):
            running_loss = 0.0
            # Batches are counted while iterating because generator-backed
            # loaders do not support len().
            n_batches = 0
            for hr, lr in self.train_loader:
                hr = tf.convert_to_tensor(hr, dtype=tf.float32)
                lr = tf.convert_to_tensor(lr, dtype=tf.float32)

                with tf.GradientTape() as tape:
                    outputs = self.model(lr)
                    loss = self.criterion(hr, outputs)

                grads = tape.gradient(loss, self.model.trainable_variables)
                self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

                running_loss += float(loss)
                n_batches += 1

            val_loss, val_psnr = self.validate()
            train_loss = running_loss / n_batches if n_batches else float('nan')

            print("Epoch: %d, Loss: %.3f, Validation Loss: %.3f, Validation PSNR: %.2f" %
                  (epoch + 1, train_loss, val_loss, val_psnr))

        total_time = time.time() - start_time
        print('Tempo total de treinamento: {:.2f} segundos'.format(total_time))

    def validate(self):
        """Return the mean loss and mean PSNR over the validation batches.

        PSNR is computed with a peak signal value of 1.0. Both values are
        ``nan`` when ``val_loader`` yields no batches.
        """
        # No eval-mode switch is needed here: Keras models have no eval()
        # method, and inference behaviour is selected per call.
        total_loss = 0.0
        total_psnr = 0.0
        n_batches = 0
        for hr, lr in self.val_loader:
            hr = tf.convert_to_tensor(hr, dtype=tf.float32)
            lr = tf.convert_to_tensor(lr, dtype=tf.float32)
            outputs = self.model(lr)
            total_loss += float(self.criterion(hr, outputs))

            mse = tf.reduce_mean(tf.square(hr - outputs))
            # 20 * log10(1 / sqrt(mse)) == -10 * log10(mse)
            psnr = -10.0 * tf.math.log(mse) / tf.math.log(10.0)
            total_psnr += float(psnr)
            n_batches += 1

        if n_batches == 0:
            return float('nan'), float('nan')
        return total_loss / n_batches, total_psnr / n_batches


In [None]:
class Args:
    """Training configuration."""
    train_dir = train_dir
    # Validate on the held-out split, not on the training images.
    val_dir = val_dir

    scale = 4
    batch_size = 4
    epochs = 300

args = Args()

# TensorFlow places ops on a visible GPU automatically; this label is only
# passed along to the trainer.
device = 'cuda' if tf.config.list_physical_devices('GPU') else 'cpu'

train_dataset = DIV2KDataset(args.train_dir, args.scale, 1020, 2040)
val_dataset = DIV2KDataset(args.val_dir, args.scale, 1020, 2040)


def _make_loader(dataset, batch_size):
    """Wrap an indexable (hr, lr) dataset in a batched tf.data pipeline."""
    pair_signature = (
        tf.TensorSpec(shape=(None, None, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(None, None, 1), dtype=tf.float32),
    )
    loader = tf.data.Dataset.from_generator(
        lambda: (dataset[i] for i in range(len(dataset))),
        output_signature=pair_signature,
    )
    return loader.batch(batch_size)


train_loader = _make_loader(train_dataset, args.batch_size)
val_loader = _make_loader(val_dataset, args.batch_size)

# NOTE(review): the loaders yield single-channel tensors; confirm the model
# accepts that channel count.
model = FSRCNN(scale_factor=args.scale)
criterion = tf.keras.losses.MeanSquaredError()
optimizer = Adam(learning_rate=0.00001)
train_fsrcnn = TrainFSRCNN(model, criterion, optimizer, train_loader, val_loader, args.epochs, device)
train_fsrcnn.train()

## Inference

In [None]:
torch.save(model.state_dict(), '300_epochs.pt')

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = FSRCNN(4)
model.to(device)
model.load_state_dict(torch.load('300_epochs.pt'))
model.eval()

class Args:
    train_dir = 'C:/Users/moreiran/Desktop/FSRCNN_DIV2K/DIV2K/DIV2K/DIV2K_train_HR/DIV2K_train_HR'
    val_dir = 'C:/Users/moreiran/Desktop/FSRCNN_DIV2K/DIV2K/DIV2K/DIV2K_valid_HR/DIV2K_valid_HR'
    scale = 4
    batch_size = 4
    epochs = 300

args = Args()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Carregar dataset de treinamento
train_dataset = DIV2KDataset(args.train_dir, args.scale, 1020, 2040)

# Carregar dataset de validação
val_dataset = DIV2KDataset(args.val_dir, args.scale, 1020, 2040)

train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=True)

# hr_img, lr_img
x_batch, y_batch = next(iter(val_loader))
y, x = x_batch[0], y_batch[0]
x = x.to(device)
pred = model(x)
pred = pred.cpu()
print('imagem PREDITA', pred.shape)
print('imagem HR', y.shape)

# Converter tensores para arrays numpy
y = y.numpy().transpose(1, 2, 0)
pred = pred.detach().numpy().transpose(1, 2, 0)

# Plotar as imagens
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

axes[0].imshow(y)
axes[0].set_title('Imagem de Alta Resolução')
axes[0].axis('off')

axes[1].imshow(pred)
axes[1].set_title('Imagem Predita')
axes[1].axis('off')


In [None]:
# hr_img, lr_img
hr_batch, lr_batch = next(iter(val_loader))
y, x = hr_batch[0], lr_batch[0]
# The model expects a leading batch dimension; strip it from the result.
pred = model(x[tf.newaxis, ...])[0]
print('imagem PREDITA', pred.shape)
print('imagem HR', y.shape)

# Convert tensors to numpy arrays, dropping a trailing single-channel axis
# so each image can be plotted as a 2-D array.
y = np.asarray(y)
pred = np.asarray(pred)
if y.ndim == 3 and y.shape[-1] == 1:
    y = y[..., 0]
if pred.ndim == 3 and pred.shape[-1] == 1:
    pred = pred[..., 0]

# Plot the images
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

axes[0].imshow(y, cmap='gray')
axes[0].set_title('Imagem de Alta Resolução')
axes[0].axis('off')

axes[1].imshow(pred, cmap='gray')
axes[1].set_title('Imagem Predita')
axes[1].axis('off')

plt.show()


## HR and LR image comparison

In [None]:
def _show_first_pair(loader, verbose=False):
    """Plot the first HR/LR pair of the first batch yielded by ``loader``.

    Batches are expected as ``(hr_batch, lr_batch)`` with channels-last
    images. When ``verbose`` is true, the shape of each image is printed.
    Nothing is plotted if the loader yields no batches.
    """
    first_batch = next(iter(loader), None)
    if first_batch is None:
        return

    fig = plt.figure(figsize=(10, 7))
    labelled = (('imagem HR', first_batch[0]), ('imagem LR', first_batch[1]))
    for position, (label, batch) in enumerate(labelled, start=1):
        img = np.asarray(batch[0])
        shape = img.shape
        # Plot single-channel images as 2-D arrays with a gray colormap.
        if img.ndim == 3 and img.shape[-1] == 1:
            img = img[..., 0]
        fig.add_subplot(1, 2, position)
        plt.imshow(img, cmap='gray')
        if verbose:
            print(label, shape)


def train_imgs_visualization():
    """Show one HR/LR pair from the training loader and print their shapes."""
    _show_first_pair(train_loader, verbose=True)


def val_imgs_visualization():
    """Show one HR/LR pair from the validation loader."""
    _show_first_pair(val_loader)


train_imgs_visualization()