In [85]:
import base64
import json
import os
from datetime import date
from enum import Enum
from io import BytesIO

import pandas as pd
from openai import AsyncOpenAI
from pdf2image import convert_from_path
from pydantic import BaseModel, ConfigDict, ValidationError, field_validator
from tqdm.asyncio import tqdm_asyncio

from config import settings

In [86]:
# Module-level async OpenAI client used by extract_invoice_data; the API key is
# read from the project's `settings` object rather than being hard-coded here.
openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

In [87]:
def pdf_to_base64_images(pdf_path):
    """Render a PDF and return each rendered page as a base64-encoded PNG string.

    Args:
        pdf_path: Path of the PDF file handed to ``convert_from_path``.

    Returns:
        A list with one base64 text string per image returned by
        ``convert_from_path``, in the same order.
    """
    def _encode_as_png(page):
        # Serialise the page into an in-memory PNG, then base64 the raw bytes.
        png_buffer = BytesIO()
        page.save(png_buffer, format='PNG')
        return base64.b64encode(png_buffer.getvalue()).decode()

    pages = convert_from_path(pdf_path, fmt='png')
    return [_encode_as_png(page) for page in pages]

In [88]:
# System prompt sent with every extraction request (see extract_invoice_data).
# It asks the model for one JSON object per invoice; the field names listed in the
# template are the same ones declared on the Invoice model later in this file.
# NOTE(review): the prompt instructs the model to emit null for any missing field,
# so the validation model has to tolerate null wherever that is acceptable.
SYSTEM_PROMPT = """You are an AI assistant specialized in extracting information from invoice images for the company "TechNova Solutions, Inc.". Your task is to analyze the given invoice image and extract the relevant information into a structured JSON format. If a field is not present in the invoice or cannot be determined leave it as null.

Extract the information into the following JSON format:
```json
{
  "invoice_number": "string",
  "invoice_date": "YYYY-MM-DD",
  "invoice_type": "incoming | outgoing",
  "issuer": {
    "name": "string",
    "address": "string",
    "phone": "string",
    "email": "string"
  },
  "recipient": {
    "name": "string",
    "address": "string",
    "phone": "string",
    "email": "string"
  },
  "invoice_items": [
    {
      "description": "string",
      "total": "number"
    }
  ],
  "subtotal": "number",
  "tax_rate": "number (percentage)",
  "tax": "number",
  "total": "number",
  "terms": "string"
}
```

For the property "invoice_type" set the value "incoming" or "outgoing" from the point of view of the company "TechNova Solutions, Inc.". If this company does not appear as issuer or recipient, set it to null.
"""

In [89]:
async def extract_invoice_data(base64_img):
    """Ask the chat model to extract one invoice image into a JSON string.

    Args:
        base64_img: Base64-encoded PNG data, embedded in a ``data:`` URL.

    Returns:
        The ``content`` of the first choice in the completion response. JSON
        object mode is requested, but the text is returned unparsed.
    """
    image_part = {
        'type': 'image_url',
        'image_url': {'url': f'data:image/png;base64,{base64_img}'},
    }
    conversation = [
        {'role': 'system', 'content': SYSTEM_PROMPT},
        {'role': 'user', 'content': [image_part]},
    ]
    completion = await openai_client.chat.completions.create(
        model='gpt-4o',
        response_format={'type': 'json_object'},
        messages=conversation,
        temperature=0.1,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

In [90]:
async def extract_invoices_data(invoices_dir):
    """Extract invoice JSON for every PDF in a directory, running requests concurrently.

    Only the first rendered page of each PDF is sent to the model. Files are
    processed in sorted name order, and the two returned lists are index-aligned.

    Args:
        invoices_dir: Directory that is listed for PDF files (not recursive).

    Returns:
        A tuple ``(invoices_filenames, invoices_json)``: the PDF file names that
        were submitted and the raw model responses, in the same order.
    """
    invoices_filenames = []
    tasks = []
    print('Extracting the invoices data:')
    for filename in sorted(os.listdir(invoices_dir)):
        # Case-insensitive match so files such as 'SCAN.PDF' are not silently ignored.
        if not filename.lower().endswith('.pdf'):
            continue
        print(filename)
        pdf_path = os.path.join(invoices_dir, filename)
        base64_images = pdf_to_base64_images(pdf_path)
        if not base64_images:
            # Indexing [0] on an empty page list would raise IndexError and abort
            # the whole batch, leaving the coroutines created so far unawaited.
            print(f'Skipping {filename}: no pages could be rendered')
            continue
        # Register the name only once a request exists, so both lists stay aligned.
        invoices_filenames.append(filename)
        tasks.append(extract_invoice_data(base64_images[0]))
    invoices_json = await tqdm_asyncio.gather(*tasks)
    return invoices_filenames, invoices_json

In [91]:
class InvoiceType(str, Enum):
    """Direction of an invoice; the two values match the strings SYSTEM_PROMPT asks for."""
    incoming = 'incoming'
    outgoing = 'outgoing'

class InvoiceItem(BaseModel):
    """One line item of an invoice: a description and its amount. Both fields are required."""
    description: str
    total: float

class Company(BaseModel):
    """A party on an invoice (used for both issuer and recipient).

    ``name`` and ``address`` are required; ``phone`` and ``email`` may be
    omitted or null and then default to None.
    """
    name: str
    address: str
    phone: str | None = None
    email: str | None = None

class Invoice(BaseModel):
    """Validated form of the JSON object the model returns for one invoice.

    Field names mirror the JSON template in SYSTEM_PROMPT. ``invoice_type`` and
    ``terms`` may be null; ``tax_rate`` and ``tax`` fall back to 0 when they are
    either missing or explicitly null.
    """
    # Store the plain enum value on the instance instead of the Enum member.
    model_config = ConfigDict(use_enum_values=True)

    invoice_number: str
    invoice_date: date
    invoice_type: InvoiceType | None = None
    issuer: Company
    recipient: Company
    invoice_items: list[InvoiceItem]
    subtotal: float
    tax_rate: float = 0
    tax: float = 0
    total: float
    terms: str | None = None

    @field_validator('tax_rate', 'tax', mode='before')
    @classmethod
    def replace_null_tax_with_zero(cls, value):
        """Map an explicit null to 0 before float validation runs.

        The field default only applies when the key is absent, but the prompt
        tells the model to emit null for missing values; without this hook a
        tax-free invoice would be rejected with a validation error.
        """
        return 0 if value is None else value

In [92]:
def normalize_invoice(invoice):
    """Flatten a validated invoice into a single-level dict (one spreadsheet row).

    Args:
        invoice: Object exposing the Invoice attributes, including nested
            ``issuer``/``recipient`` parties and an ``invoice_items`` sequence.

    Returns:
        A dict whose keys are the column titles. The fixed columns come first,
        followed by an ``Item N Description`` / ``Item N Total`` pair per line
        item, numbered from 1 in the order the items appear.
    """
    issuer, recipient = invoice.issuer, invoice.recipient

    header_columns = {
        'Invoice Number': invoice.invoice_number,
        'Invoice Date': invoice.invoice_date,
        'Invoice Type': invoice.invoice_type,
    }
    party_columns = {
        'Issuer Name': issuer.name,
        'Issuer Address': issuer.address,
        'Issuer Phone': issuer.phone,
        'Issuer Email': issuer.email,
        'Recipient Name': recipient.name,
        'Recipient Address': recipient.address,
        'Recipient Phone': recipient.phone,
        'Recipient Email': recipient.email,
    }
    amount_columns = {
        'Subtotal': invoice.subtotal,
        'Tax Rate': invoice.tax_rate,
        'Tax': invoice.tax,
        'Total': invoice.total,
        'Terms': invoice.terms,
    }

    item_columns = {}
    for position, line_item in enumerate(invoice.invoice_items, start=1):
        item_columns[f'Item {position} Description'] = line_item.description
        item_columns[f'Item {position} Total'] = line_item.total

    # Merge order fixes the column order of the resulting row.
    return {**header_columns, **party_columns, **amount_columns, **item_columns}

In [93]:
def process_invoices(invoices_filenames, invoices_json):
    """Parse, validate and tabulate the raw model responses.

    Each response is decoded as JSON, validated with ``Invoice`` and flattened
    with ``normalize_invoice``. A response that fails any step is reported with
    ``print`` and left out of the result, so one bad invoice does not stop the rest.

    Args:
        invoices_filenames: File names, used only in the error messages.
        invoices_json: Raw JSON strings, index-aligned with ``invoices_filenames``.

    Returns:
        A DataFrame with one row per valid invoice. 'Invoice Date' is converted
        to datetime and a 'Year-Month' period column is inserted at position 2.
        When no invoice is valid, an empty frame holding only the columns
        'Invoice Number', 'Invoice Date' and 'Year-Month' is returned.
    """
    normalized_invoices = []
    for filename, invoice_json in zip(invoices_filenames, invoices_json):
        try:
            invoice_data = json.loads(invoice_json)
            validated_invoice = Invoice(**invoice_data)
            normalized_invoice = normalize_invoice(validated_invoice)
            normalized_invoices.append(normalized_invoice)
        except json.JSONDecodeError as e:
            print(f'JSON parsing error in invoice {filename}: {e}')
        except ValidationError as e:
            print(f'Validation error in invoice {filename}: {e}')
        except Exception as e:
            # Deliberate best-effort: report anything else and keep going.
            print(f'Unexpected error in invoice {filename}: {e}')

    if normalized_invoices:
        invoices_df = pd.DataFrame(normalized_invoices)
    else:
        # pd.DataFrame([]) has no columns at all, so the 'Invoice Date' lookup
        # below would raise KeyError; start from an empty, correctly typed frame.
        invoices_df = pd.DataFrame({
            'Invoice Number': pd.Series(dtype='object'),
            'Invoice Date': pd.Series(dtype='datetime64[ns]'),
        })

    invoices_df['Invoice Date'] = pd.to_datetime(invoices_df['Invoice Date'])
    invoices_df.insert(2, 'Year-Month', invoices_df['Invoice Date'].dt.to_period('M'))
    return invoices_df

In [100]:
def _top_by_total(invoices, name_column):
    """Return the `name_column` value with the largest summed 'Total', or None if there are no rows."""
    totals = invoices.groupby(name_column)['Total'].sum()
    # idxmax() raises ValueError on an empty Series (e.g. no incoming invoices at all).
    return None if totals.empty else totals.idxmax()


def perform_financial_analysis(invoices_df):
    """Compute overall and per-month financial figures from the invoices table.

    Outgoing invoices count as revenue / tax collected and incoming invoices as
    expenses / tax paid. Rows with any other 'Invoice Type' are counted in
    'Invoices' but contribute no amounts.

    Args:
        invoices_df: Frame with at least the columns 'Invoice Number',
            'Invoice Date' (datetime), 'Year-Month', 'Invoice Type', 'Total',
            'Tax', 'Issuer Name' and 'Recipient Name'.

    Returns:
        A tuple ``(total_s, monthly)``:
        * ``total_s``: object Series labelled Period, Invoices, Revenue,
          Expenses, Net Income, Tax Collected, Tax Paid, Net Tax, Top Customer,
          Top Provider. The two "Top" entries are None when there are no
          invoices of the corresponding type.
        * ``monthly``: the monthly table transposed, i.e. one row per metric
          (Year-Month, Invoices, Revenue, Expenses, Net Income, Tax Collected,
          Tax Paid, Net Tax) and one column per month.
    """
    is_outgoing = invoices_df['Invoice Type'] == 'outgoing'
    is_incoming = invoices_df['Invoice Type'] == 'incoming'
    outgoing_df = invoices_df[is_outgoing]
    incoming_df = invoices_df[is_incoming]

    total_s = pd.Series(dtype='object')

    # Summary Analysis
    first_date = invoices_df['Invoice Date'].min()
    last_date = invoices_df['Invoice Date'].max()
    total_s['Period'] = f"{first_date.strftime('%Y-%m')} to {last_date.strftime('%Y-%m')}"
    total_s['Invoices'] = len(invoices_df)

    revenue = outgoing_df['Total'].sum()
    expenses = incoming_df['Total'].sum()
    total_s['Revenue'] = revenue
    total_s['Expenses'] = expenses
    total_s['Net Income'] = revenue - expenses

    total_tax_collected = outgoing_df['Tax'].sum()
    total_tax_paid = incoming_df['Tax'].sum()
    total_s['Tax Collected'] = total_tax_collected
    total_s['Tax Paid'] = total_tax_paid
    total_s['Net Tax'] = total_tax_collected - total_tax_paid

    total_s['Top Customer'] = _top_by_total(outgoing_df, 'Recipient Name')
    total_s['Top Provider'] = _top_by_total(incoming_df, 'Issuer Name')

    # Monthly Analysis
    # Zero out the amounts that do not belong to a metric up front, so a plain
    # grouped sum is enough and no per-group mask re-alignment is needed.
    monthly_source = pd.DataFrame({
        'Year-Month': invoices_df['Year-Month'],
        'Invoices': invoices_df['Invoice Number'],
        'Revenue': invoices_df['Total'].where(is_outgoing, 0),
        'Expenses': invoices_df['Total'].where(is_incoming, 0),
        'Tax Collected': invoices_df['Tax'].where(is_outgoing, 0),
        'Tax Paid': invoices_df['Tax'].where(is_incoming, 0),
    })
    monthly_df = monthly_source.groupby('Year-Month').agg({
        'Invoices': 'count',
        'Revenue': 'sum',
        'Expenses': 'sum',
        'Tax Collected': 'sum',
        'Tax Paid': 'sum',
    })
    monthly_df = monthly_df.reset_index()
    monthly_df['Year-Month'] = monthly_df['Year-Month'].astype(str)
    # Position 4 places 'Net Income' right after 'Expenses'.
    monthly_df.insert(4, 'Net Income', monthly_df['Revenue'] - monthly_df['Expenses'])
    monthly_df['Net Tax'] = monthly_df['Tax Collected'] - monthly_df['Tax Paid']

    return total_s, monthly_df.T

In [111]:
def create_excel(invoices_df, total_s, monthly_df, filepath='data/invoices.xlsx'):
    """Write the invoices and their financial analysis to an Excel workbook.

    The workbook gets two sheets: 'Invoices' (the raw table) and
    'Financial Analysis' (a TOTAL block, a MONTHLY block and a line chart).
    Row positions are derived from the sizes and labels of ``total_s`` and
    ``monthly_df`` instead of being hard-coded, so the titles, number formats
    and chart stay aligned with the data if a metric is added or removed.

    Args:
        invoices_df: Invoice table written as-is to the 'Invoices' sheet.
        total_s: Summary Series; its index labels become the first column.
        monthly_df: Monthly table with one row per metric and one column per
            month. Its first row is used as the chart's category labels.
        filepath: Destination ``.xlsx`` path.
    """
    analysis_sheet = 'Financial Analysis'
    # Metric rows that hold amounts and therefore get the thousands/decimals format.
    money_labels = {'Revenue', 'Expenses', 'Net Income', 'Tax Collected', 'Tax Paid', 'Net Tax'}
    # (legend name, row label in monthly_df) for each plotted line.
    chart_series = [('Revenue', 'Revenue'), ('Expenses', 'Expenses'), ('Profit', 'Net Income')]

    total_labels = list(total_s.index)
    monthly_labels = list(monthly_df.index)

    # 0-based sheet rows: title in row 0, summary from row 1, two blank rows,
    # the MONTHLY title, the monthly table, two blank rows, then the chart.
    total_startrow = 1
    monthly_title_row = total_startrow + len(total_labels) + 2
    monthly_startrow = monthly_title_row + 1
    chart_row = monthly_startrow + len(monthly_labels) + 2

    with pd.ExcelWriter(filepath, engine='xlsxwriter', datetime_format='YYYY-MM-DD') as writer:
        # Write invoice data sheet
        invoices_df.to_excel(writer, sheet_name='Invoices', index=False)

        # Write both analysis tables; the first call creates the sheet.
        total_s.to_frame().to_excel(writer, sheet_name=analysis_sheet,
                                    startrow=total_startrow, startcol=0, header=False)
        monthly_df.to_excel(writer, sheet_name=analysis_sheet,
                            startrow=monthly_startrow, startcol=0, header=False)

        # Fetch the worksheet pandas created rather than pre-creating one by hand.
        workbook = writer.book
        worksheet = writer.sheets[analysis_sheet]

        # Define formats
        title_format = workbook.add_format({'bold': True, 'font_size': 14})
        money_format = workbook.add_format({'num_format': '#,##0.00'})

        # Section titles
        worksheet.write(0, 0, 'TOTAL', title_format)
        worksheet.write(monthly_title_row, 0, 'MONTHLY', title_format)

        # Formatting: apply the money format to every amount row in both tables.
        money_rows = [total_startrow + offset
                      for offset, label in enumerate(total_labels) if label in money_labels]
        money_rows += [monthly_startrow + offset
                       for offset, label in enumerate(monthly_labels) if label in money_labels]
        for row in money_rows:
            worksheet.set_row(row, None, money_format)

        # Create chart
        chart = workbook.add_chart({'type': 'line'})

        num_cols = monthly_df.shape[1]
        for legend, label in chart_series:
            if label not in monthly_labels:
                continue  # metric absent from the table: nothing to plot
            values_row = monthly_startrow + monthly_labels.index(label)
            chart.add_series({
                'name': legend,
                # Month labels live in the first row of the monthly table; data
                # starts in column 1 because column 0 holds the row labels.
                'categories': [analysis_sheet, monthly_startrow, 1, monthly_startrow, num_cols],
                'values': [analysis_sheet, values_row, 1, values_row, num_cols],
            })

        chart.set_x_axis({'name': 'Month'})
        chart.set_y_axis({'name': 'Amount', 'major_gridlines': {'visible': False}})

        worksheet.insert_chart(chart_row, 0, chart)

In [110]:
# Directory that is scanned for invoice PDFs.
INVOICES_DIR = 'data/invoices'

# End-to-end run: extract JSON from each PDF, validate and tabulate it, compute
# the summary and monthly figures, then write everything to the Excel workbook.
# The bare top-level `await` only works where a running event loop is provided,
# as it is in a notebook cell; it is a syntax error in a plain Python script.
invoices_filenames, invoices_json = await extract_invoices_data(INVOICES_DIR)
invoices_df = process_invoices(invoices_filenames, invoices_json)
summary_ds, monthly_df = perform_financial_analysis(invoices_df)
create_excel(invoices_df, summary_ds, monthly_df)