In [1]:
# Load libraries
import torch 
import pandas as pd 
import numpy as np
import json
import tokenizers

from torch import tensor
from torch.utils.data import DataLoader, Dataset
from transformers import  RobertaConfig
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.models import WordLevel
from tokenizers import pre_tokenizers, normalizers, Tokenizer
from tokenizers.normalizers import Lowercase, NFD
from tokenizers.pre_tokenizers import ByteLevel, Whitespace


In [6]:
# Load settings
with open('settings.json', 'r') as inFile:
    settings = json.load(inFile)

# Set device: first CUDA GPU when one is available, otherwise the CPU
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Load model
# NOTE(security): torch.load unpickles the file, which can execute arbitrary
# code. Only load checkpoints that come from a trusted source.
# NOTE(review): unpickling a whole module requires its class to be importable
# in this kernel - TODO confirm where the model class is defined.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules; pass weights_only=False there if needed.
# map_location remaps the stored tensors onto `device`, so a checkpoint that
# was saved on a GPU machine can still be opened on a CPU-only machine.
model = torch.load('best_model', map_location=device)
model.to(device)
# Switch to evaluation mode; as the last expression this also displays the model.
model.eval()



Net(
  (l1): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(4050, 1032, padding_idx=1)
      (position_embeddings): Embedding(512, 1032, padding_idx=1)
      (token_type_embeddings): Embedding(2, 1032)
      (LayerNorm): LayerNorm((1032,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0): RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=1032, out_features=1032, bias=True)
              (key): Linear(in_features=1032, out_features=1032, bias=True)
              (value): Linear(in_features=1032, out_features=1032, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=1032, out_features=1032, bias=True)
              (LayerNorm): LayerNorm((1032,)

In [7]:
class CDR3Dataset(Dataset):
    """Torch ``Dataset`` over a CSV file of CDR3 sequences and activation labels.

    The CSV path is taken from ``settings["file"]["train_data"]`` or
    ``settings["file"]["test_data"]``. The file must contain a ``CDR3ab``
    column holding the sequence text and the column(s) used as target.

    Args:
        settings: Configuration dict; only ``settings["file"]`` is read here.
        train: Read the training file when truthy, the test file otherwise.
        label: Name of the target. Must be one of ``activatedby_HA``,
            ``activatedby_NP``, ``activatedby_HCRT``, ``activated_any``,
            ``multilabel`` or ``negative``. With ``multilabel`` the target is
            the vector of the four columns ``activatedby_HA``,
            ``activatedby_NP``, ``activatedby_HCRT`` and ``negative``.
        tokenizer: Tokenizer used in ``__getitem__``. It must provide
            ``enable_padding(length=...)`` and ``encode(text)``.
        equal: When truthy, down-sample so that rows with label value 1 and
            rows with label value 0 are equally frequent. Only valid for
            binary labels.
        random_state: Forwarded to ``DataFrame.sample`` when ``equal`` is set,
            so the balanced subset can be reproduced. ``None`` keeps the
            previous, unseeded behaviour.

    Raises:
        ValueError: If ``label`` is not an accepted name, or if ``equal`` is
            requested together with ``label="multilabel"``.
    """

    def __init__(self, settings:dict, train:bool = True, label:str = None, tokenizer:tokenizers.Tokenizer=None, equal:bool=False, random_state=None) -> None:
        cols = ["activatedby_HA", "activatedby_NP", "activatedby_HCRT", "activated_any", "multilabel", "negative"]
        if label not in cols:
            raise ValueError("Invalid label type. Expected one of %s" % cols)
        self.label = label

        # Balanced sampling selects rows by label == 1 / label == 0, which only
        # makes sense for a single binary column. The previous guard compared
        # against "num_label", a name the check above already rejects, so it
        # could never fire.
        if equal and label == "multilabel":
            raise ValueError("Equal size sets only allowed for binary classifications. multilabel is not a binary label.")

        if train:
            path_to_data = settings["file"]["train_data"]
        else:
            path_to_data = settings["file"]["test_data"]

        self.path_to_data = path_to_data
        self.data = pd.read_csv(self.path_to_data)

        if equal:
            # Size of the rarest class; both classes are down-sampled to it.
            min_sample = self.data[self.label].value_counts().min()
            data_pos = self.data[self.data[self.label] == 1].sample(min_sample, random_state=random_state)
            data_neg = self.data[self.data[self.label] == 0].sample(min_sample, random_state=random_state)
            # ignore_index restores a 0..n-1 index, which __getitem__ relies on.
            self.data = pd.concat([data_pos, data_neg], ignore_index=True)

        if label == "multilabel":
            self.labels = [0,1]
            self.n_labels = 4
        else:
            self.labels = np.unique(self.data[[self.label]])
            self.n_labels = len(self.labels)

        # Longest sequence in characters; used as the padding length.
        self.max_len = self.data.CDR3ab.str.len().max()

        self.tokenizer = tokenizer

    def __getitem__(self, index:int):
        """Return one encoded sample.

        Returns:
            dict with ``ids`` and ``attention_mask`` (long tensors padded to
            ``self.max_len``), the raw ``CDR3ab`` string and ``target`` (long
            tensor: four values for ``multilabel``, a scalar otherwise).
        """
        sequence = self.data.CDR3ab[index]

        # Re-applied on every access because the tokenizer object may be shared
        # with another dataset that sets a different padding length.
        self.tokenizer.enable_padding(length=self.max_len)

        if isinstance(self.tokenizer, tokenizers.Tokenizer):
            # Plain Tokenizer instances get the sequence with its characters
            # separated by spaces.
            text = " ".join(list(sequence))
        else:
            # Any other tokenizer object gets the raw string.
            text = sequence
        encodings = self.tokenizer.encode(text)

        item = {
            "ids": tensor(encodings.ids, dtype=torch.long),
            "attention_mask": tensor(encodings.attention_mask, dtype=torch.long),
            "CDR3ab": sequence
            }

        if self.label == "multilabel":
            target_cols = ["activatedby_HA", "activatedby_NP", "activatedby_HCRT", "negative"]
            # Hand torch a plain list rather than a string-indexed Series.
            item["target"] = tensor(self.data[target_cols].iloc[index].tolist(), dtype=torch.long)
        else:
            item["target"] = tensor(self.data[self.label][index], dtype=torch.long)
        return item

    def __len__(self):
        """Number of rows in the (possibly down-sampled) data frame."""
        return len(self.data)

In [9]:
# Create tokenizer from the tokenizers library
# Both tokenizer variants use the same normalization: lowercase, then NFD.
normalizer = normalizers.Sequence([Lowercase(), NFD()])

tokenizer_kind = settings["param"]["tokenizer"]
if tokenizer_kind == "BPE":
    # Byte-level BPE built from the vocab and merges files named in the settings
    pre_tokenizer = pre_tokenizers.Sequence([ByteLevel()])
    tokenizer = ByteLevelBPETokenizer(settings["tokenizer"]["BPE_vocab"], settings["tokenizer"]["BPE_merge"])
    tokenizer.normalizer = normalizer
    tokenizer.pre_tokenizer = pre_tokenizer
elif tokenizer_kind == "WL":
    # Word-level tokenizer restored from its serialized file. from_file is a
    # static constructor, so no throwaway Tokenizer(WordLevel()) is needed.
    pre_tokenizer = pre_tokenizers.Sequence([Whitespace()])
    tokenizer = Tokenizer.from_file(settings["tokenizer"]["WL"])
    tokenizer.pre_tokenizer = pre_tokenizer
    tokenizer.normalizer = normalizer
    tokenizer.enable_padding()
else:
    raise ValueError("Unknown tokenizer. Tokenizer argument must be BPE or WL.")

# Create training and test dataset
# Both datasets receive the same tokenizer object.
dataset_params = {"label": settings["database"]["label"], "tokenizer": tokenizer}
train_data = CDR3Dataset(settings, train=True, equal=False, **dataset_params)
test_data = CDR3Dataset(settings, train=False, **dataset_params)

# Create dataloaders
# shuffle=True so that the first batch of each loader is a random sample.
loader_params = {
    'batch_size': 10,
    'shuffle': True,
    'num_workers': 0,
}
train_dataloader = DataLoader(train_data, **loader_params)
test_dataloader = DataLoader(test_data, **loader_params)

In [18]:
# Get 10 random CDRs and predict
sample_train = next(iter(train_dataloader))

# Predict
torch.cuda.empty_cache()
model.eval()
ids = sample_train["ids"].to(device)
attention_mask = sample_train["attention_mask"].to(device)
targets = sample_train["target"].to(device)
# Inference only: no_grad keeps autograd from recording the forward pass.
with torch.no_grad():
    outs = model(ids, attention_mask)

# Bring to CPU and round the model outputs to 3 decimals for display
targets = targets.detach().cpu().numpy()
outs = np.around(outs.detach().cpu().numpy(), decimals=3)

# The column names below require four target/output columns per sample,
# i.e. a dataset built with label "multilabel".
cols_prob = ["PROB_activatedby_HA", "PROB_activatedby_NP", "PROB_activatedby_HCRT", "PROB_negative"]
cols = ["activatedby_HA", "activatedby_NP", "activatedby_HCRT", "negative"]

# Create dataframe of model outputs
outs_df = pd.DataFrame.from_records(outs)
outs_df.columns = cols_prob

# Create dataframe of targets, with the raw sequence as first column
targets_df = pd.DataFrame.from_records(targets)
targets_df.columns = cols
targets_df.insert(0, 'CDR3ab', sample_train['CDR3ab'])

# Concat targets and outputs side by side
comp_df = pd.concat([targets_df, outs_df], axis=1)
comp_df



Unnamed: 0,CDR3ab,activatedby_HA,activatedby_NP,activatedby_HCRT,negative,PROB_activatedby_HA,PROB_activatedby_NP,PROB_activatedby_HCRT,PROB_negative
0,CAETFRGAQKLVF_CASSSTGNTGELFF,1,0,0,0,0.999,0.0,0.0,0.002
1,CAVGAGFGNEKLTF_CASSNVYKDVGGYTF,0,0,0,1,0.0,0.0,0.0,1.0
2,CAVNAGGTSYGKLTF_CASSQGRMYEQYF,0,1,0,0,0.0,1.0,0.001,0.0
3,CAVNTGFQKLVF_CSAILAGGRQETQYF,0,0,0,1,0.0,0.0,0.0,1.0
4,CAVETDSWGKLQF_CASSQDQGQTQPQHF,0,0,0,1,0.0,0.0,0.0,1.0
5,CALSDRGGSEKLVF_CASSLDGGSTDTQYF,0,0,0,1,0.0,0.0,0.0,1.0
6,CAVEADNYGQNFVF_CASSRPQGYDTQYF,0,0,1,0,0.0,0.0,0.96,0.024
7,CAMREGGTDKLIF_CASSLRTGVGAFF,0,0,0,1,0.0,0.0,0.0,1.0
8,CVVRTGGYQKVTF_CASSFQMERDTQYF,0,0,0,1,0.0,0.0,0.0,1.0
9,CALNTGGFKTIF_CASSYQGEEETQYF,0,0,1,0,0.0,0.0,0.955,0.027


In [83]:

# Get 10 random CDRs and predict
sample_test = next(iter(test_dataloader))

# Predict
model.eval()
ids = sample_test["ids"].to(device)
attention_mask = sample_test["attention_mask"].to(device)
targets = sample_test["target"].to(device)
# Inference only: no_grad keeps autograd from recording the forward pass.
with torch.no_grad():
    outs = model(ids, attention_mask)

# Bring to CPU and round the model outputs to 3 decimals for display
targets = targets.detach().cpu().numpy()
outs = np.around(outs.detach().cpu().numpy(), decimals=3)

# The column names below require four target/output columns per sample,
# i.e. a dataset built with label "multilabel".
cols_prob = ["PROB_activatedby_HA", "PROB_activatedby_NP", "PROB_activatedby_HCRT", "PROB_negative"]
cols = ["activatedby_HA", "activatedby_NP", "activatedby_HCRT", "negative"]

# Create dataframe of model outputs
outs_df = pd.DataFrame.from_records(outs)
outs_df.columns = cols_prob

# Create dataframe of targets, with the raw sequence as first column
targets_df = pd.DataFrame.from_records(targets)
targets_df.columns = cols
targets_df.insert(0, 'CDR3ab', sample_test['CDR3ab'])

# Concat targets and outputs side by side
comp_df = pd.concat([targets_df, outs_df], axis=1)
comp_df



Unnamed: 0,CDR3ab,activatedby_HA,activatedby_NP,activatedby_HCRT,negative,PROB_activatedby_HA,PROB_activatedby_NP,PROB_activatedby_HCRT,PROB_negative
0,CAGRTDSWGKFQF_CSARDRWQQTSYEQYF,0,1,0,0,0.0,0.0,0.0,1.0
1,CAGQTNQGAQKLVF_CASRPLRVQETQYF,0,1,0,0,0.0,0.0,0.0,0.999
2,CAVMGNTGKLIF_CASSSGTSKDTQYF,1,1,0,0,0.0,0.001,0.882,0.078
3,CAVNTGGFKTIF_CSAELAGVSTDTQYF,0,1,0,0,0.0,0.0,0.0,0.999
4,CAVSDRTGGFKTIF_CASSLFDYEQYF,0,1,0,0,0.0,0.0,0.0,1.0
5,CAVDPQAGTALIF_CASSEAGGSNQPQHF,0,1,1,0,0.0,1.0,0.003,0.0
6,CAVETDSWGKLQF_CASSFTGSVGYTF,0,1,0,0,0.0,0.0,0.0,0.999
7,CALMNNNAGNMLTF_CATSSGGGGKAYGYTF,0,1,0,0,0.001,0.0,0.0,1.0
8,CAASKDSSYKLIF_CASSLGGGSETQYF,0,1,0,0,0.001,0.0,0.0,0.999
9,CAAGDNDMRF_CASSQPGGGGANVLTF,1,0,0,0,0.028,0.088,0.004,0.906
