# Installs

In [None]:
!pip install transformers --q
!pip install torch --q
!pip install einops --q
!pip install biopython --q

# Imports

In [None]:
from Bio import SeqIO
import torch
from transformers import AutoTokenizer, AutoModel
import random
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import random_split
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import defaultdict
import pickle
from transformers import TFBertForSequenceClassification
from transformers import OpenAIGPTConfig, OpenAIGPTModel
import torch
import pandas as pd
from tokenizers import Tokenizer
from tokenizers.models import BPE, Unigram, WordLevel, WordPiece
from tokenizers.trainers import BpeTrainer, WordLevelTrainer,WordPieceTrainer, UnigramTrainer
from tokenizers.pre_tokenizers import Sequence, Digits, Whitespace
from transformers import PreTrainedTokenizerFast
import random
from tokenizers import Tokenizer, models, trainers
from transformers import BertConfig, BertForSequenceClassification, DataCollatorForLanguageModeling, Trainer, TrainingArguments
from torch.nn.utils.rnn import pad_sequence
from transformers import AutoTokenizer, BertForMaskedLM
import pandas as pd
from transformers import BertTokenizer, BertForMaskedLM
from torch.utils.data import Dataset, DataLoader
from transformers import BertConfig, BertForMaskedLM
from tqdm import tqdm
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertModel , RobertaTokenizer, TFRobertaModel , AutoTokenizer, AutoModel
from torch import nn
from torch.optim import Adam
from tqdm import tqdm
import matplotlib.pyplot as plt
import random
from transformers import RobertaForMaskedLM, RobertaTokenizer, RobertaTokenizerFast, RobertaConfig
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers, processors
from transformers import RobertaForSequenceClassification
from sklearn.metrics import confusion_matrix
import json

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data for Tokenizer and Masking model

In [None]:
all_data_pos = pd.read_csv('data/all_data_combined_df.csv', index_col = False)
all_data_pos['True_label']=1

train_df, validation_df = train_test_split(all_data_pos, test_size=0.2, random_state=42)

In [None]:
#this will be used for training the tokenizers
df_sequences = all_data_pos['seq'].tolist()
df_labels = all_data_pos['True_label'].tolist()

#this will be used for the pre-train(masking)
df_train_sequences = train_df['seq'].tolist()
df_train_labels = train_df['True_label'].tolist()

df_val_sequences = validation_df['seq'].tolist()
df_val_labels = validation_df['True_label'].tolist()

# Tokenizer BPE

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class DNADataset(Dataset):
    def __init__(self, sequences, tokenizer, max_length=6):
        self.sequences = sequences
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        inputs = self.tokenizer(
            sequence,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        inputs = {key: val.squeeze(0) for key, val in inputs.items()}  # Remove batch dimension
        inputs['labels'] = inputs['input_ids'].clone()
        return inputs

In [None]:
from transformers import RobertaForMaskedLM, RobertaTokenizer, RobertaTokenizerFast, RobertaConfig
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers, processors

# Initialize a blank RoBERTa tokenizer
tokenizer = Tokenizer(models.BPE())

# Define pre-tokenizer
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel()

# Define decoder
tokenizer.decoder = decoders.ByteLevel()

# Define the trainer
trainer = trainers.BpeTrainer(
    # vocab_size=5000,  # Define the vocab size
    special_tokens=["<s>", "<pad>", "</s>", "<unk>", "<mask>"] #, max_token_length=7
)

# Train the tokenizer
tokenizer.train_from_iterator(df_sequences, trainer=trainer)

# Enable post-processing to add special tokens
tokenizer.post_processor = processors.ByteLevel(trim_offsets=False)

# Wrap the tokenizer in the RobertaTokenizerFast class
roberta_tokenizer = RobertaTokenizerFast(tokenizer_object=tokenizer)






In [None]:
roberta_tokenizer.save_pretrained("models/bpe/bpe_tokenizer_rep")

('models/bpe/bpe_tokenizer_rep/tokenizer_config.json',
 'models/bpe/bpe_tokenizer_rep/special_tokens_map.json',
 'models/bpe/bpe_tokenizer_rep/vocab.json',
 'models/bpe/bpe_tokenizer_rep/merges.txt',
 'models/bpe/bpe_tokenizer_rep/added_tokens.json')

# Masking Model - BPE

In [None]:
roberta_tokenizer = RobertaTokenizer.from_pretrained("models/bpe/bpe_tokenizer_rep")

In [None]:
import json
# Path to the vocabulary JSON file
vocab_file = "models/bpe/bpe_tokenizer_rep/vocab.json"

# Load the vocabulary JSON file
with open(vocab_file, 'r') as f:
    vocab = json.load(f)

# Calculate statistics
token_lengths = [len(token) for token in vocab.keys()]
min_len = min(token_lengths)
avg_len = sum(token_lengths) / len(token_lengths)
max_len = max(token_lengths)
vocab_size = len(vocab)

# Print statistics
print(f"Vocabulary size: {vocab_size}")
print(f"Minimum token length: {min_len}")
print(f"Average token length: {avg_len:.2f}")
print(f"Maximum token length: {max_len}")

Vocabulary size: 30000
Minimum token length: 1
Average token length: 8.32
Maximum token length: 128


Model configuration

In [None]:
# Define model configuration
config = RobertaConfig(
    vocab_size=roberta_tokenizer.vocab_size,
    max_position_embeddings=514,
    num_attention_heads=12,
    num_hidden_layers=6,
    type_vocab_size=1,
    pad_token_id=roberta_tokenizer.pad_token_id,
    bos_token_id=roberta_tokenizer.bos_token_id,
    eos_token_id=roberta_tokenizer.eos_token_id
)

# Initialize the model
model = RobertaForMaskedLM(config)

Dataloaders from positive data

In [None]:
# Create a dataset object
train_dataset = DNADataset(df_train_sequences, tokenizer = roberta_tokenizer)
validation_dataset = DNADataset(df_val_sequences, tokenizer = roberta_tokenizer)

# Create a dataloader
batch_size = 8
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)

Training loop

In [None]:
from transformers import AdamW
from sklearn.metrics import accuracy_score

# Define training parameters
num_epochs = 2
learning_rate = 1e-5
optimizer = AdamW(model.parameters(), lr=learning_rate)
train_history=[]
val_history=[]

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for i, batch in enumerate(train_dataloader):
        inputs = {key: val.to(model.device) for key, val in batch.items()}
        outputs = model(**inputs)
        loss = outputs.loss

        optimizer.zero_grad()

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if i>0 and i % 300 ==0:
            print(f"Epoch {epoch+1}/{num_epochs}, Iteration {i}, Training Loss: {total_loss/i:.4f}")

    avg_loss = total_loss / len(train_dataloader)
    train_history.append(avg_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_loss:.4f}")

    # Validation
    model.eval()
    with torch.no_grad():
        val_losses = []
        for val_batch in validation_dataloader:
            val_inputs = {key: val.to(model.device) for key, val in val_batch.items()}
            val_outputs = model(**val_inputs)
            val_loss = val_outputs.loss
            val_losses.append(val_loss.item())

        avg_val_loss = sum(val_losses) / len(val_losses)
        val_history.append(avg_val_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {avg_val_loss:.4f}")



Epoch 1/2, Iteration 300, Training Loss: 6.3335
Epoch 1/2, Iteration 600, Training Loss: 5.5724
Epoch 1/2, Iteration 900, Training Loss: 4.9673
Epoch 1/2, Iteration 1200, Training Loss: 4.4685
Epoch 1/2, Iteration 1500, Training Loss: 4.0526
Epoch 1/2, Iteration 1800, Training Loss: 3.7022
Epoch 1/2, Iteration 2100, Training Loss: 3.4030
Epoch 1/2, Iteration 2400, Training Loss: 3.1516
Epoch 1/2, Iteration 2700, Training Loss: 2.9358
Epoch 1/2, Iteration 3000, Training Loss: 2.7467
Epoch 1/2, Iteration 3300, Training Loss: 2.5806
Epoch 1/2, Iteration 3600, Training Loss: 2.4332
Epoch 1/2, Iteration 3900, Training Loss: 2.3023
Epoch 1/2, Iteration 4200, Training Loss: 2.1840
Epoch 1/2, Iteration 4500, Training Loss: 2.0781
Epoch 1/2, Iteration 4800, Training Loss: 1.9817
Epoch 1/2, Iteration 5100, Training Loss: 1.8939
Epoch 1/2, Iteration 5400, Training Loss: 1.8138
Epoch 1/2, Iteration 5700, Training Loss: 1.7405
Epoch 1/2, Iteration 6000, Training Loss: 1.6723
Epoch 1/2, Iteration 63

In [None]:
# Save the model
model.save_pretrained("models/bpe/bpe_masking_model_rep")

# Fine Tune BPE

In [None]:
results_dict = defaultdict(dict)

Data for finetuning - **Substitution**

In [None]:
train_tata = pd.read_csv("data_exp1/train_tata_met_2.csv", index_col = False)
val_tata = pd.read_csv("data_exp1/val_tata_met_2.csv", index_col = False)
test_tata = pd.read_csv('data_exp1/test_tata_met_2.csv', index_col = False)

train_non_tata = pd.read_csv('data_exp1/train_non_tata_met_2.csv', index_col = False)
val_non_tata = pd.read_csv('data_exp1/val_non_tata_met_2.csv', index_col = False)
test_non_tata = pd.read_csv('data_exp1/test_non_tata_met_2.csv', index_col = False)

In [None]:
train = pd.concat([train_tata, train_non_tata])
val = pd.concat([val_tata, val_non_tata])
test = pd.concat([test_tata, test_non_tata])
test.reset_index(drop=True, inplace=True)

Data for finetuning - **Genome**

In [None]:
# train = pd.read_csv("/content/drive/MyDrive/פרויקט גמר/DATA/train and test/exp2 - genome - new train, val, test combined/train_exp2.csv", index_col = False)
# val = pd.read_csv("/content/drive/MyDrive/פרויקט גמר/DATA/train and test/exp2 - genome - new train, val, test combined/val_exp2.csv", index_col = False)
# test_tata = pd.read_csv('/content/drive/MyDrive/פרויקט גמר/DATA/train and test/exp2 - genome - new train, val, test combined/test_tata_exp2.csv', index_col = False)
# test_non_tata = pd.read_csv('/content/drive/MyDrive/פרויקט גמר/DATA/train and test/exp2 - genome - new train, val, test combined/test_non_tata_exp2.csv', index_col = False)

Dataset class for Fine Tune

In [None]:
class DNAClassificationDataset(Dataset):
    def __init__(self, sequences, labels, tokenizer, max_length=128):
        """
        Initializes the DNAClassificationDataset instance.

        Args:
            sequences (list): List of DNA sequences to be tokenized.
            labels (list): List of labels corresponding to each sequence.
            tokenizer (PreTrainedTokenizer): Tokenizer from the Hugging Face library for tokenizing sequences.
            max_length (int, optional): Maximum length of the tokenized sequences after padding. Default is 128.
        """
        self.sequences = sequences
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """
        Returns the number of sequences in the dataset.

        Returns:
            int: Number of sequences in the dataset.
        """
        return len(self.sequences)

    def __getitem__(self, idx):
        """
        Retrieves a tokenized and padded sequence along with its label at the specified index.

        Args:
            idx (int): Index of the sequence to retrieve.

        Returns:
            dict: Dictionary containing the tokenized and padded sequence, labels, and input IDs.
        """
        sequence = self.sequences[idx]
        label = self.labels[idx]
        inputs = self.tokenizer(
            sequence,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        inputs = {key: val.squeeze(0) for key, val in inputs.items()}  # Remove batch dimension

        # Ensure the label is an integer
        inputs['labels'] = torch.tensor(int(label), dtype=torch.long)
        return inputs

In [None]:
seq = test['seq'][0]

BPE Classification Model class


*   Inintialize
*   Create dataloaders
*   Train
*   Test



In [None]:
import torch
from transformers import AdamW, RobertaForSequenceClassification
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import MultiStepLR
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, matthews_corrcoef
import os


class ClassificationModelBPE:
        """
        Initializes the DNAClassificationDataset instance.

        Args:
            sequences (list): List of DNA sequences to be tokenized.
            labels (list): List of labels corresponding to each sequence.
            tokenizer (PreTrainedTokenizer): Tokenizer from the Hugging Face library for tokenizing sequences.
            max_length (int, optional): Maximum length of the tokenized sequences after padding. Default is 128.
        """
    def __init__(self, train_df, val_df, test_tata_df, test_non_tata_df, organism='human', b_size=8):
        """
        Initializes the ClassificationModelBPE class.

        Parameters:
        train_df (DataFrame): Training data.
        val_df (DataFrame): Validation data.
        test_tata_df (DataFrame): Test data with TATA sequences.
        test_non_tata_df (DataFrame): Test data with non-TATA sequences.
        organism (str): Organism type (default is 'human').
        b_size (int): Batch size for data loaders (default is 8).
        """
        self.organism = organism
        self.batch_size = b_size
        self.dataframes = {
            'train': train_df,
            'val': val_df,
            'test_tata': test_tata_df,
            'test_non_tata': test_non_tata_df
        }

        self.tokenizer = RobertaTokenizerFast.from_pretrained("models/bpe/bpe_tokenizer_rep")
        self.masking_model = RobertaForMaskedLM.from_pretrained("models/bpe/bpe_masking_model_rep")
        self.create_model()

        # Create dataloaders and set as attributes
        dataloaders = self.create_dataloaders(batch_size=self.batch_size)
        self.train_dataloader = dataloaders['train']
        self.val_dataloader = dataloaders['val']
        self.test_tata_dataloader = dataloaders['test_tata']
        self.test_non_tata_dataloader = dataloaders['test_non_tata']

    def filter_and_convert(self, df, organism):
        """
        Filters the dataframe based on the organism and converts it to sequences and labels.

        Parameters:
        df (DataFrame): The dataframe to filter and convert.
        organism (str): The organism to filter by.

        Returns:
        list: A list of sequences.
        list: A list of labels.
        """
        filtered_df = df[df['organism'] == organism]
        seq_list = filtered_df['seq'].tolist()
        label_list = filtered_df['True_label'].tolist()
        return seq_list, label_list

    def create_dataloaders(self, batch_size=16):
        """
        Creates dataloaders for training, validation, and test sets.

        Parameters:
        batch_size (int): Batch size for data loaders (default is 16).

        Returns:
        dict: A dictionary containing the data loaders for train, val, test_tata, and test_non_tata.
        """
        train_seq, train_labels = self.filter_and_convert(self.dataframes['train'], self.organism)
        val_seq, val_labels = self.filter_and_convert(self.dataframes['val'], self.organism)
        test_tata_seq, test_tata_labels = self.filter_and_convert(self.dataframes['test_tata'], self.organism)
        test_non_tata_seq, test_non_tata_labels = self.filter_and_convert(self.dataframes['test_non_tata'], self.organism)

        # Create the dataset and dataloader
        train_dataset = DNAClassificationDataset(train_seq, train_labels, self.tokenizer)
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        val_dataset = DNAClassificationDataset(val_seq, val_labels, self.tokenizer)
        val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

        test_tata_dataset = DNAClassificationDataset(test_tata_seq, test_tata_labels, self.tokenizer)
        test_tata_dataloader = DataLoader(test_tata_dataset, batch_size=batch_size, shuffle=True)

        test_non_tata_dataset = DNAClassificationDataset(test_non_tata_seq, test_non_tata_labels, self.tokenizer)
        test_non_tata_dataloader = DataLoader(test_non_tata_dataset, batch_size=batch_size, shuffle=True)

        return {
            'train': train_dataloader,
            'val': val_dataloader,
            'test_tata': test_tata_dataloader,
            'test_non_tata': test_non_tata_dataloader
        }

    def create_test_dataloader(self, test_df, organism, batch_size=16):
        """
        Creates dataloaders for test sets
        """
        filtered_df = test_df[test_df['organism'] == organism]
        seq_list = filtered_df['seq'].tolist()
        label_list = filtered_df['True_label'].tolist()

        test_dataset = DNAClassificationDataset(seq_list, label_list, self.tokenizer)
        test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        return test_dataloader

    def create_model(self):
        """
        Creates and initializes the sequence classification model.
        """
        # Define configuration for sequence classification
        config = RobertaConfig.from_pretrained("models/bpe/bpe_masking_model_rep")
        config.num_labels = 2  # binary classification

        # Initialize the sequence classification model
        self.classification_model = RobertaForSequenceClassification(config)

        # Copy the pre-trained weights from the masked language model
        self.classification_model.roberta.load_state_dict(self.masking_model.roberta.state_dict())
        # return classification_model

    def train_model(self, num_epochs=60, base_lr=1e-5, patience=3):
        """
        Trains the classification model.

        Parameters:
        num_epochs (int): Number of epochs to train (default is 60).
        base_lr (float): Base learning rate (default is 1e-5).
        patience (int): Patience for early stopping (default is 3).

        Returns:
        dict: Training history containing loss and accuracy.
        dict: Validation history containing loss, accuracy, f1, auc, and mcc.
        """
        train_dataloader = self.train_dataloader
        val_dataloader = self.val_dataloader

        optimizer = torch.optim.AdamW(self.classification_model.parameters(), lr=base_lr)
        scheduler = MultiStepLR(optimizer, milestones=list(range(4,num_epochs+1,4)), gamma=0.75)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.classification_model.to(device)

       # Initialize history dictionaries
        train_history = {
            'loss': [],
            'accuracy': []
        }
        val_history = {
            'loss': [],
            'accuracy': [],
            'f1': [],
            'auc': [],
            'mcc': []
        }

        # Initialize early stopping criteria
        best_val_accuracy = 0.0
        epochs_without_improvement = 0

        for epoch in range(num_epochs):
            # Training phase
            self.classification_model.train()
            total_train_loss = 0
            correct_train_predictions = 0
            total_train_predictions = 0

            for batch in train_dataloader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                optimizer.zero_grad()
                outputs = self.classification_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                logits = outputs.logits

                loss.backward()
                optimizer.step()
                scheduler.step()

                # Calculate training accuracy
                total_train_loss += loss.item()
                preds = torch.argmax(logits, dim=1)
                correct_train_predictions += torch.sum(preds == labels).item()
                total_train_predictions += labels.size(0)

            avg_train_loss = total_train_loss / len(train_dataloader)
            train_accuracy = correct_train_predictions / total_train_predictions
            train_history['loss'].append(avg_train_loss)
            train_history['accuracy'].append(train_accuracy)

            # Validation phase
            self.classification_model.eval()
            total_val_loss = 0
            correct_val_predictions = 0
            total_val_predictions = 0

            all_labels = []
            all_preds = []
            all_probs = []

            with torch.no_grad():
                for batch in val_dataloader:
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['labels'].to(device)

                    outputs = self.classification_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                    loss = outputs.loss
                    logits = outputs.logits

                    # Calculate validation accuracy
                    total_val_loss += loss.item()
                    preds = torch.argmax(logits, dim=1)
                    probs = torch.softmax(logits, dim=1)[:, 1]

                    correct_val_predictions += torch.sum(preds == labels).item()
                    total_val_predictions += labels.size(0)

                    all_labels.extend(labels.cpu().numpy())
                    all_preds.extend(preds.cpu().numpy())
                    all_probs.extend(probs.cpu().numpy())

            avg_val_loss = total_val_loss / len(val_dataloader)
            val_accuracy = correct_val_predictions / total_val_predictions
            val_f1 = f1_score(all_labels, all_preds)
            val_auc = roc_auc_score(all_labels, all_probs)
            val_mcc = matthews_corrcoef(all_labels, all_preds)

            val_history['loss'].append(avg_val_loss)
            val_history['accuracy'].append(val_accuracy)
            val_history['f1'].append(val_f1)
            val_history['auc'].append(val_auc)
            val_history['mcc'].append(val_mcc)

            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

            # Check early stopping criteria
            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1
                if epochs_without_improvement >= patience:
                    print("Early stopping triggered")
                    break

        # Save the trained classification model
        self.classification_model.save_pretrained(f"models/bpe/bpe fine tune models/rep/bpe_cls_{self.organism}_model_b{self.batch_size}")
        return train_history, val_history

    def test_model(self, wrong_idx_dict, tata_flag):
        """
        Tests the classification model on the test set.

        Parameters:
        tata_flag (str): Flag to indicate which test set to use ('tata', 'non_tata', 'all_data').

        Returns:
        dict: Test metrics containing loss, accuracy, f1, auc, and mcc.
        """
        if tata_flag == 'tata':
            test_dataloader = self.test_tata_dataloader
        elif tata_flag == 'non_tata':
            test_dataloader = self.test_non_tata_dataloader
        elif tata_flag == 'combined':
            print(organism)
            test_dataloader = self.create_test_dataloader(test_df=test, organism=organism, batch_size=16)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.classification_model.to(device)
        self.classification_model.eval()

        total_test_loss = 0
        correct_test_predictions = 0
        total_test_predictions = 0

        all_labels = []
        all_preds = []
        all_probs = []
        all_indices = []
        incorrect_indices = []

        with torch.no_grad():
            for batch_idx, batch in enumerate(test_dataloader):
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = self.classification_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                logits = outputs.logits

                total_test_loss += loss.item()

                preds = torch.argmax(logits, dim=1)
                probs = torch.softmax(logits, dim=1)[:, 1]

                correct_test_predictions += torch.sum(preds == labels).item()
                total_test_predictions += labels.size(0)

                batch_indices = list(range(batch_idx * self.batch_size, batch_idx * self.batch_size + labels.size(0)))
                # print(batch_indices)
                all_indices.extend(batch_indices)
                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(preds.cpu().numpy())
                all_probs.extend(probs.cpu().numpy())

                incorrect_batch_indices = [i for i, (pred, label) in zip(batch_indices, zip(preds.cpu().numpy(), labels.cpu().numpy())) if pred != label]
                # print("B:", incorrect_batch_indices)
                incorrect_indices.extend(incorrect_batch_indices)

        avg_test_loss = total_test_loss / len(test_dataloader)
        test_accuracy = correct_test_predictions / total_test_predictions
        test_f1 = f1_score(all_labels, all_preds)
        test_auc = roc_auc_score(all_labels, all_probs)
        test_mcc = matthews_corrcoef(all_labels, all_preds)

        # Calculate confusion matrix
        tn, fp, fn, tp = confusion_matrix(all_labels, all_preds).ravel()
        tp_rate = tp / (tp + fn) if (tp + fn) > 0 else 0
        fp_rate = fp / (fp + tn) if (fp + tn) > 0 else 0
        precision = precision_score(all_labels, all_preds)
        recall = recall_score(all_labels, all_preds)
        print(tata_flag)
        print(f"Test Loss: {avg_test_loss:.4f}")
        print(f"Test Accuracy: {test_accuracy:.4f}")
        print(f"Test F1 Score: {test_f1:.4f}")
        print(f"Test AUC: {test_auc:.4f}")
        print(f"Test MCC: {test_mcc:.4f}")
        print(f"True Positive Rate: {tp_rate:.4f}")
        print(f"False Positive Rate: {fp_rate:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"True Positives (TP): {tp}")
        print(f"False Positives (FP): {fp}")
        print(f"True Negatives (TN): {tn}")
        print(f"False Negatives (FN): {fn}")
        wrong_idx_dict[f'{self.organism}_{tata_flag}'] = incorrect_indices
        wrong_idx_dict = {str(key): value for key, value in wrong_idx_dict.items()}

        print()

        test_metrics = {
            'loss': avg_test_loss,
            'accuracy': test_accuracy,
            'f1': test_f1,
            'auc': test_auc,
            'mcc': test_mcc,
            'tp_rate': tp_rate,
            'fp_rate': fp_rate,
            'precision': precision,
            'recall': recall,
            'tp': tp,
            'fp': fp,
            'tn': tn,
            'fn': fn,
        }
        return test_metrics, wrong_idx_dict


    def fine_tune_model(self, num_epochs=15, base_lr=1e-5):
        """
        Fine-tunes the classification model.

        Parameters:
        num_epochs (int): Number of epochs to fine-tune (default is 15).
        base_lr (float): Base learning rate (default is 1e-5).

        Returns:
        dict: Training history containing loss and accuracy.
        dict: Validation history containing loss, accuracy, f1, auc, and mcc.
        dict: Test metrics for TATA sequences containing loss, accuracy, f1, auc, and mcc.
        dict: Test metrics for non-TATA sequences containing loss, accuracy, f1, auc, and mcc.
        dict: Test metrics for combined sequences containing loss, accuracy, f1, auc, and mcc.
        """
        wrong_idx_dict = defaultdict(list)
        model_path = f"models/bpe/bpe fine tune models/rep/bpe_cls_{self.organism}_model_b{self.batch_size}"
        self.classification_model = RobertaForSequenceClassification.from_pretrained(model_path)
        train_history, val_history = None, None

        test_tata_history,wrong_idx_dict = self.test_model(wrong_idx_dict, tata_flag='tata')

        test_non_tata_history, wrong_idx_dict = self.test_model(wrong_idx_dict, tata_flag='non_tata')

        # Combine both test sets
        test_combined_history, wrong_idx_dict = self.test_model(wrong_idx_dict, tata_flag='combined')

        return train_history, val_history, test_tata_history, test_non_tata_history, test_combined_history

In [None]:
b_size=16
num_epochs=20
base_lr=1e-4
results_dict = defaultdict(dict)

### Fine tune each organism

**celegans**

In [None]:
organism = 'celegans'
gamma=0.85

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 1.0352
Test Accuracy: 0.7759
Test F1 Score: 0.7708
Test AUC: 0.8484
Test MCC: 0.5523
True Positive Rate: 0.7537
False Positive Rate: 0.2020
Precision: 0.7887
Recall: 0.7537
True Positives (TP): 153
False Positives (FP): 41
True Negatives (TN): 162
False Negatives (FN): 50

non_tata
Test Loss: 1.0214
Test Accuracy: 0.7729
Test F1 Score: 0.7649
Test AUC: 0.8561
Test MCC: 0.5471
True Positive Rate: 0.7390
False Positive Rate: 0.1931
Precision: 0.7928
Recall: 0.7390
True Positives (TP): 903
False Positives (FP): 236
True Negatives (TN): 986
False Negatives (FN): 319

celegans
combined
Test Loss: 1.0348
Test Accuracy: 0.7733
Test F1 Score: 0.7658
Test AUC: 0.8552
Test MCC: 0.5478
True Positive Rate: 0.7411
False Positive Rate: 0.1944
Precision: 0.7922
Recall: 0.7411
True Positives (TP): 1056
False Positives (FP): 277
True Negatives (TN): 1148
False Negatives (FN): 369



**gallus**

In [None]:
organism = 'gallus'
gamma=0.85
cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 1.4131
Test Accuracy: 0.6556
Test F1 Score: 0.5279
Test AUC: 0.7379
Test MCC: 0.3698
True Positive Rate: 0.3852
False Positive Rate: 0.0741
Precision: 0.8387
Recall: 0.3852
True Positives (TP): 52
False Positives (FP): 10
True Negatives (TN): 125
False Negatives (FN): 83

non_tata
Test Loss: 1.3451
Test Accuracy: 0.6934
Test F1 Score: 0.5863
Test AUC: 0.7898
Test MCC: 0.4522
True Positive Rate: 0.4345
False Positive Rate: 0.0477
Precision: 0.9011
Recall: 0.4345
True Positives (TP): 474
False Positives (FP): 52
True Negatives (TN): 1039
False Negatives (FN): 617

gallus
combined
Test Loss: 1.3480
Test Accuracy: 0.6892
Test F1 Score: 0.5799
Test AUC: 0.7837
Test MCC: 0.4432
True Positive Rate: 0.4290
False Positive Rate: 0.0506
Precision: 0.8946
Recall: 0.4290
True Positives (TP): 526
False Positives (FP): 62
True Negatives (TN): 1164
False Negatives (FN): 700



**human**

In [None]:
organism = 'human'
gamma=0.8

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 0.6691
Test Accuracy: 0.8241
Test F1 Score: 0.8376
Test AUC: 0.9079
Test MCC: 0.6573
True Positive Rate: 0.9072
False Positive Rate: 0.2590
Precision: 0.7779
Recall: 0.9072
True Positives (TP): 557
False Positives (FP): 159
True Negatives (TN): 455
False Negatives (FN): 57

non_tata
Test Loss: 0.6980
Test Accuracy: 0.8121
Test F1 Score: 0.8234
Test AUC: 0.8979
Test MCC: 0.6295
True Positive Rate: 0.8762
False Positive Rate: 0.2519
Precision: 0.7767
Recall: 0.8762
True Positives (TP): 4650
False Positives (FP): 1337
True Negatives (TN): 3970
False Negatives (FN): 657

human
combined
Test Loss: 0.6947
Test Accuracy: 0.8134
Test F1 Score: 0.8249
Test AUC: 0.8988
Test MCC: 0.6323
True Positive Rate: 0.8794
False Positive Rate: 0.2527
Precision: 0.7768
Recall: 0.8794
True Positives (TP): 5207
False Positives (FP): 1496
True Negatives (TN): 4425
False Negatives (FN): 714



**melanogaster**

In [None]:
organism = 'melanogaster'
gamma=0.85

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 1.0833
Test Accuracy: 0.7029
Test F1 Score: 0.6737
Test AUC: 0.7869
Test MCC: 0.4124
True Positive Rate: 0.6135
False Positive Rate: 0.2077
Precision: 0.7471
Recall: 0.6135
True Positives (TP): 319
False Positives (FP): 108
True Negatives (TN): 412
False Negatives (FN): 201

non_tata
Test Loss: 0.9153
Test Accuracy: 0.7445
Test F1 Score: 0.7334
Test AUC: 0.8222
Test MCC: 0.4907
True Positive Rate: 0.7030
False Positive Rate: 0.2139
Precision: 0.7667
Recall: 0.7030
True Positives (TP): 2021
False Positives (FP): 615
True Negatives (TN): 2260
False Negatives (FN): 854

melanogaster
combined
Test Loss: 0.9432
Test Accuracy: 0.7381
Test F1 Score: 0.7247
Test AUC: 0.8166
Test MCC: 0.4786
True Positive Rate: 0.6892
False Positive Rate: 0.2130
Precision: 0.7640
Recall: 0.6892
True Positives (TP): 2340
False Positives (FP): 723
True Negatives (TN): 2672
False Negatives (FN): 1055



**mulatta**

In [None]:
organism = 'mulatta'
gamma=0.85

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

Epoch 1/20, Train Loss: 0.7098, Train Accuracy: 0.5000, Val Loss: 0.6931, Val Accuracy: 0.5188
Epoch 2/20, Train Loss: 0.7031, Train Accuracy: 0.5064, Val Loss: 0.7039, Val Accuracy: 0.5000
Epoch 3/20, Train Loss: 0.6884, Train Accuracy: 0.5591, Val Loss: 0.6786, Val Accuracy: 0.5960
Epoch 4/20, Train Loss: 0.6764, Train Accuracy: 0.5754, Val Loss: 0.6850, Val Accuracy: 0.6230
Epoch 5/20, Train Loss: 0.6534, Train Accuracy: 0.6298, Val Loss: 0.6381, Val Accuracy: 0.6411
Epoch 6/20, Train Loss: 0.6270, Train Accuracy: 0.6542, Val Loss: 0.6377, Val Accuracy: 0.6418
Epoch 7/20, Train Loss: 0.6030, Train Accuracy: 0.6756, Val Loss: 0.6458, Val Accuracy: 0.6322
Epoch 8/20, Train Loss: 0.5928, Train Accuracy: 0.6851, Val Loss: 0.6370, Val Accuracy: 0.6381
Epoch 9/20, Train Loss: 0.6236, Train Accuracy: 0.6597, Val Loss: 0.6412, Val Accuracy: 0.6414
Early stopping triggered
tata
Test Loss: 0.7753
Test Accuracy: 0.5588
Test F1 Score: 0.6512
Test AUC: 0.6609
Test MCC: 0.1387
True Positive Rate:

**musculus**

In [None]:
organism = 'musculus'
# gamma=0.75

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 1.1258
Test Accuracy: 0.8082
Test F1 Score: 0.8124
Test AUC: 0.8845
Test MCC: 0.6169
True Positive Rate: 0.8308
False Positive Rate: 0.2145
Precision: 0.7948
Recall: 0.8308
True Positives (TP): 550
False Positives (FP): 142
True Negatives (TN): 520
False Negatives (FN): 112

non_tata
Test Loss: 1.1041
Test Accuracy: 0.8157
Test F1 Score: 0.8156
Test AUC: 0.8886
Test MCC: 0.6314
True Positive Rate: 0.8155
False Positive Rate: 0.1841
Precision: 0.8158
Recall: 0.8155
True Positives (TP): 3557
False Positives (FP): 803
True Negatives (TN): 3559
False Negatives (FN): 805

musculus
combined
Test Loss: 1.1072
Test Accuracy: 0.8147
Test F1 Score: 0.8152
Test AUC: 0.8881
Test MCC: 0.6294
True Positive Rate: 0.8175
False Positive Rate: 0.1881
Precision: 0.8129
Recall: 0.8175
True Positives (TP): 4107
False Positives (FP): 945
True Negatives (TN): 4079
False Negatives (FN): 917



**norvegicus**

In [None]:
organism = 'norvegicus'
gamma=0.85

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 0.8881
Test Accuracy: 0.7607
Test F1 Score: 0.7797
Test AUC: 0.8337
Test MCC: 0.5293
True Positive Rate: 0.8466
False Positive Rate: 0.3252
Precision: 0.7225
Recall: 0.8466
True Positives (TP): 138
False Positives (FP): 53
True Negatives (TN): 110
False Negatives (FN): 25

non_tata
Test Loss: 0.9191
Test Accuracy: 0.7391
Test F1 Score: 0.7574
Test AUC: 0.8202
Test MCC: 0.4837
True Positive Rate: 0.8146
False Positive Rate: 0.3364
Precision: 0.7077
Recall: 0.8146
True Positives (TP): 1775
False Positives (FP): 733
True Negatives (TN): 1446
False Negatives (FN): 404

norvegicus
combined
Test Loss: 0.9142
Test Accuracy: 0.7406
Test F1 Score: 0.7590
Test AUC: 0.8211
Test MCC: 0.4869
True Positive Rate: 0.8168
False Positive Rate: 0.3356
Precision: 0.7088
Recall: 0.8168
True Positives (TP): 1913
False Positives (FP): 786
True Negatives (TN): 1556
False Negatives (FN): 429



**rerio**

In [None]:
organism = 'rerio'
gamma=0.85

cls_model = ClassificationModelBPE(train, val, test_tata, test_non_tata, organism=organism, b_size=b_size)
train_history, val_history, test_tata_history, test_non_tata_history, test_all_data_history = cls_model.fine_tune_model(num_epochs=num_epochs, base_lr=base_lr)

tata
Test Loss: 1.0304
Test Accuracy: 0.7026
Test F1 Score: 0.7165
Test AUC: 0.7831
Test MCC: 0.4071
True Positive Rate: 0.7518
False Positive Rate: 0.3466
Precision: 0.6844
Recall: 0.7518
True Positives (TP): 321
False Positives (FP): 148
True Negatives (TN): 279
False Negatives (FN): 106

non_tata
Test Loss: 0.9994
Test Accuracy: 0.7215
Test F1 Score: 0.7358
Test AUC: 0.7909
Test MCC: 0.4456
True Positive Rate: 0.7756
False Positive Rate: 0.3326
Precision: 0.6999
Recall: 0.7756
True Positives (TP): 1334
False Positives (FP): 572
True Negatives (TN): 1148
False Negatives (FN): 386

rerio
combined
Test Loss: 1.0042
Test Accuracy: 0.7177
Test F1 Score: 0.7320
Test AUC: 0.7893
Test MCC: 0.4380
True Positive Rate: 0.7708
False Positive Rate: 0.3354
Precision: 0.6968
Recall: 0.7708
True Positives (TP): 1655
False Positives (FP): 720
True Negatives (TN): 1427
False Negatives (FN): 492



# SHAP

In [None]:
import shap
from transformers import AutoModelForSequenceClassification, AutoTokenizer

In [None]:
train_tata = pd.read_csv("data_exp1/train_tata_met_2.csv", index_col = False)
val_tata = pd.read_csv("data_exp1/val_tata_met_2.csv", index_col = False)
test_tata = pd.read_csv('data_exp1/test_tata_met_2.csv', index_col = False)

train_non_tata = pd.read_csv('data_exp1/train_non_tata_met_2.csv', index_col = False)
val_non_tata = pd.read_csv('data_exp1/val_non_tata_met_2.csv', index_col = False)
test_non_tata = pd.read_csv('data_exp1/test_non_tata_met_2.csv', index_col = False)

In [None]:
tokenizer_dir = 'models/bpe_tokenizer'

tokenizer = RobertaTokenizerFast.from_pretrained(tokenizer_dir)

In [None]:
non_tata_test_seq = test_non_tata[test_non_tata['organism'] == 'human']['seq'].tolist()

In [None]:
test_tata_seq = test_tata[test_tata['organism'] == 'human']['seq'].tolist()

In [None]:
test_all = non_tata_test_seq + test_tata_seq

In [None]:
test_encodings_all = tokenizer(test_all, truncation=True, padding=True, return_tensors='pt')

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [None]:
# Move inputs and masks to device
input_ids = test_encodings_all['input_ids'].to(device)
attention_mask = test_encodings_all['attention_mask'].to(device)

In [None]:
test_encodings_tokens_subset = test_encodings_all['input_ids'][:10000]
# test_encodings_tokens_subset

In [None]:
masker = shap.maskers.Partition(np.array(test_encodings_tokens_subset))

In [None]:
def plot_shap_values(shap_dict, type="Global"):
  toekns = shap_dict.keys()
  shap_values_ls = shap_dict.values()

  # Determine the color for each bar based on positive or negative value
  colors = ['green' if value >= 0 else 'red' for value in shap_values_ls]

  # Create a bar plott with colors based on positive or negative values
  plt.figure(figsize=(25, 6))
  plt.bar(toekns, shap_values_ls, width=0.5, color=colors)

  # Add title and labels to the plot
  plt.title(f'{type} SHAP Values per Token')
  plt.xlabel('Token')
  plt.ylabel('SHAP Value')
  plt.tight_layout()
  plt.xticks(rotation = 90)

  # Show the plot
  plt.show()

In [None]:
def model_predictions_for_shap(tokenized_data):
    model.eval()  # Ensure the model is in evaluation mode

    # Check if tokenized_data is already a tensor, then move it to the device
    if isinstance(tokenized_data, torch.Tensor):
        tensored_data = tokenized_data.to(device)
    else:
        # If tokenized_data is not a tensor (e.g., a list or numpy array),
        # convert it efficiently to a tensor and then move to the device.
        tensored_data = torch.as_tensor(tokenized_data, device=device)

    with torch.no_grad():  # Context manager that disables gradient calculation
        logits = model(tensored_data).logits
        probabilities = F.softmax(logits, dim=1)  # Compute probabilities

    return probabilities

In [None]:
model_dir = 'models/bpe fine tune models/method2/bpe_cls_human_model_b16'

model = AutoModelForSequenceClassification.from_pretrained(model_dir)
model.to(device)

RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(30000, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-5): 6 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (

In [None]:
model.eval()

RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(30000, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-5): 6 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (

In [None]:
# Tokenize your test sequences
test_encodings_all = {key: value.to(device) for key, value in test_encodings_all.items()}
input_ids = test_encodings_all['input_ids']
attention_mask = test_encodings_all['attention_mask']

In [None]:
# Subset of tokens for SHAP analysis
test_encodings_tokens_subset = input_ids[:10000].cpu().numpy()  # Move to CPU and convert to numpy array

In [None]:
# Define SHAP masker
masker = shap.maskers.Partition(test_encodings_tokens_subset)

In [None]:
explainer = shap.PartitionExplainer(model_predictions_for_shap, masker)

In [None]:
shap_values = explainer(np.array(test_encodings_tokens_subset), max_evals=30)

We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
PartitionExplainer explainer: 10001it [5:22:42,  1.94s/it]                            


In [None]:
import pickle

with open('models/SHAP/shap_bpe_human/shap_values_10000_met2.pkl', 'wb') as file:
    pickle.dump(shap_values, file)

In [None]:
# Accessing all unique features and tokens
unique_features = list(tokenizer.get_vocab().keys())
unique_tokens = list(tokenizer.get_vocab().values())
tokens_and_features_dict = dict(zip(unique_tokens, unique_features))

print(f"Number of unique features: {len(unique_features)}")

Number of unique features: 30000
