In [27]:
!pip install tensorflow==2.14.0
!pip install tensorflow-addons



In [28]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# by this notebook all live under that mount point.
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [29]:
import os
# Set before `import tensorflow` below; presumably this must be in the
# environment at import time to quieten TensorFlow's native log output.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import warnings
# NOTE(review): this hides every Python warning for the rest of the notebook,
# including any emitted by scikit-learn during evaluation.
warnings.filterwarnings("ignore")

import tensorflow as tf

from keras import layers
from keras.preprocessing.sequence import pad_sequences
from tensorflow_addons.layers import CRF
from tensorflow_addons.text.crf import crf_log_likelihood, crf_decode
from sklearn.metrics import classification_report
from sklearn.preprocessing import MultiLabelBinarizer

# print(tf.__version__)
# print(tf.config.list_physical_devices("GPU"))

# Let TensorFlow grow GPU memory on demand instead of reserving it all up front.
# Every visible GPU is configured; on a CPU-only runtime the list is empty and
# the loop is a no-op (indexing [0] used to raise IndexError in that case).
for gpu in tf.config.list_physical_devices("GPU"):
    tf.config.experimental.set_memory_growth(gpu, True)

In [30]:
# Locations on the mounted Google Drive. The dataset files share one folder
# and the checkpoint lives in its own model folder.
DATASET_DIR = "/content/drive/MyDrive/Colab Notebooks/Dataset"
MODEL_DIR = "/content/drive/MyDrive/Colab Notebooks/Models/bilstm"

vocabs_path = f"{DATASET_DIR}/vocabs.txt"  # one vocabulary entry per line
tags_path = f"{DATASET_DIR}/tags.txt"      # one tag per line
train_path = f"{DATASET_DIR}/train.txt"
test_path = f"{DATASET_DIR}/test.txt"
model_path = f"{MODEL_DIR}/bilstm_crf_ner.ckpt"

In [31]:
def read_dataset(file_path):
    """Read a blank-line-delimited, tab-separated tagging corpus.

    The file is split into sentences on blank lines ("\\n\\n"). Each non-empty
    line of a sentence is split on tabs; the first field is the token and the
    second is its label. Lines whose token is "*" are dropped.

    Args:
        file_path: path of the UTF-8 encoded corpus file.

    Returns:
        (data_list, label_list): two parallel lists with one entry per
        sentence, holding that sentence's tokens and labels respectively.
        A chunk with no usable lines yields an empty sentence in both lists.

    Raises:
        ValueError: if a kept line has no tab-separated label field.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        raw_sentences = file.read().split("\n\n")

    data_list = []
    label_list = []

    for raw_sentence in raw_sentences:
        rows = [tuple(line.split("\t")) for line in raw_sentence.split("\n") if line != ""]
        rows = [row for row in rows if row[0] != "*"]

        try:
            words = [row[0] for row in rows]
            labels = [row[1] for row in rows]
        except IndexError as err:
            # Fail loudly with the offending sentence instead of printing it
            # and terminating the interpreter with a "success" exit code.
            raise ValueError(
                f"Malformed sentence (line without a tab-separated label): {rows!r}"
            ) from err

        data_list.append(words)
        label_list.append(labels)

    return data_list, label_list


def get_from_txt(file_path):
    """Return every line of a UTF-8 text file with surrounding whitespace stripped.

    Blank lines are kept and appear as empty strings in the result.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        return [raw_line.strip() for raw_line in handle]

In [32]:
class BiLSTM_CRF(tf.keras.Model):
    """Embedding -> dropout -> BiLSTM -> dropout -> dense tagger with CRF decoding.

    `call` returns per-token tag scores; `predict` decodes them into a tag
    sequence with `crf_decode` and the model's transition matrix. Token id 0
    (and tag id 0 in `loss`) is treated as padding when sequence lengths are
    computed.

    NOTE(review): `loss` and `predict` share their names with members of
    `tf.keras.Model`. In particular `compile(loss=...)` appears to store its
    argument on `self.loss`, which would shadow the `loss` method below on
    that instance -- confirm which loss training actually uses.
    """

    def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
        """
            vocab_size: vocab size
            tag_to_ix: tag to index
            embedding_dim: embedding dimension
            hidden_dim: hidden dimension (split evenly between the two LSTM directions)
        """
        super(BiLSTM_CRF, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        # Attribute names and creation order are kept unchanged so that
        # previously saved checkpoints still line up with this object.
        self.embedding = layers.Embedding(self.vocab_size, self.embedding_dim, mask_zero=True)
        self.dropout = layers.Dropout(0.2)
        self.lstm = layers.Bidirectional(layers.LSTM(self.hidden_dim // 2, return_sequences=True, recurrent_dropout=0.2))
        self.fc = layers.Dense(self.tagset_size)
        # Not used by call/loss/predict; kept only for checkpoint compatibility.
        self.crf = CRF(self.tagset_size)

        # Tag-to-tag transition scores used by the CRF loss and Viterbi decoding.
        self.transition_params = tf.Variable(tf.random.uniform(shape=(self.tagset_size, self.tagset_size)))

    def call(self, sentence, training=None):
        """
            sentence: [batch_size, max_len]
            training: forwarded to the dropout and LSTM layers; None lets
                      Keras decide from the surrounding call context
            returns:  [batch_size, max_len, tagset_size] unnormalised tag scores
        """
        embedded = self.embedding(sentence)
        embedded = self.dropout(embedded, training=training)
        hidden = self.lstm(embedded, training=training)
        hidden = self.dropout(hidden, training=training)
        return self.fc(hidden)

    def loss(self, y_true, y_pred):
        """
            y_true: [batch_size, max_len]
            y_pred: [batch_size, max_len, tagset_size]
            returns: scalar, mean negative CRF log-likelihood over the batch
        """
        # Tag id 0 is padding, so the count of non-zero tags is the sequence length.
        sequence_lengths = tf.reduce_sum(tf.cast(tf.math.not_equal(y_true, 0), dtype=tf.int32), axis=1)
        # crf_log_likelihood also returns the transition matrix it used. That
        # value is deliberately discarded: storing it back on
        # self.transition_params would replace the tf.Variable created in
        # __init__ with whatever object the helper returns.
        log_likelihood, _ = crf_log_likelihood(
            y_pred, y_true, sequence_lengths, transition_params=self.transition_params
        )
        return -tf.reduce_mean(log_likelihood)

    def predict(self, sentence):
        """
            sentence: [batch_size, max_len]
            returns:  [batch_size, max_len] decoded tag ids (a tensor)
        """
        # Reuse the forward pass and pin inference mode so dropout is disabled.
        emissions = self(sentence, training=False)
        # Token id 0 is padding; decoding only covers the real tokens.
        sequence_lengths = tf.reduce_sum(tf.cast(tf.math.not_equal(sentence, 0), dtype=tf.int32), axis=1)
        viterbi_sequence, _ = crf_decode(emissions, self.transition_params, sequence_lengths)
        return viterbi_sequence

In [33]:
def data_preprocess(dataset, data2idx, max_len=100):
    """
        Convert sequences of symbols into a padded int32 index tensor.

        dataset: [[word1, word2, ...], ...]
        data2idx: {word: idx, ...}
        max_len: max length of sentence
        returns: tensor of shape [len(dataset), max_len], padded at the end
    """
    def to_index(symbol):
        # "[UNK]" is looked up only when needed, so a mapping without that
        # key still works as long as every symbol is known.
        if symbol in data2idx:
            return data2idx[symbol]
        return data2idx["[UNK]"]

    indexed = [[to_index(symbol) for symbol in sequence] for sequence in dataset]
    padded = pad_sequences(indexed, maxlen=max_len, padding="post")
    return tf.convert_to_tensor(padded, dtype=tf.int32)

In [34]:
# --- Load the raw corpora: parallel lists of token sequences and label sequences.
train_data, train_label = read_dataset(train_path)
test_data, test_label = read_dataset(test_path)

# --- Build the vocabularies. Index 0 is reserved for padding in both tables;
# index 1 of the word table is the unknown-word fallback.
tags = get_from_txt(tags_path)
vocab = get_from_txt(vocabs_path)
tags = ["[PAD]"] + tags
vocab = ["[PAD]", "[UNK]"] + vocab

tags2idx = {tag: idx for idx, tag in enumerate(tags)}
id2tag = {idx: tag for idx, tag in enumerate(tags)}
vocab2idx = {word: idx for idx, word in enumerate(vocab)}
id2vocab = {idx: word for idx, word in enumerate(vocab)}

# --- Index and pad everything to the default max_len of data_preprocess.
# NOTE(review): no "[UNK]" tag is added to tags2idx here, so a label missing
# from tags.txt would raise KeyError in data_preprocess unless tags.txt itself
# contains "[UNK]" -- confirm.
train_data = data_preprocess(train_data, vocab2idx)
train_label = data_preprocess(train_label, tags2idx)
test_data = data_preprocess(test_data, vocab2idx)
test_label = data_preprocess(test_label, tags2idx)

print(train_data.shape)
print(train_label.shape)
print(test_data.shape)
print(test_label.shape)

# --- Training configuration.
EPOCHS = 5
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = len(train_data) // BATCH_SIZE

# NOTE(review): STEPS_PER_EPOCH, train_dataset and test_dataset are not
# referenced again in this cell; the commented-out fit call below passes the
# tensors directly.
train_dataset = tf.data.Dataset.from_tensor_slices((train_data, train_label))
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

test_dataset = tf.data.Dataset.from_tensor_slices((test_data, test_label))
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=True)

# --- Build the model: embedding_dim=128, hidden_dim=128.
# The model is compiled with token-level sparse categorical cross-entropy on
# the logits returned by BiLSTM_CRF.call; BiLSTM_CRF.loss is not passed here.
model = BiLSTM_CRF(len(vocab2idx), tags2idx, 128, 128)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

# Create a callback that saves the model's weights
checkpoint_path = model_path
checkpoint_dir = os.path.dirname(checkpoint_path)  # not used further in this cell

## Uncomment the code below to train; otherwise the saved weights are loaded.

# model_callback = tf.keras.callbacks.ModelCheckpoint(
#     filepath=checkpoint_path,
#     save_weights_only=True,
#     verbose=1
# )

# # Create the Early Stopping callback
# early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# # Train the model with the new callback
# model.fit(
#     train_data,
#     train_label,
#     epochs=EPOCHS,
#     validation_data=(test_data, test_label),
#     callbacks=[model_callback, early_stopping]  # add early_stopping to the callbacks list
# )

# Load weights from the checkpoint; expect_partial() is chained so that
# checkpoint entries this script never uses are presumably not reported.
model.load_weights(checkpoint_path).expect_partial()

# Evaluate and classification report
# model.predict here is BiLSTM_CRF.predict (CRF decoding), which returns a
# tensor of tag ids -- hence the .numpy() conversion. The whole test set is
# decoded in a single call.
pred = model.predict(test_data)
pred = pred.numpy()
test_label = test_label.numpy()

# Drop the padding and "O" tags so only the remaining tags are scored.
# NOTE(review): this mutates tags2idx in place, and the same dict object was
# handed to the model as tag_to_ix above.
tags2idx.pop("[PAD]")
tags2idx.pop("O")

# Map tag ids back to tag strings (id2tag still contains "[PAD]" and "O").
pred = [[id2tag[idx] for idx in sentence] for sentence in pred]
test_label = [[id2tag[idx] for idx in sentence] for sentence in test_label]

# Each sentence becomes one binary row marking which of the kept tags occur in
# it, so the report below measures per-sentence tag presence rather than
# per-token accuracy. NOTE(review): tags outside custom_classes ("[PAD]", "O")
# are presumably ignored by the binarizer -- confirm.
custom_classes = list(tags2idx.keys())
pred = MultiLabelBinarizer(classes=custom_classes).fit_transform(pred)
test_label = MultiLabelBinarizer(classes=custom_classes).fit_transform(test_label)

# The report is computed twice: once as text, once as a dict to read the micro-averaged F1.
print(classification_report(test_label, pred, target_names=tags2idx.keys()))
print("f1-score: ", classification_report(test_label, pred, target_names=tags2idx.keys(), output_dict=True)["micro avg"]["f1-score"])



(50075, 100)
(50075, 100)
(14775, 100)
(14775, 100)
              precision    recall  f1-score   support

       B-Tim       0.91      0.99      0.95      9773
       I-Tim       0.96      0.99      0.98      3721
       E-Tim       0.95      0.99      0.97     10121
       S-Tim       0.00      0.00      0.00         1
       B-Org       0.92      0.91      0.91      1468
       I-Org       0.69      0.54      0.61        37
       E-Org       0.91      0.90      0.91      1469
       S-Org       0.81      0.61      0.69       396
       B-Sym       1.00      0.99      0.99     13434
       I-Sym       0.97      0.98      0.97      5788
       E-Sym       1.00      1.00      1.00     13428
       S-Sym       0.96      0.86      0.90      1128
       B-Abb       0.99      1.00      0.99      3056
       I-Abb       0.99      0.93      0.96      2985
       E-Abb       0.99      0.95      0.97      3161
       S-Abb       0.00      0.00      0.00      1176
       B-Exa       0.99      

In [58]:
# Tag one sentence, character by character, with the restored model.
sentence_original = "昨天開始全身起紅疹。"

# Reuse data_preprocess so that unknown-character handling, padding side and
# maximum length cannot drift from the pipeline used for the train/test data.
# Iterating the string yields its characters, which are the model's tokens.
sentence = data_preprocess([sentence_original], vocab2idx)

# BiLSTM_CRF.predict returns a tensor of tag ids with one row per input sentence.
pred = model.predict(sentence).numpy()

# Map ids to tag names and drop the positions that belong to padding.
pred = [id2tag[idx] for idx in pred[0]][:len(sentence_original)]
print(pred)

['B-Tim', 'E-Tim', 'O', 'O', 'B-Org', 'E-Org', 'O', 'B-Sym', 'E-Sym', 'O']
