In [None]:
# !pip uninstall fitz

In [2]:
from langgraph.graph import Graph
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.document_loaders import PyPDFLoader
from langchain.chains import LLMChain
from langchain.document_loaders.image import UnstructuredImageLoader
import os
import base64
import openai
from dotenv import load_dotenv
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
# Environment values are always strings (or missing). Parse the temperature once
# here so the chat model receives a real float, and fall back to 0 - the same
# deterministic setting the image-to-text call in this notebook uses - when the
# variable is unset or empty. A non-numeric value raises ValueError right away.
TEMPERATURE = float(os.getenv("TEMPERATURE") or 0)
MODEL_NAME = os.getenv("MODEL_NAME")

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain.chat_models import ChatOpenAI
from pdf2image import convert_from_path


In [3]:
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Running list of everything produced so far. The `operator.add` metadata is
    # what LangGraph uses as the reducer for this key: a list returned by a node
    # under "messages" is concatenated onto the existing list, not substituted
    # for it.
    # NOTE(review): although annotated as AnyMessage, the nodes in this notebook
    # also append a plain dict ({"file_type": ...}) and plain strings (extracted
    # text / model output) to this list.
    messages: Annotated[list[AnyMessage], operator.add]
    

In [8]:
class Agent:
    """Invoice-extraction pipeline built on a LangGraph ``StateGraph``.

    Flow:
        classify_file_type
            "PDF"   -> extract_from_pdf   -> extract_attributes
            "IMAGE" -> extract_from_image -> extract_attributes
            "None"  -> END (file type not recognised)

    Attributes:
        system: Optional system prompt used by the classification step.
        model: Chat model exposing ``invoke``; used for classification and for
            attribute extraction.
        filepath: Path of the file being processed. The scanned-PDF fallback
            overwrites this with the path of the PNG it renders.
        graph: The compiled graph; run it with
            ``graph.invoke({"messages": [...]})``.
    """

    # Vision model and completion cap used by the image-to-text call.
    VISION_MODEL = "gpt-4o"
    OCR_MAX_TOKENS = 1000
    # Route label stored when the classifier's answer is not understood.
    UNKNOWN_FILE_TYPE = "None"

    def __init__(self,model,system="",filepath=None):
        """Store the configuration and compile the processing graph.

        Args:
            model: Chat model used for classification and attribute extraction.
            system: System prompt prepended to the messages when classifying.
            filepath: Path to the invoice file (PDF or image).
        """
        self.system = system
        self.model = model
        self.filepath = filepath

        graph = StateGraph(AgentState)
        graph.add_node("classify_file_type", self.classify_file_type)
        graph.add_node("extract_from_pdf", self.extract_pdf_data)
        graph.add_node("extract_from_image", self.extract_data_from_image)
        graph.add_node("extract_attributes", self.extract_attributes_from_text)

        # Every label classify_file_type can store needs a route here; an
        # unrecognised file type ends the run instead of failing inside the
        # router with an unmapped key.
        graph.add_conditional_edges(
            "classify_file_type",
            self.file_type_check,
            {
                "PDF": "extract_from_pdf",
                "IMAGE": "extract_from_image",
                self.UNKNOWN_FILE_TYPE: END,
            },
        )
        graph.add_edge("extract_from_pdf", "extract_attributes")
        graph.add_edge("extract_from_image", "extract_attributes")

        graph.set_entry_point("classify_file_type")
        graph.set_finish_point("extract_attributes")
        self.graph = graph.compile()

    def file_type_check(self, state: AgentState):
        """Router: return the file-type label stored by ``classify_file_type``.

        Returns:
            "PDF", "IMAGE" or "None". "None" is also returned when the last
            state entry is not the expected ``{"file_type": ...}`` dict.
        """
        last_entry = state['messages'][-1]
        if isinstance(last_entry, dict):
            file_type = last_entry.get('file_type', self.UNKNOWN_FILE_TYPE)
        else:
            file_type = self.UNKNOWN_FILE_TYPE
        print(file_type)
        return file_type

    def classify_file_type(self, state: AgentState):
        """Ask the model for the file type and record it in the state.

        Returns:
            A state update holding one new entry, ``{"file_type": label}``,
            where label is "PDF", "IMAGE" or "None".
        """
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        print(message.content)
        print("")

        # Tolerate surrounding whitespace / punctuation / quoting in the answer
        # ("PDF.", " pdf\n", "`IMAGE`") instead of requiring an exact match.
        answer = str(message.content or "").strip().strip(".,:;!\"'`*").upper()

        # A scanned PDF is still routed to the PDF node, which falls back to
        # image-based text extraction on its own.
        if answer in ("PDF", "SCANNED_PDF", "SCANNED PDF"):
            print("in pdf check")
            file_type = "PDF"
        elif answer == "IMAGE":
            print("in image check")
            file_type = "IMAGE"
        else:
            file_type = self.UNKNOWN_FILE_TYPE

        # Return only the NEW entry: "messages" is merged with operator.add, so
        # returning the whole accumulated list would duplicate every earlier
        # message in the state.
        return {"messages": [{"file_type": file_type}]}

    def extract_pdf_data(self, state: AgentState):
        """Extract the text of the PDF at ``self.filepath``.

        If the text layer does not contain "INVOICE DATE" the PDF is treated as
        scanned: its first page is rendered to a PNG and read through the
        vision model instead.

        Returns:
            ``{'messages': [text]}`` on success, or None (no state update) when
            the PDF cannot be loaded, cannot be converted, or yields no text.
        """
        print("Extracting data from PDF")
        try:
            loader = PyPDFLoader(self.filepath)
            docs = loader.load()
        except Exception as e:
            print(f"Error loading PDF: {e}")
            return None

        pdf_content = "\n".join(doc.page_content for doc in docs)

        # "INVOICE DATE" contains "INVOICE", so this single test is equivalent
        # to checking for both strings.
        if 'INVOICE DATE' not in pdf_content.upper():
            print("PDF does not contain invoice information, converting to image")
            if self.convert_pdf_to_image(state) is None:
                # Conversion failed and self.filepath still points at the PDF;
                # do not send PDF bytes to the vision model as if they were an image.
                return None
            pdf_content = self.extract_data_from_image(state)['messages'][0]
            print(pdf_content)

        if not pdf_content:
            print("No content in the pdf")
            return None

        return {'messages': [pdf_content]}

    def convert_pdf_to_image(self, state: AgentState):
        """Render the first page of the PDF to a PNG saved next to it.

        On success ``self.filepath`` is switched to the PNG so the image
        extraction step reads the rendered page. Only page 1 is rendered; later
        pages of a multi-page scan are not processed.

        Returns:
            The PNG path on success, None on failure.
        """
        print("Extracting text from scanned PDF")
        print(self.filepath)
        try:
            # Only the first page is used, so only the first page is rendered.
            images = convert_from_path(self.filepath, dpi=300, first_page=1, last_page=1)
            if not images:
                print("Error converting PDF to image: no pages were rendered")
                return None

            directory = os.path.dirname(self.filepath)
            file_root = os.path.splitext(os.path.basename(self.filepath))[0]
            output_path = os.path.join(directory, file_root + ".png").replace("\\", "/")
            images[0].save(output_path, "PNG")
            print(f"Saved pdf as image: {output_path}")
            self.filepath = output_path
            print("New file path:", self.filepath)
            return output_path

        except Exception as e:
            print(f"Error converting PDF to image: {e}")
            return None

    def encode_image(self, state: AgentState = None):
        """Return the file at ``self.filepath`` as a base64-encoded ASCII string.

        ``state`` is accepted for signature compatibility but is not used; the
        path always comes from ``self.filepath``.
        """
        with open(self.filepath, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    def extract_data_from_image(self,state: AgentState):
        """Read all visible text from the image at ``self.filepath`` with the vision model.

        Returns:
            ``{'messages': [text]}``; text is an empty string when the model
            returns no content.
        """
        import mimetypes

        print("Extracting data from image")
        print(self.filepath)
        base64_image = self.encode_image(state)

        # Label the data URL with the file's real type (the scanned-PDF fallback
        # produces a PNG); keep the previous image/jpeg label as the default for
        # unknown extensions.
        guessed_type = mimetypes.guess_type(self.filepath)[0]
        if guessed_type and guessed_type.startswith("image/"):
            mime_type = guessed_type
        else:
            mime_type = "image/jpeg"

        response = openai.chat.completions.create(
            model=self.VISION_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}"
                            },
                        },
                        {
                            "type": "text",
                            "text": "Extract all visible text from this image. Do not summarize or omit anything.",
                        },
                    ],
                }
            ],
            max_tokens=self.OCR_MAX_TOKENS,
            temperature=0.0,
        )

        extracted_text = response.choices[0].message.content or ""
        return {'messages': [extracted_text]}

    def extract_attributes_from_text(self,state: AgentState):
        """Turn the extracted invoice text into structured JSON via the chat model.

        Expects the last state entry to be the text produced by one of the
        extraction nodes.

        Returns:
            ``{'messages': [model_response]}``, or ``{'messages': []}`` when no
            text is available (the extraction node returned nothing).
        """
        print("Extracting attributes from text")

        text = state['messages'][-1]
        if not isinstance(text, str) or not text.strip():
            # The upstream node produced no text, so the last entry is still
            # the file-type record; there is nothing to send to the model.
            print("No extracted text available, skipping attribute extraction")
            return {'messages': []}

        # The template string ends at "main json."; input_variables is passed
        # as a real keyword argument rather than being part of the prompt text.
        prompt = PromptTemplate(
            template="""You are an AI assistant that extracts structured data from invoices.Extract the following information :
                        invoice number, 
                        invoice date, 
                        vendor information, 
                        line items, 
                        total due, 
                        tax details, 
                        payment terms
                    from Text : {text}. 
                    If any of the field cannot be found , keep it blank rather than putting wrong value.
                    Final output should be json file with above attribute. Line items should be nested json with main json.""",
            input_variables=["text"],
        )

        chain = LLMChain(llm=self.model, prompt=prompt)
        response = chain.run(text=text)
        print(response)

        return {'messages': [response]}

In [11]:
# System prompt for the classification step. The graph built by Agent only has
# routes for the labels PDF and IMAGE, so the model is asked for exactly one of
# those two words; scanned PDFs are handled inside the PDF branch.
prompt = """You are an analyst who analyses the invoices raised by the vendors. The invoices can be of various types including images, PDFs and scanned PDFs. \
By the name of the file, figure out the type of file and return the file type. The answer must be exactly one word: PDF or IMAGE. \
A scanned PDF is still a PDF, so answer PDF for any file whose name ends in .pdf. \
"""

model = ChatOpenAI(openai_api_key=OPENAI_KEY, model_name=MODEL_NAME, temperature=TEMPERATURE)
abot = Agent(model, system=prompt, filepath='./data/cpb invoice.pdf')

In [12]:
# Request template sent as the human message; {filepath} is filled in below
# with the path the agent is configured to process.
human_query_prompt = """Extract the following fields:
                    invoice number, 
                    invoice date, 
                    vendor information, 
                    line items, 
                    total due, 
                    tax details, 
                    payment terms from the file at below location filepath = {filepath}"""
formatted_prompt = human_query_prompt.format_map({"filepath": abot.filepath})

# Seed the graph state with that single human message and run the pipeline.
messages = [HumanMessage(content=formatted_prompt)]
result = abot.graph.invoke(dict(messages=messages))

PDF

in pdf check
PDF
Extracting data from PDF
PDF does not contain invoice information, converting to image
Extracting text from scanned PDF
./data/cpb invoice.pdf
Saved pdf as image: ./data/cpb invoice.png
New file path: ./data/cpb invoice.png
Extracting data from image
./data/cpb invoice.png
CPB SOFTWARE (GERMANY) GMBH

Im Bruch 3, 63897 Miltenberg
Telefon: +49 9371 9786 0
germany@cpb-software.com
www.cpb-software.com

CPB Software (Germany) GmbH - Im Bruch 3 - 63897 Miltenberg/Main

Musterkunde AG
Mr. John Doe
Musterstr. 23
12345 Musterstadt

Name: Stefanie Müller
Phone: +49 9371 9786-0

Invoice WMACCESS Internet

VAT No. DE199378386

| Invoice No | Customer No | Invoice Period | Date |
|------------|-------------|----------------|------|
| 123100401  | 12345       | 01.02.2024 - 29.02.2024 | 1. März 2024 |

| Service Description                  | Amount -without VAT- | quantity | Total Amount |
|--------------------------------------|----------------------|----------|------------

In [None]:

# def extract_text_from_scanned_pdf(pdf_path):
#     """Extract text from scanned (image-based) PDF using OCR."""
#     text = ""
#     doc = fitz.open(pdf_path)
#     for page_num in range(len(doc)):
#         page = doc.load_page(page_num)
#         pix = page.get_pixmap(dpi=300)
#         img = Image.open(io.BytesIO(pix.tobytes("png")))
#         text += extract_text_from_image(img)
#     return text

# def is_scanned_pdf(pdf_path):
#     """Check if PDF is likely scanned (image-based)."""
#     text = extract_text_from_pdfminer(pdf_path)
#     return len(text.strip()) < 50  # if very little text, assume it's scanned
