# Generating the files used to train the model, and the trained model itself

## Imports

In [13]:
import numpy as np
import torch
from torch import nn
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit
import json
from ax.service.managed_loop import optimize


## Model definition

In [15]:
class MLPRegressor(nn.Module):
    """Fully connected feed-forward network for regression.

    The network is a stack of ``nn.Linear`` layers. Every hidden layer is
    followed by ``activation``; the final linear layer has no activation.

    Args:
        input_size: number of input features.
        output_size: number of regression outputs.
        hidden_size: iterable giving the width of each hidden layer, in order.
        activation: module applied after every hidden layer. The same
            instance is reused for all hidden layers.
    """

    def __init__(self, input_size, output_size, hidden_size=(100, ), activation=nn.ReLU()):
        # Seed the global RNG before any parameter is created so that weight
        # initialisation is reproducible.
        torch.manual_seed(0)

        super().__init__()

        # Consecutive widths: input -> hidden_1 -> ... -> hidden_k.
        widths = [input_size, *hidden_size]
        stack = []
        for n_in, n_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(n_in, n_out), activation]

        # Output head maps the last width onto the regression outputs.
        stack.append(nn.Linear(widths[-1], output_size))
        self.fc_layers = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        return self.fc_layers(x)
    
import tqdm

def train_model(model, data, optimizer, n_epochs):
    """Fit ``model`` with full-batch updates on a mean-squared-error loss.

    Args:
        model: network to train; it is updated in place.
        data: 4-tuple ``(x_train, x_val, y_train, y_val)``. Only the training
            pair is used by this function.
        optimizer: optimizer whose ``zero_grad``/``step`` are called once per
            epoch.
        n_epochs: number of update steps to run.

    Returns:
        The same ``model`` object after training.
    """
    # Fixed seed to control random effects during training.
    torch.manual_seed(0)

    loss_fn = nn.MSELoss()
    # The validation tensors are part of the tuple layout but unused here.
    x_train, _x_val, y_train, _y_val = data

    progress = tqdm.tqdm(np.arange(n_epochs))
    for _ in progress:
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()
        # Forward pass over the whole training set, then the loss on it.
        batch_loss = loss_fn(model(x_train), y_train)
        # Backpropagate and update the parameters.
        batch_loss.backward()
        optimizer.step()
        # Show the current training loss next to the progress bar.
        progress.set_postfix_str(f'loss: {batch_loss.item():.3e}')

    return model

# Lookup table from an activation's string name to the nn module to use
activation_dict = dict(
    tanh=nn.Tanh(),
    relu=nn.ReLU(),
    leaky_relu=nn.LeakyReLU(),
    sigmoid=nn.Sigmoid(),
    elu=nn.ELU(),
)

def generate_model_and_optimizer(params):
    """Build an ``MLPRegressor`` and an Adam optimizer from a hyperparameter dict.

    ``params`` must provide ``layer_i``, ``layer_f``, ``num_layers``,
    ``activation`` (a key of ``activation_dict``) and ``lr``.

    Returns:
        The tuple ``(model, optimizer)``.
    """
    # Hidden widths are evenly spaced from layer_i to layer_f (num_layers
    # values), rounded to the nearest integer.
    hidden_size = tuple(
        np.linspace(params["layer_i"], params["layer_f"], params["num_layers"])
        .round()
        .astype(int)
    )

    # The network is fixed at 3 input features and 2 outputs.
    model = MLPRegressor(3, 2, hidden_size, activation_dict[params['activation']])

    # Adam over all model parameters, with the requested learning rate.
    optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])

    return model, optimizer

def mlp_fitness(params):
    """Objective for the hyperparameter search: log validation RMSE of one trial.

    A model is built from ``params``, trained on the training set and scored
    on the held-out validation set. The evaluation function only receives the
    hyperparameters, so the data is read from the module-level ``x_train``,
    ``x_val``, ``y_train`` and ``y_val``.

    Args:
        params: hyperparameter dict accepted by
            ``generate_model_and_optimizer``, plus ``n_epochs``.

    Returns:
        ``log(rmse)`` where the RMSE is capped at ``1e2``. Trials that raise
        an error or produce a non-finite RMSE are scored as ``log(1e2)``.
    """
    import warnings

    # Threshold RMSE used both as a cap and as the score of failed trials.
    rmse_cap = 1e2

    data = (x_train, x_val, y_train, y_val)
    try:
        # train on training set
        model, optimizer = generate_model_and_optimizer(params)
        train_model(model, data, optimizer, params['n_epochs'])
        # evaluate on held-out validation set (no gradients needed)
        with torch.no_grad():
            y_pred = model(x_val)
        rmse = torch.sqrt(torch.mean((y_pred - y_val) ** 2)).item()
    except Exception as err:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still stop the search; report the failure instead of
        # hiding it.
        warnings.warn(f"trial {params} failed ({err!r}); scoring it as RMSE={rmse_cap}")
        rmse = rmse_cap

    # min(nan, cap) returns nan, so diverged trials must be handled explicitly.
    if not np.isfinite(rmse):
        rmse = rmse_cap
    rmse = min(rmse, rmse_cap)
    return np.log(rmse)    # log will be better behaved for very small numbers


In [17]:
# Getting data for the model

# Import processed embedding file, nn_input, and nn_output
df = pd.read_csv('no_avg_dataset.csv')
nia = np.load("nn_input.npy")
noa = np.load("nn_output.npy")
seq_array = np.loadtxt("seq_arr.npy", dtype = str).reshape(-1,1)

# Get training, testing, and validation datasets, split by sequence so that
# all rows of one sequence land in the same set.
# split() yields (train, test) index arrays. With test_size=0.6 the "test"
# fold is the larger one and is used as the training set here; the smaller
# fold is held back and divided into validation and test sets below.
test_val_splitter = GroupShuffleSplit(n_splits = 1, test_size = 0.6, random_state = 0)

test_val_indices, train_indices = next(test_val_splitter.split(nia, noa, groups = seq_array))

x_test_val, x_train = nia[test_val_indices], nia[train_indices]
y_test_val, y_train = noa[test_val_indices], noa[train_indices]
groups_test_val = seq_array[test_val_indices]

# Split the held-out part in half (by sequence) into validation and test.
val_test_splitter = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=0)
val_indices, test_indices = next(val_test_splitter.split(x_test_val, y_test_val, groups=groups_test_val))

x_val, x_test = x_test_val[val_indices], x_test_val[test_indices]
y_val, y_test = y_test_val[val_indices], y_test_val[test_indices]

# val_indices/test_indices are positions within the held-out subset, so the
# matching sequences must be taken from groups_test_val, not from seq_array.
test_seqs = groups_test_val[test_indices]
val_seqs = groups_test_val[val_indices]

#Turn them into tensors
x_train = torch.Tensor(x_train)
x_val = torch.Tensor(x_val)
y_train = torch.Tensor(y_train)
y_val = torch.Tensor(y_val)
x_test = torch.Tensor(x_test)
y_test = torch.Tensor(y_test)

# Hyperparameter optimization

In [None]:
# Hyperparameter search with Ax over 25 trials.
# NOTE: mlp_fitness returns log(RMSE), so the objective named 'rmse' is
# minimised on a log scale.
best_parameters, values, experiment, ax_model = optimize(
    parameters=[
        # Optimizer learning rate
        {
            "name": "lr",
            "type": "range",
            "bounds": [1e-4, 1e0],
            "log_scale": True,
        },
        # Number of training epochs
        {
            "name": "n_epochs",
            "type": "range",
            "bounds": [100, 2000],
        },
        # Activation name; must be a key of activation_dict
        {
            "name": "activation",
            "type": "choice",
            "values": ["sigmoid", "tanh", "relu", "leaky_relu", "elu"],
            "is_ordered": False,
        },
        # Number of hidden layers
        {
            "name": "num_layers",
            "type": "range",
            "bounds": [1, 6],
        },
        # Width of the first hidden layer
        {
            "name": "layer_i",
            "type": "range",
            "bounds": [1,128],
        },
        # Width of the last hidden layer
        {
            "name": "layer_f",
            "type": "range",
            "bounds": [1,64],
        },
    ],
    evaluation_function=mlp_fitness,
    objective_name='rmse',
    minimize=True,
    total_trials=25,
    random_seed=0,
)

In [None]:
# Persist the tuned hyperparameters as JSON
with open('best_parameters.json', 'w') as f:
    f.write(json.dumps(best_parameters))

# Model training

In [18]:
# Reload the saved hyperparameters and build the model/optimizer from them
with open('best_parameters.json', 'r') as f:
    best_parameters = json.loads(f.read())

model, optimizer = generate_model_and_optimizer(best_parameters)

# Convert every split to a torch tensor before training
x_train, x_val, y_train, y_val, x_test, y_test = [
    torch.Tensor(split)
    for split in (x_train, x_val, y_train, y_val, x_test, y_test)
]

data = (x_train, x_val, y_train, y_val)
train_model(model, data, optimizer, best_parameters['n_epochs'])

100%|██████████| 1185/1185 [00:06<00:00, 176.92it/s, loss: 8.613e-01]


MLPRegressor(
  (fc_layers): Sequential(
    (0): Linear(in_features=3, out_features=79, bias=True)
    (1): ReLU()
    (2): Linear(in_features=79, out_features=68, bias=True)
    (3): ReLU()
    (4): Linear(in_features=68, out_features=56, bias=True)
    (5): ReLU()
    (6): Linear(in_features=56, out_features=45, bias=True)
    (7): ReLU()
    (8): Linear(in_features=45, out_features=2, bias=True)
  )
)

In [None]:
# Write the trained model's state dict to disk
torch.save(obj=model.state_dict(), f='embedding_model.pth')