In [None]:
from datasets import Dataset
import json

# Load the raw training corpus. JSON is UTF-8 by specification, so the
# encoding is pinned here instead of relying on the platform default.
with open('../data/corpus/train.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Reshape the list of records into the column-oriented dict that
# Dataset.from_dict expects. Each record's 'positions' field is stored under
# the singular column name 'position', which is what the later cells read.
d = {
    'slot': [item['slot'] for item in data],
    'text': [item['text'] for item in data],
    'position': [item['positions'] for item in data],
}

# Build one (not yet split) dataset from the columns.
dataset = Dataset.from_dict(d)

In [None]:
# Hold out 2.5% of the examples for evaluation. The seed is fixed so that the
# split -- and therefore any hard-coded example index used for inspection in
# later cells -- is the same on every run.
dataset = dataset.train_test_split(test_size=0.025, seed=42)

In [None]:
from transformers import AutoTokenizer

# Name of the pretrained checkpoint; reused later when the
# token-classification model is loaded.
model_checkpoint = "distilbert-base-uncased"
# Tokenizer matching that checkpoint. Later cells request offset mappings from
# it, which presumably requires the fast tokenizer implementation -- TODO confirm.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
def classes(data):
    """Collect the distinct slot names used anywhere in ``data``.

    Args:
        data: Iterable of records; each record's ``'slot'`` entry must be a
            mapping whose keys are slot names.

    Returns:
        A ``(labels, count)`` tuple: the unique slot names as a sorted list,
        and how many of them there are.
    """
    unique_slots = set()
    for item in data:
        unique_slots.update(item['slot'].keys())

    # Sort so that the label order -- and hence every id derived from it -- is
    # identical on each run. Iterating a set of strings directly yields an
    # order that can change between interpreter sessions (hash randomization).
    labels = sorted(unique_slots)
    return labels, len(labels)

In [None]:

# Derive the slot label inventory from the raw records and show it together
# with its size, one value per line.
class_labels, no_classes = classes(data)
print(class_labels, no_classes, sep='\n')

In [None]:
def class_mapper(class_labels):
    """Build the label -> id and id -> label lookup tables.

    Id 0 is reserved for the 'O' (no class) label; the entries of
    ``class_labels`` receive ids 1, 2, ... in the order given.

    Args:
        class_labels: Sequence of label names.

    Returns:
        A ``(label_to_id, id_to_label)`` pair of dicts.
    """
    label_to_id = {'O': 0}
    id_to_label = {0: 'O'}
    for idx, label in enumerate(class_labels, start=1):
        label_to_id[label] = idx
        id_to_label[idx] = label
    return label_to_id, id_to_label

# Build the label <-> id lookup tables that the alignment, metric and model
# cells read as notebook-level globals, and print them for inspection.
mapper, unmapper = class_mapper(class_labels)
print(mapper, unmapper)


In [None]:
def align_positions_with_tokens(input_ids, offset_mapping, position,
                                sep_token_id=102, label2id=None):
    """Turn character-level slot spans into one label id per token.

    Args:
        input_ids: Token ids of one encoded example. The first entry is
            treated as a leading special token, and ``sep_token_id`` must
            occur in the list.
        offset_mapping: ``(start, end)`` character offsets, one pair for each
            entry of ``input_ids``.
        position: Mapping from slot name to ``None`` (slot absent) or to a
            dict holding the span's ``'begin'`` and ``'end'`` character
            offsets.
        sep_token_id: Id of the separator token that ends the real input.
            Defaults to 102, the value that used to be hard-coded here.
        label2id: Mapping from slot name to label id. When omitted, the
            notebook-level ``mapper`` table is used.

    Returns:
        A list as long as ``input_ids``. The first position and everything
        from the separator onwards are -100 (the value the metric code skips);
        tokens covered by a span carry that slot's id; all others are 0.

    Raises:
        ValueError: If ``sep_token_id`` does not occur in ``input_ids``.
        IndexError: If ``input_ids`` is empty, or if a span cannot be matched
            to token boundaries (no token starts inside it, or no token ends
            exactly at its end). Diagnostics are printed before re-raising.
    """
    if label2id is None:
        # Fall back to the table built by class_mapper at notebook level.
        label2id = mapper

    labels = [0] * len(input_ids)
    labels[0] = -100
    # Mask the separator and everything after it (e.g. padding).
    sep_index = input_ids.index(sep_token_id)
    labels[sep_index:] = [-100] * (len(labels) - sep_index)

    # Offsets without the first and last entry, so index i in this list
    # corresponds to token i + 1 of the full sequence.
    begin_end_tokens = [
        (offset_mapping[i][0], offset_mapping[i][1])
        for i in range(len(input_ids))
    ][1:-1]

    for class_label, val in position.items():
        if val is None:
            continue
        begin_gt = val['begin']
        end_gt = val['end']

        # Tokens that start inside the span, and tokens that end exactly at
        # the span's end; only the first hit of each list is used.
        begin_token = [
            i for i, (start, _) in enumerate(begin_end_tokens)
            if begin_gt <= start < end_gt
        ]
        end_token = [
            i for i, (_, end) in enumerate(begin_end_tokens)
            if end == end_gt
        ]

        try:
            first, last = begin_token[0], end_token[0]
        except IndexError:
            # Dump what is needed to see why the span did not line up, then
            # let the original error propagate to the caller.
            print(begin_end_tokens)
            print(f'begin: {begin_gt}, end: {end_gt}, class: {class_label}')
            print(f'begin_token: {begin_token}, end_token: {end_token}')
            print('---')
            raise

        # +1 compensates for the leading entry dropped from begin_end_tokens.
        for token in range(first, last + 1):
            labels[token + 1] = label2id[class_label]

    return labels

# Sanity check on a single training example: show the raw text, its tokens,
# and the label ids produced by the alignment function.
sample = dataset['train'][12239]
inputs = tokenizer(
    sample["text"],
    is_split_into_words=False,
    return_offsets_mapping=True,
)
print(sample['text'], inputs.tokens(), sep='\n')
labels = align_positions_with_tokens(
    inputs['input_ids'],
    inputs['offset_mapping'],
    sample['position'],
)
print(labels)


    

In [None]:
def tokenize_and_align_labels(examples, max_length=48):
    """Tokenize a batch of texts and attach per-token label ids.

    Args:
        examples: Batch with a ``'text'`` column (list of strings) and a
            ``'position'`` column (one span mapping per text).
        max_length: Maximum sequence length handed to the tokenizer, which
            truncates longer inputs. Defaults to 48, the value that used to
            be hard-coded here.

    Returns:
        The tokenizer output for the batch with an added ``'labels'`` entry
        holding one label list per example.

    Raises:
        Exception: Whatever ``align_positions_with_tokens`` raises for an
            example; the offending text, its tokens and its batch index are
            printed before the error is re-raised.
    """
    tokenized_inputs = tokenizer(
        examples["text"],
        is_split_into_words=False,
        return_offsets_mapping=True,
        truncation=True,
        padding=True,
        max_length=max_length,
    )
    new_labels = []
    for i, position in enumerate(examples['position']):
        try:
            new_labels.append(
                align_positions_with_tokens(
                    tokenized_inputs['input_ids'][i],
                    tokenized_inputs['offset_mapping'][i],
                    position,
                )
            )
        except Exception:
            # Show which example failed, then propagate the original error
            # with its traceback intact.
            print(examples['text'][i])
            print(tokenized_inputs.tokens(i))
            print(f'index: {i}')
            raise

    tokenized_inputs["labels"] = new_labels
    return tokenized_inputs

In [None]:
# Tokenize and label the dataset in batches. The three raw columns are
# dropped from the result, leaving the entries returned by
# tokenize_and_align_labels.
tokenized_datasets = dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=["slot", "text", "position"],
)

In [None]:
from transformers import DataCollatorForTokenClassification

# Batch collator for token classification, built with the notebook's
# tokenizer; it is handed to the Trainer in a later cell.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
import evaluate

# Load the "seqeval" metric; compute_metrics reads the overall_* entries of
# the result it returns.
metric = evaluate.load("seqeval")

In [None]:
def decode_labels(labels):
    """Translate label ids to label names, rendering the -100 marker as 'O'.

    Ids other than -100 are looked up in the notebook-level ``unmapper``.
    """
    names = []
    for label_id in labels:
        if label_id == -100:
            names.append('O')
        else:
            names.append(unmapper[label_id])
    return names

In [None]:

# Smoke-test the metric: decode the first training example's labels, change
# one position in a copy, and score the copy against the original.
decoded_labels = decode_labels(tokenized_datasets['train']["labels"][0])
print(decoded_labels)
pred = list(decoded_labels)
pred[1] = 'bpm_greater_than'

metric.compute(references=[decoded_labels], predictions=[pred])

In [None]:
import numpy as np
def compute_metrics(eval_preds):
    """Score predictions against references with the loaded metric.

    Args:
        eval_preds: A ``(logits, labels)`` pair. The predicted id for each
            position is the argmax of ``logits`` over its last axis.

    Returns:
        Dict with ``precision``, ``recall``, ``f1`` and ``accuracy``, taken
        from the metric's ``overall_*`` entries.
    """
    logits, labels = eval_preds
    predicted_ids = np.argmax(logits, axis=-1)

    # Reference label names, skipping positions marked with -100.
    references = []
    for gold_row in labels:
        references.append([unmapper[gold] for gold in gold_row if gold != -100])

    # Predicted label names at exactly the positions kept above.
    hypotheses = []
    for pred_row, gold_row in zip(predicted_ids, labels):
        hypotheses.append(
            [unmapper[guess] for guess, gold in zip(pred_row, gold_row) if gold != -100]
        )

    scores = metric.compute(predictions=hypotheses, references=references)
    report = {}
    for short_name in ("precision", "recall", "f1", "accuracy"):
        report[short_name] = scores["overall_" + short_name]
    return report

In [None]:
from transformers import DistilBertForTokenClassification 

# The label count is supplied when the model is built: one output per entry of
# the id -> label table (every slot plus 'O'). Assigning
# model.config.num_labels after loading, as was done before, cannot resize a
# classification head that already exists, so it is passed here instead.
model = DistilBertForTokenClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(unmapper),
    id2label=unmapper,
    label2id=mapper,
)


In [None]:
from transformers import TrainingArguments

# Training configuration: checkpoints are written under the output directory
# once per epoch, while evaluation runs every 500 steps.
args = TrainingArguments(
    output_dir="distilbert-piu-search",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    learning_rate=2e-5,
    weight_decay=0.01,
    evaluation_strategy='steps',
    eval_steps=500,
    save_strategy="epoch",
)

In [None]:
from transformers import Trainer

# Wire the model, data splits, collator and metric function together, then
# start fine-tuning.
trainer = Trainer(
    model=model,
    args=args,
    tokenizer=tokenizer,
    data_collator=data_collator,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets["test"],
    compute_metrics=compute_metrics,
)
trainer.train()

In [None]:
# Load the fine-tuned model from a checkpoint directory on disk. The step
# number in the path is hard-coded, so it has to match a checkpoint that a
# training run actually wrote -- NOTE(review): confirm before running.
model_cpu = DistilBertForTokenClassification.from_pretrained(
    "distilbert-piu-search/checkpoint-6094",
)

In [None]:
from transformers import pipeline
# Inference pipeline around the reloaded model, using the "simple"
# aggregation strategy for its predictions.
token_classifier = pipeline(
    "token-classification",
    model=model_cpu,
    tokenizer=tokenizer,
    aggregation_strategy="simple",
)


In [None]:

# Run the pipeline on one example query and print the raw predictions.
pred = token_classifier("EXC d 20")
print(pred)


In [None]:
# Reduce each prediction to its label, matched text and character span.
[
    {
        "class": entity['entity_group'],
        "word": entity['word'],
        "start": entity['start'],
        "end": entity['end'],
    }
    for entity in pred
]