In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install -q transformers datasets accelerate bitsandbytes peft torch sentencepiece Levenshtein

In [None]:
import os
import json
import pandas as pd

def load_data(csv_folder, json_folder):
    data = []
    for csv_filename in os.listdir(csv_folder):
        if csv_filename.lower().endswith(".csv"):
            csv_path = os.path.join(csv_folder, csv_filename)
            df = pd.read_csv(csv_path)
            df.columns = [col.lower() for col in df.columns]
            ocr_text = " ".join(df["word"].astype(str).tolist())

            base_name = os.path.splitext(csv_filename)[0]
            json_filename = base_name + ".json"
            json_path = os.path.join(json_folder, json_filename)

            if os.path.exists(json_path):
                with open(json_path, "r", encoding="utf-8") as f:
                    ground_truth = json.load(f)
            else:
                ground_truth = {}


            data.append({
                "file_name": base_name,
                "ocr_text": ocr_text,
                "ground_truth_json": json.dumps(ground_truth, ensure_ascii=False, indent=2)
            })
    return data

csv_folder = "/content/drive/MyDrive/Colab_Folder/Llama_CSV"
json_folder = "/content/drive/MyDrive/Colab_Folder/Llama_JSON/final_input"

data = load_data(csv_folder, json_folder)
print(f"Total examples loaded: {len(data)}")

In [None]:
from datasets import Dataset
def create_prompt(example):
    prompt = f"""
Given the OCR text below, extract the following details in well-formatted JSON exactly as shown, ensuring all "N/A" values remain unchanged.
Extract and include the following fields:
- invoice_id
- invoice_date
- invoice_total
- currency_code
- customer_name
- customer_address
- vendor_name
- vendor_address
- items (description, quantity, unit_price, amount)

OCR Text:
{example["ocr_text"]}
Respond with JSON only.
""".strip()

    return {
        "file_name": example["file_name"],
        "prompt":    prompt,
        "response":  example["ground_truth_json"]
    }
formatted_data = [create_prompt(example) for example in data]
dataset = Dataset.from_dict({
    "file_name": [ex["file_name"] for ex in formatted_data],
    "prompt": [ex["prompt"] for ex in formatted_data],
    "response": [ex["response"] for ex in formatted_data]
})
print("Dataset columns:", dataset.column_names)
print("Number of rows in dataset:", dataset.num_rows)


In [None]:
from datasets import DatasetDict

# Creating an 80,10,10 split for training, validation, and testing

dataset_split = dataset.train_test_split(test_size=0.2, seed=42)
test_valid = dataset_split['test'].train_test_split(test_size=0.5, seed=42)

datasets = DatasetDict({
    'train': dataset_split['train'],
    'validation': test_valid['train'],
    'test': test_valid['test']
})

print(datasets)


In [None]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-1B-Instruct")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

def tokenize(example):
    full_prompt = example["prompt"] + example["response"] + tokenizer.eos_token
    tokenized = tokenizer(full_prompt, padding="max_length", truncation=True, max_length=2048)

    labels = tokenized["input_ids"].copy()
    prompt_len = len(tokenizer(example["prompt"])["input_ids"])
    labels[:prompt_len] = [-100] * prompt_len

    tokenized["labels"] = labels
    return tokenized

tokenized_datasets = datasets.map(tokenize, remove_columns=["prompt", "response"])
#tokenized_datasets.set_format("torch")


In [1]:
from transformers import AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model
import torch

model = AutoModelForCausalLM.from_pretrained(
    "unsloth/Llama-3.2-1B-Instruct",
    torch_dtype=torch.float16,
    device_map="auto",
)

lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=16,
    lora_alpha=16,
    lora_dropout=0,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                    "gate_proj", "up_proj", "down_proj"]
)

model = get_peft_model(model, lora_config)

data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Colab_Folder/ProcessedInvoices",
    eval_strategy="steps",
    eval_steps=50,
    learning_rate=2e-4,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=1,
    num_train_epochs=3,
    warmup_steps=5,
    optim="adamw_8bit",
    lr_scheduler_type="linear",
    seed=3407,
    weight_decay=0.01,
    fp16=True,
    logging_steps=50,
    save_steps=100,
    save_total_limit=2,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()
#trainer.train(resume_from_checkpoint=True)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/894 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/2.47G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

NameError: name 'tokenizer' is not defined

In [None]:
import os

def extract_document_info_from_test(test_example):
    prompt = test_example["prompt"]
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    output = model.generate(**inputs, max_new_tokens=2000, temperature=0.1)
    response = tokenizer.decode(output[0], skip_special_tokens=True)
    json_response = response.split("Respond in formatted JSON:")[-1].strip()
    return json_response

output_folder = "/content/drive/MyDrive/Colab_Folder/Generated_JSON"
os.makedirs(output_folder, exist_ok=True)

for example in datasets["test"]:
    file_base_name = example.get("file_name", "unknown")
    output_file = os.path.join(output_folder, file_base_name + ".json")

    if os.path.exists(output_file):
        print(f"Skipping {file_base_name} (already generated).")
        continue

    generated_json = extract_document_info_from_test(example)

    with open(output_file, "w", encoding="utf-8") as f:
        f.write(generated_json)

    print(f"Saved generated JSON for {file_base_name} to {output_file}")


In [None]:
import os
import json
import re

def extract_and_fix_json(text):
    match = re.search(r'{\n\s*"amount_due":.*', text, re.DOTALL)
    if not match:
        return "No valid JSON found starting with 'amount_due'."

    raw_json_text = match.group(0).strip()

    fixed_json_text = re.sub(r"(\w+):", r'"\1":', raw_json_text)
    fixed_json_text = re.sub(r",\s*}", "}", fixed_json_text)
    fixed_json_text = re.sub(r",\s*\]", "]", fixed_json_text)

    open_braces = fixed_json_text.count("{")
    close_braces = fixed_json_text.count("}")
    open_brackets = fixed_json_text.count("[")
    close_brackets = fixed_json_text.count("]")

    if open_braces > close_braces:
        fixed_json_text += "}" * (open_braces - close_braces)
    elif close_braces > open_braces:
        fixed_json_text = fixed_json_text[:-1]

    if open_brackets > close_brackets:
        fixed_json_text += "]" * (open_brackets - close_brackets)
    elif close_brackets > open_brackets:
        fixed_json_text = fixed_json_text[:-1]

    try:
        parsed_json = json.loads(fixed_json_text)
        return json.dumps(parsed_json, indent=2, ensure_ascii=False)
    except json.JSONDecodeError as e:
        return f"Failed to parse JSON: {e}\n\nRaw Extracted JSON:\n{fixed_json_text}"

def process_folder(folder_path):
    for filename in os.listdir(folder_path):
        if not filename.endswith(".json"):
            continue

        file_path = os.path.join(folder_path, filename)

        with open(file_path, "r", encoding="utf-8") as f:
            file_content = f.read()

        cleaned_json = extract_and_fix_json(file_content)

        with open(file_path, "w", encoding="utf-8") as f:
            f.write(cleaned_json)

        print(f"Processed: {filename}")

input_folder = "/content/drive/MyDrive/Colab_Folder/Generated_JSON"
process_folder(input_folder)


In [None]:
import json
import os
import Levenshtein

def load_json(filepath):
    try:
        with open(filepath, 'r') as file:
            return json.load(file)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        exit(1)

def canonical_json_str(data):
    return json.dumps(data, sort_keys=True)

def compute_levenshtein(reference_text, candidate_text):
    if not reference_text and not candidate_text:
        return 1
    if not reference_text or not candidate_text:
        return 0
    distance = Levenshtein.distance(reference_text, candidate_text)
    max_len = max(len(reference_text), len(candidate_text))
    return 1 - (distance / max_len) if max_len > 0 else 0

def compare_json_files(file1, file2):
    data1 = load_json(file1)
    data2 = load_json(file2)
    json_str1 = canonical_json_str(data1)
    json_str2 = canonical_json_str(data2)
    similarity = compute_levenshtein(json_str1, json_str2)
    return similarity, json_str1, json_str2

def compare_json_folders(folder1, folder2):
    files = [f for f in os.listdir(folder1) if f.endswith('.json')]

    if not files:
        print("No JSON files found in", folder1)
        return

    results = {}
    total_similarity_all = 0
    total_similarity_above_baseline = 0
    count_all = 0
    count_above_baseline = 0
    baseline = 0.2


    for filename in files:
        file1_path = os.path.join(folder1, filename)
        file2_path = os.path.join(folder2, filename)

        if not os.path.exists(file2_path):
            print(f"File {filename} not found in {folder2}. Skipping.")
            continue

        similarity, ref_str, pred_str = compare_json_files(file1_path, file2_path)
        results[filename] = similarity
        count_all += 1
        total_similarity_all += similarity

        if similarity >= baseline:
            count_above_baseline += 1
            total_similarity_above_baseline += similarity

        print(f"{filename}: Levenshtein similarity = {similarity:.4f}")

    print(f"\nTotal files compared: {count_all}")
    print(f"Average Levenshtein similarity (All files): {total_similarity_all / count_all:.4f}")

    if count_above_baseline > 0:
        print(f"Files with similarity ≥ {baseline}: {count_above_baseline}")
        print(f"Average Levenshtein similarity (≥ {baseline}): {total_similarity_above_baseline / count_above_baseline:.4f}")
    else:
        print(f"No files had similarity ≥ {baseline}.")

    return results

if __name__ == '__main__':
    folder1 = "/content/drive/MyDrive/Colab_Folder/Generated_JSON"
    folder2 = "/content/drive/MyDrive/Colab_Folder/Llama_JSON/final_input"

    print("Comparing JSON files from two folders:")
    compare_json_folders(folder1, folder2)


In [None]:
import os
import json
import glob
from collections import defaultdict

SIMILARITY_THRESHOLD = 0.85

def normalize_value(val):
    """
    Normalize value to handle 'N/A' and blank strings as negatives.
    """
    return "" if val.strip().upper() == "N/A" else val.strip()

def similarity_score(str1, str2):
    if not str1 and not str2:
        return 1
    if not str1 or not str2:
        return 0
    distance = Levenshtein.distance(str1, str2)
    max_len = max(len(str1), len(str2))
    return 1 - (distance / max_len) if max_len > 0 else 0

def safe_divide(numerator, denominator):
    return numerator / denominator if denominator > 0 else 0.0

def compute_field_accuracy_with_confusion(folder1, folder2, output_file):
    json_files1 = set(os.path.basename(f) for f in glob.glob(os.path.join(folder1, "*.json")))
    json_files2 = set(os.path.basename(f) for f in glob.glob(os.path.join(folder2, "*.json")))

    common_files = json_files1.intersection(json_files2)

    field_accuracy = defaultdict(lambda: {
        "TP": 0, "TN": 0, "FP": 0, "FN": 0,
        "total_score": 0.0, "count": 0
    })

    for filename in common_files:
        file1_path = os.path.join(folder1, filename)
        file2_path = os.path.join(folder2, filename)

        try:
            with open(file1_path, 'r', encoding='utf-8') as f1, open(file2_path, 'r', encoding='utf-8') as f2:
                gt_json = json.load(f1)
                pred_json = json.load(f2)

            all_fields = set(gt_json.keys()).union(set(pred_json.keys()))

            for field in all_fields:
                gt_value_raw = str(gt_json.get(field, "")).strip()
                pred_value_raw = str(pred_json.get(field, "")).strip()

                gt_value = normalize_value(gt_value_raw)
                pred_value = normalize_value(pred_value_raw)

                similarity = similarity_score(gt_value, pred_value)

                if field in gt_json:
                    field_accuracy[field]["total_score"] += similarity
                    field_accuracy[field]["count"] += 1

                if not gt_value and not pred_value:
                    field_accuracy[field]["TN"] += 1
                elif similarity >= SIMILARITY_THRESHOLD:
                    field_accuracy[field]["TP"] += 1
                elif not pred_value:
                    field_accuracy[field]["FN"] += 1
                else:
                    field_accuracy[field]["FP"] += 1

        except Exception as e:
            print(f"Error processing {filename}: {e}")

    result = {}
    for field, stats in field_accuracy.items():
        TP = stats["TP"]
        TN = stats["TN"]
        FP = stats["FP"]
        FN = stats["FN"]

        precision = round(safe_divide(TP, TP + FP), 4)
        recall = round(safe_divide(TP, TP + FN), 4)
        f1 = round(safe_divide(2 * precision * recall, precision + recall), 4) if (precision + recall) > 0 else 0.0
        avg_similarity = round(safe_divide(stats["total_score"], stats["count"]) * 100, 2)

        result[field] = {
            "Average Similarity (%)": avg_similarity,
            "True Positives": TP,
            "True Negatives": TN,
            "False Positives": FP,
            "False Negatives": FN,
            "Precision": precision,
            "Recall": recall,
            "F1-score": f1
        }

    with open(output_file, 'w', encoding='utf-8') as out_f:
        json.dump(result, out_f, indent=4, ensure_ascii=False)

    print(f"Field-wise accuracy + confusion matrix + F1-score saved: {output_file}")

if __name__ == "__main__":
    folder1_path = '/content/drive/MyDrive/Colab_Folder/Evaluation/test_JSON'
    folder2_path = "/content/drive/MyDrive/Colab_Folder/Generated_JSON"
    output_file_path = "/content/drive/MyDrive/Colab_Folder/field_accuracy_with_confusion_and_f1.json"

    compute_field_accuracy_with_confusion(folder1_path, folder2_path, output_file_path)


In [2]:
import os
import pandas as pd

folder_path = '/content/drive/MyDrive/Colab_Folder/Llama_CSV'

for filename in os.listdir(folder_path):
    if filename.endswith('.csv'):
        file_path = os.path.join(folder_path, filename)
        try:
            df = pd.read_csv(file_path, nrows=0)  # Read only headers
            if 'bbox' not in df.columns:
                print(f"Missing 'bbox' column: {filename}")
        except Exception as e:
            print(f"Error reading {filename}: {e}")


Error reading MCC1147-NSH_7.csv: No columns to parse from file
