In [1]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch.optim.lr_scheduler as lr_scheduler

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix, precision_score, recall_score, accuracy_score
import math

In [2]:
import os
import sys

train_path, validation_path, test_path = "", "", ""

# Function to detect the environment
def detect_environment():
    # Check if Google Colab
    try:
        import google.colab
        return 'colab'
    except ImportError:
        pass
    
    # Check if Jupyter Notebook
    try:
        shell = get_ipython().__class__.__name__
        if shell == 'ZMQInteractiveShell':
            return 'jupyter'
        elif shell == 'TerminalInteractiveShell':
            return 'terminal'
        else:
            return 'other'
    except NameError:
        return 'other'

# Set paths based on the detected environment
env = detect_environment()

if env == 'colab':
    from google.colab import drive
    drive.mount('/content/drive')
    
    train_path = "/content/drive/MyDrive/fullset_train.csv"
    validation_path = "/content/drive/MyDrive/fullset_validation.csv"
    test_path = "/content/drive/MyDrive/fullset_test.csv"
elif env == 'jupyter':
    train_path = "../Dataset/fullset_train.csv"
    validation_path = "../Dataset/fullset_validation.csv"
    test_path = "../Dataset/fullset_test.csv"
else:
    print("Unknown environment. Please set paths manually.")

# Use the DATA_PATH in your code
if train_path:
    print(f"train path is set to: {train_path}")
else:
    print("train_path is not set. Please check your environment settings.")


train path is set to: ../Dataset/fullset_train.csv


In [3]:
# device detection
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", device)

Device: cuda


In [15]:
# HYPERPARAMETERS
N_BASES = 4

batch_size = 128
learning_rate = 1e-3
n_epochs = 1

threshold = 0.80

loss_info_ratio = 0.1

In [5]:
# GPU dataset version (immediately moves the data on GPU to enanche performaces)
def convert_string_sequence_into_array(sequence):
    """Transforms a string into a numpy array of one-hot encoded data
    
    Parameters
    ----------
    sequence: str
        input DNA sequence as string

    Returns
    -------
    numpy.ndarray
        the one-hot encoded DNA sequence as an array
    """
    result = np.zeros(shape=(1, N_BASES, len(sequence)))
    base_dict = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    for idx, base in enumerate(sequence.upper()):
        result[0, base_dict[base], idx] = 1
    return result

class DNADatasetForGPU(Dataset):
    """Dataset built for performance
    The previous versions used specific transformations, as shown in
    https://pytorch.org/tutorials/beginner/basics/transforms_tutorial.html
    but this massively slowed the computation time when we switched to the actual architecture

    The actual dataset processes every tensor at "first-reading time", and moves it to the current device directly if needed.
    """
    def __init__(self, csv_path):
        df = pd.read_csv(csv_path, header=0, names=['seq_id', 'sequence', 'label'])

        self.y = torch.tensor(df['label'].values).float().view(-1, 1).to(device)
        self.x = torch.tensor(
            np.array([convert_string_sequence_into_array(seq) for seq in df['sequence']])
        ).float().to(device)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [6]:
# DATASET EXTRACTION
train_dataset = DNADatasetForGPU(train_path)

val_dataset = DNADatasetForGPU(validation_path)

test_dataset = DNADatasetForGPU(test_path)

print(len(train_dataset), len(val_dataset), len(test_dataset))

# calculating the ratio between the classes
n_pos_examples = (train_dataset.y == 1).sum().item()
n_neg_examples = len(train_dataset) - n_pos_examples

fake_weight = n_neg_examples / float(n_pos_examples)
general_weight = n_pos_examples / float(len(train_dataset))

fake_weight_tensor = torch.tensor([fake_weight]).to(device)
general_weight_tensor = torch.tensor([general_weight]).to(device)

print("fake_weight:", fake_weight_tensor.item())
print("fake_weight tensor size:", fake_weight_tensor.shape)
print("general_weight:", general_weight_tensor.item())
print("general_weight tensor size:", general_weight_tensor.shape)

211238 26404 26404
fake_weight: 46.29914855957031
fake_weight tensor size: torch.Size([1])
general_weight: 0.021142030134797096
general_weight tensor size: torch.Size([1])


In [7]:
# testing that is a probability
neg_weight = general_weight
pos_weight = general_weight * fake_weight
print("neg_weight:", neg_weight)
print("pos_weight:", pos_weight)
print("Summation:", neg_weight + pos_weight)

neg_weight: 0.021142029369715674
pos_weight: 0.9788579706302843
Summation: 1.0


In [8]:
# Dataloader declarations
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

In [9]:
INPUT_CHANNELS = 1
SEQ_LENGTH = 300

class AttentionVirusModel(nn.Module):
    def __init__(self, blocks=6, embed_dim=128, L=2):
        super(AttentionVirusModel, self).__init__()

        self.pos_enc = PositionalEncoding(SEQ_LENGTH, L)
        
        self.conv = nn.Sequential(
            nn.Conv2d(INPUT_CHANNELS, embed_dim, kernel_size=(4, 5)),
            nn.BatchNorm2d(embed_dim),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 2))
        )
        
        self.attention = nn.Sequential(
            *[SelfAttentionBlock(embed_dim, [148, embed_dim], embed_dim * 4) for i in range(blocks)]
        )

        self.linear = nn.Linear(embed_dim, 1)

    def forward(self, x):
        x = self.pos_enc(x)
        x = self.conv(x)

        # size [batch_size, self.maps, 1, 148] into [batch_size, 148, self.maps]
        x = x.permute(0, 3, 1, 2).contiguous()
        x = x.view(x.size(0), x.size(1), -1)
        
        x = self.attention(x)
        x = self.linear(x)
        return x

class PositionalEncoding(nn.Module):
    def __init__(self, seq_length, L):
        super(PositionalEncoding, self).__init__()

        encoding = []
        for exp in range(0, L):
            sin_index = 2 * exp
            cos_index = (2 * exp) + 1
            encoding.insert(sin_index, [])
            encoding.insert(cos_index, [])
            omega = 2 ** exp
            for position in range(0, seq_length):
                enc_sin = np.sin(omega * np.pi * position)
                encoding[sin_index].insert(position, enc_sin)
                enc_cos = np.cos(omega * np.pi * position)
                encoding[cos_index].insert(position, enc_cos)
        self.pos_enc = torch.tensor(encoding, dtype=torch.float32).to(device)

    def forward(self, x):
        x += self.pos_enc[:x.shape[1],:]
        return x

class SelfAttentionBlock(nn.Module):
    def __init__(self, embed_dim, norm_dim, linear_dim, heads=8):
        super(SelfAttentionBlock, self).__init__()

        self.attention = nn.MultiheadAttention(embed_dim, heads, batch_first=True)
        self.layer_norm1 = nn.LayerNorm(norm_dim)
        self.linear = nn.Sequential(
            nn.Linear(embed_dim, linear_dim),
            nn.ReLU(),
            nn.Linear(linear_dim, embed_dim)
        )
        self.layer_norm2 = nn.LayerNorm(norm_dim)

    def forward(self, x):
        att, _ = self.attention(x, x, x)
        x = self.layer_norm1(x + att)
        x = self.layer_norm2(x + self.linear(x))
        return x

In [16]:
# MODEL DECLARATION
model = AttentionVirusModel().to(device)

def init_weights(layer):
    """Initialize weights for the model"""
    if isinstance(layer, nn.Linear):
        torch.nn.init.xavier_uniform_(layer.weight)

model.apply(init_weights)

AttentionVirusModel(
  (pos_enc): PositionalEncoding()
  (conv): Sequential(
    (0): Conv2d(1, 128, kernel_size=(4, 5), stride=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=(1, 2), stride=(1, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (attention): Sequential(
    (0): SelfAttentionBlock(
      (attention): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
      )
      (layer_norm1): LayerNorm((148, 128), eps=1e-05, elementwise_affine=True)
      (linear): Sequential(
        (0): Linear(in_features=128, out_features=512, bias=True)
        (1): ReLU()
        (2): Linear(in_features=512, out_features=128, bias=True)
      )
      (layer_norm2): LayerNorm((148, 128), eps=1e-05, elementwise_affine=True)
    )
    (1): SelfAttentionBlock(
      (attention): MultiheadAttention(
        (out_proj): NonDynamicallyQuant

In [17]:
# loss and optimizer definition
criterion = nn.BCEWithLogitsLoss(weight=general_weight_tensor, pos_weight=fake_weight_tensor)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=learning_rate, max_lr=learning_rate*100, mode='triangular2')

In [18]:
# loops definition
def train_loop(dataloader, model, criterion, optimizer, loss_list, iter_per_epoch, loss_info_step=0):
    """Train loop
    Performs a single training epoch over the dataset

    Parameters
    ----------
    dataloader: torch.utils.data.dataloader.DataLoader
        dataloader for the current dataset (should be train)
    model: torch.nn.Module
        model to train
    criterion: loss function of pytorch
        loss function used
    optimizer: torch.optim.Optimizer
        algorithm that performs the parameters update
    loss_list: list
        here the function appends each value of the average loss computed each loss_info_step number of iterations (see below)
    iter_per_epoch: positive int
        number of iterations within one epoch
    loss_info_step: positive int (less than iter_per_epochs), optional
        specifies the number of iterations that must pass before appending a new loss value to loss_list. It also specifies the time interval between
        the output printing. If set to 0, no output will be printed and no loss will be stored. (default is 0)
    """
    model.train()
    partial_loss = 0
    
    for batch, (X, y) in enumerate(dataloader):        
        # forward pass and loss computation
        y_pred = model(X)[:, -1, :] # choose the last token for the prediction        
        loss = criterion(y_pred, y)
        partial_loss += loss.item()
    
        # backward pass and params update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if loss_info_step != 0 and (batch + 1) % loss_info_step == 0:
            # register loss for the plot
            avg_loss = partial_loss / float(loss_info_step)
            loss_list.append(avg_loss)
            partial_loss = 0
            # NOT IMPLEMENTED IN THIS FILE!
            #plot_grad_flow(model.named_parameters())

            # print some infos
            print(f"Iteration {batch + 1}/{iter_per_epoch}: avg_loss = {avg_loss:.6f}")

def validation_loop(dataloader, model, criterion, loss_list, threshold=0.5, loss_info_step=0):
    """Evaluation loop
    Performs a single validation/test epoch over the dataset (evaluates the performaces)

    Parameters
    ----------
    dataloader: torch.utils.data.dataloader.DataLoader
        dataloader for the current dataset (should be validation or test)
    model: torch.nn.Module
        model to evaluate
    criterion: loss function of pytorch
        loss function used
    loss_list: list
        here the function appends each value of the average loss computed each loss_info_step number of iterations (see below)
    iter_per_epoch: positive int
        number of iterations within one epoch
    threshold: [0, 1] float, optional
        threshold value (defualt is 0.5)
    loss_info_step: positive int (less than iter_per_epochs), optional
        specifies the number of iterations that must pass before appending a new loss value to loss_list. It also specifies the time interval between
        the output printing. If set to 0, no output will be printed and no loss will be stored. (default is 0)

    Returns
    -------
    tuple
        a tuple containing all the predictions and all the labels respectively as numpy.ndarray
    """
    model.eval()
    total_loss = 0
    partial_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
            # forward pass and loss computation
            y_pred = model(X)[:, -1, :] # choose the last token for the prediction 
            loss = criterion(y_pred, y)
            total_loss += loss.item()
            partial_loss += loss.item()

            # class prediction
            prob_pred = nn.functional.sigmoid(y_pred)
            #class_pred = prob_pred > threshold

            # reporting results
            all_preds.extend(prob_pred.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

            if loss_info_step != 0 and (batch + 1) % loss_info_step == 0:
                # register loss for the plot
                avg_loss = partial_loss / float(loss_info_step)
                loss_list.append(avg_loss)
                partial_loss = 0

    # compute metrics
    all_preds = np.array(all_preds).flatten()
    all_labels = np.array(all_labels).flatten()
    pred_labels = (all_preds > threshold).astype(int)
    
    accuracy = accuracy_score(all_labels, pred_labels)
    precision = precision_score(all_labels, pred_labels, zero_division=0.0)
    recall = recall_score(all_labels, pred_labels, zero_division=0.0)
    
    avg_loss = total_loss / len(dataloader)
    
    print(f"Validation Accuracy: {accuracy:.4f}, Average Loss: {avg_loss:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")
    return all_preds, all_labels

In [19]:
###### MAIN LOOP
iter_per_epoch_train = math.ceil(len(train_dataset) / float(batch_size))
iter_per_epoch_val = math.ceil(len(val_dataset) / float(batch_size))
loss_info_step_train = math.floor(iter_per_epoch_train * loss_info_ratio)
loss_info_step_val = math.floor(iter_per_epoch_val * loss_info_ratio)

best_val_loss = float('inf')
patience = 5

train_loss_list = []
val_loss_list = []

for epoch in range(n_epochs):
    print(f"Epoch {epoch + 1} -------------")
    train_loop(train_dataloader, model, criterion, optimizer, train_loss_list, iter_per_epoch_train, loss_info_step=loss_info_step_train)
    validation_loop(val_dataloader, model, criterion, val_loss_list, threshold=threshold, loss_info_step=loss_info_step_val)

    scheduler.step()

    torch.save(model.state_dict(), '../Models/best_model_AttentionVirusModel.pth')
    """
    if val_loss_list[-1] < best_val_loss:
        best_val_loss = val_loss_list[-1]
        counter = 0
        torch.save(model.state_dict(), '../Models/best_model_AttentionVirusModel.pth')
    else:
        counter += 1

    if counter >= patience:
        print("Early stopping")
        break
    """

Epoch 1 -------------


KeyboardInterrupt: 