In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold
import numpy as np
import pandas as pd
from ipynb.fs.full.utility_functions import gen_data_dict, get_data, error_count, opart, SquaredHingeLoss, show_error_rate

# Display / reproducibility settings: one seed shared by NumPy and PyTorch.
SEED = 123
np.set_printoptions(precision=3)
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x117b50afcf0>

In [2]:
# Features: the 'count' column of seq_features.csv, transformed with log(log(.))
# and reshaped into a single-column float tensor.
seq_features = pd.read_csv('1_genome/1_training_data/seq_features.csv')
data = np.log(np.log(seq_features['count'].to_numpy())).reshape(-1, 1)
X = torch.FloatTensor(data)

In [3]:
# Targets: one CSV per fold. Column 1 is read as the "low" target and
# column 2 as the "high" target; each slice keeps a trailing axis of size 1.
target_df_1 = pd.read_csv('1_genome/1_training_data/target_lambda_fold1_base_e.csv')
target_df_2 = pd.read_csv('1_genome/1_training_data/target_lambda_fold2_base_e.csv')

targets_low_1, targets_high_1 = (
    torch.FloatTensor(target_df_1.iloc[:, col:col + 1].to_numpy()) for col in (1, 2)
)
targets_low_2, targets_high_2 = (
    torch.FloatTensor(target_df_2.iloc[:, col:col + 1].to_numpy()) for col in (1, 2)
)

# Stack (low, high) side by side -> two columns per row.
y1 = torch.cat([targets_low_1, targets_high_1], dim=1)
y2 = torch.cat([targets_low_2, targets_high_2], dim=1)

In [4]:
# Define the linear model
class LinearModel(nn.Module):
    """Single affine layer mapping `input_size` features to one scalar output."""

    def __init__(self, input_size):
        """Build the layer.

        Args:
            input_size: number of input features per sample.
        """
        super().__init__()
        # The attribute name `fc1` determines the state_dict keys
        # ('fc1.weight', 'fc1.bias') used by saved checkpoints.
        self.fc1 = nn.Linear(in_features=input_size, out_features=1)

    def forward(self, x):
        """Apply the linear layer to `x` and return its output."""
        prediction = self.fc1(x)
        return prediction

In [5]:
def _best_iteration(model, loss_fn, X_train, y_train, X_val, y_val, max_ites, lr):
    """Train `model` with Adam and return the 1-based step with the lowest validation loss."""
    if max_ites < 1:
        raise ValueError("max_ites must be at least 1")

    optimizer = optim.Adam(model.parameters(), lr=lr)
    val_losses = []
    for _ in range(max_ites):
        train_loss = loss_fn(model(X_train), y_train)
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

        # Evaluate AFTER the update so that val_losses[k] belongs to the model
        # obtained after exactly k + 1 optimizer steps. No graph is needed here.
        with torch.no_grad():
            val_losses.append(loss_fn(model(X_val), y_val).item())

    return int(np.argmin(val_losses)) + 1


def cv_learn(X, y1, y2, n_splits=2, max_ites=5000, lr=0.001):
    """Choose the number of training iterations for each target set by K-fold CV.

    For every split, one fresh ``LinearModel`` per target set is trained on the
    training part while the validation loss is tracked after each optimizer
    step. The step count with the smallest validation loss is recorded per
    split, and the per-split values are averaged (truncated to ``int``).

    Args:
        X: feature tensor, indexed by row; one input feature per row
            (the models are built with ``input_size=1``).
        y1: targets for the first model, row-aligned with ``X``; passed as the
            second argument of ``SquaredHingeLoss``.
        y2: targets for the second model, same layout as ``y1``.
        n_splits: number of CV folds (default 2).
        max_ites: number of optimizer steps tried per split (default 5000).
        lr: Adam learning rate (default 0.001).

    Returns:
        ``(best_no_ite_1, best_no_ite_2)``: iteration counts meant to be handed
        to ``train_model`` for ``y1`` and ``y2`` respectively.

    Raises:
        ValueError: if ``max_ites`` is smaller than 1.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=123)
    squared_hinge_loss = SquaredHingeLoss(margin=1, low_lim=-7, high_lim=7)

    best_ites_1 = []
    best_ites_2 = []
    for train_index, val_index in kf.split(X):
        X_train, X_val = X[train_index], X[val_index]

        # Both models are created before any training so the random
        # initialisation order matches a model1-then-model2 construction.
        model1 = LinearModel(input_size=1)
        model2 = LinearModel(input_size=1)

        best_ites_1.append(_best_iteration(
            model1, squared_hinge_loss,
            X_train, y1[train_index], X_val, y1[val_index],
            max_ites, lr,
        ))
        best_ites_2.append(_best_iteration(
            model2, squared_hinge_loss,
            X_train, y2[train_index], X_val, y2[val_index],
            max_ites, lr,
        ))

    best_no_ite_1 = int(np.mean(best_ites_1))
    best_no_ite_2 = int(np.mean(best_ites_2))

    return best_no_ite_1, best_no_ite_2

In [6]:
def train_model(X, y, n_ites):
    """Fit a fresh ``LinearModel`` on the full data for a fixed number of steps.

    Args:
        X: input tensor fed to the model (one feature per row).
        y: targets passed as the second argument of ``SquaredHingeLoss``.
        n_ites: number of Adam updates to perform.

    Returns:
        The trained ``LinearModel`` instance.
    """
    net = LinearModel(input_size=1)
    criterion = SquaredHingeLoss(margin=1, low_lim=-7, high_lim=7)
    adam = optim.Adam(net.parameters(), lr=0.001)

    for _step in range(n_ites):
        # Clear stale gradients, then forward -> loss -> backward -> update.
        adam.zero_grad()
        criterion(net(X), y).backward()
        adam.step()

    return net

In [7]:
def get_df_stat(ldas1, ldas2, seqs, labels):
    """Build the per-sequence evaluation table for both folds.

    The values in ``ldas1`` / ``ldas2`` are exponentiated before use. Fold 1 is
    evaluated with the value from ``ldas2`` and fold 2 with the value from
    ``ldas1``.

    Args:
        ldas1: indexable of per-sequence values; ``exp`` of it is used on fold 2.
        ldas2: indexable of per-sequence values; ``exp`` of it is used on fold 1.
        seqs: sequence container accepted by ``get_data``; ``seqs[i][0]`` is
            stored as the sequence ID.
        labels: label container accepted by ``get_data``.

    Returns:
        ``pandas.DataFrame`` with one row per sequence: ID, the two lambdas,
        label counts per fold, and the first four entries returned by
        ``error_count`` for each fold.
    """
    header = [
        'sequenceID',
        'lambda_test_fold1', 'lambda_test_fold2',
        'fold_1_total_labels', 'fold_2_total_labels',
        'fold_1_fp_errs', 'fold_1_fn_errs', 'fold_1_tp', 'fold_1_tn',
        'fold_2_fp_errs', 'fold_2_fn_errs', 'fold_2_tp', 'fold_2_tn',
    ]

    records = []
    for idx in range(len(seqs)):
        (sequence,
         neg_start_1, neg_end_1, pos_start_1, pos_end_1,
         neg_start_2, neg_end_2, pos_start_2, pos_end_2) = get_data(idx, seqs=seqs, labels=labels)

        # Number of labelled regions (negative + positive) in each fold.
        n_labels_fold1 = len(neg_start_1) + len(pos_start_1)
        n_labels_fold2 = len(neg_start_2) + len(pos_start_2)

        # Cross-fold pairing: ldas2 is applied to fold 1, ldas1 to fold 2.
        lda_fold1 = np.exp(ldas2[idx])
        lda_fold2 = np.exp(ldas1[idx])

        chpnt_fold1 = opart(lda_fold1, sequence)
        chpnt_fold2 = opart(lda_fold2, sequence)

        errs_fold1 = error_count(chpnt_fold1, neg_start_1, neg_end_1, pos_start_1, pos_end_1)
        errs_fold2 = error_count(chpnt_fold2, neg_start_2, neg_end_2, pos_start_2, pos_end_2)

        records.append([
            seqs[idx][0], lda_fold1, lda_fold2,
            n_labels_fold1, n_labels_fold2,
            *errs_fold1[:4],
            *errs_fold2[:4],
        ])

    return pd.DataFrame(records, columns=header)

In [8]:
def try_model(X, y1, y2):
    """Run the whole pipeline: CV, final fit, checkpointing and evaluation.

    Steps: pick iteration counts with ``cv_learn`` (and print them), train one
    model per target set with ``train_model``, save both state dicts under
    ``1_genome/3_learned_models/1_linear/``, predict on ``X`` without gradient
    tracking, and build the statistics table with ``get_df_stat``.

    Args:
        X: feature tensor.
        y1: targets for the first model.
        y2: targets for the second model.

    Returns:
        The DataFrame produced by ``get_df_stat``.
    """
    n_ites_1, n_ites_2 = cv_learn(X, y1, y2)
    print(n_ites_1, n_ites_2)

    model1 = train_model(X, y1, n_ites_1)
    model2 = train_model(X, y2, n_ites_2)

    checkpoints = (
        (model1, '1_genome/3_learned_models/1_linear/model1.pth'),
        (model2, '1_genome/3_learned_models/1_linear/model2.pth'),
    )
    for fitted, path in checkpoints:
        torch.save(fitted.state_dict(), path)

    # Predictions as flat NumPy vectors, one value per row of X.
    with torch.no_grad():
        ldas1, ldas2 = [fitted(X).numpy().reshape(-1) for fitted in (model1, model2)]

    seqs = gen_data_dict('1_genome/0_sequences_labels/signals.gz')
    labels = gen_data_dict('1_genome/0_sequences_labels/labels.gz')

    return get_df_stat(ldas1, ldas2, seqs, labels)

In [None]:
# Run the full pipeline (CV, final fit, checkpoint save, evaluation).
# The resulting per-sequence statistics table is kept in `df` for the next cell.
df = try_model(X, y1, y2)

In [19]:
# Summarise the statistics table built by try_model (helper imported from utility_functions).
show_error_rate(df)

(72.73936170212765, 63.26923076923077, 752, 520, 205, 191)

In [10]:
# # Instantiate the model, define custom loss function, and optimizer
# model1 = LinearModel(input_size = 1)
# model2 = LinearModel(input_size = 1)

# squared_hinge_loss = SquaredHingeLoss(margin=1, low_lim=-7, high_lim=7)
# optimizer1 = optim.Adam(model1.parameters(), lr=0.001)
# optimizer2 = optim.Adam(model2.parameters(), lr=0.001)

# # Training loop
# min_loss_1_test = float('inf')
# min_loss_2_test = float('inf')
# for epoch in range(2000):
#     # Forward pass
#     outputs1 = model1(X)
#     outputs2 = model2(X)
    
#     # Compute the custom loss
#     loss_1 = squared_hinge_loss(outputs1, y1)
#     loss_2 = squared_hinge_loss(outputs2, y2)

#     loss_1_test = squared_hinge_loss(outputs1, y2)
#     loss_2_test = squared_hinge_loss(outputs2, y1)
    
#     # Backward pass and optimization
#     optimizer1.zero_grad()
#     loss_1.backward()
#     optimizer1.step()

#     optimizer2.zero_grad()
#     loss_2.backward()
#     optimizer2.step()

#     # # save models
#     # if loss_1_test < min_loss_1_test:
#     #     min_loss_1_test = loss_1_test
#     #     torch.save(model1.state_dict(), '1_genome/3_learned_models/1_linear/model1.pth')
    
#     # if loss_2_test < min_loss_2_test:
#     #     min_loss_2_test = loss_2_test
#     #     torch.save(model2.state_dict(), '1_genome/3_learned_models/1_linear/model2.pth')
    
#     # Print the loss every 1000 epochs
#     if (epoch+1) % 1000 == 0:
#         print(f'Epoch {epoch:5d}, Loss_1: {loss_1.item():8.4f}, Loss_1_test: {loss_1_test.item():8.4f}, Loss_2: {loss_2.item():8.4f}, Loss_2_test: {loss_2_test.item():8.4f}')

In [11]:
# torch.save(model1.state_dict(), '1_genome/3_learned_models/1_linear/model1.pth')
# torch.save(model2.state_dict(), '1_genome/3_learned_models/1_linear/model2.pth')

In [12]:
# # Load model1
# model1 = LinearModel(input_size=1)
# model1.load_state_dict(torch.load('1_genome/3_learned_models/1_linear/model1.pth'))
# model1.eval()  # Set the model to evaluation mode

# # Load model2
# model2 = LinearModel(input_size=1)
# model2.load_state_dict(torch.load('1_genome/3_learned_models/1_linear/model2.pth'))
# model2.eval()  # Set the model to evaluation mode

In [13]:
# for name, param in model1.named_parameters():
#     print(f"Parameter: {name}, Value: {param.data}")

In [14]:
# for name, param in model2.named_parameters():
#     print(f"Parameter: {name}, Value: {param.data}")

In [15]:
# with torch.no_grad():
#     ldas1 = np.exp(model1(X).numpy()).reshape(-1)
#     ldas2 = np.exp(model2(X).numpy()).reshape(-1)

In [16]:
# seqs   = gen_data_dict('1_genome/0_sequences_labels/signals.gz')
# labels = gen_data_dict('1_genome/0_sequences_labels/labels.gz')

# header = ['sequenceID', 'fold_1_total_labels', 'fold_2_total_labels', 'fold_1_fp_errs', 'fold_1_fn_errs', 'fold_1_tp', 'fold_1_tn', 'fold_2_fp_errs', 'fold_2_fn_errs', 'fold_2_tp', 'fold_2_tn',]
# rows = []
# for i in range(len(seqs)):
#     # generate data
#     sequence, neg_start_1, neg_end_1, pos_start_1, pos_end_1, neg_start_2, neg_end_2, pos_start_2, pos_end_2 = get_data(i, seqs=seqs, labels=labels)

#     # get total labels
#     fold1_total_labels = len(neg_start_1) + len(pos_start_1)
#     fold2_total_labels = len(neg_start_2) + len(pos_start_2)

#     # run each lambda and record it into csv file
#     row = [seqs[i][0], fold1_total_labels, fold2_total_labels]

#     chpnt_fold1 = opart(ldas2[i], sequence)
#     chpnt_fold2 = opart(ldas1[i], sequence)

#     err_1 = error_count(chpnt_fold1, neg_start_1, neg_end_1, pos_start_1, pos_end_1)
#     err_2 = error_count(chpnt_fold2, neg_start_2, neg_end_2, pos_start_2, pos_end_2)
    
#     for err in [err_1, err_2]:
#         row.extend(err[:4])

#     rows.append(row)

# pd.DataFrame(rows, columns=header).to_csv('1_genome/2_learning_record/linear.csv', index=False)

In [17]:
# show_error_rate(pd.DataFrame(rows, columns=header))