In [1]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
from pathlib import Path
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

from lightning import LightningModule, Trainer

In [2]:
class ImageData(Dataset):
    def __init__(self, files):
        self.files = files
        self.transform = A.Compose([
            A.Resize(256, 256),
            ToTensorV2()]
        )

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img = cv2.imread(self.files[idx])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(image=img)['image']
        if img.shape[0] == 1:
            img = torch.cat([img]*3)
        img = img / 255.0 - 0.5
        return img.to(device)


device = 'cuda' if torch.cuda.is_available() else 'cpu'
path = Path.home() / 'OneDrive - Seagroup/ai/computer_vison/coco/coco2017/train2017'
files = [str(file) for file in path.glob("*.jpg")]
train_files, valid_files = train_test_split(files, test_size=0.1)

batch_size = 1
num_workers = 0
train_ds = ImageData(train_files)
valid_ds = ImageData(valid_files)
train_dl = DataLoader(train_ds, batch_size, shuffle=True, drop_last=True, num_workers=num_workers)
valid_dl = DataLoader(valid_ds, batch_size, shuffle=False, drop_last=True, num_workers=num_workers)

In [3]:
kernel_size = 4 # (4, 4) kernel
init_channels = 8 # initial number of filters
image_channels = 3
latent_dim = 16

class ConvVAE(nn.Module):
    def __init__(self):
        super(ConvVAE, self).__init__()

        # encoder
        self.enc1 = nn.Conv2d(image_channels, init_channels, kernel_size, 2, 1)
        self.enc2 = nn.Conv2d(init_channels, init_channels*2, kernel_size, 2, 1)
        self.enc3 = nn.Conv2d(init_channels*2, init_channels*4, kernel_size, 2, 1)
        self.enc4 = nn.Conv2d(init_channels*4, 64, kernel_size, 2, 0)

        self.fc1 = nn.Linear(64, 128)
        self.fc_mu = nn.Linear(128, latent_dim)
        self.fc_log_var = nn.Linear(128, latent_dim)
        self.fc2 = nn.Linear(latent_dim, 64)

        # decoder
        self.dec1 = nn.ConvTranspose2d(64, init_channels*8, kernel_size, 1, 0)
        self.dec2 = nn.ConvTranspose2d(init_channels*8, init_channels*4, kernel_size, 2, 1)
        self.dec3 = nn.ConvTranspose2d(init_channels*4, init_channels*2, kernel_size, 2, 1)
        self.dec4 = nn.ConvTranspose2d(init_channels*2, image_channels, kernel_size, 2, 1)

    def reparameterize(self, mu, log_var):
        """
        :param mu: mean from the encoder's latent space
        :param log_var: log variance from the encoder's latent space
        """
        std = torch.exp(0.5*log_var)
        eps = torch.randn_like(std)
        sample = mu + (eps * std)
        return sample

    def forward(self, x):
        # encoding
        x = F.relu(self.enc1(x))
        x = F.relu(self.enc2(x))
        x = F.relu(self.enc3(x))
        x = F.relu(self.enc4(x))
        batch, _, _, _ = x.shape
        x = F.adaptive_avg_pool2d(x, 1).reshape(batch, -1)
        hidden = self.fc1(x)
        # get `mu` and `log_var`
        mu = self.fc_mu(hidden)
        log_var = self.fc_log_var(hidden)
        # get the latent vector through reparameterization
        z = self.reparameterize(mu, log_var)
        z = self.fc2(z)
        z = z.view(-1, 64, 1, 1)

        # decoding
        x = F.relu(self.dec1(z))
        x = F.relu(self.dec2(x))
        x = F.relu(self.dec3(x))
        reconstruction = torch.sigmoid(self.dec4(x))
        return reconstruction, mu, log_var


def final_loss(bce_loss, mu, logvar):
    """
    This function will add the reconstruction loss (BCELoss) and the KL-Divergence.
    KL-Divergence = 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    :param bce_loss: recontruction loss
    :param mu: the mean from the latent vector
    :param logvar: log variance from the latent vector
    """
    BCE = bce_loss
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD


class LightningModel(LightningModule):
    def __init__(self, model, learning_rate):
        super().__init__()
        self.model = model
        self.learning_rate = learning_rate
        self.criterion = nn.BCELoss(reduction='sum')

    def common_step(self, x):
        reconstruction, mu, logvar = self.model(x)
        bce_loss = self.criterion(reconstruction, x)
        loss = final_loss(bce_loss, mu, logvar)
        return loss

    def training_step(self, x, *args):
        loss = self.common_step(x)
        self.log(name="Training loss", value=loss, on_step=True, on_epoch=True)
        return loss

    def validation_step(self, x, *args):
        loss = self.common_step(x)
        self.log(name="Validation loss", value=loss, on_step=True, on_epoch=True)
        return x.cpu()

    def configure_optimizers(self) -> torch.optim.Optimizer:
        return torch.optim.AdamW(self.parameters(), lr=self.learning_rate)

In [9]:
batch_size = 1
train_dl = DataLoader(train_ds, batch_size, shuffle=True, drop_last=True, num_workers=0)
x = next(iter(train_dl))
x.shape

torch.Size([1, 3, 256, 256])

In [12]:
x

tensor([[[[-0.1392, -0.1431, -0.1471,  ..., -0.3549, -0.3431, -0.3588],
          [-0.1392, -0.1353, -0.1353,  ..., -0.3510, -0.3471, -0.3471],
          [-0.1431, -0.1314, -0.1431,  ..., -0.3392, -0.3353, -0.3431],
          ...,
          [ 0.1314,  0.2059,  0.1275,  ..., -0.0333, -0.0333, -0.0098],
          [ 0.1510,  0.1627,  0.1980,  ...,  0.0059,  0.0255,  0.0490],
          [ 0.1392,  0.1980,  0.1824,  ..., -0.0216,  0.0490,  0.0451]],

         [[ 0.0961,  0.1000,  0.0961,  ..., -0.1353, -0.1353, -0.1314],
          [ 0.0961,  0.0961,  0.0961,  ..., -0.1353, -0.1431, -0.1431],
          [ 0.1039,  0.1039,  0.1039,  ..., -0.1314, -0.1353, -0.1431],
          ...,
          [ 0.1745,  0.2490,  0.1706,  ..., -0.0608, -0.0569, -0.0412],
          [ 0.1863,  0.2020,  0.2373,  ..., -0.0176, -0.0020,  0.0176],
          [ 0.1667,  0.2333,  0.2255,  ..., -0.0294,  0.0176,  0.0333]],

         [[ 0.3000,  0.3157,  0.3000,  ...,  0.0451,  0.0490,  0.0451],
          [ 0.3196,  0.3196,  

In [17]:
model = ConvVAE().to(device)
reconstruction, mu, logvar = model(x)

In [19]:
reconstruction.shape

torch.Size([1, 3, 32, 32])

In [20]:
nn.BCELoss(reduction='sum')(reconstruction, x)

ValueError: Using a target size (torch.Size([1, 3, 256, 256])) that is different to the input size (torch.Size([1, 3, 32, 32])) is deprecated. Please ensure they have the same size.

In [5]:
lightning_model = LightningModel(ConvVAE(), learning_rate=1e-3)
EPOCHS = 3
trainer = Trainer(
    max_epochs=EPOCHS,
    accelerator='gpu',
    precision=16,
)
trainer.fit(lightning_model, train_dl, valid_dl)

Using 16bit None Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3060 Ti') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type    | Params
--------------------------------------
0 | model     | ConvVAE | 164 K 
1 | criterion | BCELoss | 0     
--------------------------------------
164 K     Trainable params
0         Non-trainable params
164 K     Total params
0.329     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(


ValueError: Using a target size (torch.Size([1, 3, 256, 256])) that is different to the input size (torch.Size([1, 3, 32, 32])) is deprecated. Please ensure they have the same size.

In [None]:
x = next(iter(train_dl))