In [18]:
# Mount Google Drive at /content/drive; later cells read data from and write
# checkpoints to paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
from transformers import BertTokenizerFast, BertForTokenClassification
from sklearn.model_selection import train_test_split

# Read the training JSON from the mounted Drive folder and preview the first rows.
train_json_path = "/content/drive/MyDrive/Colab Notebooks/pii_detection_ner_bert/train.json"
train_df = pd.read_json(train_json_path)

train_df.head()

In [None]:
# Store the length of each row's `tokens` and `labels` lists in
# `tokens_len` / `labels_len` so the two can be compared.
for list_column in ('tokens', 'labels'):
    train_df[f'{list_column}_len'] = train_df[list_column].apply(len)

train_df

In [None]:
# Sanity check: show the rows whose token list and label list differ in length.
# An empty result means every token has exactly one label.
train_df.query('tokens_len != labels_len')

In [None]:
# Count how often each label string occurs across all documents.
# `explode()` flattens the per-row label lists into one long Series; this avoids
# `apply(pd.Series)`, which builds a Series object for every row and a wide,
# NaN-padded intermediate frame. Rows with an empty label list explode to NaN,
# which `value_counts()` drops by default, so the counts are unchanged.
feature_count = train_df['labels'].explode().value_counts()

feature_count

In [None]:
# Label names in the order produced by `value_counts()` (most frequent first).
feature_names = list(feature_count.index)

feature_names

In [None]:
# Label names listed in id order: a label's position in this list is its integer id.
label_order = [
    'O',
    'B-NAME_STUDENT',
    'I-NAME_STUDENT',
    'B-URL_PERSONAL',
    'B-ID_NUM',
    'B-EMAIL',
    'I-STREET_ADDRESS',
    'I-PHONE_NUM',
    'B-USERNAME',
    'B-PHONE_NUM',
    'B-STREET_ADDRESS',
    'I-URL_PERSONAL',
    'I-ID_NUM',
]
label_mapping = {label: label_id for label_id, label in enumerate(label_order)}


def _encode_labels(labels):
    """Translate one row's label strings into their integer ids (KeyError on unknown labels)."""
    return [label_mapping[label] for label in labels]


train_df['label_ids'] = train_df.labels.apply(_encode_labels)

train_df

In [None]:
# Name of the pretrained checkpoint; reused later when the classification model is created.
model_name = "bert-base-uncased"

# Fast tokenizer matching that checkpoint (its word_ids() output is used further down
# to align word-level labels with sub-word tokens).
tokenizer = BertTokenizerFast.from_pretrained(model_name)

tokenizer

In [None]:
tokenizer.model_max_length

In [None]:
# Cut every document into consecutive, non-overlapping windows of at most
# `window_size` words, keeping tokens, label ids and the source document id in step.
model_max_length = tokenizer.model_max_length

window_size = model_max_length
# NOTE(review): `overlap` is defined here but never read by the windowing loop below.
overlap = model_max_length / 4

split_document_ids = []
split_tokens = []
split_label_ids = []

document_columns = zip(train_df['document'], train_df['tokens'], train_df['label_ids'])
for doc_id, doc_tokens, doc_label_ids in document_columns:
    window_start = 0
    while window_start < len(doc_tokens):
        window_end = window_start + window_size
        split_document_ids.append(doc_id)
        split_tokens.append(doc_tokens[window_start:window_end])
        split_label_ids.append(doc_label_ids[window_start:window_end])
        window_start = window_end

split_token_label_df = pd.DataFrame({
    'document': split_document_ids,
    'tokens': split_tokens,
    'label_ids': split_label_ids
})

split_token_label_df

In [None]:
print(split_token_label_df['tokens'][0])

In [None]:
# Longest window measured in words; used as the fixed padded/truncated length below.
max_seq_length = split_token_label_df['tokens'].apply(len).max()


def _encode_words(words):
    """Tokenize one pre-split window, padding/truncating it to `max_seq_length`."""
    return tokenizer(
        words,
        is_split_into_words=True,
        truncation=True,
        padding='max_length',
        max_length=max_seq_length,
    )


tokenized_input = split_token_label_df['tokens'].apply(_encode_words)

tokenized_input

In [None]:
tokenized_input[0].keys()

In [None]:
print(tokenized_input[0]['input_ids'])

In [None]:
print(tokenized_input[0].tokens())

In [None]:
print(tokenized_input[0].word_ids())

In [None]:
def _word_ids_of(encoding):
    """Return the encoding's word_ids() list (one entry per produced token)."""
    return encoding.word_ids()


# One word-id list per window, indexed like `tokenized_input`.
tokenized_word_ids = tokenized_input.apply(_word_ids_of)

tokenized_word_ids

In [None]:
def align_labels_with_tokens(labels, word_ids):
    """Expand word-level labels to one label per tokenizer output position.

    Args:
        labels: sequence of label ids, indexable by word index.
        word_ids: one entry per token position; each entry is either the index
            of the word the token came from, or ``None``.

    Returns:
        A list as long as ``word_ids``: ``-100`` where the word id is ``None``,
        otherwise the label of the referenced word. Every token of a word
        receives that word's label (first and continuation pieces alike).

    Raises:
        IndexError: if a word id is outside the range of ``labels``.
    """
    # The previous version tracked the "current word" and had separate branches
    # for a word's first and later pieces, but both branches assigned
    # labels[word_id] and the inner None check could never fire, so the mapping
    # reduces to this single rule.
    return [-100 if word_id is None else labels[word_id] for word_id in word_ids]

def _align_row(row):
    """Align one window's label ids with the word ids stored under the same index label."""
    window_word_ids = tokenized_word_ids.loc[row.name]
    return align_labels_with_tokens(row['label_ids'], window_word_ids)


split_token_label_df['aligned_label_ids'] = split_token_label_df.apply(_align_row, axis=1)

split_token_label_df

In [None]:
# Print each token of the first window next to its aligned label id.
first_window_tokens = tokenizer.convert_ids_to_tokens(tokenized_input[0]['input_ids'])
first_window_labels = split_token_label_df['aligned_label_ids'][0]

for token, label in zip(first_window_tokens, first_window_labels):
    print(f"{token} {label}")

In [None]:
# Copy each field of the tokenizer output into its own DataFrame column.
for encoding_key in ('input_ids', 'token_type_ids', 'attention_mask'):
    split_token_label_df[encoding_key] = tokenized_input.apply(
        lambda encoding, key=encoding_key: encoding[key]
    )

split_token_label_df

In [None]:
# Keep only the model inputs plus the aligned labels, exposing the latter as `labels`.
columns_to_copy = ['input_ids', 'token_type_ids', 'attention_mask', 'aligned_label_ids']

token_df = (
    split_token_label_df[columns_to_copy]
    .rename(columns={'aligned_label_ids': 'labels'})
)

token_df

In [None]:
# 70% train; the remaining 30% is halved into validation and test (same seed for both splits).
train_dataset, holdout_dataset = train_test_split(token_df, test_size=0.3, random_state=42)
validation_dataset, test_dataset = train_test_split(holdout_dataset, test_size=0.5, random_state=42)

In [None]:
len(train_dataset), len(validation_dataset), len(test_dataset)

In [None]:
from datasets import Dataset, DatasetDict

# Columns handed to the model, and the pandas frame behind each split name.
features = ['input_ids', 'token_type_ids', 'attention_mask', 'labels']

split_frames = {
    'train': train_dataset,
    'validation': validation_dataset,
    'test': test_dataset,
}

final_dataset = DatasetDict({
    split_name: Dataset.from_pandas(split_frame[features])
    for split_name, split_frame in split_frames.items()
})

final_dataset

In [None]:
from transformers import DataCollatorForTokenClassification

# Collator used to turn lists of examples into batches; it is passed to the Trainer below.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
# Smoke test: collate the first 20 training examples into one batch and inspect it.
sample_rows = [final_dataset['train'][row_idx] for row_idx in range(20)]
batch = data_collator(sample_rows)

batch

In [None]:
# Offline installs: --no-index disables the package index and --find-links points pip
# at local directories holding the packages.
# NOTE(review): these directories are under /kaggle/input, while the rest of the notebook
# reads from /content/drive — confirm which runtime this cell is meant for.
!pip install evaluate --no-index --find-links=file:///kaggle/input/module-evaluate/kaggle/working
!pip install seqeval --no-index --find-links=file:///kaggle/input/seqeval-whl

In [None]:
import evaluate
# Load the seqeval metric; compute_metrics() calls metric.compute() on decoded label strings.
metric = evaluate.load("seqeval")

In [None]:
import numpy as np

def compute_metrics(eval_preds):
    """Compute overall seqeval precision, recall, F1 and accuracy for one evaluation pass.

    Label ids are decoded with the inverse of ``label_mapping`` — the table that
    assigned the ids in the first place. Decoding through ``feature_names`` is
    only correct while the ``value_counts()`` frequency ranking happens to
    coincide with the id order, which is not guaranteed (e.g. for tied counts).

    Args:
        eval_preds: pair ``(logits, labels)``. The arg-max over the last axis of
            ``logits`` is taken as the predicted label id; positions whose
            reference label is ``-100`` are excluded from both sides.

    Returns:
        dict with keys ``precision``, ``recall``, ``f1`` and ``accuracy`` taken
        from the metric's ``overall_*`` entries.
    """
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # Position in this list == label id, rebuilt from the encoding table.
    id_to_label = [name for name, _ in sorted(label_mapping.items(), key=lambda item: item[1])]

    true_labels = [
        [id_to_label[l] for l in label if l != -100]
        for label in labels
    ]
    true_predictions = [
        [id_to_label[p] for p, l in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    all_metrics = metric.compute(predictions=true_predictions, references=true_labels)

    return {"precision": all_metrics['overall_precision'],
            "recall": all_metrics['overall_recall'],
            "f1": all_metrics['overall_f1'],
            "accuracy": all_metrics['overall_accuracy']}

In [None]:
# One output class per distinct label observed in the training data.
num_labels = len(feature_count)
bert_model = BertForTokenClassification.from_pretrained(model_name, num_labels=num_labels)

bert_model.config.num_labels

In [None]:
from transformers import TrainingArguments

# Folder on Drive where the Trainer writes its checkpoints.
output_directory = "/content/drive/MyDrive/Colab Notebooks/PII Detection using NER BERT"

# Evaluate and save once per epoch, keep a single checkpoint, no external reporting.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
training_options = {
    'output_dir': output_directory,
    'evaluation_strategy': 'epoch',
    'save_strategy': 'epoch',
    'learning_rate': 2e-5,
    'num_train_epochs': 3,
    'weight_decay': 0.01,
    'report_to': 'none',
    'save_total_limit': 1,
}

args = TrainingArguments(**training_options)

In [None]:
from transformers import Trainer

# Wire model, arguments, data splits, collator and metric function together, then fine-tune.
trainer_options = dict(
    model=bert_model,
    args=args,
    train_dataset=final_dataset['train'],
    eval_dataset=final_dataset['validation'],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
)
trainer = Trainer(**trainer_options)

trainer.train()

In [None]:
# Maps the generic class names used at inference time ('LABEL_<id>') back to the
# dataset's label strings. Derived from `label_mapping` so the two tables cannot
# drift apart; the result is the same 13-entry dict that was previously spelled
# out by hand ('LABEL_0' -> 'O', ..., 'LABEL_12' -> 'I-ID_NUM').
reverse_label_mapping = {
    f'LABEL_{label_id}': label_name
    for label_name, label_id in label_mapping.items()
}


In [None]:
# Read the test JSON from the mounted Drive folder and preview the first rows.
test_json_path = "/content/drive/MyDrive/Colab Notebooks/PII Detection using NER BERT/test.json"
test_df = pd.read_json(test_json_path)

test_df.head()

In [None]:
len(test_df)

In [None]:
trainer.state.global_step

In [None]:
# Folder of the checkpoint written at the final training step. Built from
# `output_directory` (the Trainer's output_dir) rather than a second hard-coded
# copy of that path, so changing the output location cannot leave this stale.
last_checkpoint_folder = f"{output_directory}/checkpoint-{trainer.state.global_step}"

last_checkpoint_folder

In [None]:
from transformers import pipeline

# Token-classification pipeline loaded from the last checkpoint folder.
# With aggregation_strategy="simple" each result is a grouped entity; the submission
# cell below reads its 'entity_group' and 'word' fields.
token_classifier = pipeline("token-classification", model=last_checkpoint_folder, aggregation_strategy="simple")

In [None]:
test_df['full_text'][0]

In [None]:
token_classifier(test_df['full_text'][0])

In [None]:
import csv
from IPython.display import clear_output

# Run the classifier over every test document and write one CSV row per detected
# PII entity that can be matched to a token of that document.
file_name = "submission.csv"

with open(file_name, 'w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(['row_id', 'document', 'token', 'label'])

    # Running counter so every written row gets its own row_id; the document is
    # already identified by the separate `document` column.
    next_row_id = 0

    for i in range(len(test_df)):
        pii_detection = token_classifier(test_df['full_text'][i])
        document = test_df['document'][i]

        # Lower-cased once per document (the model is "bert-base-uncased", and the
        # comparison below is against the pipeline's 'word' output).
        tokens_list = [token.lower() for token in test_df['tokens'][i]]

        # Token positions already assigned in this document, so a repeated word is
        # matched to its next unused occurrence.
        discovered_positions = set()

        clear_output(wait=True)
        print(f"Now reading Row: {i}")

        for detection in pii_detection:
            pii_entity_group = detection['entity_group']
            if pii_entity_group == 'LABEL_0':
                # LABEL_0 is the non-PII class ('O'); nothing to report.
                continue

            pii_word = detection['word']
            label = reverse_label_mapping[pii_entity_group]

            token_position = next(
                (index for index, token in enumerate(tokens_list)
                 if token == pii_word and index not in discovered_positions),
                None,
            )

            if token_position is None:
                # No single token equals the detected word (e.g. a grouped,
                # multi-token entity): skip it instead of writing a row with an
                # empty token index.
                print(f"Word: {pii_word}, Predicted Label: {label} (no matching token, skipped)")
                continue

            discovered_positions.add(token_position)
            csv_writer.writerow([next_row_id, document, token_position, label])
            next_row_id += 1

            print(f"Word: {pii_word}, Predicted Label: {label}")

    print("Finished Reading the Test Data")