## Install required packages

In [None]:
%pip install py-zerox
%pip install pydantic
%pip install langchain langchain-google-vertexai langchain-openai langchain-community
%pip install python-dotenv

## Load the environment variables

Create a `.env` file in the root directory of the project and add the following environment variables:
```bash
GEMINI_API_KEY=""
OPENAI_API_KEY=""
GOOGLE_SERVICE_ACCOUNT_FILE_PATH=""
GOOGLE_FOLDER_ID=""
DOWNLOAD_FOLDER_PATH=""
```

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment.
load_dotenv()

# Each value is None when the corresponding variable is not set.
# Path to the service-account credentials file used for Drive access.
SERVICE_ACCOUNT_FILE = os.getenv("GOOGLE_SERVICE_ACCOUNT_FILE_PATH")
# ID of the Google Drive folder that is listed and downloaded.
GOOGLE_FOLDER_ID = os.getenv("GOOGLE_FOLDER_ID")
# Local root directory under which downloaded files are stored.
DOWNLOAD_FOLDER_PATH = os.getenv("DOWNLOAD_FOLDER_PATH")

## Configure Google Drive Access

In [None]:
# Configure Google Drive Access
from google.oauth2 import service_account
from googleapiclient.discovery import build
import io
from googleapiclient.http import MediaIoBaseDownload

# Set up Google Drive API authentication
# Read-only Drive scope: this notebook only lists and downloads files.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

# Build credentials from the service-account file configured via the environment.
creds = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES)

# Create a Google Drive API service
# (module-level v3 client used by the Drive helper functions in this cell)
drive_service = build('drive', 'v3', credentials=creds)

def get_folder_name(folder_id):
    """Look up the display name of a Google Drive folder.

    Args:
        folder_id: Drive ID of the folder.

    Returns:
        The ``name`` field of the folder metadata, or ``"default"`` when
        the response carries no name.
    """
    lookup = drive_service.files().get(fileId=folder_id, fields="name")
    metadata = lookup.execute()
    return metadata.get("name", "default")

# Function to list PDF files in specific Google Drive folder
def list_pdf_files(folder_id):
    """List every PDF file that sits directly inside a Drive folder.

    The Drive ``files.list`` endpoint is paginated. Each response may carry a
    ``nextPageToken``; it is fed back as ``pageToken`` until no token is
    returned, so folders with more files than one page are listed completely.

    Args:
        folder_id: Drive ID of the parent folder.

    Returns:
        A list of ``{"id": ..., "name": ...}`` dicts, one per PDF, in the
        order the API returned them. Empty when the folder has no PDFs.
    """
    query = f"mimeType='application/pdf' and '{folder_id}' in parents"
    items = []
    page_token = None  # None requests the first page
    while True:
        response = drive_service.files().list(
            q=query,
            pageSize=100,
            fields="nextPageToken, files(id, name)",
            pageToken=page_token).execute()
        items.extend(response.get('files', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            break
    return items

# Function to download a file from Google Drive
def download_file(file_id, file_name, folder_path):
    """Download one Drive file into a local directory.

    Args:
        file_id: Drive ID of the file to fetch.
        file_name: Name to give the local copy inside ``folder_path``.
        folder_path: Destination directory; created (with parents) if missing.

    Progress is printed after every downloaded chunk. The output file is
    opened in a ``with`` block so the handle is closed even when a chunk
    request raises part-way through the download.
    """
    # Create nested directory structure
    os.makedirs(folder_path, exist_ok=True)

    request = drive_service.files().get_media(fileId=file_id)
    file_path = os.path.join(folder_path, file_name)
    with io.FileIO(file_path, 'wb') as fh:
        downloader = MediaIoBaseDownload(fh, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"Download {int(status.progress() * 100)}%.")

def generate_drive_link(file_id):
    """Build the Google Drive "view" URL for the given file ID."""
    view_url = "https://drive.google.com/file/d/{}/view".format(file_id)
    return view_url

def get_downloaded_files(folder_id):
    """Describe the Drive PDFs that already exist in the local download folder.

    The local folder is ``DOWNLOAD_FOLDER_PATH/<Drive folder name>``. A local
    file is reported only when its name matches a PDF in the Drive listing.

    Args:
        folder_id: Drive ID of the folder the files were downloaded from.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``drive_link`` and
        ``local_path``; empty when the local folder does not exist.
    """
    local_dir = os.path.join(DOWNLOAD_FOLDER_PATH, get_folder_name(folder_id))

    # Drive file name -> Drive file ID, used to match local files back to Drive.
    ids_by_name = {entry['name']: entry['id'] for entry in list_pdf_files(folder_id)}

    if not os.path.exists(local_dir):
        return []

    return [
        {
            'id': ids_by_name[name],
            'name': name,
            'drive_link': generate_drive_link(ids_by_name[name]),
            'local_path': os.path.join(local_dir, name),
        }
        for name in os.listdir(local_dir)
        if name in ids_by_name
    ]

# Resolve the Drive folder's name and use it as a sub-directory of the download root.
# `folder_name` stays a module-level name; save_to_excel reads it to build the report path.
folder_name = get_folder_name(GOOGLE_FOLDER_ID)
download_path = os.path.join(DOWNLOAD_FOLDER_PATH, folder_name)
# Download every PDF returned by the folder listing into that directory.
pdf_files = list_pdf_files(GOOGLE_FOLDER_ID)
for file in pdf_files:
    print(f"Found file: {file['name']} ({file['id']})")
    download_file(file['id'], file['name'], download_path)

## Set up utility data extraction

In [None]:
# Set Up Utility Data Processing Functions
import asyncio
from pyzerox import zerox
import nest_asyncio
from pydantic import BaseModel, Field
from typing import Optional, List
from langchain_google_vertexai import ChatVertexAI
from langchain_openai import ChatOpenAI
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import ChatPromptTemplate
from datetime import datetime
from langchain.chains import LLMChain

# Patch asyncio to allow nested event loops; asyncio.run() is called later in
# this notebook, where an event loop is typically already running.
nest_asyncio.apply()

# Define Pydantic models for utility invoices

# One utility consumption line on an invoice (type, quantity, unit, billing
# period and cost).
#
# Documented with comments rather than a class docstring on purpose: a model
# docstring is emitted into the JSON schema, which would alter the format
# instructions sent to the LLM.
class UtilityInvoiceItem(BaseModel):
    type_of_utility: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Type of utility (Electricity, Water, Gas, Fuel, DG)",
            "options": ["Electricity", "Water", "Gas", "Fuel", "DG"]
        }
    )
    # Numeric field: a missing value is None, not "". Defaults are not
    # validated, so a "" default would leak a string into a float field.
    quantity: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Total quantity of utility consumed during billing period"
        }
    )
    quantity_unit: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Unit of measurement for the quantity consumed. Choose from one of the provided options, if not applicable then return \"\". If it is `units` then for `Electricity` use `kWh`, for `Water` use `m3`, for `Gas` use `scm`, for `Fuel` use `L`, for `DG` use `kWh`. `CuMtr` is `m3` or `scm`\nAbbreviations: \nkWh - Kilo Watt Hours\nm3 - Cubic meter\nkg - Kilogram\nL - Liter\ntonne - Metric tonnes\nscm - Standard cubic meter\nGJ - Gigajoule\nkL - Kilo liter.",
            "options": ["kWh", "m3", "kg", "L", "tonne", "scm", "GJ", "kL"]
        }
    )
    billing_start_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Start date of billing period in DD-MM-YYYY format"
        }
    )
    billing_end_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "End date of billing period in DD-MM-YYYY format"
        }
    )
    cost: Optional[float] = Field(
        default=0,
        json_schema_extra={
            "description": "Cost for the utility consumed during billing period"
        }
    )

# A complete utility invoice: header-level details plus its consumption line
# items.
#
# Documented with comments rather than a class docstring on purpose: a model
# docstring is emitted into the JSON schema.
#
# The four amount fields are Optional[float]; a missing amount is None.
# Defaults are not validated, so a "" default would place a string in a
# numeric field.
class UtilityInvoice(BaseModel):
    invoice_number: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Unique invoice identification number"
        }
    )
    due_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Last date to pay the invoice in DD-MM-YYYY format"
        }
    )
    arrear_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Outstanding amount from previous billing periods"
        }
    )
    tax_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Total tax amount applied on the invoice"
        }
    )
    invoice_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Base invoice amount excluding taxes and arrears"
        }
    )
    total_amount_payable: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Total amount to be paid including invoice amount, taxes and arrears. This amount should be equal to the sum of invoice_amount, tax_amount and arrear_amount"
        }
    )
    currency: Optional[str] = Field(
        default="INR",
        json_schema_extra={
            "description": "Currency code for invoice amounts"
        }
    )
    vendor_name: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Name of the utility service provider"
        }
    )
    gst_number_of_vendor: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "GST registration number of the vendor"
        }
    )
    business_place_section: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Complete address and location details of the business"
        }
    )
    invoice_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Date when invoice was generated in DD-MM-YYYY format"
        }
    )
    invoice_items: List[UtilityInvoiceItem] = Field(
        default_factory=list,
        json_schema_extra={
            "description": "List of utility consumption items in the invoice"
        }
    )

# Header-level invoice details only (no line items); this is the schema used
# when extracting the basic information of one invoice section.
#
# Documented with comments rather than a class docstring on purpose: a model
# docstring is emitted into the JSON schema, which would alter the format
# instructions sent to the LLM.
#
# The four amount fields default to None, not "". Defaults are not validated,
# so when the LLM omits an amount a "" default would survive on the instance
# and then fail float validation as soon as the value is copied into
# UtilityInvoice.
class BasicInvoiceInfo(BaseModel):
    invoice_number: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Unique invoice identification number"
        }
    )
    due_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Last date to pay the invoice in DD-MM-YYYY format"
        }
    )
    arrear_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Outstanding amount from previous billing periods"
        }
    )
    tax_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Total tax amount applied on the invoice"
        }
    )
    invoice_amount: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Base invoice amount excluding taxes and arrears"
        }
    )
    total_amount_payable: Optional[float] = Field(
        default=None,
        json_schema_extra={
            "description": "Total amount to be paid including invoice amount, taxes and arrears. This amount should be equal to the sum of invoice_amount, tax_amount and arrear_amount"
        }
    )
    currency: Optional[str] = Field(
        default="INR",
        json_schema_extra={
            "description": "Currency code for invoice amounts"
        }
    )
    vendor_name: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Name of the utility service provider"
        }
    )
    gst_number_of_vendor: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "GST registration number of the vendor"
        }
    )
    business_place_section: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Complete address and location details of the business"
        }
    )
    invoice_date: Optional[str] = Field(
        default="",
        json_schema_extra={
            "description": "Date when invoice was generated in DD-MM-YYYY format"
        }
    )

# Wrapper model holding the consumption line items of one invoice section.
class LineItems(BaseModel):
    items: List[UtilityInvoiceItem] = Field(default_factory=list)

# All invoices assembled from a single source document.
class UtilityInvoiceList(BaseModel):
    utility_invoices: List[UtilityInvoice] = Field(
        json_schema_extra={"description": "Collection of invoices extracted from document"},
        default_factory=list,
    )

# Result of splitting a document: one markdown string per invoice section.
class DocumentInvoices(BaseModel):
    invoice_sections: List[str] = Field(
        description="List of separate invoice sections found in document",
        default_factory=list,
    )

# Single-field model carrying a date string in DD-MM-YYYY form.
class DateStandardization(BaseModel):
    standardized_date: str = Field(description="Date in DD-MM-YYYY format", default="")

# Define function to convert PDF to markdown
# System prompt passed to zerox by convert_to_markdown: it asks for the full
# page as markdown, with tables preserved and no extra explanation text.
custom_system_prompt = """
- Convert the PDF page to markdown.
- Return only the markdown with no explanation text.
- Do not exclude any content from the page.
- Convert any tabular data into markdown tables.
"""

async def convert_to_markdown(file_path):
    """Convert a document to markdown with zerox.

    Args:
        file_path: Path of the file to convert.

    Returns:
        The zerox result object; its ``pages`` attribute holds the converted
        pages. A progress line is printed before and after the conversion.
    """
    print(f"Converting PDF to markdown: {file_path}")
    zerox_options = {
        "file_path": file_path,
        "model": "gemini/gemini-1.5-flash-002",
        "select_pages": None,  # None is passed as-is; no page subset is chosen here
        "custom_system_prompt": custom_system_prompt,
    }
    conversion = await zerox(**zerox_options)
    page_count = len(conversion.pages)
    print(f"Conversion complete for: {file_path} with {page_count} pages")
    return conversion

# Define function to extract utility invoices from markdown
async def standardize_date(date_string: str) -> str:
    """Standardize date string to DD-MM-YYYY format.

    Empty input and input that already parses as day-month-year are handled
    locally without an LLM call. Anything else is sent to the model, and the
    reply is accepted only if it parses as DD-MM-YYYY.

    Args:
        date_string: Date text in an arbitrary format.

    Returns:
        The date as ``DD-MM-YYYY``, or ``""`` when it is empty, cannot be
        converted, or the model call fails (best-effort: errors are not
        raised to the caller).
    """
    if not date_string or not date_string.strip():
        return ""

    # Fast path: already day-month-year; just normalise zero padding.
    try:
        parsed = datetime.strptime(date_string.strip(), '%d-%m-%Y')
        return f"{parsed.day:02d}-{parsed.month:02d}-{parsed.year:04d}"
    except ValueError:
        pass  # not in DD-MM-YYYY form; fall back to the model

    llm = ChatVertexAI(model="gemini-1.5-pro-002", temperature=0)

    date_prompt = ChatPromptTemplate.from_template(
        "Convert the following date to DD-MM-YYYY format. "
        "If the date is invalid or cannot be parsed, return empty string.\n"
        "Date: {date_string}\n\n"
        "Return only the formatted date with no additional text."
    )

    date_chain = LLMChain(llm=llm, prompt=date_prompt)

    try:
        result = await date_chain.ainvoke({"date_string": date_string})
        formatted_date = result['text'].strip()
        # Validate if output matches DD-MM-YYYY format
        datetime.strptime(formatted_date, '%d-%m-%Y')
        return formatted_date
    except Exception:
        # Best-effort: a bad reply or failed call yields "". Catching
        # Exception (not a bare except) lets task cancellation and
        # KeyboardInterrupt propagate.
        return ""

async def split_into_invoices(markdown_content: str) -> DocumentInvoices:
    """Ask the LLM to split a document's markdown into separate invoice sections.

    Args:
        markdown_content: Markdown text of the whole document.

    Returns:
        The parsed ``DocumentInvoices`` produced by the prompt -> model ->
        parser chain.
    """
    output_parser = PydanticOutputParser(pydantic_object=DocumentInvoices)
    template = (
        "Split this document into separate invoice sections. Each invoice typically starts with an invoice number or date."
        "Return only the distinct invoice sections found."
        "\n\nFormat Instructions: {format_instructions}"
        "\n\nDocument Content:\n```markdown\n{content}\n```"
    )
    prompt_template = ChatPromptTemplate.from_template(template)

    model = ChatVertexAI(model="gemini-1.5-pro-002", temperature=0)
    pipeline = prompt_template | model | output_parser

    payload = {
        "format_instructions": output_parser.get_format_instructions(),
        "content": markdown_content,
    }
    return await pipeline.ainvoke(payload)

# Add these new extraction functions
async def extract_basic_invoice_info(markdown_content: str) -> BasicInvoiceInfo:
    """Extract the header-level details of one invoice via the LLM.

    Args:
        markdown_content: Markdown text of a single invoice section.

    Returns:
        The parsed ``BasicInvoiceInfo`` produced by the prompt -> model ->
        parser chain.
    """
    output_parser = PydanticOutputParser(pydantic_object=BasicInvoiceInfo)
    template = (
        "Extract basic invoice information from the utility invoice. Focus only on the main invoice details, not line items."
        "\n\nFormat Instructions: {format_instructions}"
        "\n\nUtility Invoice Content:\n```markdown\n{content}\n```"
    )
    prompt_template = ChatPromptTemplate.from_template(template)

    model = ChatVertexAI(model="gemini-1.5-pro-002", temperature=0)
    pipeline = prompt_template | model | output_parser

    payload = {
        "format_instructions": output_parser.get_format_instructions(),
        "content": markdown_content,
    }
    return await pipeline.ainvoke(payload)

async def extract_line_items(markdown_content: str) -> LineItems:
    """Extract the utility consumption line items of one invoice via the LLM.

    Args:
        markdown_content: Markdown text of a single invoice section.

    Returns:
        The parsed ``LineItems`` produced by the prompt -> model -> parser
        chain.
    """
    output_parser = PydanticOutputParser(pydantic_object=LineItems)
    template = (
        "Extract only the utility consumption line items from the invoice. Focus on quantities, dates, and costs per item."
        "\n\nFormat Instructions: {format_instructions}"
        "\n\nUtility Invoice Content:\n```markdown\n{content}\n```"
    )
    prompt_template = ChatPromptTemplate.from_template(template)

    model = ChatVertexAI(model="gemini-1.5-pro-002", temperature=0)
    pipeline = prompt_template | model | output_parser

    payload = {
        "format_instructions": output_parser.get_format_instructions(),
        "content": markdown_content,
    }
    return await pipeline.ainvoke(payload)

async def extract_utility_invoices(markdown_content: str) -> UtilityInvoiceList:
    """Extract every utility invoice contained in a document's markdown.

    The document is first split into invoice sections. Each section is then
    processed concurrently: header details and line items are extracted in
    parallel, their dates are standardized, and the pieces are merged into
    one ``UtilityInvoice``.

    Args:
        markdown_content: Markdown text of the whole document.

    Returns:
        A ``UtilityInvoiceList`` with one invoice per detected section.
    """
    def _blank_to_none(value):
        # An amount the LLM omitted may still hold an unvalidated "" default;
        # passing "" to a float field fails validation, so treat it as missing.
        return None if value == "" else value

    # First split document into separate invoice sections
    doc_invoices = await split_into_invoices(markdown_content)

    # Process each invoice section in parallel
    async def process_invoice_section(section: str):
        basic_info, line_items = await asyncio.gather(
            extract_basic_invoice_info(section),
            extract_line_items(section),
        )

        # Standardize dates
        if basic_info.due_date:
            basic_info.due_date = await standardize_date(basic_info.due_date)
        if basic_info.invoice_date:
            basic_info.invoice_date = await standardize_date(basic_info.invoice_date)

        for item in line_items.items:
            if item.billing_start_date:
                item.billing_start_date = await standardize_date(item.billing_start_date)
            if item.billing_end_date:
                item.billing_end_date = await standardize_date(item.billing_end_date)

        return UtilityInvoice(
            invoice_number=basic_info.invoice_number,
            due_date=basic_info.due_date,
            arrear_amount=_blank_to_none(basic_info.arrear_amount),
            tax_amount=_blank_to_none(basic_info.tax_amount),
            invoice_amount=_blank_to_none(basic_info.invoice_amount),
            total_amount_payable=_blank_to_none(basic_info.total_amount_payable),
            currency=basic_info.currency,
            vendor_name=basic_info.vendor_name,
            gst_number_of_vendor=basic_info.gst_number_of_vendor,
            business_place_section=basic_info.business_place_section,
            invoice_date=basic_info.invoice_date,
            invoice_items=line_items.items
        )

    # Process all invoice sections concurrently
    tasks = [process_invoice_section(section) for section in doc_invoices.invoice_sections]
    all_invoices = await asyncio.gather(*tasks)

    return UtilityInvoiceList(utility_invoices=all_invoices)

async def process_pdf(file_path):
    """Convert a file to markdown and extract its utility invoices.

    Args:
        file_path: Path of the file to process.

    Returns:
        Whatever ``extract_utility_invoices`` returns for the combined
        markdown of all pages.
    """
    result = await convert_to_markdown(file_path)
    # Join pages with a blank line so the last line of one page is not fused
    # with the first line of the next (which would corrupt headings and table
    # rows at page boundaries). A page with no content contributes "".
    markdown_content = "\n\n".join(page.content or "" for page in result.pages)
    utility_invoices = await extract_utility_invoices(markdown_content)
    return utility_invoices


## Batch processing of invoices

In [None]:
import hashlib
import pandas as pd
from tqdm import tqdm
import asyncio
from collections import deque
import uuid
import os
from PIL import Image
import math
from itertools import islice

def generate_file_hash(file_path):
    """Return the SHA-256 hex digest of a file's contents.

    The file is read in 4096-byte blocks, so it is never loaded into memory
    in full.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:  # empty bytes signals end of file
                break
            digest.update(block)
    return digest.hexdigest()

# Update validate_file to include hash generation
def validate_file(file_path):
    """Check a file against the size, type and page-count limits.

    Checks run in this order: size (at most 10MB), extension (.pdf, .jpg,
    .jpeg, .png), and for PDFs the page count (at most 10).

    Args:
        file_path: Path of the file to validate.

    Returns:
        A ``(is_valid, message, file_hash)`` tuple. ``file_hash`` is the
        SHA-256 hex digest when the file is valid and ``None`` otherwise.
    """
    size_limit = 10 * 1024 * 1024  # 10MB expressed in bytes
    size_in_bytes = os.path.getsize(file_path)
    if size_in_bytes > size_limit:
        return False, f"File size exceeds 10MB limit (size: {size_in_bytes/1024/1024:.2f}MB)", None

    _, raw_ext = os.path.splitext(file_path)
    ext = raw_ext.lower()
    if ext not in ('.pdf', '.jpg', '.jpeg', '.png'):
        return False, f"Unsupported file type: {ext}", None

    # Only PDFs have a page count to enforce.
    if ext == '.pdf':
        try:
            import PyPDF2
            with open(file_path, 'rb') as pdf_file:
                page_total = len(PyPDF2.PdfReader(pdf_file).pages)
                if page_total > 10:
                    return False, f"PDF exceeds 10 page limit (pages: {page_total})", None
        except Exception as e:
            return False, f"Error reading PDF: {str(e)}", None

    return True, "File validated successfully", generate_file_hash(file_path)

def chunk_list(lst, chunk_size):
    """Yield consecutive slices of ``lst``, each at most ``chunk_size`` long."""
    starts = range(0, len(lst), chunk_size)
    yield from (lst[start:start + chunk_size] for start in starts)

async def process_files_with_semaphore(files, batch_size):
    """Validate and process files in consecutive batches.

    Files within a batch are processed concurrently; batches run one after
    another. Despite the name, concurrency is bounded by the batch size, not
    by a semaphore.

    Any failure for a single file, including an error raised during
    validation such as a missing local file, is recorded as a "rejected"
    result for that file instead of aborting the whole run.

    Args:
        files: Dicts with ``name``, ``local_path`` and ``drive_link`` keys.
        batch_size: Number of files processed concurrently per batch.

    Returns:
        One result dict per input file, in input order, with keys
        ``file_name``, ``file_id``, ``link``, ``status``, ``rejected_reason``,
        ``number_of_pages`` and ``utility_invoices``.
    """
    all_results = []
    total_files = len(files)
    chunks = list(chunk_list(files, batch_size))

    def build_result(file, file_hash, status, rejected_reason=None, invoices=None):
        # Single place that defines the shape of a per-file result.
        return {
            "file_name": file['name'],
            "file_id": file_hash,
            "link": file['drive_link'],
            "status": status,
            "rejected_reason": rejected_reason,
            # NOTE: this is the number of extracted invoices, not PDF pages;
            # the key name is kept because the report uses it as a column.
            "number_of_pages": len(invoices.utility_invoices) if invoices is not None else 0,
            "utility_invoices": invoices
        }

    async def process_single_file(file):
        file_hash = None  # stays None when validation fails or raises
        try:
            # Validate file and get hash
            is_valid, validation_message, file_hash = validate_file(file['local_path'])
            if not is_valid:
                return build_result(file, file_hash, "rejected", rejected_reason=validation_message)

            result = await process_pdf(file['local_path'])
            return build_result(file, file_hash, "completed", invoices=result)
        except Exception as e:
            return build_result(file, file_hash, "rejected", rejected_reason=str(e))

    async def process_batch(batch):
        tasks = [process_single_file(file) for file in batch]
        return await asyncio.gather(*tasks)

    with tqdm(total=total_files) as pbar:
        for chunk in chunks:
            # Process each batch concurrently
            batch_results = await process_batch(chunk)
            all_results.extend(batch_results)
            pbar.update(len(chunk))

    return all_results

def _excel_column_letter(index):
    """Convert a 0-based column index to Excel letters (0 -> A, 25 -> Z, 26 -> AA)."""
    letters = ""
    number = index + 1
    while number > 0:
        number, remainder = divmod(number - 1, 26)
        letters = chr(65 + remainder) + letters
    return letters


# Function to save results to Excel
def save_to_excel(results):
    """Write the processing results to a three-sheet Excel report.

    Sheets: ``Documents`` (one row per file), ``Utility invoices`` (one row
    per extracted invoice) and ``Invoice items`` (one row per line item).
    Invoices get a generated UUID so items can be joined back to them.

    The report is written to
    ``./outputs/<folder_name>_utility_invoices_report.xlsx``; ``folder_name``
    is read from module scope. The output directory is created if missing.

    Args:
        results: Per-file result dicts as produced by
            ``process_files_with_semaphore``.
    """
    documents_data = []
    utility_invoices_data = []
    invoice_items_data = []

    for result in results:
        file_id = result["file_id"]

        documents_data.append({
            "name": result["file_name"],
            "file_id": file_id,
            "link": result["link"],
            "status": result["status"],
            "rejected_reason": result["rejected_reason"],
            "number_of_pages": result["number_of_pages"],
        })

        if result["status"] == "completed" and result["utility_invoices"]:
            for invoice in result["utility_invoices"].utility_invoices:
                utility_invoice_id = str(uuid.uuid4())

                utility_invoices_data.append({
                    "utility_invoice_id": utility_invoice_id,
                    "file_id": file_id,
                    "invoice_number": invoice.invoice_number or "",
                    "due_date": str(invoice.due_date) if invoice.due_date else "",
                    "arrear_amount": invoice.arrear_amount,
                    "tax_amount": invoice.tax_amount,
                    "invoice_amount": invoice.invoice_amount,
                    "total_amount_payable": invoice.total_amount_payable,
                    "currency": invoice.currency,
                    "vendor_name": invoice.vendor_name,
                    "gst_number_of_vendor": invoice.gst_number_of_vendor,
                    "business_place_section": invoice.business_place_section,
                    "invoice_date": str(invoice.invoice_date) if invoice.invoice_date else ""
                })

                for item in invoice.invoice_items:
                    invoice_items_data.append({
                        "utility_invoice_id": utility_invoice_id,
                        "file_id": file_id,
                        "invoice_number": invoice.invoice_number,
                        "type_of_utility": item.type_of_utility,
                        "quantity": item.quantity,
                        "quantity_unit": item.quantity_unit,
                        "billing_start_date": str(item.billing_start_date) if item.billing_start_date else "",  # Convert to string to preserve format
                        "billing_end_date": str(item.billing_end_date) if item.billing_end_date else "",  # Convert to string to preserve format
                        "cost": item.cost
                    })

    # The writer does not create missing directories; make sure ./outputs exists.
    os.makedirs('./outputs', exist_ok=True)

    with pd.ExcelWriter(f'./outputs/{folder_name}_utility_invoices_report.xlsx', engine='openpyxl') as writer:
        # Write each dataframe
        dfs = {
            'Documents': pd.DataFrame(documents_data),
            'Utility invoices': pd.DataFrame(utility_invoices_data),
            'Invoice items': pd.DataFrame(invoice_items_data)
        }

        for sheet_name, df in dfs.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)
            worksheet = writer.sheets[sheet_name]

            # Adjust column widths based on content
            for idx, col in enumerate(df.columns):
                # Get maximum length of column content
                max_length = max(
                    df[col].astype(str).apply(len).max(),  # max length of values
                    len(str(col))  # length of column name
                ) + 2  # add a little extra space

                # Set column width; the helper stays valid past column Z,
                # where chr(65 + idx) would produce a non-letter.
                worksheet.column_dimensions[_excel_column_letter(idx)].width = min(max_length, 50)  # limit max width to 50

            # Set row height
            worksheet.row_dimensions[1].height = 20  # Header row height

# Main entry point for processing the downloaded files
async def process_all_files(all_files, batch_size=5):
    """Process every file in batches and return the per-file results.

    Thin wrapper that forwards both arguments to
    ``process_files_with_semaphore``.
    """
    results = await process_files_with_semaphore(all_files, batch_size)
    return results

# Run the processing
# 1. Match local files in the download folder against the Drive listing.
all_files = get_downloaded_files(GOOGLE_FOLDER_ID)
# 2. Process them two at a time; asyncio.run() relies on the earlier
#    nest_asyncio.apply() when an event loop is already running.
all_results = asyncio.run(process_all_files(all_files, batch_size=2))
# 3. Write the Excel report.
save_to_excel(all_results)