In [1]:
import os
# Restrict this process to GPU index 0. Set here, at the very top of the notebook,
# i.e. before torch is imported in a later cell.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

In [2]:
from transformers import AutoProcessor
# apply_ocr=False: words and boxes come from the dataset itself (they are passed
# explicitly to the processor in make_dataset), so built-in OCR is switched off.
processor = AutoProcessor.from_pretrained("microsoft/layoutlmv3-base", apply_ocr=False)


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import sys
# Add ../dataset to the import path so that `get_funsd` can be imported below.
sys.path.append('../dataset')
import get_funsd
# Build the custom FUNSD wrapper, pointing it at the dataset directory.
funsd = get_funsd.CustomFunsdDataset('../dataset/')
# NOTE(review): presumably this is what populates the train/test splits shown in
# the cell output — confirm against get_funsd.
funsd.split_generators()
# Bare expression: display the object's repr as the cell output.
funsd

CustomFunsdDataset:
DatasetDict({
    train: Dataset({features: ['id', 'tokens', 'ner_boxes', 'bboxes', 'ner_tags','line_ids','linkings','image','image_name'], num_rows: 149}),
    test: Dataset({features: ['id', 'tokens', 'ner_boxes', 'bboxes', 'ner_tags','line_ids','linkings','image','image_name'], num_rows: 50})
})

In [4]:
# Class id -> BIO tag name. The position in the list below is the class id.
label_map = dict(enumerate([
    'O',
    'B-HEADER',
    'I-HEADER',
    'B-QUESTION',
    'I-QUESTION',
    'B-ANSWER',
    'I-ANSWER',
]))

In [5]:
from datasets import Dataset, DatasetDict

# Columns copied from the custom FUNSD loader into plain `datasets` objects.
# Order matters: it becomes the column order of the resulting Dataset.
FUNSD_COLUMNS = (
    "id",
    "tokens",
    "ner_boxes",
    "bboxes",
    "ner_tags",
    "line_ids",
    "linkings",
    "image",
    "image_name",
)


def _split_to_dataset(split, column_names=FUNSD_COLUMNS):
    """Materialise one split of the loader as a `datasets.Dataset`.

    The split is iterated exactly once and every requested column is collected
    during that single pass (instead of one full pass per column), so each
    entry is only fetched once.

    Args:
        split: iterable of per-document mappings that expose every name in
            ``column_names`` as a key.
        column_names: columns to copy, in the order they should appear.

    Returns:
        A ``Dataset`` built with ``Dataset.from_dict`` from the collected columns.
    """
    columns = {name: [] for name in column_names}
    for entry in split:
        for name in column_names:
            columns[name].append(entry[name])
    return Dataset.from_dict(columns)


funsd_train_dataset = _split_to_dataset(funsd["train"])
funsd_test_dataset = _split_to_dataset(funsd["test"])

# Bundle both splits so they can be addressed as dataset["train"] / dataset["test"].
dataset = DatasetDict({
    "train": funsd_train_dataset,
    "test": funsd_test_dataset,
})


In [6]:
# Bare expression: display the rebuilt DatasetDict (splits, columns, row counts).
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'tokens', 'ner_boxes', 'bboxes', 'ner_tags', 'line_ids', 'linkings', 'image', 'image_name'],
        num_rows: 149
    })
    test: Dataset({
        features: ['id', 'tokens', 'ner_boxes', 'bboxes', 'ner_tags', 'line_ids', 'linkings', 'image', 'image_name'],
        num_rows: 50
    })
})

In [7]:
import torch

# Prefer the GPU when torch can see one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [8]:
import sys
# Add the directory two levels up to the import path so `mytools` resolves.
sys.path.append('../..')
# normalize_box is used by make_dataset below.
from mytools import unnormalize_box, normalize_box

In [9]:

import numpy as np
def make_dataset(documents):
    """Encode a batch of documents with the LayoutLMv3 processor.

    Args:
        documents: batch mapping with the columns 'image', 'tokens', 'bboxes'
            and 'ner_tags'; each column holds one entry per document.

    Returns:
        The processor's encoding for the batch, truncated and padded to
        "max_length", with the 'offset_mapping' entry removed.
    """
    page_images = documents['image']
    # The processor is fed RGB copies; the originals are still used for their size.
    rgb_pages = [page.convert('RGB') for page in page_images]

    word_lists = documents['tokens']
    raw_boxes = documents['bboxes']
    tag_lists = documents['ner_tags']

    # Rescale every token box using the width/height of the page it belongs to.
    # NOTE(review): the target coordinate range is decided by mytools.normalize_box,
    # which is not visible here — confirm it matches what the processor expects.
    scaled_boxes = [
        [
            normalize_box(box, page_images[doc_idx].width, page_images[doc_idx].height)
            for box in doc_boxes
        ]
        for doc_idx, doc_boxes in enumerate(raw_boxes)
    ]

    encoding = processor(
        rgb_pages,
        word_lists,
        boxes=scaled_boxes,
        word_labels=tag_lists,
        truncation=True,
        padding="max_length",
        return_offsets_mapping=True,
    )

    # The offsets are requested above but not wanted in the returned encoding.
    encoding.pop('offset_mapping')
    return encoding

In [10]:
# Original column names; passed as `remove_columns` to .map() below so that only
# the processor outputs remain in the encoded datasets.
cols = funsd_train_dataset.column_names

In [11]:
from datasets import Features, Sequence, ClassLabel, Value, Array2D, Array3D

# Schema of the encoded examples returned by make_dataset.
features = Features({
    'pixel_values': Array3D(shape=(3, 224, 224), dtype="float32"),
    'input_ids': Sequence(feature=Value(dtype='int64')),
    'attention_mask': Sequence(feature=Value(dtype='int64')),
    'bbox': Array2D(shape=(512, 4), dtype="int64"),
    'labels': Sequence(feature=Value(dtype='int64')),
})

# Encode both splits with identical settings: batched processing, original
# columns dropped, outputs cast to the schema above. Train is mapped first.
train_dataset, test_dataset = (
    dataset[split_name].map(
        make_dataset,
        batched=True,
        remove_columns=cols,
        features=features,
    )
    for split_name in ("train", "test")
)


Map: 100%|██████████| 149/149 [00:02<00:00, 50.64 examples/s]
Map: 100%|██████████| 50/50 [00:00<00:00, 53.37 examples/s]


In [12]:
# Switch both encoded datasets to the "torch" output format (in place).
train_dataset.set_format("torch")
test_dataset.set_format("torch")

In [13]:
from evaluate import load 
# seqeval metric object; compute_metrics below calls metric.compute(...) on it.
metric = load("seqeval")

In [14]:
# Tag names indexed by class id: 'O' first, then a B-/I- pair per entity type.
label_list = ['O'] + [
    f'{prefix}-{entity}'
    for entity in ('HEADER', 'QUESTION', 'ANSWER')
    for prefix in ('B', 'I')
]


In [15]:
import numpy as np

# Toggle read by compute_metrics: True -> return every seqeval entry (nested
# per-entity dicts flattened to "<key>_<name>"); False -> only the four overall scores.
return_entity_level_metrics = False

def compute_metrics(p):
    """Turn raw model outputs into seqeval scores.

    Args:
        p: pair ``(predictions, labels)``. ``predictions`` is reduced with an
            argmax over axis 2 to obtain one class id per position; positions
            whose label equals -100 are ignored.

    Returns:
        With ``return_entity_level_metrics`` disabled: a dict with the keys
        "precision", "recall", "f1" and "accuracy" (seqeval's overall scores).
        Otherwise: every seqeval result, with nested dicts flattened into
        "<key>_<name>" entries.
    """
    scores, gold_ids = p
    pred_ids = np.argmax(scores, axis=2)

    # Keep only positions that carry a real label, then map ids to tag names.
    true_predictions = []
    true_labels = []
    for pred_row, gold_row in zip(pred_ids, gold_ids):
        kept_pairs = [
            (pred, gold) for pred, gold in zip(pred_row, gold_row) if gold != -100
        ]
        true_predictions.append([label_list[pred] for pred, _ in kept_pairs])
        true_labels.append([label_list[gold] for _, gold in kept_pairs])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    if not return_entity_level_metrics:
        return {
            "precision": results["overall_precision"],
            "recall": results["overall_recall"],
            "f1": results["overall_f1"],
            "accuracy": results["overall_accuracy"],
        }

    # Flatten one level of nesting so every value sits under a single string key.
    final_results = {}
    for key, value in results.items():
        if isinstance(value, dict):
            final_results.update({f"{key}_{n}": v for n, v in value.items()})
        else:
            final_results[key] = value
    return final_results

In [16]:
# Inverse of label_map (tag name -> class id); passed to the model as label2id.
label_map_reversed = {v: k for k, v in label_map.items()} 

In [17]:
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback
from transformers.data.data_collator import default_data_collator
from transformers import LayoutLMv3ForTokenClassification
from transformers.trainer_callback import TrainerCallback

# Load the pretrained backbone with a token-classification head labelled by label_map.
# NOTE(review): the tuned `dropout` value read from the Optuna study further down is
# only printed and stored in the results dict; it is never applied to this model —
# confirm whether that is intended.
model = LayoutLMv3ForTokenClassification.from_pretrained(
    "microsoft/layoutlmv3-base",
    id2label=label_map,
    label2id=label_map_reversed,
)
model = model.to(device)

Some weights of LayoutLMv3ForTokenClassification were not initialized from the model checkpoint at microsoft/layoutlmv3-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [18]:
import pickle, json
# Load the saved Optuna study.
# NOTE: pickle.load can execute arbitrary code — only load a file you produced yourself.
with open("optuna_study.pkl", "rb") as f:
    study = pickle.load(f)

# Per-trial tuning records; consumed by find_best_epochs_metrics further down.
with open("tuning.json", "r") as f:
    hyperparameter_dict = json.load(f)

In [19]:
# Bare expression: display the best hyperparameters stored in the loaded study.
study.best_params

{'dropout': 0.3560592494053517,
 'learning_rate': 3.127610016621582e-05,
 'weight_decay': 0.000817139913231531}

In [28]:
def find_best_epochs_metrics(hyperparameter_dict):
    """Find, for every tuning trial, the evaluated epoch with the highest F1.

    Args:
        hyperparameter_dict: mapping ``trial id -> trial record``. Each record
            must provide ``['metrics_history']['eval']`` (list of dicts with an
            ``'f1'`` entry), ``['metrics_history']['epoch']`` (epoch values
            aligned with the eval list) and
            ``['hyperparameters']['learning_rate']``.

    Returns:
        Mapping ``trial id -> {'best_epoch', 'best_f1', 'learning_rate',
        'best_metrics'}``. On ties the earliest epoch wins. Trials with no
        evaluation at all, or whose F1 values are all ``None``, are left out
        instead of raising.
    """
    best_epochs_data = {}

    for trial_num, trial_data in hyperparameter_dict.items():
        metrics_history = trial_data['metrics_history']
        eval_metrics = metrics_history['eval']
        epochs = metrics_history['epoch']

        # Only epochs with a real F1 can win; None (e.g. a JSON null written when
        # the metric was missing) would otherwise break the argmax.
        scored = [
            (idx, metrics['f1'])
            for idx, metrics in enumerate(eval_metrics)
            if metrics['f1'] is not None
        ]
        if not scored:
            # Nothing to rank for this trial (never evaluated, or no usable F1).
            continue

        # np.argmax returns the first maximum, so ties resolve to the earliest epoch.
        winner = int(np.argmax([score for _, score in scored]))
        best_f1_idx, best_f1 = scored[winner]

        best_epochs_data[trial_num] = {
            'best_epoch': epochs[best_f1_idx],
            'best_f1': best_f1,
            'learning_rate': trial_data['hyperparameters']['learning_rate'],
            'best_metrics': eval_metrics[best_f1_idx]
        }

    return best_epochs_data
# Best-epoch summary for every trial in the loaded tuning records.
best_epochs_data = find_best_epochs_metrics(hyperparameter_dict)

In [34]:
# NOTE(review): the trial key '18' is hard-coded; presumably it is the study's best
# trial (the other hyperparameters come from study.best_params) — confirm they match.
# The stored epoch is a float (it prints as 11.0 below).
num_epochs = best_epochs_data['18']['best_epoch']

In [35]:
# Pull the tuned values out of the study; the batch size is fixed by hand.
dropout, learning_rate, weight_decay = (
    study.best_params[param_name]
    for param_name in ("dropout", "learning_rate", "weight_decay")
)
batch_size = 2

In [36]:
# Echo the settings the final training run will use, one per line.
for caption, setting in (
    ('num epochs:', num_epochs),
    ('dropout:', dropout),
    ('learning rate:', learning_rate),
    ('weight decay:', weight_decay),
    ('batch size:', batch_size),
):
    print(caption, setting)

num epochs: 11.0
dropout: 0.3560592494053517
learning rate: 3.127610016621582e-05
weight decay: 0.000817139913231531
batch size: 2


In [47]:
from transformers import Trainer, TrainingArguments
from transformers.data.data_collator import default_data_collator
from transformers.trainer_callback import TrainerCallback
import os
import json
import numpy as np


class MetricsTrackingCallback(TrainerCallback):
    """Trainer callback that records train and eval metrics once per evaluated epoch.

    Every time an evaluation is recorded, the full history is rewritten to
    ``metrics_history.json`` inside ``args.output_dir``.
    """

    def __init__(self):
        # Three parallel lists: entry i of each describes the i-th recorded evaluation.
        self.metrics_history = {'train': [], 'eval': [], 'epoch': []}
        # Most recent training log seen since the last recorded evaluation, if any.
        self.current_train_metrics = None
        # Epoch value of the last recorded evaluation; guards against duplicates.
        self.last_logged_epoch = None

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        """Store the evaluation scores for a new epoch and persist the history."""
        # Nothing to record without metrics, or when this epoch was already stored.
        if not metrics or state.epoch == self.last_logged_epoch:
            return

        epoch_eval = {
            'precision': metrics.get('eval_precision'),
            'recall': metrics.get('eval_recall'),
            'f1': metrics.get('eval_f1'),
            'accuracy': metrics.get('eval_accuracy'),
        }

        # Take over the pending training log (empty dict if none arrived) and
        # clear it so the next epoch starts fresh.
        if self.current_train_metrics is None:
            epoch_train = {}
        else:
            epoch_train = self.current_train_metrics
        self.current_train_metrics = None

        history = self.metrics_history
        history['train'].append(epoch_train)
        history['eval'].append(epoch_eval)
        history['epoch'].append(state.epoch)
        self.last_logged_epoch = state.epoch

        # Rewrite the whole history file after every recorded evaluation.
        os.makedirs(args.output_dir, exist_ok=True)
        history_path = os.path.join(args.output_dir, 'metrics_history.json')
        with open(history_path, 'w') as history_file:
            json.dump(history, history_file, indent=2)

    def on_log(self, args, state, control, logs=None, **kwargs):
        """Remember the latest log entry that contains no ``eval_*`` keys."""
        if not logs:
            return
        # Any key starting with 'eval_' marks this as an evaluation log: skip it.
        if any(key.startswith('eval_') for key in logs):
            return
        self.current_train_metrics = {
            'loss': logs.get('loss'),
            'learning_rate': logs.get('learning_rate'),
        }


In [48]:

# Fresh callback instance so the recorded history starts empty for this run.
metrics_callback = MetricsTrackingCallback()

trial_output_dir = './results/'

training_args = TrainingArguments(
    output_dir=trial_output_dir,
    # Optimisation settings taken from the tuning results above.
    learning_rate=learning_rate,
    weight_decay=weight_decay,
    num_train_epochs=num_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    # Log every 10 steps; evaluate and checkpoint once per epoch, keep a single
    # checkpoint, and reload the one with the best eval F1 when training ends.
    logging_steps=10,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="eval_f1",
)

# NOTE(review): eval_dataset is the test split, so the "best" checkpoint is
# selected on test data — confirm this is intended.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=processor,
    data_collator=default_data_collator,
    compute_metrics=compute_metrics,
    callbacks=[metrics_callback],
)





In [49]:
# Run the fine-tuning. `train_result` is only bound if train() returns normally
# (the run captured in this notebook's output ended with a KeyboardInterrupt).
train_result = trainer.train()




KeyboardInterrupt: 

In [None]:
# Score the trainer's current model on the evaluation split.
eval_results = trainer.evaluate()

# Bundle the final scores, the metrics returned by trainer.train(), and the
# hyperparameters of this run.
# NOTE(review): this needs `train_result` from a completed trainer.train() call, and
# the dict is not written to disk anywhere in the visible cells.
train_results = dict(
    final_eval_results=eval_results,
    training_history=train_result.metrics,
    hyperparameters=dict(
        dropout=dropout,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        batch_size=batch_size,
    ),
)
