In [56]:
import glob

In [57]:
path = "lfw"

In [58]:
face_images = glob.glob('lfw/**/*.jpg') #returns path of images
print(len(face_images)) #contains 13243 images

13233


In [59]:
from torch.utils.data import Dataset
from __future__ import print_function, division
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from PIL import Image
import pytorch_lightning as pl
## PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
# Torchvision
import torchvision
from torchvision.datasets import MNIST
from torchvision import transforms

from sklearn.model_selection import train_test_split


## Standard libraries
import os
import json
import math
import numpy as np 
import random

## Imports for plotting
import matplotlib.pyplot as plt
from matplotlib import cm
%matplotlib inline 
from IPython.display import set_matplotlib_formats
set_matplotlib_formats('svg', 'pdf') # For export
from matplotlib.colors import to_rgb
import matplotlib
from mpl_toolkits.mplot3d.axes3d import Axes3D
from mpl_toolkits.mplot3d import proj3d
matplotlib.rcParams['lines.linewidth'] = 2.0
import seaborn as sns
sns.reset_orig()

  set_matplotlib_formats('svg', 'pdf') # For export


In [98]:
# https://pytorch.org/tutorials/beginner/data_loading_tutorial.html
# https://www.kaggle.com/code/balraj98/single-image-super-resolution-gan-srgan-pytorch

# This custom dataset class has been derived from above two sources. 
class FacesDataset(Dataset):
    def __init__(self, path, transform=None):
        self.img_dir = path
        self.transform = transform
        self.image_path_list = glob.glob('lfw/**/*.jpg')
        self.hr_height = 128
        self.hr_width = 128
        
        self.hr_transform = transforms.Compose(
        [
            transforms.Resize((self.hr_height, self.hr_height), Image.BICUBIC), 
            transforms.ToTensor(), # converts a 255 image to 0-1
        ])
        
        self.lr_transform = transforms.Compose(
        [
            transforms.Resize((self.hr_height//4, self.hr_height//4), Image.BICUBIC),
            transforms.Resize((self.hr_height, self.hr_height), Image.BICUBIC),
            transforms.ToTensor()
            
            
        ])
        
    def __len__(self):
        return len(self.image_path_list)
    
    def __getitem__(self,idx):
        image = Image.open(self.image_path_list[idx])
        image_lr = self.lr_transform(image)
        image_hr = self.hr_transform(image)
        
        return {"lr": image_lr, "hr": image_hr}
    
        
        
        

### Hyperparams

In [99]:
batch_size = 16

In [100]:
train_paths, test_paths = train_test_split(sorted(face_images), test_size=0.02, random_state=42)
train_dataloader = DataLoader(FacesDataset(train_paths), batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(FacesDataset(test_paths), batch_size=int(batch_size*0.75), shuffle=True)

### Model


In [104]:
# https://uvadlc-notebooks.readthedocs.io/en/latest/tutorial_notebooks/tutorial8/Deep_Energy_Models.html
# Taken from the above link 

# This model structure is arbitrary. Basically just applying convolution to reduce the dimension from 128 to 1
# This we need to explore. 

import torch.nn as nn
import torch.nn.functional as F
class Swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)


class CNNModel(nn.Module):

    def __init__(self, hidden_features=64, out_dim=1, **kwargs):
        super().__init__()
        # We increase the hidden dimension over layers. Here pre-calculated for simplicity.
        c_hid1 = hidden_features//2
        c_hid2 = hidden_features
        c_hid3 = hidden_features*2
        
        # Series of convolutions and Swish activation functions
        self.cnn_layers = nn.Sequential(
                nn.Conv2d(3, c_hid1, kernel_size=8, stride=2), # [16x16] - Larger padding to get 32x32 image
                nn.MaxPool2d(7, stride=2),
                Swish(),
            
                nn.Conv2d(c_hid1, c_hid2, kernel_size=5, stride=2), #  [8x8]
                nn.MaxPool2d(6, stride=2),
                Swish(),
            
                nn.Conv2d(c_hid2, c_hid3, kernel_size=2, stride=1), #  [8x8]
                
                Swish(),
                nn.Conv2d(c_hid3, c_hid3, kernel_size=3, stride=1), #  [8x8]
                Swish(),
                nn.Flatten(),
                nn.Linear(128, out_dim)
        )

    def forward(self, x):
        x = self.cnn_layers(x).squeeze(dim=-1)
        return x

In [105]:
sample_image = torch.Tensor(32,3,128, 128)
net = CNNModel()
out = net(sample_image)
out.shape

torch.Size([32])

In [111]:
class DeepEnergyModel(pl.LightningModule):
    
    def __init__(self, img_shape, batch_size, alpha=0.1, lr=1e-4, beta1=0.0, **CNN_args):
        super().__init__()
        self.save_hyperparameters()
        
        self.cnn = CNNModel(**CNN_args)
        self.sampler = Sampler(self.cnn, img_shape=img_shape, sample_size=batch_size)
        self.example_input_array = torch.zeros(3, *img_shape)
 
    def forward(self, x):
        z = self.cnn(x)
        return z

    def configure_optimizers(self):
        # Energy models can have issues with momentum as the loss surfaces changes with its parameters. 
        # Hence, we set it to 0 by default. 
        optimizer = optim.Adam(self.parameters(), lr=self.hparams.lr, betas=(self.hparams.beta1, 0.999))
        scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.97) # Exponential decay over epochs
        return [optimizer], [scheduler]

    def training_step(self, batch, batch_idx):
        # We add minimal noise to the original images to prevent the model from focusing on purely "clean" inputs
        images_dict = batch
        hr_image = images_dict["hr"]
        lr_image = images_dict["lr"]
        
        small_noise = torch.randn_like(hr_image) * 0.005
        hr_image.add_(small_noise).clamp_(min=-1.0, max=1.0)
        
       # Predict energy score for all images
        inp_imgs = torch.cat([hr_image, lr_image], dim=0)
        real_out, fake_out = self.cnn(inp_imgs).chunk(2, dim=0)
        
        # Calculate losses
        reg_loss = self.hparams.alpha * (real_out ** 2 + fake_out ** 2).mean()
        cdiv_loss = fake_out.mean() - real_out.mean()
        loss = reg_loss + cdiv_loss
        
        # Logging
        self.log('loss', loss)
        self.log('loss_regularization', reg_loss)
        self.log('loss_contrastive_divergence', cdiv_loss)
        self.log('metrics_avg_real', real_out.mean())
        self.log('metrics_avg_fake', fake_out.mean())
        return loss

    def validation_step(self, batch, batch_idx):
        # For validating, we calculate the contrastive divergence between purely random images and unseen examples
        # Note that the validation/test step of energy-based models depends on what we are interested in the model
        
        images_dict = batch
        hr_image = images_dict["hr"]
        lr_image = images_dict["lr"]
        
#         real_imgs, _ = batch
#         fake_imgs = torch.rand_like(real_imgs) * 2 - 1
        
        inp_imgs = torch.cat([hr_image, lr_image], dim=0)
        real_out, fake_out = self.cnn(inp_imgs).chunk(2, dim=0)
        
        cdiv = fake_out.mean() - real_out.mean()
        self.log('val_contrastive_divergence', cdiv)
        self.log('val_fake_out', fake_out.mean())
        self.log('val_real_out', real_out.mean())

In [112]:
device="cpu"

In [113]:
def train_model(**kwargs):
    # Create a PyTorch Lightning trainer with the generation callback
    trainer = pl.Trainer(default_root_dir=os.path.join("EBM"),
                         gpus=1 if str(device).startswith("cuda") else 0,
                         max_epochs=1,
                         gradient_clip_val=0.1)
    # Check whether pretrained model exists. If yes, load it and skip training
    pretrained_filename = "EBM.ckpt"
    if os.path.isfile(pretrained_filename):
        print("Found pretrained model, loading...")
        model = DeepEnergyModel.load_from_checkpoint(pretrained_filename)
    else:
        pl.seed_everything(42)
        model = DeepEnergyModel(**kwargs)
        trainer.fit(model, train_dataloader, test_dataloader)
        model = DeepEnergyModel.load_from_checkpoint(trainer.checkpoint_callback.best_model_path)
    # No testing as we are more interested in other properties
    return model

In [114]:
model = train_model(img_shape=(3,128,128), 
                    batch_size=train_dataloader.batch_size,
                    lr=1e-4,
                    beta1=0.0)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Global seed set to 42

  | Name | Type     | Params | In sizes         | Out sizes
-----------------------------------------------------------------
0 | cnn  | CNNModel | 238 K  | [3, 3, 128, 128] | [3]      
-----------------------------------------------------------------
238 K     Trainable params
0         Non-trainable params
238 K     Total params
0.952     Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

IsADirectoryError: [Errno 21] Is a directory: '/Users/vaibhavsingh/Desktop/NYU/DL project'