Most of the code below, which trains the autoencoders, was written by the authors of the following paper:  

Ilya Trofimov and Daniil Cherniavskii and Eduard Tulchinskii and Nikita Balabin and Serguei Barannikov and Evgeny Burnaev,  
Learning topology-preserving data representations,  
International Conference on Learning Representations,  
2023,  
https://openreview.net/forum?id=lIu-ixf-Tzf

In [None]:
! pip install giotto-ph
! pip install torch
! pip install scikit-network --user
! pip install --upgrade numpy
! pip install pytorch-lightning==1.5.10
! pip install ripserplusplus
! pip install gudhi

! git clone https://github.com/danchern97/RTD_AE
! mv 'RTD_AE/src' 'src'

In [None]:
# Mount Google Drive at /content/drive; the dataset and output paths used
# later in this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import time
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import DataLoader
import pytorch_lightning as pl

from src.autoencoder import AutoEncoder
from src.utils import *
from src.rtd import RTDLoss, MinMaxRTDLoss
from src.top_ae import TopologicallyRegularizedAutoencoder

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler

from collections import defaultdict

from tqdm.notebook import tqdm

# Autoencoders

In [None]:
# Run settings and hyperparameters. The whole dict is unpacked as keyword
# arguments into train_models(...), which forwards the keys it does not
# consume to get_list_of_models(...).
config = {
    "dataset_name":"GenesExtended",
    "version":"d16",
    "model_name":"default",
    "max_epochs":400,
    "gpus":[],
    "rtd_every_n_batches":1,
    "rtd_start_epoch":0,
    "rtd_l":1.0, # rtd loss (presumably the weight of the RTD term -- TODO confirm)
    "n_runs":1, # number of runs for each model
    "card":50, # number of points on the persistence diagram
    "n_threads":50, # number of threads for parallel ripser computation of persistent homology
    "latent_dim":16, # latent dimension (2 or 3 for visualization purposes)
    "input_dim":635*1,
    "n_hidden_layers":2,
    "hidden_dim":512,
    "batch_size":256,
#     "width":80,
#     "heigth":80,
    "engine":"ripser",
    "is_sym":True,
    "lr":5e-4
#     'mode':'minimum',
#     'lp':1.0
}

In [None]:
def get_model(input_dim, latent_dim=2, n_hidden_layers=2, m_type='encoder', **kwargs):
    """Build a fully connected encoder or decoder as an ``nn.Sequential``.

    The number of hidden ``Linear + ReLU`` stages is
    ``min(int(log2(input_dim)) - 1, n_hidden_layers)``.

    * ``m_type='encoder'``: each hidden stage halves the width as long as the
      halved width is still at least ``latent_dim`` (otherwise the width is
      kept); a final ``Linear`` maps to ``latent_dim``.
    * ``m_type='decoder'``: starts at ``latent_dim``, each hidden stage
      doubles the width; a final ``Linear`` maps to ``input_dim``.
    * any other ``m_type`` yields an empty ``nn.Sequential``.

    Args:
        input_dim: size of the flattened input vectors.
        latent_dim: size of the latent code.
        n_hidden_layers: upper bound on the number of hidden stages.
        m_type: ``'encoder'`` or ``'decoder'``.
        **kwargs: accepted and ignored, so a full config dict can be unpacked.

    Returns:
        nn.Sequential: the assembled network.
    """
    depth = min(int(np.log2(input_dim)) - 1, n_hidden_layers)
    modules = []
    if m_type == 'encoder':
        width = input_dim
        for _ in range(depth):
            halved = width // 2
            # Never shrink below the latent size inside the hidden stack.
            next_width = halved if halved >= latent_dim else width
            modules += [nn.Linear(width, next_width), nn.ReLU()]
            width = next_width
        modules.append(nn.Linear(width, latent_dim))
    elif m_type == 'decoder':
        width = latent_dim
        for _ in range(depth):
            modules += [nn.Linear(width, width * 2), nn.ReLU()]
            width *= 2
        modules.append(nn.Linear(width, input_dim))
    return nn.Sequential(*modules)

def get_list_of_models(**config):
    """Build the autoencoder models that will be trained and compared.

    Each model gets its own encoder/decoder instances. Sharing one
    ``nn.Module`` between models would make them share parameters: training
    the second model would keep updating the weights of the first, and the
    embeddings saved for both would come from the same network. The copies
    are taken before any training, so all models start from identical
    initial weights.

    Args:
        **config: forwarded unchanged to ``get_linear_model`` and to every
            model constructor.

    Returns:
        tuple: ``(models, encoder, decoder)`` where ``models`` maps a display
        name to a model, and ``encoder`` / ``decoder`` are the network
        instances owned by ``'Basic AutoEncoder'``.
    """
    import copy  # local import: keeps this cell runnable on its own

    # NOTE(review): `get_linear_model` is not defined in this file; it is
    # presumably provided by `from src.utils import *` -- confirm. The local
    # `get_model` helper is not used here.
    encoder = get_linear_model(
        m_type='encoder',
        **config
    )
    decoder = get_linear_model(
        m_type='decoder',
        **config
    )
    models = {
        'Basic AutoEncoder':AutoEncoder(
            encoder = encoder,
            decoder = decoder,
            MSELoss = nn.MSELoss(),
            **config
        ),
        'Topological AutoEncoder':TopologicallyRegularizedAutoencoder(
            # Independent copies so this model does not share weights with
            # 'Basic AutoEncoder'.
            encoder = copy.deepcopy(encoder),
            decoder = copy.deepcopy(decoder),
            MSELoss = nn.MSELoss(),
            **config
        ),
        # 'RTD AutoEncoder H1':AutoEncoder(
        #     encoder = copy.deepcopy(encoder),
        #     decoder = copy.deepcopy(decoder),
        #     RTDLoss = RTDLoss(dim=1, lp=1.0,  **config), # only H1
        #     MSELoss = nn.MSELoss(),
        #     **config
        # )
    }
    return models, encoder, decoder

In [None]:
def collate_with_matrix(samples):
    """Collate ``(index, data, label)`` samples and attach a distance matrix.

    The pairwise Euclidean distances between the (flattened) batch items are
    computed with ``torch.cdist`` and divided by the square root of the
    flattened feature count.

    Args:
        samples: iterable of 3-tuples ``(index, data, label)``; the index is
            not used here.

    Returns:
        tuple: ``(data, x_dist, labels)`` -- the stacked batch tensor (shape
        unchanged, not flattened), the scaled batch-by-batch distance matrix,
        and the label tensor.
    """
    _, batch_items, batch_labels = zip(*samples)
    data = torch.tensor(np.asarray(batch_items))
    labels = torch.tensor(np.asarray(batch_labels))
    # Distances are always taken between flat vectors; multi-dimensional
    # samples are flattened for that purpose only.
    flat = torch.flatten(data, start_dim=1) if len(data.shape) > 2 else data
    x_dist = torch.cdist(flat, flat, p=2) / np.sqrt(flat.shape[1])
    # The result is deliberately left as computed (no explicit
    # symmetrisation step is applied).
    return data, x_dist, labels

def collate_with_matrix_geodesic(samples):
    """Collate ``(index, data, label, dist_row)`` samples for one batch.

    Each sample carries a row of distances (presumably its row of a
    precomputed distance matrix over the whole dataset -- confirm against
    the dataset class). The rows are stacked and only the columns at the
    batch's own indices are kept, giving a batch-by-batch matrix.

    Args:
        samples: iterable of 4-tuples ``(index, data, label, dist_row)``.

    Returns:
        tuple: ``(data, x_dist, labels)`` as tensors.
    """
    positions, batch_items, batch_labels, dist_rows = zip(*samples)
    # Restrict every distance row to the columns of the samples in this batch.
    batch_dist = np.asarray(dist_rows)[:, list(positions)]
    return (
        torch.tensor(np.asarray(batch_items)),
        torch.tensor(batch_dist),
        torch.tensor(np.asarray(batch_labels)),
    )

In [None]:
dataset_name = config['dataset_name']
# All prepared arrays of a dataset live in one folder on the mounted Drive.
prepared_dir = f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/prepared'
train_data = np.load(f'{prepared_dir}/train_data.npy').astype(np.float32)

# Row indices of the training samples reused as a stand-in test split.
# Stays None when a real test_data.npy exists, so a stale value from an
# earlier run of this cell can never be picked up further down.
ids = None
try:
    test_data = np.load(f'{prepared_dir}/test_data.npy').astype(np.float32)
except FileNotFoundError:
    # No dedicated test set: reuse a random 20% of the training rows.
    # Note that these rows remain part of train_data as well.
    ids = np.random.choice(np.arange(len(train_data)), size=int(0.2*len(train_data)), replace=False)
    test_data = train_data[ids]

try:
    train_labels = np.load(f'{prepared_dir}/train_labels.npy')
except FileNotFoundError:
    train_labels = None

try:
    test_labels = np.load(f'{prepared_dir}/test_labels.npy')
except FileNotFoundError:
    # Test labels can only be derived from the train labels when the test
    # split itself was sampled from the training data (ids is set).
    if train_labels is None or ids is None:
        test_labels = None
    else:
        test_labels = train_labels[ids]

In [None]:
# Dataset-level switches shared by the train and test datasets.
scaler = FurthestScaler()
# scaler = None
flatten = True
geodesic = False

train = FromNumpyDataset(
    train_data,
    train_labels,
    scaler=scaler,
    geodesic=geodesic,
    flatten=flatten,
    n_neighbors=2,
)
# The test dataset is given the scaler object held by the train dataset.
test = FromNumpyDataset(
    test_data,
    test_labels,
    scaler=train.scaler,
    geodesic=geodesic,
    flatten=flatten,
    n_neighbors=2,
)

# Training batches are shuffled.
train_loader = DataLoader(
    dataset=train,
    batch_size=config["batch_size"],
    shuffle=True,
    num_workers=0,
    collate_fn=collate_with_matrix if not geodesic else collate_with_matrix_geodesic,
)

# Validation batches use the DataLoader default ordering (no shuffle argument).
val_loader = DataLoader(
    dataset=test,
    batch_size=config["batch_size"],
    num_workers=0,
    collate_fn=collate_with_matrix if not geodesic else collate_with_matrix_geodesic,
)

In [None]:
def train_autoencoder(model, train_loader, val_loader=None, model_name='default', 
                      dataset_name='MNIST', gpus=[3], max_epochs=100, run=0, version=""):
    """Fit ``model`` with a PyTorch Lightning trainer and return it.

    Logs go to a TensorBoard logger rooted at the current working directory
    under ``lightning_logs``, with the run version
    ``"{dataset_name}_{model_name}_{version}_{run}"``.

    Args:
        model: the Lightning module to train.
        train_loader: loader passed to ``Trainer.fit`` for training.
        val_loader: loader passed to ``Trainer.fit`` for validation, or None.
        model_name: name used in the log version string.
        dataset_name: dataset name used in the log version string.
        gpus: forwarded to ``pl.Trainer(gpus=...)``.
        max_epochs: forwarded to ``pl.Trainer(max_epochs=...)``.
        run: run index used in the log version string.
        version: free-form tag used in the log version string.

    Returns:
        The same ``model`` object after ``Trainer.fit`` has run.
    """
    run_tag = f"{dataset_name}_{model_name}_{version}_{run}"
    tb_logger = pl.loggers.TensorBoardLogger(
        save_dir=os.getcwd(),
        name='lightning_logs',
        version=run_tag,
    )
    trainer_options = dict(
        logger=tb_logger,
        gpus=gpus,
        max_epochs=max_epochs,
        log_every_n_steps=1,
        num_sanity_val_steps=0,
    )
    pl.Trainer(**trainer_options).fit(model, train_loader, val_loader)
    return model

def dump_figures(figures, dataset_name, version):
    """Save every figure to ``results/{dataset_name}/{model_name}_{version}.png``.

    Args:
        figures: mapping from model name to an object with a ``savefig`` method.
        dataset_name: sub-directory of ``results`` to write into.
        version: suffix appended to each file name.
    """
    for model_name, figure in figures.items():
        figure.savefig(f'results/{dataset_name}/{model_name}_{version}.png')

def train_models(train_loader, val_loader, dataset_name="", max_epochs=1, gpus=[], n_neighbors=[1], n_runs=1, version='', **kwargs):
    """Create all models via ``get_list_of_models`` and train each of them.

    Models whose name contains ``'AutoEncoder'`` are trained with
    ``train_autoencoder`` (run index 0) and replaced in the dict by its
    return value. Any other model is treated as an sklearn-style estimator
    and fitted with ``fit_transform`` on ``train_loader.dataset.data``; the
    transformed output is not kept.

    Args:
        train_loader: training loader.
        val_loader: validation loader.
        dataset_name: forwarded to ``train_autoencoder``.
        max_epochs: forwarded to ``train_autoencoder``.
        gpus: forwarded to ``train_autoencoder``.
        n_neighbors: accepted but not used in this function.
        n_runs: accepted but not used in this function.
        version: forwarded to ``train_autoencoder``.
        **kwargs: forwarded to ``get_list_of_models``.

    Returns:
        tuple: ``(encoder, decoder, models)``.
    """
    models, encoder, decoder = get_list_of_models(**kwargs)
    for name in tqdm(models, desc="Training models"):
        if 'AutoEncoder' not in name:
            # umap / pca / t-sne (sklearn interface): fit in place.
            models[name].fit_transform(train_loader.dataset.data)
            continue
        models[name] = train_autoencoder(
            model=models[name],
            train_loader=train_loader,
            val_loader=val_loader,
            model_name=name,
            dataset_name=dataset_name,
            gpus=gpus,
            max_epochs=max_epochs,
            run=0,
            version=version,
        )
    return encoder, decoder, models

In [None]:
# Build and train every model, passing all settings in `config` as keyword arguments.
encoder, decoder, trained_models = train_models(train_loader, val_loader, **config)

In [None]:
version = config['version']

# Rebuild the training loader with shuffling disabled before extracting the
# latent representations.
train_loader = DataLoader(
    dataset=train,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=2,
    collate_fn=collate_with_matrix if not geodesic else collate_with_matrix_geodesic,
)

# One pair of files (latent codes + labels) per trained model.
for model_name, trained_model in trained_models.items():
    latent, labels = get_latent_representations(trained_model, train_loader)
    np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_output_{version}.npy', latent)
    np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_labels_{version}.npy', labels)

In [None]:
# Loader over the test dataset with shuffling disabled.
test_loader = DataLoader(
    dataset=test,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=2,
    collate_fn=collate_with_matrix if not geodesic else collate_with_matrix_geodesic,
)

In [None]:
# Same export as for the training set, with a `_test` suffix on the file names.
for model_name, trained_model in trained_models.items():
    latent, labels = get_latent_representations(trained_model, test_loader)
    np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_output_{version}_test.npy', latent)
    np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_labels_{version}_test.npy', labels)

# PCA

In [None]:
import numpy as np

In [None]:
dataset_name = 'GenesExtended'
# All prepared arrays of a dataset live in one folder on the mounted Drive.
prepared_dir = f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/prepared'
train_data = np.load(f'{prepared_dir}/train_data.npy').astype(np.float32)

# Row indices of the training samples reused as a stand-in test split.
# Stays None when a real test_data.npy exists, so a stale value from an
# earlier cell can never be picked up further down.
ids = None
try:
    test_data = np.load(f'{prepared_dir}/test_data.npy').astype(np.float32)
except FileNotFoundError:
    # No dedicated test set: reuse a random 20% of the training rows.
    # Note that these rows remain part of train_data as well.
    ids = np.random.choice(np.arange(len(train_data)), size=int(0.2*len(train_data)), replace=False)
    test_data = train_data[ids]

try:
    train_labels = np.load(f'{prepared_dir}/train_labels.npy')
except FileNotFoundError:
    train_labels = None

try:
    test_labels = np.load(f'{prepared_dir}/test_labels.npy')
except FileNotFoundError:
    # Test labels can only be derived from the train labels when the test
    # split itself was sampled from the training data (ids is set).
    if train_labels is None or ids is None:
        test_labels = None
    else:
        test_labels = train_labels[ids]

In [None]:
from sklearn.decomposition import PCA

In [None]:
# Fit PCA on the training data and project it onto 16 components.
pca = PCA(n_components=16)
pca_data = pca.fit_transform(train_data)
pca_data.shape  # last expression of the cell: shown as its output

In [None]:
# Shape of the label array that is saved alongside the embeddings.
np.array(train_labels).shape

In [None]:
# Store the PCA embedding and the training labels under the `d16` suffix.
model_name = 'PCA'
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_output_d16.npy', pca_data)
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_labels_d16.npy', np.array(train_labels))

# UMAP

In [None]:
! pip install umap-learn

In [None]:
import umap

In [None]:
# Seed NumPy's global RNG, then fit a 16-component UMAP on the training data.
# NOTE(review): no random_state is passed to UMAP itself; presumably the
# global seed is what is meant to make the embedding repeatable -- confirm.
np.random.seed(0)
reducer = umap.UMAP(n_components=16)
umap_data = reducer.fit_transform(train_data)

umap_data.shape  # last expression of the cell: shown as its output

In [None]:
# Store the UMAP embedding and the training labels under the `d16` suffix.
model_name = 'UMAP'
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_output_d16.npy', umap_data)
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_labels_d16.npy', np.array(train_labels))

# t-SNE

In [None]:
from sklearn.manifold import TSNE

In [None]:
# Fit t-SNE on the training data with 3 output dimensions.
# NOTE(review): PCA and UMAP in this notebook use 16 components, and the
# t-SNE result is saved under the same `d16` suffix -- confirm that 3 is
# intended (presumably a limit of the default t-SNE solver).
tsne = TSNE(n_components=3)
tsne_data = tsne.fit_transform(train_data)
tsne_data.shape  # last expression of the cell: shown as its output

In [None]:
# Store the t-SNE embedding and the training labels.
# NOTE(review): the files carry the `d16` suffix although the embedding was
# computed with TSNE(n_components=3) -- confirm the naming is intentional.
model_name = 'tSNE'
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_output_d16.npy', tsne_data)
np.save(f'/content/drive/MyDrive/Lab project/RTD_AE-main/data/{dataset_name}/{model_name}_labels_d16.npy', np.array(train_labels))