In [3]:
import ast
import os
import re
# Regex character class listing the Chinese punctuation marks used as labels.
pattern = r"[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\uff01]"

# Turn every literal \uXXXX escape written in the raw pattern into the character
# it denotes, keeping the written order (the position becomes the class id).
punctuation_list = [
    chr(int(escape[2:], 16))
    for escape in re.findall(r'\\u[0-9a-fA-F]{4}', pattern)
]

# Class id -> symbol; the id after the last punctuation mark (13) stands for a
# plain character and is shown as "word".
punctuation_map = dict(enumerate(punctuation_list))
punctuation_map[len(punctuation_list)] = "word"

for i, symbol in punctuation_map.items():
    print(i, symbol)


0 。
1 ；
2 ，
3 ：
4 “
5 ”
6 （
7 ）
8 、
9 ？
10 《
11 》
12 ！
13 word


In [4]:
def tagging(isinstance):
    """Produce one label per non-punctuation character of a sentence.

    Characters found in ``punctuation_list`` are skipped. Every other
    character is labelled with the index (in ``punctuation_list``) of the
    punctuation mark that immediately follows it, or with 13 ("word") when
    the next character is not punctuation or the character is the last one.

    Args:
        isinstance: the sentence (an indexable sequence of characters).
            NOTE(review): the parameter name shadows the ``isinstance``
            builtin inside this function; it is kept because it is part of
            the signature.

    Returns:
        list[int]: the labels, in sentence order.
    """
    labels = []
    last_pos = len(isinstance) - 1
    for pos, char in enumerate(isinstance):
        # Punctuation never receives a label of its own.
        if char in punctuation_list:
            continue
        # Nothing can follow the final character.
        if pos == last_pos:
            labels.append(13)
            continue
        following = isinstance[pos + 1]
        if following in punctuation_list:
            labels.append(punctuation_list.index(following))
        else:
            labels.append(13)  # 13 means "word"
    return labels




In [5]:
from transformers import BertTokenizerFast

# Fast tokenizer loaded from the pretrained `bert-base-chinese` checkpoint;
# align_label and later cells read this module-level name.
tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')
# Read by align_label: when True, a token that continues the previous word gets
# that word's label; when False it gets -100.
label_all_tokens = True
def align_label(texts, labels):
    """Expand per-word labels into one label per tokenizer token.

    The text is tokenized with the module-level ``tokenizer`` (truncation
    enabled) and each token is mapped back to its word index.

    Args:
        texts: the sentence to tokenize.
        labels: labels indexable by word index (e.g. the list returned by
            ``tagging``).

    Returns:
        list[int]: one entry per token. The entry is -100 when the token has
        no word index (special tokens), when ``labels`` has no entry for the
        word, or when the token continues the previous word and
        ``label_all_tokens`` is False.
    """
    tokenized_inputs = tokenizer(texts, truncation=True)
    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:
        if word_idx is None:
            label_ids.append(-100)
        elif word_idx == previous_word_idx and not label_all_tokens:
            # Continuation piece of a word and only first pieces are labelled.
            label_ids.append(-100)
        else:
            try:
                label_ids.append(labels[word_idx])
            except (IndexError, KeyError):
                # Fewer labels than words: leave the token unlabelled. Only
                # lookup failures are tolerated so that real errors (and
                # Ctrl-C) are no longer silently swallowed.
                label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids

  from .autonotebook import tqdm as notebook_tqdm


In [21]:
"""
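This Dataset converts one sentence from the tsv file into a BERT-compatible format per access and returns 3 items:
- tokens_tensor: the token-id sequence of the sentence, including [CLS] and [SEP]
- segments_tensor: a binary tensor that can mark the boundary between two sentences; all zeros here because there is only one sentence
- tag_tensor: the punctuation labels converted to class indices and aligned to the tokens (intended to be None when no labels are available, e.g. for an unlabeled test set)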
"""
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np


    
class Classical_chinese(Dataset):
    """Dataset over a preprocessed ``./data/<mode>.tsv`` file.

    Each row is unpacked as ``(original, target, tag)``; ``original`` is the
    sentence fed to the tokenizer and ``tag`` is the text form of a Python
    literal (presumably a list of label ids — verify against the
    preprocessing step). ``target`` is not used by this class.
    """

    def __init__(self, mode, tokenizer):
        """Read ``./data/<mode>.tsv`` and remember the tokenizer.

        Args:
            mode: file stem, e.g. ``"train"`` or ``"test"``.
            tokenizer: callable BERT tokenizer used to encode sentences.
        """
        self.mode = mode
        # For very large files, reading with iterator=True may be needed.
        self.df = pd.read_csv("./data/" + mode + ".tsv", sep="\t").fillna("")
        self.len = len(self.df)
        self.tokenizer = tokenizer

    def __getitem__(self, idx):
        """Return ``(tokens_tensor, segments_tensor, tag_tensor)`` for row ``idx``.

        ``tag_tensor`` is None when the row's tag cell is blank or the
        literal ``None``.
        """
        original, target, tag = self.df.iloc[idx, :].values

        # literal_eval only accepts Python literals, so a crafted cell in the
        # data file cannot execute code the way eval() would. Blank cells
        # (produced by fillna("")) mean "no labels".
        tag_text = str(tag).strip()
        tag = ast.literal_eval(tag_text) if tag_text else None

        # Encode exactly the way align_label does (special tokens added,
        # truncation on) so token ids and labels always have equal length,
        # including for sentences longer than the model limit.
        ids = self.tokenizer(original, truncation=True)["input_ids"]
        tokens_tensor = torch.tensor(ids)

        # Single-sentence input: every position belongs to segment 0.
        segments_tensor = torch.tensor([0] * len(ids), dtype=torch.long)

        tag_tensor = None
        if tag is not None:
            tag_tensor = torch.tensor(align_label(original, tag))

        return (tokens_tensor, segments_tensor, tag_tensor)

    def __len__(self):
        """Number of rows currently in the dataset."""
        return self.len

    def drop_record(self, index):
        """Remove the row at positional ``index`` and update the length."""
        # The previous `del self[index]` raised TypeError because the class
        # defines no __delitem__; drop the row from the frame instead.
        self.df = self.df.drop(self.df.index[index]).reset_index(drop=True)
        self.len = len(self.df)

    
    
# Build the datasets for the "train" and "test" splits, both using the Chinese
# BERT tokenizer.
trainset = Classical_chinese("train", tokenizer=tokenizer)
testset = Classical_chinese("test", tokenizer=tokenizer)

In [22]:
"""
Implements a DataLoader that returns one mini-batch at a time.
The DataLoader consumes the `Classical_chinese` dataset defined above and
returns the 4 tensors needed to train BERT:
- tokens_tensors  : (batch_size, max_seq_len_in_batch)
- segments_tensors: (batch_size, max_seq_len_in_batch)
- masks_tensors   : (batch_size, max_seq_len_in_batch)
- tag_tensors     : (batch_size, max_seq_len_in_batch)
"""

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# `samples` is a list whose elements are individual items returned by the
# `Classical_chinese` dataset; each item holds 3 entries:
# - tokens_tensor
# - segments_tensor
# - tag_tensor
# The first two are zero-padded, the tags are padded with -100, and the
# attention mask described above is derived from the padded token ids.
def create_mini_batch(samples):
    """Collate a list of dataset items into padded batch tensors.

    Args:
        samples: list of ``(tokens_tensor, segments_tensor, tag_tensor)``.

    Returns:
        tuple: ``(tokens_tensors, segments_tensors, masks_tensors,
        tag_tensors)``; ``tag_tensors`` is None when the first sample's tag
        entry is None.
    """
    token_seqs, segment_seqs, tag_seqs = [], [], []
    for sample in samples:
        token_seqs.append(sample[0])
        segment_seqs.append(sample[1])
        tag_seqs.append(sample[2])

    # Zero-pad every sequence to the longest one in the batch.
    tokens_tensors = pad_sequence(token_seqs, batch_first=True)
    segments_tensors = pad_sequence(segment_seqs, batch_first=True)

    # Tags are padded with -100 so padded positions can be filtered out later.
    tag_tensors = None
    if samples[0][2] is not None:
        tag_tensors = pad_sequence(tag_seqs, batch_first=True,
                                   padding_value=-100)

    # Attention mask: 1 wherever the token id is not the zero padding, so BERT
    # only attends to real tokens.
    masks_tensors = (tokens_tensors != 0).long()

    return tokens_tensors, segments_tensors, masks_tensors, tag_tensors


# Create DataLoaders that each yield mini-batches of up to 64 samples.
# Passing `collate_fn` is the key step: it merges a list of samples into one
# padded mini-batch.
# NOTE(review): the training loader is built without shuffle=True, so batches
# are served in dataset order — confirm this is intended.
BATCH_SIZE = 64
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, 
                        collate_fn=create_mini_batch)
evalloader = DataLoader(testset, batch_size=BATCH_SIZE,
                        collate_fn=create_mini_batch)

In [15]:
from transformers import BertForTokenClassification, BertTokenizerFast

class BertModel(torch.nn.Module):
    """Wrapper around ``BertForTokenClassification`` on ``bert-base-chinese``."""

    def __init__(self, num_labels=14):
        """Load the pretrained checkpoint with a token-classification head.

        Args:
            num_labels: number of output classes; defaults to 14, the value
                this notebook uses (13 punctuation marks plus "word").
        """
        super().__init__()

        self.bert = BertForTokenClassification.from_pretrained(
            "bert-base-chinese",
            num_labels=num_labels)

    def forward(self, input_id, mask, label=None):
        """Run the wrapped model and return its tuple output.

        Args:
            input_id: token ids passed as ``input_ids``.
            mask: attention mask passed as ``attention_mask``.
            label: per-token labels passed as ``labels``; may be omitted (or
                None) for inference.

        Returns:
            tuple: the wrapped model's output with ``return_dict=False``.
            Callers in this notebook unpack it as ``(loss, logits)`` when
            labels are given and read element 0 as the logits otherwise.
        """
        return self.bert(
            input_ids=input_id,
            attention_mask=mask,
            labels=label,
            return_dict=False)


In [23]:
import torch.optim as optim
from tqdm import tqdm

# Inline single pass over `trainloader`: one optimisation step per batch while
# accumulating per-sample accuracy and loss totals.
model = BertModel()
LEARNING_RATE = 5e-3

use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# Move the model before creating the optimizer so the optimizer is built from
# the parameters on their final device.
model = model.to(device)
optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

best_acc = 0
best_loss = 1000

total_acc_train = 0
total_loss_train = 0
model.train()

for tokens_tensor, segments_tensor, mask_tensor, tag_tensor in tqdm(trainloader):

    # The collate function already yields (batch, seq_len) tensors, so no
    # squeeze is required before moving them to the device.
    tag_tensor = tag_tensor.to(device)
    mask_tensor = mask_tensor.to(device)
    input_id = tokens_tensor.to(device)

    optimizer.zero_grad()
    loss, logits = model(input_id, mask_tensor, tag_tensor)

    # Read the scalar once per batch instead of once per sample.
    batch_loss = loss.item()

    for i in range(logits.shape[0]):
        total_loss_train += batch_loss

        keep = tag_tensor[i] != -100
        if not keep.any():
            # A sample without any real label would yield a NaN mean and
            # poison the running accuracy total.
            continue

        predictions = logits[i][keep].argmax(dim=1)
        acc = (predictions == tag_tensor[i][keep]).float().mean()
        total_acc_train += acc.item()

    loss.backward()
    optimizer.step()
    


Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertForTokenClassification: ['cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-c

KeyboardInterrupt: 

In [25]:
import torch.optim as optim
from tqdm import tqdm
def train_loop(model, df_train, save_dir="./model"):
    """Fine-tune ``model`` on ``df_train`` for ``EPOCHS`` epochs with SGD.

    Reads the module-level ``LEARNING_RATE``, ``EPOCHS`` and ``BATCH_SIZE``.
    After every epoch the weights are saved and the averaged loss and
    accuracy are printed.

    Args:
        model: the network to train; called as ``model(input_id, mask, tags)``
            and expected to return ``(loss, logits)``.
        df_train: the training dataset. Batches are now drawn from this
            argument rather than from the global ``trainloader``.
        save_dir: directory receiving ``model<epoch>.pth`` checkpoints;
            created if missing. Replaces a hard-coded absolute path that only
            existed on the original author's machine.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model = model.to(device)

    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE)

    # Reshuffle every epoch so SGD does not see the samples in file order.
    loader = DataLoader(df_train, batch_size=BATCH_SIZE, shuffle=True,
                        collate_fn=create_mini_batch)

    os.makedirs(save_dir, exist_ok=True)

    for epoch_num in range(EPOCHS):

        total_acc_train = 0.0
        total_loss_train = 0.0

        model.train()

        for tokens_tensor, _segments, masks_tensors, tag_tensor in tqdm(loader):

            tag_tensor = tag_tensor.to(device)
            mask = masks_tensors.to(device)
            input_id = tokens_tensor.to(device)

            optimizer.zero_grad()
            loss, logits = model(input_id, mask, tag_tensor)

            # The batch loss is counted once per sample so that dividing by
            # the dataset size below gives a sample-weighted average.
            total_loss_train += loss.item() * logits.shape[0]

            for i in range(logits.shape[0]):
                keep = tag_tensor[i] != -100
                if not keep.any():
                    # No real label: the mean would be NaN; count it as 0.
                    continue
                predictions = logits[i][keep].argmax(dim=1)
                acc = (predictions == tag_tensor[i][keep]).float().mean()
                total_acc_train += acc.item()

            loss.backward()
            optimizer.step()

        torch.save(model.state_dict(),
                   os.path.join(save_dir, "model" + str(epoch_num) + ".pth"))
        model.eval()

        print(
            f'Epochs: {epoch_num + 1} | Loss: {total_loss_train / len(df_train): .3f} | Accuracy: {total_acc_train / len(df_train): .3f} ')

# Hyperparameters; train_loop reads LEARNING_RATE and EPOCHS as globals.
LEARNING_RATE = 5e-3
EPOCHS = 5
# NOTE(review): reassigning BATCH_SIZE here does not alter loaders that were
# already constructed in an earlier cell.
BATCH_SIZE = 64

# Fine-tune a freshly created model on the training set.
model = BertModel()
train_loop(model, trainset)

            

Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertForTokenClassification: ['cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-c

KeyboardInterrupt: 

In [26]:
from matplotlib import pyplot as plt
def color_confusion_matrix(confusion_matrix):
    """Draw a confusion matrix as a heat map (darker cell = larger value).

    Rows are drawn along the y axis, labelled "True Label"; columns along the
    x axis, labelled "Predicted Label".

    Args:
        confusion_matrix: 2-D array-like of counts. Tick marks are derived
            from its size instead of being fixed at 14.
    """
    n_rows = len(confusion_matrix)
    n_cols = len(confusion_matrix[0]) if n_rows else 0

    plt.imshow(confusion_matrix, cmap=plt.cm.Blues)
    plt.colorbar()
    plt.xticks(range(n_cols), range(n_cols))
    plt.yticks(range(n_rows), range(n_rows))
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

In [27]:
import torch.optim as optim
from tqdm import tqdm

def evaluate(model, df_test):
    """Evaluate ``model`` on ``df_test`` and print per-class metrics.

    Plots the confusion matrix (rows = true label, columns = predicted
    label, matching the axis labels of ``color_confusion_matrix``) and prints
    the mean per-sample accuracy plus recall, precision and F1 for every
    class in ``punctuation_map``.

    Args:
        model: called as ``model(input_id, mask, tags)`` and expected to
            return ``(loss, logits)``.
        df_test: the evaluation dataset. Batches are drawn from this argument
            rather than from the global ``evalloader``.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model = model.to(device)
    # Inference mode: no dropout, and no autograd bookkeeping below.
    model.eval()

    num_classes = len(punctuation_map)
    loader = DataLoader(df_test, batch_size=BATCH_SIZE,
                        collate_fn=create_mini_batch)

    total_acc_test = 0.0
    total_recall_test = np.zeros(num_classes)     # occurrences of each true label
    total_precision_test = np.zeros(num_classes)  # occurrences of each predicted label
    total_true_positive = np.zeros(num_classes)
    confusion_matrix = np.zeros((num_classes, num_classes))

    def _safe_ratio(numerator, denominator):
        # Element-wise division that yields 0 where the denominator is 0,
        # matching the earlier nan_to_num result without 0/0 warnings.
        return np.divide(numerator, denominator,
                         out=np.zeros_like(numerator, dtype=float),
                         where=denominator != 0)

    with torch.no_grad():
        for tokens_tensor, _segments, masks_tensors, tag_tensor in tqdm(loader):

            tag_tensor = tag_tensor.to(device)
            mask = masks_tensors.to(device)
            input_id = tokens_tensor.to(device)

            _loss, logits = model(input_id, mask, tag_tensor)

            for i in range(logits.shape[0]):
                keep = tag_tensor[i] != -100
                if not keep.any():
                    # No real label: skip rather than add a NaN accuracy.
                    continue

                label_clean = tag_tensor[i][keep]
                predictions = logits[i][keep].argmax(dim=1)

                for true_id, pred_id in zip(label_clean.tolist(),
                                            predictions.tolist()):
                    total_recall_test[true_id] += 1
                    total_precision_test[pred_id] += 1
                    if true_id == pred_id:
                        total_true_positive[true_id] += 1
                    # Row = true label, column = prediction. The previous
                    # [pred][true] indexing contradicted the plot's labels.
                    confusion_matrix[true_id][pred_id] += 1

                acc = (predictions == label_clean).float().mean()
                total_acc_test += acc.item()

    color_confusion_matrix(confusion_matrix)

    val_accuracy = total_acc_test / len(df_test)
    val_recall = _safe_ratio(total_true_positive, total_recall_test)
    val_precision = _safe_ratio(total_true_positive, total_precision_test)
    val_f1 = _safe_ratio(2 * val_precision * val_recall,
                         val_precision + val_recall)

    print(f'\nTest Accuracy: {val_accuracy: .3f}')
    for title, scores in (("Recall", val_recall),
                          ("Precision", val_precision),
                          ("F1", val_f1)):
        print(f'\nTest {title}:')
        for i in range(num_classes):
            print(f'{punctuation_map[i]} : {scores[i]}')
    
# Load the fine-tuned weights and evaluate on the test set. The checkpoint is
# mapped to the CPU while loading so it also opens on machines without the
# device it was saved from; evaluate() moves the model to the GPU if present.
model = BertModel()
model.load_state_dict(torch.load("./model/50percent_model1.pth", map_location="cpu"))
evaluate(model, testset)

Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertForTokenClassification: ['cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-c

KeyboardInterrupt: 

In [28]:

def evaluate_one_text(model, sentence):
    """Predict punctuation for one sentence and print the restored text.

    Args:
        model: called as ``model(input_id, mask, None)``; element 0 of its
            output is read as the logits.
        sentence: the text to punctuate (presumably given without
            punctuation — verify against callers).

    Prints the original sentence followed by the sentence with every
    predicted punctuation mark inserted after its character.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    model = model.to(device)
    # Inference mode: no dropout, and no autograd bookkeeping below.
    model.eval()

    label = tagging(sentence)
    text = tokenizer(sentence, truncation=True, return_tensors="pt")

    mask = text['attention_mask'].to(device)
    input_id = text['input_ids'].to(device)
    # The aligned labels only serve as a mask selecting tokens that belong to
    # a word; torch.tensor keeps them as integers.
    label_ids = torch.tensor(align_label(sentence, label)).unsqueeze(0).to(device)

    with torch.no_grad():
        logits = model(input_id, mask, None)[0]
    logits_clean = logits[label_ids != -100]

    predictions = logits_clean.argmax(dim=1).tolist()
    prediction_label = [punctuation_map[i] for i in predictions]
    print(f'original sentence\n{sentence}')

    print("recover_sentence")
    pieces = []
    for idx, char in enumerate(sentence):
        pieces.append(char)
        # There can be fewer predictions than characters (truncated input,
        # characters without a label); such characters are emitted unchanged
        # instead of raising IndexError.
        if idx < len(prediction_label) and prediction_label[idx] != 'word':
            pieces.append(prediction_label[idx])
    print("".join(pieces))
        
        

# Load the fine-tuned weights and punctuate one sample sentence. The checkpoint
# is mapped to the CPU while loading so it also opens on machines without the
# device it was saved from; evaluate_one_text() moves the model itself.
model = BertModel()
model.load_state_dict(torch.load("./model/50percent_model1.pth", map_location="cpu"))
evaluate_one_text(model, "皇天后土实鉴此心背义忘恩天人共戮")

Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertForTokenClassification: ['cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-c

original sentence
皇天后土实鉴此心背义忘恩天人共戮
recover_sentence
皇天后土，实鉴此心，背义忘恩，天人共戮。