# 1 reading data and preprocessing

In [1]:
import numpy as np
import pickle as pk
import tensorflow as tf
from transformers import BertTokenizerFast, TFBertModel

from google.colab import drive
drive.mount('/content/drive')

def load_dataset(file_path):
    with open(file_path, 'rb') as file:
        return pk.load(file)

train_data = load_dataset('/content/drive/MyDrive/hw6_nlp/train.pickle')
val_data = load_dataset('/content/drive/MyDrive/hw6_nlp/validation.pickle')
test_data = load_dataset('/content/drive/MyDrive/hw6_nlp/test.pickle')

#create label mappings
def generate_label_maps(tags_list):
    unique_tags = set(tag for tags in tags_list for tag in tags)
    tag_to_id = {tag: idx for idx, tag in enumerate(unique_tags)}
    id_to_tag = {idx: tag for tag, idx in tag_to_id.items()}
    return tag_to_id, id_to_tag

ner_tag_to_id, ner_id_to_tag = generate_label_maps(train_data["ner_tags"])
pos_tag_to_id, pos_id_to_tag = generate_label_maps(train_data["pos_tags"])

num_ner_classes = len(ner_tag_to_id)
num_pos_classes = len(pos_tag_to_id)

tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")

# data preparation
def tag_mapper(tags, idx, word_ids, tag_map):
    tag_sequence = []
    last_word_id = None
    for word_id in word_ids:
        if word_id is None or word_id == last_word_id or word_id >= len(tags[idx]):
            tag_sequence.append(-100)
        else:
            last_word_id = word_id
            tag_sequence.append(tag_map[tags[idx][word_id]])
    return tag_sequence

def create_data_generator(data, tags_ner, tags_pos, batch_size=32):
    def generator():
        for idxs in np.array_split(np.arange(len(data["tokens"])), np.ceil(len(data["tokens"]) / batch_size)):
            tokenized_inputs = tokenizer(
                [" ".join(tokens) for tokens in data["tokens"][idxs]],
                add_special_tokens=True,
                return_tensors="tf",
                truncation=True,
                padding="max_length",
                max_length=128
            )
            ner_labels = [tag_mapper(tags_ner, idx, tokenized_inputs.word_ids(batch_index=i), ner_tag_to_id) for i, idx in enumerate(idxs)]
            pos_labels = [tag_mapper(tags_pos, idx, tokenized_inputs.word_ids(batch_index=i), pos_tag_to_id) for i, idx in enumerate(idxs)]
            yield dict(tokenized_inputs), (tf.constant(ner_labels), tf.constant(pos_labels))
    return generator

train_gen = create_data_generator(train_data, train_data["ner_tags"], train_data["pos_tags"])
val_gen = create_data_generator(val_data, val_data["ner_tags"], val_data["pos_tags"])

train_dataset = tf.data.Dataset.from_generator(train_gen, output_signature=(
    {
        "input_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        "token_type_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        "attention_mask": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
    },
    (
        tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        tf.TensorSpec(shape=(None, 128), dtype=tf.int32)
    )
))

val_dataset = tf.data.Dataset.from_generator(val_gen, output_signature=(
    {
        "input_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        "token_type_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        "attention_mask": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
    },
    (
        tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
        tf.TensorSpec(shape=(None, 128), dtype=tf.int32)
    )
))


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


# 2 making the model

In [2]:
input_ids = tf.keras.Input(shape=(128,), dtype=tf.int32, name="input_ids")
token_type_ids = tf.keras.Input(shape=(128,), dtype=tf.int32, name="token_type_ids")
attention_mask = tf.keras.Input(shape=(128,), dtype=tf.int32, name="attention_mask")

class BERTEncoder(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.model = TFBertModel.from_pretrained("bert-base-cased", trainable=True)

    def call(self, inputs):
        return self.model(inputs)[0]

bert_output = BERTEncoder()({
    "input_ids": input_ids,
    "token_type_ids": token_type_ids,
    "attention_mask": attention_mask
})

# define output layers for NER and POS
def create_classification_layer(bert_output, name, num_classes):
    x = tf.keras.layers.Dropout(0.3)(bert_output)
    x = tf.keras.layers.Dense(256, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.1)(x)
    return tf.keras.layers.Dense(num_classes, activation="softmax", name=name)(x)

ner_output = create_classification_layer(bert_output, "ner_output", num_ner_classes)
pos_output = create_classification_layer(bert_output, "pos_output", num_pos_classes)

model = tf.keras.Model(inputs=[input_ids, token_type_ids, attention_mask], outputs=[ner_output, pos_output])

def masked_accuracy(y_true, y_pred):
    mask = tf.not_equal(y_true, -100)
    y_true_masked = tf.boolean_mask(y_true, mask)
    y_pred_masked = tf.boolean_mask(y_pred, mask)
    return tf.reduce_mean(tf.keras.metrics.sparse_categorical_accuracy(y_true_masked, y_pred_masked))

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    loss={
        "ner_output": tf.keras.losses.SparseCategoricalCrossentropy(ignore_class=-100),
        "pos_output": tf.keras.losses.SparseCategoricalCrossentropy(ignore_class=-100),
    },
    metrics={
        "ner_output": [masked_accuracy],
        "pos_output": [masked_accuracy],
    }
)

model.summary()


Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 attention_mask (InputLayer  [(None, 128)]                0         []                            
 )                                                                                                
                                                                                                  
 input_ids (InputLayer)      [(None, 128)]                0         []                            
                                                                                                  
 token_type_ids (InputLayer  [(None, 128)]                0         []                            
 )                                                                                                
                                                                                              

# 3 training the model

In [3]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# freeze BERT layers and train only the new layers
model.get_layer('bert_encoder').trainable = False
initial_training_history = model.fit(train_dataset, validation_data=val_dataset, epochs=5)

# unfreeze the entire model and train again
model.get_layer('bert_encoder').trainable = True
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
    loss={
        "ner_output": tf.keras.losses.SparseCategoricalCrossentropy(ignore_class=-100),
        "pos_output": tf.keras.losses.SparseCategoricalCrossentropy(ignore_class=-100),
    },
    metrics={
        "ner_output": [masked_accuracy],
        "pos_output": [masked_accuracy],
    }
)
final_training_history = model.fit(train_dataset, validation_data=val_dataset, epochs=5)


Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Epoch 1/5




Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [4]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
tf.get_logger().setLevel('ERROR')
# Flatten utility function
def flatten(l):
    return [item for sublist in l for item in sublist]

def calculate_macro_accuracy(dataset):
    true_ner = []
    true_pos = []
    pred_ner = []
    pred_pos = []

    for inputs, labels in dataset:
        outputs = model.predict(inputs)
        ner_preds = np.argmax(outputs[0], axis=-1)
        pos_preds = np.argmax(outputs[1], axis=-1)

        for i, label in enumerate(labels[0]):
            true_label_ner = label.numpy()
            true_label_pos = labels[1][i].numpy()
            pred_label_ner = ner_preds[i]
            pred_label_pos = pos_preds[i]

            mask = true_label_ner != -100
            true_ner.extend(true_label_ner[mask])
            pred_ner.extend(pred_label_ner[mask])

            mask = true_label_pos != -100
            true_pos.extend(true_label_pos[mask])
            pred_pos.extend(pred_label_pos[mask])

    ner_accuracy = accuracy_score(true_ner, pred_ner)
    pos_accuracy = accuracy_score(true_pos, pred_pos)

    return ner_accuracy, pos_accuracy

ner_train_acc, pos_train_acc = calculate_macro_accuracy(train_dataset)
ner_val_acc, pos_val_acc = calculate_macro_accuracy(val_dataset)






In [5]:
print(f"Train NER Macro Accuracy: {ner_train_acc}")
print(f"Train POS Macro Accuracy: {pos_train_acc}")
print(f"Validation NER Macro Accuracy: {ner_val_acc}")
print(f"Validation POS Macro Accuracy: {pos_val_acc}")

Train NER Macro Accuracy: 0.9973690119395117
Train POS Macro Accuracy: 0.9868352972186698
Validation NER Macro Accuracy: 0.9816469299169792
Validation POS Macro Accuracy: 0.9301909056428842


# 4 predicting on test

In [7]:
tf.get_logger().setLevel('ERROR')
def test_data_generator(batch_size=32):
    def generator():
        for idxs in np.array_split(np.arange(len(test_data["tokens"])), np.ceil(len(test_data["tokens"]) / batch_size)):
            tokenized_inputs = tokenizer(
                [" ".join(tokens) for tokens in test_data["tokens"][idxs]],
                add_special_tokens=True,
                return_tensors="tf",
                truncation=True,
                padding="max_length",
                max_length=128
            )
            yield dict(tokenized_inputs)
    return generator

test_dataset = tf.data.Dataset.from_generator(test_data_generator(), output_signature={
    "input_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
    "token_type_ids": tf.TensorSpec(shape=(None, 128), dtype=tf.int32),
    "attention_mask": tf.TensorSpec(shape=(None, 128), dtype=tf.int32)
})

test_predictions = {"tokens": [], "ner_preds": [], "pos_preds": []}

for batch in test_dataset:
    inputs = {key: val for key, val in batch.items()}
    outputs = model.predict(inputs)

    ner_preds = np.argmax(outputs[0], axis=-1)
    pos_preds = np.argmax(outputs[1], axis=-1)

    for i in range(len(inputs["input_ids"])):
        tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][i].numpy())
        ner_tags = ner_preds[i].tolist()
        pos_tags = pos_preds[i].tolist()

        test_predictions["tokens"].append(tokens)
        test_predictions["ner_preds"].append(ner_tags)
        test_predictions["pos_preds"].append(pos_tags)

with open('/content/drive/MyDrive/hw6_nlp/test_predictions.pickle', 'wb') as file:
    pk.dump(test_predictions, file)

with open('/content/drive/MyDrive/hw6_nlp/test_predictions.pickle', 'rb') as file:
    loaded_predictions = pk.load(file)

print("Length of NER predictions:", len(loaded_predictions['ner_preds']))
print("Length of POS predictions:", len(loaded_predictions['pos_preds']))


Length of NER predictions: 3453
Length of POS predictions: 3453
