In [1]:
import argparse
import numpy as np
import pandas as pd
import torch
from tabulate import tabulate
from torch.utils.data import DataLoader
import os

from model import BertCustomBinaryClassifier
from utils.ensemble_utils import make_predictions
from utils.evaluate_metrics import evaluate_metrics
from utils.data_preprocessing import load_dataset

In [2]:
import logging

# Silence the checkpoint-loading warnings emitted by Hugging Face transformers.
# The logger is named after the emitting module, so the name must be exactly
# "transformers.modeling_utils"; any other name configures an unrelated logger
# and leaves the warnings visible.
logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR)

In [3]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [4]:
# Notebook-level settings kept in an argparse namespace so that project helpers
# which take an `args` object can be called from here.
parser = argparse.ArgumentParser()
for _flag, _default in (("--batch_size", 64), ("--max_length", 200)):
    parser.add_argument(_flag, type=int, default=_default, help="")

# An explicit empty argv makes argparse ignore the Jupyter kernel's own
# command line and return the defaults only.
args = parser.parse_args(args=[])

# **BERT Models**

In [5]:
def get_bert_features(model, dataloader, kmer=3):
    """
    Extract mean-pooled last-layer BERT embeddings for every sample in a dataloader.

    Each batch is run through the encoder (``model.bert``) with gradients disabled.
    Position 0 and the last ``kmer`` positions of the last hidden state are dropped,
    and the remaining positions are averaged along the sequence axis.

    Note:
        ``attention_mask`` is only passed to the encoder. It is not applied to the
        average, so every position inside the kept slice contributes equally.
        Presumably position 0 is [CLS] and the trailing positions are [SEP]/padding;
        tokenization happens outside this function, so verify against the dataset.

    Args:
        model (BertCustomBinaryClassifier): Model exposing its encoder as ``model.bert``.
            As a side effect it is moved to the selected device and put in eval mode.
        dataloader (torch.utils.data.DataLoader): Iterable of batches, each a mapping
            with ``"input_ids"`` and ``"attention_mask"`` tensors.
        kmer (int, optional): Number of trailing positions to exclude. Defaults to 3.

    Returns:
        numpy.ndarray: Array of shape (num_samples, hidden_size) holding one pooled
        embedding per input, in dataloader order.

    Raises:
        ValueError: If the dataloader yields no batches.
    """
    # Use the GPU if available, otherwise the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    all_features = []

    with torch.no_grad():  # Inference only: no autograd bookkeeping
        for batch in dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)

            # Last-layer hidden states of the encoder
            bert_outputs = model.bert(input_ids, attention_mask=attention_mask)
            last_hidden_states = bert_outputs.last_hidden_state

            # Keep positions [1, sequence_length - kmer)
            sequence_length = last_hidden_states.size(1)
            start_index = 1
            end_index = sequence_length - kmer

            if end_index > start_index:
                token_embeddings = last_hidden_states[:, start_index:end_index]
                averaged_embeddings = token_embeddings.mean(dim=-2)
            else:
                # Nothing is left to average. Emit zeros whose width follows the
                # encoder's actual hidden size (not a hard-coded 768) so the result
                # stays stackable with the other batches.
                averaged_embeddings = last_hidden_states.new_zeros(
                    (last_hidden_states.size(0), last_hidden_states.size(-1))
                )

            all_features.append(averaged_embeddings.cpu().numpy())

    if not all_features:
        raise ValueError("dataloader yielded no batches; cannot extract features")

    # Stack the per-batch arrays into one (num_samples, hidden_size) array
    return np.concatenate(all_features, axis=0)


In [6]:
# --- Run configuration -------------------------------------------------------
threshold = 0.50  # only reported below; this cell does not pass it to any helper
kmer_values = [3, 4, 5, 6]
model_date = "2025-02-27_V2"

# --- Accumulators, appended to in k-mer order ---------------------------------
results = []  # one metrics row per (k-mer, dataset split)
train_predictions_list, train_labels_list, train_logits_list = [], [], []
test_predictions_list, test_labels_list, test_logits_list = [], [], []
# The *_logits_list accumulators hold the pooled arrays returned by
# get_bert_features.

print(f"Threshold: {threshold}")
print(f"Identifier model date: {model_date}")

_metric_columns = ("Accuracy", "Sensitivity", "Specificity", "MCC", "AUC")

for kmer in kmer_values:

    # Point the shared args namespace at this k-mer's checkpoint and data files
    args.model_path = f"./outputs/identifier_models/{model_date}/{kmer}-mer"
    args.test_data_path = f"./data/enhancer_identification/{kmer}-mer_identification_test.txt"
    args.train_data_path = f"./data/enhancer_identification/{kmer}-mer_identification_train.txt"

    # Datasets: validation=False is used for the training split, validation=True for the test split
    train_dataset = load_dataset(args, validation=False)
    test_dataset = load_dataset(args, validation=True)

    # Unshuffled loaders, so that rows come out in the same order on every pass
    _loader_kwargs = {"batch_size": args.batch_size, "shuffle": False}
    train_dataloader = DataLoader(train_dataset, **_loader_kwargs)
    test_dataloader = DataLoader(test_dataset, **_loader_kwargs)

    # Load the fine-tuned checkpoint for this k-mer and place it on the device
    model = BertCustomBinaryClassifier.from_pretrained(args.model_path, num_labels=1)
    model = model.to(device)

    # ---- Training split: predictions, pooled features, metrics ----
    train_predictions, train_labels = make_predictions(model, train_dataloader, kmer=kmer)
    train_predictions_list.append(train_predictions)
    train_labels_list.append(train_labels)

    train_logits = get_bert_features(model, train_dataloader, kmer=kmer)
    train_logits_list.append(train_logits)

    acc, sn, sp, mcc, auc = evaluate_metrics(train_predictions, train_labels)
    results.append(
        {"k-mer": kmer, "Dataset": "Train", **dict(zip(_metric_columns, (acc, sn, sp, mcc, auc)))}
    )

    # ---- Test (independent) split: predictions, pooled features, metrics ----
    test_predictions, test_labels = make_predictions(model, test_dataloader, kmer=kmer)
    test_predictions_list.append(test_predictions)
    test_labels_list.append(test_labels)

    test_logits = get_bert_features(model, test_dataloader, kmer=kmer)
    test_logits_list.append(test_logits)

    acc, sn, sp, mcc, auc = evaluate_metrics(test_predictions, test_labels)
    results.append(
        {"k-mer": kmer, "Dataset": "Test", **dict(zip(_metric_columns, (acc, sn, sp, mcc, auc)))}
    )

Threshold: 0.5
Identifier model date: 2025-02-27_V2


In [7]:
# Sanity check: every accumulator should hold one entry per k-mer model.
# Predictions and labels are reported by length, the pooled feature arrays by shape.
_length_reports = (
    ("Shape of train_predictions_list:", train_predictions_list),
    ("Shape of test_predictions_list:", test_predictions_list),
    ("Shape of train_labels_list:", train_labels_list),
    ("Shape of test_labels_list:", test_labels_list),
)
for _label, _entries in _length_reports:
    print(_label, [len(_entry) for _entry in _entries])

_shape_reports = (
    ("Shape of train_logits_list:", train_logits_list),
    ("Shape of test_logits_list:", test_logits_list),
)
for _label, _entries in _shape_reports:
    print(_label, [_entry.shape for _entry in _entries])

Shape of train_predictions_list: [2968, 2968, 2968, 2968]
Shape of test_predictions_list: [400, 400, 400, 400]
Shape of train_labels_list: [2968, 2968, 2968, 2968]
Shape of test_labels_list: [400, 400, 400, 400]
Shape of train_logits_list: [(2968, 768), (2968, 768), (2968, 768), (2968, 768)]
Shape of test_logits_list: [(400, 768), (400, 768), (400, 768), (400, 768)]


In [9]:
# Destination folder for the serialized arrays (created if it does not exist)
output_root_dir = "outputs"
npy_subfolder = "npy"
output_npy_dir = os.path.join(output_root_dir, npy_subfolder)
os.makedirs(output_npy_dir, exist_ok=True)

# File name -> per-k-mer list. Each list is converted to a single array with
# np.array before saving, so its first axis indexes the k-mer models.
_arrays_to_save = {
    "train_predictions.npy": train_predictions_list,
    "test_predictions.npy": test_predictions_list,
    "train_labels.npy": train_labels_list,
    "test_labels.npy": test_labels_list,
    "train_logits.npy": train_logits_list,
    "test_logits.npy": test_logits_list,
}
for _filename, _values in _arrays_to_save.items():
    np.save(os.path.join(output_npy_dir, _filename), np.array(_values))

print(f"All lists have been saved to .npy files in the '{output_npy_dir}' directory.")

All lists have been saved to .npy files in the 'outputs/npy' directory.
