In [None]:
import cv2
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np
import random
import pandas as pd
from matplotlib import pyplot as plt

In [None]:
class CnnAutoencoder(nn.Module):
    """Convolutional autoencoder described by a list of channel counts.

    Encoder: for every consecutive pair in ``channel_maps`` a ``Conv2d`` is
    applied, followed by ReLU and a ``scale`` x ``scale`` max-pool.

    Decoder: the reversed channel list is walked with ``ConvTranspose2d``
    layers whose kernel size and stride both equal ``scale``, so each decoder
    stage multiplies the spatial size by the same factor an encoder stage
    divided it by. Hidden decoder stages use ReLU; the last one uses a sigmoid
    so the output lies in [0, 1].

    The reconstruction has the same height/width as the input only when the
    convolutions preserve size (e.g. ``kernel_size=3, padding=1``) and the
    input height/width are divisible by ``scale ** (len(channel_maps) - 1)``,
    because max-pooling floors odd sizes.

    Args:
        scale: Pooling window of the encoder and upsampling factor of the decoder.
        channel_maps: Channel counts, input first (e.g. ``[3, 16, 8, 4]``).
            ``None`` (the default) means no layers. The sequence is copied.
        padding: Padding passed to every encoder ``Conv2d``.
        kernel_size: Kernel size passed to every encoder ``Conv2d``.
        num_channels: Stored as an attribute; not otherwise used by this class.
        img_width: Stored as an attribute; not otherwise used by this class.
        img_height: Stored as an attribute; not otherwise used by this class.
        device: Stored as an attribute; this class does not move parameters
            to it (callers use ``.to(...)``).
    """

    def __init__(self, scale=2, channel_maps=None, padding=1, kernel_size=3, num_channels=3, img_width=100, img_height=100, device=torch.device("cpu")):
        super().__init__()

        self.device = device

        self.img_width      = img_width
        self.img_height     = img_height
        self.num_channels   = num_channels
        self.kernel_size    = kernel_size
        self.padding        = padding
        # Copy so instances never share (or alias) a caller's / default list.
        self.channel_maps   = list(channel_maps) if channel_maps is not None else []
        self.scale          = scale

        self.reversed_channel_maps = list(reversed(self.channel_maps))

        # Encoder: one Conv2d per consecutive (in, out) channel pair.
        self.convolutional_layers = nn.ModuleList([
            nn.Conv2d(c_in, c_out, kernel_size=self.kernel_size, padding=self.padding)
            for c_in, c_out in zip(self.channel_maps, self.channel_maps[1:])
        ])

        # Decoder: mirror of the encoder. Kernel and stride follow `scale` so
        # each stage exactly undoes one `scale` x `scale` pooling step.
        self.deconvolutional_layers = nn.ModuleList([
            nn.ConvTranspose2d(c_in, c_out, kernel_size=self.scale, stride=self.scale)
            for c_in, c_out in zip(self.reversed_channel_maps, self.reversed_channel_maps[1:])
        ])

    def conv(self, x):
        """Encode ``x``: Conv2d -> ReLU -> max-pool for every encoder layer."""
        for conv_layer in self.convolutional_layers:
            x = F.max_pool2d(F.relu(conv_layer(x)), self.scale)

        return x

    def compress(self, x):
        """Encode ``x`` and flatten each sample's feature map into one vector."""
        return torch.flatten(self.conv(x), start_dim=1)

    def deconv(self, x):
        """Decode ``x``: ReLU after hidden stages, sigmoid after the last one."""
        last = len(self.deconvolutional_layers) - 1

        for i, deconv_layer in enumerate(self.deconvolutional_layers):
            x = deconv_layer(x)
            x = torch.sigmoid(x) if i == last else F.relu(x)

        return x

    def forward(self, x):
        """Return the reconstruction of ``x`` (encode, then decode)."""
        return self.deconv(self.conv(x))


In [None]:
class CnnAutoencoderDataset(Dataset):
    """Image-folder dataset for autoencoder training (input == target).

    Every regular file directly inside ``img_dir`` is treated as an image.
    Items are read with OpenCV, resized to ``(img_width, img_height)``,
    scaled by 1/255 and transposed from height-width-channel to
    channel-height-width. Channel order is whatever ``cv2.imread`` returns
    (BGR by default).

    Args:
        img_dir: Directory containing the image files.
        img_width: Target width passed to ``cv2.resize``.
        img_height: Target height passed to ``cv2.resize``.
    """

    def __init__(self, img_dir, img_width, img_height):
        self.img_dir    = img_dir
        self.img_width  = img_width
        self.img_height = img_height

        # os.listdir order is arbitrary and may include sub-directories:
        # keep regular files only and sort so indices are reproducible.
        self.images = sorted(
            name for name in os.listdir(img_dir)
            if os.path.isfile(os.path.join(img_dir, name))
        )

        # cv2.resize expects (width, height).
        self.dim = (img_width, img_height)

    def __len__(self):
        """Number of image files found in ``img_dir``."""
        return len(self.images)

    def __getitem__(self, index):
        """Return ``(input, target)`` tensors for the image at ``index``.

        Raises:
            ValueError: if OpenCV cannot read/decode the file.
        """
        img_path = os.path.join(self.img_dir, self.images[index])

        raw = cv2.imread(img_path)

        # imread signals failure by returning None instead of raising.
        if raw is None:
            raise ValueError("Could not read image file: {}".format(img_path))

        img = (cv2.resize(raw, self.dim) / 255).transpose((2, 0, 1))

        # Input is the Output
        return torch.Tensor(img), torch.Tensor(img)

In [None]:
def train_fn(loader, model, optimizer, loss_fn, scaler):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Args:
        loader: Iterable yielding ``(data, targets)`` tensor pairs.
        model: Module being trained; batches are moved to the device of its
            first parameter.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
        scaler: Object exposing ``scale(loss)``, ``step(optimizer)`` and
            ``update()`` (e.g. a ``GradScaler``).

    Returns:
        float: Average of the per-batch loss values.

    Raises:
        ValueError: if ``loader`` yields no batches.
    """
    # Use the model's own device rather than a notebook-level `device` global,
    # so the function works regardless of which cells have been executed.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    loop = tqdm(loader)

    total_loss = 0.0
    num_batches = 0

    for data, targets in loop:
        data    = data.float().to(device=target_device)
        targets = targets.float().to(device=target_device)

        # Forward (call the module, not .forward, so registered hooks run)
        predictions = model(data)

        loss = loss_fn(predictions, targets)

        # Backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Read the scalar once; reuse it for the progress bar and the average.
        batch_loss = loss.item()
        loop.set_postfix(loss=batch_loss)

        total_loss += batch_loss
        num_batches += 1

    if num_batches == 0:
        raise ValueError("train_fn received a loader that yielded no batches")

    return total_loss / num_batches

In [None]:
# --- Run configuration: every tunable value used by the cells below ---

# Hardware
device = 'cuda'
gpu_index = 0

# Optimisation
epochs = 100
learning_rate = 0.0001
batch_size = 1
chunk_size = 1
cont = False  # True: resume from `model_file` before training

# Network architecture
channel_maps = [3, 16, 8, 4]  # channel counts, input first
kernel_size = 3
padding = 1
scale = 2
num_channels = 3

# Input images
img_width = 128
img_height = 128
train_img_dir = "/home/ralampay/Desktop/training/Male"

# Output files
model_file = "cnn-model.pth"
output_file = "data.csv"

In [None]:
# Resolve the requested device. A bare 'cuda' becomes "cuda:<gpu_index>";
# when CUDA is requested but unavailable, fall back to the CPU with a notice
# instead of crashing in torch.cuda.get_device_name.
if device == 'cuda':
    if torch.cuda.is_available():
        print("CUDA Device: {}".format(torch.cuda.get_device_name(gpu_index)))
        device = "cuda:{}".format(gpu_index)
    else:
        print("CUDA is not available; falling back to CPU.")
        device = "cpu"

# Build the autoencoder and move its parameters to the resolved device.
# `device=` is passed as well so the model's own `device` attribute matches
# where its parameters actually live (it defaults to CPU otherwise).
model = CnnAutoencoder(
    scale=scale,
    channel_maps=channel_maps,
    padding=padding,
    kernel_size=kernel_size,
    num_channels=num_channels,
    img_width=img_width,
    img_height=img_height,
    device=torch.device(device)
).to(device)

In [None]:
# Training objects. The optimizer is created before the optional resume step
# so that its saved state can actually be restored into it.
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scaler = torch.cuda.amp.GradScaler()

# Optionally resume from the checkpoint written by the training loop
# (a dict with 'state_dict' and 'optimizer' entries).
if cont:
    print("Loading model from {}".format(model_file))
    # map_location lets a checkpoint saved on another device load here.
    state = torch.load(model_file, map_location=device)
    model.load_state_dict(state['state_dict'])
    # Restore the optimizer's state into the optimizer used for training;
    # storing the dict as an attribute on the model had no effect on training.
    optimizer.load_state_dict(state['optimizer'])

In [None]:
# Dataset over the training image directory, resized to the configured size.
train_ds = CnnAutoencoderDataset(img_dir=train_img_dir, img_width=img_width, img_height=img_height)

# Batches are served in dataset order (no shuffling) and the final,
# possibly smaller, batch is kept.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=False, drop_last=False)

In [None]:
# Average loss of each epoch, in order; plotted after training.
losses = []

for epoch in range(epochs):
    print("Epoch: {}".format(epoch))

    # One full pass over the training data.
    ave_loss = train_fn(train_loader, model, optimizer, loss_fn, scaler)
    print("Ave Loss: {}".format(ave_loss))
    losses.append(ave_loss)

    # Checkpoint after every epoch: model weights plus optimizer state,
    # overwriting the previous file.
    print("Saving model to {}...".format(model_file))
    state = {'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict()}
    torch.save(state, model_file)
    print("Done.")

In [None]:
# Wrap the per-epoch average losses in a Series (index = epoch number)
# and draw them as a line chart.
df_loss = pd.Series(data=losses)
df_loss.plot(kind="line")