In [None]:
# 載入需要使用的套件
'''
PyTorch相關內容
'''
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset,DataLoader, RandomSampler, SequentialSampler, random_split
import numpy as np
'''
Bert模型來源
'''
from transformers import *
'''
評估指標
'''
from sklearn.metrics import f1_score
'''
訓練即時偵測/視覺化工具
'''
import tensorboard
from torch.utils.tensorboard import SummaryWriter
'''
其他相關套件
'''
import copy
import os
import random
import csv
import json
from tqdm.notebook import tqdm
import warnings

In [None]:
# Install a global 'ignore' filter so that Python warnings are not shown from here on
warnings.filterwarnings('ignore')

In [None]:
# Seed every random number generator used in this notebook so results can be reproduced
def manual_seed(seed):
    """Apply ``seed`` to Python's ``random``, NumPy, and PyTorch (CPU and all CUDA devices)."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

manual_seed(42)

In [None]:
# List the indices of the CUDA devices PyTorch reports as available
list(range(torch.cuda.device_count()))

In [None]:
# Select the GPU to use by setting CUDA_VISIBLE_DEVICES to '0'.
# NOTE(review): this variable is normally read when CUDA is first initialised, and an
# earlier cell already queries torch.cuda — confirm the setting still takes effect here.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

In [None]:
# Load the pretrained BioBERT tokenizer and its config (5 labels, task name "ddi")
# so the model can be built from them later
bio_tokenizer = BertTokenizer.from_pretrained("monologg/biobert_v1.1_pubmed")
bio_config = BertConfig.from_pretrained("monologg/biobert_v1.1_pubmed", num_labels=5, finetuning_task="ddi")

In [None]:
# Register the four entity markers as additional special tokens on the tokenizer.
# convert_examples_to_features later looks each marker up as a single token in the
# tokenized sentence, so the tokenizer must not split them.
ADDITIONAL_SPECIAL_TOKENS = ["<e1>", "</e1>", "<e2>", "</e2>"]
bio_tokenizer.add_special_tokens({"additional_special_tokens": ADDITIONAL_SPECIAL_TOKENS})

In [None]:
# Read the label names from label.txt and return them as a list
def get_label():
    """Return the label names stored in ``./label.txt``, one per line.

    Each line is stripped of surrounding whitespace. The file is read inside a
    ``with`` block so the handle is closed as soon as reading finishes.

    Returns:
        list[str]: the stripped lines of the file, in file order.

    Raises:
        OSError: if ``./label.txt`` cannot be opened.
    """
    with open("./label.txt", 'r', encoding='utf-8') as label_file:
        return [label.strip() for label in label_file]

# Display the labels read from label.txt as the cell output
get_label()

In [None]:
# Container for one raw example, following the InputExample described in the
# transformers data-processor documentation.
# Reference: https://huggingface.co/transformers/main_classes/processors.html
class InputExample(object):
    """A single raw example: an id, the sentence text, and its label.

    Attributes:
        guid: identifier of the example.
        text_a: the sentence text.
        label: the label assigned to the example.
    """

    def __init__(self, guid, text_a, label):
        self.guid = guid
        self.text_a = text_a
        self.label = label

    def __repr__(self):
        """Use the JSON serialisation as the printable representation."""
        return self.to_json_string()

    def to_dict(self):
        """Serialize this instance to a Python dictionary (a deep copy of its attributes)."""
        # Needs the standard-library ``copy`` module to be imported at file level.
        return copy.deepcopy(self.__dict__)

    def to_json_string(self):
        """Serialize this instance to an indented, key-sorted JSON string ending in a newline."""
        return json.dumps(self.to_dict(), indent=2, sort_keys=True) + "\n"

In [None]:
# Container bundling the model inputs built for one example, following the
# InputFeatures described in the transformers data-processor documentation.
# Reference: https://huggingface.co/transformers/main_classes/processors.html
class InputFeatures(object):
    """The numeric features of one example.

    Attributes:
        input_ids: token ids of the sentence.
        attention_mask: mask marking which positions are real tokens.
        token_type_ids: segment id of every position.
        label_id: integer label of the example.
        e1_mask: mask marking the positions of the first entity span.
        e2_mask: mask marking the positions of the second entity span.
    """

    def __init__(self, input_ids, attention_mask, token_type_ids, label_id, e1_mask, e2_mask):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.token_type_ids = token_type_ids
        self.label_id = label_id
        self.e1_mask = e1_mask
        self.e2_mask = e2_mask

    def __repr__(self):
        """Use the JSON serialisation as the printable representation."""
        return self.to_json_string()

    def to_dict(self):
        """Serialize this instance to a Python dictionary (a deep copy of its attributes)."""
        # Needs the standard-library ``copy`` module to be imported at file level.
        return copy.deepcopy(self.__dict__)

    def to_json_string(self):
        """Serialize this instance to an indented, key-sorted JSON string ending in a newline."""
        return json.dumps(self.to_dict(), indent=2, sort_keys=True) + "\n"

In [None]:
class DDIProcessor(object):
    """Reads the train/test TSV files and turns their rows into InputExample objects."""

    def __init__(self):
        # Label names; a label's integer id is its position in this list.
        self.relation_labels = get_label()

    @classmethod
    def _read_tsv(cls, input_file, quotechar=None):
        """Read a tab-separated file and return its rows as a list of lists of strings."""
        with open(input_file, "r", encoding="utf-8") as f:
            return list(csv.reader(f, delimiter="\t", quotechar=quotechar))

    def _create_examples(self, lines, set_type):
        """Convert TSV rows into InputExample objects.

        Args:
            lines: rows as returned by ``_read_tsv``; column 0 is the label name and
                column 1 is the sentence.
            set_type: prefix used when building each example's guid.

        Returns:
            list of InputExample, one per row, with the sentence lower-cased and the
            label converted to its index in ``self.relation_labels``.

        Raises:
            ValueError: if a row's label name is not in ``self.relation_labels``.
        """
        examples = []
        for i, line in enumerate(lines):
            guid = "%s-%s" % (set_type, i)               # e.g. "train-0"
            text_a = line[1].lower()                     # sentence, lower-cased
            label = self.relation_labels.index(line[0])  # label name -> integer id
            examples.append(InputExample(guid=guid, text_a=text_a, label=label))
        return examples

    def get_examples(self, mode):
        """Load the examples of the requested split.

        Args:
            mode: ``'train'`` (reads ``./train.tsv``) or ``'test'`` (reads ``./test.tsv``).

        Raises:
            ValueError: if ``mode`` is neither ``'train'`` nor ``'test'``.
        """
        if mode == 'train':
            file_to_read = "./train.tsv"
        elif mode == 'test':
            file_to_read = "./test.tsv"
        else:
            # Fail with a clear message instead of passing None on to open().
            raise ValueError("mode must be 'train' or 'test', got {!r}".format(mode))
        return self._create_examples(self._read_tsv(file_to_read), mode)

In [None]:
def convert_examples_to_features(examples, max_seq_len, tokenizer,
                                 cls_token_segment_id=0,
                                 pad_token_segment_id=0,
                                 sequence_a_segment_id=0,
                                 mask_padding_with_zero=True):
    """Tokenize examples and build fixed-length model inputs plus entity masks.

    Args:
        examples: iterable of objects exposing ``text_a`` (a sentence containing the
            markers ``<e1>``, ``</e1>``, ``<e2>``, ``</e2>``) and ``label``.
        max_seq_len: length every output sequence is padded to.
        tokenizer: object providing ``cls_token``, ``sep_token``, ``pad_token_id``,
            ``tokenize`` and ``convert_tokens_to_ids``.
        cls_token_segment_id: segment id given to the leading [CLS] position.
        pad_token_segment_id: segment id given to padding positions.
        sequence_a_segment_id: segment id given to the sentence tokens and [SEP].
        mask_padding_with_zero: when True, real tokens get attention 1 and padding 0;
            when False the two values are swapped.

    Returns:
        list of InputFeatures, one per example.

    Raises:
        ValueError: if one of the four entity markers is missing from the tokenized text.
        AssertionError: if a built sequence is not exactly ``max_seq_len`` long, which
            happens when the tokenized sentence is longer than ``max_seq_len``
            (no truncation is performed).
    """
    # Default special tokens of the tokenizer
    cls_token = tokenizer.cls_token        # [CLS]
    sep_token = tokenizer.sep_token        # [SEP]
    pad_token_id = tokenizer.pad_token_id  # [PAD]

    features = []
    for example in examples:

        # Split the sentence into tokens
        tokens_a = tokenizer.tokenize(example.text_a)

        e11_p = tokens_a.index("<e1>")   # start marker of the first entity
        e12_p = tokens_a.index("</e1>")  # end marker of the first entity
        e21_p = tokens_a.index("<e2>")   # start marker of the second entity
        e22_p = tokens_a.index("</e2>")  # end marker of the second entity

        # Replace the custom markers with "$" / "#" before ids are looked up,
        # so the marker strings themselves are never converted to ids.
        tokens_a[e11_p] = "$"
        tokens_a[e12_p] = "$"
        tokens_a[e21_p] = "#"
        tokens_a[e22_p] = "#"

        # Shift by one because [CLS] is prepended below
        e11_p += 1
        e12_p += 1
        e21_p += 1
        e22_p += 1

        # [CLS] sentence [SEP], with one segment id per position
        tokens = [cls_token] + tokens_a + [sep_token]
        token_type_ids = [cls_token_segment_id] + [sequence_a_segment_id] * (len(tokens_a) + 1)

        # Map tokens to their vocabulary ids
        input_ids = tokenizer.convert_tokens_to_ids(tokens)

        # Attention mask value for real tokens (padding gets the opposite value)
        attention_mask = [1 if mask_padding_with_zero else 0] * len(input_ids)

        # Pad up to max_seq_len; a negative padding_length adds nothing and is
        # caught by the length assertions below.
        padding_length = max_seq_len - len(input_ids)
        input_ids = input_ids + ([pad_token_id] * padding_length)
        attention_mask = attention_mask + ([0 if mask_padding_with_zero else 1] * padding_length)
        token_type_ids = token_type_ids + ([pad_token_segment_id] * padding_length)

        # Entity masks: 1 on every position from the start marker to the end marker, inclusive
        e1_mask = [0] * len(attention_mask)
        e2_mask = [0] * len(attention_mask)

        for i in range(e11_p, e12_p + 1):
            e1_mask[i] = 1
        for i in range(e21_p, e22_p + 1):
            e2_mask[i] = 1

        # Sanity-check every sequence length; each message reports the length of the
        # sequence it is actually checking.
        assert len(input_ids) == max_seq_len, "input_id 序列長度發生錯誤 {} vs 最大序列長度{}".format(len(input_ids), max_seq_len)
        assert len(attention_mask) == max_seq_len, "attention mask 序列長度發生錯誤 {} vs 最大序列長度{}".format(len(attention_mask), max_seq_len)
        assert len(token_type_ids) == max_seq_len, "token type id 序列長度發生錯誤 {} vs 最大序列長度{}".format(len(token_type_ids), max_seq_len)
        assert len(e1_mask) == max_seq_len, "e1_mask 序列長度發生錯誤 {} vs 最大序列長度{}".format(len(e1_mask), max_seq_len)
        assert len(e2_mask) == max_seq_len, "e2_mask 序列長度發生錯誤 {} vs 最大序列長度{}".format(len(e2_mask), max_seq_len)

        label_id = int(example.label)

        features.append(
            InputFeatures(input_ids=input_ids,
                          attention_mask=attention_mask,
                          token_type_ids=token_type_ids,
                          label_id=label_id,
                          e1_mask=e1_mask,
                          e2_mask=e2_mask))

    return features

In [None]:
def load_and_cache_examples(tokenizer, max_len, mode):
    """Build (or load from cache) the features of a split and wrap them in a TensorDataset.

    Args:
        tokenizer: tokenizer passed on to ``convert_examples_to_features``.
        max_len: sequence length every example is padded to.
        mode: ``"train"`` or ``"test"``.

    Returns:
        TensorDataset whose tensors are, in order: input_ids, attention_mask,
        token_type_ids, label_ids, e1_mask, e2_mask.

    Raises:
        ValueError: if ``mode`` is neither ``"train"`` nor ``"test"``.
    """
    # Validate up front so a bad mode is rejected whether or not a cache file exists.
    if mode not in ("train", "test"):
        raise ValueError("mode只能輸入train或test")

    # The cache file name depends only on max_len and mode (not on the tokenizer).
    cached_features_file = "./" + 'bert_ddi_cached_{}_{}'.format(max_len, mode) + '_lower'

    if os.path.exists(cached_features_file):
        # NOTE(review): the cache holds pickled InputFeatures objects; depending on the
        # installed PyTorch version torch.load may need extra arguments to read it — confirm.
        features = torch.load(cached_features_file)
    else:
        # The processor (and the label file it reads) is only needed on a cache miss.
        examples = DDIProcessor().get_examples(mode)
        features = convert_examples_to_features(examples, max_len, tokenizer)
        torch.save(features, cached_features_file)  # write the cache

    # Convert every feature field to a long tensor
    all_input_ids = torch.tensor([f.input_ids for f in features], dtype=torch.long)
    all_attention_mask = torch.tensor([f.attention_mask for f in features], dtype=torch.long)
    all_token_type_ids = torch.tensor([f.token_type_ids for f in features], dtype=torch.long)
    all_e1_mask = torch.tensor([f.e1_mask for f in features], dtype=torch.long)
    all_e2_mask = torch.tensor([f.e2_mask for f in features], dtype=torch.long)

    all_label_ids = torch.tensor([f.label_id for f in features], dtype=torch.long)

    # Bundle the tensors into a dataset
    dataset = TensorDataset(all_input_ids, all_attention_mask,
                            all_token_type_ids, all_label_ids, all_e1_mask, all_e2_mask)
    return dataset

In [None]:
# Build the training dataset (sequence length 300), creating or reusing its cache file
train_set = load_and_cache_examples(bio_tokenizer, 300, "train")

# Build the test dataset (sequence length 300), creating or reusing its cache file
test_set = load_and_cache_examples(bio_tokenizer, 300, "test")

In [None]:
# Randomly split the training data into 9600 training and 3241 validation examples
# (the original training set has 12841 examples in total).
# NOTE(review): the sizes are hard-coded and `train_set` is rebound to the 9600-example
# subset, so re-running this cell on the already-split data would presumably fail the
# length check in random_split — confirm before re-executing.
train_set, eval_set = random_split(train_set, [9600, 3241])

In [None]:
# Batch size shared by the train, validation, and test DataLoaders
BATCH_SIZE = 16

In [None]:
# Training data: visited in random order
train_sampler = RandomSampler(train_set)

train_dataloader = DataLoader(train_set, sampler=train_sampler, batch_size=BATCH_SIZE)

# Validation data: visited in order. The loader must wrap eval_set (the same dataset
# its sampler was built from); wrapping train_set would evaluate on training examples.
eval_sampler = SequentialSampler(eval_set)

eval_dataloader = DataLoader(eval_set, sampler=eval_sampler, batch_size=BATCH_SIZE)

# Test data: visited in order
test_sampler = SequentialSampler(test_set)

test_dataloader = DataLoader(test_set, sampler=test_sampler, batch_size=BATCH_SIZE)

In [None]:
# Custom fully connected layer
class FCLayer(nn.Module):
    """Dropout, then an optional tanh, then a linear projection.

    Args:
        input_dim: size of the input features.
        output_dim: size of the output features.
        dropout_rate: dropout probability applied to the input.
        use_activation: when True, tanh is applied before the linear layer.
    """

    def __init__(self, input_dim, output_dim, dropout_rate=0., use_activation=True):
        super().__init__()
        self.use_activation = use_activation
        self.dropout = nn.Dropout(dropout_rate)
        self.linear = nn.Linear(input_dim, output_dim)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Return ``linear(tanh(dropout(x)))``, skipping tanh when activation is disabled."""
        hidden = self.dropout(x)
        hidden = self.tanh(hidden) if self.use_activation else hidden
        return self.linear(hidden)

In [None]:
# Build the model
class BERT(BertPreTrainedModel):
    """BioBERT encoder with a classification head over [CLS] and two entity spans.

    The pooled [CLS] output and the averaged hidden states of each entity span are
    each passed through their own FCLayer, concatenated, and classified.
    """

    def __init__(self, bert_config):
        super(BERT, self).__init__(bert_config)
        # Load the pretrained BioBERT encoder using the config that was passed in
        self.bert = BertModel.from_pretrained("monologg/biobert_v1.1_pubmed", config=bert_config)
        self.num_labels = bert_config.num_labels  # number of output classes
        # Take the width from the config instead of hard-coding it
        hidden_size = bert_config.hidden_size
        self.cls_fc_layer = FCLayer(hidden_size, hidden_size, dropout_rate=0.1)
        self.e1_fc_layer = FCLayer(hidden_size, hidden_size, dropout_rate=0.1)
        self.e2_fc_layer = FCLayer(hidden_size, hidden_size, dropout_rate=0.1)
        self.label_classifier = FCLayer(hidden_size * 3, bert_config.num_labels, dropout_rate=0.1, use_activation=False)

    # Average the hidden states of the positions selected by an entity mask
    @staticmethod
    def entity_average(output, e_mask):
        """Return the mean of ``output`` over the positions where ``e_mask`` is non-zero.

        Args:
            output: encoder hidden states, [batch, seq_len, dim].
            e_mask: entity mask, [batch, seq_len].

        Returns:
            Tensor of shape [batch, dim]. A row whose mask is all zero divides by
            zero and yields non-finite values.
        """
        e_mask_unsqueeze = e_mask.unsqueeze(1)                 # [batch, 1, seq_len]
        length_tensor = (e_mask != 0).sum(dim=1).unsqueeze(1)  # [batch, 1]

        # Batched matrix multiply: [b, 1, seq_len] x [b, seq_len, dim] = [b, 1, dim] -> [b, dim]
        sum_vector = torch.bmm(e_mask_unsqueeze.float(), output).squeeze(1)
        avg_vector = sum_vector.float() / length_tensor.float()  # broadcast over dim
        return avg_vector

    def forward(self, input_ids, attention_mask, token_type_ids, labels, e1_mask, e2_mask):
        """Run the encoder and classification head.

        Args:
            input_ids, attention_mask, token_type_ids: inputs passed to the BERT encoder.
            labels: target labels, or None to skip the loss computation.
            e1_mask, e2_mask: masks selecting the positions of each entity span.

        Returns:
            Tuple ``(loss, logits)``; ``loss`` is None when ``labels`` is None.
        """
        outputs = self.bert(input_ids, attention_mask=attention_mask,
                            token_type_ids=token_type_ids)
        sequence_output = outputs[0]
        pooled_output = outputs[1]  # [CLS]

        # Average the hidden states of each entity span
        e1_h = self.entity_average(sequence_output, e1_mask)
        e2_h = self.entity_average(sequence_output, e2_mask)

        # Dropout -> tanh -> fc_layer
        pooled_output = self.cls_fc_layer(pooled_output)
        e1_h = self.e1_fc_layer(e1_h)
        e2_h = self.e2_fc_layer(e2_h)

        # Concat -> label_classifier
        concat_h = torch.cat([pooled_output, e1_h, e2_h], dim=-1)
        logits = self.label_classifier(concat_h)

        # Compute the loss only when labels are given; otherwise it stays None
        # so the return below never refers to an unbound name.
        loss = None
        if labels is not None:
            if self.num_labels == 1:
                loss_fct = nn.MSELoss()
                loss = loss_fct(logits.view(-1), labels.view(-1))
            else:
                loss_fct = nn.CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return (loss, logits)

In [None]:
# Instantiate the model, move it to the GPU, and reset its gradients
model = BERT(bio_config)
model.cuda()
model.zero_grad()

In [None]:
# Number of training epochs
EPOCH = 5

In [None]:
# Set up the optimizer and the scheduler (linear warmup followed by linear decay)
# Reference: https://huggingface.co/transformers/main_classes/optimizer_schedules.html
no_decay = ['bias', 'LayerNorm.weight']
# Parameters are split into "decay" and "no decay" groups by name.
# NOTE(review): both groups currently use weight_decay 0.0, so the split has no effect —
# confirm whether a non-zero decay was intended for the first group.
optimizer_grouped_parameters = [
    {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
     'weight_decay': 0.0},
    {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
     'weight_decay': 0.0}
    ]

optimizer = AdamW(optimizer_grouped_parameters, lr=5e-5, eps=1e-8)
# The training loop steps the scheduler once per batch, so the total is the number of
# batches per epoch times the number of epochs, as an exact integer.
# With 9600 examples and batch size 16 this is 600 * 5 = 3000.
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=600,   # lr ramps up linearly from 0 to the optimizer's lr over these steps
                                            num_training_steps=len(train_dataloader) * EPOCH,
                                            last_epoch=-1)

In [None]:
# Metrics used to evaluate the model: accuracy and F1 score
def acc_and_f1(preds, labels):
    """Return ``(accuracy, F1)`` for predicted versus true label ids.

    Accuracy is the mean of the element-wise equality of the two arrays. F1 is
    micro-averaged and restricted to label ids 1 through 4.
    """
    assert len(preds) == len(labels)
    matches = (preds == labels)
    accuracy = matches.mean()
    micro_f1 = f1_score(y_true=labels, y_pred=preds, average='micro', labels=[1, 2, 3, 4])
    return accuracy, micro_f1

In [None]:
# Path the model weights are saved to and later loaded from
PATH = "./model-final.pt"

In [None]:
"""""""""""""""
     訓練
"""""""""""""""
global_step = 0           # total number of training steps taken so far
tr_loss = 0.0             # running sum of the training loss
writer = SummaryWriter()  # TensorBoard logger
best_acc = 0.0            # best validation accuracy seen so far
best_f1 = 0.0             # best validation F1 score seen so far

for epoch_i in range(EPOCH):
    print("")
    print(f'======== Epoch {epoch_i + 1} / {EPOCH} ========')
    print('訓練模型...')

    iterator = tqdm(train_dataloader, desc="Iteration")

    steps_in_epoch = 0 # number of steps taken within the current epoch

    for step, batch in enumerate(iterator):
        # Switch to training mode (evaluation below switches the model to eval mode)
        model.train()

        # Clear the gradients left over from the previous step
        model.zero_grad()

        # Move every tensor of the batch to the GPU
        batch = tuple(t.cuda() for t in batch)

        # The batch holds six tensors, in the order the TensorDataset was built:
        # [0]: input_ids
        # [1]: attention_mask
        # [2]: token_type_ids
        # [3]: labels
        # [4]: e1_mask
        # [5]: e2_mask
        inputs = {'input_ids': batch[0],
                  'attention_mask': batch[1],
                  'token_type_ids': batch[2],
                  'labels': batch[3],
                  'e1_mask': batch[4],
                  'e2_mask': batch[5]}

        # Forward pass; the loss is the first element of the output
        outputs = model(**inputs)
        loss = outputs[0]

        # Log the training loss to TensorBoard
        writer.add_scalar('Loss/train', loss.item(), global_step)

        # Backpropagate to compute the gradients
        loss.backward()

        # Accumulate the total loss
        tr_loss += loss.item()

        # Clip the gradient norm to 1.0 to guard against exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # Update the weights
        optimizer.step()

        # Advance the learning-rate schedule by one step
        scheduler.step()

        # Update the step counters
        steps_in_epoch += 1
        global_step += 1

        # Run a validation pass every 300 steps within the epoch
        if steps_in_epoch % 300 == 0:
            """""""""""""""
                 評估
            """""""""""""""
            # NOTE(review): the "/2" in this message is hard-coded; it matches two
            # evaluations per epoch only when an epoch has 600 steps — confirm.
            print(f'評估模型 {steps_in_epoch // 300}/2')
            eval_loss = 0.0        # running sum of per-batch validation losses
            eval_steps = 0         # number of validation batches processed
            preds = None           # accumulated logits, later reduced to class ids
            out_label_ids = None   # accumulated ground-truth labels

            # Switch to evaluation mode
            model.eval()

            for batch in tqdm(eval_dataloader, desc="Evaluating"):
                batch = tuple(t.cuda() for t in batch)
                # No gradients are needed during evaluation
                with torch.no_grad():
                    inputs = {'input_ids': batch[0],
                              'attention_mask': batch[1],
                              'token_type_ids': batch[2],
                              'labels': batch[3],
                              'e1_mask': batch[4],
                              'e2_mask': batch[5]}
                    outputs = model(**inputs)
                    tmp_eval_loss, logits = outputs

                    eval_loss += tmp_eval_loss.mean().item()
                eval_steps += 1

                # Append this batch's logits and labels to the accumulated arrays
                if preds is None:
                    preds = logits.detach().cpu().numpy()
                    out_label_ids = inputs['labels'].detach().cpu().numpy()
                else:
                    preds = np.append(preds, logits.detach().cpu().numpy(), axis=0)
                    out_label_ids = np.append(out_label_ids, inputs['labels'].detach().cpu().numpy(), axis=0)

            # Average the validation loss over the batches
            eval_loss = eval_loss / eval_steps
            # Take the argmax over the logits as the predicted class
            preds = np.argmax(preds, axis=1)
            # Compute accuracy and F1 score
            acc, f1 = acc_and_f1(preds, out_label_ids)

            # Log the validation results to TensorBoard
            writer.add_scalar('Valid/loss', eval_loss, global_step)
            writer.add_scalar('Valid/acc', acc, global_step)
            writer.add_scalar('Valid/F1', f1, global_step)

            # Save the model only when both accuracy and F1 are at least as good as the best so far
            if acc >= best_acc and f1 >= best_f1:
                best_acc = acc
                best_f1 = f1
                # Save only the parameters (the state_dict), not the whole model object
                torch.save(model.state_dict(), PATH)

                print(f"""
                模型更新！ 目前已訓練{global_step}筆batch資料
                評估結果為:
                ---------------------
                {"loss"}: {eval_loss:.6f} 
                {"acc"} : {best_acc:.6f}
                {"F1"}   : {best_f1:.6f}
                ---------------------
                """)
            else:
                print("模型未更新！")

# Close the TensorBoard writer
writer.close()

In [None]:
# Rebuild the model on the GPU and load the parameters saved at PATH
model = BERT(bio_config).cuda()
model.load_state_dict(torch.load(PATH))

In [None]:
"""""""""""""""""""""
    最終評估測試集
"""""""""""""""""""""
test_loss = 0.0        # running sum of per-batch test losses
test_steps = 0         # number of test batches processed
logit_batches = []     # logits of every batch, in order
label_batches = []     # ground-truth labels of every batch, in order

# Switch to evaluation mode
model.eval()

for batch in tqdm(test_dataloader, desc="Testing"):
    batch = tuple(t.cuda() for t in batch)
    with torch.no_grad():
        inputs = {'input_ids': batch[0],
                  'attention_mask': batch[1],
                  'token_type_ids': batch[2],
                  'labels': batch[3],
                  'e1_mask': batch[4],
                  'e2_mask': batch[5]}
        outputs = model(**inputs)
        tmp_test_loss, logits = outputs[:2]

        test_loss += tmp_test_loss.mean().item()
    test_steps += 1

    logit_batches.append(logits.detach().cpu().numpy())
    label_batches.append(inputs['labels'].detach().cpu().numpy())

# Average the loss over the batches
final_loss = test_loss / test_steps
# Join the per-batch arrays along the batch axis
out_label_ids = np.concatenate(label_batches, axis=0)
# Take the argmax over the logits as the predicted class
preds = np.argmax(np.concatenate(logit_batches, axis=0), axis=1)
# Compute accuracy and F1 score
acc, f1 = acc_and_f1(preds, out_label_ids)

print(f"""
最終測試結果:
---------------------
{"loss"}: {final_loss:.6f} 
{"acc"} : {acc:.6f}
{"F1"}   : {f1:.6f}
---------------------
""")