In [1]:
import torch
DEVICE = (
    "cuda"
)

In [20]:

LR = 0.005
IMAGE_SIZE = 64
CHANNELS = 3
BATCH_SIZE = 64
EMBEDDING_DIM = 128
EPOCHS = 50
KLD_WEIGHT = 0.00025

DATASET_PATH =r" C:\Users\ACER\Downloads\img_align_celeba\img_align_celeba"
DATASET_ATTRS_PATH = "dataset/list_attr_celeba.csv"

NUM_FRAMES = 50
FPS = 5

LABELS = ["Eyeglasses", "Smiling", "Attractive", "Male", "Blond_Hair"]


In [21]:
import glob
import os
import matplotlib
import numpy as np
from PIL import Image
from torch.utils.data import Dataset


class CelebADataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.all_images = list(glob.iglob(root_dir + "/*.jpg"))

    def __len__(self):
        return len(self.all_images)
    def __shape__(self):
        return self.shape
    def __getitem__(self, idx):
        img_path = self.all_images[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image
    
    


In [22]:
import matplotlib
import numpy as np
from PIL import Image
matplotlib.use("Agg")
import imageio
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F


In [23]:
def loss_function(VAELossParams, kld_weight):
    recons,input,mu,log_var=VAELossParams
    recons_loss=F.mse_loss(recons,input)
     
    kld_loss=torch.mean(-0.5 * torch.sum(1+log_var-mu**2-log_var.exp(),dim=1),dim=0)
    loss= recons_loss + (kld_weight*kld_loss)
    return{"reconstruction loss":recons_loss,"kld loss":kld_loss,"loss":loss}


In [24]:
from typing import List,Tuple
import torch.nn as nn
import torch
from torch import Tensor

In [25]:
class ConvBlock(nn.Module):
    def __init__(self,in_channels:int,out_channels:int) ->None:
        super(ConvBlock,self).__init__()
        self.block=nn.Sequential(nn.Conv2d(
            in_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=2,
            padding=1,
        ),
        nn.BatchNorm2d(out_channels),nn.LeakyReLU(),

        )

    def forward(self,x):
        return self.block(x)    



In [26]:
class ConvTBlock(nn.Module):
    def __init__(self,in_channels:int,out_channels:int) ->None:
        super(ConvTBlock,self).__init__()
        self.block=nn.Sequential(
            nn.ConvTranspose2d(
                in_channels,
                out_channels,
                kernel_size=3,
                stride=2,
                padding=1,
                output_padding=1,

            ),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU()

        )
    def forward(self,x):
        return self.block(x)    

In [27]:


class celebVAE(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, latent_dim: int, hidden_dims: List[int] = None) -> None:
        super(celebVAE, self).__init__()
        self.latent_dim = latent_dim
        if hidden_dims is None:
            hidden_dims = [32, 64, 128, 256, 512]
        
        self.encoder = nn.Sequential(
            *[
                ConvBlock(in_f, out_f)
                for in_f, out_f in zip([in_channels] + hidden_dims[:-1], hidden_dims)
            ]
        )
        self.fc_mu = nn.Linear(hidden_dims[-1] * 4, latent_dim)
        self.fc_var = nn.Linear(hidden_dims[-1] * 4, latent_dim)
        self.decoder_input = nn.Linear(latent_dim, hidden_dims[-1] * 4)
        self.decoder = nn.Sequential(
            *[
                ConvTBlock(in_f, out_f)
                for in_f, out_f in zip(hidden_dims[:-1], hidden_dims[1:])
            ]
        )
        self.final_layer = nn.Sequential(
            nn.ConvTranspose2d(
                hidden_dims[-1],
                hidden_dims[-1],
                kernel_size=3,
                stride=2,
                padding=1,
                output_padding=1,
            ),
            nn.BatchNorm2d(hidden_dims[-1]),
            nn.LeakyReLU(),
            nn.Conv2d(hidden_dims[-1], out_channels=3, kernel_size=3, padding=1),
            nn.Tanh(),
        )

    def encode(self, input: torch.Tensor) -> List[Tuple[torch.Tensor]]:
        result = self.encoder(input)
        result = torch.flatten(result, start_dim=1)
        mu = self.fc_mu(result)
        log_var = self.fc_var(result)
        return mu, log_var

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        result = self.decoder_input(z)
        result = result.view(-1, 32, 8, 8)  
        result = self.decoder(result)
        result = self.final_layer(result)
        return result

    def reparametrize(self, mu: torch.Tensor, logvar: torch.Tensor) -> torch.Tensor:
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return (eps * std + mu)

    def forward(self, input: torch.Tensor) -> List[Tensor]:
        mu, log_var = self.encode(input)
        z = self.reparametrize(mu, log_var)
        return [self.decode(z), input, mu, log_var]


In [28]:
import os 
import torch
import torch.optim as optim
from torch.utils.data import DataLoader,random_split
from torchvision import transforms


In [29]:
output_dir="output"
os.makedirs("output",exist_ok=True)
training_progress_dir=os.path.join(output_dir,"training_progress")
os.makedirs(training_progress_dir,exist_ok=True)
model_weights_dir=os.path.join(output_dir,"model weights")
os.makedirs(model_weights_dir,exist_ok=True)
MODEL_BEST_WEIGHTS_PATH=os.path.join(model_weights_dir,"best_vae_celeba.pt")
MODEL_WEIGHTS_PATH=os.path.join(model_weights_dir,"vae_celeba.pt")

In [30]:
train_transforms=transforms.Compose(

    [
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(148),
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),


    ]
)

validation_transforms=transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(148),
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),


    ]
)



In [31]:
celeba_dataset = CelebADataset(root_dir=r"C:\Users\ACER\Downloads\img_align_celeba", transform=train_transforms)

val_size=int(len(celeba_dataset)*0.1)
train_size=len(celeba_dataset)-val_size


train_dataset,val_dataset=random_split(celeba_dataset,[train_size,val_size])

train_dataloader=DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    pin_memory=True,
)
val_dataloader=DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    pin_memory=True,
)


In [32]:
data_iter=iter(train_dataloader)
batch=next(data_iter)
#for images in batch:
   # print(images.shape)
    

In [33]:
celeba_dataset = CelebADataset(root_dir=r"C:\Users\ACER\Downloads\img_align_celeba", transform=train_transforms)
print(len(celeba_dataset))
print(celeba_dataset[0].shape)

11892
torch.Size([3, 64, 64])


In [51]:
import matplotlib.pyplot as plt
import torchvision.utils as vutils
matplotlib.use("Agg")

for batch in train_dataloader:
    images = batch 
    for images in batch:
        
        plt.imshow(vutils.make_grid(images, normalize=True).permute(1, 2, 0))
        plt.axis("off")
        plt.title("Sample Images")
        plt.show()
        plt.savefig('plot.png')
        break

    break  

  plt.show()


In [None]:
model=celebVAE(in_channels=CHANNELS,out_channels=None,latent_dim=EMBEDDING_DIM,hidden_dims=None,)
model=model.to(DEVICE)
optimizer=optim.SGD(model.parameters(),lr=LR)
scheduler=optim.lr_scheduler.ExponentialLR(optimizer,gamma=0.95)

del model

best_val_loss=float("inf")

for epoch in range(EPOCHS):
    running_loss=0.0
    for i,x in enumerate(train_dataloader):

        print(x.shape)
        x=x.to(DEVICE)
        del x
        optimizer.zero_grad()
        predicted_values=model(x)
        total_loss=loss_function(predicted_values,KLD_WEIGHT)
        total_loss.backward()
        optimizer.step()
        running_loss+=total_loss.item()
    print("epoch ",epoch)    



In [None]:
train_loss=running_loss/len(train_dataloader)
for i,x in enumerate(val_dataloader):
        x=x.to(DEVICE)
        optimizer.zero_grad()
        predicted_values=model(x)
        val_loss=(loss_function(predicted_values,KLD_WEIGHT)).mean()
        
        
torch.save({"vae-celeba": model.state_dict()},
           MODEL_BEST_WEIGHTS_PATH,) 

torch.save({"vae-celeba":model.state_dict()},
           MODEL_WEIGHTS_PATH,)

print(
        f"Epoch {epoch+1}/{EPOCHS}, Batch {i+1}/{len(train_dataloader)}, "
        f"Total Loss: {total_loss['loss'].detach().item():.4f}, "
        f"Reconstruction Loss: {total_loss['Reconstruction_Loss']:.4f}, "
        f"KL Divergence Loss: {total_loss['KLD']:.4f}",
        f"Val Loss: {val_loss:.4f}",
    )
scheduler.step()
      