In [1]:
import os
import numpy as np
import tensorflow as tf
from transformers import *
from tensorflow.keras.callbacks import  ModelCheckpoint
from seqeval.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
import pandas as pd



In [2]:
#random seed 고정
tf.random.set_seed(1234)
np.random.seed(1234)

BATCH_SIZE = 2
NUM_EPOCHS = 7
MAX_LEN = 512 # EDA에서 추출된 Max Length
DATA_IN_PATH = 'data_in/KOR'
DATA_OUT_PATH = "data_out/KOR"

In [3]:
DATA_TRAIN_PATH = os.path.join("train_path")
DATA_TEST_PATH = os.path.join("test_path")


def read_file(input_path):
    """Read tsv file, and return words and label as list"""
    with open(input_path, "r", encoding="utf-8") as f:
        sentences = []
        labels = []
        for line in f:
            split_line = line.strip().split("\t")
            sentences.append(split_line[0])
            labels.append(split_line[1])
        return sentences[1:], labels[1:]

train_sentences, train_labels = read_file(DATA_TRAIN_PATH)
for i in range(len(train_labels)):
    train_labels[i] = train_labels[i][1:-1].replace('\'','').replace(' ','').split(",")
train_ner_dict = {"sentence": train_sentences, "label": train_labels}
train_ner_df = pd.DataFrame(train_ner_dict)

test_sentences, test_labels = read_file(DATA_TEST_PATH)
for i in range(len(test_labels)):
    test_labels[i] = test_labels[i][1:-1].replace('\'','').replace(' ','').split(",")
test_ner_dict = {"sentence": test_sentences, "label": test_labels}
test_ner_df = pd.DataFrame(test_ner_dict)

print("개체명 인식 학습 데이터 개수: {}".format(len(train_ner_df)))
print("개체명 인식 테스트 데이터 개수: {}".format(len(test_ner_df)))

개체명 인식 학습 데이터 개수: 21525
개체명 인식 테스트 데이터 개수: 5382


In [4]:
for i in range(len(train_ner_df)):
    if len(train_ner_df['sentence'][i]) > 512:
        train_ner_df = train_ner_df.drop(index=i, axis=0)

for i in range(len(test_ner_df)):
    if len(test_ner_df['sentence'][i]) > 512:
        test_ner_df = test_ner_df.drop(index=i, axis=0)

train_ner_df = train_ner_df.reset_index(drop=True)
test_ner_df = test_ner_df.reset_index(drop=True)
    
print("개체명 인식 학습 데이터 개수: {}".format(len(train_ner_df)))
print("개체명 인식 테스트 데이터 개수: {}".format(len(test_ner_df)))

개체명 인식 학습 데이터 개수: 21500
개체명 인식 테스트 데이터 개수: 5378


In [5]:
ner_labels = ['UNK', 'O', 'B-PDT', 'I-PDT', 'B-MOV', 'I-MOV', 'B-TRV', 'I-TRV']

In [6]:
# 버트 토크나이저 설정

tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased", cache_dir='bert_ckpt')

pad_token_id = tokenizer.pad_token_id # 0
pad_token_label_id = 0
cls_token_label_id = 0
sep_token_label_id = 0

In [7]:
def bert_tokenizer(sent, MAX_LEN):
    
    encoded_dict = tokenizer.encode_plus(
        text = sent,
        truncation=True,
        add_special_tokens = True, #'[CLS]'와 '[SEP]' 추가
        max_length = MAX_LEN,           # 문장 패딩 및 자르기 진행
        pad_to_max_length = True,
        return_attention_mask = True   # 어탠션 마스크 생성
    )
    
    input_id = encoded_dict['input_ids']
    attention_mask = encoded_dict['attention_mask'] 
    token_type_id = encoded_dict['token_type_ids']
    
    return input_id, attention_mask, token_type_id

def convert_label(words, labels_idx, ner_begin_label, max_seq_len):
            
    tokens = []
    label_ids = []

    for word, slot_label in zip(words, labels_idx):
        word_tokens = tokenizer.tokenize(word)
        tokens.extend(word_tokens)
        
        # 슬롯 레이블 값이 Begin이면 I로 추가
        if int(slot_label) in ner_begin_label:
            label_ids.extend([int(slot_label)] + [int(slot_label) + 1] * (len(word_tokens) - 1))
        else:
            label_ids.extend([int(slot_label)] * len(word_tokens))
  
    # [CLS] and [SEP] 설정
    special_tokens_count = 2
    if len(label_ids) > max_seq_len - special_tokens_count:
        label_ids = label_ids[: (max_seq_len - special_tokens_count)]

    # [SEP] 토큰 추가
    label_ids += [sep_token_label_id]

    # [CLS] 토큰 추가
    label_ids = [cls_token_label_id] + label_ids
    
    padding_length = max_seq_len - len(label_ids)
    label_ids = label_ids + ([pad_token_label_id] * padding_length)
    
    return label_ids

In [8]:
# 테스트용
ner_begin_label = [ner_labels.index(begin_label) for begin_label in ner_labels if "B" in begin_label]
ner_begin_label_string = [ner_labels[label_index] for label_index in ner_begin_label]

print(ner_begin_label)
print(ner_begin_label_string)

[2, 4, 6]
['B-PDT', 'B-MOV', 'B-TRV']


In [9]:
ner_begin_label = [ner_labels.index(begin_label) for begin_label in ner_labels if "B" in begin_label]

def create_inputs_targets(df):
    input_ids = []
    attention_masks = []
    token_type_ids = []
    label_list = []

    for i in range(len(df)):
        sentence, labels = df['sentence'][i], df['label'][i]
        words = sentence.split()
        labels_idx = []

        for label in labels:
            labels_idx.append(ner_labels.index(label) if label in ner_labels else ner_labels.index("UNK"))


        #assert len(words) == len(labels_idx)

        input_id, attention_mask, token_type_id = bert_tokenizer(sentence, MAX_LEN)

        convert_label_id = convert_label(words, labels_idx, ner_begin_label, MAX_LEN)

        input_ids.append(input_id)
        attention_masks.append(attention_mask)
        token_type_ids.append(token_type_id)
        label_list.append(convert_label_id)

    input_ids = np.array(input_ids, dtype=int)
    attention_masks = np.array(attention_masks, dtype=int)
    token_type_ids = np.array(token_type_ids, dtype=int)
    label_list = np.asarray(label_list, dtype=int) #레이블 토크나이징 리스트
    inputs = (input_ids, attention_masks, token_type_ids)
    
    return inputs, label_list

In [10]:
train_inputs, train_labels = create_inputs_targets(train_ner_df)
test_inputs, test_labels = create_inputs_targets(test_ner_df)



In [11]:
class TFBertNERClassifier(tf.keras.Model):
    def __init__(self, model_name, dir_path, num_class):
        super(TFBertNERClassifier, self).__init__()

        self.bert = TFBertModel.from_pretrained(model_name, cache_dir=dir_path)
        self.dropout = tf.keras.layers.Dropout(self.bert.config.hidden_dropout_prob)
        self.classifier = tf.keras.layers.Dense(num_class, 
                                                kernel_initializer=tf.keras.initializers.TruncatedNormal(self.bert.config.initializer_range),
                                                name="ner_classifier")
        

    def call(self, inputs, attention_mask=None, token_type_ids=None, training=False):

        #outputs 값: # sequence_output, pooled_output, (hidden_states), (attentions)
        outputs = self.bert(inputs, attention_mask=attention_mask, token_type_ids=token_type_ids)
        sequence_output = outputs[0]
                
        sequence_output = self.dropout(sequence_output, training=training)
        logits = self.classifier(sequence_output)

        return logits

In [12]:
ner_model = TFBertNERClassifier(model_name='bert-base-multilingual-cased',
                                  dir_path='bert_ckpt',
                                  num_class=len(ner_labels))

Some layers from the model checkpoint at bert-base-multilingual-cased were not used when initializing TFBertModel: ['mlm___cls', 'nsp___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at bert-base-multilingual-cased.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


In [13]:
def compute_loss(labels, logits):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE
    )
    
    # 0의 레이블 값은 손실 값을 계산할 때 제외
    active_loss = tf.reshape(labels, (-1,)) != 0
    reduced_logits = tf.boolean_mask(tf.reshape(logits, (-1, shape_list(logits)[2])), active_loss)
    labels = tf.boolean_mask(tf.reshape(labels, (-1,)), active_loss)
    
    return loss_fn(labels, reduced_logits)

In [14]:
ner_labels = ['UNK', 'O', 'PDT-B', 'PDT-I', 'MOV-B', 'MOV-I', 'TRV-B', 'TRV-I']

In [15]:
class F1Metrics(tf.keras.callbacks.Callback):
    def __init__(self, x_eval, y_eval):
        self.x_eval = x_eval
        self.y_eval = y_eval

    def compute_f1_pre_rec(self, labels, preds):

        return {
            "accuracy": accuracy_score(labels, preds),
            "precision": precision_score(labels, preds, suffix=True),
            "recall": recall_score(labels, preds, suffix=True),
            "f1": f1_score(labels, preds, suffix=True)
        }


    def show_report(self, labels, preds):
        return classification_report(labels, preds, suffix=True)
        
    def on_epoch_end(self, epoch, logs=None):

        results = {}
        
        pred = self.model.predict(self.x_eval)
        label = self.y_eval
        pred_argmax = np.argmax(pred, axis = 2)

        slot_label_map = {i: label for i, label in enumerate(ner_labels)}

        out_label_list = [[] for _ in range(label.shape[0])]
        preds_list = [[] for _ in range(label.shape[0])]

        for i in range(label.shape[0]):
            for j in range(label.shape[1]):
                if label[i, j] != 0:
                    out_label_list[i].append(slot_label_map[label[i][j]])
                    if pred_argmax[i][j] ==0:
                        pred_argmax[i][j]=1
                    preds_list[i].append(slot_label_map[pred_argmax[i][j]])
                    
        result = self.compute_f1_pre_rec(out_label_list, preds_list)
        results.update(result)

        print("********")
        print("F1 Score")
        for key in sorted(results.keys()):
            print("{}, {:.4f}".format(key, results[key]))
        print("\n" + self.show_report(out_label_list, preds_list))
        print("********")

f1_score_callback = F1Metrics(test_inputs, test_labels)

In [16]:
# Prepare training: Compile tf.keras model with optimizer, loss and learning rate schedule
optimizer = tf.keras.optimizers.Adam(2e-5)
# ner_model.compile(optimizer=optimizer, loss=compute_loss, run_eagerly=True)
ner_model.compile(optimizer=optimizer, loss=compute_loss)

In [None]:
model_name = "tf2_bert_ner"

checkpoint_path = os.path.join(DATA_OUT_PATH, model_name, 'weights.h5')
checkpoint_dir = os.path.dirname(checkpoint_path)

# Create path if exists
if os.path.exists(checkpoint_dir):
    print("{} -- Folder already exists \n".format(checkpoint_dir))
else:
    os.makedirs(checkpoint_dir, exist_ok=True)
    print("{} -- Folder create complete \n".format(checkpoint_dir))
    
cp_callback = ModelCheckpoint(
    checkpoint_path, verbose=1, save_best_only=True, save_weights_only=True)

history = ner_model.fit(train_inputs, train_labels, batch_size=BATCH_SIZE, epochs=NUM_EPOCHS,
                        callbacks=[cp_callback, f1_score_callback])

print(history.history)

data_out/KOR\tf2_bert_ner -- Folder already exists 

Epoch 1/7
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method
Instructions for updating:
The `validate_indices` argument has no effect. Indices are always validated on CPU and never validated on GPU.


In [None]:
ner_model.save_weights("save_path")

In [None]:
plot_graphs(history, 'loss')

# 모델 예측하기

In [112]:
ner_model.load_weights("save_path")

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x190a80cec40>

In [82]:
def new_predict(text):
    input_ids = []
    attention_masks = []
    token_type_ids = []
    label_list = []
    text_list = []

    input_id, attention_mask, token_type_id = bert_tokenizer(text, MAX_LEN)
    input_ids.append(input_id)
    attention_masks.append(attention_mask)
    token_type_ids.append(token_type_id)
    
    new_input_ids = np.array(input_ids, dtype=int)
    new_attention_masks = np.array(attention_masks, dtype=int)
    new_type_ids = np.array(token_type_ids, dtype=int)
    new_inputs = (new_input_ids, new_attention_masks, new_type_ids)
    
    predict = ner_model.predict(new_inputs, batch_size=512)
    predict_list = predict[0].tolist()
    print(len(predict_list[0]))
    
    index = []
    for i in predict_list:
        index.append(i.index(max(i)))
    for i in index:
        label_list.append(ner_labels[i])
        
    for i,j in zip(input_id,label_list):
        if j in ['PDT-B','PDT-I', 'MOV-B','MOV-I', 'TRV-B', 'TRV-I']:
            text_list.append([tokenizer.decode(i),j])   
    for i ,j in text_list:
        if i not in ["[ P A D ]","[ C L S ]","[ S E P ]","[ U N K ]"]:
            print(i ,j)

In [20]:
new_predict('3년만에 영화를 보는데 배우들 연기는 좋은데 영화 스토리가 별로다.')

NameError: name 'new_predict' is not defined

In [62]:
new_predict('이 마스크팩을 사용한지 3년째입니다. 일단 보습과 진정에는 매우 좋아요. 하지만 자극적이라서 피부가 민감하신 분은 사용하기 힘들듯 합니다 ㅠㅠ')

보 PDT-B
# # 습 PDT-I
# # 과 PDT-I
진 PDT-I
# # 정 에 PDT-I
# # 는 PDT-I
매 우 PDT-I
좋 PDT-I
# # 아 PDT-I
# # 요 PDT-I
. PDT-I
자 PDT-B
# # 극 PDT-I
# # 적 PDT-I
# # 이 PDT-I
# # 라 PDT-I
# # 서 PDT-I
사 PDT-I
# # 용 PDT-I
# # 하 기 PDT-I
힘 PDT-I
# # 들 PDT-I
# # 듯 PDT-I
합 PDT-I
# # 니 다 PDT-I
