In [None]:
# importing dependencies
from lib.eeg_transformer import *
from lib.train import *

# PyLDS is a Python library for Gaussian linear dynamical systems (GLDS). PyLDS also implements
# various methods to perform Bayesian inference on GLDSs. DefaultLDS (see the cells below) implements a
# general-purpose linear dynamical system with Gaussian noise.
from pylds.models import DefaultLDS

In [None]:
import os

import numpy as np
import matplotlib.pyplot as plt
# torch.nn is a module that implements various useful functions and functors to implement flexible and highly
# customized neural networks. We will use nn to define neural network modules, different kinds of layers and
# different loss functions
import torch.nn as nn
# torch.nn.functional implements a large variety of activation functions and functional forms of different
# neural network layers. Here we will use it for activation functions.
import torch.nn.functional as F
# torch is the Linear Algebra / Neural Networks library
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
# Seed the random number generators for reproducible results.
# np.random is used directly because the alias `npr` is never imported in this
# notebook; relying on it would depend on a name leaked by a star import.
np.random.seed(0)              # NumPy global RNG
torch.manual_seed(1)           # PyTorch RNG
torch.cuda.manual_seed_all(2)  # PyTorch RNGs on all CUDA devices

In [None]:
# Default simulation settings; they are used as the default arguments of simple_lds.
TIMESTEPS = 300 # number of steps in time
INSTS = 1000 # number of instances (independent sequences) to simulate
DOBS = 10 # number of observable variables
DLAT = 2 # number of hidden variables (latent states)

def simple_lds(d_observed=DOBS,d_latent=DLAT,d_input=-1,timesteps=TIMESTEPS,insts=INSTS):
    """Simulate `insts` sequences from a single DefaultLDS model.

    Parameters
    ----------
    d_observed : int
        Dimensionality of the observed data.
    d_latent : int
        Dimensionality of the latent states.
    d_input : int
        Dimensionality of the input data. For ``d_input=-1`` a model with no
        input is generated.
    timesteps : int
        Number of simulated timesteps per instance.
    insts : int
        Number of instances (independent sequences) to generate.

    Returns
    -------
    training_set : np.ndarray of shape (insts, timesteps, d_observed)
        Observations generated by the model.
    latent_states : np.ndarray of shape (insts, timesteps, d_latent)
        Latent states that produced the observations.
    lds_model : DefaultLDS
        The model instance the data was sampled from.
    """
    # Instantiating an LDS (per the original author's note, DefaultLDS draws a
    # random rotational dynamics matrix).
    if d_input == -1:
        lds_model = DefaultLDS(d_observed, d_latent, 0)
        input_data = None
    else:
        lds_model = DefaultLDS(d_observed, d_latent, d_input)
        # One random input sequence per instance. np.random is used because
        # `npr` is not imported anywhere in this notebook.
        input_data = np.random.randn(insts, timesteps, d_input)

    # initializing the output matrices:
    training_set = np.zeros((insts, timesteps, d_observed))
    latent_states = np.zeros((insts, timesteps, d_latent))

    # running the model and generating data
    for i in range(insts):
        # Each call simulates a single sequence, so it must receive only the
        # inputs of instance i, not the whole (insts, timesteps, d_input) array.
        inputs_i = None if input_data is None else input_data[i]
        training_set[i], latent_states[i] = lds_model.generate(timesteps, inputs=inputs_i)
    return training_set, latent_states, lds_model

In [None]:
# Build an LDS with the default settings and sample the synthetic dataset:
# ts holds the observations, ls the latent states, mdl the model itself.
ts, ls, mdl = simple_lds()

# The observations are the network input and the latent states its target,
# so the dimensions are read off the trailing axes of the two arrays.
timesteps, input_d = ts.shape[1:]
output_d = ls.shape[-1]
print('input_d:', input_d, 'output_d:', output_d, 'timesteps:', timesteps)

In [None]:
class LDSDataset(Dataset):
    """Train/test view over paired arrays ``x`` and ``y``.

    The first ``int(len(x) * ratio)`` instances form the training split and the
    remaining instances form the test split. Both splits are stored on the
    object (``x_train``/``y_train`` and ``x_test``/``y_test``); the boolean
    ``train`` flag selects which split ``__len__`` and ``__getitem__`` expose.

    Parameters
    ----------
    x, y : np.ndarray
        Arrays indexed by instance along the first axis.
    train : bool
        True to serve the training split, False to serve the test split.
    ratio : float
        Fraction of the instances assigned to the training split.
    """

    def __init__(self,x,y,train,ratio):
        self.len = x.shape[0]
        self.ratio = ratio
        # Index of the first test instance; shared by the slicing and __len__.
        self.split = int(self.len*self.ratio)
        self.x_train = torch.from_numpy(x[:self.split])
        self.y_train = torch.from_numpy(y[:self.split])
        self.x_test = torch.from_numpy(x[self.split:])
        self.y_test = torch.from_numpy(y[self.split:])
        self.train = train

    def __len__(self):
        """Return the number of instances in the selected split."""
        if self.train:
            return self.split
        # The test size is obtained by subtraction: int(len*(1-ratio)) suffers
        # from float rounding (1000*(1-0.8) truncates to 199) and would hide
        # the last test instance.
        return self.len - self.split

    def __getitem__(self, index):
        """Return the ``(x, y)`` pair at ``index`` within the selected split."""
        if self.train:
            return self.x_train[index], self.y_train[index]
        return self.x_test[index], self.y_test[index]

In [None]:
# Train/test split: the first 80% of the instances are used for training
# and the remainder for testing.
split_ratio = 0.8
batch_size = 50

# Both datasets wrap the same arrays; the boolean flag selects the split.
dataset_train = LDSDataset(ts, ls, True, split_ratio)
dataset_test = LDSDataset(ts, ls, False, split_ratio)

# Mini-batch iterators over the two splits (both reshuffled on every pass).
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)

In [None]:
# Hyperparameters of the transformer, gathered in one place; they are
# forwarded to make_model and output_prediction in the cells below.
opt = {
    'Transformer-layers': 2,
    'Model-dimensions': 256,
    'feedford-size': 512,
    'headers': 8,
    'dropout': 0.1,
    'src_d': input_d,        # input dimension
    'tgt_d': output_d,       # output dimension
    'timesteps': timesteps,
}

In [None]:
criterion = nn.MSELoss() # mean squared error
# setup model using hyperparameters defined above
model = make_model(opt['src_d'],opt['tgt_d'],opt['Transformer-layers'],opt['Model-dimensions'],opt['feedford-size'],opt['headers'],opt['dropout'])
# setup optimization function
model_opt = NoamOpt(model_size=opt['Model-dimensions'], factor=1, warmup=400,
        optimizer = torch.optim.Adam(model.parameters(), lr=0.015, betas=(0.9, 0.98), eps=1e-9))
total_epoch = 2000
train_losses = np.zeros(total_epoch) # training loss of every epoch
test_losses = np.zeros(total_epoch) # test loss of every epoch

# torch.save does not create missing parent directories, so make sure the
# output folders exist before the first checkpoint is written.
os.makedirs('model_checkpoint', exist_ok=True)
os.makedirs('model_save', exist_ok=True)

for epoch in range(total_epoch):
    # one pass over the training split with parameter updates
    model.train()
    train_loss = run_epoch(data_gen(dataloader_train), model, 
              SimpleLossCompute(model.generator, criterion, model_opt))
    train_losses[epoch]=train_loss

    # every 10th epoch: store a resumable checkpoint and a full copy of the model
    if (epoch+1)%10 == 0:
        torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': model_opt.optimizer.state_dict(),
                    'loss': train_loss,
                    }, 'model_checkpoint/'+str(epoch)+'.pth')            
        torch.save(model, 'model_save/model%d.pth'%(epoch)) # save the model

    # one pass over the test split; no optimizer is handed to the loss helper
    # NOTE(review): this pass is not wrapped in torch.no_grad(); confirm whether
    # SimpleLossCompute calls backward() before adding it.
    model.eval() # test the model
    test_loss = run_epoch(data_gen(dataloader_test), model, 
            SimpleLossCompute(model.generator, criterion, None))
    test_losses[epoch] = test_loss
    print('Epoch[{}/{}], train_loss: {:.6f},test_loss: {:.6f}'.format(epoch+1, total_epoch, train_loss, test_loss))

In [None]:
# Take the test instance at index 1 and convert its input/target tensors
# to numpy arrays.
test_x, test_y = (
    split[1].numpy() for split in (dataset_test.x_test, dataset_test.y_test)
)
# Run the model on that instance so the prediction can be compared with
# the true output.
test_out, true_out = output_prediction(
    model,
    test_x,
    test_y,
    max_len=opt['timesteps'],
    start_symbol=1,
    output_d=opt['tgt_d'],
)