<a href="https://colab.research.google.com/github/tayfununal/Uniform-Autoencoder-with-Latent-Flow-Matching/blob/main/uae_training/digits.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Fetch the project repository. Later cells reference it under
# /content/Uniform-Autoencoder-with-Latent-Flow-Matching, so this assumes the
# working directory is /content (the Colab default) — confirm when running elsewhere.
!git clone https://github.com/tayfununal/Uniform-Autoencoder-with-Latent-Flow-Matching.git

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

import numpy as np
import matplotlib.pyplot as plt

import os

# Execute the dataset and model notebooks from the cloned repository inside this
# kernel's namespace (presumably they define DigitsDataset and UAE, which the
# cells below use — confirm against those notebooks).
%run /content/Uniform-Autoencoder-with-Latent-Flow-Matching/datasets/digits_dataset.ipynb
%run /content/Uniform-Autoencoder-with-Latent-Flow-Matching/models/digits_model.ipynb

# Global matplotlib text settings.
plt.rcParams['font.size'] = 16
plt.rcParams['font.family'] = 'DeJavu Serif'
# NOTE(review): matplotlib consults 'font.serif' only when 'font.family' is the
# generic 'serif'; with a concrete family set above, this entry may have no
# effect — confirm which font is actually intended.
plt.rcParams['font.serif'] = ['Times New Roman']

In [None]:
class Trainer:
    """Training loop with validation, best-model checkpointing and early stopping.

    The wrapped model must return ``(z, x_hat)`` from its forward call and expose
    ``criterion(z_pred, z_true, x_pred, x_true)``. Data loaders must yield
    ``(x, x_, _)`` triples; the third item is ignored.

    Args:
        model: The ``nn.Module`` to train; it is moved to ``device``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        device: Device on which batches and the model are placed.
        max_patience: Early stopping triggers once the number of consecutive
            non-improving validation epochs exceeds this value.

    Attributes:
        best_val_loss: Lowest average validation loss seen so far.
        patience: Current count of consecutive non-improving epochs.
        train_cost: Average training loss of every completed epoch.
        val_cost: Average validation loss of every completed epoch
            (``nan`` for epochs run without validation data).
    """

    def __init__(self, model, optimizer, device='cpu', max_patience=20):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.device = device

        self.max_patience = max_patience
        self.best_val_loss = float('inf')
        self.patience = 0

        self.val_cost = []
        self.train_cost = []

    def _batch_loss(self, x, x_):
        """Compute the model's criterion for one ``(x, x_)`` batch pair."""
        x = x.to(self.device)
        x_ = x_.to(self.device)

        # The latent code of the second view acts as a fixed target: no gradient
        # flows through this forward pass.
        with torch.no_grad():
            z_hat_, _ = self.model(x_)
        z_hat, x_hat = self.model(x)

        return self.model.criterion(
            z_pred=z_hat_, z_true=z_hat, x_pred=x_hat, x_true=x.reshape(-1, 64)
        )

    def train(self, train_loader, val_loader=None, epochs=10, print_every=1, name=None):
        """Train the model, optionally validating after every epoch.

        Args:
            train_loader: Iterable of ``(x, x_, _)`` training batches.
            val_loader: Optional iterable of validation batches. When it is
                ``None`` or empty, validation, checkpointing and early stopping
                are skipped and ``nan`` is recorded as the validation loss.
            epochs: Maximum number of epochs to run.
            print_every: Print a progress line every this many epochs.
            name: Path prefix for the checkpoint; the best model is written to
                ``name + '.model'``. When ``None`` no checkpoint is written.
        """
        has_val = bool(val_loader)
        self.model.train()

        for epoch in range(1, epochs + 1):
            total_loss = 0.0
            for x, x_, _ in train_loader:
                loss = self._batch_loss(x, x_)

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()
            train_loss = total_loss / len(train_loader)

            # Placeholder keeps train_cost and val_cost the same length when
            # no validation data is available.
            val_loss = float('nan')
            if has_val:
                val_loss = self.validate(val_loader)

                if val_loss < self.best_val_loss:
                    if name is not None:
                        print('saved!')
                        # Pickles the whole module object (not only its state_dict).
                        torch.save(self.model, name + '.model')
                    self.best_val_loss = val_loss
                    self.patience = 0
                else:
                    self.patience += 1

            if epoch % print_every == 0:
                print(f"Epoch {epoch:3d} | Train Loss: {train_loss:.6f} | Validation Loss: {val_loss:.6f}")

            # Record the epoch before a possible early stop so the history
            # includes every epoch that was actually trained.
            self.val_cost.append(val_loss)
            self.train_cost.append(train_loss)

            if has_val and self.patience > self.max_patience:
                print("Early stopping triggered.")
                break

    def validate(self, val_loader):
        """Return the average criterion value over ``val_loader``.

        The model is switched to eval mode for the pass and put back into
        train mode afterwards, even if a batch raises.
        """
        self.model.eval()
        total_loss = 0.0

        try:
            with torch.no_grad():
                for x, x_, _ in val_loader:
                    total_loss += self._batch_loss(x, x_).item()
        finally:
            self.model.train()

        return total_loss / len(val_loader)

In [None]:
# Custom Transform
class NoiseTransform:
    """Add small Gaussian noise to a sample.

    On every call a vector of ``dim`` standard-normal values is drawn, scaled by
    ``split_ratio`` and added to the input (normal broadcasting rules apply).

    Args:
        split_ratio: Multiplier applied to the noise, i.e. its standard deviation.
        dim: Number of noise components drawn per call.
    """

    def __init__(self, split_ratio=0.001, dim=64):
        self.split_ratio = split_ratio
        self.dim = dim

    def __call__(self, x):
        """Return ``x`` plus freshly sampled noise; ``x`` itself is not modified."""
        # Sample on the input's own device. Always picking CUDA when it is
        # available breaks the addition for samples that live on the CPU.
        device = x.device if torch.is_tensor(x) else None
        noise = torch.randn(self.dim, device=device, dtype=torch.float)
        return x + self.split_ratio * noise

In [None]:
# Hyper-Parameters & Settings

seed = 0                # RNG seed for torch / numpy so that runs are repeatable

batch_size = 256        # samples per training mini-batch
lr = 0.0003             # learning rate handed to the optimizer

epochs = 1000           # maximum number of training epochs
# Early stopping needs more than `max_patience` consecutive non-improving
# epochs, so with max_patience >= epochs it can never trigger.
max_patience = 1000

split_ratio = 0.0001    # scale of the Gaussian noise added by NoiseTransform

# Seed the generators before any dataset, model or loader below is created.
torch.manual_seed(seed)
np.random.seed(seed)

In [None]:
# Dataset
# The train and validation splits get the noise transform; the test split is
# built without one. The Trainer unpacks three items per batch, so the dataset
# presumably yields (x, transformed x, label) — confirm against DigitsDataset.
train_dataset = DigitsDataset(mode='train', transform=NoiseTransform(split_ratio))
val_dataset = DigitsDataset(mode='val', transform=NoiseTransform(split_ratio))
test_dataset = DigitsDataset(mode='test')

# DataLoader
# Training batches are shuffled; validation and test are each served as a
# single batch holding the whole split, in a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

In [None]:
# Create the results folder
os.makedirs("/content/Uniform-Autoencoder-with-Latent-Flow-Matching/results/digits", exist_ok=True)

# Model
# `name` is the checkpoint path prefix; the Trainer appends '.model' when saving.
name = '/content/Uniform-Autoencoder-with-Latent-Flow-Matching/results/digits/UAE_Digits'
# NOTE(review): the layer lists presumably give the width of each layer
# (64 inputs -> 3-dimensional latent -> 64 outputs) — confirm against UAE.
model = UAE(
            encoder_layers=[64, 512, 512, 512, 512, 3],
            decoder_layers=[3, 512, 512, 512, 512, 64],
            encoder_act=nn.SiLU,
            decoder_act=nn.SiLU,
            final_encoder_act=nn.Sigmoid,
            final_decoder_act=nn.Sigmoid,
            use_batchnorm=True
)
# Adam over all model parameters with the learning rate from the settings cell.
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [None]:
# Training
# Use the GPU when one is available, otherwise fall back to the CPU.
trainer = Trainer(model, optimizer, device='cuda' if torch.cuda.is_available() else 'cpu', max_patience=max_patience)
# The best model (lowest validation loss) is written to name + '.model'.
trainer.train(train_loader, val_loader, epochs=epochs, name=name)

In [None]:
# Write the per-epoch loss history to disk as a two-column CSV
# (one row per recorded epoch, header row without a comment prefix).
train_losses, val_losses = trainer.train_cost, trainer.val_cost

np.savetxt(
    "/content/Uniform-Autoencoder-with-Latent-Flow-Matching/results/digits/losses.csv",
    np.column_stack((train_losses, val_losses)),
    header="train_loss,val_loss",
    delimiter=",",
    comments="",
)