In [12]:
import data_handling
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torch
import torchvision
import numpy as np
import encdec as ed

In [13]:
# Ask PyTorch whether a CUDA device is usable; the cell output displays the answer.
torch.cuda.is_available()

True

In [14]:
# --- Dataset / loader configuration ---
DATASET_DIRECTORY = "./CelebDataProcessed"
ANNOTATIONS_DIRECTORY = "./annotations.csv"  # NOTE: despite the name this is a path to a CSV file
NAME = ""  # forwarded to PublicFigureDataset as its third positional argument
BATCH_SIZE = 128
SPLIT_SEED = 42  # fixes the train/test partition so runs are comparable across kernel restarts
TRANSFORM = torchvision.transforms.Compose([
    torchvision.transforms.ToPILImage(),
    torchvision.transforms.ToTensor(),
])

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pubfig = data_handling.PublicFigureDataset(ANNOTATIONS_DIRECTORY, DATASET_DIRECTORY, NAME, transform=TRANSFORM)

# 80-20 train/test split (seeded, so the same samples land in each subset on every run)
train_size = int(0.8 * len(pubfig))
test_size = len(pubfig) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    pubfig,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_dl = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation does not benefit from shuffling; keep the test order fixed.
test_dl = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)



In [15]:
### Check data
index = 0 # First image from the dataset
# Index the dataset once: every pubfig[index] access re-runs the dataset's __getitem__.
sample = pubfig[index]
sample_image, sample_label = sample[0], sample[1]
print(type(sample_image))
print(f"Image Shape: {sample_image.size()}") # Image
print(f"Images are of: {sample_image.dtype}")
print(sample_label) # Label

<class 'torch.Tensor'>
Image Shape: torch.Size([3, 256, 256])
Images are of: torch.float32
Abhishek Bachan


In [16]:
# Latent size handed to models.SingleEnc when the model is constructed.
LATENT_DIM = 2048

In [17]:
import models

# Build the network with the configured latent size, then place it on the selected device.
model = models.SingleEnc(LATENT_DIM)
model = model.to(device)


In [18]:
import helper as h
from numpy.random import randint

#h.loadWeights(model, "./Weights/double.pth")
# Smoke test: run one randomly chosen image through the model and print the output shape.
index = randint(len(pubfig))
image, _ = h.getImage(index, pubfig)

model.eval()
# Inference only, so no autograd graph needs to be recorded.
with torch.no_grad():
    # unsqueeze(0) adds the leading batch dimension: the model is fed batches, not single images.
    output = model(image.unsqueeze(0).to(device))
print(output.size())

torch.Size([1, 3, 256, 256])




In [19]:
# Training hyper-parameters
EPOCHS = 100
lr = 1e-3
loss_fn = torch.nn.MSELoss()  # reconstruction loss between model output and input batch
optim = torch.optim.Adam(
    model.parameters(),
    lr=lr,
    weight_decay=1e-5,
)

In [20]:
import torchvision.transforms as T
from PIL import Image
from torchvision.utils import save_image
from numpy.random import randint
import time
import gc

def train_epoch(model, device, trainloader, loss_fn, optimizer, testloader, dataset, epochs=5, default_dtype=torch.FloatTensor, video=False):
    """Train ``model`` for ``epochs`` epochs, updating on decoder "a" then decoder "b" per batch.

    For each batch the model is run with ``type="a"``, the loss against the
    input batch is back-propagated and the optimizer stepped; the same is then
    repeated with ``type="b"``. After every epoch ``test_epoch`` is run on
    ``testloader`` and a summary line is printed.

    Args:
        model: Module called as ``model(batch, type=...)``; must provide ``train()`` and ``eval()``.
        device: Device the batches are moved to.
        trainloader: Iterable yielding ``(image_batch, label)`` pairs; the labels are ignored.
        loss_fn: Callable ``loss_fn(output, target)`` returning a scalar tensor.
        optimizer: Optimizer stepping the model parameters.
        testloader: Iterable handed to ``test_epoch`` after each epoch.
        dataset: Dataset from which one random reference image is drawn when ``video`` is true.
        epochs: Number of passes over ``trainloader``.
        default_dtype: Tensor type every batch is cast to before the forward pass.
        video: When true, the reconstruction of one fixed reference image is saved
            for both decoders before every training step, under ``./Outputs/Video/GPU``.

    Returns:
        None. Progress is reported through ``print``.
    """
    start_time = time.time()

    if video:
        import os  # local import: only needed for the optional snapshot directories

        # save_image does not create missing directories, so make sure they exist up front.
        for decoder_dir in ("a", "b"):
            os.makedirs(os.path.join("./Outputs/Video/GPU", decoder_dir), exist_ok=True)

        index = randint(len(dataset))  # From the dataset we get a random image
        image, name = h.getImage(index, dataset)
        image = image.unsqueeze(0).to(device)
        save_image(image, "./Outputs/Video/GPU/{}.png".format(name))

    for ep in range(epochs):
        # Re-enter train mode at the start of every epoch: the evaluation run at the
        # end of the previous epoch switches the model to eval mode.
        model.train()
        train_loss_a = []
        train_loss_b = []

        # Iterate the dataloader (labels are not needed, this is unsupervised learning)
        for i, (image_batch, _) in enumerate(trainloader):
            if video:  # TODO: This only works for the Autoencoder class atm
                model.eval()
                # Snapshots are inference only; do not record an autograd graph for them.
                with torch.no_grad():
                    save_image(model(image, "a"), "./Outputs/Video/GPU/a/{}_{}.png".format(ep, i))
                    save_image(model(image, "b"), "./Outputs/Video/GPU/b/{}_{}.png".format(ep, i))
                model.train()

            # Move tensor to the proper device
            image_batch = image_batch.type(default_dtype).to(device)

            # Decoder "a": forward, loss, backward, step
            output = model(image_batch, type="a")
            loss_a = loss_fn(output, image_batch)
            optimizer.zero_grad()
            loss_a.backward()
            optimizer.step()

            # Decoder "b": forward, loss, backward, step
            output = model(image_batch, type="b")
            loss_b = loss_fn(output, image_batch)
            optimizer.zero_grad()
            loss_b.backward()
            optimizer.step()

            # Plain Python floats for logging; nothing here needs to stay attached to the graph.
            batch_loss_a = loss_a.item()
            batch_loss_b = loss_b.item()

            if i % 20 == 0:
                time_lapse = time.strftime('%H:%M:%S', time.gmtime(time.time() - start_time))
                print('Epoch:{:2d} | Iter:{:5d} | Time: {} | Train_A Loss: {:.4f} | Train_B Loss: {:.4f}'.format(ep+1, i, time_lapse, batch_loss_a, batch_loss_b))

            train_loss_a.append(batch_loss_a)
            train_loss_b.append(batch_loss_b)
        gc.collect()
        test_loss_a, test_loss_b = test_epoch(model, device, testloader, loss_fn)
        print('\n EPOCH {}/{} \t Avg. Train_A loss this Epoch {} \t Avg. Train_B loss this Epoch {} \t Test loss A {} \t Test loss B {}'.format(ep + 1, epochs, np.mean(train_loss_a),np.mean(train_loss_b), test_loss_a, test_loss_b))
    return



def test_epoch(model, device, dataloader, loss_fn, default_dtype=torch.FloatTensor):
    # Set evaluation mode for encoder and decoder
    model.eval()
    with torch.no_grad(): # No need to track the gradients
        # Define the lists to store the outputs for each batch
        conc_out_a = []
        conc_label_a = []
        for image_batch, _ in dataloader:
            # Move tensor to the proper device
            image_batch = image_batch.type(default_dtype).to(device)#image_batch.type(torch.HalfTensor).to(device)
            # Encode data
            output = model(image_batch)

            # Append the network output and the original image to the lists
            conc_out_a.append(output.cpu())
            conc_label_a.append(image_batch.cpu())
        # Create a single tensor with all the values in the lists
        conc_out_a = torch.cat(conc_out_a)
        conc_label_a = torch.cat(conc_label_a) 
        # Evaluate global loss
        val_loss_a = loss_fn(conc_out_a, conc_label_a)

        conc_out_b = []
        conc_label_b = []
        for image_batch, _ in dataloader:
            # Move tensor to the proper device
            image_batch = image_batch.type(default_dtype).to(device)#image_batch.type(torch.HalfTensor).to(device)
            # Encode data
            output = model(image_batch,  type="b")

            # Append the network output and the original image to the lists
            conc_out_b.append(output.cpu())
            conc_label_b.append(image_batch.cpu())
        # Create a single tensor with all the values in the lists
        conc_out_b = torch.cat(conc_out_b)
        conc_label_b = torch.cat(conc_label_b) 
        # Evaluate global loss
        val_loss_b = loss_fn(conc_out_b, conc_label_b)

    return val_loss_a.data, val_loss_b.data

In [21]:
# Launch training; video=True also saves per-step reconstructions of one reference image.
train_epoch(
    model=model,
    device=device,
    trainloader=train_dl,
    loss_fn=loss_fn,
    optimizer=optim,
    testloader=test_dl,
    dataset=pubfig,
    epochs=EPOCHS,
    video=True,
)

Epoch: 1 | Iter:    0 | Time: 00:00:00 | Train_A Loss: 0.3525 | Train_B Loss: 0.3269
Epoch: 1 | Iter:   20 | Time: 00:00:13 | Train_A Loss: 0.3164 | Train_B Loss: 0.2973
Epoch: 1 | Iter:   40 | Time: 00:00:25 | Train_A Loss: 0.2804 | Train_B Loss: 0.3031
Epoch: 1 | Iter:   60 | Time: 00:00:38 | Train_A Loss: 0.2870 | Train_B Loss: 0.2719

 EPOCH 1/100 	 Avg. Train_A loss this Epoch 0.2920683026313782 	 Avg. Train_B loss this Epoch 0.2909405827522278 	 Test loss A 0.061915166676044464 	 Test loss B 0.06251343339681625
Epoch: 2 | Iter:    0 | Time: 00:01:03 | Train_A Loss: 0.2777 | Train_B Loss: 0.2818
Epoch: 2 | Iter:   20 | Time: 00:01:15 | Train_A Loss: 0.2748 | Train_B Loss: 0.2775
Epoch: 2 | Iter:   40 | Time: 00:01:28 | Train_A Loss: 0.2677 | Train_B Loss: 0.2644
Epoch: 2 | Iter:   60 | Time: 00:01:40 | Train_A Loss: 0.2712 | Train_B Loss: 0.2696

 EPOCH 2/100 	 Avg. Train_A loss this Epoch 0.2704046666622162 	 Avg. Train_B loss this Epoch 0.27042216062545776 	 Test loss A 0.063643

In [None]:
# h.saveWeights(model, "./1024LatentGPU73Epochs")

In [3]:
### https://pythonprogramming.altervista.org/png-to-git-to-tell-a-story-with-python-and-pil/?doing_wp_cron=1682123526.3413488864898681640625

from PIL import Image
import glob
import os
 
# Create the frames (ordered by file modification time, i.e. the order they were written)
imgs = sorted(glob.glob('./Outputs/Video/64Adversarial/a/*.png'), key=os.path.getmtime)
if not imgs:
    # Fail with a clear message instead of an IndexError on frames[0] below.
    raise FileNotFoundError("No PNG frames found in ./Outputs/Video/64Adversarial/a/")

frames = []
for frame_path in imgs:
    # Image.open keeps the underlying file open; copy the pixels out and close it
    # immediately so a long run of frames does not exhaust file descriptors.
    with Image.open(frame_path) as frame_file:
        frames.append(frame_file.copy())

# Save into a GIF file that loops forever
frames[0].save('double.gif', format='GIF',
               append_images=frames[1:],
               save_all=True,
               duration=5, loop=0)