In [None]:
# Install Dependencies
!pip install transformers torch datasets kaggle scikit-learn seqeval -q
print("Dependencies installed")

✓ Dependencies installed


In [None]:
# Mount Google Drive and Setup Kaggle
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

# Setup Kaggle credentials
!mkdir -p ~/.kaggle
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

print("✓ Google Drive mounted")
print("✓ Kaggle credentials configured")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
cp: cannot stat '/content/drive/MyDrive/kaggle.json': No such file or directory
✓ Google Drive mounted
✓ Kaggle credentials configured


In [None]:
# Setup Kaggle API credentials
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Clean up old data first
!rm -rf /content/sroie_data

# Download SROIE dataset
!kaggle datasets download -d urbikn/sroie-datasetv2

# Extract with overwrite
!unzip -o -q sroie-datasetv2.zip -d /content/sroie_data

# Check structure
print("Dataset structure:")
!find /content/sroie_data -type d | head -20

print("\n✓ SROIE dataset downloaded and extracted")

Dataset URL: https://www.kaggle.com/datasets/urbikn/sroie-datasetv2
License(s): other
sroie-datasetv2.zip: Skipping, found more recently modified local copy (use --force to force download)
Dataset structure:
/content/sroie_data
/content/sroie_data/SROIE2019
/content/sroie_data/SROIE2019/layoutlm-base-uncased
/content/sroie_data/SROIE2019/train
/content/sroie_data/SROIE2019/train/entities
/content/sroie_data/SROIE2019/train/img
/content/sroie_data/SROIE2019/train/box
/content/sroie_data/SROIE2019/test
/content/sroie_data/SROIE2019/test/entities
/content/sroie_data/SROIE2019/test/img
/content/sroie_data/SROIE2019/test/box

✓ SROIE dataset downloaded and extracted


In [None]:
import json
import re
from pathlib import Path
from collections import Counter

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import LayoutLMTokenizerFast, LayoutLMForTokenClassification, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from seqeval.metrics import classification_report, f1_score
from tqdm import tqdm
import numpy as np

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [None]:
# Define Labels
# BIO tagging scheme
ENTITY_LABELS = [
    'O',
    'B-COMPANY',
    'I-COMPANY',
    'B-DATE',
    'I-DATE',
    'B-ADDRESS',
    'I-ADDRESS',
    'B-TOTAL',
    'I-TOTAL'
]

label2id = {label: i for i, label in enumerate(ENTITY_LABELS)}
id2label = {i: label for i, label in enumerate(ENTITY_LABELS)}

print(f"Labels: {ENTITY_LABELS}")

Labels: ['O', 'B-COMPANY', 'I-COMPANY', 'B-DATE', 'I-DATE', 'B-ADDRESS', 'I-ADDRESS', 'B-TOTAL', 'I-TOTAL']


In [None]:
# Parse SROIE Dataset with Bounding Boxes
def normalize_bbox(bbox, width=1000, height=1000):
    """Normalize bounding box to 0-1000 scale (LayoutLM format)."""
    x_coords = [bbox[0], bbox[2], bbox[4], bbox[6]]
    y_coords = [bbox[1], bbox[3], bbox[5], bbox[7]]

    x_min = min(x_coords)
    y_min = min(y_coords)
    x_max = max(x_coords)
    y_max = max(y_coords)

    return [
        int((x_min / width) * 1000),
        int((y_min / height) * 1000),
        int((x_max / width) * 1000),
        int((y_max / height) * 1000)
    ]

def normalize_text(text):
    """Normalize text for matching"""
    return re.sub(r'[^a-zA-Z0-9]+', '', text.lower())

def assign_bio_labels(words, entities):
    """
    Assign proper BIO labels to words based on entity annotations.

    IMPROVED: Uses proper B- and I- tagging with sequence matching.
    """
    # Normalize entity values
    entity_normalized = {
        'company': normalize_text(entities.get('company', '')),
        'date': normalize_text(entities.get('date', '')),
        'address': normalize_text(entities.get('address', '')),
        'total': normalize_text(entities.get('total', ''))
    }

    labels = ['O'] * len(words)

    # For each entity type, find matching sequences
    for entity_type in ['company', 'date', 'address', 'total']:
        entity_val = entity_normalized[entity_type]
        if not entity_val:
            continue

        # Normalize all words
        words_norm = [normalize_text(w) for w in words]

        # Try to find the entity as a sequence of words
        i = 0
        while i < len(words):
            # Check if current position starts the entity
            matched_length = 0
            entity_pos = 0
            temp_i = i

            # Try to match entity tokens sequentially
            while temp_i < len(words) and entity_pos < len(entity_val):
                word_norm = words_norm[temp_i]
                if not word_norm:
                    temp_i += 1
                    continue

                # Check if this word is part of the entity
                if word_norm in entity_val[entity_pos:]:
                    matched_length += 1
                    entity_pos += len(word_norm)
                    temp_i += 1
                else:
                    break

            # If we matched significant portion of entity, label it
            if matched_length > 0 and entity_pos / len(entity_val) > 0.5:
                # Assign B- to first token, I- to rest
                labels[i] = f'B-{entity_type.upper()}'
                for j in range(i + 1, i + matched_length):
                    labels[j] = f'I-{entity_type.upper()}'
                i = i + matched_length
            else:
                i += 1

    return labels

def parse_sroie_receipt(box_file, entity_file):
    """Parse SROIE receipt with proper BIO labels."""
    # Read entity annotations
    with open(entity_file, 'r', encoding='utf-8') as f:
        entities = json.load(f)

    words = []
    bboxes = []

    # First pass: get image dimensions
    max_x, max_y = 0, 0
    with open(box_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split(',')
            if len(parts) >= 8:
                coords = [int(parts[i]) for i in range(8)]
                max_x = max(max_x, max(coords[0], coords[2], coords[4], coords[6]))
                max_y = max(max_y, max(coords[1], coords[3], coords[5], coords[7]))

    img_width = max_x * 1.1
    img_height = max_y * 1.1

    # Second pass: parse tokens with bboxes
    with open(box_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            parts = line.split(',')
            if len(parts) < 9:
                continue

            try:
                coords = [int(parts[i]) for i in range(8)]
            except ValueError:
                continue

            word = ','.join(parts[8:]).strip()
            if not word:
                continue

            bbox = normalize_bbox(coords, img_width, img_height)

            words.append(word)
            bboxes.append(bbox)

    # Assign BIO labels using improved algorithm
    labels = assign_bio_labels(words, entities)

    return words, bboxes, labels

def load_sroie_dataset(data_dir, split='train'):
    """Load SROIE dataset with improved BIO labeling."""
    split_dir = Path(data_dir) / split
    box_dir = split_dir / 'box'
    entity_dir = split_dir / 'entities'

    all_words = []
    all_bboxes = []
    all_labels = []

    box_files = sorted(box_dir.glob('*.txt'))

    print(f"Loading {len(box_files)} receipts from {split} set...")

    for box_file in tqdm(box_files):
        entity_file = entity_dir / box_file.name

        if not entity_file.exists():
            continue

        try:
            words, bboxes, labels = parse_sroie_receipt(box_file, entity_file)

            if len(words) > 0:
                all_words.append(words)
                all_bboxes.append(bboxes)
                all_labels.append(labels)
        except Exception as e:
            print(f"Error parsing {box_file.name}: {e}")
            continue

    return all_words, all_bboxes, all_labels

# Load training data
data_dir = '/content/sroie_data/SROIE2019'
train_words, train_bboxes, train_labels = load_sroie_dataset(data_dir, 'train')

print(f"\n✓ Loaded {len(train_words)} training receipts")

if len(train_words) > 0:
    print(f"\nFirst receipt preview:")
    print(f"Words: {train_words[0][:10]}")
    print(f"Bboxes: {train_bboxes[0][:5]}")
    print(f"Labels: {train_labels[0][:10]}")

    # Label distribution
    all_train_labels = [label for receipt_labels in train_labels for label in receipt_labels]
    label_counts = Counter(all_train_labels)
    print(f"\nLabel distribution:")
    for label, count in sorted(label_counts.items()):
        print(f"  {label:15s}: {count:5d}")

    # Check for B- tags
    b_tags = sum(1 for label in all_train_labels if label.startswith('B-'))
    print(f"\n✓ B- tags found: {b_tags} (good if > 0!)")
else:
    print("\n⚠ WARNING: No receipts loaded!")

Loading 626 receipts from train set...


100%|██████████| 626/626 [00:00<00:00, 669.61it/s]


✓ Loaded 626 training receipts

First receipt preview:
Words: ['TAN WOON YANN', 'BOOK TA .K(TAMAN DAYA) SDN BND', '789417-W', 'NO.53 55,57 & 59, JALAN SAGU 18,', 'TAMAN DAYA,', '81100 JOHOR BAHRU,', 'JOHOR.', 'DOCUMENT NO : TD01167104', 'DATE:', '25/12/2018 8:13:39 PM']
Bboxes: [[147, 23, 668, 59], [102, 76, 902, 112], [420, 112, 584, 129], [225, 133, 785, 151], [394, 157, 613, 174]]
Labels: ['O', 'O', 'O', 'B-ADDRESS', 'I-ADDRESS', 'I-ADDRESS', 'I-ADDRESS', 'O', 'O', 'O']

Label distribution:
  B-ADDRESS      :   610
  B-COMPANY      :   481
  B-DATE         :   179
  B-TOTAL        :  1594
  I-ADDRESS      :  1013
  I-COMPANY      :    82
  I-DATE         :   117
  I-TOTAL        :    47
  O              : 29503

✓ B- tags found: 2864 (good if > 0!)





In [None]:
# Create LayoutLM Dataset
class LayoutLMDataset(Dataset):
    def __init__(self, words_list, bboxes_list, labels_list, tokenizer, max_length=512):
        self.words_list = words_list
        self.bboxes_list = bboxes_list
        self.labels_list = labels_list
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.words_list)

    def __getitem__(self, idx):
        words = self.words_list[idx]
        bboxes = self.bboxes_list[idx]
        labels = self.labels_list[idx]

        # Tokenize
        encoding = self.tokenizer(
            words,
            is_split_into_words=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # Align labels and bboxes with subword tokens
        word_ids = encoding.word_ids()
        aligned_labels = []
        aligned_bboxes = []
        previous_word_idx = None

        for word_idx in word_ids:
            if word_idx is None:
                # Special tokens: [CLS], [SEP], [PAD]
                aligned_labels.append(-100)
                aligned_bboxes.append([0, 0, 0, 0])  # Zero bbox for special tokens
            elif word_idx != previous_word_idx:
                # First subword of a word
                aligned_labels.append(label2id[labels[word_idx]])
                aligned_bboxes.append(bboxes[word_idx])
            else:
                # Continuation subword
                aligned_labels.append(-100)
                aligned_bboxes.append(bboxes[word_idx])  # Reuse same bbox
            previous_word_idx = word_idx

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'bbox': torch.tensor(aligned_bboxes, dtype=torch.long),
            'labels': torch.tensor(aligned_labels, dtype=torch.long)
        }

print("✓ LayoutLM Dataset class defined")

✓ LayoutLM Dataset class defined


In [None]:
# Prepare Data Loaders
# Initialize LayoutLM tokenizer
tokenizer = LayoutLMTokenizerFast.from_pretrained('microsoft/layoutlm-base-uncased')

# Split train/val
train_words_split, val_words, train_bboxes_split, val_bboxes, train_labels_split, val_labels = train_test_split(
    train_words, train_bboxes, train_labels, test_size=0.1, random_state=42
)

# Create datasets
train_dataset = LayoutLMDataset(train_words_split, train_bboxes_split, train_labels_split, tokenizer)
val_dataset = LayoutLMDataset(val_words, val_bboxes, val_labels, tokenizer)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)

print(f"✓ Train samples: {len(train_dataset)}")
print(f"✓ Val samples: {len(val_dataset)}")
print(f"✓ Train batches: {len(train_loader)}")
print(f"✓ Val batches: {len(val_loader)}")

✓ Train samples: 563
✓ Val samples: 63
✓ Train batches: 71
✓ Val batches: 8


In [None]:
#Initialise LayoutLM Model
# Load LayoutLM for token classification
model = LayoutLMForTokenClassification.from_pretrained(
    'microsoft/layoutlm-base-uncased',
    num_labels=len(ENTITY_LABELS),
    id2label=id2label,
    label2id=label2id
)
model.to(device)

# Training hyperparameters
EPOCHS = 10
LEARNING_RATE = 5e-5
WARMUP_STEPS = 200

# Optimizer and scheduler
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
total_steps = len(train_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=WARMUP_STEPS,
    num_training_steps=total_steps
)

print(f"✓ LayoutLM model initialized with {sum(p.numel() for p in model.parameters()):,} parameters")
print(f"✓ Training for {EPOCHS} epochs")
print(f"✓ Total steps: {total_steps}")

Some weights of LayoutLMForTokenClassification were not initialized from the model checkpoint at microsoft/layoutlm-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


✓ LayoutLM model initialized with 112,634,889 parameters
✓ Training for 10 epochs
✓ Total steps: 710


In [None]:
# Training
def evaluate(model, data_loader):
    """Evaluate model on validation set"""
    model.eval()
    all_predictions = []
    all_true_labels = []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            bbox = batch['bbox'].to(device)
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                bbox=bbox
            )

            predictions = torch.argmax(outputs.logits, dim=2)

            # Convert to labels
            for i in range(len(predictions)):
                pred_labels = []
                true_labels = []

                for j in range(len(predictions[i])):
                    if labels[i][j] != -100:
                        pred_labels.append(id2label[predictions[i][j].item()])
                        true_labels.append(id2label[labels[i][j].item()])

                all_predictions.append(pred_labels)
                all_true_labels.append(true_labels)

    f1 = f1_score(all_true_labels, all_predictions)
    return f1, all_predictions, all_true_labels

# Training loop
best_f1 = 0
for epoch in range(EPOCHS):
    print(f"\n{'='*60}")
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    print(f"{'='*60}")

    # Training
    model.train()
    total_loss = 0

    progress_bar = tqdm(train_loader, desc='Training')
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        bbox = batch['bbox'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            bbox=bbox,
            labels=labels
        )

        loss = outputs.loss

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

        total_loss += loss.item()
        progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})

    avg_train_loss = total_loss / len(train_loader)

    # Validation
    val_f1, _, _ = evaluate(model, val_loader)

    print(f"\nTrain Loss: {avg_train_loss:.4f}")
    print(f"Val F1 Score: {val_f1:.4f}")

    # Save best model
    if val_f1 > best_f1:
        best_f1 = val_f1
        torch.save(model.state_dict(), '/content/best_layoutlm_model.pt')
        print(f"✓ New best model saved (F1: {best_f1:.4f})")

print(f"\n{'='*60}")
print(f"Training completed!")
print(f"Best validation F1: {best_f1:.4f}")
print(f"{'='*60}")


Epoch 1/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.92it/s, loss=0.3073]



Train Loss: 1.2081
Val F1 Score: 0.3953
✓ New best model saved (F1: 0.3953)

Epoch 2/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.92it/s, loss=0.3030]



Train Loss: 0.1785
Val F1 Score: 0.8658
✓ New best model saved (F1: 0.8658)

Epoch 3/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.92it/s, loss=0.0379]



Train Loss: 0.0831
Val F1 Score: 0.8818
✓ New best model saved (F1: 0.8818)

Epoch 4/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.93it/s, loss=0.0082]



Train Loss: 0.0542
Val F1 Score: 0.9204
✓ New best model saved (F1: 0.9204)

Epoch 5/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.93it/s, loss=0.0054]



Train Loss: 0.0337
Val F1 Score: 0.8985

Epoch 6/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.94it/s, loss=0.1245]



Train Loss: 0.0258
Val F1 Score: 0.9184

Epoch 7/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.94it/s, loss=0.0027]



Train Loss: 0.0182
Val F1 Score: 0.9244
✓ New best model saved (F1: 0.9244)

Epoch 8/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.93it/s, loss=0.0021]



Train Loss: 0.0131
Val F1 Score: 0.9075

Epoch 9/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.94it/s, loss=0.0419]



Train Loss: 0.0103
Val F1 Score: 0.9220

Epoch 10/10


Training: 100%|██████████| 71/71 [00:14<00:00,  4.94it/s, loss=0.0010]



Train Loss: 0.0078
Val F1 Score: 0.9174

Training completed!
Best validation F1: 0.9244


In [None]:
# Load best model
model.load_state_dict(torch.load('/content/best_layoutlm_model.pt'))
model.eval()

# Full validation evaluation
val_f1, val_predictions, val_true_labels = evaluate(model, val_loader)

print("\nValidation Set Performance:")
print(classification_report(val_true_labels, val_predictions))

print(f"\nOverall F1 Score: {val_f1:.4f}")


Validation Set Performance:
              precision    recall  f1-score   support

     ADDRESS       0.83      0.89      0.86        61
     COMPANY       0.86      0.93      0.90        46
        DATE       1.00      1.00      1.00        22
       TOTAL       0.94      0.97      0.95       149

   micro avg       0.90      0.95      0.92       278
   macro avg       0.91      0.95      0.93       278
weighted avg       0.90      0.95      0.92       278


Overall F1 Score: 0.9244


In [None]:
# save model for deployment
import os

# Create output directory
output_dir = '/content/drive/MyDrive/ner_layoutlm_final'
os.makedirs(output_dir, exist_ok=True)

# Save model
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

# Save config
config = {
    'num_labels': len(ENTITY_LABELS),
    'id2label': id2label,
    'label2id': label2id,
    'max_length': 512,
    'model_type': 'layoutlm'
}

with open(os.path.join(output_dir, 'config.json'), 'w') as f:
    json.dump(config, f, indent=2)

print(f"✓ Model saved to {output_dir}")
print("\nFiles saved:")
print("  - pytorch_model.bin")
print("  - config.json")
print("  - tokenizer files")

✓ Model saved to /content/drive/MyDrive/sroie_layoutlm_final

Files saved:
  - pytorch_model.bin
  - config.json
  - tokenizer files
