<br>

## Imports

In [67]:
import time
import pickle
import numpy as np

In [68]:
import matplotlib.pyplot as plt
from matplotlib.ticker import PercentFormatter

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam

In [3]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

<br>

Mount drive

In [4]:
# Mount Google Drive at /content/drive (Colab only); the embedding files and
# the result figure used below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


<br>

## Define constants

In [5]:
# Fraction of all samples held out as the test set
TEST_SIZE = 0.2
# Fraction of the remaining (non-test) samples held out as the eval set
EVAL_SIZE = 0.1
# Seed passed as random_state to both train_test_split calls
RANDOM_SEED = 30

<br>

## Define classes

In [6]:
class seqembData(Dataset):
    """Dataset pairing several aligned input arrays with one label vector.

    Args:
        inputs: Iterable of array-likes exposing ``.shape``; each must have
            ``len(labels)`` entries along its first dimension.
        labels: Array-like of labels, one per sample.
    """

    def __init__(self, inputs, labels):
        num_samples = len(labels)

        # Every input must provide exactly one row per label
        for source in inputs:
            assert source.shape[0] == num_samples

        self.n = num_samples
        self.labels = torch.as_tensor(labels)
        # Inputs are stored as float32 tensors
        self.inputs = [torch.as_tensor(source).float() for source in inputs]

    def __len__(self):
        """Return the number of samples."""
        return self.n

    def __getitem__(self, idx):
        """Return ``([row of each input at idx], label at idx)``."""
        features = [source[idx] for source in self.inputs]
        return features, self.labels[idx]

In [120]:
class ecnumClassifier(nn.Module):
    """Feed-forward classifier over several independent inputs.

    Every input gets its own processing branch
    (Dropout -> Linear -> ReLU -> BatchNorm1d) mapping it to ``hidden_size``
    features. The branch outputs are concatenated along dim 1, passed through
    a stack of (Dropout -> Linear -> ReLU -> BatchNorm1d) blocks and finally
    projected to ``num_classes`` logits.

    Args:
        input_sizes: Feature size of each input, one entry per input.
        num_classes: Number of output logits.
        hidden_size: Output width of every input-processing branch.
        linear_sizes: Output widths of the stacked linear blocks, in order.
            The default list is only iterated, never mutated.
        drop_prob: Probability used by every Dropout layer.
    """
    def __init__(self, input_sizes, num_classes, hidden_size = 256, linear_sizes = [64, 64], drop_prob = .2):
        super().__init__()

        # Input processing layers (one branch per input)
        proc_layers = []
        for size in input_sizes:
            proc_layer = nn.Sequential(
                nn.Dropout(p = drop_prob),
                nn.Linear(size, hidden_size),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_size)
            )
            proc_layers.append(proc_layer)
        self.proc_layers = nn.ModuleList(proc_layers)

        # Linear layers (first one consumes the concatenated branch outputs)
        input_size = hidden_size * len(input_sizes)
        linear_layers = []
        for output_size in linear_sizes:
            linear_layers.append(nn.Dropout(p = drop_prob))
            linear_layers.append(nn.Linear(input_size, output_size))
            linear_layers.append(nn.ReLU())
            linear_layers.append(nn.BatchNorm1d(output_size))
            input_size = output_size
        self.linear_layers = nn.Sequential(*linear_layers)

        # Output layer
        self.output_layer = nn.Linear(input_size, num_classes)

    def forward(self, x_list):
        """Compute class logits.

        Args:
            x_list: Sequence of tensors, one per processing branch, in the
                same order as ``input_sizes``; they are concatenated along
                dim 1 after processing, so they must share the batch size.

        Returns:
            Tensor of logits produced by the output layer.
        """
        # Process inputs by independent processing layers.
        # No separate dropout is applied here: each branch already starts with
        # its own nn.Dropout, and the module never defined `self.dropout`
        # (calling it raised AttributeError on every forward pass).
        y_list = []
        for i, proc_layer in enumerate(self.proc_layers):
            y_list.append(proc_layer(x_list[i]))

        # Concat together
        y = torch.cat(y_list, dim = 1)

        # Pass through linear layers
        z = self.linear_layers(y)

        # Get output
        o = self.output_layer(z)

        return o

<br>


Auxiliary functions

In [8]:
def get_preds(logits):
    """Turn logits into per-class log-probabilities and predicted class ids.

    Args:
        logits: Tensor whose dim 1 indexes the classes.

    Returns:
        Tuple ``(proba, preds)``: ``proba`` is the log-softmax of ``logits``
        over dim 1 (log-probabilities, not probabilities) and ``preds`` holds
        the index of the largest entry along dim 1.
    """
    log_probs = torch.log_softmax(logits, dim = 1)
    best = log_probs.max(dim = 1)
    return log_probs, best.indices

def calc_acc(preds, targets):
    """Return the fraction of positions where ``preds`` equals ``targets``.

    The result is a 0-dim tensor holding a fraction (not a percentage).
    """
    num_correct = (preds == targets).sum()
    return num_correct / len(targets)

In [34]:
class ecnumTrainer():
    """Training / validation loop for a multi-input classifier.

    Args:
        model: Module called as ``model(list_of_input_tensors)`` that returns logits.
        train_data: Dataset yielding ``(inputs, target)`` pairs, where
            ``inputs`` is a list of tensors.
        eval_data: Optional dataset of the same form; when given it is
            evaluated as one single batch after every epoch.
        batch_size: Training batch size.
        learning_rate: Learning rate of the Adam optimizer.
        decay: Weight decay of the Adam optimizer.
        device: Device on which the model, inputs and targets are placed.
    """
    def __init__(self, model, train_data, eval_data = None, batch_size = 32, learning_rate = 1e-4, decay = 1e-5, device = 'cpu'):

        # Set device
        self.device = device

        # Set model
        self.model = model

        # Set train data loader
        # NOTE(review): a model containing BatchNorm1d fails in training mode
        # on a batch of size 1; if len(train_data) % batch_size == 1 consider
        # drop_last=True - confirm against the dataset size.
        self.train_loader = DataLoader(
            train_data,
            batch_size = batch_size,
            shuffle = True
        )

        # Set optimizer
        self.optimizer = Adam(
            self.model.parameters(),
            lr = learning_rate,
            weight_decay = decay
        )

        # Set eval data loader (whole eval set in one batch)
        if eval_data is not None:
            self.eval_loader = DataLoader(
                eval_data,
                batch_size = len(eval_data),
                shuffle = False
            )
        else:
            self.eval_loader = None

        # Set loss function
        self.loss_func = nn.CrossEntropyLoss()

    def _to_device(self, inputs, targets):
        """Move a batch (list of input tensors + targets) to ``self.device``."""
        inputs = [x.to(self.device) for x in inputs]
        targets = targets.to(self.device)
        return inputs, targets

    def validate(self):
        """Evaluate the model on the eval set.

        Requires eval data: when the trainer was built with
        ``eval_data=None`` there is no loader to iterate and the call fails.

        Returns:
            Tuple ``(error, acc)`` of Python floats: cross-entropy loss and
            accuracy (as a fraction) on the single eval batch.
        """
        # Make sure the model lives on the same device as the batch, so this
        # method also works when called before train()
        self.model.to(self.device)
        self.model.eval()
        with torch.no_grad():
            inputs, targets = next(iter(self.eval_loader))
            # Targets must be on the same device as the logits for the loss
            # and the accuracy comparison
            inputs, targets = self._to_device(inputs, targets)
            logits = self.model(inputs)

            # Calc error
            error = self.loss_func(logits, targets)
            error = error.item()

            # Get predictions
            _, preds = get_preds(logits)

            # Assess accuracy
            acc = calc_acc(preds, targets)
            acc = acc.item()

        return error, acc

    def train(self, num_epochs = 20, verbosity = 10):
        """Train the model and track loss / accuracy per epoch.

        Args:
            num_epochs: Number of passes over the training data.
            verbosity: Print batch statistics every ``verbosity`` batches;
                values <= 0 disable the per-batch report.

        Returns:
            Tuple ``(train_track, eval_track)``. Each track is a dict with the
            keys ``'error'`` and ``'accuracy'`` holding one value per epoch;
            ``eval_track`` is ``None`` when no eval data was given.
        """

        # Train track container
        train_track = {'error' : [], 'accuracy' : []}
        if self.eval_loader is not None:
            eval_track = {'error' : [], 'accuracy' : []}
        else:
            eval_track = None

        # Set device
        self.model.to(self.device)

        # Training
        n = len(self.train_loader)
        start = time.time()
        for epoch in range(num_epochs):
            # validate() switches to eval mode, so re-enable training mode each epoch
            self.model.train()
            train_error = 0.
            train_acc = 0.
            for i, (inputs, targets) in enumerate(self.train_loader):
                # Targets must follow the inputs to the device; otherwise the
                # loss mixes devices and fails when training on CUDA
                inputs, targets = self._to_device(inputs, targets)
                logits = self.model(inputs)

                # Calc error and do training step
                error = self.loss_func(logits, targets)
                self.optimizer.zero_grad()
                error.backward()
                self.optimizer.step()
                error = error.item()

                # Get predictions (detached: metrics need no autograd graph)
                _, preds = get_preds(logits.detach())

                # Calculate accuracy
                acc = calc_acc(preds, targets)
                acc = acc.item()

                # Report
                if verbosity > 0 and (i + 1) % verbosity == 0:
                    print('Epoch: {}/{}\t Batch: {}/{}\t Error: {:1.3f}\t Accuracy: {:1.2f}'.format(epoch + 1, num_epochs, i + 1, n, error, acc))

                # Increment training error and accuracy
                train_error += error
                train_acc += acc

            # Average training error (mean over batches)
            train_error /= n
            train_track['error'].append(train_error)

            # Average training accuracy (mean over batches)
            train_acc /= n
            train_track['accuracy'].append(train_acc)

            # Validation
            if self.eval_loader is not None:
                eval_error, eval_acc = self.validate()
                eval_track['error'].append(eval_error)
                eval_track['accuracy'].append(eval_acc)

                print('\n')
                print('\t Epoch: {}'.format(epoch + 1))
                print('\t Validation loss: {:1.2f}'.format(eval_error))
                print('\t Validation accuracy: {:1.2f}'.format(eval_acc))
                print('\t Elapsed time: {:1.1f}'.format(time.time()-start))
                print('\n')

        return train_track, eval_track


In [91]:
def plot_tracks(errors, accuracies, eval_errors = None, eval_accuracies = None, title = None, file_name = None):
    """Plot per-epoch loss and accuracy on a shared epoch axis.

    Training curves are drawn solid, evaluation curves dashed. Loss uses the
    left y-axis, accuracy the right y-axis.

    Args:
        errors: Training loss per epoch.
        accuracies: Training accuracy per epoch, as fractions in [0, 1]
            (the form returned by ``calc_acc``); shown as percentages.
        eval_errors: Optional evaluation loss per epoch (same length as ``errors``).
        eval_accuracies: Optional evaluation accuracy per epoch, as fractions.
        title: Optional figure title.
        file_name: If given, the figure is saved there instead of being shown.
    """
    def _has_values(track):
        # Explicit check: plain truthiness raises for numpy arrays
        return track is not None and len(track) > 0

    # Prepare x-values (epochs are numbered from 1)
    x = np.arange(len(errors)) + 1.

    # Prepare plot
    fig, ax1 = plt.subplots()
    ax1.set_xlabel('Epochs [#]')
    ax1.minorticks_on()
    ax1.grid()

    # Plot errors
    color = 'maroon'
    ax1.plot(x, errors, color=color)
    ax1.set_ylabel('Cross-Entropy Loss', color=color)
    ax1.tick_params(axis='y', labelcolor=color)
    # Plot evaluation errors
    if _has_values(eval_errors):
        ax1.plot(x, eval_errors, color=color, linestyle='dashed')

    # Plot performance
    color = 'steelblue'
    ax2 = ax1.twinx()
    ax2.plot(x, accuracies, color=color)
    ax2.set_ylabel('Accuracy [%]', color=color)
    ax2.tick_params(axis='y', labelcolor=color)
    # Accuracies are fractions, so 1.0 must map to 100 %; the formatter's
    # default xmax of 100 would label 0.85 as "0.85%"
    ax2.yaxis.set_major_formatter(PercentFormatter(xmax = 1.))
    # Plot evaluation accuracies
    if _has_values(eval_accuracies):
        ax2.plot(x, eval_accuracies, color=color, linestyle='dashed')

    # Add title
    if title is not None:
        fig.suptitle(title)

    # Plot
    fig.tight_layout()
    if file_name is None:
        plt.show()
    else:
        fig.savefig(file_name, dpi = 100)


<br>

## Load data and create datasets (train, eval, test)

In [10]:
# Load the pickled ESM embedding output from the mounted Drive; later cells
# read its 'seq_labels' and 'seq_embeddings' entries.
# NOTE(review): pickle.load can execute arbitrary code - only load files from a trusted source.
# NOTE(review): absolute, machine-specific path; consider a configurable data directory.
path = '/content/drive/Othercomputers/My MacBook Pro/MIE1517_nlp_proteins/embeddings/esm_embeddings.p'
with open(path, 'rb') as f:
    esm_embedding_output = pickle.load(f)

In [11]:
# Load the pickled ProtAlbert embedding output from the mounted Drive; later
# cells read its 'seq_labels' and 'seq_embeddings' entries.
# NOTE(review): pickle.load can execute arbitrary code - only load files from a trusted source.
# NOTE(review): `path` is reused from the previous cell and rebound here.
path = '/content/drive/Othercomputers/My MacBook Pro/MIE1517_nlp_proteins/embeddings/protalbert_embeddings.p'
with open(path, 'rb') as f:
    protalbert_embedding_output = pickle.load(f)

<br>

Assert data integrity

In [12]:
# Both embedding files must carry identical 'seq_labels', because a single
# label vector (taken from the ESM file) is shared by both inputs below.
# NOTE(review): `==` yields one bool for list-like labels; if these are numpy
# arrays the assert would need np.array_equal - confirm the stored type.
assert esm_embedding_output['seq_labels'] == protalbert_embedding_output['seq_labels']

<br>

Select labels and inputs

In [13]:
# Labels come from the ESM file (asserted equal to the ProtAlbert ones above).
# The order of `inputs` defines the order of the model's processing branches.
# NOTE(review): the embeddings are later indexed with lists of indices and
# read via `.shape[1]`, so they are presumably 2-D numpy arrays - confirm.
labels = esm_embedding_output['seq_labels']
inputs = [
    esm_embedding_output['seq_embeddings'],       # Embeddings from esm
    protalbert_embedding_output['seq_embeddings']  # Embeddings from protAlbert
]

<br>

Encode labels

In [14]:
# Map the label values to integer class ids; `ec_encoder.classes_` is used
# later to derive the number of classes.
# NOTE: `labels` is rebound to the encoded ids here, so re-running this cell
# on its own would fit the encoder on already-encoded ids.
ec_encoder = LabelEncoder()
labels = ec_encoder.fit_transform(np.array(labels))

<br>

Train / eval / test split

In [15]:
# First split: hold out TEST_SIZE of all sample indices as the test set.
train_idx, test_idx = train_test_split(
    range(len(labels)), 
    test_size=TEST_SIZE, 
    random_state=RANDOM_SEED
)

# Second split: hold out EVAL_SIZE of the remaining (non-test) indices as the
# eval set - i.e. EVAL_SIZE is a fraction of the train split, not of the full
# dataset. `train_idx` is rebound to the final training indices.
# NOTE(review): neither split is stratified by label.
train_idx, eval_idx = train_test_split(
    train_idx, 
    test_size=EVAL_SIZE,
    random_state = RANDOM_SEED
)

<br>

Create datasets

In [16]:
# Train data
train_labels = labels[train_idx]
train_inputs = [input[train_idx] for input in inputs]
train_dataset = seqembData(train_inputs, train_labels)

# Eval data
eval_labels = labels[eval_idx]
eval_inputs = [input[eval_idx] for input in inputs]
eval_dataset = seqembData(eval_inputs, eval_labels)

# Test data
test_labels = labels[test_idx]
test_inputs = [input[test_idx] for input in inputs]
test_dataset = seqembData(test_inputs, test_labels)

<br>

## Train the model

<br>

Init model

In [121]:
# Seed PyTorch's global RNG before building the model so that weight
# initialisation is repeatable; RANDOM_SEED was previously only used for the
# sklearn splits. When the following cells run in order, dropout and batch
# shuffling draw from the same seeded generator.
torch.manual_seed(RANDOM_SEED)

# One feature size per embedding source, in the order of `train_inputs`
input_sizes = [embedding.shape[1] for embedding in train_inputs]
# One output logit per class seen by the label encoder
num_classes = len(ec_encoder.classes_)
model = ecnumClassifier(
    input_sizes = input_sizes, 
    num_classes = num_classes,
    hidden_size = 256,
    linear_sizes = [64, 64],
    drop_prob = .4
)

In [122]:
# Cast the model's floating-point parameters and buffers to float32, matching
# the float32 inputs produced by seqembData.
model = model.float()

<br>

Select device

In [123]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# The bare `device` expression displays the selected device as the cell output.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

<br>

Init trainer

In [124]:
# Build the trainer; the eval set is scored once after every epoch
trainer = ecnumTrainer(
    model = model,
    train_data = train_dataset,
    eval_data = eval_dataset,
    batch_size = 32,
    learning_rate = 1e-4,
    device = device
)

In [None]:
# Train for 25 epochs, printing batch statistics every 20 batches; returns the
# per-epoch loss / accuracy tracks for the train and eval sets.
train_track, eval_track = trainer.train(num_epochs = 25, verbosity = 20)

In [None]:
# Plot train (solid) and eval (dashed) curves; because file_name is given the
# figure is saved to Drive rather than shown via plt.show().
# NOTE(review): absolute, machine-specific output path.
plot_tracks(
    train_track['error'],
    train_track['accuracy'],
    eval_errors = eval_track['error'],
    eval_accuracies = eval_track['accuracy'],
    file_name = '/content/drive/Othercomputers/My MacBook Pro/MIE1517_nlp_proteins/results/tracks.png'
)