# Invoice Dataset Preprocessing

This notebook-style (cell-based) file prepares the invoice dataset for training. It reads the CSV annotation files and their JPG images, builds input-label pairs, and writes a `processed_dataset` folder containing train/val/test splits.


In [None]:
import os
import json
import random
import shutil
from pathlib import Path
from typing import List, Dict

import pandas as pd

# Project paths
# The project root is the parent of the directory that holds this file.
try:
    PROJECT_ROOT = Path(__file__).resolve().parents[1]
except NameError:
    # `__file__` is not defined when the cells run in an interactive kernel.
    # NOTE(review): the fallback assumes the kernel's working directory is the
    # directory containing this file - confirm for your setup.
    PROJECT_ROOT = Path.cwd().resolve().parent
DATASETS_DIR = PROJECT_ROOT / 'datasets'
PROCESSED_DIR = PROJECT_ROOT / 'processed_dataset'
# Create the output folder up front; an already existing folder is reused.
PROCESSED_DIR.mkdir(exist_ok=True)

print('Project root:', PROJECT_ROOT)
print('Datasets dir:', DATASETS_DIR)
print('Processed dir:', PROCESSED_DIR)


### Discover batch CSV files in datasets/


In [None]:
# Collect every batch CSV that sits directly inside datasets/, in sorted order.
csv_paths = list(DATASETS_DIR.glob('*.csv'))
csv_paths.sort()
print('Found CSVs:', [found.name for found in csv_paths])


### Loader for a single batch CSV -> records


In [None]:
def load_batch(csv_path: Path) -> List[Dict]:
    """Load one batch CSV and turn every row into a flat record dict.

    The CSV must contain the columns ``File Name``, ``Json Data`` and
    ``OCRed Text``; header matching ignores case and spaces. Image paths are
    built relative to a directory located next to the CSV and named like the
    CSV without its suffix. A warning is printed when that directory is
    missing, but records are still returned.

    Args:
        csv_path: Path of the batch CSV file.

    Returns:
        One dict per CSV row with the keys ``filename``, ``image_path``,
        ``ocred_text``, ``json_raw``, ``invoice_number``, ``invoice_date``,
        ``client_name`` and ``seller_name``. The four invoice fields are taken
        from the ``invoice`` object of the parsed JSON and default to ``''``
        when the JSON is missing, cannot be parsed, or has no ``invoice``
        object.

    Raises:
        KeyError: If one of the expected columns is not present in the CSV.
    """
    df = pd.read_csv(csv_path)
    col_map = {c.lower().strip(): c for c in df.columns}

    def get_col(name: str) -> str:
        """Return the real CSV header matching *name*, ignoring case and spaces."""
        target = name.replace(' ', '').lower()
        for key, orig in col_map.items():
            if key.replace(' ', '') == target:
                return orig
        raise KeyError(f'Missing expected column: {name} in {csv_path}')

    fname_col = get_col('File Name')
    json_col = get_col('Json Data')
    ocr_col = get_col('OCRed Text')

    # Images live in a sibling directory named after the CSV (suffix removed).
    images_dir = csv_path.with_suffix('')
    if not images_dir.exists():
        print(f'Warning: images dir not found for {csv_path}, tried {images_dir}')

    invoice_fields = ('invoice_number', 'invoice_date', 'client_name', 'seller_name')

    records: List[Dict] = []
    for _, row in df.iterrows():
        fname = str(row[fname_col]).strip()
        json_raw = row[json_col]
        ocr_value = row[ocr_col]

        # Only a JSON object with a nested `invoice` object yields field values.
        # Anything else (missing cell, invalid JSON, `"invoice": null`, a list,
        # a scalar) falls back to an empty mapping instead of raising.
        invoice: Dict = {}
        if isinstance(json_raw, str):
            try:
                parsed = json.loads(json_raw)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError; bad rows are tolerated.
                parsed = None
            if isinstance(parsed, dict) and isinstance(parsed.get('invoice'), dict):
                invoice = parsed['invoice']

        rec: Dict = {
            'filename': fname,
            'image_path': str(images_dir / fname),
            'ocred_text': str(ocr_value) if pd.notna(ocr_value) else '',
            # Non-string cells (e.g. an empty cell read as NaN) are serialised
            # so the value is always a string; NaN becomes the text 'NaN'.
            'json_raw': json_raw if isinstance(json_raw, str) else json.dumps(json_raw),
        }
        for field in invoice_fields:
            rec[field] = invoice.get(field, '')
        records.append(rec)
    return records


### Load all batches


In [None]:
# Accumulate the records of every discovered batch CSV into one flat list,
# announcing each file before it is read.
all_records: List[Dict] = []
for csv_path in csv_paths:
    print('Loading', csv_path.name)
    all_records += load_batch(csv_path)

print('Total loaded records:', len(all_records))


### Keep only records whose image exists


In [None]:
# Keep a record only when its image file is present on disk, then report how
# many records were dropped because the image was missing.
valid_records = list(filter(lambda rec: Path(rec['image_path']).exists(), all_records))
missing = len(all_records) - len(valid_records)
print(f'Total records: {len(all_records)}, valid with images: {len(valid_records)}, missing images: {missing}')


### Shuffle and split into train/val/test = 80/10/10


In [None]:
# Write out processed_dataset/<split> with images/ and labels.csv
def write_split(name: str, recs):
    """Copy one split's images and write its ``labels.csv``.

    Images are copied into ``PROCESSED_DIR/<name>/images`` and one row per
    successfully handled record is written to ``PROCESSED_DIR/<name>/labels.csv``.
    A record whose image cannot be copied is reported and left out.

    Two different source images that share a file name (e.g. coming from
    different batch folders) get distinct output names: the later one is
    prefixed with its source folder name, so each label row points at its own
    image. Records without such a clash keep their original file name.

    Args:
        name: Split name, used as the sub-folder name (e.g. ``'train'``).
        recs: Iterable of record dicts; each must have ``image_path`` and may
            have the label fields written to ``labels.csv``.
    """
    columns = [
        'filename', 'ocred_text', 'json_raw',
        'invoice_number', 'invoice_date', 'client_name', 'seller_name',
    ]
    split_dir = PROCESSED_DIR / name
    img_out = split_dir / 'images'
    split_dir.mkdir(parents=True, exist_ok=True)
    img_out.mkdir(parents=True, exist_ok=True)

    claimed: Dict[str, Path] = {}  # output file name -> source image using it
    rows = []
    for r in recs:
        src = Path(r['image_path'])

        # A name is usable when it is unclaimed or already belongs to this
        # very source file (duplicate rows may share one copied image).
        out_name = src.name
        attempt = 0
        while claimed.get(out_name, src) != src:
            attempt += 1
            prefix = src.parent.name if attempt == 1 else f'{src.parent.name}_{attempt}'
            out_name = f'{prefix}_{src.name}'

        dst = img_out / out_name
        try:
            # Files already present (e.g. from an earlier run) are not re-copied.
            if not dst.exists():
                shutil.copy2(src, dst)
        except OSError as e:
            print(f'Copy failed for {src} -> {dst}: {e}')
            continue
        claimed[out_name] = src
        rows.append({
            'filename': out_name,
            'ocred_text': r.get('ocred_text', ''),
            'json_raw': r.get('json_raw', ''),
            'invoice_number': r.get('invoice_number', ''),
            'invoice_date': r.get('invoice_date', ''),
            'client_name': r.get('client_name', ''),
            'seller_name': r.get('seller_name', ''),
        })
    # Passing the columns explicitly keeps the header row even for an empty split.
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(split_dir / 'labels.csv', index=False)
    print(f'Wrote {len(rows)} records to {split_dir}')


# Fixed seed so the shuffle, and therefore the split membership, is repeatable.
random.seed(42)
sz = len(valid_records)
if sz == 0:
    print('No valid records with existing images found. Nothing to write.')
else:
    random.shuffle(valid_records)
    n_train = int(0.8 * sz)
    n_val = int(0.1 * sz)
    # The test split takes whatever remains after the rounded-down train/val sizes.
    n_test = sz - n_train - n_val
    splits = {
        'train': valid_records[:n_train],
        'val': valid_records[n_train:n_train + n_val],
        'test': valid_records[n_train + n_val:],
    }
    print({k: len(v) for k, v in splits.items()})

    for name, recs in splits.items():
        write_split(name, recs)

    print('Done.')
