In [1]:
import os
import re
import keyword
import pandas as pd
from pydantic import BaseModel, Field
from helper import get_openai_api_key, get_llama_cloud_api_key_us
from llama_cloud_services import ExtractionAgent, LlamaExtract

In [2]:
# Fetch both API keys through the project's `helper` module.
# NOTE(review): neither variable is referenced again in the visible cells —
# presumably the client libraries pick the keys up from the environment; confirm.
llama_cloud_api_key_us = get_llama_cloud_api_key_us()
openai_api_key = get_openai_api_key()
# Debug only — keep commented out: printing would store the secrets in the
# saved notebook output.
# print(llama_cloud_api_key_us)
# print(openai_api_key)

In [4]:
def generate_schema_from_excel(excel_path: str):
    """Build the ``ProductCatalogue`` pydantic schema from a mapping workbook.

    The first sheet is read without a header row. Column A holds the field
    name, column B its description and column C its Python type expression
    (``str`` is used when the type cell is empty). Rows without a field name
    are skipped.

    The generated source is printed and executed in this module's globals,
    so ``ProductCatalogueRow`` and ``ProductCatalogue`` become module-level
    names as a side effect.

    Args:
        excel_path: Path of the workbook describing the fields.

    Returns:
        The generated ``ProductCatalogue`` class, whose ``rows`` field is a
        list of ``ProductCatalogueRow``.

    Raises:
        ValueError: If the workbook contains no usable field definitions.
    """
    # Read the first sheet
    df = pd.read_excel(excel_path, usecols="A:C", header=None)
    df.columns = ["field_name", "description", "field_type"]

    # Drop rows where field_name is null
    df = df.dropna(subset=["field_name"])

    def sanitize_field_name(name):
        """Turn an arbitrary cell value into a lower-case Python identifier."""
        name = str(name).strip()
        # Replace every non-word character, and guard a leading digit, with "_".
        name = re.sub(r'\W|^(?=\d)', '_', name)
        # The result is lower-cased below, so the keyword check must also look
        # at the lower-cased form ("Class" -> "class" would not be a valid
        # attribute name).
        if name == "" or keyword.iskeyword(name) or keyword.iskeyword(name.lower()):
            name = f"field_{name}"
        return name.lower()

    # Generate one annotated attribute line per workbook row.
    field_lines = []
    for _, row in df.iterrows():
        field_name = sanitize_field_name(row["field_name"])
        description = str(row["description"]) if pd.notnull(row["description"]) else ""
        field_type = str(row["field_type"]).strip() if pd.notnull(row["field_type"]) else ""
        # A whitespace-only type cell would otherwise produce "name:  = Field(...)".
        field_type = field_type or "str"
        # repr() yields a valid string literal even when the description
        # contains quotes, backslashes or line breaks.
        field_lines.append(
            f"    {field_name}: {field_type} = Field(description={description!r})\n"
        )

    if not field_lines:
        raise ValueError(f"No field definitions found in {excel_path!r}")

    row_fields = "".join(field_lines)

    schema_code = f"""
from pydantic import BaseModel, Field

class ProductCatalogueRow(BaseModel):
{row_fields}

class ProductCatalogue(BaseModel):
    rows: list[ProductCatalogueRow] = Field(description="A list of product price list entries")
"""

    # Print first so the generated source is visible even if executing it fails.
    print(schema_code)

    # SECURITY: the type column is executed verbatim as Python code. Only use
    # workbooks from a trusted source.
    exec(schema_code, globals())

    return globals()["ProductCatalogue"]

In [11]:
from llama_cloud.types.extract_config import ExtractConfig

def extract_data_from_pdf(pdf_path: str, schema, agent_name: str = "idea"):
    """Run a one-off LlamaExtract extraction of ``pdf_path`` against ``schema``.

    A temporary extraction agent is created, used for a single ``extract``
    call and then deleted again.

    Args:
        pdf_path: Path of the PDF document to extract from.
        schema: Data schema handed to the agent as ``data_schema``.
        agent_name: Name under which the temporary agent is created.
            NOTE(review): presumably the service rejects a second agent with
            the same name — confirm; pass a unique name for concurrent runs.

    Returns:
        Whatever ``agent.extract`` returns for the document.
    """
    extractor = LlamaExtract()
    config = ExtractConfig(extraction_mode="ACCURATE")

    agent = extractor.create_agent(
        name=agent_name,
        data_schema=schema,
        config=config
    )

    # Delete the agent even when extraction raises, so a failed run does not
    # leave the agent behind.
    try:
        return agent.extract(pdf_path)
    finally:
        extractor.delete_agent(agent.id)

In [13]:
import json

def print_and_save_json(result, output_file="extracted_data.json"):
    """Print ``result.data`` as pretty JSON and write the same text to a file.

    Args:
        result: Object whose ``data`` attribute is JSON-serializable.
        output_file: Destination path of the JSON file (UTF-8 encoded).
    """
    # Serialize once; non-ASCII characters are kept readable rather than escaped.
    payload = json.dumps(result.data, indent=4, ensure_ascii=False)
    print(payload)

    # Honour the caller-supplied path instead of a hard-coded file name.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(payload)

    print(f"JSON saved successfully to {output_file}")


In [14]:
def map_and_export_to_excel(result, mapping_excel_path: str, output_excel_path="import_template.xlsx"):
    """Project the extracted rows onto the import-template columns and save them.

    The mapping workbook is read without a header row: column A holds the
    JSON key, column D a default value and column E the name of the output
    column. Mapping rows without an output column are ignored. For every
    extracted row each output column receives the value found under the
    mapped key, or the default value when the key is missing or absent.

    Args:
        result: Extraction result; ``result.data`` is expected to be a dict
            with a ``"rows"`` list of dicts (missing data yields an empty
            sheet with only the header).
        mapping_excel_path: Path of the mapping workbook.
        output_excel_path: Path of the Excel file to write.
    """
    def _schema_field_name(raw_key: str) -> str:
        # Schema field names are sanitized identifiers derived from column A:
        # non-word characters become "_", keyword or empty names get a
        # "field_" prefix, and the result is lower-cased. Reproduce that here
        # so "Product Name" finds "product_name".
        name = re.sub(r'\W|^(?=\d)', '_', raw_key)
        if name == "" or keyword.iskeyword(name) or keyword.iskeyword(name.lower()):
            name = f"field_{name}"
        return name.lower()

    mapping_df = pd.read_excel(mapping_excel_path, usecols="A,D,E", header=None)
    mapping_df.columns = ["json_key", "default_value", "final_column"]

    # Normalize the mapping once: (candidate keys, default value, output column).
    mappings = []
    for raw_key, default_value, final_col in mapping_df.itertuples(index=False, name=None):
        # Test for missing cells BEFORE converting to text; str(NaN) would
        # otherwise become the literal column name "nan".
        if pd.isna(final_col):
            continue
        final_col = str(final_col).strip()
        if not final_col:
            continue

        candidate_keys = ()
        if not pd.isna(raw_key):
            stripped_key = str(raw_key).strip()
            if stripped_key:
                # Try the literal key first, then its sanitized schema form.
                candidate_keys = tuple(
                    dict.fromkeys((stripped_key.lower(), _schema_field_name(stripped_key)))
                )
        mappings.append((candidate_keys, default_value, final_col))

    # Output columns in first-seen order, without duplicates.
    final_columns = list(dict.fromkeys(final_col for _, _, final_col in mappings))

    # Use the in-memory JSON result; tolerate a missing payload.
    raw_json_rows = (result.data or {}).get("rows") or []

    records = []
    for raw_row in raw_json_rows:
        row = {str(key).lower(): value for key, value in raw_row.items()}
        record = {}
        for candidate_keys, default_value, final_col in mappings:
            value = default_value
            for key in candidate_keys:
                if key in row:
                    value = row[key]
                    break
            record[final_col] = value
        records.append(record)

    # Build the frame in one go instead of appending row by row.
    output_df = pd.DataFrame(records, columns=final_columns)

    # Save final output
    output_df.to_excel(output_excel_path, index=False)
    print(f"{output_excel_path} created with {len(output_df)} rows.")

In [None]:
# Build the extraction schema from the field definitions in the mapping workbook.
schema = generate_schema_from_excel("data/Mapping_Table.xlsx")

In [None]:
# Extract the catalogue PDF against the schema built from the mapping workbook.
result = extract_data_from_pdf("data/CatalogueSept2024.pdf", schema)

In [None]:
# print_and_save_json(result, "extracted_data.json")

In [None]:
# Map the extracted rows onto the template columns and write import_template.xlsx.
map_and_export_to_excel(result, "data/Mapping_Table.xlsx", "import_template.xlsx")