In [None]:
import os
import random
import numpy as np
from tqdm import tqdm
import torch
from torch import nn, Tensor
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
import pdb
from typing import Optional, Union, Tuple, List
import logging
import math
import pandas as pd
from sklearn.metrics import accuracy_score

from transformers import TrainingArguments, Trainer, TrainerCallback, DefaultDataCollator
from deepchem.feat.smiles_tokenizer import SmilesTokenizer
from transformers.trainer_pt_utils import _get_learning_rate
from transformers.models.bert.modeling_bert import BertEncoder, BertOnlyMLMHead, BertPooler, BertModel
from transformers.modeling_outputs import MaskedLMOutput, BaseModelOutputWithPoolingAndCrossAttentions
from transformers import AutoConfig, BertConfig, BertForMaskedLM, BertPreTrainedModel, BertForSequenceClassification
from transformers.modeling_outputs import SequenceClassifierOutput

# os.environ["TOKENIZERS_PARALLELISM"] = "true"


from accelerate import Accelerator, skip_first_batches
from accelerate import __version__ as accelerate_version
from accelerate.utils import DistributedDataParallelKwargs, GradientAccumulationPlugin

## Fix Random Seeds

In [2]:
def seed_everything(seed: int = 1004):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU generator, the
    current GPU and all GPUs), exports ``PYTHONHASHSEED`` and configures cuDNN
    for deterministic behaviour.

    Args:
        seed: Value handed to every generator. Defaults to 1004.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,      # current GPU
        torch.cuda.manual_seed_all,  # all GPUs
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # benchmark=True would let cuDNN pick the algorithm best suited to the GPU,
    # so it is switched off together with enabling deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Seed all random number generators up front.
seed_everything(1004)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


## Config

In [3]:
# Directory of the pre-trained checkpoint to fine-tune from (the BERT model
# with absolute position embeddings). Change this to your own best model path.
MODEL_PATH = '/home/user/10TB/NEWBERT/SmileBERT_absolute_MLMmodel/'

In [4]:
# Load the checkpoint's configuration and print it to inspect the architecture
# (hidden size, number of layers, vocabulary size, position embedding type).
config = AutoConfig.from_pretrained(MODEL_PATH)  

print(config)

BertConfig {
  "_name_or_path": "/home/user/10TB/NEWBERT/SmileBERT_absolute_MLMmodel/",
  "architectures": [
    "BertForMaskedLM"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "position_embedding_type": "absolute",
  "torch_dtype": "float32",
  "transformers_version": "4.30.2",
  "type_vocab_size": 1,
  "use_cache": true,
  "vocab_size": 591
}



In [5]:
# Vocabulary file used to build the SMILES tokenizer.
# Change this to your own path.
vocab_path = '/home/egg2018037024/Jupyter_Home/Brand-New-ChemBERT/vocab.txt'
tokenizer = SmilesTokenizer(vocab_path)

In [6]:
# Display the tokenizer to check its vocabulary size and special tokens.
tokenizer

SmilesTokenizer(name_or_path='', vocab_size=591, model_max_length=1000000000000000019884624838656, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True)

In [8]:
class BertForCustomSequenceClassification(BertPreTrainedModel):
    """BERT encoder with a linear classification head and a class-weighted loss.

    The pooled encoder output goes through dropout and a linear layer to
    produce the logits. For single-label classification the loss is a
    cross-entropy weighted per class with ``1 - n_i / sum(n)``, where ``n``
    holds per-class sample counts, so the less frequent class weighs more.

    The counts are read from ``config.class_sample_counts`` when the config
    defines that attribute and default to ``DEFAULT_CLASS_SAMPLE_COUNTS``
    otherwise. The single-label classification loss needs exactly one count
    per label.
    """

    # Label counts of the BBBP data used in this notebook
    # (index 0 -> label 0: 483 samples, index 1 -> label 1: 1567 samples).
    DEFAULT_CLASS_SAMPLE_COUNTS = (483, 1567)

    def __init__(self, config):
        """Build the encoder, the classification head and the class weights.

        Args:
            config: Model configuration. Reads ``num_labels``, ``hidden_size``,
                ``classifier_dropout`` / ``hidden_dropout_prob`` and, when
                present, ``class_sample_counts``.
        """
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.bert = BertModel(config)
        # Fall back to the encoder's hidden dropout when no classifier-specific
        # dropout is configured.
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Compute the class weights once instead of on every forward pass.
        # They are kept as plain Python floats and turned into a tensor on the
        # logits' device inside forward(), so the model does not depend on any
        # notebook-level `device` variable.
        sample_counts = getattr(config, "class_sample_counts", None) or self.DEFAULT_CLASS_SAMPLE_COUNTS
        total_samples = sum(sample_counts)
        self._class_weights = tuple(1 - (count / total_samples) for count in sample_counts)

        # Initialize weights and apply final processing
        self.post_init()


    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        Run the encoder and the classification head, and compute the loss when labels are given.

        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).

        All other arguments are forwarded unchanged to the underlying `BertModel`.

        Returns:
            A `SequenceClassifierOutput` with `loss` (``None`` without labels), `logits`, `hidden_states` and
            `attentions` when `return_dict` is true; otherwise a tuple `(loss?, logits, *extra encoder outputs)`.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Index 1 of the encoder output is the pooled representation.
        pooled_output = outputs[1]

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # Infer the problem type once from the label count and label dtype
            # and remember it on the config.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                # Apply class weights, created on the same device as the logits.
                class_weights = torch.tensor(self._class_weights, dtype=torch.float, device=logits.device)
                loss_fct = CrossEntropyLoss(weight=class_weights)
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [9]:
# Build the classifier from the pre-trained checkpoint. As the loading messages
# report, the checkpoint's masked-LM head weights (`cls.predictions.*`) are not
# used, and some weights of the new model are not initialized from it.
model = BertForCustomSequenceClassification.from_pretrained(MODEL_PATH)

print(model)

Some weights of the model checkpoint at /home/user/10TB/NEWBERT/SmileBERT_absolute_MLMmodel/ were not used when initializing BertForCustomSequenceClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForCustomSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForCustomSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForCustomSequenceClassification were not initialized from the model checkpoint at /home/user/10TB/NEWBERT/SmileBERT_

BertForCustomSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(591, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, ele

In [10]:
# Print the number of parameters of the model.
print(model.num_parameters())

86496002


## Data Processing

In [None]:
import pandas as pd

# Load the BBBP CSV; the first column of the file becomes the DataFrame index.
# Change this to your own dataset path.
data = pd.read_csv("/home/user/10TB/Data/ChemBERT_FineTuning/BBBP.csv",
                  index_col=0
                  )
data

In [13]:
# Label statistics: number of samples per value of the `p_np` target column.
data["p_np"].value_counts()

p_np
1    1567
0     483
Name: count, dtype: int64

In [14]:
# Train:Valid:Test split (80% / 10% / 10%).
# Draw 80% of the rows for training, then split the remaining rows evenly
# into validation and test. The fixed random_state keeps the split reproducible.
train = data.sample(frac=0.8, random_state=2024)
mid = data.drop(index=train.index)

val = mid.sample(frac=0.5, random_state=2024)
test = mid.drop(index=val.index)

print("Before Split : ", len(data))
print("After Split(Train Val Test) : ", *map(len, (train, val, test)))

Before Split :  2050
After Split(Train Val Test) :  1640 205 205


In [15]:
# Label counts within each split, to compare class balance across train/val/test.
train['p_np'].value_counts(), val['p_np'].value_counts(), test['p_np'].value_counts()

(p_np
 1    1256
 0     384
 Name: count, dtype: int64,
 p_np
 1    160
 0     45
 Name: count, dtype: int64,
 p_np
 1    151
 0     54
 Name: count, dtype: int64)

In [16]:
class Bert_Dataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes one text (here: a SMILES string) per item.

    Tokenization happens lazily in ``__getitem__``; every sample is padded or
    truncated to ``max_length`` tokens.

    Args:
        data: Input texts, assumed to be already pre-processed.
        label: Labels aligned with ``data`` (same length, same order).
        tokenizer: Callable invoked as
            ``tokenizer(text, max_length=..., padding="max_length", truncation=True)``
            that returns a mutable mapping of features.
        max_length: Number of tokens every sample is padded/truncated to.
            Defaults to 512.

    Raises:
        ValueError: If ``data`` and ``label`` have different lengths.
    """

    def __init__(self, data:list, label, tokenizer, max_length:int = 512):
        # Fail early instead of silently mis-sizing the dataset or raising an
        # IndexError in the middle of training.
        if len(data) != len(label):
            raise ValueError(
                f"data and label must have the same length, got {len(data)} and {len(label)}"
            )
        # Assume that data is well pre-processed.
        self.data = data
        self.tokenizer = tokenizer
        self.label = label
        self.max_length = max_length

    def __getitem__(self, idx):
        """Return the tokenized sample at ``idx`` with its label under the ``'label'`` key."""
        text = self.data[idx]
        # return_tensors="pt" is intentionally not passed, so the tokenizer
        # output keeps its default (non-tensor) format.
        tokens = self.tokenizer(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,  # truncate inputs longer than max_length
        )
        # The label is stored as a LongTensor holding a single element.
        tokens['label'] = torch.LongTensor([self.label[idx]])

        return tokens

    def __len__(self):
        """Number of samples."""
        return len(self.data)

In [17]:
# Wrap the `smiles` strings and `p_np` labels of each split in a dataset.
train_dataset = Bert_Dataset(train['smiles'].to_list(), train['p_np'].to_list(), tokenizer)
val_dataset = Bert_Dataset(val['smiles'].to_list(), val['p_np'].to_list(), tokenizer)
test_dataset = Bert_Dataset(test['smiles'].to_list(), test['p_np'].to_list(), tokenizer)

In [18]:
# Sanity check: size of the training set and one tokenized sample.
print(len(train_dataset))
print(train_dataset[97])
#print(train_dataset.__getitem__(970)['input_ids'].numpy())
# print(tokenizer.decode(train_dataset.__getitem__(970)['input_ids'].numpy()[0]))

1640
{'input_ids': [12, 33, 21, 26, 16, 17, 56, 20, 17, 19, 16, 17, 19, 35, 20, 16, 21, 18, 17, 16, 18, 16, 18, 16, 17, 22, 19, 18, 16, 19, 18, 17, 16, 33, 17, 19, 18, 56, 32, 17, 27, 18, 16, 26, 16, 16, 16, 43, 16, 32, 17, 16, 16, 16, 17, 22, 19, 18, 16, 43, 18, 16, 18, 16, 13, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [19]:
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import multilabel_confusion_matrix
from transformers import EvalPrediction
from sklearn.metrics import mean_absolute_error
import torch

def compute_metrics(pred):
    """Compute binary classification metrics for an evaluation run.

    Args:
        pred: Object exposing ``label_ids`` (true labels) and ``predictions``
            (per-class scores); the predicted class is the arg-max over the
            last axis of ``predictions``.

    Returns:
        dict with the keys ``"F1"``, ``'recall'``, ``'precision'`` and ``'acc'``.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)

    # Precision / recall / F1 use average="binary"; the support value is not needed.
    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average="binary"
    )
    accuracy = accuracy_score(y_true, y_pred)

    return {"F1": f1, 'recall': recall, 'precision': precision, 'acc': accuracy}

In [20]:
# Fine-tuning configuration. Change `output_dir` (checkpoints) and
# `logging_dir` (logs) to your own paths.
# Logging, checkpoint saving and evaluation all run once per epoch; the
# learning rate follows a linear schedule with 10% warm-up.
training_args = TrainingArguments(
    output_dir="/home/user/10TB/NEWBERT/absolute_FT/BBBP_model_classweight",
    logging_dir= "/home/user/10TB/NEWBERT/absolute_FT/BBBP_model_classweight_log",
    num_train_epochs=10,
    learning_rate = 1e-6,
   # max_steps=1000,
    per_device_train_batch_size=32,
#    gradient_accumulation_steps = 16,
    per_device_eval_batch_size = 32,
#    eval_accumulation_steps = 32,
    logging_strategy = "epoch",
    save_strategy = "epoch",
    lr_scheduler_type = "linear",
    dataloader_num_workers = 12,
    warmup_ratio = 0.1,
    weight_decay=0.01,
    evaluation_strategy='epoch',
)

# Trainer that fine-tunes on the training split and evaluates on the
# validation split with the metrics from `compute_metrics`.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=DefaultDataCollator(return_tensors = "pt"),
    compute_metrics=compute_metrics,
)

In [None]:
# NOTE(review): `CustomCallback` is not defined anywhere in this notebook, so
# the callback line stays disabled.
#trainer.add_callback(CustomCallback(trainer))
# Run fine-tuning with the arguments configured above.
trainer.train()

## Test

In [22]:
# Reload the fine-tuned model from the checkpoint you selected as best;
# replace the path with your own best checkpoint.
# NOTE(review): `checkpoint-130` is hard-coded — confirm it is the checkpoint
# with the best validation metrics for your run.
model = BertForCustomSequenceClassification.from_pretrained("/home/user/10TB/NEWBERT/absolute_FT/BBBP_model_classweight/checkpoint-130/")

In [23]:
# Trainer used for evaluation and prediction only: it wraps the reloaded model,
# reuses the training arguments and is given no train/eval dataset.
eval_trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=DefaultDataCollator(return_tensors = "pt"),
    compute_metrics=compute_metrics,
)

In [None]:
# Evaluate the reloaded model on the held-out test split.
eval_trainer.evaluate(test_dataset)

In [32]:
# Optional: collect the fine-tuning results (predictions and labels) for the
# test split so they can be saved.
output = eval_trainer.predict(test_dataset)

# Predicted class = arg-max over the last axis of the model predictions.
prediction_csv = pd.DataFrame(output.predictions.argmax(-1), columns=['prediction'])
label_csv = pd.DataFrame(output.label_ids, columns=['label'])



In [34]:
# Save the predictions and the true labels as CSV files.
# NOTE(review): these paths point at `sinusoidal_FT`, while the model and
# checkpoints in this notebook live under `absolute_FT` — confirm the
# destination is intended so results of another experiment are not overwritten.
prediction_csv.to_csv("/home/user/10TB/NEWBERT/sinusoidal_FT/results/BBBP_predictions.csv")
label_csv.to_csv("/home/user/10TB/NEWBERT/sinusoidal_FT/results/BBBP_labels.csv")