In [None]:
from VariationalAutoDecoder import VariationalAutoDecoder as VAD
from VAD_Trainer import VAD_Trainer
import utils
from evaluate import evaluate_model
import torch
import torch.optim as optim
import torch.nn as nn
import csv
import time
import random

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

## Create DataLoaders

In [None]:
train_ds, train_dl, test_ds, test_dl = utils.create_dataloaders(data_path="dataset" ,batch_size=64)

## Train Auto Decoder

In [None]:
import torch.nn as nn

architectures = [
    {
        "mu_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        ),
        "log_var_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        )
    },

    {
        "mu_net": nn.Sequential(
            nn.Linear(128, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 128)
        ),
        "log_var_net": nn.Sequential(
            nn.Linear(128, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 128)
        )
    },

    {
        "mu_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.LayerNorm(512),
            nn.ReLU(),
            nn.Linear(512, 128)
        ),
        "log_var_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.LayerNorm(512),
            nn.ReLU(),
            nn.Linear(512, 128)
        )
    }
]



# latent_dims = [dim for dim in [64, 32, 128, 16, 10] for _ in range(5)]
VADs = [VAD(mu_layers=arch['mu_net'], var_layers=arch['log_var_net'], device=device) for arch in architectures]# for _ in range(5)]
# learning_rates = [lr for lr in [0.001, 0.0005, 0.0001, 0.002, 0.005] for _ in range(5)]
trainers = [VAD_Trainer(var_decoder=VADs[i], dataloader=train_dl, latent_dim=128, device=device, lr=1e-2) for i in range(len(VADs))]

In [None]:
# Initialize the results list to hold all the data
num_test_samples = len(test_dl.dataset)

# Create latent parameters and optimizers for each trainer
temp_latents = torch.randn(10, self.latent_dim).to(self.device)
latents_list = [torch.nn.Parameter((torch.stack([temp_latents[label,:] for label in dataloader.dataset.y])).to(device) for i in range(len(VADs))]# for _ in range(5)]
optimizers = [optim.Adam([latents], lr=1e-3) for latents in latents_list]

# Save results to a CSV file
csv_file_path = 'results_VAD_temp.csv'

# Write header to the CSV file first
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    header = ['Index'] + [f'Epoch {i+1} Loss' for i in range(500)] + ['Final Test Loss']
    writer.writerow(header)

# Main training and evaluation loop
for index, trainer in enumerate(trainers):
    start_time = time.time()  # Record the start time
    train_loss = trainer.train(num_epochs=500)  # Train the model
    end_time = time.time()  # Record the end time
    
    elapsed_time = end_time - start_time  # Calculate elapsed time
    print(f"Trainer {index} has finished training in {elapsed_time:.2f} seconds.")

    start_time = time.time()  # Record the start time
    test_loss = evaluate_model(model=VADs[index], test_dl=test_dl, opt=optimizers[index], latents=latents_list[index], epochs=500, device=device) 
    end_time = time.time()  # Record the end time
    
    elapsed_time = end_time - start_time  # Calculate elapsed time
    print(f"AD {index} has finished test evaluation in {elapsed_time:.2f} seconds.")

    # Prepare the row to be saved
    row = [index] + train_loss + [test_loss]

    # Append results to the CSV file after each iteration
    with open(csv_file_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(row)

print(f"Results saved to {csv_file_path}.")

In [None]:
for i in range(len(trainers)):
    utils.plot_tsne(train_ds, trainers[i].latents, f"tsne_kl_{i}")

## Fine Tune

In [None]:
arch = {
        "mu_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        ),
        "log_var_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        )
    }

learning_rates = [0.001, 0.0001, 0.005]
VADs_ft = [VAD(mu_layers=arch['mu_net'], var_layers=arch['log_var_net'], device=device) for _ in range(len(learning_rates))]# for _ in range(5)]
trainers_ft = [VAD_Trainer(var_decoder=VADs_ft[i], dataloader=train_dl, latent_dim=128, device=device, lr=learning_rates[i]) for i in range(len(VADs_ft))]

In [None]:
# Initialize the results list to hold all the data
num_test_samples = len(test_dl.dataset)

# Create latent parameters and optimizers for each trainer
latents_list_ft = [torch.nn.Parameter(torch.randn(num_test_samples, trainers_ft[i].latent_dim).to(device)) for i in range(len(VADs_ft))]# for _ in range(5)]
optimizers = [optim.Adam([latents], lr=1e-3) for latents in latents_list_ft]

# Save results to a CSV file
csv_file_path = 'results_VAD_ft.csv'

# Write header to the CSV file first
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    header = ['Index'] + [f'Epoch {i+1} Loss' for i in range(500)] + ['Final Test Loss']
    writer.writerow(header)

# Main training and evaluation loop
for index, trainer in enumerate(trainers_ft):
    start_time = time.time()  # Record the start time
    train_loss = trainer.train(num_epochs=500)  # Train the model
    end_time = time.time()  # Record the end time
    
    elapsed_time = end_time - start_time  # Calculate elapsed time
    print(f"Trainer {index} has finished training in {elapsed_time:.2f} seconds.")

    start_time = time.time()  # Record the start time
    test_loss = evaluate_model(model=VADs_ft[index], test_dl=test_dl, opt=optimizers[index], latents=latents_list_ft[index], epochs=500, device=device) 
    end_time = time.time()  # Record the end time
    
    elapsed_time = end_time - start_time  # Calculate elapsed time
    print(f"AD {index} has finished test evaluation in {elapsed_time:.2f} seconds.")

    # Prepare the row to be saved
    row = [index] + train_loss + [test_loss]

    # Append results to the CSV file after each iteration
    with open(csv_file_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(row)

print(f"Results saved to {csv_file_path}.")

In [None]:
for i in range(len(trainers_ft)):
    utils.plot_tsne(train_ds, trainers_ft[i].latents, f"tsne_kl_ft_{i}")

## Best model

In [None]:
arch = {
        "mu_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        ),
        "log_var_net": nn.Sequential(
            nn.Linear(128, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128)
        )
    }

VAD_best = VAD(mu_layers=arch['mu_net'], var_layers=arch['log_var_net'], device=device)
trainer_best = VAD_Trainer(var_decoder=VAD_best, dataloader=train_dl, latent_dim=128, device=device, lr=0.001)
trainer_best.train(num_epochs=1000)

## Sample specific vectors

In [None]:
num_test_samples = len(test_dl.dataset)
latents = torch.nn.Parameter(torch.randn(num_test_samples, trainer.latent_dim).to(device))
opt = optim.Adam([latents], lr=1e-3)

In [None]:
test_loss = evaluate_model(model=decoder, test_dl=test_dl, opt=opt, latents=latents, epochs=1000, device=device)
print(f"AD has finished test evaluation with a test loss of {test_loss}.")

In [None]:
# Randomly sample 5 indices from the test dataset
random.seed(6)
sampled_indices = random.sample(range(len(latents)), 5)

# Extract the corresponding vectors (input data) and their labels
sampled_latents = [latents[i] for i in sampled_indices]  # Only selecting input data, not labels

# Convert to a single tensor (optional)
sampled_latents_tensor = torch.stack(sampled_latents)
random_latents_tensor = torch.randn_like(sampled_latents_tensor)

print("Sampled Vectors Shape:", sampled_latents_tensor.shape)  # Should be (5, *) depending on your data shape
print("Random Vectors Shape:", random_latents_tensor.shape)  # Should be (5, *) depending on your data shape

sampled_test_images = decoder(sampled_latents_tensor).view(-1, 1, 28, 28)
random_test_images = decoder(random_latents_tensor).view(-1, 1, 28, 28)

print("Sampled Images Shape:", sampled_test_images.shape)  # Should be (5, *) depending on your data shape
utils.save_images(sampled_test_images, "sampled_test_images.png")
utils.save_images(random_test_images, "random_test_images.png")