In [1]:
from collections import Counter

from constants import *
from read_dataset import *
from visualization import *

import tensorflow as tf
from transformers import AutoTokenizer
from transformers import TFAutoModelForTokenClassification

from tqdm import tqdm

In [2]:
def encode_texts(tokenizer, doc_texts, doc_labels, max_length=128):
    input_ids = []
    attention_masks = []
    token_type_ids = []
    label_ids = []

    # Define a mapping of labels to integer IDs
    label_map = {
        'O': 0,
        'B-STREET_ADDRESS': 1, 'I-STREET_ADDRESS': 2,
        'B-PHONE_NUM': 3, 'I-PHONE_NUM': 4,
        'B-URL_PERSONAL': 5, 'I-URL_PERSONAL': 6,
        'B-ID_NUM': 7, 'I-ID_NUM': 8,
        'B-NAME_STUDENT': 9, 'I-NAME_STUDENT': 10,
        'B-USERNAME': 11,
        'B-EMAIL': 12
    }

    for doc_text, doc_label in tqdm(zip(doc_texts, doc_labels), total=len(doc_texts), desc="Encoding documents"):
        for text, labels in zip(doc_text, doc_label):
            # Ensure text is a list of words
            if isinstance(text, str):
                text = text.split()  # This line might not be necessary if text is already a list of words

            tokenized_text = tokenizer.encode_plus(
                text,
                max_length=max_length,
                padding='max_length',
                truncation=True,
                return_attention_mask=True,
                return_token_type_ids=True,
                return_offsets_mapping=True,
                is_split_into_words=True
            )
            offsets = tokenized_text.pop("offset_mapping")
            input_ids.append(tokenized_text['input_ids'])
            attention_masks.append(tokenized_text['attention_mask'])
            token_type_ids.append(tokenized_text['token_type_ids'])

            label_sequence = [label_map["O"]] * max_length
            current_label_index = 0

            for offset in offsets:
                if offset[0] == offset[1]:  # Special tokens like [CLS], [SEP], padding
                    continue
                if current_label_index < len(labels):
                    label_sequence[offset[0]] = label_map.get(labels[current_label_index], label_map["O"])
                    current_label_index += 1
            label_ids.append(label_sequence)

    return {
        'input_ids': tf.constant(input_ids),
        'attention_mask': tf.constant(attention_masks),
        'token_type_ids': tf.constant(token_type_ids),
    }, tf.constant(label_ids)
        
def predict(tokenizer, model, text):
    tokenized_text = tokenizer.encode_plus(
        text,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_tensors="tf"
    )
    predictions = model(tokenized_text)
    logits = predictions.logits
    predicted_label_ids = tf.argmax(logits, axis=-1)
    return predicted_label_ids.numpy()

In [13]:
# Since train.json is too large, it was zipped
# To read the file, unzip then pass to the json parser   
unzip_file(ZIPPED_TRAIN_SET_PATH, "datasets/")
    
document_numbers_train, texts_train, tokens_train, trailing_whitespaces_train, labels_train = read_pii_json(TRAIN_SET_PATH, is_train=True)
document_numbers_test, texts_test, tokens_test, trailing_whitespaces_test = read_pii_json(TEST_SET_PATH)

flat_labels = [label for sublist in labels_train for label in sublist]
unique_labels = set(flat_labels)

# Count the label frequencies
label_counts = Counter(flat_labels)

# Prepare data for plotting
labels, frequencies = zip(*label_counts.items())

In [4]:
class F5Score(tf.keras.metrics.Metric):
    def __init__(self, name="f5_score", **kwargs):
        super(F5Score, self).__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()
        self.beta_squared = 5**2

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.argmax(y_pred, axis=-1)
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        precision = self.precision.result()
        recall = self.recall.result()
        return (1 + self.beta_squared) * ((precision * recall) / ((self.beta_squared * precision) + recall))

    def reset_states(self):
        self.precision.reset_states()
        self.recall.reset_states()

In [5]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
train_dataset = tf.data.Dataset.from_tensor_slices(encode_texts(tokenizer, tokens_train, labels_train))

Encoding documents: 100%|██████████| 6807/6807 [06:41<00:00, 16.94it/s]  


In [10]:
num_labels = len(unique_labels)  # Define the number of unique labels
model = TFAutoModelForTokenClassification.from_pretrained("bert-base-cased", num_labels=num_labels)

optimizer = tf.keras.optimizers.Adam(learning_rate=5e-2)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')

model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

model.fit(train_dataset.shuffle(1000).batch(512), epochs=3, batch_size=16)

# # Example prediction
# # predicted_label_ids = predict("Your example sentence here.", model=model, tokenizer=tokenizer)

All PyTorch model weights were used when initializing TFBertForTokenClassification.

Some weights or buffers of the TF 2.0 model TFBertForTokenClassification were not initialized from the PyTorch model and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


AttributeError: 'Variable' object has no attribute '_distribute_strategy'