In [None]:
import pandas as pd
from transformers import AutoTokenizer
import re
import pickle

# Build the BIO tag vocabulary: 'O' first, then a (B-, I-) pair per entity type
label_list = ['supp_name', 'supp_vat', 'date', 'amount']
label_list_B = ['B-' + name for name in label_list]
label_list_I = ['I-' + name for name in label_list]
# Interleaving gives every B- tag an odd id and its I- tag the following even id
label_list_BIO = ['O'] + [tag for name in label_list for tag in ('B-' + name, 'I-' + name)]

# Lookup tables between tag strings and integer ids
label2id = dict(zip(label_list_BIO, range(len(label_list_BIO))))
id2label = dict(enumerate(label_list_BIO))
num_labels = len(label_list_BIO)

# Load tokenizer
# The pretrained "bert-base-cased" tokenizer is created once here; later cells
# reference it through the notebook-level name `tokenizer`.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

In [2]:
# Align the labels
def align_labels_with_tokens(labels, word_ids):
    """Expand word-level label ids to one label id per token.

    Args:
        labels: Sequence of integer label ids, one per word.
        word_ids: Sequence with one entry per token; each entry is the index of
            the word the token belongs to, or ``None`` for tokens that belong
            to no word.

    Returns:
        A list as long as ``word_ids``. Tokens without a word get ``-100``.
        The first token of a word gets the word's label. Every further token of
        the same word gets the word's label, bumped by one when that label id
        is odd (with the id layout of ``label_list_BIO`` this turns a B- tag
        into the matching I- tag).
    """
    aligned = []
    previous_word = None
    for word_id in word_ids:
        if word_id is None:
            # Token that maps to no word: mark it and reset the word tracker
            aligned.append(-100)
            previous_word = None
            continue

        word_label = labels[word_id]
        if word_id != previous_word:
            # First token of a new word keeps the word's own label
            previous_word = word_id
        elif word_label % 2 == 1:
            # Continuation token of a B-XXX word becomes I-XXX
            word_label += 1
        aligned.append(word_label)

    return aligned


def _find_entity_span(text, entity, entity_type):
    """Locate ``entity`` in ``text`` and return its ``(start, end)`` character span.

    Besides the literal entity string, alternative spellings are tried:
    for ``'date'`` a 10-character value is also tried with a two-digit year and
    an 8-character value with a ``20``-prefixed year; for ``'amount'`` the
    decimal separator is swapped between ``,`` and ``.``.

    Returns ``(None, None)`` when no spelling occurs in ``text``.
    """
    patterns = [re.escape(entity)]
    if entity_type == 'date':
        if len(entity) == 10:  # DD.MM.YYYY -> also try DD.MM.YY
            patterns.append(re.escape(entity[:-4] + entity[-2:]))
        elif len(entity) == 8:  # DD.MM.YY -> also try DD.MM.20YY
            patterns.append(re.escape(entity[:-2] + '20' + entity[-2:]))
    elif entity_type == 'amount':
        if ',' in entity:
            patterns.append(re.escape(entity.replace(',', '.')))
        elif '.' in entity:
            patterns.append(re.escape(entity.replace('.', ',')))

    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.start(), match.end()
    return None, None


def _overlapping_word_indices(word_spans, start, end):
    """Return the indices of the words whose character span overlaps ``[start, end)``."""
    return [
        idx for idx, (word_start, word_end) in enumerate(word_spans)
        if word_start < end and word_end > start
    ]


def prepare_data(data):
    """Turn a DataFrame of annotated receipts into tokenized, BIO-labelled examples.

    Args:
        data: DataFrame with the columns ``text_extract``, ``supp_name``,
            ``supp_vat``, ``date`` and ``amount``. ``amount`` must be
            convertible to ``float``.

    Returns:
        ``(tokenized_data, labels_full)`` where ``tokenized_data`` holds one
        tokenizer output per row (with an added ``"labels"`` entry of aligned
        label ids) and ``labels_full`` holds the word-level BIO tag strings of
        each row.

    Side effects:
        The ``amount`` column of the *passed-in* frame is overwritten with
        two-decimal strings (this happens before the internal copy is made).

    Notes:
        * Missing entity cells are skipped. Without this, the string conversion
          would turn them into the text ``'nan'``, which can match inside
          ordinary words.
        * An entity that starts or ends in the middle of a whitespace-delimited
          word labels every word it overlaps.
    """
    entity_columns = ['supp_name', 'supp_vat', 'date', 'amount']

    # Record missing cells before everything is converted to strings below
    missing = {column: data[column].isna().tolist() for column in entity_columns}

    # Fix the format of the 'amount' column to always have two decimal places
    data['amount'] = data['amount'].astype(float).apply(lambda x: f"{x:.2f}")
    # Convert all columns to string to handle numeric values
    data = data.astype(str)

    # A NaN amount is formatted as 'nan' (also on a re-run, when the caller's
    # column already holds that string), so treat it as missing as well
    missing['amount'] = [
        absent or value == 'nan'
        for absent, value in zip(missing['amount'], data['amount'].tolist())
    ]

    sentences = data['text_extract'].tolist()
    entity_values = {
        column: [
            '' if absent else value
            for value, absent in zip(data[column].tolist(), missing[column])
        ]
        for column in entity_columns
    }

    tokenized_data = []
    labels_full = []

    for row, sentence in enumerate(sentences):
        # Character span of every whitespace-delimited word (same words as sentence.split())
        word_spans = [found.span() for found in re.finditer(r'\S+', sentence)]
        words = [sentence[word_start:word_end] for word_start, word_end in word_spans]
        labels = ['O'] * len(words)

        # Assign labels to each word in the sentence
        for label_prefix in entity_columns:
            entity = entity_values[label_prefix][row]
            if not entity:
                continue
            start, end = _find_entity_span(sentence, entity, label_prefix)
            if start is None:
                continue
            covered = _overlapping_word_indices(word_spans, start, end)
            if not covered:
                continue
            labels[covered[0]] = f'B-{label_prefix}'
            for idx in covered[1:]:
                labels[idx] = f'I-{label_prefix}'

        # Tokenize the sentence and align labels with subwords
        labels_ids = [label2id[label] for label in labels]
        tokenized_inputs = tokenizer(words, padding='max_length', truncation=True, is_split_into_words=True)
        aligned_labels = align_labels_with_tokens(labels_ids, tokenized_inputs.word_ids())
        tokenized_inputs["labels"] = aligned_labels
        tokenized_data.append(tokenized_inputs)
        labels_full.append(labels)

    return tokenized_data, labels_full



In [3]:
# Prepare the data
# Read the three CSV splits from the current working directory.
df_train = pd.read_csv('data_train.csv')
df_test1 = pd.read_csv('data_test1.csv')
df_test2 = pd.read_csv('data_test2.csv')

# Tokenize every split. prepare_data also rewrites the 'amount' column of the
# frame it receives as two-decimal strings.
# NOTE: `labels_full` is reassigned by each call, so after this cell it only
# holds the word-level labels of the test2 split.
train_data, labels_full = prepare_data(df_train)
# df_train['labels_full'] = labels_full
test1_data, labels_full = prepare_data(df_test1)
# df_test1['labels_full'] = labels_full
test2_data, labels_full = prepare_data(df_test2)
# df_test2['labels_full'] = labels_full

In [None]:
# Sanity check: show the container type and the first encoded training example,
# then its input ids, aligned label ids and attention mask individually.
print(type(train_data))
print(train_data[0])
print(train_data[0]['input_ids'])
print(train_data[0]['labels'])
print(train_data[0]['attention_mask'])
# print(df_train['labels_full'][0])

In [5]:
# !pip install evaluate
# !pip install seqeval

import numpy as np
import evaluate
from transformers import AutoModelForTokenClassification, TrainingArguments, DataCollatorForTokenClassification, Trainer

# Load the seqeval metric
# The loaded object is stored in the notebook-level name `metric`, which
# compute_metrics reads. Presumably needs the `evaluate` and `seqeval`
# packages from the commented-out pip lines above — verify in your environment.
metric = evaluate.load("seqeval")

In [6]:
# Metric callback for the Trainer, backed by the seqeval metric
def compute_metrics(eval_preds):
    """Compute token-level accuracy for one evaluation pass.

    Args:
        eval_preds: Pair ``(logits, labels)``. ``logits`` is reduced with
            ``argmax`` over its last axis; ``labels`` holds the reference label
            ids, where ``-100`` marks positions to leave out.

    Returns:
        ``{"accuracy": <overall_accuracy reported by the metric>}``.
    """
    logits, labels = eval_preds
    predicted_ids = np.argmax(logits, axis=-1)

    # Keep only positions whose reference label is not -100, then map ids to tag strings
    reference_tags = []
    for gold_row in labels:
        reference_tags.append([id2label[gold] for gold in gold_row if gold != -100])

    predicted_tags = []
    for pred_row, gold_row in zip(predicted_ids, labels):
        kept = [pred for pred, gold in zip(pred_row, gold_row) if gold != -100]
        predicted_tags.append([id2label[pred] for pred in kept])

    scores = metric.compute(predictions=predicted_tags, references=reference_tags)
    return {"accuracy": scores["overall_accuracy"]}

In [7]:
# Fine-tune a token-classification model and report its evaluation metrics
def train_and_eval(train_data, eval_data, label, num_labels):
    """Fine-tune "bert-base-cased" for token classification and evaluate it.

    Args:
        train_data: Training examples handed to the ``Trainer``.
        eval_data: Evaluation examples handed to the ``Trainer``.
        label: Suffix for the output directory ``bert-finetuned-ner-<label>``;
            also used in the printed summary line.
        num_labels: Number of output classes of the classification head.

    Returns:
        The ``Trainer`` after ``train()`` and ``evaluate()`` have run.
    """
    checkpoint = "bert-base-cased"
    model = AutoModelForTokenClassification.from_pretrained(checkpoint, num_labels=num_labels)

    training_args = TrainingArguments(
        output_dir=f"bert-finetuned-ner-{label}",
        evaluation_strategy="epoch",
        num_train_epochs=10,  # Adjust as needed
        learning_rate=2e-5,
        weight_decay=0.01,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=eval_data,
        data_collator=DataCollatorForTokenClassification(tokenizer=tokenizer),
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
    )

    trainer.train()
    eval_results = trainer.evaluate()
    print(f"Evaluation results for {label}: {eval_results}")

    return trainer


## Multiclass classification - all entity prediction

In [8]:
def _first_tagged_tokens(tokens, tag_ids, wanted_ids):
    """Return the tokens of the first contiguous run whose tag id is in ``wanted_ids``."""
    run = []
    for token, tag_id in zip(tokens, tag_ids):
        if tag_id in wanted_ids:
            run.append(token)
        elif run:
            # The first run has ended; later runs are ignored
            break
    return run


def true_and_predicted_words_for_all_labels(predictions, label_ids, test_data):
    """Rebuild the reference and predicted entity text for every entity type.

    Args:
        predictions: Array of per-token class scores; ``argmax`` over the last
            axis yields the predicted label ids.
        label_ids: Reference label ids per example, with ``-100`` marking
            positions to ignore.
        test_data: Examples in the same order as ``predictions``; each must
            provide ``['input_ids']``.

    Returns:
        ``(true_words_dict, pred_words_dict)``. Each maps every name in
        ``label_list`` to a list with one string per example: the text of the
        first contiguous B-/I- run of that entity type, or ``""`` when there is
        none.

    Notes:
        Tokens are filtered with the same ``label != -100`` mask as the label
        ids. Dropping tokens via ``skip_special_tokens=True`` would also remove
        tokens such as ``[UNK]`` that do carry a label, shifting every following
        token against its label.
    """
    predictions = np.argmax(predictions, axis=-1)

    true_words_dict = {label: [] for label in label_list}
    pred_words_dict = {label: [] for label in label_list}

    for pred_row, gold_row, entry in zip(predictions, label_ids, test_data):
        all_tokens = tokenizer.convert_ids_to_tokens(entry['input_ids'])

        # Keep token, reference id and predicted id only where the reference is not ignored
        kept = [
            (token, gold, pred)
            for token, gold, pred in zip(all_tokens, gold_row, pred_row)
            if gold != -100
        ]
        tokens = [token for token, _, _ in kept]
        gold_ids = [gold for _, gold, _ in kept]
        pred_ids = [pred for _, _, pred in kept]

        for label in label_list:
            wanted = (label2id[f'B-{label}'], label2id[f'I-{label}'])

            true_tokens = _first_tagged_tokens(tokens, gold_ids, wanted)
            pred_tokens = _first_tagged_tokens(tokens, pred_ids, wanted)

            true_entity = tokenizer.convert_tokens_to_string(true_tokens) if true_tokens else ""
            pred_entity = tokenizer.convert_tokens_to_string(pred_tokens) if pred_tokens else ""

            true_words_dict[label].append(true_entity if true_entity else "")
            pred_words_dict[label].append(pred_entity if pred_entity else "")

    return true_words_dict, pred_words_dict

In [None]:
# Train model for all labels together (multiclass classification)
# One model predicts every BIO tag at once; test1_data is passed as the
# evaluation set, and the output directory suffix is 'all_labels'.

# !pip install transformers[torch]
trainer_all_labels = train_and_eval(train_data, test1_data, 'all_labels', num_labels=len(label_list_BIO))

In [None]:
print("Predicting for test1_data with all labels...")
predictions_all_labels, labels_all_labels, metrics_all_labels = trainer_all_labels.predict(test1_data)
# Reassemble the reference and predicted words from the token-level output
true_words_dict, pred_words_dict = true_and_predicted_words_for_all_labels(predictions_all_labels, labels_all_labels, test1_data)
# Add the predicted words for each label as new columns
for label in label_list:
    df_test1[f'{label}_pred_multiclass'] = pred_words_dict[label]

# Remove all spaces from the VAT, date and amount predictions.
# regex=False makes this a literal replacement regardless of the pandas
# version's default for `regex`.
for label in ['supp_vat', 'date', 'amount']:
    df_test1[f'{label}_pred_multiclass'] = df_test1[f'{label}_pred_multiclass'].str.replace(' ', '', regex=False)

# Helper that closes up spaces around ' and - (e.g. "McDonald ' s" -> "McDonald's")
def remove_spaces_around_characters(text):
    """Return ``text`` with the space characters around each ``'`` or ``-`` removed.

    Only the space character is stripped from the matched region; other
    whitespace inside it (tabs, newlines) is left untouched.
    """
    def _drop_spaces(match):
        return match.group(0).replace(' ', '')

    separator = re.compile(r"\s*'\s*|\s*-\s*")
    return separator.sub(_drop_spaces, text)

# Supplier names keep their inner spaces; only the spaces around ' and - are removed.
df_test1['supp_name_pred_multiclass'] = df_test1['supp_name_pred_multiclass'].apply(remove_spaces_around_characters)
# Show the first five rows, including the new prediction columns.
print(df_test1.head(5))

In [None]:
# Collect one result dict per test1 row, keyed by the human-readable field names
information_extractions = []
for row in range(len(df_test1)):
    information_extractions.append({
        'Supplier company name': df_test1['supp_name_pred_multiclass'].values[row],
        'Supplier VAT number': df_test1['supp_vat_pred_multiclass'].values[row],
        'Date': df_test1['date_pred_multiclass'].values[row],
        'Amount': df_test1['amount_pred_multiclass'].values[row],
    })

# Save extraction results
result_file_name = "test1_bert.pickle"
with open(result_file_name, "wb") as file:
    pickle.dump(information_extractions, file, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
print("Predicting for test2_data with all labels...")
predictions_all_labels, labels_all_labels, metrics_all_labels = trainer_all_labels.predict(test2_data)
# Reassemble the reference and predicted words from the token-level output
true_words_dict, pred_words_dict = true_and_predicted_words_for_all_labels(predictions_all_labels, labels_all_labels, test2_data)
# Add the predicted words for each label as new columns
for label in label_list:
    df_test2[f'{label}_pred_multiclass'] = pred_words_dict[label]

# Remove all spaces from the VAT, date and amount predictions.
# regex=False makes this a literal replacement regardless of the pandas
# version's default for `regex`.
for label in ['supp_vat', 'date', 'amount']:
    df_test2[f'{label}_pred_multiclass'] = df_test2[f'{label}_pred_multiclass'].str.replace(' ', '', regex=False)

# Helper that closes up spaces around ' and - (e.g. "McDonald ' s" -> "McDonald's")
def remove_spaces_around_characters(text):
    """Return ``text`` with the space characters around each ``'`` or ``-`` removed.

    Only the space character is stripped from the matched region; other
    whitespace inside it (tabs, newlines) is left untouched.
    """
    def _drop_spaces(match):
        return match.group(0).replace(' ', '')

    separator = re.compile(r"\s*'\s*|\s*-\s*")
    return separator.sub(_drop_spaces, text)

# Supplier names keep their inner spaces; only the spaces around ' and - are removed.
df_test2['supp_name_pred_multiclass'] = df_test2['supp_name_pred_multiclass'].apply(remove_spaces_around_characters)
# Show the first five rows, including the new prediction columns.
print(df_test2.head(5))

In [None]:
# Collect one result dict per test2 row, keyed by the human-readable field names
information_extractions = []
for row in range(len(df_test2)):
    information_extractions.append({
        'Supplier company name': df_test2['supp_name_pred_multiclass'].values[row],
        'Supplier VAT number': df_test2['supp_vat_pred_multiclass'].values[row],
        'Date': df_test2['date_pred_multiclass'].values[row],
        'Amount': df_test2['amount_pred_multiclass'].values[row],
    })

# Save extraction results
result_file_name = "test2_bert.pickle"
with open(result_file_name, "wb") as file:
    pickle.dump(information_extractions, file, protocol=pickle.HIGHEST_PROTOCOL)