In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from gensim.models import Word2Vec
from itertools import chain

In [None]:
# Load the lncRNA dataset and discard every row that contains a missing value.
data = pd.read_csv("LncRNA_Dataset.csv").dropna()

In [None]:
# Cap every sequence at 8196 characters.
#
# This is done with a vectorised string slice rather than a `data['Sequence'][i]` loop:
# `data['Sequence'][i]` looks rows up by index *label*, and whenever `dropna()` removed
# rows the labels are no longer 0..len(data)-1. Looping over `range(len(data))` therefore
# never reaches surviving rows whose label is >= len(data) (they stayed untruncated), and
# the chained assignment could write into a temporary object instead of `data`.
data = data.assign(Sequence=data['Sequence'].str[:8196])

In [None]:
# Flag sequences shorter than 256 characters and keep only the remaining rows.
mask = data['Sequence'].apply(len) < 256
data = data.loc[~mask]

In [None]:
emb_size = 128  # length of each k-mer embedding vector (passed to Word2Vec as vector_size)
k = 3           # k-mer length

# Hold out 10% of the rows; the k-mer corpus below is built from the training rows only.
train_data, test_data = train_test_split(data, test_size = 0.1, random_state = 42)
# Work on an explicit copy so that adding the 'K-mers' column below does not assign
# onto a slice of `data` (which triggers pandas' SettingWithCopyWarning).
train_data = train_data.copy()

def generate_kmers(sequence, k):
    """Return every overlapping length-``k`` substring of ``sequence``, in order.

    The result is empty when ``sequence`` is shorter than ``k``.
    """
    return [sequence[i:i+k] for i in range(len(sequence) - k + 1)]

# One list of k-mers per training sequence.
train_data['K-mers'] = train_data['Sequence'].apply(generate_kmers, args=(k,))
# Flat list of every training k-mer, in row order.
train_kmers = list(chain.from_iterable(train_data['K-mers']))

In [None]:
# gensim's optimised training routines only consume the first 10000 tokens of any single
# sentence. Passing the whole corpus as ONE sentence (`[train_kmers]`) therefore trains on
# a tiny prefix of the data and also creates context windows that span unrelated
# sequences. Feed one sentence per sequence instead, splitting any k-mer list that would
# exceed the limit so nothing is dropped.
MAX_SENTENCE_TOKENS = 10000

kmer_sentences = [
    kmers[start:start + MAX_SENTENCE_TOKENS]
    for kmers in train_data['K-mers']
    for start in range(0, len(kmers), MAX_SENTENCE_TOKENS)
]

# Skip-gram (sg = 1) model; min_count = 0 keeps every k-mer seen in training.
model = Word2Vec(sentences = kmer_sentences, vector_size = emb_size, window = 5, min_count = 0, workers = 8, sg = 1, epochs = 30)

# k-mer -> embedding vector lookup. Iterating over the vocabulary gives the same keys as
# scanning every token of the corpus, without touching each token again.
kmer_embeddings = {kmer: model.wv[kmer] for kmer in model.wv.index_to_key}

In [None]:
def generate_kmers(sequence, k):
    """Return every overlapping length-``k`` substring of ``sequence``, in order.

    The result is empty when ``sequence`` is shorter than ``k``.
    """
    return [sequence[i:i+k] for i in range(len(sequence) - k + 1)]

def fetch_embeddings(sequence, kmer_embeddings, k, n):
    """Summarise ``sequence`` as ``n`` averaged k-mer embedding vectors.

    The sequence is cut into ``n`` consecutive chunks of ``len(sequence) // n``
    characters (the trailing ``len(sequence) % n`` characters are not covered, and
    k-mers are generated inside each chunk only). Each chunk is represented by the
    mean of the embeddings of its k-mers.

    Args:
        sequence: The string to embed.
        kmer_embeddings: Mapping from k-mer string to its embedding vector.
        k: k-mer length.
        n: Number of chunks, i.e. number of vectors returned.

    Returns:
        A list of ``n`` 1-D numpy arrays, all of the embedding size.

    Raises:
        ValueError: If ``kmer_embeddings`` is empty, because the embedding size
            cannot be determined.
    """
    if not kmer_embeddings:
        raise ValueError("kmer_embeddings is empty; cannot determine the embedding size")

    # Any stored vector serves as a template for the shape/dtype of a zero vector.
    reference = np.asarray(next(iter(kmer_embeddings.values())))
    chunk_length = len(sequence) // n

    sequence_embeddings = []
    for i in range(n):
        chunk = sequence[i * chunk_length:(i + 1) * chunk_length]
        # k-mers without an embedding (e.g. ones never seen when the embeddings were
        # trained) are skipped instead of raising KeyError.
        known = [
            kmer_embeddings[kmer]
            for kmer in generate_kmers(chunk, k)
            if kmer in kmer_embeddings
        ]
        if known:
            sequence_embeddings.append(np.mean(known, axis=0))
        else:
            # No usable k-mer in this chunk: emit zeros rather than the NaN scalar that
            # np.mean([]) would produce, so every entry keeps the same shape.
            sequence_embeddings.append(np.zeros_like(reference))

    return sequence_embeddings

# Number of chunks (and therefore embedding vectors) per sequence.
n = 64

In [None]:
# Pick the compute device, preferring Apple MPS, then CUDA, then falling back to the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(device)

labels = ['Nucleus', 'Cytoplasm', 'Chromatin', 'Insoluble cytoplasm']

# One binary indicator column per localisation: 1 when the (case-sensitive) label name
# occurs as a substring of 'SubCellular_Localization', otherwise 0.
for label in labels:
    data[label] = data['SubCellular_Localization'].apply(
        lambda x, name=label: 1 if name in x else 0
    )

N = len(data)

# Report the class balance of every label.
for label in labels:
    positives = int((data[label] == 1).sum())
    print(
      '{}:\tPositive Samples: {}\t\tNegative Samples: {}'
      .format(label, positives, N - positives)
    )

# Balanced per-label weights: N / (2 * count) for the positive and the negative class.
positive_weights = {
    name: N / (2 * int((data[name] == 1).sum())) for name in labels
}
negative_weights = {
    name: N / (2 * int((data[name] == 0).sum())) for name in labels
}

class_weights = {
    'positive_weights': positive_weights,
    'negative_weights': negative_weights,
}

In [None]:
# Flat list holding the n chunk vectors of every sequence, in row order.
all_embeddings = [
    chunk_vector
    for sequence in data['Sequence']
    for chunk_vector in fetch_embeddings(sequence, kmer_embeddings, k, n)
]

# Regroup the flat list into (number of sequences, n, emb_size).
all_embeddings_np = np.array(all_embeddings).reshape(len(data), n, emb_size)
X = torch.tensor(all_embeddings_np, dtype=torch.float32, device=device)

# Multi-hot targets, one column per localisation label.
label_columns = ['Nucleus', 'Cytoplasm', 'Chromatin', 'Insoluble cytoplasm']
y = torch.tensor(data[label_columns].values, dtype=torch.float32, device=device)

In [None]:
class MLP(nn.Module):
    """Fully connected multi-label classifier over a flattened input.

    Three hidden blocks (Linear -> ReLU -> BatchNorm1d -> Dropout) of widths
    2048, 512 and 64 are followed by a linear output layer and a sigmoid, so
    ``forward`` returns one independent probability per class.

    Args:
        input_dim: Number of features per sample after flattening. The default
            8192 is the original hard-coded value.
        num_classes: Number of output probabilities (default 4).
        dropout: Dropout probability used after every hidden block (default 0.3).
    """

    def __init__(self, input_dim=8192, num_classes=4, dropout=0.3):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, 2048)
        self.bn1 = nn.BatchNorm1d(2048)
        self.dropout1 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(2048, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.dropout2 = nn.Dropout(dropout)
        self.fc3 = nn.Linear(512, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.dropout3 = nn.Dropout(dropout)
        self.fc4 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return class probabilities of shape (batch, num_classes).

        ``x`` may have any shape whose trailing dimensions multiply to
        ``input_dim``; everything after the batch dimension is flattened.
        """
        x = x.view(x.size(0), -1)
        x = self.bn1(torch.relu(self.fc1(x)))
        x = self.dropout1(x)
        x = self.bn2(torch.relu(self.fc2(x)))
        x = self.dropout2(x)
        x = self.bn3(torch.relu(self.fc3(x)))
        x = self.dropout3(x)
        x = torch.sigmoid(self.fc4(x))
        return x

In [None]:
class TextCNN(nn.Module):
    """Multi-kernel 1-D CNN multi-label classifier.

    Three parallel convolutions (kernel sizes 1, 3 and 5, each 128 -> 64 channels,
    padded so the length is preserved) are applied to the input. Each branch is
    layer-normalised, passed through ReLU and max-pooled over the sequence
    positions, giving 64 features per branch. The concatenated 192 features go
    through Linear -> ReLU -> Dropout -> Linear -> sigmoid.

    The input must be (batch, 64, 128): ``LayerNorm([64, 64])`` fixes the number
    of positions to 64 and the convolutions expect 128 input channels.
    """

    def __init__(self):
        super(TextCNN, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=64, kernel_size=5, padding=2)
        self.layer_norm1 = nn.LayerNorm([64, 64])
        self.layer_norm2 = nn.LayerNorm([64, 64])
        self.layer_norm3 = nn.LayerNorm([64, 64])
        self.relu = nn.ReLU()
        self.adaptive_max_pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(64 * 3, 64)
        self.fc_out = nn.Linear(64, 4)
        self.dropout = nn.Dropout(0.5)

    def _conv_branch(self, conv, layer_norm, x):
        """Run one conv branch on x (batch, channels, length) and return (batch, 64)."""
        h = conv(x)                        # (batch, 64 channels, 64 positions)
        h = layer_norm(h.permute(0, 2, 1)) # normalised as (batch, positions, channels)
        h = self.relu(h)
        # AdaptiveMaxPool1d reduces the LAST dimension, so move the positions back
        # there. Without this the pool took the max over channels for every position;
        # that went unnoticed only because both sizes happen to be 64.
        h = h.permute(0, 2, 1)             # (batch, channels, positions)
        return self.adaptive_max_pool(h).squeeze(dim=2)

    def forward(self, x):
        """Return class probabilities of shape (batch, 4) for x of shape (batch, 64, 128)."""
        # x: batchSize × seqLen × feaSize  ->  batchSize × feaSize × seqLen for Conv1d
        x = x.permute(0, 2, 1)

        x1 = self._conv_branch(self.conv1, self.layer_norm1, x)
        x2 = self._conv_branch(self.conv2, self.layer_norm2, x)
        x3 = self._conv_branch(self.conv3, self.layer_norm3, x)

        x = torch.cat((x1, x2, x3), dim=1)
        x = self.fc(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc_out(x)
        x = torch.sigmoid(x)
        return x

In [None]:
class BiLSTM_MLP(nn.Module):
    """Bidirectional LSTM encoder followed by a small MLP multi-label head.

    Args:
        input_size: Number of features per time step of the input.
        hidden_size: Hidden size of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        mlp_hidden_size: Width of the hidden layer in the MLP head.
        dropout: Dropout probability inside the MLP head (default 0.2).
    """

    def __init__(self, input_size, hidden_size, num_layers, mlp_hidden_size, dropout=0.2):
        super(BiLSTM_MLP, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # BiLSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)

        # MLP layer
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, mlp_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_hidden_size, 4),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return class probabilities of shape (batch, 4) for x of shape (batch, seq, input_size)."""
        # h_n holds the FINAL hidden state of every layer/direction, ordered
        # (layer0 fwd, layer0 bwd, ..., last fwd, last bwd). Taking out[:, -1, :]
        # instead would pair the forward state after the whole sequence with the
        # backward state after only its first step (the last token).
        _, (h_n, _) = self.lstm(x)
        summary = torch.cat((h_n[-2], h_n[-1]), dim=1)
        out = self.fc(summary)

        return out

# Hyper-parameters for BiLSTM_MLP.
input_size = 128   # embedding dimension: the k-mer embeddings are 128-dimensional, not 64
hidden_size = 64   # hidden size of each LSTM direction
num_layers = 1     # number of stacked LSTM layers
mlp_hidden_size = 64  # width of the MLP head's hidden layer

In [None]:
def focal_binary_cross_entropy(outputs, targets, gamma = 2, num_classes = 4):
    """Focal binary cross-entropy computed on probabilities.

    Args:
        outputs: Predicted probabilities (not logits); flattened internally.
        targets: Ground-truth values with the same number of elements; entries
            >= 0.5 are treated as positive.
        gamma: Focusing exponent applied to ``1 - p_true``.
        num_classes: Factor the mean element-wise loss is multiplied by.

    Returns:
        A scalar tensor: ``num_classes`` times the mean focal loss.
    """
    probs = outputs.reshape(-1)
    truth = targets.reshape(-1)

    # Probability the model assigns to the correct outcome of each element.
    is_positive = truth >= 0.5
    p_true = torch.where(is_positive, probs, 1 - probs)

    # Only the log argument is clamped (numerical safety); the focal factor uses
    # the unclamped probability.
    neg_log_likelihood = -torch.log(p_true.clamp(min=1e-4, max=1 - 1e-4))
    focal_terms = neg_log_likelihood * (1 - p_true).pow(gamma)

    return num_classes * focal_terms.mean()

In [None]:
class MyDataset(Dataset):
    """Dataset pairing each sample ``X[idx]`` with its target ``y[idx]``.

    Args:
        X: Indexable collection of samples.
        y: Indexable collection of targets, one per sample.

    Raises:
        ValueError: If ``X`` and ``y`` do not contain the same number of items.
    """

    def __init__(self, X, y):
        # Fail early: a mismatch would otherwise surface as an IndexError in the
        # middle of an epoch, or silently leave some targets unused.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]


# Hold out 10% of the samples for evaluation; the fixed random_state makes the split
# reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=42,
)

In [None]:
train_dataset = MyDataset(X_train, y_train)
test_dataset = MyDataset(X_test, y_test)

batch_size = 16

# BatchNorm1d cannot compute batch statistics from a single sample and raises in
# training mode. Drop the last training batch only when it would contain exactly one
# sample; otherwise every sample is used, as before.
drop_singleton_batch = len(train_dataset) % batch_size == 1

train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle=True, drop_last=drop_singleton_batch)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle=False)

# NOTE: this rebinds `model`, which an earlier cell used for the Word2Vec model.
model = MLP().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.00003)

num_epochs = 40

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    # The batch targets are called `targets` so the module-level `labels` list of
    # label names is not overwritten by a tensor.
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = focal_binary_cross_entropy(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean loss per batch; max(..., 1) avoids dividing by zero if there are no batches.
    mean_loss = running_loss / max(len(train_loader), 1)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {mean_loss:.4f}")

In [None]:
# Precision@1: fraction of test samples whose highest-scoring class is one of the
# sample's true labels.
model.eval()
correct_at_rank_1 = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)

        # Column index of the top-scoring class per sample, shape (batch, 1).
        top_pred_indices = outputs.topk(1, dim=1).indices
        # Look up the true label at that index for the whole batch at once.
        hits = labels.gather(1, top_pred_indices) == 1
        correct_at_rank_1 += int(hits.sum().item())

precision_at_1 = correct_at_rank_1 / len(test_loader.dataset)
print(f"Precision@1: {precision_at_1:.5f}")

In [None]:
# Count the test samples whose top-ranked prediction is one of their true labels.
model.eval()
correct_at_rank_1 = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)

        # Index of the highest-scoring class for every sequence in the batch.
        _, top_pred_indices = outputs.topk(1, dim=1)
        top_indices = top_pred_indices.squeeze(1).tolist()

        # Walk through the batch, pairing each sequence's true labels with its top index.
        for true_labels, top_pred_index in zip(labels, top_indices):
            # A hit when the top-ranked class is marked 1 in the true labels.
            if true_labels[top_pred_index] == 1:
                correct_at_rank_1 += 1

# Precision@1 = hits / number of test samples.
precision_at_1 = correct_at_rank_1 / len(test_loader.dataset)

print(f"Precision@1: {precision_at_1:.5f}")