# Invoice, SKU & Product Catalog Matching Workflow

<a href="https://colab.research.google.com/github/run-llama/llamacloud-demo/blob/main/examples/document_workflows/invoice_sku_product_catalog_matching/invoice_sku_product_catalog_matching.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook demonstrates an automated workflow for matching invoice line items with standardized product catalog SKUs. The system helps standardize product identification across varying textual descriptions.

![End to End Process](e2eprocess.png)

#### The Challenge: Understanding Product Variations

Example: An invoice line item such as "10 boxes of Acme Widget Model X" carries only a free-text description, while the product catalog is maintained separately with standardized SKU codes and other product information. Nothing on the invoice links the description to a catalog entry.

#### The Documents We Work With

1. **Invoice Data**: (`invoice.pdf`)
   * Contains line items with various product descriptions
   * Each line includes:
     * Invoice ID (e.g., INV-2024-001)
     * Line item description
     * Quantity
     * Unit price
     * Date
   * Contains 25 different line items

2. **Product Catalog**: (`product_catalog.csv`)
   * Contains standardized product information:
     * SKU codes (e.g., ACM-WX-001)
     * Standard product names
     * Categories
     * Manufacturers
     * Descriptions
     * Unit prices
   * Contains 50 different products

#### Enrichment Process

1. Parse the line item description
2. Query the product catalog index using RAG to find the closest matching SKU
3. Append the SKU and standardized product information to the line item

#### Value Added

Now your systems can unify reporting across different product descriptions that actually refer to the same standard product. For example:

Different descriptions in invoices:
```
"Acme Widget Model X Premium"
"ACME Premium Widget X"
"Widget X by Acme Corp"
```

All match to the same standardized product:
```
SKU: ACM-WX-001
Standard Name: Acme Widget X
Category: Widgets
Manufacturer: Acme Corporation
```
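As a minimal sketch of what unified reporting could look like, the snippet below totals ordered units per SKU once the three spellings share one `matched_sku`. The row layout and the helper name `units_per_sku` are assumptions made for illustration; the quantities are the ones that appear for these descriptions in the sample invoice.

```python
from collections import defaultdict

# Illustrative enriched rows: three spellings already mapped to one SKU.
rows = [
    {"line_item": "Acme Widget Model X Premium", "matched_sku": "ACM-WX-001", "quantity": 10},
    {"line_item": "ACME Premium Widget X", "matched_sku": "ACM-WX-001", "quantity": 15},
    {"line_item": "Widget X by Acme Corp", "matched_sku": "ACM-WX-001", "quantity": 8},
]


def units_per_sku(rows):
    """Sum ordered quantities per matched SKU (hypothetical helper)."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["matched_sku"]] += row["quantity"]
    return dict(totals)


print(units_per_sku(rows))  # {'ACM-WX-001': 33}
```

Without the SKU, the same report would show three separate products with 10, 15, and 8 units.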

#### Implementation

The workflow uses:
* LlamaParse for PDF invoice parsing
* Structured data handling with LLMs and Pydantic models
* VectorStoreIndex (Vector similarity) for product matching

#### Input/Output Files

**Inputs:**
- `invoice.pdf`: Invoice containing line items
- `product_catalog.csv`: Reference product catalog

**Output:**
- `enriched_invoice.csv`: Invoice data enriched with matched SKUs and standardized product information

### Installation

In [2]:
!pip install llama-parse llama-index


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Setup API Keys

In [4]:
import os

os.environ['LLAMA_CLOUD_API_KEY'] = 'your_llama_cloud_key_here' # Get your API Key from https://cloud.llamaindex.ai/
os.environ['OPENAI_API_KEY'] = 'openai_api_key' # Get your API Key from https://platform.openai.com/


### Imports

In [5]:
from typing import List, Optional
from pydantic import BaseModel, Field
from decimal import Decimal
from datetime import date
from pathlib import Path
import pandas as pd

In [6]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Context,
    Workflow,
    step,
)

In [7]:
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.llms import LLM
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.prompts import ChatPromptTemplate
from llama_parse import LlamaParse
from llama_index.llms.openai import OpenAI

### Data Models

Here, we define the Pydantic models that are essential for processing invoice line items and matching them with standardized product catalog entries. Pydantic models are used because they provide strict type checking and data validation while ensuring consistent data structures throughout the workflow.

#### Pydantic Models For Raw Invoice Data

`InvoiceLineItem`: Represents a single line item from an invoice, capturing:
- Invoice identifier
- Product description as it appears on the invoice
- Quantity ordered
- Unit price
- Invoice date

`InvoiceOutput`: Container model that holds all line items extracted from an invoice.

#### Pydantic Models for Product Catalog

`ProductCatalog`: Represents a single product entry in the standardized catalog with:
- SKU identifier
- Standard product name
- Product category
- Manufacturer information
- Detailed description
- Standard unit price

#### Pydantic Models for Enriched Output

`EnrichedLineItem`: Represents a line item after SKU matching, containing:
- Original invoice information
- Matched product catalog data
- Match confidence score

`EnrichedInvoice`: Complete representation of an invoice after SKU matching, containing:
- List of enriched line items with their matched catalog information

In [11]:
# Pydantic Models for Invoice Data
class InvoiceLineItem(BaseModel):
    """
    Represents a single line item from an invoice.
    Each line item contains details about the product ordered, quantity, and pricing.
    """
    invoice_id: str = Field(
        description="Unique identifier for the invoice (e.g., 'INV-2024-001')"
    )
    line_item: str = Field(
        description="Description of the product or service as it appears on the invoice"
    )
    quantity: int = Field(
        description="Number of units ordered",
        gt=0  # Ensures quantity is greater than 0
    )
    unit_price: Decimal = Field(
        description="Price per unit in decimal format (e.g., 45.99)",
        gt=0  # Ensures price is greater than 0
    )
    date_: date = Field(
        description="Date of the invoice in YYYY-MM-DD format"
    )

class InvoiceOutput(BaseModel):
    """
    Container model for all line items from an invoice.
    Used as the output format when parsing invoice documents.
    """
    line_items: List[InvoiceLineItem] = Field(
        description="List of all line items extracted from the invoice"
    )

# Pydantic Models for Product Catalog
class ProductCatalog(BaseModel):
    """
    Represents a single product entry in the product catalog.
    Contains standardized product information and SKU details.
    """
    sku: str = Field(
        description="Stock Keeping Unit - unique identifier for the product (e.g., 'ACM-WX-001')"
    )
    standard_name: str = Field(
        description="Standardized product name used across the system"
    )
    category: str = Field(
        description="Product category or classification (e.g., 'Widgets', 'Fasteners')"
    )
    manufacturer: str = Field(
        description="Name of the product manufacturer or supplier"
    )
    description: str = Field(
        description="Detailed product description"
    )
    unit_price: Decimal = Field(
        description="Standard unit price in decimal format",
        gt=0
    )

# Pydantic Models for Enriched Output
class EnrichedLineItem(BaseModel):
    """
    Enhanced version of InvoiceLineItem that includes matched product catalog information.
    Combines original invoice data with standardized product details.
    """
    invoice_id: str = Field(
        description="Original invoice identifier"
    )
    original_line_item: str = Field(
        description="Original product description from the invoice"
    )
    quantity: int = Field(
        description="Quantity ordered",
        gt=0
    )
    unit_price: Decimal = Field(
        description="Original unit price from invoice",
        gt=0
    )
    date_: date = Field(
        description="Invoice date"
    )
    matched_sku: Optional[str] = Field(
        None,
        description="Matched SKU from product catalog, if found"
    )
    standard_name: Optional[str] = Field(
        None,
        description="Standardized product name from catalog"
    )
    category: Optional[str] = Field(
        None,
        description="Product category from catalog"
    )
    manufacturer: Optional[str] = Field(
        None,
        description="Manufacturer information from catalog"
    )
    match_confidence: Optional[float] = Field(
        None,
        description="Confidence score of the SKU match (0.0 to 1.0)",
        ge=0.0,  # Greater than or equal to 0
        le=1.0   # Less than or equal to 1
    )

class EnrichedInvoice(BaseModel):
    """
    Container model for enriched invoice data.
    Contains all line items with their matched product catalog information.
    """
    line_items: List[EnrichedLineItem] = Field(
        description="List of all enriched line items with matched catalog data"
    )
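The models above type `unit_price` as `Decimal` and `date_` as `date` rather than `float` and `str`. A short standard-library sketch of why that may matter for invoice arithmetic follows; the `raw` dictionary is a hypothetical stand-in for values read from parsed invoice text.

```python
from datetime import date
from decimal import Decimal

# Hypothetical raw values, as they might appear in parsed invoice text.
raw = {"quantity": "25", "unit_price": "0.99", "date": "2024-01-15"}

quantity = int(raw["quantity"])
unit_price = Decimal(raw["unit_price"])          # exact decimal, built from a string
invoice_date = date.fromisoformat(raw["date"])   # expects YYYY-MM-DD

line_total = quantity * unit_price
print(line_total)         # 24.75
print(invoice_date.year)  # 2024

# Binary floats cannot represent most decimal fractions exactly.
print(0.1 + 0.2 == 0.3)                                   # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```

Note that the workflow later converts `unit_price` to `float` when writing the CSV, so the exactness applies to in-memory handling only.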

### Event Models for Workflow

- `InvoiceEvent`: Carries parsed invoice data through the workflow
- `EnrichedEvent`: Carries enriched data after SKU matching
- `LogEvent`: Handles logging and progress updates during processing

These models form a pipeline where:
1. Raw invoice data is parsed into `InvoiceLineItem` objects
2. Each line item is matched against `ProductCatalog` entries
3. Results are combined into `EnrichedLineItem` objects
4. Final output is collected in an `EnrichedInvoice`

Example Flow:

```text
Raw Invoice Line:
Line_Item: "Acme Widget Model X Premium, Qty: 10, Price: $45.99"
       ↓
InvoiceLineItem:
{invoice_id: "INV-2024-001", line_item: "Acme Widget Model X Premium", quantity: 10, ...}
       ↓
Matched with ProductCatalog:
{sku: "ACM-WX-001", standard_name: "Acme Widget X", category: "Widgets", ...}
       ↓
EnrichedLineItem:
{original_line_item: "Acme Widget Model X Premium", matched_sku: "ACM-WX-001", ...}
```

In [13]:
# Event Classes
class InvoiceEvent(Event):
    """
    Workflow event that carries parsed invoice data.
    Triggered after successful invoice parsing step.
    """
    invoice_data: InvoiceOutput = Field(
        description="Parsed invoice data containing all line items"
    )

class EnrichedEvent(Event):
    """
    Workflow event that carries enriched invoice data.
    Triggered after successful SKU matching step.
    """
    enriched_data: EnrichedInvoice = Field(
        description="Enriched invoice data with matched catalog information"
    )

class LogEvent(Event):
    """
    Workflow event for logging messages and progress updates.
    Used throughout the workflow to provide status information.
    """
    msg: str = Field(
        description="Log message content"
    )
    delta: bool = Field(
        False,
        description="Flag indicating if this is a partial update to previous message"
    )

### Prompt

Here we define the prompt used to extract information from the invoice.



In [15]:
# Extract Prompt Template
EXTRACT_PROMPT = '''
Extract the invoice table data from the following content into a structured format.
Each row should include invoice_id, line_item, quantity, unit_price, and date fields.

invoice_data:
{invoice_data}

Extract each row and format according to the provided schema.
Ensure dates are in YYYY-MM-DD format and all numbers are properly formatted.
'''
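The `{invoice_data}` placeholder in the template is filled with the parsed invoice text when the prompt is formatted. The sketch below uses plain `str.format` as a stand-in to show the substitution; the workflow itself goes through `ChatPromptTemplate`, whose formatting may differ in details. `TEMPLATE` is a shortened, hypothetical copy of the prompt so the snippet runs on its own.

```python
# Shortened copy of the template, kept here so the snippet is self-contained.
TEMPLATE = (
    "Extract the invoice table data from the following content.\n\n"
    "invoice_data:\n{invoice_data}\n"
)

# A fragment of the markdown table that the parser produces for the invoice.
parsed_markdown = (
    "|invoice_id|line_item|quantity|unit_price|date|\n"
    "|---|---|---|---|---|\n"
    "|INV-2024-001|Acme Widget Model X Premium|10|45.99|2024-01-15|"
)

# Substitute the parsed text for the placeholder.
filled = TEMPLATE.format(invoice_data=parsed_markdown)
print(filled)
```

With `str.format`, any literal braces in the template itself would have to be doubled (`{{` and `}}`); braces in the substituted invoice text are left alone.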

### Creating the Product Catalog Index

Before we can match invoice line items to standard products, we need to create a searchable index of our product catalog. This function creates a vector store index that combines product names, manufacturers, and descriptions into searchable embeddings. Each product's metadata (SKU, category, etc.) is stored alongside its embedding, allowing us to retrieve full product details when we find a match.

Example: When searching for "Acme Widget Model X Premium", the index will help find the closest matching product based on semantic similarity rather than exact text matching.
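To make "closest match by similarity" concrete, here is a toy sketch that ranks catalog entries by cosine similarity. It swaps the learned embeddings for simple word counts, so it only captures word overlap, not meaning; the real index relies on an embedding model. The helper names `embed`, `cosine`, and `top_match` are made up for this illustration, and the catalog texts are three of the entries indexed later in this notebook.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts, not a learned vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


catalog = {
    "ACM-WX-002": "Acme Widget X Premium Acme Corporation Premium quality "
    "industrial widget Model X series with enhanced features",
    "ACM-WX-003": "Widget X Maintenance Kit Acme Corporation Replacement "
    "and maintenance kit for Widget X series",
    "IND-FST-201": "Industrial Fastener 2-inch Industrial Supply Co 2-inch "
    "blue-coated industrial grade fasteners",
}


def top_match(query: str):
    """Return (score, sku) for the most similar catalog entry."""
    q = embed(query)
    return max((cosine(q, embed(text)), sku) for sku, text in catalog.items())


score, sku = top_match("Acme Widget Model X Premium")
print(sku, round(score, 3))
```

Word counts would fail on a query like "Industrial Blue Bolt Pack", where the right entry shares few exact words; that gap is what embedding-based similarity is meant to close.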

In [17]:
def create_product_catalog_index(
    catalog_csv_path: str,
    llm: Optional[LLM] = None
) -> BaseRetriever:
    """Create a vector store index from the product catalog CSV."""
    # Read the CSV
    df = pd.read_csv(catalog_csv_path)
    
    # Create documents for indexing
    documents = []
    for _, row in df.iterrows():
        # Combine fields for embedding
        text = f"{row['standard_name']} {row['manufacturer']} {row['description']}"
        print(f'product catalog index:: {text}')
        
        # Store other fields as metadata
        metadata = {
            "sku": row["sku"],
            "standard_name": row["standard_name"],
            "category": row["category"],
            "manufacturer": row["manufacturer"],
            "unit_price": row["unit_price"]
        }
        
        doc = Document(text=text, metadata=metadata)
        documents.append(doc)
    
    # Create and return the index
    index = VectorStoreIndex.from_documents(
        documents
    )
    
    return index.as_retriever(similarity_top_k=1)

### Main Workflow Implementation

Here we implement the workflow for SKU matching and product standardization.

![Workflow](workflow.png)

**a) Parse Invoice (First Step)**
- Triggered by: Initial StartEvent
- Reads invoice PDF using LlamaParse
- Uses LLM to extract structured line item data
- Outputs: InvoiceEvent containing structured line items

**b) Match Line Items with SKUs**
- Triggered by: InvoiceEvent
- For each line item:
  - Queries product catalog index using vector similarity
  - Finds best matching SKU
  - Records match confidence
- Outputs: EnrichedEvent with matched product information

**c) Generate Final Output**
- Triggered by: EnrichedEvent
- Combines all enriched line items
- Creates standardized CSV output
- Includes original and matched data
- Outputs: StopEvent with final results

The event flow looks like this:
```
StartEvent 
  → InvoiceEvent 
    → EnrichedEvent 
      → StopEvent
```

Throughout the workflow:
1. LogEvents provide progress updates
2. Context (ctx) stores the product catalog index
3. Events carry structured data between steps
4. Each step runs asynchronously (@step decorator)
5. Match confidence scores help validate results

The workflow uses vector similarity search to match product descriptions to standard SKUs, so differently worded descriptions of the same product can resolve to the same catalog entry.
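The event flow described above can be pictured as type-based dispatch: each step accepts one event type and returns the next. The following is a toy sketch using only `asyncio` and dataclasses, not the LlamaIndex implementation; the class names `Start`, `Parsed`, `Enriched`, and `Stop` and the `STEPS` table are hypothetical stand-ins, and the step bodies are placeholders.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Start:
    invoice_path: str


@dataclass
class Parsed:
    line_items: list


@dataclass
class Enriched:
    line_items: list


@dataclass
class Stop:
    result: list


async def parse_invoice(ev: Start) -> Parsed:
    # Placeholder for PDF parsing plus LLM extraction.
    return Parsed(line_items=["Acme Widget Model X Premium"])


async def match_skus(ev: Parsed) -> Enriched:
    # Placeholder for the retriever lookup.
    return Enriched(line_items=[(item, "ACM-WX-002") for item in ev.line_items])


async def save_output(ev: Enriched) -> Stop:
    # Placeholder for writing the CSV.
    return Stop(result=ev.line_items)


# Each step is selected by the type of the event it accepts.
STEPS = {Start: parse_invoice, Parsed: match_skus, Enriched: save_output}


async def run(start: Start) -> list:
    event = start
    while not isinstance(event, Stop):
        event = await STEPS[type(event)](event)
    return event.result


# In a notebook, use `await run(...)` instead of asyncio.run.
result = asyncio.run(run(Start(invoice_path="invoice.pdf")))
print(result)  # [('Acme Widget Model X Premium', 'ACM-WX-002')]
```

In the actual workflow, the `@step` decorator and the type annotations on each method play the role of the `STEPS` table.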

In [18]:
class SKUMatchingWorkflow(Workflow):
    """End-to-end workflow for invoice processing and SKU matching."""

    def __init__(
        self,
        parser: LlamaParse,
        catalog_retriever: BaseRetriever,
        llm: LLM,
        output_dir: str = "data_out",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        
        self.parser = parser
        self.retriever = catalog_retriever
        self.llm = llm
        
        # Setup output directory
        out_path = Path(output_dir) / 'workflow_output'
        out_path.mkdir(parents=True, exist_ok=True)
        self.output_dir = out_path

    @step
    async def parse_invoice(
        self, ctx: Context, ev: StartEvent
    ) -> InvoiceEvent:
        """Parse the invoice PDF and extract structured data."""
        if self._verbose:
            ctx.write_event_to_stream(LogEvent(msg=">> Parsing invoice PDF"))
            
        # Parse PDF using LlamaParse
        docs = await self.parser.aload_data(ev.invoice_path)
        invoice_data = "\n".join([d.get_content(metadata_mode="all") for d in docs])
        print(f'invoice_data >> {invoice_data}')

        # Create extraction prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are an assistant that extracts structured data from invoice tables."),
            ("user", EXTRACT_PROMPT)
        ])
        
        # Extract structured data
        invoice_structured_data = await self.llm.astructured_predict(
            InvoiceOutput,
            prompt,
            invoice_data=invoice_data
        )

        print('invoice_structured_data >> ', invoice_structured_data)
        
        if self._verbose:
            ctx.write_event_to_stream(
                LogEvent(msg=f">> Extracted {len(invoice_structured_data.line_items)} line items")
            )
            
        return InvoiceEvent(invoice_data=invoice_structured_data)

    @step
    async def match_skus(
        self, ctx: Context,
        ev: InvoiceEvent
    ) -> EnrichedEvent:
        """Match each line item with product catalog SKUs."""
        if self._verbose:
            ctx.write_event_to_stream(LogEvent(msg=">> Matching SKUs"))
            
        enriched_items = []

        for item in ev.invoice_data.line_items:
            # Query the catalog index
            matches = self.retriever.retrieve(item.line_item)
            print('all matches >> ', matches)
            
            if matches:
                top_match = matches[0]
                metadata = top_match.metadata
                enriched_item = EnrichedLineItem(
                    invoice_id=item.invoice_id,
                    original_line_item=item.line_item,
                    quantity=item.quantity,
                    unit_price=item.unit_price,
                    date_=item.date_,
                    matched_sku=metadata.get("sku"),
                    standard_name=metadata.get("standard_name"),
                    category=metadata.get("category"),
                    manufacturer=metadata.get("manufacturer"),
                    match_confidence=top_match.score if hasattr(top_match, "score") else None
                )
            else:
                enriched_item = EnrichedLineItem(
                    invoice_id=item.invoice_id,
                    original_line_item=item.line_item,
                    quantity=item.quantity,
                    unit_price=item.unit_price,
                    date_=item.date_
                )
            
            enriched_items.append(enriched_item)

        return EnrichedEvent(
            enriched_data=EnrichedInvoice(line_items=enriched_items)
        )

    @step
    async def save_output(
        self, ctx: Context, ev: EnrichedEvent
    ) -> StopEvent:
        """Save the enriched data to CSV."""
        if self._verbose:
            ctx.write_event_to_stream(LogEvent(msg=">> Saving enriched data"))
            
        # Convert to DataFrame
        output_data = []
        for item in ev.enriched_data.line_items:
            output_data.append({
                "invoice_id": item.invoice_id,
                "line_item": item.original_line_item,
                "quantity": item.quantity,
                "unit_price": float(item.unit_price),
                "date": item.date_,
                "matched_sku": item.matched_sku,
                "standard_name": item.standard_name,
                "category": item.category,
                "manufacturer": item.manufacturer,
                "match_confidence": item.match_confidence
            })

        df = pd.DataFrame(output_data)
        
        # Save to CSV
        output_path = self.output_dir / "enriched_invoice.csv"
        df.to_csv(output_path, index=False)
        
        if self._verbose:
            ctx.write_event_to_stream(
                LogEvent(msg=f">> Saved output to {output_path}")
            )
        
        return StopEvent(result=ev.enriched_data)

### Create workflow

Here we initialize the LLM and the parser, build the product catalog index, and create the workflow.

Note: The default timeout for running the workflow is 10 seconds. Parsing and the LLM calls often take longer than that, so we extend the timeout to 300 seconds.

In [19]:
parser = LlamaParse(result_type="markdown")
llm = OpenAI(model="gpt-4o")

# Create product catalog index
catalog_retriever = create_product_catalog_index(
    "product-catalog.csv",
    llm=llm
)

# Initialize workflow
workflow = SKUMatchingWorkflow(
    parser=parser,
    catalog_retriever=catalog_retriever,
    llm=llm,
    verbose=True,
    timeout=300
)

product catalog index:: Acme Widget X Standard Acme Corporation Standard quality industrial widget Model X series
product catalog index:: Acme Widget X Premium Acme Corporation Premium quality industrial widget Model X series with enhanced features
product catalog index:: Widget X Maintenance Kit Acme Corporation Replacement and maintenance kit for Widget X series
product catalog index:: Widget X Enterprise Bundle Acme Corporation Complete enterprise solution including Widget X and accessories
product catalog index:: Widget X Service Bundle Acme Corporation Comprehensive service package for Widget X products
product catalog index:: Industrial Fastener 2-inch Industrial Supply Co 2-inch blue-coated industrial grade fasteners
product catalog index:: Premium Steel Fasteners 2-inch Industrial Supply Co 2-inch premium steel fasteners with corrosion resistance
product catalog index:: Industrial Fastener Kit Mixed Industrial Supply Co Comprehensive kit of industrial fasteners in various sizes

### Visualize workflow


In [103]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(SKUMatchingWorkflow, filename="sku_matching_workflow.html")

<class 'NoneType'>
<class '__main__.EnrichedEvent'>
<class '__main__.InvoiceEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
sku_matching_workflow.html


![Visualize Workflow](workflow_visualization.png)

### Run the workflow
Here we run the workflow and check the final result.

In [104]:
# Run workflow
handler = workflow.run(invoice_path="invoice.pdf")
async for event in handler.stream_events():
    if isinstance(event, LogEvent):
        print(event.msg)
        
result = await handler

Running step parse_invoice
>> Parsing invoice PDF
Started parsing the file under job_id a5dcb0a7-07bf-435c-ae91-c2f4bdb94a09
invoice_data >> # Invoice

|invoice_id|line_item|quantity|unit_price|date|
|---|---|---|---|---|
|INV-2024-001|Acme Widget Model X Premium|10|45.99|2024-01-15|
|INV-2024-001|Blue Industrial Fasteners 2-inch|25|0.99|2024-01-15|
|INV-2024-001|Widget X Replacement Parts Kit|2|15.50|2024-01-15|
|INV-2024-002|ACME Premium Widget X|15|45.99|2024-01-16|
|INV-2024-002|Industrial Blue Bolt Pack (2")|30|0.99|2024-01-16|
|INV-2024-003|Replacement Kit - Widget Model X|5|15.50|2024-01-17|
|INV-2024-003|Widget X by Acme Corp|8|45.99|2024-01-17|
|INV-2024-004|Heavy Duty Office Chair - Ergonomic Pro|5|299.99|2024-01-20|
|INV-2024-004|Wireless Keyboard MK-7000|12|89.99|2024-01-20|
|INV-2024-005|LED Monitor Stand Deluxe|8|79.99|2024-01-22|
|INV-2024-005|Anti-Glare Screen Protector XL|15|24.99|2024-01-22|
|INV-2024-006|Premium Desk Mat - Extended|20|34.99|2024-01-25|
|INV-2024-006|

### Check Enriched Invoice

In [51]:
df = pd.read_csv("data_out/workflow_output/enriched_invoice.csv")
df.head()

| |invoice_id|line_item|quantity|unit_price|date|matched_sku|standard_name|category|manufacturer|match_confidence|
|---|---|---|---|---|---|---|---|---|---|---|
|0|INV-2024-001|Acme Widget Model X Premium|10|45.99|2024-01-15|ACM-WX-002|Acme Widget X Premium|Widgets|Acme Corporation|0.918583|
|1|INV-2024-001|Blue Industrial Fasteners 2-inch|25|0.99|2024-01-15|IND-FST-201|Industrial Fastener 2-inch|Fasteners|Industrial Supply Co|0.910306|
|2|INV-2024-001|Widget X Replacement Parts Kit|2|15.5|2024-01-15|ACM-WX-003|Widget X Maintenance Kit|Spare Parts|Acme Corporation|0.879268|
|3|INV-2024-002|ACME Premium Widget X|15|45.99|2024-01-16|ACM-WX-002|Acme Widget X Premium|Widgets|Acme Corporation|0.914013|
|4|INV-2024-002|Industrial Blue Bolt Pack (2")|30|0.99|2024-01-16|IND-FST-201|Industrial Fastener 2-inch|Fasteners|Industrial Supply Co|0.838258|
