In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from opart_functions import SquaredHingeLoss
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
from opart_functions import get_acc_rate, get_err_df, gen_data_dict, tune_lldas

In [None]:
# training data: feature table(s) and one target table per fold
features_fold1_path = 'training_data/genome/seq_features.csv'
features_fold2_path = 'training_data/genome/seq_features.csv'  # NOTE(review): identical to the fold-1 path — confirm both folds are meant to share one feature file
target_fold1_path = 'training_data/genome/target_fold1.csv'
target_fold2_path = 'training_data/genome/target_fold2.csv'

# raw sequences and labels (turned into dictionaries by gen_data_dict in a later cell)
seqs_path   = 'raw_data/genome/signals.csv'
labels_path = 'raw_data/genome/labels.csv'

# error counts for each log_lambda, one file per fold
err_fold1_path = 'training_data/genome/errors_fold1.csv'
err_fold2_path = 'training_data/genome/errors_fold2.csv'

# path of the csv that receives the accuracy rates
acc_rate_path = 'acc_rate/genome.csv'

# directory prefix under which result dataframes are written as csv
output_df_path = 'record_dataframe/genome/'

In [None]:
# sequence and label dictionaries built from the raw csv files
seqs_dict, labels_dict = (gen_data_dict(path) for path in (seqs_path, labels_path))

# error-count dataframes (one per fold), indexed later by log_lambda
err_fold1_df, err_fold2_df = map(pd.read_csv, (err_fold1_path, err_fold2_path))

# feature dataframes
features_df_fold1, features_df_fold2 = map(pd.read_csv, (features_fold1_path, features_fold2_path))

# target dataframes
target_df_fold1, target_df_fold2 = map(pd.read_csv, (target_fold1_path, target_fold2_path))

In [None]:
# columns used as model inputs, in design-matrix order
chosen_feature = ['std_deviation', 'length', 'sum_diff', 'range_value', 'abs_skewness']
X = features_df_fold1.iloc[:, 1:][chosen_feature].to_numpy()

# log-transform every column into an (n, 1) array;
# columns 1 ('length') and 2 ('sum_diff') are passed through log twice
X0 = np.log(X[:, 0]).reshape(-1, 1)
X1 = np.log(np.log(X[:, 1])).reshape(-1, 1)
X2 = np.log(np.log(X[:, 2])).reshape(-1, 1)
X3 = np.log(X[:, 3]).reshape(-1, 1)
X4 = np.log(X[:, 4]).reshape(-1, 1)

# stack the transformed columns side by side, then standardise each column
X = np.hstack((X0, X1, X2, X3, X4))
mean = X.mean(axis=0)
std_dev = X.std(axis=0)
X = torch.Tensor((X - mean) / std_dev)

In [None]:
def _column_as_tensor(frame, col):
    """Return column `col` of `frame` as an (n, 1) float tensor."""
    return torch.Tensor(frame.iloc[:, col:col + 1].to_numpy())


# column 1 is used as the low target and column 2 as the high target of each fold
targets_low_1, targets_high_1 = _column_as_tensor(target_df_fold1, 1), _column_as_tensor(target_df_fold1, 2)
targets_low_2, targets_high_2 = _column_as_tensor(target_df_fold2, 1), _column_as_tensor(target_df_fold2, 2)

# (n, 2) tensors laid out as [low, high]
target_fold1 = torch.cat([targets_low_1, targets_high_1], dim=1)
target_fold2 = torch.cat([targets_low_2, targets_high_2], dim=1)

In [None]:
class MLPModel(nn.Module):
    """Feed-forward regressor with a single output unit.

    With ``hidden_layers == 0`` the model is one linear map from
    ``input_size`` features to 1 output. Otherwise it is an input layer,
    ``hidden_layers - 1`` further hidden layers of width ``hidden_size``
    (each followed by a sigmoid) and a linear output layer.
    """

    # Runs once, when the class body is evaluated - not on every instantiation.
    torch.manual_seed(123)

    def __init__(self, input_size, hidden_layers, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_layers = hidden_layers
        self.hidden_size = hidden_size

        if self.hidden_layers == 0:
            # purely linear model
            self.linear_model = nn.Linear(input_size, 1)
        else:
            # layers are created in input -> hidden -> output order
            self.input_layer = nn.Linear(input_size, hidden_size)
            extra_layers = []
            for _ in range(hidden_layers - 1):
                extra_layers.append(nn.Linear(hidden_size, hidden_size))
            self.hidden = nn.ModuleList(extra_layers)
            self.output_layer = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map a batch of feature rows to one prediction per row."""
        if self.hidden_layers == 0:
            return self.linear_model(x)

        activations = torch.sigmoid(self.input_layer(x))
        for hidden_layer in self.hidden:
            activations = torch.sigmoid(hidden_layer(activations))
        return self.output_layer(activations)

In [None]:
# function to plot train loss and val loss
def plot_loss(train_loss, val_loss, best_ite, train_set_name, val_set_name):
    """Plot training and validation loss against 1-based epoch numbers.

    Args:
        train_loss: sequence of per-epoch training losses.
        val_loss: sequence of per-epoch validation losses (same length as
            ``train_loss``).
        best_ite: 0-based index into ``val_loss`` of the epoch to highlight,
            or ``None`` to draw no marker.
        train_set_name: name of the training set, used in the title.
        val_set_name: name of the validation set, used in the title.
    """
    epochs = range(1, len(train_loss) + 1)
    plt.plot(epochs, train_loss, 'b', label='Training loss')
    plt.plot(epochs, val_loss,   'r', label='Validation loss')

    # Mark the minimum validation loss point
    if best_ite is not None:
        # The curves are drawn against 1-based epochs, so translate the 0-based
        # index to its epoch number; otherwise the star lands one epoch too early.
        best_epoch = epochs[best_ite]
        plt.plot(best_epoch, val_loss[best_ite], 'g*', markersize=10, label=f'Min Val epoch: {best_epoch: 3d}')

    plt.title('Train ' + train_set_name + " Validate " + val_set_name)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
def cv_learn(n_splits, X, y, n_hiddens, layer_size, batch_size, n_ite, show_plot):
    """Choose the number of training epochs by K-fold cross-validation.

    For every fold a fresh ``MLPModel`` is trained with Adam for ``n_ite``
    epochs. After each epoch the mean mini-batch training loss and the
    full-batch validation loss are recorded; both are summed over the folds.

    Args:
        n_splits: number of folds passed to ``KFold``.
        X: feature tensor, indexed by row along its first dimension.
        y: target tensor with one row per row of ``X``.
        n_hiddens: number of hidden layers of the model (0 gives a linear model).
        layer_size: width of each hidden layer.
        batch_size: mini-batch size of the training ``DataLoader``.
        n_ite: number of epochs trained in every fold.
        show_plot: when truthy, plot the fold-averaged loss curves.

    Returns:
        The 1-based epoch count whose summed validation loss is smallest.
    """
    torch.manual_seed(123)

    # Define the number of folds for cross-validation
    kf = KFold(n_splits, shuffle=True, random_state=123)

    # loss function
    loss_func = SquaredHingeLoss(margin=1)

    # per-epoch losses, summed over all folds
    total_train_losses = np.zeros(n_ite)
    total_val_losses   = np.zeros(n_ite)
    for train_index, val_index in kf.split(X):

        # Split the data into training and validation sets
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]

        # Create DataLoader
        dataset    = TensorDataset(X_train, y_train)
        dataloader = DataLoader(dataset, batch_size, shuffle=True)

        # A new model and optimizer for every fold
        model = MLPModel(X.shape[1], n_hiddens, layer_size)
        optimizer = optim.Adam(model.parameters())

        # Training loop for the specified number of iterations
        train_losses = []
        val_losses   = []
        for _ in range(n_ite):
            # model.eval() below is sticky: switch back to training mode so
            # every epoch after the first does not silently train in eval mode.
            model.train()
            train_loss = 0
            for inputs, labels in dataloader:
                optimizer.zero_grad()
                loss = loss_func(model(inputs), labels)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            # validating on the whole held-out fold at once
            model.eval()
            with torch.no_grad():
                val_loss = loss_func(model(X_val), y_val)

            # add train_loss and val_loss into arrays
            train_losses.append(train_loss/len(dataloader))
            val_losses.append(val_loss.item())

        total_train_losses += train_losses
        total_val_losses += val_losses

    # 0-based index of the epoch with the lowest summed validation loss
    best_no_ite = np.argmin(total_val_losses)
    if show_plot:
        plot_loss(total_train_losses/n_splits, total_val_losses/n_splits, best_no_ite, 'subtrain', 'val')
    return best_no_ite + 1

In [None]:
# learn lldas
def mlp(features, targets, hidden_layers, hidden_size, batch_size, n_ites, test_fold, err_df):
    """Train an MLPModel and print the accuracy rate after every epoch.

    Args:
        features: feature tensor used both for training and for prediction.
        targets: target tensor paired row-wise with ``features``.
        hidden_layers: number of hidden layers of the model.
        hidden_size: width of each hidden layer.
        batch_size: mini-batch size of the training ``DataLoader``.
        n_ites: number of training epochs.
        test_fold: fold identifier forwarded to ``get_err_df``.
        err_df: error dataframe forwarded to ``get_err_df``.

    Reads the module-level ``seqs_dict`` and ``labels_dict``. Returns nothing;
    the epoch index and its accuracy rate are printed.
    """
    torch.manual_seed(123)

    # shuffled mini-batches over the full training set
    loader = DataLoader(TensorDataset(features, targets), batch_size, shuffle=True)

    # model, loss and optimizer (created in this order)
    net = MLPModel(features.shape[1], hidden_layers, hidden_size)
    hinge_loss = SquaredHingeLoss()
    adam = optim.Adam(net.parameters())

    for epoch in range(n_ites):
        # one pass over the training data
        for batch_inputs, batch_targets in loader:
            adam.zero_grad()
            batch_loss = hinge_loss(net(batch_inputs), batch_targets)
            batch_loss.backward()
            adam.step()

        # score the current model on all rows, without tracking gradients
        with torch.no_grad():
            predicted = tune_lldas(net(features).numpy().reshape(-1))
            rate = get_acc_rate(get_err_df(predicted, test_fold, seqs_dict, labels_dict, err_df))

        print(epoch, rate)

In [None]:
# cross-validate the epoch count for the linear model (no hidden layers) on the fold-1 targets
cv_learn(n_splits=2, X=X, y=target_fold1, n_hiddens=0, layer_size=0,
         batch_size=1, n_ite=50, show_plot=True)

In [None]:
# cross-validate the epoch count for a 1-hidden-layer, width-8 model on the fold-2 targets
cv_learn(n_splits=2, X=X, y=target_fold2, n_hiddens=1, layer_size=8,
         batch_size=1, n_ite=50, show_plot=True)

In [None]:
# cross-validate the epoch count for the linear model (no hidden layers) on the fold-2 targets
cv_learn(n_splits=2, X=X, y=target_fold2, n_hiddens=0, layer_size=0,
         batch_size=1, n_ite=50, show_plot=True)

In [None]:
# cross-validate the epoch count for a 2-hidden-layer, width-16 model on the fold-2 targets
cv_learn(n_splits=2, X=X, y=target_fold2, n_hiddens=2, layer_size=16,
         batch_size=1, n_ite=50, show_plot=True)

In [None]:
# train on the fold-2 targets, evaluate against fold 1 (batch size 1, 30 epochs)
mlp(features=X, targets=target_fold2, hidden_layers=2, hidden_size=16,
    batch_size=1, n_ites=30, test_fold=1, err_df=err_fold1_df)

In [None]:
# train on the fold-2 targets, evaluate against fold 2 itself (batch size 1, 10 epochs)
mlp(features=X, targets=target_fold2, hidden_layers=2, hidden_size=16,
    batch_size=1, n_ites=10, test_fold=2, err_df=err_fold2_df)