Generate BERT Embedding + Hydrophilicity Encoding

In [None]:
# Install the third-party packages for this notebook via shell escapes.
# `tape_proteins` presumably provides the `tape` module imported further down
# (ProteinBertModel, TAPETokenizer); the `transformers` import in this notebook
# is currently commented out.
!pip install transformers
! pip install tape_proteins

In [None]:
import numpy as np
from torch import Generator
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from tempfile import TemporaryFile
import os
import matplotlib.pyplot as plt
import pandas as pd

# Preliminaries

# from torchtext.data import Field, TabularDataset, BucketIterator, Iterator

# Models

import torch.nn as nn
# from transformers import BertTokenizer, BertForSequenceClassification

# Training

import torch.optim as optim

# Evaluation

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
# from tape import ProteinBertModel, TAPETokenizer


In [None]:
# Change the working directory to the project data folder (presumably on a
# mounted Google Drive) so the relative .npy paths used below resolve there.
%cd /content/drive/MyDrive/project_data

/content/drive/.shortcut-targets-by-id/1g3rUaoGdVQ9MqEkCjpoH7he2HFJB6Wws/project_data


# Hyperparams

In [None]:
# Configuration values shared by the rest of the notebook.
hyperparams = dict(
    validation_split=0.2,
    split_seed=42,
    batch_size=32,
    lr=5e-3,
    weight_decay=5e-6,
    class_num=9,  # need to change
    dropout_prob=0.2,
    rnn_hidden_size=256,
    rnn_layer_num=3,
    soft_aug_size=40,
)

# Use the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# One Beta(8, 8) sample per soft-augmentation slot.
alpha = [np.random.beta(8, 8) for _slot in range(hyperparams['soft_aug_size'])]

modes = ('train', 'dev', 'soft')

# Numeric value per one-letter residue code; '*' maps to -100.
# NOTE(review): the values look like a hydropathy scale -- confirm the source.
vocab = {
    'A': 1.8,
    'R': -4.5,
    'N': -3.5,
    'D': -3.5,
    'C': 2.5,
    'Q': -3.5,
    'E': -3.5,
    'G': -0.4,
    'H': -3.2,
    'I': 4.5,
    'L': 3.8,
    'K': -3.9,
    'M': 1.9,
    'F': 2.8,
    'P': -1.6,
    'S': -0.8,
    'T': -0.7,
    'W': -0.9,
    'X': 2,
    'Y': -1.3,
    'V': 4.2,
    '*': -100,
}

# Data 

In [None]:
# Per-split input arrays. NOTE(review): despite the "embeddings" file names,
# later cells iterate each element character by character, so these presumably
# hold raw residue sequences -- confirm.
train_data = np.load("train_embeddings.npy", allow_pickle=True)
val_data = np.load("valid_embeddings.npy", allow_pickle=True)

# Divergence labels; the `raw_*` names are aliases of the very same arrays.
raw_train_labels = np.load('train_label_balanced_new.npy', allow_pickle=True)
train_labels = raw_train_labels
raw_val_labels = np.load("validation_label_balanced_new.npy", allow_pickle=True)
val_labels = raw_val_labels

# Clade labels (currently disabled):
# train_labels_clade = np.load('train_label_clade_num.npy', allow_pickle=True)
# val_labels_clade = np.load('validation_label_clade_num.npy', allow_pickle=True)

In [None]:
# Show the shape of each split, one per line.
print(train_data.shape, val_data.shape, sep="\n")
# Clade label shapes (currently disabled):
# print(np.shape(train_labels_clade))
# print(np.shape(val_labels_clade))

(43434,)
(4842,)


## mapping for DIVERGENCE labels in training data
* no need to run for clade

In [None]:
# Bring in the TAPE model and tokenizer classes.
from tape import ProteinBertModel, TAPETokenizer

# iupac is the vocab for TAPE models, use unirep for the UniRep model
tokenizer = TAPETokenizer(vocab='iupac')

# Load the pretrained 'bert-base' weights and move the model to the chosen device.
model = ProteinBertModel.from_pretrained('bert-base').to(device)


In [None]:
# Bert Embedding
# Tokenize each batch of training sequences, run it through the TAPE BERT
# model and keep the mean over the token axis as the batch's embedding.
# NOTE(review): `data_loader` is only assigned in a later cell of this notebook,
# so this cell depends on a loader (presumably yielding raw sequences) that
# already exists in the session -- confirm before running top to bottom.
MAX_SEQ_LEN = 1280  # every token sequence is truncated / zero-padded to this length

train_embeds = []
for i, x in enumerate(data_loader['train']):
    # NOTE(review): only the first batch is embedded; this looks like a debugging
    # limiter -- remove it to embed the whole training set.
    if i > 0:
        break
    # Start a fresh list per batch (it was never initialised before, which
    # raised NameError and would otherwise have accumulated across batches).
    tokens = []
    for text in x:
        token_id = tokenizer.encode(text)
        # Truncate first: np.pad raises ValueError on a negative pad width,
        # which is what over-long sequences produced when padding came first.
        token_id = token_id[:MAX_SEQ_LEN]
        token_id = np.pad(token_id, (0, MAX_SEQ_LEN - len(token_id)))

        tokens.append(torch.tensor(token_id).reshape(1, -1))
    token_ids = torch.cat(tokens)
    token_ids = token_ids.to(device)
    # Inference only: without no_grad the output carries an autograd graph and
    # the later `.cpu().numpy()` conversion fails on a grad-requiring tensor.
    with torch.no_grad():
        output = model(token_ids)
    # Mean over dim 1 (padding positions included), then flatten to one column.
    # NOTE(review): for a batch larger than 1 this merges all sequences of the
    # batch into a single column vector -- confirm that is intended.
    word_embedding = torch.mean(output[0], dim=1).reshape(-1, 1)
    train_embeds.append(word_embedding)

In [None]:
# Move every stored embedding tensor to host memory and convert it to NumPy.
train_embeds = [embedding.cpu().numpy() for embedding in train_embeds]

In [None]:
# Pack the list of per-batch embedding arrays into a single NumPy array.
train_embeds = np.array(train_embeds)

In [None]:
# Persist the training embeddings to 'train_embed.npy' in the working directory.
np.save('train_embed.npy', train_embeds)

In [None]:
# Hydrophilicity Encoding
# Map every residue of each training sequence to its numeric value in `vocab`,
# then zero-pad / truncate to a fixed length and store it as a column tensor.
# NOTE(review): the values look like a hydropathy scale -- confirm the source.
vocab = {'A': 1.8, 'R':-4.5,'N':-3.5,'D':-3.5,'C':2.5,'Q':-3.5,'E':-3.5,'G':-0.4,'H' :-3.2,'I':4.5,
         'L':3.8,'K':-3.9,'M':1.9,'F':2.8,'P':-1.6,'S':-0.8,'T':-0.7,'W':-0.9,'X':2, 'Y':-1.3,'V':4.2,'*':-100}

MAX_SEQ_LEN = 1280  # fixed length of every encoded sequence

train_encoders = []
for text in train_data:
    # Named `residue_values` rather than `map` so the builtin `map` is not
    # shadowed for the rest of the notebook session.
    # A character missing from `vocab` raises KeyError here.
    residue_values = [vocab[residue] for residue in text]
    # Pad with zeros up to MAX_SEQ_LEN (a negative count yields no padding) ...
    residue_values = residue_values + [0] * (MAX_SEQ_LEN - len(residue_values))
    # ... and cut anything beyond MAX_SEQ_LEN.
    residue_values = residue_values[:MAX_SEQ_LEN]
    x = torch.FloatTensor(residue_values).to(device)
    # Add a trailing axis so each encoding has shape (MAX_SEQ_LEN, 1).
    x = torch.unsqueeze(x, dim=1)
    train_encoders.append(x)

In [None]:
# Bring each encoding tensor back to host memory and stack them into one array.
train_encoders = np.array([encoding.cpu().numpy() for encoding in train_encoders])

In [None]:
# Join the BERT embeddings and the per-residue encodings along axis 1 to form
# the training features.
# NOTE(review): this needs both arrays to agree on every axis except axis 1
# (e.g. one row per training sequence in each) -- confirm the shapes.
train_d = np.concatenate((train_embeds, train_encoders), axis=1)

In [None]:
# Embed the validation sequences one at a time (no padding, batch of one).
val_embed = []
for text in val_data:
    # The last character of each sequence is dropped before tokenizing.
    token_ids = torch.tensor([tokenizer.encode(text[:-1])]).to(device)
    with torch.no_grad():
        output = model(token_ids)
    # Average over the token axis, shape it as a single column ...
    word_embedding = output[0].mean(dim=1).reshape(-1, 1)
    # ... then drop that column axis again to get a flat vector.
    x = word_embedding.squeeze(dim=1)
    # print(x.shape)
    val_embed.append(x)

## data loading

In [None]:
# The base class is spelled out in full: with `class Dataset(Dataset)` a re-run
# of this cell would make the class inherit from its own previous definition.
class Dataset(torch.utils.data.Dataset):
    """Indexable (sample, label) dataset with optional random-replace augmentation.

    Args:
        X: Indexable collection of samples (NumPy array for augmentation).
        Y: Indexable collection of labels aligned with ``X``; may be ``None``
            for unlabeled data.
        test: When true, ``__getitem__`` returns ``-1`` as the label.
        augment: Augmentation name; only ``"random_replace"`` is recognised,
            any other value leaves the data untouched.
        aug_size: Number of augmented samples to add (required when
            ``augment == "random_replace"``).

    Raises:
        ValueError: If ``augment == "random_replace"`` and ``aug_size`` is
            ``None``.
    """

    def __init__(self, X, Y = None, test=False, augment=None, aug_size=None):
        self.X = X
        self.Y = Y
        self.test = test
        self.augment = augment
        self.aug_size = aug_size
        if augment == "random_replace":
            self._random_replace()

    def __len__(self):
        """Return the number of samples.

        Raises:
            ValueError: If labels are present but their count differs from
                the number of samples.
        """
        # Only compare lengths when labels exist; unlabeled (test) data has
        # Y=None and previously crashed here with a TypeError.
        if self.Y is not None and len(self.X) != len(self.Y):
            raise ValueError(
                f"X and Y differ in length: {len(self.X)} != {len(self.Y)}"
            )
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(sample, label)``; the label is ``-1`` for unlabeled data."""
        x = self.X[idx]
        if self.test or self.Y is None:
            y = -1
        else:
            y = self.Y[idx]
        return (x, y)

    def _random_replace(self):
        """Append ``aug_size`` copies of random samples with one entry replaced.

        Samples are drawn without replacement. In each copy one random position
        along its first axis is overwritten with a random integer in [0, 24).
        The copies (and their labels, when labels exist) are appended to
        ``self.X`` / ``self.Y``; the original samples are left unchanged.
        """
        if self.aug_size is None:
            raise ValueError("aug_size must be set when augment='random_replace'")
        ori_len = len(self.X)
        aug_idx = np.random.choice(ori_len, size=self.aug_size, replace=False)
        aug_data = []
        aug_labels = []
        for i in aug_idx:
            # Work on a copy: indexing X yields a view / shared object, so an
            # in-place edit would also corrupt the original sample.
            data = np.array(self.X[i], copy=True)
            j = np.random.choice(data.shape[0])
            # NOTE(review): 24 is presumably the token vocabulary size -- confirm.
            data[j] = np.random.choice(24)
            aug_data.append(data)
            if self.Y is not None:
                aug_labels.append(self.Y[i])
        self.X = np.append(self.X, np.array(aug_data, dtype='O'), axis=0)
        if self.Y is not None:
            self.Y = np.append(self.Y, np.array(aug_labels, dtype='O'), axis=0)


In [None]:
# Pair each split's features with its labels.
# NOTE(review): `val_d` is not assigned anywhere in the visible part of this
# notebook -- confirm it is built before this cell runs.
train_dataset = Dataset(X=train_d, Y=train_labels)
val_dataset = Dataset(X=val_d, Y=val_labels)

In [None]:
# Split name -> dataset.
datasets = dict(train=train_dataset, dev=val_dataset)

# Split name -> DataLoader. Only 'train' (and a 'soft' split, were one added)
# is shuffled; 'dev' keeps its order.
# NOTE(review): the batch size is the literal 64 although
# hyperparams['batch_size'] is 32 -- confirm which one is intended.
# A custom collate_fn (mix_pad_collate for 'soft', pad_collate otherwise) is
# currently disabled.
data_loader = {
    mode: DataLoader(
        dataset=datasets[mode],
        batch_size=64,
        shuffle=mode in ('train', 'soft'),
    )
    for mode in ('train', 'dev')
}

In [None]:
class Classifier(nn.Module):
    """Fully connected classifier: input -> 1024 -> 512 -> 256 -> classes.

    ReLU follows every layer except the last; the output is the raw score of
    the final linear layer (no softmax is applied). Dropout and BatchNorm
    between the layers are not enabled.

    Args:
        input_dim: Number of features per sample after flattening
            (default 2048, the previously hard-coded value).
        num_classes: Number of output scores per sample
            (default 9, the previously hard-coded value).
    """

    def __init__(self, input_dim=2048, num_classes=9):
        super().__init__()
        # Layer order is kept as Linear/ReLU pairs so the parameter names
        # (out.0, out.2, out.4, out.6) stay the same for saved state dicts.
        self.out = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Flatten each sample to one feature vector and return its class scores.

        Args:
            x: Tensor whose first axis indexes samples; all remaining axes are
                flattened and must total ``input_dim`` elements.

        Returns:
            Tensor of shape ``(x.size(0), num_classes)``.
        """
        # reshape (not view) so non-contiguous inputs are accepted as well.
        x = x.reshape(x.size(0), -1)
        output = self.out(x)
        return output