In [23]:
import MDAnalysis as mda
from MDAnalysis.analysis import align
from MDAnalysis.analysis.distances import distance_array

import glob
import re
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle as pkl
import random
import pandas as pd

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer

import torch
from torch import nn, optim

## Orthogonality loss

In [24]:
class OrthoLoss(nn.Module):
    """
    Penalty that pushes a batch of latent vectors towards orthonormality.

    For an input X of shape (n, d) -- n points with d features -- the batch is
    considered orthonormal when (X.T @ X) / n equals the d x d identity matrix.
    Dividing by n keeps that condition independent of the batch size.

    The loss selected by ``type`` is computed over the (d, d) error matrix
    (X.T @ X) / n - eye(d).

    The batch should be sufficiently large and decorrelated for this loss to work.

    Parameters
    ----------
    type: one of ("mse", "l1", "l2"), default="mse"
        How the error matrix is collapsed into a scalar.
        mse: mean squared error (squared l2 error)
        l1:  mean absolute error
        l2:  root mean squared error
    reduction: one of ("sum", "mean"), default="mean"
        How the batch size affects the result. With "mean" the error magnitude
        stays the same regardless of batch size. With "sum" the error matrix
        above is multiplied by the batch size, so the error grows linearly
        with it.
        NOTE! Because the error is computed for the whole batch at once, the
        sum-reduction differs from most other loss functions: with the mse type
        the output scales with the square of the batch size rather than
        linearly, since the reduction is applied before squaring.

    Raises
    ------
    ValueError
        If ``type`` or ``reduction`` is not one of the accepted names.
    """

    def __init__(self, type="mse", reduction="mean"):
        super().__init__()

        # Name -> implementation lookup tables.
        loss_table = dict(mse=self._mse, l1=self._l1, l2=self._l2)
        error_table = dict(sum=self._sum_error, mean=self._mean_error)

        loss_impl = loss_table.get(type)
        if loss_impl is None:
            raise ValueError(f"Unrecognised loss type \"{type}\"")

        error_impl = error_table.get(reduction)
        if error_impl is None:
            raise ValueError(f"Unrecognised loss reduction \"{reduction}\"")

        self.type = type
        self.reduction = reduction
        self._calc_loss = loss_impl
        self._calc_err = error_impl

    def forward(self, X):
        """Return the scalar orthonormality loss for the (n, d) batch ``X``."""
        n_samples, n_dims = X.shape[0], X.shape[1]
        gram = X.T @ X  # non-centred second-moment matrix, shape (d, d)
        identity = torch.eye(n_dims, device=X.device)
        deviation = self._calc_err(gram, identity, n_samples)
        return self._calc_loss(deviation, n_dims)

    # --- scalar losses over the error matrix --------------------------------

    def _mse(self, error, d):
        """Mean of the squared entries."""
        return torch.mean(error * error)

    def _l2(self, error, d):
        """Square root of the mean squared entry."""
        return torch.sqrt(self._mse(error, d))

    def _l1(self, error, d):
        """Mean of the absolute entries."""
        return error.abs().mean()

    # --- error matrices -------------------------------------------------------

    def _mean_error(self, gram, identity, n_samples):
        """Deviation of the batch-averaged Gram matrix from the identity."""
        return gram / n_samples - identity

    def _sum_error(self, gram, identity, n_samples):
        """Deviation of the raw Gram matrix from the identity scaled by n."""
        return gram - identity * n_samples

## Autoencoder

In [25]:

class Autoencoder(BaseEstimator, TransformerMixin, nn.Module):
    """
    Fully connected autoencoder usable as a scikit-learn transformer.

    The encoder maps ``in_shape`` -> ``middle_shape`` (x ``n_hidden``) ->
    ``enc_shape`` and the decoder mirrors it. Every hidden layer is followed
    by ReLU and dropout.

    Parameters
    ----------
    in_shape : int
        Input dimension.
    enc_shape : int
        Dimension of the encoding (latent space).
    middle_shape : int
        Width of every hidden layer.
    n_hidden : int
        Number of hidden layers in the encoder and in the decoder.
    loss_fn : callable
        Reconstruction loss, called as ``loss_fn(reconstruction, target)``.
    lr : float
        Learning rate of the Adam optimizer.
    ortholoss : bool
        If true, ``ortholoss_weight * OrthoLoss()(encoding)`` is added to the loss.
    ortholoss_weight : float
        Weight of the orthogonality term.
    l2_reg : bool
        If true, a weight penalty is added to the loss (see ``_l2_penalty``).

    Notes
    -----
    * scikit-learn's ``set_params`` (used by ``GridSearchCV``) only assigns
      attributes. The layers are therefore rebuilt at the start of ``fit``
      whenever the architecture parameters differ from those the current
      layers were built with; otherwise ``fit`` continues from the current
      weights.
    * ``fit`` puts the module in training mode; ``transform``,
      ``inverse_transform`` and ``score`` put it in evaluation mode so that
      dropout is disabled and their output is deterministic.
    """

    _DROPOUT_P = 0.2   # dropout probability after every hidden layer
    _L2_WEIGHT = 1e-4  # weight of the optional l2 penalty

    def __init__(self, in_shape=10, enc_shape=2, middle_shape=5, n_hidden=1, loss_fn=nn.L1Loss(), lr=1e-3, ortholoss=False, ortholoss_weight=.01, l2_reg=False):

        super().__init__()
        self.loss_fn = loss_fn
        self.lr = lr
        self.n_hidden = n_hidden  # number of hidden layers
        self.in_shape = in_shape  # input dimension
        self.enc_shape = enc_shape  # dimension of encoding
        self.middle_shape = middle_shape  # hidden layer dimensions
        self.ortholoss = ortholoss
        self.ortholoss_weight = ortholoss_weight
        self.l2_reg = l2_reg

        # Always available (it has no parameters) so that enabling
        # ``ortholoss`` through set_params after construction still works.
        self.ort = OrthoLoss()

        self._build_network()

    # ------------------------------------------------------------------ helpers

    def _current_config(self):
        """Architecture hyper-parameters that determine the layer shapes."""
        return (self.in_shape, self.enc_shape, self.middle_shape, self.n_hidden)

    def _build_network(self):
        """(Re)create ``self.encode`` / ``self.decode`` from the current hyper-parameters."""

        def mlp(n_in, n_out):
            layers = [nn.Linear(n_in, self.middle_shape), nn.ReLU(), nn.Dropout(self._DROPOUT_P)]
            for _ in range(self.n_hidden - 1):
                layers += [nn.Linear(self.middle_shape, self.middle_shape), nn.ReLU(), nn.Dropout(self._DROPOUT_P)]
            layers.append(nn.Linear(self.middle_shape, n_out))
            return nn.Sequential(*layers)

        self.encode = mlp(self.in_shape, self.enc_shape)  # Make encoder
        self.decode = mlp(self.enc_shape, self.in_shape)  # Make decoder
        self._net_config = self._current_config()

    def _device(self):
        """Device the network parameters currently live on."""
        return next(self.parameters()).device

    def _to_tensor(self, X):
        """Convert array-like input to a float tensor on the network's device."""
        return torch.Tensor(X).to(self._device())

    def _l2_penalty(self):
        """
        Square of the summed l2 norms of all parameter tensors.

        Note: this keeps the original formulation, (sum_i ||p_i||)^2, which is
        not the same as the conventional sum of squared weights.
        """
        norms = torch.stack([torch.norm(param, p=2) for param in self.parameters()])
        return norms.sum() ** 2

    def _run_inference(self, net, X):
        """Apply ``net`` to ``X`` without dropout or gradient tracking; return numpy."""
        self.eval()  # disable dropout so repeated calls give identical results
        with torch.no_grad():
            out = net(self._to_tensor(X))
        return out.cpu().numpy()

    # --------------------------------------------------------------- estimator

    def fit(self, X, y=None, n_epochs=20, batch_size=32, verbose=False):
        """
        Train the autoencoder with Adam on shuffled mini-batches.

        Parameters
        ----------
        X : array-like of shape (n_samples, in_shape)
        y : ignored, present for scikit-learn API compatibility.
        n_epochs : int
            Number of passes over the data.
        batch_size : int
            Mini-batch size. Every sample is used in every epoch; the last
            batch of an epoch may be smaller than ``batch_size``.
        verbose : bool
            If true, print the loss of the last batch after every epoch.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``X`` contains no samples or ``batch_size`` is not positive.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        # Hyper-parameters may have been changed through set_params since the
        # layers were created; rebuild so the network matches them.
        if self._net_config != self._current_config():
            device = self._device()
            self._build_network()
            self.to(device)

        X = self._to_tensor(X)
        n_samples = X.shape[0]
        if n_samples == 0:
            raise ValueError("Cannot fit the autoencoder on an empty dataset")

        indices = list(range(n_samples))
        self.optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)  # Adam only atm
        self.train()  # recursively enables dropout in all sub-modules

        for epoch in range(n_epochs):

            random.shuffle(indices)  # random shuffle to get random batches for each epoch

            for start in range(0, n_samples, batch_size):

                batch_X = X[indices[start:start + batch_size]]
                self.optimizer.zero_grad()  # reset gradients

                encoded = self.encode(batch_X)
                decoded = self.decode(encoded)
                loss = self.loss_fn(decoded, batch_X)
                if self.ortholoss:
                    loss = loss + self.ortholoss_weight * self.ort(encoded)
                if self.l2_reg:
                    loss = loss + self._L2_WEIGHT * self._l2_penalty()

                loss.backward()  # Backpropagate
                self.optimizer.step()  # Apply changes

            if verbose:
                print(f'epoch {epoch} \t Loss: {loss.item():.4g}')

        return self

    def transform(self, X, y=None):
        """Encode ``X``; returns a numpy array of shape (n_samples, enc_shape)."""
        return self._run_inference(self.encode, X)

    def inverse_transform(self, X, y=None):
        """Decode encodings ``X``; returns a numpy array of shape (n_samples, in_shape)."""
        return self._run_inference(self.decode, X)

    def score(self, X, y=None):
        """
        Negative reconstruction loss of ``X`` (higher is better).

        The sign is flipped because GridSearchCV maximises the score. The
        result is a 0-dimensional tensor, as returned by ``loss_fn``.
        """
        decoded = self.inverse_transform(self.transform(X))

        with torch.no_grad():
            # Same argument order as in fit: (reconstruction, target).
            return -self.loss_fn(torch.Tensor(decoded), torch.Tensor(X))
        
    


## Autoencoder testing

In [26]:
# Device selection. Automatic CUDA detection (the commented-out line below) is
# disabled and the CPU is used instead, because GridSearchCV was having issues
# with cuda.
#device = ('cuda' if torch.cuda.is_available() else 'cpu')
device = "cpu"  # GridSearchCV having issues with cuda
print(f"Using {device} device.")

Using cpu device.


In [27]:
def train_test_split(data, test_size, seed=None):
    """
    Randomly split ``data`` row-wise into train and test parts and standardise both.

    The scaler is fitted on the training rows only, so no test-set statistics
    leak into the preprocessing; the test rows are transformed with the
    training mean and standard deviation.

    Parameters
    ----------
    data : numpy array of shape (n_samples, n_features)
    test_size : float
        Fraction of the rows assigned to the test set; the number of test rows
        is ``int(test_size * n_samples)``.
    seed : int or None, default=None
        If given, the shuffle uses its own ``random.Random(seed)`` and is
        reproducible. If None, the global ``random`` module state is used.

    Returns
    -------
    train_X, train_X_scaled, test_X, test_X_scaled, scaler
        Raw and standardised train/test arrays and the fitted StandardScaler.

    Raises
    ------
    ValueError
        If the split would leave the train or the test set empty.
    """
    n_samples = data.shape[0]
    n_test = int(test_size * n_samples)
    if n_test < 1 or n_test >= n_samples:
        raise ValueError(
            f"test_size={test_size} gives {n_test} test and {n_samples - n_test} "
            f"train samples out of {n_samples}; both sets must be non-empty"
        )

    indices = list(range(n_samples))
    rng = random if seed is None else random.Random(seed)
    rng.shuffle(indices)

    test_indices = indices[:n_test]
    train_indices = indices[n_test:]

    test_X = data[test_indices, :]
    train_X = data[train_indices, :]

    # Fit on the training rows only to avoid leaking test statistics.
    scaler = StandardScaler().fit(train_X)
    test_X_scaled = scaler.transform(test_X)
    train_X_scaled = scaler.transform(train_X)

    return train_X, train_X_scaled, test_X, test_X_scaled, scaler

In [28]:
def original_space_l1(truth, decoding, scaler):
    """
    Mean absolute error between ``truth`` and a reconstruction, in original units.

    ``decoding`` is first mapped back with ``scaler.inverse_transform`` and
    then compared element-wise against ``truth``.

    Returns
    -------
    float
        The L1 (mean absolute) error as a Python number.
    """
    restored = scaler.inverse_transform(decoding)
    reference = torch.Tensor(truth)
    reconstruction = torch.Tensor(restored)
    criterion = nn.L1Loss()
    return criterion(reference, reconstruction).item()



# Example of use

# Load the input matrix from disk and train an autoencoder on it directly
# (no StandardScaler is applied to X in this example).
# NOTE(review): judging by the file name the array presumably holds C-alpha
# coordinates, one row per structure -- confirm against the data.
X = np.load("./data/a2ar_common_ca_coordinates.npy")
# 2-dimensional encoding, one hidden layer of width 1024, with both the
# orthogonality loss and the l2 penalty enabled.
AE = Autoencoder(in_shape=X.shape[1], enc_shape=2, middle_shape=1024, n_hidden=1, ortholoss=True, l2_reg=True)
AE.fit(X, n_epochs=10, verbose=True)


## Pipeline

In [29]:
# Grid of parameters for gridsearch
# Outer keys name a group of parameters; inner dicts map a parameter name to
# the list of candidate values. best_pipeline() keeps only the groups whose
# outer key is "Scaler" or "Autoencoder" and passes their inner dicts to
# GridSearchCV, so the inner keys of those groups use the pipeline's
# "<step name>__<parameter>" form.
# NOTE(review): the "GMM" group is not read by best_pipeline(); presumably it
# is consumed elsewhere -- confirm.

param_grid = {
    "Autoencoder":{
        "Autoencoder__middle_shape": [512],
        "Autoencoder__enc_shape": [2],
        "Autoencoder__n_hidden": [1]
    },
    "GMM": {
        "n_components": [4]
    }
}


In [30]:
# Function to search parameters and return best model

def best_pipeline(transformer, param_grid, X, cv=2):
    """
    Grid-search a ``StandardScaler -> transformer`` pipeline and return the best one.

    Parameters
    ----------
    transformer : estimator
        Used as the "Autoencoder" step of the pipeline.
    param_grid : dict of dict
        Parameter groups keyed by name. Only the groups named "Scaler" or
        "Autoencoder" are used; their inner dicts are merged into one grid.
    X : array-like
        Data the grid search is fitted on.
    cv : int, default=2
        Cross-validation setting forwarded to GridSearchCV.

    Returns
    -------
    The ``best_estimator_`` of the fitted GridSearchCV.
    """
    step_names = ["Scaler", "Autoencoder"]

    # Flatten the relevant parameter groups into a single grid.
    params = {}
    for group_name, group_params in param_grid.items():
        for key, val in group_params.items():
            if group_name in step_names:
                params[key] = val

    scaler_name, transformer_name = step_names
    pipe = Pipeline(
        steps=[
            (scaler_name, StandardScaler()),
            (transformer_name, transformer),
        ]
    )

    # No explicit scoring is given, so the pipeline's own score() is used.
    search = GridSearchCV(pipe, param_grid=params, verbose=3, cv=cv)
    search.fit(X)

    return search.best_estimator_
    

## Input perturbation analysis

In [31]:
from sklearn.utils import shuffle

In [32]:

def IPA(X, scaler, model):
        
    
    index = np.arange(0, X.shape[1], 3)
    effects = []
    
    for i in index:
        shuffled = X.copy()
        scaler.transform(shuffled)
        shuffled = shuffle(shuffled)
        rands = np.random.uniform(low=-2, high=2, size=(shuffled.shape[0],1))
        rands = np.concatenate([rands, rands, rands], axis=1)
        shuffled[:, i:i+3] = rands
        encoded = model.transform(shuffled)
        decoded = model.inverse_transform(encoded)
        decoded = scaler.inverse_transform(decoded)
        L1 = np.mean(np.abs(X - decoded))
        effects += [L1, L1, L1]
    
    return effects

