### Edward Kim 2023 ###

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.nn import MSELoss
import pickle
import matplotlib.pyplot as plt
from sequitur.models import LINEAR_AE, LSTM_AE
from statistics import mean

In [7]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [2]:
# Load the pickled segments. Unpickling can run arbitrary code, so only open trusted files.
with open('./data/test3_rs_segments.pickle', 'rb') as segments_file:
    segments = pickle.load(segments_file)

In [3]:
# Load the pickled labels. Unpickling can run arbitrary code, so only open trusted files.
with open('./data/test3_rs_bps.pickle', 'rb') as labels_file:
    labels = pickle.load(labels_file)

In [4]:
# Keep the first `samples` rows of `segments` and convert them to a float32 tensor,
# which is the input for the linear autoencoder.
samples = 1000
X_train = segments[0:samples, :]
seg_tensor = torch.tensor(X_train, dtype=torch.float32)

In [5]:
# Display the tensor's shape as a sanity check; the next cell derives dimensions from it.
seg_tensor.shape

torch.Size([1000, 200])

In [7]:
batch_size = 64  # hyperparameter: adjust as appropriate
input_size = seg_tensor.shape[1]
# Number of whole batches that fit in the data (floor division drops any remainder rows).
seq_len = seg_tensor.shape[0] // batch_size

# Input for the LSTM autoencoder (currently disabled).
# NOTE(review): reshape needs seq_len * batch_size to equal the number of rows, which the
# floor division does not guarantee -- trim seg_tensor first if this is re-enabled.
#seg_tensor_reshaped = seg_tensor.reshape(seq_len, batch_size, input_size).to(device)

encoding_dim = 45  # should range from 20 - 100 for dimension 200

In [8]:
# Create linear and LSTM autoencoder models

# function for outputting a list of hidden layer dimensions for the encoder and decoder
def calculate_h_dims(input_dim, encoding_dim, num_hidden_layers):
    """Return hidden-layer widths that step evenly from ``input_dim`` toward ``encoding_dim``.

    The gap between ``input_dim`` and ``encoding_dim`` is split into
    ``num_hidden_layers + 1`` equal integer steps (floor division); layer ``k``
    (1-based) has width ``input_dim - k * step``.

    Args:
        input_dim: Width of the autoencoder input.
        encoding_dim: Width of the encoding (bottleneck) layer.
        num_hidden_layers: Number of hidden layers to size; must be >= 0.

    Returns:
        A list of ``num_hidden_layers`` integers. Empty when no hidden layers
        are requested (previously a single spurious layer was returned).

    Raises:
        ValueError: If ``num_hidden_layers`` is negative.
    """
    if num_hidden_layers < 0:
        raise ValueError(f"num_hidden_layers must be >= 0, got {num_hidden_layers}")
    units_per_layer = (input_dim - encoding_dim) // (num_hidden_layers + 1)
    return [input_dim - units_per_layer * k for k in range(1, num_hidden_layers + 1)]

rows, cols = segments.shape
input_dim = cols  # one input unit per column of `segments`
num_hidden_layers = 2  # hyperparameter for tuning

# ReLU is used both for the hidden layers and for the decoder's output layer.
h_activ = nn.ReLU()
out_activ = nn.ReLU()

# Hidden-layer widths shared by the encoder and decoder; printed for inspection.
h_dims = calculate_h_dims(input_dim, encoding_dim, num_hidden_layers)
print(h_dims)

linear_model = LINEAR_AE(input_dim, encoding_dim, h_dims, h_activ, out_activ)
#lstm_model = LSTM_AE(input_dim, encoding_dim, h_dims, h_activ, out_activ)

[149, 98]


In [20]:
# Autoencoder training algorithm
# Mostly taken from https://github.com/shobrook/sequitur/blob/master/sequitur/quick_train.py

def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")


def instantiate_model(model, train_set, encoding_dim, **kwargs):
    """Construct ``model`` with an input width taken from the training data.

    The input dimension is the size of the last axis of the final item in
    ``train_set``; any extra keyword arguments are forwarded to the constructor.
    """
    last_sample = train_set[-1]
    input_dim = last_sample.shape[-1]
    return model(input_dim, encoding_dim, **kwargs)


def train_model(model, train_set, verbose, lr, epochs, denoise, clip_value, device=None):
    """Train ``model`` to reconstruct every item of ``train_set`` and plot the loss curve.

    Args:
        model: Module called as ``model(x)``; its output is compared against ``x``.
        train_set: Iterable of tensors, iterated once per epoch with one
            optimizer step per item.
        verbose: When truthy, print the mean loss after each epoch.
        lr: Learning rate for the Adam optimizer.
        epochs: Number of passes over ``train_set``.
        denoise: Accepted for interface compatibility; currently unused.
        clip_value: Maximum gradient norm passed to ``clip_grad_norm_``;
            ``None`` disables clipping.
        device: Device to train on; defaults to ``get_device()``.

    Returns:
        A list holding the mean (sum-reduced) MSE loss of each epoch.

    Raises:
        statistics.StatisticsError: If ``train_set`` yields no items, because
            the mean of an empty loss list is undefined.
    """
    if device is None:
        device = get_device()
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = MSELoss(reduction="sum")
    mean_losses = []
    epoch_lst = []
    for epoch in range(1, epochs + 1):
        model.train()
        losses = []
        for x in train_set:
            x = x.to(device)
            optimizer.zero_grad()
            x_prime = model(x)  # Forward pass
            loss = criterion(x_prime, x)
            loss.backward()  # Backward pass
            if clip_value is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
            optimizer.step()
            losses.append(loss.item())
        mean_loss = mean(losses)
        if verbose:  # honor the flag instead of always printing
            print(f"Epoch: {epoch}, Loss: {mean_loss}")
        epoch_lst.append(epoch)
        mean_losses.append(mean_loss)
    # The original guard called len() on the integer `epochs`, which raised
    # TypeError after all training had finished. Plot whenever at least one
    # epoch ran; epoch_lst and mean_losses always have the same length.
    if mean_losses:
        plt.plot(epoch_lst, mean_losses)
        plt.xlabel('Epochs')
        plt.ylabel('Losses')
        plt.title('Losses for Training over Epochs')
        plt.show()
    return mean_losses


def get_encodings(model, train_set, device=None):
    """Encode every item of ``train_set`` with ``model.encoder``.

    Puts the model in eval mode, moves each item to ``device`` (defaulting to
    ``get_device()``), and returns the encoder outputs as a list in input order.
    """
    target = get_device() if device is None else device
    model.eval()
    # NOTE(review): autograd is not disabled here, so the returned encodings keep
    # their computation graph.
    encodings = []
    for sample in train_set:
        encodings.append(model.encoder(sample.to(target)))
    return encodings


def train_ae(model, train_set, encoding_dim, verbose=True, lr=1e-3, epochs=50, clip_value=1,
             denoise=False, device=None, **kwargs):
    """Instantiate, train, and apply an autoencoder in one call.

    ``model`` is a model class; it is instantiated via ``instantiate_model``
    (extra keyword arguments are forwarded), trained with ``train_model``, and
    then used to encode ``train_set`` with ``get_encodings``.

    Returns:
        Tuple ``(encoder, decoder, encodings, losses)`` where ``encoder`` and
        ``decoder`` are attributes of the trained model, ``encodings`` is the
        list from ``get_encodings`` and ``losses`` is the list from ``train_model``.
    """
    autoencoder = instantiate_model(model, train_set, encoding_dim, **kwargs)
    epoch_losses = train_model(
        autoencoder, train_set, verbose, lr, epochs, denoise, clip_value, device
    )
    latent = get_encodings(autoencoder, train_set, device)
    return autoencoder.encoder, autoencoder.decoder, latent, epoch_losses

In [21]:
# Train the linear autoencoder; keep its encoder, decoder, encodings, and per-epoch losses
encoder_lin, decoder_lin, encodings_lin, losses_lin = train_ae(
    LINEAR_AE,
    seg_tensor,
    encoding_dim,
    epochs=50,
    h_dims=h_dims,
    h_activ=h_activ,
    out_activ=out_activ,
)

KeyboardInterrupt: 

In [None]:
# Get LSTM encoder, decoder


# implement own version of quick_train (sequitur's training helper)


#train_segs = [s.reshape(200, 1) for s in X_train]
#train_segs_tensor = [torch.tensor(s).float() for s in train_segs]

#train_set = train_segs_tensor 
#encoder, decoder, _, _ = quick_train(LSTM_AE, train_set, encoding_dim=100, h_dims=[64], verbose=True)
#encoder_lstm, decoder_lstm, encodings_lstm, losses_lstm = quick_train(LSTM_AE, train_set, encoding_dim, 
                                                                    # h_dims=[64], verbose=True)
    
# This cell previously raised NameError on a fresh kernel: `seg_tensor_reshaped` is only
# defined in a commented-out line, and `quick_train` is never imported or defined.
# Build the LSTM input here and use this notebook's own `train_ae` instead.
# Rows that do not fill a whole batch are dropped so the reshape is valid.
# NOTE(review): each (batch_size, input_size) slice is fed to the model as one item --
# confirm this is the layout LSTM_AE should be trained on.
seg_tensor_reshaped = seg_tensor[:seq_len * batch_size].reshape(seq_len, batch_size, input_size)

encoder_lstm, decoder_lstm, encodings_lstm, losses_lstm = train_ae(
    LSTM_AE, seg_tensor_reshaped, encoding_dim, verbose=True,
    h_dims=h_dims, h_activ=h_activ, out_activ=out_activ, epochs=1000,
)
                                                                 


