## Annotating the messages

In [2]:
from torch.cuda import is_available

# Report whether PyTorch can see a CUDA device in this environment.
is_available()

False

Here we write the patterns to a file for further annotation (the code below is currently commented out).

In [3]:
# NOTE: this cell is disabled. If re-enabled it would flatten result['patterns']
# (iterated as a list of lists) and write the first 20 patterns, stripped, one per
# line, to ../datasets/patterns.txt.
# NOTE(review): `result` is not defined in any cell visible above — confirm where it comes from.
# patterns = []
# for pattern_list in result['patterns']:
#     for pattern in pattern_list:
#         patterns.append(pattern)

# with open("../datasets/patterns.txt", 'w') as pattern_file:
#     for pattern in patterns[:20]:
#         pattern_file.write(pattern.strip() + '\n')

First we need to open the annotated file and transform it into a convenient form: one entry per message, with annotation offsets relative to the start of that message.

In [4]:
from json import loads

# The whole file is parsed as a single JSON document (not line by line), so it is
# expected to hold exactly one record; the cells below read its 'data' and 'label' keys.
# The encoding is pinned to UTF-8 because the annotation offsets are character
# positions in the decoded text, and the platform default encoding may differ.
with open('../datasets/admin1.jsonl', 'r', encoding='utf-8') as annotated_file:
    data = loads(annotated_file.read())

# Turn the raw (start, end, label) triples into dicts that also carry the
# annotated substring, which makes later sanity checks easier.
raw_annotations = data['label']
annotations = []
for annotation in raw_annotations:
    start, end, label = annotation
    annotations.append({'start': start, 'end': end, 'label': label, 'text': data['data'][start:end]})

# Process annotations left to right so a single forward pass over the lines suffices.
annotations = sorted(annotations, key=lambda x: x['start'])
split_content = data['data'].split('\n')

# [current_offset, next_offset) is the character range of the current line in the
# full text; the "+ 1" accounts for the '\n' removed by split().
current_offset = 0
next_offset = len(split_content[0]) + 1
message_number = 0
annotated_messages = [{'text': text, 'annotations': []} for text in split_content]
for annotation in annotations:
    # Advance with `while`, not `if`: lines without any annotation must be skipped
    # entirely, otherwise the annotation is attached to the wrong line with wrong
    # relative offsets.
    while annotation['start'] >= next_offset and message_number < len(split_content) - 1:
        message_number += 1
        current_offset = next_offset
        next_offset += len(split_content[message_number]) + 1
    # Re-express the span relative to the start of its own line.
    offset_annotation = {
        'start': annotation['start'] - current_offset,
        'end': annotation['end'] - current_offset,
        'label': annotation['label'],
        'text': annotation['text']
    }
    annotated_messages[message_number]['annotations'].append(offset_annotation)

len(annotated_messages)


708

The data was slightly mangled: some line breaks were lost. To restore them, we insert a line break wherever a `(.*?)` placeholder is immediately followed by an uppercase letter.

In [5]:
import re

# A literal "(.*?)" placeholder directly followed by an uppercase letter marks a
# spot where a line break is re-inserted between the placeholder and the letter.
lost_break_pattern = re.compile(r'\(\.\*\?\)([A-Z])')

for message in annotated_messages:
    restored_text = lost_break_pattern.sub(r'(.*?)\n\1', message['text'])
    message['text'] = restored_text



In [6]:
new_messages = []
for message in annotated_messages:
    pieces = message['text'].split('\n')
    final_index = len(pieces) - 1
    # Annotation offsets do not count the '\n' separators, so a piece starts (in
    # annotation coordinates) at the summed length of the pieces before it.
    piece_start = 0
    for index, piece in enumerate(pieces):
        piece_end = piece_start + len(piece)
        piece_annotations = []
        for ann in message['annotations']:
            if ann['start'] < piece_start:
                continue
            # Every piece but the last also requires the span to end inside it;
            # the last piece takes everything that starts at or after its beginning.
            if index != final_index and ann['end'] > piece_end:
                continue
            shifted = ann.copy()
            shifted['start'] -= piece_start
            shifted['end'] -= piece_start
            piece_annotations.append(shifted)
        new_messages.append({'text': piece, 'annotations': piece_annotations})
        piece_start = piece_end

annotated_messages = new_messages


Now we want annotations to correspond to tokens instead of string positions

In [7]:
from transformers import BertTokenizerFast, BatchEncoding
from tokenizers import Encoding

# Load the cased BERT tokenizer and register the "(.*?)" placeholder as an extra
# token before tokenizing, so it shows up as a single token (see the preview output below).
tokenizer = BertTokenizerFast.from_pretrained('bert-base-cased')
tokenizer.add_tokens("(.*?)")
# One string per message, in the same order as annotated_messages.
text = [message['text'] for message in annotated_messages]
# Tokenize all messages in one call with padding and truncation enabled.
tokenized_batch: BatchEncoding = tokenizer(text, padding=True, truncation=True)


In [8]:
def align_annotations(tokenized_text: Encoding, annotations):
    """Convert character-span annotations into one BILU tag per token.

    Args:
        tokenized_text: an encoding exposing ``tokens`` and ``char_to_token``.
        annotations: dicts with ``start``/``end`` character offsets and a ``label``.

    Returns:
        A list with one entry per token: ``'O'`` by default, ``'[PAD]'`` for as
        many trailing positions as there are ``'[PAD]'`` tokens, and
        ``U-``/``B-``/``I-``/``L-`` prefixed labels for annotated tokens.
    """
    tokens = tokenized_text.tokens
    pad_count = sum(1 for token in tokens if token == '[PAD]')
    labels = ['O'] * (len(tokens) - pad_count) + ['[PAD]'] * pad_count

    for anno in annotations:
        # Distinct tokens touched by the span; characters that map to no token are ignored.
        covered = sorted({
            token_ix
            for token_ix in map(tokenized_text.char_to_token, range(anno['start'], anno['end']))
            if token_ix is not None
        })
        tag = anno['label']
        if len(covered) == 1:
            labels[covered[0]] = f"U-{tag}"
            continue
        # For an empty span the zip below yields nothing, so no label is written.
        prefixes = ['B'] + ['I'] * (len(covered) - 2) + ['L']
        for prefix, token_ix in zip(prefixes, covered):
            labels[token_ix] = f"{prefix}-{tag}"
    return labels

# One tag sequence per message, index-aligned with tokenized_batch.
labels = [
    align_annotations(tokenized_batch[position], message['annotations'])
    for position, message in enumerate(annotated_messages)
]

# Preview the token/tag alignment of the first message.
first_example = zip(tokenized_batch[0].tokens, labels[0])
for token, label in first_example:
    print(f"{token} — {label}")


[CLS] — O
Non — B-Meaningless
- — I-Meaningless
zero — I-Meaningless
return — I-Meaningless
code — I-Meaningless
from — I-Meaningless
generate — I-Meaningless
( — I-Meaningless
97 — I-Meaningless
) — L-Meaningless
; — O
Lo — B-Meaningless
##g — I-Meaningless
##fi — I-Meaningless
##le — I-Meaningless
error — I-Meaningless
in — I-Meaningless
log — I-Meaningless
. — I-Meaningless
generate — L-Meaningless
: — O
" — O
Test — B-Meaningful
##H — I-Meaningful
##ep — I-Meaningful
##MC — I-Meaningful
FA — I-Meaningful
##TA — I-Meaningful
##L — I-Meaningful
The — I-Meaningful
efficiency — I-Meaningful
after — I-Meaningful
101 — I-Meaningful
events — I-Meaningful
is — I-Meaningful
(.*?) — I-Meaningful
! — I-Meaningful
! — I-Meaningful
! — L-Meaningful
" — O
[SEP] — O
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [PAD]
[PAD] — [P

This is a way to convert labels to numbers and back

In [9]:
from itertools import product

# Distinct annotation labels, sorted so the label <-> id mapping is the same on every
# run; iterating a raw set of strings can yield a different order in each kernel session.
# NOTE(review): assumes all labels are mutually comparable (e.g. all strings) — confirm.
annotation_labels = sorted({anno['label'] for message in annotated_messages for anno in message['annotations']})
# Ids 0 and 1 are reserved for padding and "outside"; then every BILU prefix x label pair.
all_labels = ['[PAD]', 'O'] + [f"{prefix}-{label}" for prefix, label in product('BILU', annotation_labels)]
label_to_id = {label: i for i, label in enumerate(all_labels)}
id_to_label = {i: label for label, i in label_to_id.items()}

At this point we have:
 - `labels`
 - `tokenized_batch`
 - `text`

and we want all those in the dataset

In [10]:
from datasets import Dataset

def encode_dataset(dataset):
    """Tokenize the 'text' column of a batch with the notebook's tokenizer.

    Uses the same padding and truncation settings as the call that produced
    ``tokenized_batch``, so the resulting ``input_ids`` line up with the
    token-level labels that were aligned against that batch.
    """
    return tokenizer(dataset['text'], padding=True, truncation=True)


dataset = Dataset.from_dict({
    'text': text,
    'tokens': [tokenized_batch[i].tokens for i in range(len(text))],
    'text_labels': labels,
    # Index instead of .get() so an unknown label raises here rather than
    # silently putting None into the label ids.
    'labels': [[label_to_id[l] for l in lab] for lab in labels]
})

# batch_size=None passes the whole dataset to encode_dataset as a single batch, so
# padding is to the global longest sequence — the same length the labels were padded
# to — instead of being recomputed per 1000-row batch.
dataset = dataset.map(encode_dataset, batched=True, batch_size=None)



  0%|          | 0/1 [00:00<?, ?ba/s]

In [11]:
# Fixed seed so the train/eval split is the same on every run.
SPLIT_SEED = 42
# Number of shuffled rows held out for evaluation; the rest is used for training.
EVAL_SIZE = 100

dataset = dataset.shuffle(seed=SPLIT_SEED)
train_dataset, eval_dataset = Dataset.from_dict(dataset[EVAL_SIZE:]), Dataset.from_dict(dataset[:EVAL_SIZE])
train_dataset, eval_dataset

(Dataset({
     features: ['attention_mask', 'input_ids', 'labels', 'text', 'text_labels', 'token_type_ids', 'tokens'],
     num_rows: 709
 }),
 Dataset({
     features: ['attention_mask', 'input_ids', 'labels', 'text', 'text_labels', 'token_type_ids', 'tokens'],
     num_rows: 100
 }))

In [11]:
from transformers import BertForTokenClassification, BertConfig
from transformers import Trainer, TrainingArguments
from transformers import DataCollatorForTokenClassification

# Size the embedding table from the tokenizer itself (base vocabulary plus the added
# "(.*?)" token) instead of a hard-coded number that can drift out of sync with it.
# NOTE(review): the model is built from a bare config, so its weights appear to be
# freshly initialised rather than loaded from the 'bert-base-cased' checkpoint —
# confirm this is intended.
model_config = BertConfig(vocab_size=len(tokenizer), num_labels=len(all_labels))
model = BertForTokenClassification(model_config)
# NOTE(review): nothing in this notebook reads this attribute — confirm it has any effect.
model.max_seq_length = 150

data_collator = DataCollatorForTokenClassification(tokenizer)

# CPU-only run (no_cuda=True), one example per step, evaluated after every epoch.
training_args = TrainingArguments(
    output_dir="./BERT_model",
    overwrite_output_dir=True,
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=1,
    num_train_epochs=3,
    weight_decay=0.01,
    no_cuda=True
)

trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset
)

trainer.train()

Epoch,Training Loss,Validation Loss


TrainOutput(global_step=2127, training_loss=0.22381831582970163)