# RNA Pre-Trained Model

In [1]:
 #Import All Libraries Here
import pandas as pd
from sklearn.metrics import accuracy_score ,  roc_curve, auc , classification_report

import numpy as np
from sklearn.metrics import classification_report, accuracy_score

from sklearn.metrics import mean_squared_error
import optuna
from torch.utils.data import WeightedRandomSampler
from sklearn.model_selection import train_test_split 

import time
from collections import Counter
# PyTorch Import

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# Record Constants
INPUT_TRAIN_IN = "../../../data/train_in.csv"
INPUT_TRAIN_OUT = "../../../data/train_out.csv"  
INPUT_TEST_IN = "../../../data/test_in.csv"
INPUT_TEST_OUT = "../../../data/test_out.csv"
INPUT_VALIDATION_IN = "../../../data/valid_in_nucleo.csv"
INPUT_VALIDATION_OUT  = "../../../data/valid_out.csv"

TARGET_MODEL_PATH = '../../webapp/model_files'


WINDOW_SIZE = 50

# 1 - One Hot Encoding with Pytorch in build Emnedding 
# 2 - K-mers with Word2Vec
ENCODING_METHOD = 1


# 1 - LSTM with Cross Entropy 
MODEL = 1


FRAMEWORK = "PYTORCH"

# Startegy to Crop Sequene
# MID - Modification is present at Mid of cropped Sequence 
# END - Modification is present at End of cropepd Sequence 
CROP_STRATEGY = 'END'

# Y Category Encoding Method
# LABEL or ONE_HOT  
TARGET_ENCODING = 'LABEL'



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#Read X Varaibles and Y Varaibles

x_train_raw =  pd.read_csv(INPUT_TRAIN_IN, header=None , skiprows=1  )
y_train_raw =  pd.read_csv(INPUT_TRAIN_OUT, header=None , skiprows=1 )

x_test_raw =  pd.read_csv(INPUT_TEST_IN, header=None , skiprows=1 )
y_test_raw =  pd.read_csv(INPUT_TEST_OUT, header=None , skiprows=1)

x_valid_raw =  pd.read_csv(INPUT_VALIDATION_IN, header=None , skiprows=1 )
y_valid_raw =  pd.read_csv(INPUT_VALIDATION_OUT, header=None , skiprows=1 )


x_data = pd.concat([x_train_raw, x_test_raw, x_valid_raw], axis=0, ignore_index=True)
y_data = pd.concat([y_train_raw, y_test_raw, y_valid_raw], axis=0, ignore_index=True)

In [3]:
middle_index = (x_train_raw.shape[1] // 2) + 1 # This is location for Modified Sequence . Use this as Y Target

if CROP_STRATEGY == 'MID':
    STRAT_INEDX =middle_index - WINDOW_SIZE -1 
    END_INDEX =middle_index + WINDOW_SIZE 

if CROP_STRATEGY == 'END':
    STRAT_INEDX =middle_index - (WINDOW_SIZE*2) -1 
    END_INDEX =middle_index-1 # Ignore Modified Position

In [4]:
print(f"Train Shape of X : {x_train_raw.shape} and Tranin Shape of Y : {x_train_raw.shape}")
print(f"Test Shape of X : {x_test_raw.shape} and Test Shape of Y : {y_test_raw.shape}")
print(f"Validation Shape of X : {x_valid_raw.shape} and Validation Shape of Y : {y_valid_raw.shape}")

Train Shape of X : (304661, 1001) and Tranin Shape of Y : (304661, 1001)
Test Shape of X : (1200, 1001) and Test Shape of Y : (1200, 12)
Validation Shape of X : (3599, 1001) and Validation Shape of Y : (3599, 12)


### Encode with One Hot Encoding

In [5]:
number_of_unique_kmers = set()
def encode_seq(kmer_token):

    # A 1 0 0 0
    # C 0 1 0 0
    # T/U 0 0 0 1
    # G 0 0 1 0
    # N 0 0 0 0

    encoding_dict = {
        'A': [1, 0, 0, 0],
        'C': [0, 1, 0, 0],
        'G': [0, 0, 1, 0],
        'T': [0, 0, 0, 1],
        'U': [0, 0, 0, 1],
        'N': [0, 0, 0, 0],
    }

    encoded_sequence = []
    number_of_unique_kmers.add(kmer_token)
    for  base in kmer_token:
        encoded_sequence.append(encoding_dict[base])
    return np.array(encoded_sequence).flatten()

def applyOneHotEncoding(tokenized_sequences):
    encoded_sequences = []
    for seq in tokenized_sequences:
        encoded_sequences.append(encode_seq(seq)) 
 
    return np.array(encoded_sequences).flatten()

def encode_with_one_hot_encoding(x_train_raw):
    truncated_df =  x_train_raw.iloc[:,STRAT_INEDX :END_INDEX] # Window Starts from V501 with 50 window size
    concatenated_column= truncated_df.apply(lambda row: ''.join(map(str, row)), axis=1)
    df_result = truncated_df.assign(Sequence=concatenated_column)
    tokenized_sequences =  df_result['Sequence'].apply(applyOneHotEncoding).tolist()
    
    return tokenized_sequences


In [6]:
# X_train = torch.tensor(encode_with_one_hot_encoding(x_train_raw) , dtype=torch.long)
# X_test = torch.tensor(encode_with_one_hot_encoding(x_test_raw) , dtype=torch.long)
# X_valid = torch.tensor(encode_with_one_hot_encoding(x_valid_raw) , dtype=torch.long)

  X_train = torch.tensor(encode_with_one_hot_encoding(x_train_raw) , dtype=torch.long)


In [6]:

RMs = ['A','G','C','U','T','N']
RMEncoding = [0,1,2,3,3,4]

def encode_target(y_data):
    # Write Customer Lable Encoder . This is required since we have train and test alreday splitted. Always creating a new instanc of label encoder will change encoding.
    y_encoded = []
    for y in y_data:
        index = RMs.index(y)
        encoding =  RMEncoding[index]
        y_encoded.append(encoding)
    return y_encoded

y_originals =  x_data.iloc[:, middle_index-1] 


print("Encode X and Y Features.")
X_encoded = torch.tensor(encode_with_one_hot_encoding(x_data) , dtype=torch.long)
y_encoded = encode_target(y_originals)


print("Generate Train and Split..")
# Train set
X_train, X_temp, y_train, y_temp = train_test_split(X_encoded, y_encoded, test_size=0.3, random_state=42)

# Test and Validation set
X_valid, X_test, y_valid, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)


Encode X and Y Features.


  X_encoded = torch.tensor(encode_with_one_hot_encoding(x_data) , dtype=torch.long)


Generate Train and Split..


In [8]:

# print("Middle Index is :" ,middle_index )
# y_train_original =  x_train_raw.iloc[:, middle_index-1] 
# y_test_original = x_test_raw.iloc[:, middle_index-1] 
# y_valid_original =  x_valid_raw.iloc[:, middle_index -1] 

# print("Train Value Counts")
# print(y_train_original.value_counts())
# print("Test Value Counts")
# print(y_test_original.value_counts())
# print("Valid Value Counts")
# print(y_valid_original.value_counts())

# y_train = encode_target(y_train_original)
# y_test = encode_target(y_test_original)
# y_valid = encode_target(y_valid_original)


print("Train Y Count : " ,Counter(y_train))
print("Test Y Count : " ,Counter(y_test))
print("Test Y Count : " ,Counter(y_valid))


y_train = torch.tensor(y_train , dtype=torch.long)
y_test = torch.tensor(y_test , dtype=torch.long)
y_valid = torch.tensor(y_valid , dtype=torch.long)

# # Calculate class weights
class_weights = 1.0 / torch.bincount(y_train)
weights = class_weights[y_train]
sampler = WeightedRandomSampler(weights, len(weights))

In [9]:
hyperparameter = {}
hyperparameter['INPUT_DIMENSION'] = 4 # For One Hot Encoding Input Dimension would be 4 as there only 4 unique nucleocide 
hyperparameter['HIDDEN_DIMENSION'] = 3
hyperparameter['NO_OF_LAYERS'] = 1
hyperparameter['BATCH_SIZE'] = 32
hyperparameter['OUTPUT_DIMENSION'] = 4
hyperparameter['EMBEDDING_DIMENSION'] = 7 # if you are using Word2Vec Encoding then this should be same as Word2Vec Embedding Dim 
hyperparameter['DROP_OUT'] = 0.1

In [10]:
class RNADataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        return self.X[index], self.y[index]

train_dataset = RNADataset(X_train, y_train)
test_dataset = RNADataset(X_test, y_test)
valid_dataset = RNADataset(X_valid, y_valid)

train_dataloader = DataLoader(train_dataset, batch_size=hyperparameter['BATCH_SIZE'], sampler=sampler)
test_dataloader = DataLoader(test_dataset, batch_size=hyperparameter['BATCH_SIZE'], shuffle=False)
valid_dataloader = DataLoader(valid_dataset, batch_size=hyperparameter['BATCH_SIZE'], shuffle=False)

In [11]:
class RNATransferLearning(nn.Module):
    def __init__(self, input_dim,embedding_dim, hidden_dim,num_layers, output_dim , dropout=0.5):
        super(RNATransferLearning, self).__init__()

        #Pytroch Embedding 
        self.embedding = nn.Embedding(input_dim,embedding_dim)

        #Bi-Directional LSTM Model
        self.lstm = nn.LSTM(embedding_dim, hidden_dim,num_layers, dropout= dropout, batch_first=True )

        #Fully Connected Layer   
        self.fc = nn.Linear(hidden_dim, output_dim) 

        # Drop out layer for Overfitting 
        self.dropout = nn.Dropout(dropout)


    def forward(self, x):

        x  = x.long()

        x_embeded =  self.embedding(x)
        
        lstm_out, (h,c) = self.lstm(x_embeded)

        # Pass it to drop out layer 
        output = self.dropout(h[-1])
        
        # Finally pass it to fully connected layer.
        out = self.fc(output)
        
        return out # Sequeez to Single dimension for loss calculation
    

def validate_model(model, test_dataloader , device ,loss_function):
    model.eval()
    running_loss = 0.0
    class_correct = [0] * hyperparameter['OUTPUT_DIMENSION'] 
    class_total = [0] * hyperparameter['OUTPUT_DIMENSION'] 
    total = 0
    correct = 0
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for inputs, labels in test_dataloader:
            inputs = inputs.to(device)
            labels = labels.long().to(device)

            outputs = model(inputs)
            loss = loss_function(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            true_labels.extend(labels.cpu().numpy())  # Capture True Lables for Summary Report
            predicted_labels.extend(predicted.cpu().numpy()) # Capture Predicted Labels Lables for Summary Report

    validation_loss = running_loss / len(test_dataloader)
    validation_accuracy = correct / total
    
    return validation_loss , validation_accuracy , true_labels , predicted_labels


def train_model(model, train_dataloader, test_dataloader, device, epochs, optimizer, loss_function):
    best_val_loss = float('inf')
    no_improvement_count = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        start_time = time.time()
        
        for i, (inputs, labels) in enumerate(train_dataloader):
            inputs = inputs.to(device)
            labels = labels.long().to(device)

            optimizer.zero_grad()
            outputs = model(inputs) 
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_dataloader)
        val_loss,  validation_accuracy , true_labels , predicted_labels = validate_model(model, test_dataloader, device, loss_function) 
        end_time = time.time()
        elapsed_time = end_time - start_time    
        
        print(f"Epoch {epoch + 1}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}, Test Accuracy: {validation_accuracy:.4f} , Time Taken : {elapsed_time}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            no_improvement_count = 0
        else:
            no_improvement_count += 1
            if no_improvement_count == 3:
                print("No improvement in validation loss for 3 epochs. Training stopped.")
                break



In [12]:
model = RNATransferLearning(input_dim=hyperparameter['INPUT_DIMENSION'],
                            embedding_dim=hyperparameter['EMBEDDING_DIMENSION'], 
                            hidden_dim=hyperparameter['HIDDEN_DIMENSION'] , 
                            num_layers = hyperparameter['NO_OF_LAYERS'],
                            output_dim=hyperparameter['OUTPUT_DIMENSION'],
                            dropout=hyperparameter['DROP_OUT'] )


loss_function = nn.CrossEntropyLoss()  ## MSELoss of Regression problem  # BCELoss for binary classification
optimizer = optim.Adam(model.parameters() ,  lr=0.01)

# Number of Parameters for Model
total_parameters = []
for p in model.parameters():
    total_parameters.append(p.numel())

print(f"Total Number of Parameters for Model Training : { sum(total_parameters)} " )

# Train the model
num_epochs = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

print("Model Parameters  : " , hyperparameter)

# Train Model with configured Parameter
train_model(model, train_dataloader ,test_dataloader, device ,num_epochs,optimizer,loss_function)



Total Number of Parameters for Model Training : 188 
Model Parameters  :  {'INPUT_DIMENSION': 4, 'HIDDEN_DIMENSION': 3, 'NO_OF_LAYERS': 1, 'BATCH_SIZE': 32, 'OUTPUT_DIMENSION': 4, 'EMBEDDING_DIMENSION': 7, 'DROP_OUT': 0.1}
Epoch 1, Train Loss: 1.3604, Val Loss: 1.2868, Test Accuracy: 0.6796 , Time Taken : 112.36537885665894
Epoch 2, Train Loss: 1.3565, Val Loss: 1.3123, Test Accuracy: 0.5329 , Time Taken : 98.34932374954224
Epoch 3, Train Loss: 1.3542, Val Loss: 1.2683, Test Accuracy: 0.5302 , Time Taken : 83.228515625
Epoch 4, Train Loss: 1.3537, Val Loss: 1.2916, Test Accuracy: 0.5185 , Time Taken : 70.33051705360413
Epoch 5, Train Loss: 1.3538, Val Loss: 1.2991, Test Accuracy: 0.5158 , Time Taken : 65.71989297866821
Epoch 6, Train Loss: 1.3544, Val Loss: 1.2947, Test Accuracy: 0.5158 , Time Taken : 65.53555703163147
No improvement in validation loss for 3 epochs. Training stopped.


In [12]:
model_name = 'rna_pre_trained_model.pt'
torch.save(model.state_dict(), model_name)

In [13]:
# Evaluate the model on the test dataset
_, final_accuracy, true_labels, predicted_labels = validate_model(model, valid_dataloader,device,loss_function)

# Print the final accuracy
print(f"Final Accuracy: {final_accuracy:.4f}")

# Print the classification summary
print("\n Classification Summary:")
print(classification_report(true_labels, predicted_labels))

In [14]:
import optuna

hyperparameter = {}

hyperparameter['INPUT_DIMENSION'] = 4 
hyperparameter['OUTPUT_DIMENSION'] = 4

def objective(trial):

    embedding_dim = trial.suggest_int("embedding_dim", 16, 300)
    hidden_dim = trial.suggest_int("hidden_dim", 32, 1024)
    num_layers = trial.suggest_int("num_layers", 1, 4)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    learning_rate = trial.suggest_loguniform("learning_rate", 1e-4, 1e-2)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 256 , 512])

    model = RNATransferLearning(
        input_dim=hyperparameter['INPUT_DIMENSION'],
        embedding_dim=embedding_dim,
        hidden_dim=hidden_dim,
        num_layers=num_layers,
        output_dim=hyperparameter['OUTPUT_DIMENSION'],
        dropout=dropout
    )

    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    total_parameters = sum(p.numel() for p in model.parameters())
    print(f"Total Number of Parameters for Model Training: {total_parameters}")

    num_epochs = 100
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    print(f"hyperparameter['INPUT_DIMENSION'] : {hyperparameter['INPUT_DIMENSION']},hyperparameter['OUTPUT_DIMENSION'] : {hyperparameter['OUTPUT_DIMENSION']},embedding_dim : {embedding_dim},hidden_dim : {hidden_dim},num_layers :{num_layers},dropout : {dropout},learning_rate : {learning_rate},batch_size : {batch_size}")

    train_model(model, train_dataloader ,test_dataloader, device ,num_epochs,optimizer,loss_function)
    val_loss, _, _, _ = validate_model(model, test_dataloader, device, loss_function)

    return val_loss


study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

print("Best trial:")
trial = study.best_trial
print(f"  Value: {trial.value:.4f}")
print("  Params:")
for key, value in trial.params.items():
    print(f"    {key}: {value}")


[32m[I 2023-06-03 16:35:30,646][0m A new study created in memory with name: no-name-196f1d01-5050-49c9-bbf0-9d4a3efe7603[0m


Total Number of Parameters for Model Training: 3499936
hyperparameter['INPUT_DIMENSION'] : 4,hyperparameter['OUTPUT_DIMENSION'] : 4,embedding_dim : 155,hidden_dim : 514,num_layers :2,dropout : 0.27461528083423375,learning_rate : 0.00012557635551129994,batch_size : 64


  learning_rate = trial.suggest_loguniform("learning_rate", 1e-4, 1e-2)


In [None]:
window_size = 1000
target_size = 1
model_name = "model_feature_windoe_"+str(window_size)+"_target_"+str(target_size)
torch.save(model.state_dict, model_name)