the transformer model was made with the help of generative ai

In [1]:
# !pip install d2l==1.0.3
# !pip install torch==2.5.1
! pip install biopython
!pip install wandb -qU
!pip install focal_loss_torch
# !pip install torch==2.5.1  # Install PyTorch first
# !pip install numpy==1.24.3  # Install NumPy with a compatible version
!pip install transformers
# !pip install numpy --upgrade



In [2]:
import os
! git clone https://git.wur.nl/bioinformatics/grs34806-deep-learning-project-data.git
os.chdir("grs34806-deep-learning-project-data")

fatal: destination path 'grs34806-deep-learning-project-data' already exists and is not an empty directory.


In [None]:
import math
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, WeightedRandomSampler
from transformers import AutoTokenizer, AutoModel, BertTokenizer, BertModel
import regex as re
import numpy as np
from sklearn.metrics import f1_score

def read(seqfile: str, posfile: str):
    """
    Read sequences and positive labels from files.
    seqfile: whitespace-separated lines of <id> <sequence>
    posfile: one ID per line for positive examples
    Returns: (list of sequences, list of integer labels)
    """
    datalist, annot_ids = [], []
    with open(seqfile) as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 2:
                continue
            pid, seq = parts
            annot_ids.append(pid)
            datalist.append(seq)
    pos_ids = set(line.strip() for line in open(posfile))
    labels = [1 if pid in pos_ids else 0 for pid in annot_ids]
    return datalist, labels


def generate_train_test(seqs, labels, test_prop=0.2, seed=42):
    np.random.seed(seed)
    n = len(seqs)
    idx = np.random.permutation(n)
    split = int(n * (1 - test_prop))
    return (
        [seqs[i] for i in idx[:split]], [labels[i] for i in idx[:split]]
    ), (
        [seqs[i] for i in idx[split:]], [labels[i] for i in idx[split:]]
    )

mapaa2num = {aa: i for i, aa in enumerate(list("ACDEFGHIKLMNPQRSTVWY"))}

def pad_or_trim(seq: str, size: int, pad_char: str = '_') -> str:
    if len(seq) > size:
        return seq[:size]
    return seq + pad_char * (size - len(seq))

def add_spaces(seq: str) -> str:
    return ' '.join(list(seq))

def tokenize_map(seqs, mapping, non_aa=20):
    return [[mapping.get(aa, non_aa) for aa in seq] for seq in seqs]

def truncate_pad(ids, max_len, pad_id=20):
    if len(ids) >= max_len:
        return ids[:max_len]
    return ids + [pad_id] * (max_len - len(ids))

def build_seq_array(id_lists, max_len):
    return torch.tensor([truncate_pad(ids, max_len) for ids in id_lists], dtype=torch.long)


def load_data(batch_size: int,
              seqs: list,
              labels: list,
              max_len: int,
              tokenizer=None):
    if tokenizer:
        in_ids, in_mask, in_labels = [], [], []
        for seq, lbl in zip(seqs, labels):
            s = pad_or_trim(seq, max_len)
            s = re.sub(r"[UZOB]", "X", s)
            enc = tokenizer(add_spaces(s), return_tensors='pt',
                            padding='max_length', truncation=True,
                            max_length=max_len)
            in_ids.append(enc['input_ids'])
            in_mask.append(enc['attention_mask'])
            in_labels.append(lbl)
        X = torch.cat(in_ids, dim=0)
        M = torch.cat(in_mask, dim=0)
        Y = torch.tensor(in_labels, dtype=torch.float32).unsqueeze(1)
        ds = TensorDataset(X, M, Y)
        return DataLoader(ds, batch_size=batch_size, shuffle=True)
    else:
        ids = tokenize_map(seqs, mapaa2num)
        X = build_seq_array(ids, max_len)
        Y = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)
        cnt = torch.tensor([labels.count(0), labels.count(1)], dtype=torch.float)
        w = 1.0 / cnt
        sample_w = torch.tensor([w[int(l)] for l in labels])
        sampler = WeightedRandomSampler(sample_w, len(sample_w), True)
        ds = TensorDataset(X, Y)
        return DataLoader(ds, batch_size=batch_size, sampler=sampler)

class ProteinTransformerClassifier(nn.Module):
    def __init__(self,
                 model_name: str = "Rostlab/prot_bert",
                 unfreeze_layers: int = 2,
                 hidden_dim: int = 64,
                 dropout: float = 0.2):
        super().__init__()
        # load any HF model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, do_lower_case=False)
        self.base = AutoModel.from_pretrained(model_name)

        # freeze all then unfreeze top layers - #not sure what this does
        for p in self.base.parameters(): p.requires_grad = False
        if hasattr(self.base, 'encoder'):
            layers = self.base.encoder.layer[-unfreeze_layers:]
        else:
            layers = list(self.base.children())[-unfreeze_layers:]
        for layer in layers:
            for p in layer.parameters(): p.requires_grad = True

        feat = self.base.config.hidden_size
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Sequential(
            nn.Linear(feat, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, input_ids, attention_mask=None):
        out = self.base(input_ids=input_ids, attention_mask=attention_mask)
        cls = out.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(cls))

# Training loop with metrics & early stopping
def train_model(model, train_loader, val_loader, device,
                lr=1e-4, weight_decay=1e-4, epochs=20,
                clip=1.0, patience=5):
    model.to(device)
    optim = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr, weight_decay=weight_decay)
    sched = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, 'min', patience=2)
    loss_fn = nn.BCEWithLogitsLoss()

    best = float('inf'); wait = 0
    for e in range(epochs):
        # train
        model.train(); tloss=[]; tpr, ttr=[],[]
        for batch in train_loader:
            if len(batch)==3:
                xb, mask, yb = [b.to(device) for b in batch]
                logits = model(xb, attention_mask=mask)
            else:
                xb, yb = [b.to(device) for b in batch]
                logits = model(xb)
            optim.zero_grad()
            loss=loss_fn(logits, yb)
            loss.backward(); torch.nn.utils.clip_grad_norm_(model.parameters(), clip); optim.step()
            tloss.append(loss.item())
            preds=(torch.sigmoid(logits)>0.5).int()
            tpr.extend(preds.cpu().numpy().flatten().tolist()); ttr.extend(yb.cpu().numpy().flatten().tolist())
        tr_loss, tr_acc = np.mean(tloss), np.mean(np.array(tpr)==np.array(ttr))
        tr_f1 = f1_score(ttr, tpr)
        # valid
        model.eval(); vloss=[]; vpr,vtr=[],[]
        with torch.no_grad():
            for batch in val_loader:
                if len(batch)==3:
                    xb, mask, yb = [b.to(device) for b in batch]
                    logits = model(xb, attention_mask=mask)
                else:
                    xb, yb = [b.to(device) for b in batch]
                    logits = model(xb)
                loss=loss_fn(logits, yb); vloss.append(loss.item())
                preds=(torch.sigmoid(logits)>0.5).int()
                vpr.extend(preds.cpu().numpy().flatten().tolist()); vtr.extend(yb.cpu().numpy().flatten().tolist())
        val_loss, val_acc = np.mean(vloss), np.mean(np.array(vpr)==np.array(vtr))
        val_f1 = f1_score(vtr, vpr)
        print(f"Epoch {e:02d} | tr_loss {tr_loss:.4f} acc {tr_acc:.4f} f1 {tr_f1:.4f} "
              f"| val_loss {val_loss:.4f} acc {val_acc:.4f} f1 {val_f1:.4f}")
        sched.step(val_loss)
        if val_loss<best: best,wait=val_loss,0
        else:
            wait+=1
            if wait>=patience:
                print(f"Stopping early at epoch {e}"); break


# two pretrained models used
if __name__ == "__main__":
    seq_file = "expr5Tseq_filtGO_100-1000.lis"
    pos_file = "GO_3A0055085.annotprot"
    seqs, labs = read(seq_file, pos_file)
    (tr_s, tr_l), (va_s, va_l) = generate_train_test(seqs, labs)

    batch_size, seq_len = 128, 200
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model_names = [
        "Rostlab/prot_bert",                       # ProtBERT
        "facebook/esm2_t6_8M_UR50D",              # ESM-2 small                    # ProtAlbert
    ]

    for name in model_names:
        print(f"\n=== Training with {name} ===")
        model = ProteinTransformerClassifier(model_name=name, unfreeze_layers=2)
        loader_tr = load_data(batch_size, tr_s, tr_l, seq_len, tokenizer=model.tokenizer)
        loader_va = load_data(batch_size, va_s, va_l, seq_len, tokenizer=model.tokenizer)
        train_model(model, loader_tr, loader_va, device)



=== Training with Rostlab/prot_bert ===


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/86.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/361 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/81.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.68G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.68G [00:00<?, ?B/s]

Epoch 00 | tr_loss 0.2681 acc 0.9609 f1 0.0185 | val_loss 0.1571 acc 0.9661 f1 0.0000
Epoch 01 | tr_loss 0.1487 acc 0.9668 f1 0.0000 | val_loss 0.1465 acc 0.9661 f1 0.0000
