In [None]:
hyperparameters = {
    "lr": 1e-4,
    "batch_size": 1024,
    "embed_dim": 256,
    "epochs": 100,
    "num_warmup_steps": 500,
    "weight_decay": 1e-4,
    "dropout": 0.4,
    "num_heads": 4,
    "num_transformer_layers": 4,
    "ff_dim": 512,
    "cnn_out_channels": 64,
    "k-mers": 3,
    "max_len": 199
}

info = {
    "dataset_size": "data",
    "precision": "FP16",
    "dir_name": "Binary Mutation Model",
    "run": "5thRun",
    "optimizer": "Adam",
    "is_pre_training": False
}

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd

data_path = f"/content/drive/MyDrive/{info['dataset_size']}.csv"

datao = pd.read_csv(data_path)
datao.shape

(1859483, 10)

In [None]:
conflicts = datao.groupby(['sequence', 'ref', 'alt', 'chrom']).label.nunique()

In [None]:
conflict_variants = conflicts[conflicts > 1].index
datao = datao[~datao.set_index(['sequence', 'ref', 'alt', 'chrom']).index.isin(conflict_variants)]

In [None]:
data = datao[datao['label'] != 2]
data.shape

(786865, 10)

In [None]:
data['label'] = data['label'].apply(lambda x : 0 if x in [0, 1] else 1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['label'] = data['label'].apply(lambda x : 0 if x in [0, 1] else 1)


In [None]:
data['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
0,700724
1,86141


In [None]:
data1 = data[data['label'] == 1].sample(n=86141, random_state=42)
data0 = data[data['label'] == 0].sample(n=100000, random_state=42)

In [None]:
data = pd.concat([data1, data0])

In [None]:
data['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
0,100000
1,86141


In [None]:
data.isnull().sum()

Unnamed: 0,0
sequence,0
label,0
mutation_pos,0
ref,0
alt,0
mutation_type,0
chrom,0
genomic_pos,0
context_left,0
context_right,0


In [None]:
def gc_content(seq):
    seq = seq.upper()
    gc = seq.count('G') + seq.count('C')
    return gc / len(seq)

data['gc_content'] = data['sequence'].apply(gc_content)

In [None]:
def at_content(seq):
    seq = seq.upper()
    return (seq.count('A') + seq.count('T')) / len(seq)

data['at_content'] = data['sequence'].apply(at_content)

In [None]:
def is_cpg_site(row):
    seq = row['sequence'].upper()
    pos = row['mutation_pos']

    if pos < len(seq)-1 and seq[pos] == 'C' and seq[pos+1] == 'G':
        return 1

    if pos > 0 and seq[pos-1] == 'C' and seq[pos] == 'G':
        return 1
    return 0

data['cpg_flag'] = data.apply(is_cpg_site, axis=1)

In [None]:
from collections import Counter
import numpy as np

def sequence_entropy(seq):
    counts = Counter(seq)
    total = len(seq)
    probs = [count / total for count in counts.values()]
    return -sum(p * np.log2(p) for p in probs)

data['sequence_entropy'] = data['sequence'].apply(sequence_entropy)

In [None]:
def is_transition(ref, alt):
    transitions = {('A','G'), ('G','A'), ('C','T'), ('T','C')}
    return 1 if (ref, alt) in transitions else 0

data['is_transition'] = data.apply(lambda row: is_transition(row['ref'], row['alt']), axis=1)

In [None]:
chrom_lengths = {
    'chr1': 248956422,
    'chr2': 242193529,
    'chr3': 198295559,
    'chr4': 190214555,
    'chr5': 181538259,
    'chr6': 170805979,
    'chr7': 159345973,
    'chr8': 145138636,
    'chr9': 138394717,
    'chr10': 133797422,
    'chr11': 135086622,
    'chr12': 133275309,
    'chr13': 114364328,
    'chr14': 107043718,
    'chr15': 101991189,
    'chr16': 90338345,
    'chr17': 83257441,
    'chr18': 80373285,
    'chr19': 58617616,
    'chr20': 64444167,
    'chr21': 46709983,
    'chr22': 50818468,
}

def normalized_genomic_pos(row):
    chrom = row['chrom']
    chrom_length = chrom_lengths.get(chrom, 1)
    return row['genomic_pos'] / chrom_length

data['genomic_pos_norm'] = data.apply(normalized_genomic_pos, axis=1)

In [None]:
left_feature = data[['genomic_pos', 'gc_content', 'at_content', 'cpg_flag', 'sequence_entropy', 'is_transition', 'genomic_pos_norm']]

In [None]:
from sklearn.preprocessing import OneHotEncoder

mutation_type_encoder = OneHotEncoder()
chromosome_encoder = OneHotEncoder()
ref_encoder = OneHotEncoder()
alt_encoder = OneHotEncoder()

In [None]:
# For mutation_type
mutation_type_encoder.fit(data[['mutation_type']])
mutation_type_data = pd.DataFrame(mutation_type_encoder.transform(data[['mutation_type']]).toarray())
mutation_type_data.columns = mutation_type_encoder.get_feature_names_out()

In [None]:
# For chrom
chromosome_encoder.fit(data[['chrom']])
chorm_data = pd.DataFrame(chromosome_encoder.transform(data[['chrom']]).toarray())
chorm_data.columns = chromosome_encoder.get_feature_names_out()

In [None]:
# For ref
ref_encoder.fit(data[['ref']])
ref_data = pd.DataFrame(ref_encoder.transform(data[['ref']]).toarray())
ref_data.columns = ref_encoder.get_feature_names_out()

In [None]:
# For alt
alt_encoder.fit(data[['alt']])
alt_data = pd.DataFrame(alt_encoder.transform(data[['alt']]).toarray())
alt_data.columns = alt_encoder.get_feature_names_out()

In [None]:
right_features = np.hstack((mutation_type_data, chorm_data, ref_data, alt_data))

In [None]:
def get_codon(seq, k=hyperparameters['k-mers']):
    return [seq[i:i+k] for i in range(len(seq) - k + 1)]

vocab = {}

for seq in data['sequence']:
    for codons in get_codon(seq.lower()):
        if codons not in vocab:
            vocab[codons] = len(vocab)
        else:
            continue

def get_tensor(text):
    return [vocab[codons.lower()] for codons in get_codon(text)]

In [None]:
x = data['sequence'].values
extra_features = np.hstack((left_feature, right_features))
y = data['label'].values

In [None]:
from sklearn.preprocessing import StandardScaler

feature_scaler = StandardScaler()
scaled_features = feature_scaler.fit_transform(extra_features)

In [None]:
import torch
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, x, extra_features, y):
        self.x = x
        self.y = y
        self.extra_features = extra_features

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        seq_tensor = torch.tensor(get_tensor(self.x[index]), dtype=torch.long)
        features = torch.tensor(self.extra_features[index], dtype=torch.float32)
        y_tensor = torch.tensor(self.y[index], dtype=torch.long)

        return seq_tensor, features, y_tensor

In [None]:
dataset = CustomDataset(x, scaled_features, y)

In [None]:
dataset[0]

(tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9,  2, 10, 11, 12, 13, 14, 15, 16,
         17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27,  2, 28, 29, 30,  6,  7,  8,
         31, 32, 21, 33, 10,  3, 34,  9, 35, 34, 31, 32, 21, 36,  4,  6,  7,  8,
         37, 38, 39, 33, 10,  3,  4,  5,  6, 21, 33, 10,  3, 40,  7, 30,  5,  5,
          5,  5, 16,  9,  2, 10, 10, 11, 41, 42, 31, 15, 16, 31, 32, 21, 33, 11,
         41, 42, 31, 15, 43, 44, 10, 10,  3,  4, 16, 17, 18, 19,  8, 17, 13, 18,
         45, 33,  3, 34, 17, 14, 32,  7,  8, 17, 13,  1, 35, 46, 47,  4, 43, 48,
         49, 27, 35, 40,  7, 30,  5, 43, 47, 40, 21, 36,  4,  5, 43, 48, 23, 22,
         49, 27, 50, 41, 32, 21, 33, 10,  3, 40,  7, 30,  5,  5, 43, 47, 34,  9,
         51, 52, 53, 54, 42,  9, 50, 55, 50, 55, 35, 34, 31, 42, 17, 14, 56, 47,
          4, 43, 47, 34, 31, 15, 43, 44, 11, 41, 15,  5,  5,  5, 16, 31, 32, 21,
         22]),
 tensor([-0.7094,  1.1601, -1.1601, -0.3426, -0.2129,  0.7577, -0.4487, -0.1801,
         -0.3

In [None]:
from torch.utils.data import random_split

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

In [None]:
from torch.utils.data import DataLoader

train_loader = DataLoader(
    train_dataset,
    batch_size=hyperparameters['batch_size'],
    shuffle=True
)

test_loader = DataLoader(test_dataset, batch_size=hyperparameters['batch_size'])

In [None]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp((torch.arange(0, embed_dim, 2)) * (-math.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :].to(x.device)
        return x


class CNNTransformerHybrid(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, max_len, num_extra_features,
                 # Transformer specific params
                 num_heads=8, num_transformer_layers=6, ff_dim=2048,
                 # CNN specific params
                 cnn_out_channels=64,
                 # Common params
                 dropout=0.2):

        super(CNNTransformerHybrid, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

        self.position_encoding = PositionalEncoding(embed_dim=embed_dim, max_len=max_len)
        transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, dim_feedforward=ff_dim,
            dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer=transformer_encoder_layer,
            num_layers=num_transformer_layers
        )

        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels=embed_dim, out_channels=256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Dropout(dropout),

            nn.Conv1d(in_channels=256, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(in_channels=128, out_channels=cnn_out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Dropout(dropout)
        )

        cnn_output_len = max_len // 8
        flattened_cnn_size = cnn_out_channels * cnn_output_len

        combined_features_size = flattened_cnn_size + embed_dim + num_extra_features

        self.fc_layers = nn.Sequential(
            nn.Linear(combined_features_size, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x_sequence, x_features):
        embeddings = self.embedding(x_sequence)

        transformer_input = self.position_encoding(embeddings)
        transformer_output = self.transformer_encoder(transformer_input)

        transformer_features = transformer_output.mean(dim=1)

        cnn_input = embeddings.permute(0, 2, 1)
        cnn_output = self.conv_layers(cnn_input)
        cnn_features = torch.flatten(cnn_output, 1)

        combined_features = torch.cat([transformer_features, cnn_features, x_features], dim=1)

        output = self.fc_layers(combined_features)
        return output

In [None]:
model = CNNTransformerHybrid(
    vocab_size = len(vocab),
    embed_dim = hyperparameters['embed_dim'],
    num_classes = 2,
    max_len = hyperparameters['max_len'],
    dropout = hyperparameters['dropout'],
    num_heads = hyperparameters['num_heads'],
    num_transformer_layers = hyperparameters['num_transformer_layers'],
    ff_dim = hyperparameters['ff_dim'],
    cnn_out_channels = hyperparameters['cnn_out_channels'],
    num_extra_features = extra_features.shape[1],
)

In [None]:
if info['is_pre_training']:
    checkpoint = torch.load(f"/content/drive/MyDrive/{info['dir_name']}/model4thRun_epoch_60.pth")

    start_epoch = checkpoint['epoch']
    model.load_state_dict(checkpoint['model_state_dict'])

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

CNNTransformerHybrid(
  (embedding): Embedding(64, 256, padding_idx=0)
  (position_encoding): PositionalEncoding()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-3): 4 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=256, out_features=256, bias=True)
        )
        (linear1): Linear(in_features=256, out_features=512, bias=True)
        (dropout): Dropout(p=0.4, inplace=False)
        (linear2): Linear(in_features=512, out_features=256, bias=True)
        (norm1): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.4, inplace=False)
        (dropout2): Dropout(p=0.4, inplace=False)
      )
    )
  )
  (conv_layers): Sequential(
    (0): Conv1d(256, 256, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): ReLU()
    (2): MaxPool1d(kernel_size=2, stride=2, padding=

In [None]:
device

device(type='cuda')

In [None]:
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {num_params}")

Total trainable parameters: 2914754


In [None]:
from transformers import get_linear_schedule_with_warmup

ce = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
scaler = torch.cuda.amp.GradScaler()

num_training_steps = len(train_loader) * hyperparameters['epochs']

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=hyperparameters['num_warmup_steps'],
    num_training_steps=num_training_steps
)

  scaler = torch.cuda.amp.GradScaler()


In [None]:
def train(model, loader, ce, optimizer, scaler, scheduler):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for x, features, y in loader:
        optimizer.zero_grad()
        x = x.to(device)
        features = features.to(device)
        y = y.to(device)

        with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
            output = model(x, features)
            loss = ce(output, y)

        prediction = torch.argmax(output, dim=1)
        correct += (prediction == y).sum().item()
        total += len(x)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        scheduler.step()

        running_loss += loss.item() * len(x)

    accuracy = correct / total
    return (
        running_loss / len(loader.dataset),
        accuracy
    )

In [None]:
def validation(model, loader, ce):
    model.eval()

    running_loss, correct, total = 0.0, 0, 0

    with torch.no_grad():
        for x, features, y in loader:
            x = x.to(device)
            features = features.to(device)
            y = y.to(device)

            with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                output = model(x, features)
                loss = ce(output, y)

            running_loss += loss.item() * len(x)

            prediction = torch.argmax(output, dim=1)
            correct += (prediction == y).sum().item()

            total += len(x)

    accuracy = correct / total

    return (
        running_loss / len(loader.dataset),
        accuracy
    )

In [None]:
import os

patience = 10
best_val_loss = float('inf')
counter = 0
early_stop = False

train_loss_history = []
train_acc_history = []

val_loss_history = []
val_acc_history = []

save_dir = f"/content/drive/MyDrive/{info['dir_name']}"
os.makedirs(save_dir, exist_ok=True)

for epoch in range(1, hyperparameters['epochs']+1):
    current_train_loss, current_train_acc = train(
          model,
          train_loader,
          ce,
          optimizer,
          scaler,
          scheduler
      )

    current_val_loss, current_val_acc = validation(
        model,
        test_loader,
        ce
    )

    train_loss_history.append(current_train_loss)
    train_acc_history.append(current_train_acc)

    val_loss_history.append(current_val_loss)
    val_acc_history.append(current_val_acc)

    print(f"Epoch ({epoch}/{hyperparameters['epochs']}): Train Loss = {current_train_loss:.4f}, Valitation Loss = {current_val_loss:.4f}, Train_acc = {current_train_acc:.4f}, Val_acc = {current_val_acc:.4f}")

    if epoch % 10 == 0:
        checkpoint_path = f"{save_dir}/model{info['run']}_epoch_{epoch}.pth"
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'encoders': {
                'mutation_type': mutation_type_encoder,
                'chromosome': chromosome_encoder,
                'ref': ref_encoder,
                'alt': alt_encoder
            },
            'feature_scaler': feature_scaler,
            'hyperparameters': hyperparameters,
            'vocab': vocab
        }, checkpoint_path)
        print(f"Model saved at {checkpoint_path}")

        if current_val_loss < best_val_loss:
            best_val_loss = current_val_loss
            counter = 0
            continue
        else:
            counter += 1
            print(f"No improvement in val loss Counter = {counter}/{patience}")
            if counter >= patience:
                print("Early stopping triggered!")
                early_stop = True
                break

Epoch (1/100): Train Loss = 0.7869, Valitation Loss = 0.6650, Train_acc = 0.5309, Val_acc = 0.6041
Epoch (2/100): Train Loss = 0.7119, Valitation Loss = 0.6444, Train_acc = 0.5779, Val_acc = 0.6274
Epoch (3/100): Train Loss = 0.6761, Valitation Loss = 0.7624, Train_acc = 0.6018, Val_acc = 0.5443
Epoch (4/100): Train Loss = 0.6141, Valitation Loss = 0.5705, Train_acc = 0.6664, Val_acc = 0.7018
Epoch (5/100): Train Loss = 0.5749, Valitation Loss = 0.5718, Train_acc = 0.7000, Val_acc = 0.7050
Epoch (6/100): Train Loss = 0.5519, Valitation Loss = 0.5730, Train_acc = 0.7193, Val_acc = 0.7058
Epoch (7/100): Train Loss = 0.5351, Valitation Loss = 0.5382, Train_acc = 0.7313, Val_acc = 0.7313
Epoch (8/100): Train Loss = 0.5218, Valitation Loss = 0.5225, Train_acc = 0.7411, Val_acc = 0.7426
Epoch (9/100): Train Loss = 0.5110, Valitation Loss = 0.5078, Train_acc = 0.7504, Val_acc = 0.7517
Epoch (10/100): Train Loss = 0.4983, Valitation Loss = 0.4902, Train_acc = 0.7586, Val_acc = 0.7632
Model sav

KeyboardInterrupt: 

In [None]:
def get_predictions_and_labels(model, loader):
    model.eval()
    all_y_true = []
    all_y_pred = []

    with torch.no_grad():
        for x, features, y in loader:
            x = x.to(device)
            features = features.to(device)
            y = y.to(device)

            yout = model(x, features)

            _, pred_mut = torch.max(yout, 1)

            all_y_true.extend(y.cpu().numpy())
            all_y_pred.extend(pred_mut.cpu().numpy())

    return (
        (all_y_true, all_y_pred)
    )

In [None]:
from sklearn.metrics import classification_report

(y_true, y_pred) = get_predictions_and_labels(model, test_loader)

print("\n" + "="*60)
print("Classification Report Summary")
print("="*60)

print("\n[1] Classification Report â€” Mutation Label")
print("-" * 60)
print(classification_report(y_true, y_pred))

print("="*60 + "\n")

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(xticks_rotation=45)

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label='Train Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.title('Training vs Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 5))
plt.plot(train_acc_history, label='Train Accuracy')
plt.plot(val_acc_history, label='Validation Accuracy')
plt.title('Training vs Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)
plt.show()