In [None]:
%pip install pytesseract

In [None]:
%pip install pdf2image

In [None]:
%pip install pillow

In [None]:
%pip install azure-ai-inference

In [None]:
%pip install python-dotenv

In [None]:
%pip install PyPDF2

In [None]:
# Local Poppler location; convert_pdf_to_image hands this to pdf2image's
# convert_from_path as `poppler_path`. Replace the placeholder before running
# the conversion cells.
poppler_path = r"YOUR_POPPLER_LOCAL_PATH_GOES_HERE"

#### Splitting the combined parent PDF page by page

In [None]:
from PyPDF2 import PdfReader, PdfWriter
import os
def split_parent_pdf_into_individual_pages(input_pdf_path):
    """Write every page of a PDF to its own single-page file.

    The files are stored in the ``output_pages`` folder (created on demand)
    and are named after their 1-based page number: ``1.pdf``, ``2.pdf``, ...

    Args:
        input_pdf_path: Path of the multi-page PDF to split.
    """
    pages_dir = "output_pages"
    os.makedirs(pages_dir, exist_ok=True)

    source = PdfReader(input_pdf_path)

    # Start counting at 1 so the file name equals the human-readable page number.
    for page_no, page in enumerate(source.pages, start=1):
        single_page = PdfWriter()
        single_page.add_page(page)

        target_path = os.path.join(pages_dir, f"{page_no}.pdf")
        with open(target_path, "wb") as handle:
            single_page.write(handle)

        print(f"Saved: {target_path}")

    print("PDF split completed!")

In [None]:
# Split the sample combined PDF into one single-page file per page under output_pages/.
split_parent_pdf_into_individual_pages("./sample_invoices/Mustermann 3.pdf")

#### Creating a function to convert PDF pages to images using pdf2image and Poppler

In [None]:
def convert_pdf_to_image(child_pdf_path, page_number):
    """Render a child PDF with pdf2image and store it as ``./images/<page_number>.png``.

    Args:
        child_pdf_path: Path of the PDF to render.
        page_number: Value used as the PNG file name (without extension).
    """
    from pdf2image import convert_from_path

    # Make sure the image folder is there before anything is written.
    target_dir = './images'
    os.makedirs(target_dir, exist_ok=True)

    print(f"Processing: {child_pdf_path}")

    rendered_pages = convert_from_path(child_pdf_path, poppler_path=poppler_path)

    # Every rendered page is written to the same file name, so when the input
    # has more than one page only the last one is left on disk.
    png_path = os.path.join(target_dir, f'{page_number}.png')
    for rendered in rendered_pages:
        rendered.save(png_path, 'PNG')

    print(f"Saved images for {os.path.basename(child_pdf_path)} in {target_dir}")
    

#### Finally converting PDF to images with function invocation

In [None]:
# Keep only the "<number>.pdf" files BEFORE sorting: the numeric sort key would
# otherwise raise ValueError on any stray entry in the folder (e.g. ".DS_Store").
page_files = [
    name for name in os.listdir('./output_pages')
    if name.endswith('.pdf') and name.split('.')[0].isdecimal()
]

# Sort numerically so that 10.pdf comes after 9.pdf, then render each page.
for file in sorted(page_files, key=lambda x: int(x.split('.')[0])):
    child_pdf_path = os.path.join('./output_pages', file)
    page_number = file.split('.')[0]  # Extract the page number from the filename
    convert_pdf_to_image(child_pdf_path, page_number)

#### Generating the system prompt to implement invoice split algorithm via the LLM using NER (Named Entity Recognition)

In [None]:
# System prompt for the vision LLM call made once per page image. It asks the
# model to answer with "Vendor Name: ...", "Invoice ID: ..." and
# "Customer Name: ..." lines and to omit a key when the field is not found.
# The notebook parses the reply with regexes keyed on exactly these labels, so
# the labels here and in the parsing code must stay in sync.
system_prompt_for_invoice_split_LLM_vision = """ You are a helpful ai assistant meant to assist in clubbing together different PDFs into invoices.
The invoices are in German. You will be passed with the images of the pages of the PDF and then you have to extract the text from the images. 
The text that needs to extracted is according to an algorithm that I have desvised and I'll tell you more about it below.

The algorithm keeps track of the vendor name, invoice ID, and customer name or address and does the following:
a) If the vendor name is found and it is the same as the previous vendor name, it indicates a continuation of the same invoice.
b) If the vendor name is found and it is different from the previous vendor name, it indicates a new invoice.
c) If the invoice ID is found and it is the same as the previous invoice ID, it indicates a continuation of the same invoice.
d) If the invoice ID is found and it is different from the previous invoice ID, it indicates a new invoice.
e) If the customer address is found it indicates the starting page of the invoice.

Your goal is to extract the following fields, based on the visible text in the image:
- Vendor Name
- Invoice ID
- Customer Name

### Extraction Patterns:

- **Customer Name and Address Block Example:**
Frau
Maria Mustermann
Musterallee 1
54321 Musterdorf

- Extract only the Customer Name: `Maria Mustermann`

- **Invoice ID Example:**
  - Valid formats:
    - Rechnungsnummer: 1234567890
    - Rechnungs-ID: ABCD123456
    - Rechnung-Nr.: INV2024XY
  - Invoice IDs typically appear near the top of the document and close to the customer name or billing address.
  - Always look for these **German keywords** before extracting:
    - "Rechnungsnummer"
    - "Rechnungs-ID"
    - "Rechnung-Nr."

- Do NOT extract codes that:
  - Appear in tabular sections, such as those under "Referenz", "ISIN", or "Depot/Konto-Nummer"
  - Are short reference IDs like "K23000076" or ISINs
  - Appear near securities or fund transaction details

- Also ignore anything resembling a date or range:
  - 01.01.2020
  - 2020-12-31
  - 01.01.2020 - 31.12.2020

- Only extract the **actual invoice number** from the area where the **customer billing address appears** or where it's clearly labeled with the German keywords above.



 - **Vendor Name Example:**
 Deka
Investments

Vendor Name: Deka Investments
- It usually appears at the top of the page, often styled as a logo or header.


The output should be in the following format:
Vendor Name: <vendor_name>
Invoice ID: <invoice_id>
Customer Name: <customer_name>


If vendor name is not found, then do not print even the key "Vendor Name".
If invoice ID is not found, then do not print even the key "Invoice ID".
If customer name is not found, then do not print even the key "Customer Name".

Be accurate, consistent, and minimal.

Just adhere to the format and do not add any extra text or explanation strictly because i will then use regex-based pattern matching 
to route to the desired custom python logic using my custom python code """

#### Creating a function to generate local base64 URL for an image

In [None]:
import os
import base64
from IPython.display import Image, display

def generate_base64_url(child_pdf_image_path):
    """Read an image file and return its content as a base64 ``data:`` URL.

    Args:
        child_pdf_image_path: Path of the image file to encode.

    Returns:
        str: ``data:image/<subtype>;base64,<payload>``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    print(f"Processing image: {child_pdf_image_path}")

    # Read the image file in binary mode and encode it
    with open(child_pdf_image_path, "rb") as img_file:
        image_data = base64.b64encode(img_file.read()).decode("utf-8")

    # Take the subtype from the real file extension. splitext ignores dots in
    # directory names, the value is lower-cased, and the JPEG aliases are
    # mapped to the registered subtype "jpeg" ("image/jpg" is not a media type).
    extension = os.path.splitext(child_pdf_image_path)[1].lstrip(".").lower()
    image_format = {"jpg": "jpeg", "jpe": "jpeg"}.get(extension, extension)
    if not image_format:
        # No extension at all: fall back to PNG, the format this notebook's
        # own PDF-to-image step writes.
        image_format = "png"

    data_url = f"data:image/{image_format};base64,{image_data}"

    # Only the head of the URL is printed; the full base64 payload is too long
    print(f"Data URL for {child_pdf_image_path}:\n{data_url[:100]}...\n")  # printing full base64 is too long

    return data_url


#### Defining a function to call the LLM on Azure AI Foundry for the PDF split algorithm

In [None]:
import os
from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.models import SystemMessage, UserMessage
from azure.core.credentials import AzureKeyCredential
from azure.ai.inference.models import ImageContentItem, ImageUrl, TextContentItem
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
endpoint = os.getenv("LLM_ENDPOINT")
print("Endpoint: ", endpoint)
model_name = os.getenv("LLM_MODEL_NAME")
print("Model Name: ", model_name)
api_key = os.getenv("LLM_API_KEY")
# Never echo the secret itself: notebook outputs are saved with the file and
# are routinely shared or committed. Report only whether the key is present.
print("API Key: ", "<set>" if api_key else "<missing>")


def call_LLM_for_invoice_split(system_prompt, image_data_url):
    """Send one page image to the chat model and return the text of its reply.

    Args:
        system_prompt: System message content for the request.
        image_data_url: URL of the page image (the notebook passes a base64 data URL).

    Returns:
        The ``content`` of the first choice in the completion response.
    """
    chat_client = ChatCompletionsClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(api_key),
    )

    system_message = SystemMessage(content=system_prompt)
    page_message = UserMessage(content=[
        TextContentItem(text="I have attached the image"),
        ImageContentItem(image_url=ImageUrl(url=image_data_url)),
    ])

    completion = chat_client.complete(
        messages=[system_message, page_message],
        model=model_name,
        max_tokens=512,   # the expected answer is at most three short lines
        temperature=0.2,  # low, but not zero, sampling temperature
        top_p=0.9,
        presence_penalty=0.0,
        frequency_penalty=0.0,
        # Generation stops at the first blank line or explanation marker.
        stop=["\n\n", "---", "Explanation", "Note:"],
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

#### Defining a function to perform regex-based matching


In [None]:
def extract_fields_from_text(LLM_response):
    """Pull vendor name, invoice ID and customer name out of an LLM reply.

    The reply is expected to contain ``Vendor Name: ...``, ``Invoice ID: ...``
    and ``Customer Name: ...`` lines; any of them may be missing.

    Args:
        LLM_response: Text returned by the model. ``None`` or an empty string
            is treated as a reply without any fields.

    Returns:
        dict: Keys ``vendor_name``, ``invoice_id`` and ``customer_name``; each
        value is the stripped field text, or ``None`` when the field is absent
        or empty.
    """
    import re

    text = LLM_response or ""

    # Only spaces/tabs may separate the label from its value. With `\s*` an
    # empty field ("Vendor Name:\n...") would swallow the newline and capture
    # the NEXT line as the value.
    vendor_name_pattern = r"Vendor Name:[ \t]*(.+)"
    invoice_id_pattern = r"Invoice ID:[ \t]*(.+)"
    customer_name_pattern = r"Customer Name:[ \t]*(.+)"

    def _capture(pattern):
        """Return the stripped first capture of `pattern`, or None if missing/blank."""
        match = re.search(pattern, text)
        if not match:
            return None
        return match.group(1).strip() or None

    vendor_name = _capture(vendor_name_pattern)
    invoice_id = _capture(invoice_id_pattern)
    customer_name = _capture(customer_name_pattern)

    # Print extracted values
    print(f"Extracted Vendor Name: {vendor_name}")
    print(f"Extracted Invoice ID: {invoice_id}")
    print(f"Extracted Customer Name: {customer_name}")

    # Hand the values back to the caller; printing alone made them unusable.
    return {
        "vendor_name": vendor_name,
        "invoice_id": invoice_id,
        "customer_name": customer_name,
    }
    
    


#### Defining a hashmap to keep track of the pages of the current invoice

In [None]:
# Maps a page number to its label; document_intelligence writes "child page"
# entries into it and process_pdf clears it at the start of each invoice.
hashMap={}

#### Creating a function to keep track of Vendor Names

In [None]:
# System prompt for the vendor-name comparison call: the model is given two
# vendor names and must answer with the single word "Yes" or "No".
validate_vendor_names_system_prompt = """
You are an intelligent assistant that helps determine whether two company or vendor names refer to the same organization.

You will be given two vendor names, and your task is to decide if they refer to the **same company or organization**, even if the wording, spelling, or formatting differs.

These vendor names are from **German invoices**, so use your knowledge of German company naming conventions.

Examples of valid matches include:
- "Banque de Luxembourg" and "Banque Internationale à Luxembourg"
- "Deka Investments" and "DekaBank Deutsche Girozentrale"
- "Amazon Web Services Inc." and "AWS"
- "grün form Garten und Landschaft GmbH" and "GRÜNFORM GmbH"
- "Müller GmbH & Co. KG" and "Mueller KG"

You should account for:
- Synonyms or reworded names
- Capitalization differences (e.g., GRÜNFORM vs grün form)
- German umlauts and character variations (e.g., Ü vs UE)
- Merged or split words (e.g., "grün form" vs "GRÜNFORM")
- Legal suffixes such as "GmbH", "AG", "KG" being present or missing
- Abbreviations or expansions (e.g., AWS vs Amazon Web Services)
- Different language variants

Your answer must be one of the following two words only:
- Yes
- No

No explanations. No punctuation. No reasoning. Just output exactly one of: **Yes** or **No**.
This is critical for downstream programmatic processing.
"""


In [None]:
def call_LLM_for_vendor_name_validation(system_prompt, vendor_name_1, vendor_name_2):
    """Ask the chat model whether two vendor names refer to the same organisation.

    Args:
        system_prompt: System message content describing the comparison task.
        vendor_name_1: First vendor name.
        vendor_name_2: Second vendor name.

    Returns:
        ``"Yes"`` or ``"No"`` when the reply is one of those words, ignoring
        case, surrounding whitespace and stray punctuation/markdown; otherwise
        the model's reply unchanged.
    """
    client = ChatCompletionsClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(api_key),
    )
    
    user_query = """ The two vendor names are:
    Vendor Name 1: {}
    Vendor Name 2: {}
    """.format(vendor_name_1, vendor_name_2)

    response = client.complete(
        messages=[
            SystemMessage(content=system_prompt),
            UserMessage(content=[
                TextContentItem(text=user_query),
            ])
        ],
        max_tokens=2048,
        temperature=0.2,
        top_p=0.9,
        presence_penalty=0.0,
        frequency_penalty=0.0,
        model=model_name
    )

    raw_answer = response.choices[0].message.content
    print(f"Response: {raw_answer}")

    # Callers test the result with `== "No"`. Replies such as "No.", "no" or
    # "**No**\n" would silently fail that test, so fold them onto the two
    # canonical words before returning.
    verdict = (raw_answer or "").strip(" \t\r\n*.!\"'`").casefold()
    if verdict == "yes":
        return "Yes"
    if verdict == "no":
        return "No"
    return raw_answer

#### Creating a function to keep track of Invoice ID

In [None]:
import re

def is_invoice_id_changed(invoice_id_1: str, invoice_id_2: str) -> bool:
    """
    Tell whether two invoice IDs differ once everything but their digits is dropped.

    Letters, separators and whitespace are ignored; the remaining digit
    strings are compared as text, so leading zeros count.

    Args:
        invoice_id_1 (str): The first invoice ID.
        invoice_id_2 (str): The second invoice ID.

    Returns:
        bool: True if the digit sequences are different, False otherwise.
    """
    digits_first, digits_second = (
        "".join(re.findall(r'\d', invoice_id))
        for invoice_id in (invoice_id_1, invoice_id_2)
    )
    if digits_first == digits_second:
        return False
    return True

#### Creating a function to keep track of customer name

In [None]:
# System prompt for the customer-name comparison call: the model is given two
# person names and must answer with the single word "Yes" or "No".
validate_person_names_system_prompt = """
You are an intelligent assistant that helps determine whether two person names refer to the **same individual**, even if the wording or formatting differs.

You will be given two person names. Your task is to decide if they refer to the **same person**.

These names are from **German business documents**, such as invoices, contracts, or email signatures. Use your knowledge of German naming conventions and honorifics.

Examples of valid matches include:
- "Max Mustermann" and "Maximilian Mustermann"
- "Dr. Max Mustermann" and "Max Mustermann" (only if context suggests both refer to the same person)
- "Anna-Lena Schmidt" and "Anna Lena Schmidt"
- "Müller" and "Mueller" (considering umlaut substitution)

Examples of non-matches include:
- "Dr. Max Mustermann" and "Max Mustermann GmbH" (person vs company)
- "Max Mustermann" and "Erika Mustermann"
- "Max Mustermann" and "Mustermann KG"
- "Dr. Max Mustermann" and "Herr Max Mustermann" (only if context is unclear)

You should consider:
- German honorifics and titles (e.g., Dr., Prof., Herr, Frau)
- Variants of umlauts (e.g., Ü vs UE)
- Capitalization differences
- Hyphenated or compound names
- Whether one name is a company or organization (e.g., contains GmbH, KG, AG)

Your answer must be one of the following two words only:
- Yes
- No

No explanations. No punctuation. No reasoning. Just output exactly one of: **Yes** or **No**.
This is critical for downstream programmatic processing.
"""


In [None]:
def call_LLM_for_customer_name_validation(system_prompt, customer_name_1, customer_name_2):
    """Ask the chat model whether two customer names refer to the same person.

    Args:
        system_prompt: System message content describing the comparison task.
        customer_name_1: First customer name.
        customer_name_2: Second customer name.

    Returns:
        ``"Yes"`` or ``"No"`` when the reply is one of those words, ignoring
        case, surrounding whitespace and stray punctuation/markdown; otherwise
        the model's reply unchanged.
    """
    client = ChatCompletionsClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(api_key),
    )
    
    user_query = """ The two customer names are:
    Customer Name 1: {}
    Customer Name 2: {}
    """.format(customer_name_1, customer_name_2)

    response = client.complete(
        messages=[
            SystemMessage(content=system_prompt),
            UserMessage(content=[
                TextContentItem(text=user_query),
            ])
        ],
        max_tokens=2048,
        temperature=0.2,
        top_p=0.9,
        presence_penalty=0.0,
        frequency_penalty=0.0,
        model=model_name
    )

    raw_answer = response.choices[0].message.content
    print(f"Response: {raw_answer}")

    # Callers test the result with `== "No"`. Replies such as "No.", "no" or
    # "**No**\n" would silently fail that test, so fold them onto the two
    # canonical words before returning.
    verdict = (raw_answer or "").strip(" \t\r\n*.!\"'`").casefold()
    if verdict == "yes":
        return "Yes"
    if verdict == "no":
        return "No"
    return raw_answer

#### Creating a function to analyse and track invoices using the LLM's response

In [None]:
def document_intelligence(pdf_path, page_number):
    """Classify one page as the start of a new invoice or a continuation.

    The page image ``./images/<page_number>.png`` is sent to the vision LLM and
    the vendor name, invoice ID and customer name found in its reply are
    compared, in that order, with the values remembered in the module-level
    ``current_vendor_name`` / ``current_invoice_id`` / ``current_customer_name``.
    The first field that differs ends the check: its global is overwritten
    with the new value and the matching change code is returned.

    Args:
        pdf_path: Path of the single-page PDF; only its base name is used, for logging.
        page_number: 1-based page number; selects the image and keys ``hashMap``.

    Returns:
        str: ``"VendorNameChanged"``, ``"InvoiceIdChanged"`` or
        ``"CustomerNameChanged"`` when a field differs from the remembered
        value; ``"child page"`` when nothing differs (the page is also recorded
        in ``hashMap``); ``"error"`` when any exception occurs.
    """
    import os
    import re
    try:
        global current_vendor_name  # Declare global to track VendorName across pages
        global current_invoice_id  # Declare global to track InvoiceId across pages
        global current_customer_name  # Declare global to track CustomerName across pages

        def _capture(pattern, text):
            """Return the stripped first capture of `pattern` in `text`, or None if missing/blank."""
            match = re.search(pattern, text)
            if not match:
                return None
            return match.group(1).strip() or None

        def _is_no(answer):
            """True when a validator reply means "No", ignoring case and stray punctuation."""
            return isinstance(answer, str) and answer.strip(" \t\r\n*.!\"'`").casefold() == "no"

        # Generating the child pdf image path
        child_pdf_image_path = os.path.join("./images", f"{page_number}.png")

        # Generating the base64 URL for the image
        base64_url = generate_base64_url(child_pdf_image_path)

        # Call the LLM with the image data URL
        LLM_response = call_LLM_for_invoice_split(system_prompt_for_invoice_split_LLM_vision, base64_url)
        print("LLM Response for pdf base {}:".format(os.path.basename(pdf_path)))
        print(LLM_response)

        # Only spaces/tabs may separate a label from its value; `\s*` would let
        # an empty field swallow the newline and capture the following line.
        current_page_vendor_name = _capture(r"Vendor Name:[ \t]*(.+)", LLM_response)
        current_page_invoice_id = _capture(r"Invoice ID:[ \t]*(.+)", LLM_response)
        current_page_customer_name = _capture(r"Customer Name:[ \t]*(.+)", LLM_response)

        if current_page_vendor_name:
            if current_vendor_name and _is_no(
                call_LLM_for_vendor_name_validation(
                    validate_vendor_names_system_prompt, current_page_vendor_name, current_vendor_name
                )
            ):
                print(f"Vendor name changed from {current_vendor_name} to {current_page_vendor_name} on page {page_number}.")
                current_vendor_name = current_page_vendor_name  # Update the global variable
                return "VendorNameChanged"

            # First sighting, or same vendor: remember the latest spelling
            current_vendor_name = current_page_vendor_name

        if current_page_invoice_id:
            if current_invoice_id and is_invoice_id_changed(current_page_invoice_id, current_invoice_id):
                print(f"InvoiceId changed from {current_invoice_id} to {current_page_invoice_id} on page {page_number}.")
                current_invoice_id = current_page_invoice_id  # Update the global variable
                return "InvoiceIdChanged"

            # First sighting, or same invoice ID: remember the latest value
            current_invoice_id = current_page_invoice_id

        if current_page_customer_name:
            if current_customer_name and _is_no(
                call_LLM_for_customer_name_validation(
                    validate_person_names_system_prompt, current_page_customer_name, current_customer_name
                )
            ):
                print(f"CustomerName changed from {current_customer_name} to {current_page_customer_name} on page {page_number}.")
                current_customer_name = current_page_customer_name
                return "CustomerNameChanged"

            # First sighting, or same customer: remember the latest spelling
            current_customer_name = current_page_customer_name

        # No field signalled a new invoice (including the case where no field
        # was found at all), so the page is recorded as a continuation page.
        hashMap[page_number] = "child page"
        print("page number : {} child page".format(page_number))
        return "child page"

    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        print(f"Error processing page {page_number}: {e}")
        return "error"

#### Function to assemble selected pages of the parent PDF into a single PDF file - for creating a consolidated invoice

In [None]:
def create_pdf_from_pages(input_pdf: str, output_pdf: str, pages: list):
    """
    Copy the listed pages of a PDF into a new file under ``output_files/``.

    Page numbers outside the document are reported and skipped. Any exception
    raised while reading or writing is printed, not propagated.

    :param input_pdf: Path to the original multi-page PDF.
    :param output_pdf: File name of the new PDF inside ``output_files``.
    :param pages: List of page numbers (1-based index) to include in the new PDF.
    """
    if not pages:
        print("No pages specified for creating the PDF.")
        return

    try:
        source = PdfReader(input_pdf)
        assembled = PdfWriter()

        for requested in pages:
            if not (1 <= requested <= len(source.pages)):
                print(f"Invalid page number: {requested}")
                continue
            # Page numbers are 1-based, the reader's page list is 0-based.
            assembled.add_page(source.pages[requested - 1])

        # The destination folder is created on first use.
        target_dir = "output_files"
        os.makedirs(target_dir, exist_ok=True)
        target_path = os.path.join(target_dir, output_pdf)

        with open(target_path, "wb") as out_file:
            assembled.write(out_file)

        print(f"New PDF created: {output_pdf}")

    except Exception as e:
        print(f"Error creating PDF: {e}")

#### Defining function to process pages for Document Analysis

In [None]:
def process_pdf(pdf_path: str):
    """Split a combined PDF into one output PDF per detected invoice.

    Every page is classified with ``document_intelligence``. When a page
    reports a changed vendor name, invoice ID or customer name, the pages
    collected so far are written out as ``output_<n>.pdf``, the tracking state
    is reset, and the page is classified again as the first page of the next
    invoice.

    Expects ``output_pages/<n>.pdf`` and ``images/<n>.png`` to exist for each
    page of ``pdf_path``. Errors are printed, not raised.

    :param pdf_path: Path to the combined multi-page PDF.
    """
    global current_vendor_name, current_invoice_id, current_customer_name

    # Results of document_intelligence that mark the first page of a new invoice.
    boundary_results = ("VendorNameChanged", "InvoiceIdChanged", "CustomerNameChanged")

    try:
        pdf_reader = PdfReader(pdf_path)
        hashMap.clear()  # Clear the hashMap before processing
        pageList = []  # Pages of the invoice currently being collected
        count = 1  # for naming the output files

        # Nothing is known about the first invoice yet.
        current_vendor_name = None
        current_invoice_id = None
        current_customer_name = None

        for page_num in range(len(pdf_reader.pages)):
            # Generate the pdf path for the individual invoice page
            child_pdf_path = os.path.join("output_pages", f"{page_num + 1}.pdf")

            # Classify this page (1-based page index)
            result = document_intelligence(child_pdf_path, page_num + 1)

            if result in boundary_results:
                # Finalize the current invoice (exclude the current page)
                if pageList:
                    create_pdf_from_pages(pdf_path, f"output_{count}.pdf", pageList)
                    # Report the actual trigger, which is not always the vendor name.
                    print(f"{result}: New PDF created: output_{count}.pdf with pages: {pageList}")
                    count += 1
                    pageList = []  # Reset the page list for the next invoice

                # Forget everything known about the previous invoice
                current_customer_name = None
                current_invoice_id = None
                current_vendor_name = None

                hashMap.clear()  # Clear the hashMap for the new invoice

                # The first pass returned at the first changed field, so run the
                # page again against the clean state to record all its fields.
                result = document_intelligence(child_pdf_path, page_num + 1)

            # Add current page to the pageList (1-based page index)
            pageList.append(page_num + 1)

        # Finalize the last invoice if any pages are left in pageList
        if pageList:
            create_pdf_from_pages(pdf_path, f"output_{count}.pdf", pageList)
            print(f"Remaining pages consolidated: New PDF created: output_{count}.pdf with pages: {pageList}")
        else:
            print("No valid invoice data found in the PDF.")

    except Exception as e:
        print(f"An error occurred while processing the PDF: {e}")

    print("\nProcessing completed!")

In [None]:
# Run the invoice split on the sample PDF. process_pdf reads output_pages/<n>.pdf
# and images/<n>.png, so the split and image-conversion cells must have run first.
process_pdf("./sample_invoices/Mustermann 3.pdf")