In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from datasets import load_dataset
from collections import Counter
from conlleval import evaluate



In [2]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.ffn = keras.Sequential(
            [
                layers.Dense(ff_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)



In [4]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.pos_emb - layers.Embedding(
            input_dim=maxlen, output_dim=embed_dim
        )

    def call(self, inputs):
        maxlen = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        position_embeddings = self.pos_emb(positions)
        token_embeddings = self.token_emb(inputs)
        return token_embeddings + position_embeddings


In [5]:
class NERModel(keras.Model):
    def __init__(self, num_tags, vocab_size, maxlen=128, embed_dim=32, num_heads=2, ff_dim=32):
        super(NERModel, self).__init__()
        self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.dropout1 = layers.Dropout(0.1)
        self.ff = layers.Dense(ff_dim, activation="relu")
        self.dropout2 = layers.Dropout(0.1)
        self.ff_final = layers.Dense(num_tags, activation="softmax")

    def call(self, inputs, training=False):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x)
        x = self.dropout1(x, training=training)
        x = self.ff(x)
        x = self.dropout2(x, training=training)
        x = self.ff_final(x)
        return x


In [11]:
conll_data = load_dataset("conll2003.py")

Downloading and preparing dataset conll2003/conll2003 (download: Unknown size, generated: Unknown size, post-processed: Unknown size, total: Unknown size) to C:\Users\Administrator\.cache\huggingface\datasets\conll2003\conll2003\1.0.0\dce97da350908f6c978cf452149c3683e4f5c5ab7005b3ae5d3c986a93332525...


ConnectionError: Couldn't reach https://github.com/davidsbatista/NER-datasets/raw/master/CONLL2003/train.txt

In [None]:
def export_to_file(export_file_path, data):
    with open(export_file_path, "w") as f:
        for record in data:
            ner_tags = record["ner_tags"]
            tokens = record["tokens"]
            f.write(
                str(len(tokens))
                + "\t"
                + "\t".join(tokens)
                + "\t"
                + "\t".join(map(str, ner_tags))
                + "\n"
            )
data_dir = "datas/conll2003"
os.makedirs(data_dir, exist_ok=True)
export_to_file(os.path.join(data_dir,"conll_train.txt"), conll_data["train"])
export_to_file(os.path.join(data_dir,"conll_val.txt"), conll_data["validation"])



In [12]:
def make_tag_lookup_table():
    iob_labels = ["B", "I"]
    ner_labels = ["PER", "ORG", "LOC", "MISC"]
    all_labels = [(label1, label2) for label2 in ner_labels for label1 in iob_labels]
    all_labels = ["_".join([a, b]) for a, b in all_labels]
    all_labels = ["[PAD]", "O"] + all_labels
    return dict(zip(range(0, len(all_labels)+1), all_labels))

mapping = make_tag_lookup_table()
print(mapping)


{0: '[PAD]', 1: 'O', 2: 'B_PER', 3: 'I_PER', 4: 'B_ORG', 5: 'I_ORG', 6: 'B_LOC', 7: 'I_LOC', 8: 'B_MISC', 9: 'I_MISC'}


In [None]:
all_tokens = sum(conll_data["train"]["tokens"], [])
all_tokens_array = np.array(list(map(str.lower, all_tokens)))
counter = Counter(all_tokens_array)

print(len(counter))

num_tags = len(mapping)
vocab_size = 20000

vocabulary = [token for token, count in counter.most_common(vocab_size - 2)]

lookup_layer = layers.experimental.preprocessing.StringLookup(
    vocabulary=vocabulary
)



In [None]:
train_data = tf.data.TextLineDataset(os.path.join(data_dir, "conll_train.txt"))
val_data = tf.data.TextLineDataset(os.path.join(data_dir, "conll_val.txt"))

print(list(train_data.take(1).as_numpy_iterator()))


In [None]:
def map_record_to_training_data(record):
    record = tf.strings.split(record, sep="\t")
    length = tf.strings.to_number(record[0], out_type=tf.int32)
    tokens = record[1: length+1]
    tags = record[length+1:]
    tags = tf.strings.to_number(tags, out_type=tf.int64)
    tags += 1
    return token, tags


def lowercase_and_convert_to_ids(tokens):
    tokens = tf.strings.lower(tokens)
    return lookup_layer(tokens)


batch_size = 32
train_dataset = (
    train_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(batch_size)
)

val_dataset = (
    val_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(batch_size)
)

ner_model = NERModel(num_tags, vocab_size, embed_dim=32, num_heads=4, ff_dim=64)


In [None]:
class CustomNonPaddingTokenLoss(keras.losses.Loss):
    def __init__(self, name="custom_ner_loss"):
        super(CustomNonPaddingTokenLoss, self).__init__(name=name)

    def call(self, y_true, y_pred):
        loss_fn = keras.losses.SparseCategoricalCrossentropy(
            from_logits=True, reduction=keras.losses.Reduction.NONE
        )
        loss = loss_fn(y_true, y_pred)
        mask = tf.cast((y_true > 0), dtype=tf.float32)
        loss = loss * mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

loss = CustomNonPaddingTokenLoss()

In [None]:
ner_model.compile(optimizer="adam", loss=loss)
ner_model.fit(train_dataset, epochs=10)

def tokenize_and_convert_to_ids(text):
    tokens = text.split()
    return lowercase_and_convert_to_ids(tokens)

sample_input = tokenize_and_convert_to_ids(
    "eu rejects german call to boycott british lamb"
)

sample_input = tf.reshape(sample_input, shape=[1, -1])

print(sample_input)

output = ner_model.predict(sample_input)
prediction = np.argmax(output, axis=-1)[0]
prediction = [mapping[i] for i in prediction]

print(prediction)



In [None]:
def calculate_metrics(dataset):
    all_true_tag_ids, all_predicted_tag_ids = [], []

    for x, y in dataset:
        output = ner_model.predict(x)
        predictions = np.argmax(output, axis=-1)
        predictions = np.reshape(predictions, [-1])
        true_tag_ids = np.reshape(y, [-1])
        mask = (true_tag_ids > 0) & (predictions > 0)

        true_tag_ids = true_tag_ids[mask]

        predicted_tag_ids = predictions[mask]

        all_true_tag_ids.append(true_tag_ids)
        all_predicted_tag_ids.append(predicted_tag_ids)

    all_true_tag_ids = np.concatenate(all_true_tag_ids)
    all_predicted_tag_ids = np.concatenate(all_predicted_tag_ids)
    predicted_tags = [mapping[tag] for tag in all_predicted_tag_ids]
    real_tags = [mapping[tag] for tag in all_true_tag_ids]
    evaluate(real_tags, predicted_tags)

calculate_metrics(val_dataset)