# Preparing the data for DNA+P

In [1]:
!nvidia-smi --query-gpu=memory.used --format=csv

/bin/bash: /home/chris/miniconda3/lib/libtinfo.so.6: no version information available (required by /bin/bash)
memory.used [MiB]
169 MiB


In [5]:
import pandas as pd
import numpy as np

In [6]:
from datasets import load_dataset

data_files = dict(
    train="./promoter_detection/train.csv",
    test="./promoter_detection/test.csv",
    val="./promoter_detection/dev.csv"
)

promoter_dataset = load_dataset("csv", data_files=data_files)

promoter_dataset

  from .autonotebook import tqdm as notebook_tqdm
Found cached dataset csv (/home/chris/.cache/huggingface/datasets/csv/default-68212c3a0ebc43dc/0.0.0/eea64c71ca8b46dd3f537ed218fc9bf495d5707789152eb2764f5c78fa66d59d)
100%|██████████| 3/3 [00:00<00:00, 1023.25it/s]


DatasetDict({
    train: Dataset({
        features: ['sequence', 'label'],
        num_rows: 47356
    })
    test: Dataset({
        features: ['sequence', 'label'],
        num_rows: 5920
    })
    val: Dataset({
        features: ['sequence', 'label'],
        num_rows: 5920
    })
})

In [7]:
train_dataset = promoter_dataset["train"]
val_dataset = promoter_dataset["val"]
test_dataset = promoter_dataset["test"]

In [8]:
seq = train_dataset['sequence'][0]

In [9]:
from Bio.Seq import Seq

def dnap_tokenize(seq: str, stop_symbol: str = "*"):
    # find boundaries of the coding sequence
    start = seq.lower().find("atg")
    stop = seq.lower().find("tga")
    print(f"start: {start}")
    print(f"stop: {stop}")
    protein_seq = str(Seq(seq[start:stop]).translate())
    return seq[0:start-1] + protein_seq + stop_symbol + seq[stop+3:]

In [10]:
dnap_tokenize(seq)

start: 16
stop: 17




'TATAATAATAACGAA*GACGACAGTCGACAAGAAAAGCACCAGCTGTCCCCGCCACATACAAGTATATGAGAAGGGGACGCGGGAGAGCGCCGCGGGGGACCGACGCGCTATTGAGGGGGATGGGTACAAGCGGGGCGGGGAGGCCGGAGCTTTATCCAGGCCAATGAATGGCCACTTGCGATGCCCAATTGCACCAAGCTTGGAGCGCACACTCAACCCCTTCCCCAGCGGTATGCCAAAATTCACCGTCTGAATGGCGTTGGTGCAGGTCGGTACAGAGCTCTCCTGCGCCGAG'

A naive tokenizer like this does not account for ORF-sensitivity and will simply find the first occurence of the stop codon, even if it follows the start codon right away. All sequences such as `atga` would then be truncated to a stop codon with Biopython translation API. So we need something smarter that chunks up the sequences into codons once the ORF is found. This is a good time to develop or pick up an ORF finder.

In [11]:
!pip install orffinder

/bin/bash: /home/chris/miniconda3/lib/libtinfo.so.6: no version information available (required by /bin/bash)

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.2.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [12]:
from Bio.SeqIO import SeqRecord
from orffinder import orffinder

orfs = orffinder.getORFs(SeqRecord(Seq(train_dataset['sequence'][0])), minimum_length=3)
print(orfs)

[{'start': 17, 'end': 173, 'frame': 2, 'sense': '+', 'length': 156, 'trailing': False, 'index': 1}, {'start': 173, 'end': 300, 'frame': 2, 'sense': '+', 'length': 127, 'trailing': True, 'index': 2}, {'start': 186, 'end': 300, 'frame': 3, 'sense': '+', 'length': 114, 'trailing': True, 'index': 3}, {'start': 169, 'end': 259, 'frame': 1, 'sense': '+', 'length': 90, 'trailing': False, 'index': 4}, {'start': 63, 'end': 1, 'frame': 2, 'sense': '-', 'length': 61, 'trailing': True, 'index': 5}, {'start': 259, 'end': 300, 'frame': 1, 'sense': '+', 'length': 41, 'trailing': True, 'index': 6}]


In [13]:
orf = orfs[0]

In [14]:
orf

{'start': 17,
 'end': 173,
 'frame': 2,
 'sense': '+',
 'length': 156,
 'trailing': False,
 'index': 1}

In [15]:
from typing import Dict, Any

def dnap_tokenize(seq: str, orf_data: Dict[str, Any], stop_symbol: str = "*"):
    start = orf_data['start']
    stop = orf_data['end']
    protein_seq = str(Seq(seq[start:stop]).translate())
    return seq[0:start-1] + protein_seq + stop_symbol + seq[stop+3:]

In [16]:
dnap_tokenize(seq, orf)

'TATAATAATAACGAAG*DDSRQEKHQLSPPHTSI*EGDAGERRGGPTRY*GGWVQAGRGGRSFIQANE*CCACTTGCGATGCCCAATTGCACCAAGCTTGGAGCGCACACTCAACCCCTTCCCCAGCGGTATGCCAAAATTCACCGTCTGAATGGCGTTGGTGCAGGTCGGTACAGAGCTCTCCTGCGCCGAG'

That's cool and all but these are all promoter sequences, so within those 300 bp here we would not expect any actual genes to be found.

What we could do instead is append those sequences to existing gene sequences and encode specifically those gene sequences only using a DNA+P representation. We would be using synthetic data but it would be much easier to prepare than looking for specific genes in databases that are prepended by those promoters that Prom300 contains.

In [17]:
seq

'TATAATAATAACGAAGATGAGACGACAGTCGACAAGAAAAGCACCAGCTGTCCCCGCCACATACAAGTATATGAGAAGGGGACGCGGGAGAGCGCCGCGGGGGACCGACGCGCTATTGAGGGGGATGGGTACAAGCGGGGCGGGGAGGCCGGAGCTTTATCCAGGCCAATGAATGGCCACTTGCGATGCCCAATTGCACCAAGCTTGGAGCGCACACTCAACCCCTTCCCCAGCGGTATGCCAAAATTCACCGTCTGAATGGCGTTGGTGCAGGTCGGTACAGAGCTCTCCTGCGCCGAG'

Let's develop a tiny function that will append randomly generated gene sequences to the sequences in the dataset that are marked as promoters:

In [45]:
from datasets import DatasetDict
from typing import Dict, Any, List
import Bio.Data.CodonTable as CodonTable


def append_gene_to_promoter(seq: str,
                            gene_length: int,
                            is_promoter_seq: bool = True,
                            seed: int = 42,
                            codon_table: CodonTable.CodonTable = CodonTable.standard_dna_table):
    np.random.seed(seed)
    num_codons = gene_length - 6 // 3  # -6 because we account for 1 start and 1 stop codon per gene appended
    actual_gene_length = num_codons * 3  # recover if `gene_length` was not divisible by 3
    if is_promoter_seq:
        # 1. Generate a random in-ORF gene of length `gene_length`:
        start = codon_table.start_codons[0]  # ATG
        stop = np.random.choice(codon_table.stop_codons)  # randomly choose one
        codon_seq = np.random.choice(list(codon_table.forward_table.keys()),
                                     size=num_codons,
                                     replace=True)
        codon_seq = "".join(list(codon_seq))
        # 2. Append the gene to the promoter sequence if the sequence is marked as a promoter:
        return seq + start + codon_seq + stop
    else:
        # 1. Generate a random DNA sequence that does not start with an `ATG` codon
        nucleotides = ["A", "T", "G", "C"]
        rand_nucl_seq = np.random.choice(nucleotides,
                                         size=actual_gene_length + 6,  # +6 because we account for the START/STOP
                                                                       # codons that are missing here
                                         replace=True)
        # 2. Append the padding sequence to the non-promoter sequence
        rand_nucl_seq = "".join(list(rand_nucl_seq))
        return seq + rand_nucl_seq


def append_gene_to_dataset_record(seq: str, label: str, gene_length: int, seed: int = 42):
    match label:
        case 0:
            return dict(sequence=append_gene_to_promoter(seq, gene_length, False, seed), label=label)
        case 1:
            return dict(sequence=append_gene_to_promoter(seq, gene_length, True, seed), label=label)
        case _:
            raise ValueError(f"{label} is not a valid label for a binary classification task")


def append_gene_to_dataset_batch(batch: Dict[str, List[Any]], gene_length: int, seed: int = 42):
    seqs = batch["sequence"]
    labels = batch["label"]
    out = dict(sequence=[], label=[])
    for seq, label in zip(seqs, labels):
        updated_record = append_gene_to_dataset_record(seq, label, gene_length, seed)
        out["sequence"].append(updated_record["sequence"])
        out["label"].append(updated_record["label"])
    return out


def append_genes_to_dataset(dataset_collection: DatasetDict, gene_length: int, seed: int = 42):
    new_dataset_collection = DatasetDict()
    for k in dataset_collection.keys():
        # `batched=True` speeds things up by processing sequences in batches
        new_dataset_collection[k] = dataset_collection[k].map(lambda batch: append_gene_to_dataset_batch(batch,
                                                                                                         gene_length,
                                                                                                         seed),
                                                              batched=True)
    return new_dataset_collection


append_gene_to_promoter(seq, 30, True)
append_gene_to_promoter(seq, 30, False)

'TATAATAATAACGAAGATGAGACGACAGTCGACAAGAAAAGCACCAGCTGTCCCCGCCACATACAAGTATATGAGAAGGGGACGCGGGAGAGCGCCGCGGGGGACCGACGCGCTATTGAGGGGGATGGGTACAAGCGGGGCGGGGAGGCCGGAGCTTTATCCAGGCCAATGAATGGCCACTTGCGATGCCCAATTGCACCAAGCTTGGAGCGCACACTCAACCCCTTCCCCAGCGGTATGCCAAAATTCACCGTCTGAATGGCGTTGGTGCAGGTCGGTACAGAGCTCTCCTGCGCCGAGGCAGGCAAGTGGGGCACCCGTATCCTTTCCAACTTACAAGGGTCCCCGTTGTGCGCCAGAGGAAGTCACTTTATATCCGCGCACGGTACT'

In [46]:
seq_with_gene = append_gene_to_promoter(seq, 30, is_promoter_seq=True)
seq_with_rand = append_gene_to_promoter(seq, 30, is_promoter_seq=False)

In [48]:
promoter_dataset_with_genes_appended = append_genes_to_dataset(promoter_dataset, 30)

                                                                    

In [49]:
promoter_dataset_with_genes_appended

DatasetDict({
    train: Dataset({
        features: ['sequence', 'label'],
        num_rows: 47356
    })
    test: Dataset({
        features: ['sequence', 'label'],
        num_rows: 5920
    })
    val: Dataset({
        features: ['sequence', 'label'],
        num_rows: 5920
    })
})

Note: because each sequence in the Prom300 dataset was exactly 300bp-long, we now know that ORFs for those samples that _were_ marked as actual promoters are in fact fake genes that we appended. We can **prepare a tokenizer for DNA+P representation with this in mind** considering only the protein-encoding part (over the 300bp boundary) as our starting point for the ORF. So we need to consider two things for the tokenizer:
1. Start translating the ORF at 301bp mark (that will be string idx `300`).
2. Only the sequences labeled as containing promoters should have the tokenizer applied to them.

**Very important**: for the test set the DNA+P tokenizer **must not** convert the sequences to DNA+P representation. They should stay in their original form so that we can check whether _the model_ learned to pick up on those promoter sequences better than the baseline.

In [50]:
from copy import deepcopy
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
    NormalizedString,
    PreTokenizedString
)
from itertools import accumulate, islice
from dataclasses import dataclass
from typing import Iterable, Literal, Optional
from Bio.Data.CodonTable import standard_dna_table

protein_alphabet_map = {
    "A": "Ala",
    "C": "Cys",
    "D": "Asp",
    "E": "Glu",
    "F": "Phe",
    "G": "Gly",
    "H": "His",
    "I": "Ile",
    "K": "Lys",
    "L": "Leu",
    "M": "Met",
    "N": "Asp",
    "P": "Pro",
    "Q": "Gln",
    "R": "Arg",
    "S": "Ser",
    "T": "Thr",
    "V": "Val",
    "W": "Trp",
    "Y": "Tyr",
    "*": "*"
}


class Uppercase:
    def __init__(self) -> None:
        super().__init__()

    def normalize(self, normalized: NormalizedString):
        return normalized.uppercase()

    def normalize_str(self, sequence: str):
        return sequence.upper()


def take_n(iterable: Iterable, n: int):
    iterator = iter(iterable)
    while True:
        chunk = "".join(list(islice(iterator, n)))
        if chunk is None:
            break
        yield chunk

@dataclass
class DNAPConfig:
    start_translating_from_idx: int
    _length: int = -1

    @property
    def length(self):
        # if self._length == -1:
            # return -1
        return self._length


    def translate(self, seq: str):
        return dnap_tokenize(seq, dict(start=self.start_translating_from_idx,
                                       end=self.start_translating_from_idx + self.length))


def _wrap_in_angle_brackets(seq: str, wrap_tokens: bool = True):
    return seq if not wrap_tokens else "<" + seq + ">"


def _pre_tokenize(sequence: str,
                  k: int,
                  include_cls: bool,
                  split_on_unknown_base: bool,
                  wrap_tokens: bool,
                  include_ranges: bool,
                  representation: Literal["dna", "dnap"],
                  dnap_config: Optional[DNAPConfig] = None):
    
    def _with_ranges(seq: str, idx_start: int, idx_end: int):
        subslice = seq
        if include_ranges:
            subslice = (seq, idx_start, idx_end)
        return subslice

    def _dnap_chunk_tokenize(seq: str, stop_symbol: str = "*"):
        out = []
        n = 3
        chunks = [seq[i:i+n] for i in range(0, len(seq), n)]
        lookup_table = deepcopy(standard_dna_table.forward_table)
        lookup_table.update({k: stop_symbol for k in standard_dna_table.stop_codons})
        # Replace single-letter identifiers with the 3-letter ones:
        three_letter_lookup_table = {k: protein_alphabet_map[v] for k, v in lookup_table.items()}
        for codon in chunks:
            try:
                aa = three_letter_lookup_table[codon]
                out.append(aa)
            except KeyError:
                out.append(codon)
        return "".join(out)

    if dnap_config is None and representation == "dnap":
        raise ValueError("Missing DNAP representation config!")

    i = 0
    j = k
    n = len(str(sequence))
    slices = [] if not include_cls else ["<CLS>"]
    # for seq_slice in take_n(sequence, k):
    while j <= n:
        seq_slice = sequence[i:j]
        if "N" in seq_slice and split_on_unknown_base:
            idx_of_n = seq_slice.find("N")
            l = 0
            subslices = []
            while l != idx_of_n:
                subslice = _with_ranges(_wrap_in_angle_brackets(seq_slice[l], wrap_tokens), l, l + 1)
                subslices.append(subslice)  # split preceding into single bases
                l += 1
            subslice = _with_ranges(_wrap_in_angle_brackets(seq_slice[idx_of_n], wrap_tokens), idx_of_n, idx_of_n + 1)
            subslices.append(subslice)  # add `N` itself
            slices.extend(subslices)  # add the split-up slices to the main list
            i = i + k - idx_of_n - 1
            j = j + k - idx_of_n - 1
        else:
            if representation == "dnap" and i >= dnap_config.start_translating_from_idx:
                slice_ = _with_ranges(_wrap_in_angle_brackets(_dnap_chunk_tokenize(seq_slice), wrap_tokens), i, j)
            else:
                slice_ = _with_ranges(_wrap_in_angle_brackets(seq_slice, wrap_tokens), i, j)
            slices.append(slice_)
            i += k
            j += k
    return slices


# We use identical tokenization strategy to Nucleotide Transformer
class KMerDNAPreTokenizer:

    def __init__(self,
                 k: int = 6,
                 include_cls: bool = True,
                 wrap_tokens: bool = True,
                 split_on_unknown_base: bool = True,
                 include_ranges: bool = True) -> None:
        """Args:
            `k` (int): how many bases per token
            `include_cls` (bool): whether to include the starting `<CLS>` token or not
            `wrap_tokens` (bool): whether to wrap each token in `<>` angle brackets
            `split_on_unknown_base` (bool): if `True` then each `N` in the sequence will
                                            be treated as a separate token and all preceding
                                            bases that don't align to `k` are split up as
                                            separate tokens
            `include_ranges` (bool): whether token slice ranges should be returned in tuples
        """
        self.k = k
        self.include_cls = include_cls
        self.wrap_tokens = wrap_tokens
        self.split_on_unknown_base = split_on_unknown_base
        self.include_ranges = include_ranges
        super().__init__()


    def pre_tokenize(self, pretok: PreTokenizedString):
        # Note: `pre_tokenize` acts in-place on `pretok`, so we should not return anything here
        pretok.split(lambda idx, x: _pre_tokenize(x,
                                                  self.k,
                                                  self.include_cls,
                                                  self.split_on_unknown_base,
                                                  self.wrap_tokens,
                                                  self.include_ranges,
                                                  representation="dna"))

    def pre_tokenize_str(self, sequence: str):
        return _pre_tokenize(sequence,
                             self.k,
                             self.include_cls,
                             self.split_on_unknown_base,
                             self.wrap_tokens,
                             self.include_ranges,
                             representation="dna")


class KMerDNAPPreTokenizer(KMerDNAPreTokenizer):

    def __init__(self,
                 k: int = 6,
                 include_cls: bool = True,
                 wrap_tokens: bool = True,
                 split_on_unknown_base: bool = True,
                 include_ranges: bool = True,
                 start_idx: int = 301) -> None:
        self.start_idx = start_idx
        super().__init__(k,
                         include_cls,
                         wrap_tokens,
                         split_on_unknown_base,
                         include_ranges)

    def pre_tokenize(self, pretok: PreTokenizedString):
        pretok.split(lambda idx, x: _pre_tokenize(x,
                                                  self.k,
                                                  self.include_cls,
                                                  self.split_on_unknown_base,
                                                  self.wrap_tokens,
                                                  self.include_ranges,
                                                  representation="dnap",
                                                  dnap_config=DNAPConfig(self.start_idx)))

    def pre_tokenize_str(self, sequence: str):
        return _pre_tokenize(sequence,
                             self.k,
                             self.include_cls,
                             self.split_on_unknown_base,
                             self.wrap_tokens,
                             self.include_ranges,
                             representation="dnap",
                             dnap_config=DNAPConfig(self.start_idx))


# normalizer = normalizers.Lowercase()
# normalizer = normalizers.Normalizer.custom(Uppercase())
# pre_tokenizer = pre_tokenizers.PreTokenizer.custom(KMerDNAPreTokenizer(6))
# tokenizer = Tokenizer()
# normalizer = Uppercase()
pre_tokenizer = KMerDNAPreTokenizer(6, include_ranges=False)

# we're using the same example as the Nucleotide Transformer to test:
original = "ACGTGTACNTGCACGGANCGACTAGTCTGA"
actual = pre_tokenizer.pre_tokenize_str(original)
expected = ["<CLS>","<ACGTGT>","<A>","<C>","<N>","<TGCACG>","<G>","<A>","<N>","<CGACTA>","<GTCTGA>"]
print(actual)
print(expected)

# compare the two lists:
print(all(accumulate(map(lambda x: x[0] == x[1], zip(actual, expected, strict=True)), lambda x, y: x and y)))

# then instantiate the other pre-tokenizer:
pre_tokenizer_dnap = KMerDNAPPreTokenizer(6, start_idx=30)
original = "ACGTGTACNTGCACGGANCGACTAGTCTGAATGGATGATGATGATTGA"
actual = pre_tokenizer_dnap.pre_tokenize_str(original)
expected = ['<CLS>', ('<ACGTGT>', 0, 6), ('<A>', 0, 1), ('<C>', 1, 2), ('<N>', 2, 3), ('<TGCACG>', 9, 15), ('<G>', 0, 1), ('<A>', 1, 2), ('<N>', 2, 3), ('<CGACTA>', 18, 24), ('<GTCTGA>', 24, 30), ('<MetAsp>', 30, 36), ('<AspAsp>', 36, 42), ('<Asp*>', 42, 48)]
print(actual)
print(expected)
print(all(accumulate(map(lambda x: x[0] == x[1], zip(actual, expected, strict=True)), lambda x, y: x and y)))

['<CLS>', '<ACGTGT>', '<A>', '<C>', '<N>', '<TGCACG>', '<G>', '<A>', '<N>', '<CGACTA>', '<GTCTGA>']
['<CLS>', '<ACGTGT>', '<A>', '<C>', '<N>', '<TGCACG>', '<G>', '<A>', '<N>', '<CGACTA>', '<GTCTGA>']
True
['<CLS>', ('<ACGTGT>', 0, 6), ('<A>', 0, 1), ('<C>', 1, 2), ('<N>', 2, 3), ('<TGCACG>', 9, 15), ('<G>', 0, 1), ('<A>', 1, 2), ('<N>', 2, 3), ('<CGACTA>', 18, 24), ('<GTCTGA>', 24, 30), ('<MetAsp>', 30, 36), ('<AspAsp>', 36, 42), ('<Asp*>', 42, 48)]
['<CLS>', ('<ACGTGT>', 0, 6), ('<A>', 0, 1), ('<C>', 1, 2), ('<N>', 2, 3), ('<TGCACG>', 9, 15), ('<G>', 0, 1), ('<A>', 1, 2), ('<N>', 2, 3), ('<CGACTA>', 18, 24), ('<GTCTGA>', 24, 30), ('<MetAsp>', 30, 36), ('<AspAsp>', 36, 42), ('<Asp*>', 42, 48)]
True


In [80]:
class KMerTokenizer():

    def __init__(self, pre_tokenizer) -> None:
        self.pre_tokenizer = pre_tokenizer
        self.vocab = dict()

    @property
    def last_token_id(self):
        all_id_vals = list(self.vocab.values())
        if len(all_id_vals) > 0:
            return max(all_id_vals)
        return -1

    @property
    def vocab_size(self):
        return len(list(self.vocab.values()))

    @property
    def pad_token_id(self):
        # padding is not really a concern for our task, since all sequences are aligned to 390 bp
        return "<pad>"

    def tokenize(self, seq: str):
        pre_tokenized_seq = self.pre_tokenizer.pre_tokenize_str(seq)
        tokenized_seq = []
        for token_tuple_or_token in pre_tokenized_seq:
            if type(token_tuple_or_token) is tuple:
                token, _, _ = token_tuple_or_token
            else:
                token = token_tuple_or_token
            if token not in self.vocab.keys():
                # each k-mer gets its own input token id
                # this is the same way Nucleotide transformer does it
                self.vocab[token] = self.last_token_id + 1
            token_id = self.vocab[token]
            tokenized_seq.append(token_id)
        return tokenized_seq

    def encode(self, seq: str):
        return self.tokenize(seq)

In [93]:
# tokenizer = KMerTokenizer(pre_tokenizer)

pre_tokenizer_dnap = KMerDNAPPreTokenizer(6, start_idx=301)
tokenizer = KMerTokenizer(pre_tokenizer_dnap)
actual = tokenizer.tokenize(original)
print(actual)

[0, 1, 2, 3, 4, 5, 6, 2, 4, 7, 8, 9, 10, 11]


In [94]:
train_dataset = promoter_dataset_with_genes_appended["train"]
val_dataset = promoter_dataset_with_genes_appended["val"]
test_dataset = promoter_dataset_with_genes_appended["test"]

In [95]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments
from tokenizers.models import Unigram
from pathlib import Path

def model_loader(model_name: str, config, ignore_mismatched_sizes: bool = False):
    config.num_labels=2 
    return AutoModelForSequenceClassification.from_pretrained(model_name,
                                                              config=config,
                                                              ignore_mismatched_sizes=ignore_mismatched_sizes)

model_save_dir = Path("results/model_nucleotide_transformer")
training_args = TrainingArguments(model_save_dir,
                                  evaluation_strategy="epoch")

In [96]:
train_dataset

Dataset({
    features: ['sequence', 'label'],
    num_rows: 47356
})

In [97]:
train_dataset[0]

{'sequence': 'TATAATAATAACGAAGATGAGACGACAGTCGACAAGAAAAGCACCAGCTGTCCCCGCCACATACAAGTATATGAGAAGGGGACGCGGGAGAGCGCCGCGGGGGACCGACGCGCTATTGAGGGGGATGGGTACAAGCGGGGCGGGGAGGCCGGAGCTTTATCCAGGCCAATGAATGGCCACTTGCGATGCCCAATTGCACCAAGCTTGGAGCGCACACTCAACCCCTTCCCCAGCGGTATGCCAAAATTCACCGTCTGAATGGCGTTGGTGCAGGTCGGTACAGAGCTCTCCTGCGCCGAGGCAGGCAAGTGGGGCACCCGTATCCTTTCCAACTTACAAGGGTCCCCGTTGTGCGCCAGAGGAAGTCACTTTATATCCGCGCACGGTACT',
 'label': 0}

We need to get an accurate estimate of the vocabulary size, we might as well produce the `input_ids` now and use them in the training loop without calling the `tokenize` method again:

In [98]:
tokenized_train_dataset = []
for record in train_dataset:
    sequence = record["sequence"]
    label = record["label"]
    # Convert the tokens into numerical IDs using the model's vocabulary
    # input_ids = tokenizer.tokenize(sequence)
    input_ids = tokenizer.tokenize(sequence)
    tokenized_train_dataset.append((input_ids, label))

In [100]:
tokenizer.vocab_size

4127

In [106]:
from transformers import BertForSequenceClassification, BertConfig, AdamW, EsmConfig
import torch

# model_id = "zhihan1996/DNA_bert_6"
model_id = "InstaDeepAI/nucleotide-transformer-500m-human-ref"
config = EsmConfig.from_pretrained('bert-base-uncased')
# config = BertConfig.from_pretrained(model_id)
config.vocab_size = tokenizer.vocab_size


model = model_loader(model_id, config=config, ignore_mismatched_sizes=True)
# model = model_loader("InstaDeepAI/nucleotide-transformer-500m-human-ref")
# device = "cpu"
device = "cuda"
model = model.to(device)


You are using a model of type bert to instantiate a model of type esm. This is not supported for all configurations of models and can yield errors.
Some weights of the model checkpoint at InstaDeepAI/nucleotide-transformer-500m-human-ref were not used when initializing EsmForSequenceClassification: ['esm.encoder.layer.12.attention.self.query.weight', 'esm.encoder.layer.19.output.dense.weight', 'esm.encoder.layer.16.attention.self.value.weight', 'esm.encoder.layer.20.intermediate.dense.bias', 'esm.encoder.layer.15.output.dense.bias', 'esm.encoder.layer.22.LayerNorm.weight', 'esm.encoder.layer.23.intermediate.dense.weight', 'esm.encoder.layer.22.attention.self.key.weight', 'esm.encoder.layer.21.attention.LayerNorm.bias', 'esm.encoder.layer.20.attention.self.value.bias', 'esm.encoder.layer.13.attention.output.dense.bias', 'esm.encoder.layer.15.attention.self.value.weight', 'esm.encoder.layer.17.LayerNorm.bias', 'esm.encoder.layer.17.attention.self.query.weight', 'esm.encoder.layer.19.Laye

In [107]:
model.config.vocab_size

4127

In [108]:

# Set up the optimizer
optimizer = AdamW(model.parameters())

# Training loop
for epoch in range(3):
    for input_ids, label in tokenized_train_dataset:

        # Convert the input IDs and label into tensors
        input_tensor = torch.tensor([input_ids])
        input_tensor = input_tensor.to(device)
        label_tensor = torch.tensor([label])
        label_tensor = label_tensor.to(device)
        
        # Compute the loss and perform a backward pass
        loss = model(input_tensor, labels=label_tensor).loss
        loss.backward()
        
        # Update the model parameters
        optimizer.step()
        optimizer.zero_grad()




In [104]:
!nvidia-smi --query-gpu=memory.used --format=csv

11340.21s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


/bin/bash: /home/chris/miniconda3/lib/libtinfo.so.6: no version information available (required by /bin/bash)
memory.used [MiB]
7598 MiB


Testing workflow adapted from the baseline notebook we used before:

In [105]:
from typing import Dict, List
from sklearn.metrics import precision_score, recall_score, accuracy_score
from pprint import pprint

import evaluate
from typing import Callable, Tuple, Union, Iterable, List
import numpy as np
from sklearn.metrics import accuracy_score
import torch.nn as nn

# Splitting the test set into batches to avoid OOM errors with my lovely RTX 4080:
# 5920 / 16 = 370

Metric = Callable[[torch.Tensor | np.ndarray, torch.Tensor | np.ndarray], torch.Tensor | np.ndarray]

def _eval(model: nn.Module, test_batch: torch.Tensor, attention_mask: torch.Tensor, output_hidden_states: bool):
    with torch.no_grad():
        torch_outs = model(
            test_batch,
            attention_mask=attention_mask,
            output_hidden_states=output_hidden_states
        )
    return torch_outs


def test_and_calculate_metrics(tokens_ids: torch.Tensor,
                               labels: torch.Tensor,
                               model: nn.Module,
                               metrics: List[Metric] | Metric,
                               split_into: int = 4,
                               output_hidden_states: bool = False):
    metric_vals = dict()

    if not type(metrics) is list:
        # wrap singular metric into a list to use the same interface downstream
        metrics = [metric]

    for metric in metrics:
        # initialize lists where we will store collected metrics
        metric_vals[metric.__name__] = []

    slice_size = tokens_ids.shape[0] // split_into
    for test_batch, batch_labels in zip(tokens_ids.split(slice_size),
                                        torch.tensor(labels).split(slice_size)):

        # Compute the embeddings:
        # attention_mask = test_batch != tokenizer.pad_token_id
        attention_mask = torch.ones_like(test_batch)

        # Send tokens and attention mask to the GPU:
        test_batch = test_batch.to("cuda")
        # attention_mask = attention_mask.to("cuda")

        # Model outputs:
        torch_outs = _eval(model,
                           test_batch,
                           attention_mask,
                           output_hidden_states)
        
        y_hat_prob = nn.Sigmoid()(torch_outs.logits)
        y_hat = torch.argmax(y_hat_prob, axis=-1)
        for metric in metrics:
            metric_value = metric(batch_labels.to("cpu").detach().numpy(), y_hat.to("cpu").detach().numpy())
            metric_vals[metric.__name__].append(metric_value)

    return metric_vals


def testing_pipeline(tokenizer, model, test_dataset):

    split_into = 4

    tokens_ids = []
    for record in test_dataset:
        tokens_ids_per_record = tokenizer.tokenize(record["sequence"])
        tokens_ids.append(tokens_ids_per_record)
    tokens_ids = torch.tensor(tokens_ids)
    tokens_ids = tokens_ids.to(device)

    metrics = [accuracy_score, precision_score, recall_score]

    metric_vals = test_and_calculate_metrics(tokens_ids,
                                             test_dataset["label"],
                                             model,
                                             metrics,
                                             split_into)

    avgs_of_metric_vals = dict_average(metric_vals)

    pprint(f"Metrics averaged over batches: {avgs_of_metric_vals}", indent=4)


def dict_average(dict_of_metrics: Dict[str, np.array | List[int] | List[float]]) -> Dict[str, np.ndarray]:
    avg_dict = dict()
    for k, v in dict_of_metrics.items():
        avg_acc = np.mean(v)
        avg_dict[k] = avg_acc
    return avg_dict

# k_mer_tokenizer = KMerTokenizer(pre_tokenizer)
# testing_pipeline(k_mer_tokenizer, model, test_dataset)
testing_pipeline(tokenizer, model, test_dataset)

("Metrics averaged over batches: {'accuracy_score': 0.5033783783783784, "
 "'precision_score': 0.5033783783783784, 'recall_score': 1.0}")


In [74]:
actual = pre_tokenizer_dnap.pre_tokenize_str(test_dataset[0]["sequence"])
print(actual)

['<CLS>', ('<CTAAAT>', 0, 6), ('<ATTAAC>', 6, 12), ('<TGGTCT>', 12, 18), ('<TGTGAG>', 18, 24), ('<ATGTCT>', 24, 30), ('<SerTrp>', 30, 36), ('<LeuGlu>', 36, 42), ('<ProAsp>', 42, 48), ('<HisLeu>', 48, 54), ('<ThrTyr>', 54, 60), ('<CysPhe>', 60, 66), ('<SerSer>', 66, 72), ('<AspCys>', 72, 78), ('<CysLeu>', 78, 84), ('<LeuLeu>', 84, 90), ('<SerLeu>', 90, 96), ('<CysCys>', 96, 102), ('<ArgLeu>', 102, 108), ('<GluLeu>', 108, 114), ('<ArgAla>', 114, 120), ('<AlaGly>', 120, 126), ('<GlyGly>', 126, 132), ('<GlyArg>', 132, 138), ('<ArgLys>', 138, 144), ('<GluGlu>', 144, 150), ('<LysGln>', 150, 156), ('<SerTrp>', 156, 162), ('<ProGly>', 162, 168), ('<SerCys>', 168, 174), ('<TrpLeu>', 174, 180), ('<GlyAla>', 180, 186), ('<ArgThr>', 186, 192), ('<AlaSer>', 192, 198), ('<LeuAsp>', 198, 204), ('<LysGln>', 204, 210), ('<AlaGly>', 210, 216), ('<AlaHis>', 216, 222), ('<IleAla>', 222, 228), ('<LeuGly>', 228, 234), ('<*Val>', 234, 240), ('<ValAla>', 240, 246), ('<LeuThr>', 246, 252), ('<HisLeu>', 252, 25