In [1]:
!pip install torch transformers datasets peft evaluate scikit-learn numpy pandas matplotlib seaborn bitsandbytes tqdm

Collecting datasets
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.45.5-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-many

In [1]:
import os
import json
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling
)
from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training,
    PeftModel
)
import evaluate
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
torch.manual_seed(42)
np.random.seed(42)

Using device: cuda


In [3]:
model_name = "microsoft/phi-2"

In [4]:
data_file_path = './invoices_500.json'

try:
    with open(data_file_path, 'r') as f:
        invoice_data = json.load(f)
    print(f"Successfully loaded {len(invoice_data)} examples from {data_file_path}")

    print("\nSample invoice data:")
    sample_idx = 0
    print(f"Input (truncated):\n{invoice_data[sample_idx]['input'][:300]}...\n")
    print(f"Expected Output (sample):\n{json.dumps(invoice_data[sample_idx]['output'], indent=2)[:500]}...\n")

except FileNotFoundError:
    print(f"Error: The file {data_file_path} was not found.")
    print("Please ensure the dataset file exists in the correct location.")
    raise
except json.JSONDecodeError:
    print(f"Error: The file {data_file_path} is not a valid JSON file.")
    raise
except Exception as e:
    print(f"An unexpected error occurred: {str(e)}")
    raise

Successfully loaded 500 examples from ./invoices_500.json

Sample invoice data:
Input (truncated):
11117 Campbell Brooks Apt. 246

Elizabethside, AK 00799

 

 

 

 

Description Quantity Unit price
redefine cross-media
systems 7.65 82.78
exploit bleeding-edge
action-items 7.0 59.16
incubate real-time ROI 4.18 58.32
optimize viral
deliverables 1.12 18.83

Amount excluding tax 140.57

Taxes 21.71...

Expected Output (sample):
{
  "buyer": {
    "address": "65302 Booker Trafficway Apt. 529 Christophermouth, WY 67659"
  },
  "invoice": {
    "bc_no": "lo11165",
    "date": "05.04.1994",
    "maturity_date": "30.04.2008",
    "number": 158485
  },
  "products": [
    {
      "amount": 327.14,
      "description": "redefine cross-media systems",
      "quantity": 7.65,
      "unit_price": 82.78,
      "vat_amount": 11.3
    },
    {
      "amount": 227.61,
      "description": "exploit bleeding-edge action-items",
      ...



In [5]:
def preprocess_data(data):
    processed_data = []

    for item in data:
        instruction = "Extract the structured information from this invoice and format it as JSON:"
        input_text = item["input"]

        output_text = json.dumps(item["output"], indent=2)

        prompt = f"{instruction}\n\n{input_text}"

        processed_data.append({
            "instruction": instruction,
            "input": input_text,
            "output": output_text,
            "prompt": prompt
        })

    return processed_data

In [6]:
processed_data = preprocess_data(invoice_data)

In [7]:
train_data, val_data = train_test_split(processed_data, test_size=0.2, random_state=42)
print(f"Training examples: {len(train_data)}, Validation examples: {len(val_data)}")

Training examples: 400, Validation examples: 100


In [8]:
train_dataset = Dataset.from_dict({
    "instruction": [item["instruction"] for item in train_data],
    "input": [item["input"] for item in train_data],
    "output": [item["output"] for item in train_data],
    "prompt": [item["prompt"] for item in train_data]
})

val_dataset = Dataset.from_dict({
    "instruction": [item["instruction"] for item in val_data],
    "input": [item["input"] for item in val_data],
    "output": [item["output"] for item in val_data],
    "prompt": [item["prompt"] for item in val_data]
})

In [9]:
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [11]:
def tokenization_function(examples):
    prompts = []
    targets = []

    for instruction, input_text, output_text in zip(
        examples["instruction"], examples["input"], examples["output"]
    ):
        prompt = f"{instruction}\n\n{input_text}\n\nJSON Result:\n"
        target = f"{prompt}{output_text}{tokenizer.eos_token}"

        prompts.append(prompt)
        targets.append(target)

    tokenized_targets = tokenizer(
        targets,
        padding="max_length",
        truncation=True,
        max_length=2048,
        return_tensors="pt"
    )

    tokenized_prompts = tokenizer(
        prompts,
        padding="max_length",
        truncation=True,
        max_length=2048,
        return_tensors="pt"
    )

    labels = tokenized_targets["input_ids"].clone()

    for i, (prompt_len, target_len) in enumerate(zip(
        tokenized_prompts["attention_mask"].sum(dim=1),
        tokenized_targets["attention_mask"].sum(dim=1)
    )):
        labels[i, :prompt_len] = -100

    tokenized_targets["labels"] = labels

    return tokenized_targets

tokenized_train_dataset = train_dataset.map(
    tokenization_function,
    batched=True,
    remove_columns=["instruction", "input", "output", "prompt"]
)

tokenized_val_dataset = val_dataset.map(
    tokenization_function,
    batched=True,
    remove_columns=["instruction", "input", "output", "prompt"]
)

print("Tokenization complete")

Map:   0%|          | 0/400 [00:00<?, ? examples/s]

Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Tokenization complete


In [None]:
print("Setting up model for LoRA fine-tuning...")

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    load_in_8bit=True,
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True
)

model = prepare_model_for_kbit_training(model)

peft_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "query_key_value",
        "dense",
        "dense_h_to_4h",
        "dense_4h_to_h"
    ]
)

model = get_peft_model(model, peft_config)
model.print_trainable_parameters()

training_args = TrainingArguments(
    output_dir="./phi2-invoice-extraction",
    eval_strategy="steps",
    eval_steps=20,
    save_strategy="steps",
    save_steps=40,
    learning_rate=2e-4,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    num_train_epochs=5,
    weight_decay=0.01,
    warmup_steps=10,
    logging_steps=5,
    fp16=True,
    load_best_model_at_end=True,
    gradient_accumulation_steps=8,
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_val_dataset,
    tokenizer=tokenizer,
    data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
)

print("Starting training...")
trainer.train()

model.save_pretrained("./phi2-invoice-lora-adapter")
print("Model fine-tuning completed and saved!")

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Setting up model for LoRA fine-tuning...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

  trainer = Trainer(
No label_names provided for model class `PeftModelForCausalLM`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.


trainable params: 2,621,440 || all params: 2,782,305,280 || trainable%: 0.0942
Starting training...


`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
  return fn(*args, **kwargs)


Step,Training Loss,Validation Loss
20,1.6796,1.582295
40,1.3174,1.332488
60,1.212,1.193602


  return fn(*args, **kwargs)


In [None]:
def clean_and_parse_json(text):
    if "JSON Result:" in text:
        try:
            json_text = text.split("JSON Result:")[1].strip()
            return json.loads(json_text)
        except json.JSONDecodeError:
            try:
                json_text = re.sub(r',\s*}', '}', json_text)
                json_text = re.sub(r',\s*]', ']', json_text)
                json_text = re.sub(r'(\w+)(?=\s*:)', r'"\1"', json_text)
                return json.loads(json_text)
            except (json.JSONDecodeError, NameError):
                return {}
    return {}

def evaluate_json_extraction(original, predicted):
    if isinstance(original, str):
        try:
            original = json.loads(original)
        except json.JSONDecodeError:
            original = {}

    if isinstance(predicted, str):
        try:
            predicted = json.loads(predicted)
        except json.JSONDecodeError:
            predicted = {}

    def flatten_dict(d, parent_key='', sep='.'):
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(flatten_dict(v, new_key, sep=sep).items())
            elif isinstance(v, list) and all(isinstance(i, dict) for i in v):
                for i, item in enumerate(v):
                    list_key = f"{new_key}[{i}]"
                    items.extend(flatten_dict(item, list_key, sep=sep).items())
            else:
                items.append((new_key, v))
        return dict(items)

    flat_original = flatten_dict(original)
    flat_predicted = flatten_dict(predicted)

    all_keys = set(flat_original.keys())
    found_keys = set(flat_original.keys()).intersection(set(flat_predicted.keys()))

    exact_matches = 0
    field_accuracy = {}

    for key in all_keys:
        if key in flat_predicted and flat_original[key] == flat_predicted[key]:
            exact_matches += 1
            field_accuracy[key] = 1.0
        else:
            field_accuracy[key] = 0.0

    results = {
        "exact_match": exact_matches,
        "fields_found": len(found_keys),
        "total_fields": len(all_keys),
        "field_accuracy": field_accuracy,
        "overall_accuracy": exact_matches / len(all_keys) if all_keys else 0
    }

    return results

eval_examples = val_dataset.select(range(min(5, len(val_dataset))))

def generate_extraction(model, tokenizer, prompt, max_length=2048):
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_length,
            temperature=0.1,
            top_p=0.75,
            do_sample=False
        )

    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)

    result_prefix = "JSON Result:"
    if result_prefix in generated_text:
        extraction = generated_text.split(result_prefix)[1].strip()
    else:
        extraction = generated_text.strip()

    try:
        json_result = json.loads(extraction)
        extraction = json.dumps(json_result, indent=2)
    except json.JSONDecodeError:
        pass

    return extraction

print("Evaluating fine-tuned model...")

results = {
    "overall_accuracy": [],
    "fields_found_pct": [],
    "exact_match_pct": [],
    "field_accuracies": {}
}

for example in tqdm(eval_examples):
    prompt = f"{example['instruction']}\n\n{example['input']}\n\nJSON Result:\n"

    predicted_output = generate_extraction(model, tokenizer, prompt)

    try:
        predicted_json = json.loads(predicted_output)
    except json.JSONDecodeError:
        predicted_json = {}

    try:
        expected_json = json.loads(example["output"])
    except json.JSONDecodeError:
        expected_json = {}

    eval_result = evaluate_json_extraction(expected_json, predicted_json)

    results["overall_accuracy"].append(eval_result["overall_accuracy"])
    results["fields_found_pct"].append(eval_result["fields_found"] / eval_result["total_fields"])
    results["exact_match_pct"].append(eval_result["exact_match"] / eval_result["total_fields"])

    for field, accuracy in eval_result["field_accuracy"].items():
        if field not in results["field_accuracies"]:
            results["field_accuracies"][field] = []
        results["field_accuracies"][field].append(accuracy)

    print(f"\n--- Example Evaluation ---")
    print(f"Input: {example['input'][:100]}...")
    print(f"Expected (truncated): {example['output'][:100]}...")
    print(f"Predicted (truncated): {predicted_output[:100]}...")
    print(f"Accuracy: {eval_result['overall_accuracy']:.2f}")

avg_accuracy = np.mean(results["overall_accuracy"])
avg_fields_found = np.mean(results["fields_found_pct"])
avg_exact_match = np.mean(results["exact_match_pct"])

print(f"\n--- Overall Evaluation Results ---")
print(f"Average Accuracy: {avg_accuracy:.4f}")
print(f"Average Fields Found: {avg_fields_found:.4f}")
print(f"Average Exact Match: {avg_exact_match:.4f}")

category_accuracy = {}
for field in results["field_accuracies"]:
    category = field.split('.')[0] if '.' in field else field.split('[')[0] if '[' in field else field
    if category not in category_accuracy:
        category_accuracy[category] = []
    category_accuracy[category].extend(results["field_accuracies"][field])

print("\n--- Category-specific Accuracy ---")
for category, accuracies in category_accuracy.items():
    print(f"{category}: {np.mean(accuracies):.4f}")

In [None]:
plt.figure(figsize=(10, 6))
metrics = ['Overall Accuracy', 'Fields Found', 'Exact Match']
values = [avg_accuracy, avg_fields_found, avg_exact_match]
plt.bar(metrics, values, color=['blue', 'green', 'orange'])
plt.title('Invoice Extraction Performance Metrics')
plt.ylabel('Score (0-1)')
plt.ylim(0, 1)

for i, v in enumerate(values):
    plt.text(i, v + 0.05, f'{v:.2f}', ha='center')

plt.tight_layout()
plt.savefig('overall_metrics.png')
plt.close()

plt.figure(figsize=(12, 8))
categories = list(category_accuracy.keys())
category_values = [np.mean(category_accuracy[cat]) for cat in categories]

sorted_indices = np.argsort(category_values)
sorted_categories = [categories[i] for i in sorted_indices]
sorted_values = [category_values[i] for i in sorted_indices]

plt.barh(sorted_categories, sorted_values, color='skyblue')
plt.title('Accuracy by Information Category')
plt.xlabel('Accuracy (0-1)')
plt.xlim(0, 1)

for i, v in enumerate(sorted_values):
    plt.text(v + 0.05, i, f'{v:.2f}', va='center')

plt.tight_layout()
plt.savefig('category_accuracy.png')
plt.close()

In [None]:
def demo_invoice_extraction(model, tokenizer, input_text):
    instruction = "Extract the structured information from this invoice and format it as JSON:"
    prompt = f"{instruction}\n\n{input_text}\n\nJSON Result:\n"

    extraction = generate_extraction(model, tokenizer, prompt)

    try:
        structured_data = json.loads(extraction)
    except json.JSONDecodeError:
        structured_data = {"error": "Could not parse generated output as valid JSON"}

    return {
        "input": input_text,
        "extracted_text": extraction,
        "structured_data": structured_data
    }

test_invoice = """
TechSolutions Inc.
123 Innovation Drive
Silicon Valley, CA 94025

INVOICE #INV-2025-051
Date: April 15, 2025
Due Date: May 15, 2025

Bill To:
John Smith
123 Client Street
Clientville, CA 90210
Email: john.smith@example.com

Item                           Quantity    Price       Amount
Cloud Storage: Premium tier    1           $99.99      $99.99
Technical Support: 24/7        1           $199.99     $199.99
Software License: Enterprise   5           $299.99     $1,499.95

Subtotal:                                             $1,799.93
Tax (8.5%):                                           $152.99
Total Due:                                            $1,952.92

Payment Terms: Net 30
Payment Method: Credit Card

Thank you for your business!
"""

print("\n--- Demo: Structured Output ---")
result = demo_invoice_extraction(model, tokenizer, test_invoice)

print("Input Invoice (truncated):")
print(test_invoice[:200] + "...")
print("\nExtracted JSON:")
print(result["extracted_text"])
print("\nStructured Data (parsed JSON):")
print(json.dumps(result["structured_data"], indent=2))

print("\n--- Demo: Batch Processing ---")
test_invoices = [invoice_data[0]["input"], invoice_data[1]["input"], test_invoice]
batch_results = []

for invoice in test_invoices:
    extraction = demo_invoice_extraction(model, tokenizer, invoice)
    batch_results.append(extraction["structured_data"])

def flatten_for_dataframe(nested_dict, prefix=''):
    flat_dict = {}
    for key, value in nested_dict.items():
        if isinstance(value, dict):
            flat_dict.update(flatten_for_dataframe(value, f"{prefix}{key}_"))
        elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict):
            flat_dict.update(flatten_for_dataframe(value[0], f"{prefix}{key}_item0_"))
        else:
            flat_dict[f"{prefix}{key}"] = value
    return flat_dict

flat_batch_results = [flatten_for_dataframe(result) for result in batch_results]
batch_df = pd.DataFrame(flat_batch_results)

print("Batch Results (sample columns):")
display_columns = batch_df.columns[:5] if len(batch_df.columns) > 5 else batch_df.columns
print(batch_df[display_columns])

print("\n--- Project Summary ---")
print("1. Successfully fine-tuned Phi-2 model for structured JSON invoice extraction using LoRA")
print(f"2. Achieved {avg_accuracy:.2f} overall accuracy on the validation set")
print(f"3. Model can extract complex nested JSON with multiple categories of information")
print(f"4. Top-performing categories: {sorted(category_accuracy.items(), key=lambda x: np.mean(x[1]), reverse=True)[0][0]}")
print("5. Provides output as structured JSON for seamless integration with systems")
print("6. LoRA fine-tuning allowed efficient adaptation of Phi-2 with minimal parameters")

print("\nFine-tuned model can be used in production settings for automated invoice processing!")

## Saving Fine-tuned Phi

In [None]:
import os
import json
import zipfile
from google.colab import files

model_save_dir = "./phi2-invoice-extractor-final"
os.makedirs(model_save_dir, exist_ok=True)

print("Saving the model and related files...")

model.save_pretrained(f"{model_save_dir}/lora_adapter")

tokenizer.save_pretrained(f"{model_save_dir}/tokenizer")

target_modules = list(peft_config.target_modules) if hasattr(peft_config.target_modules, '__iter__') else peft_config.target_modules

config_info = {
    "base_model": model_name,
    "author": "Zeyad-Diaa-1242",
    "date_created": "2025-05-11 19:05:13",
    "task": "Structured JSON Invoice Information Extraction",
    "training_examples": len(train_data),
    "validation_examples": len(val_data),
    "accuracy": float(avg_accuracy),
    "fields_extracted": list(category_accuracy.keys()),
    "lora_config": {
        "r": int(peft_config.r),
        "lora_alpha": float(peft_config.lora_alpha),
        "lora_dropout": float(peft_config.lora_dropout),
        "target_modules": target_modules
    }
}

with open(f"{model_save_dir}/model_info.json", "w") as f:
    json.dump(config_info, f, indent=2)

# Save a README with usage instructions without f-strings
with open(f"{model_save_dir}/README.md", "w") as f:
    f.write("# Phi-2 Structured JSON Invoice Extractor\n\n")
    f.write("## Model Information\n")
    f.write(f"- Base Model: {model_name}\n")
    f.write("- Task: Structured JSON Invoice Information Extraction\n")
    f.write(f"- Author: {config_info['author']}\n")
    f.write(f"- Date Created: {config_info['date_created']}\n")
    f.write(f"- Accuracy: {avg_accuracy:.4f}\n\n")

    f.write("## Usage Instructions\n\n")
    f.write("### Loading the Model\n\n")
    f.write("```python\n")
    f.write("from transformers import AutoModelForCausalLM, AutoTokenizer\n")
    f.write("from peft import PeftModel\n\n")
    f.write("# Load the base model\n")
    f.write(f"base_model = AutoModelForCausalLM.from_pretrained(\n")
    f.write(f'    "{model_name}",\n')
    f.write("    torch_dtype=torch.float16,\n")
    f.write("    device_map=\"auto\",\n")
    f.write("    trust_remote_code=True\n")
    f.write(")\n\n")
    f.write("# Load the tokenizer\n")
    f.write("tokenizer = AutoTokenizer.from_pretrained(\"./tokenizer\")\n\n")
    f.write("# Load the LoRA adapter\n")
    f.write("model = PeftModel.from_pretrained(base_model, \"./lora_adapter\")\n")
    f.write("```\n\n")

    f.write("### Making Predictions\n\n")
    f.write("```python\n")
    f.write("def extract_invoice_info(invoice_text):\n")
    f.write("    # Format the prompt\n")
    f.write('    prompt = f"Extract the structured information from this invoice and format it as JSON:\\n\\n{invoice_text}\\n\\nJSON Result:\\n"\n\n')
    f.write("    # Generate output\n")
    f.write("    inputs = tokenizer(prompt, return_tensors=\"pt\").to(model.device)\n")
    f.write("    with torch.no_grad():\n")
    f.write("        outputs = model.generate(\n")
    f.write("            input_ids=inputs[\"input_ids\"],\n")
    f.write("            attention_mask=inputs[\"attention_mask\"],\n")
    f.write("            max_length=2048,\n")
    f.write("            temperature=0.1,\n")
    f.write("            top_p=0.75,\n")
    f.write("            do_sample=False\n")
    f.write("        )\n\n")
    f.write("    # Decode the output\n")
    f.write("    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)\n\n")
    f.write("    # Extract just the result part\n")
    f.write('    if "JSON Result:" in generated_text:\n')
    f.write('        extraction = generated_text.split("JSON Result:")[1].strip()\n')
    f.write("    else:\n")
    f.write("        extraction = generated_text.strip()\n")
    f.write("        \n")
    f.write("    # Parse as JSON\n")
    f.write("    try:\n")
    f.write("        result = json.loads(extraction)\n")
    f.write("        return result\n")
    f.write("    except json.JSONDecodeError:\n")
    f.write("        return {\"error\": \"Failed to parse output as JSON\"}\n")
    f.write("```\n\n")

    f.write("## Extracted Fields\n")
    f.write("The model can extract the following information categories from invoices:\n")
    f.write(f"{', '.join(list(category_accuracy.keys()))}\n\n")

    f.write("## Performance\n")
    f.write(f"- Overall Accuracy: {avg_accuracy:.4f}\n")
    f.write(f"- Fields Found: {avg_fields_found:.4f}\n")
    f.write(f"- Exact Match: {avg_exact_match:.4f}\n")

# Create a sample inference script without f-strings
with open(f"{model_save_dir}/inference.py", "w") as f:
    f.write("import torch\n")
    f.write("import json\n")
    f.write("from transformers import AutoModelForCausalLM, AutoTokenizer\n")
    f.write("from peft import PeftModel\n")
    f.write("import argparse\n\n")

    f.write("def load_model(base_model_name, adapter_path, tokenizer_path):\n")
    f.write('    print(f"Loading base model: {base_model_name}")\n')
    f.write("    base_model = AutoModelForCausalLM.from_pretrained(\n")
    f.write("        base_model_name,\n")
    f.write("        torch_dtype=torch.float16,\n")
    f.write('        device_map="auto",\n')
    f.write("        trust_remote_code=True\n")
    f.write("    )\n\n")

    f.write('    print(f"Loading tokenizer from: {tokenizer_path}")\n')
    f.write("    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)\n")
    f.write("    tokenizer.pad_token = tokenizer.eos_token\n\n")

    f.write('    print(f"Loading LoRA adapter from: {adapter_path}")\n')
    f.write("    model = PeftModel.from_pretrained(base_model, adapter_path)\n\n")

    f.write("    return model, tokenizer\n\n")

    f.write("def extract_invoice_info(model, tokenizer, invoice_text):\n")
    f.write("    # Format the prompt\n")
    f.write('    prompt = f"Extract the structured information from this invoice and format it as JSON:\\n\\n{invoice_text}\\n\\nJSON Result:\\n"\n\n')

    f.write("    # Generate output\n")
    f.write('    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)\n')
    f.write("    with torch.no_grad():\n")
    f.write("        outputs = model.generate(\n")
    f.write('            input_ids=inputs["input_ids"],\n')
    f.write('            attention_mask=inputs["attention_mask"],\n')
    f.write("            max_length=2048,\n")
    f.write("            temperature=0.1,\n")
    f.write("            top_p=0.75,\n")
    f.write("            do_sample=False\n")
    f.write("        )\n\n")

    f.write("    # Decode the output\n")
    f.write("    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)\n\n")

    f.write("    # Extract just the result part\n")
    f.write('    if "JSON Result:" in generated_text:\n')
    f.write('        extraction = generated_text.split("JSON Result:")[1].strip()\n')
    f.write("    else:\n")
    f.write("        extraction = generated_text.strip()\n")
    f.write("        \n")
    f.write("    return extraction\n\n")

    f.write("def main():\n")
    f.write('    parser = argparse.ArgumentParser(description="Extract structured JSON information from invoices using fine-tuned Phi-2 model")\n')
    f.write('    parser.add_argument("--invoice", type=str, required=True, help="Path to the invoice text file")\n')
    f.write('    parser.add_argument("--base_model", type=str, default="microsoft/phi-2", help="Base model name or path")\n')
    f.write('    parser.add_argument("--adapter", type=str, default="./lora_adapter", help="Path to the LoRA adapter")\n')
    f.write('    parser.add_argument("--tokenizer", type=str, default="./tokenizer", help="Path to the tokenizer")\n')
    f.write('    parser.add_argument("--output", type=str, default="extraction_result.json", help="Output JSON file path")\n\n')

    f.write("    args = parser.parse_args()\n\n")

    f.write("    # Load the model\n")
    f.write("    model, tokenizer = load_model(args.base_model, args.adapter, args.tokenizer)\n\n")

    f.write("    # Read the invoice text\n")
    f.write("    with open(args.invoice, 'r') as f:\n")
    f.write("        invoice_text = f.read()\n\n")

    f.write("    # Extract information\n")
    f.write('    print("Extracting structured information from invoice...")\n')
    f.write("    extracted_text = extract_invoice_info(model, tokenizer, invoice_text)\n\n")

    f.write("    # Try to parse as JSON\n")
    f.write("    try:\n")
    f.write("        structured_data = json.loads(extracted_text)\n")
    f.write("    except json.JSONDecodeError:\n")
    f.write('        print("Warning: Could not parse output as valid JSON")\n')
    f.write('        structured_data = {"error": "Invalid JSON output", "raw_text": extracted_text}\n\n')

    f.write("    # Save results\n")
    f.write("    results = {\n")
    f.write('        "extracted_text": extracted_text,\n')
    f.write('        "structured_data": structured_data\n')
    f.write("    }\n\n")

    f.write("    with open(args.output, 'w') as f:\n")
    f.write("        json.dump(results, f, indent=2)\n\n")

    f.write('    print(f"Results saved to {args.output}")\n')
    f.write('    print("\\nExtracted Structure:")\n')
    f.write("    print(json.dumps(structured_data, indent=2))\n\n")

    f.write('if __name__ == "__main__":\n')
    f.write("    main()\n")

# Create a zip file of the model directory
print("Creating zip archive of model files...")
zip_path = "./phi2-invoice-json-extractor.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for root, dirs, files in os.walk(model_save_dir):
        for file in files:
            zipf.write(os.path.join(root, file),
                       os.path.relpath(os.path.join(root, file),
                                       os.path.join(model_save_dir, '..')))

print(f"Model saved successfully to {zip_path}")
print("Downloading model zip file...")

# Download the zip file
files.download(zip_path)

print("Model download initiated.")
print("Save this file and use it to deploy your structured JSON invoice extraction model!")