In [None]:
# Notebook setup: third-party imports, tokenizer, project-local imports and
# the torch device used by the cells below.
import pandas as pd
from tqdm import tqdm

import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.utils.data import DataLoader, WeightedRandomSampler
import torch.nn as nn
import torch.optim as optim
from transformers import AutoTokenizer
# Tokenizer loaded from the "Intel/dynamic_tinybert" checkpoint; it is passed
# to the dataset constructors further down.
tokenizer = AutoTokenizer.from_pretrained("Intel/dynamic_tinybert")

from sklearn.metrics import f1_score, accuracy_score

import os
# NOTE(review): machine-specific absolute path. The working directory is
# switched *before* the `modules.*` imports below, presumably so the local
# `modules` package resolves from the project root -- keep this ordering.
os.chdir('C:\\Users\\mathi\\SimpleSequenceClassif')
from modules.preprocessing import categories_fit_one_hot, seq_pipeline
from modules.datasets import SeqCatBalancedDataset, sample_weights
from modules.data_specific import cleaning
from modules.models import TinyBERTClassifier

# Use CUDA when torch reports it as available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
# Read the five fold CSVs and test.csv from `base_path`.
# NOTE(review): `base_path` is a machine-specific absolute path.
base_path = 'C:\\Users\\mathi\\Documents\\sequence_data\\'
fold_names = ('fold_0.csv', 'fold_1.csv', 'fold_2.csv', 'fold_3.csv', 'fold_4.csv')
df0, df1, df2, df3, df4 = (pd.read_csv(base_path + name) for name in fold_names)
test = pd.read_csv(base_path + 'test.csv')

# `cleaning` is called on every frame and its return value is ignored, so it
# presumably modifies each DataFrame in place -- TODO confirm in
# modules.data_specific.
for df in (df0, df1, df2, df3, df4, test):
    cleaning(df)

# All five folds stacked into a single frame (the test frame is excluded).
full_data = pd.concat([df0, df1, df2, df3, df4])

In [3]:
# Name handed to the dataset constructors together with the tokenizer
# (presumably the DataFrame column holding the sequence text -- TODO confirm).
sequence = 'peptide'

# Names handed to `categories_fit_one_hot` and the dataset constructors
# (presumably the categorical DataFrame columns -- TODO confirm).
categories = [
    'class',
    'gene',
    'variant',
]

In [4]:
# Dataset and DataLoader construction.

# Training frame: all five folds stacked together.
dataframe = pd.concat([df0, df1, df2, df3, df4])

# Only the second return value (the fitted encoders) is kept; the same
# encoders are reused for the evaluation dataset below.
_, cat_encoders = categories_fit_one_hot(dataframe, categories)


def _make_dataset(frame):
    """Build a SeqCatBalancedDataset for `frame` with the shared settings."""
    return SeqCatBalancedDataset(frame, sequence, tokenizer, categories, cat_encoders)


# Training data is drawn through a weighted sampler (with replacement), one
# draw per dataset item per epoch, using the weights from `sample_weights`.
train_dataset = _make_dataset(dataframe)
weights = sample_weights(train_dataset)
balanced_sampler = WeightedRandomSampler(
    weights=weights,
    num_samples=len(train_dataset),
    replacement=True,
)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=16,
    sampler=balanced_sampler,
)

# Evaluation data comes from the `test` frame and is iterated in order.
val_dataset = _make_dataset(test)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=False)

In [None]:
# Initialize the model and place it on the selected device.
hidden_dim = 128
# NOTE(review): hard-coded; presumably has to agree with the width of the
# encoded categories / label vector produced by the dataset -- TODO confirm.
num_categories = 72
tinybert_classifier = TinyBERTClassifier(hidden_dim, num_categories)
tinybert_classifier.to(device)

# Loss and optimizer (defined once; the earlier duplicate definition was
# redundant). BCELoss expects the model output to already be probabilities
# in [0, 1] -- presumably the model ends with a sigmoid; TODO confirm.
criterion = nn.BCELoss()
optimizer = optim.Adam(tinybert_classifier.parameters(), lr=1e-5)

# Training length
num_epochs = 1
num_iterations = len(train_loader) * num_epochs

# Per-batch training loss, one entry per optimizer step
losses = []

# Train the model
for epoch in range(num_epochs):
    tinybert_classifier.train()

    # tqdm wraps the loader to show a per-epoch progress bar
    for batch_sequence, batch_masks, batch_categorical, batch_y in \
        tqdm(train_loader, desc=f"Epoch {epoch + 1}", ncols=100):

        # The model lives on `device`; make sure every batch tensor does too.
        # `.to(device)` returns the tensor unchanged if it is already there.
        batch_sequence, batch_masks, batch_categorical, batch_y = (
            t.to(device)
            for t in (batch_sequence, batch_masks, batch_categorical, batch_y)
        )

        optimizer.zero_grad()
        output = tinybert_classifier(
            batch_sequence,
            batch_masks,
            batch_categorical
        )
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()

        # Keep the scalar loss for the plot below
        losses.append(loss.item())

# Final loss plot (x axis is the optimizer step index)
plt.figure(figsize=(10, 6))
plt.plot(losses, label="Training Loss")
plt.xlabel("Iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
def validate(model, dataloader):
    """Evaluate `model` on `dataloader` and return the F1 score.

    The model is put in eval mode and run without gradient tracking. Outputs
    are thresholded at 0.5 to obtain 0/1 predictions. Labels and predictions
    from all batches are flattened into 1-D arrays before scoring, so every
    individual label entry counts as one binary decision, and the score is
    computed with `sklearn.metrics.f1_score` using its default settings.

    Args:
        model: Module called as `model(sequence, masks, categorical)`.
        dataloader: Iterable yielding
            `(batch_sequence, batch_masks, batch_categorical, batch_y)`.

    Returns:
        The F1 score as returned by `f1_score`.
    """
    model.eval()  # Set the model to evaluation mode

    # Feed inputs on the same device as the model's parameters; without this
    # a CUDA model receiving CPU batches raises a device-mismatch error.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    true_chunks = []
    pred_chunks = []

    with torch.no_grad():  # Disable gradient computation during validation
        for batch_sequence, batch_masks, batch_categorical, batch_y in tqdm(
            dataloader, desc="Validation", ncols=100):

            if model_device is not None:
                # No-op for tensors that are already on `model_device`.
                batch_sequence = batch_sequence.to(model_device)
                batch_masks = batch_masks.to(model_device)
                batch_categorical = batch_categorical.to(model_device)

            # Forward pass
            output = model(batch_sequence,
                           batch_masks,
                           batch_categorical)

            # Convert probabilities to binary predictions (0 or 1)
            predictions = (output > 0.5).float()

            # Collect flattened labels / predictions on the CPU
            true_chunks.append(batch_y.cpu().numpy().reshape(-1))
            pred_chunks.append(predictions.cpu().numpy().reshape(-1))

    # An empty dataloader yields empty arrays, as in the original extend-based code.
    y_true = np.concatenate(true_chunks) if true_chunks else np.array([])
    y_pred = np.concatenate(pred_chunks) if pred_chunks else np.array([])

    return f1_score(y_true, y_pred)

# Score the trained model on `val_loader` (built from the `test` frame).
# The result gets its own name: assigning it to `f1_score` would rebind the
# sklearn function that `validate` calls, so a second run of `validate`
# would fail with "'float' object is not callable".
test_f1 = validate(tinybert_classifier, val_loader)
print(f"F1 Score on Test Set: {test_f1}")

In [None]:
# Debug: print the tensor shapes of the first validation batch.
# Batches from this loader are unpacked as four items everywhere else in the
# notebook (sequence, masks, categorical, target); unpacking only three here
# would raise "too many values to unpack".
for batch_sequence, batch_masks, batch_categorical, batch_y in val_loader:
    print(batch_sequence.shape)
    print(batch_masks.shape)
    print(batch_categorical.shape)
    print(batch_y.shape)
    break  # only the first batch is needed