In [None]:
import pandas as pd
import numpy as np

from utils.dataset import capture_screenshots_async

In [None]:
# create a dataset
# get screenshot of webpage of provided URL and save screenshot
# add path in 'img_path' column

data = pd.read_excel("./data/Price_Normalization_Dataset.xlsx")

dataset = await capture_screenshots_async(data)

cols_to_fix = ["Explictness","Step1_desired_unit", "Step1_desired_value"]

dataset[cols_to_fix] = dataset[cols_to_fix].where(
    pd.notna(dataset[cols_to_fix]),
    None
)
dataset.to_excel("./dataset/dataset.xlsx")

### STEP1
#### Infer quantity and unit from webpage, with evidence

In [None]:
# pydantic models
# step1output model

from pydantic import BaseModel
from typing import Optional, Literal, List

class Evidence(BaseModel):
    source: Literal["price_label", "specification_table", "more_information","calculator box"]
    text: str

class Quantity(BaseModel):
    value: float
    unit: str


class Step1Output(BaseModel):
    priced_quantity: Optional[Quantity]
    explicitness: Literal["direct", "indirect", "none"]
    confidence: float
    evidence: List[Evidence]
    notes: str


In [None]:
SYSTEM_PROMPT = """You are a pricing quantity inference engine.

Your task is to identify the quantity that the listed price applies to,
using ONLY explicit seller-visible information from the webpage.

Allowed evidence:
- Price labels or text directly adjacent to the price
- Specification tables
- Explicit product labels stating quantity or unit

Do NOT:
- infer from typical product sizes
- assume standard lengths
- perform unit conversion
- perform calculations
- if unit is not visible do not infer or assume any unit
- treat cart quantity selectors, add-to-cart counters, or default quantity values (e.g. "Quantity: 1") as a pricing unit.

Do:
- Unit must be in full from
- Convert millimeter to meters
- If unit is not visible directlty adjacent to main price label set explicitness to "indirect"
- If no standard unit (length, weight, area, volume, pack size) is explicitly stated,
set explicitness to "none".

Return ONLY a JSON object that matches the provided schema.
Billing accuracy is required."""


USER_PROMPT = 'screenshot : '

In [64]:
# load model
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
load_dotenv()

model = ChatOpenAI(model="gpt-4o-mini")

In [None]:
import base64
from langchain_core.messages import HumanMessage, SystemMessage
from openai import OpenAIError


model_with_structured_output = model.with_structured_output(Step1Output)


# llm function
async def infer_from_webpage(image_path : str) -> Step1Output:

    '''input : an image'''

    try:
        with open(image_path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode("utf-8")

            human_msg = HumanMessage([
                    {"type": "text", "text": USER_PROMPT},
                    {
                        "type": "image",
                        "base64": encoded_image,
                        "mime_type": "image/png",
                    },
                ])
            
            system_msg = SystemMessage(SYSTEM_PROMPT)

            response = await model_with_structured_output.ainvoke([system_msg,human_msg])

            return response
        
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e

    except Exception as e:
        raise RuntimeError(f"Inference failed: {e}") from e


In [None]:
# sample record
import pandas as pd
dataset = pd.read_excel('./dataset/dataset.xlsx')
dataset = dataset.rename(columns={'Unit\n':'TargetUnit','Rate Â£':'ExtractedPrice'})

sample = dataset.loc[3]

In [None]:
# unit test infer_from_webpage
image_path = sample['img_path']

response = await infer_from_webpage(image_path)
response

In [None]:
# function to run on multiple records
# input a dataset dataframe
# iterate and perform infer from webpage

import asyncio
import pandas as pd

async def run_step1_on_dataset(
    df: pd.DataFrame,
    semaphore_limit: int = 5,
) -> pd.DataFrame:
    

    df = df.copy()
    df["Output"] = None

    semaphore = asyncio.Semaphore(semaphore_limit)


    async def run_one(index, row):
        async with semaphore:
            try:
                output: Step1Output = await infer_from_webpage(
                    image_path=row["img_path"],
                )
                return index, output.model_dump_json()

            except Exception as e:
                # Store error as string (important for dataset audit)
                return index, f"ERROR: {str(e)}"

    tasks = [
        run_one(idx, row)
        for idx, row in df.iterrows()
    ]

    results = await asyncio.gather(*tasks)

    for idx, result in results:
        df.at[idx, "Output"] = result

    return df

In [None]:
# test on multiple records

dataset = pd.read_excel("./dataset/dataset.xlsx")
result_df = await run_step1_on_dataset(dataset[-2:])
result_df.to_excel('./result/result5.xlsx')

## Evaluation

### STEP2

In [None]:
# run single sample
image_path = sample['img_path']
step1_output = await infer_from_webpage(image_path)
step1_output


In [None]:
# Case2 : inferred and target units are differet
# LLM model should able to find a relation between this units from specs , more info tables
# For Example Litre to Kg conversion, denisty of the liquid would help in creating a formula for conversion

  
SYSTEM_PROMPT_2 = """You are a pricing conversion engine.

Your task is to generate a MATHEMATICAL EXPRESSION to convert a price
from the priced quantity unit to the target unit.

You are provided with:
- extracted_price
- priced_quantity (value and unit) from Step 1
- target_unit
- seller-visible evidence texts
- a screenshot of the webpage

IMPORTANT:
- You must NOT re-infer the priced quantity or unit.
- You must NOT re-evaluate pricing labels.
- You must ONLY use the screenshot to find explicit numeric conversion facts
  (e.g. weight per metre, area per pack).

STRICT RULES:
- Use ONLY numeric values that are explicitly visible in the screenshot or evidence.
- Do NOT assume material properties or typical values.
- Do NOT invent constants.
- Do NOT calculate numeric results.
- Do NOT simplify expressions.
- If no explicit conversion relationship is visible, return FAIL.

VARIABLE RULES:
- Use the variable name `price` for the extracted price.
- All other values must be numeric literals taken directly from the screenshot.

ALLOWED OPERATIONS:
- +  -  *  /
- parentheses ( )

OUTPUT FORMAT:
- Return ONLY:
  - a valid math expression (e.g. `price * (1000 / 3.85)`), OR
  - the single word: FAIL

Billing accuracy is required.
"""

In [None]:
step1_output_json = step1_output.model_dump()
step1_output_json

In [None]:
sample

In [None]:
ExtractedPrice = "3.45"
TargetUnit = sample['TargetUnit']
PricedQuantity = step1_output_json.get('priced_quantity')


In [61]:
llm_input = {'ExtractedPrice':ExtractedPrice,
             'TargetUnit': TargetUnit,
             'PricedQuantity': PricedQuantity}

llm_input

{'ExtractedPrice': 3.45,
 'TargetUnit': 't',
 'PricedQuantity': {'value': 1.0, 'unit': 'm'}}

In [None]:
USER_PROMPT = """Extracted price : {ExtractedPrice} \nPriced quantity : {PricedQuantity} \nTarget Unit : {TargetUnit}
Screenshot : """

prompt = USER_PROMPT.format(**llm_input)

In [None]:
print(prompt)

In [None]:
import base64

with open(image_path, "rb") as image_file:
    encoded_image = base64.b64encode(image_file.read()).decode("utf-8")

In [54]:
encoded_image

'iVBORw0KGgoAAAANSUhEUgAAB4AAAA+7CAIAAAD9kmSYAAAQAElEQVR4nOydBWDUSBfHs9atu7dQKIUWLVK8uLu7Hu7u7nKHu7u7u2txd1qk7q5r30tmN03XWKTAB+93vTA7mUwmM8kk+c/LG6FCoaAQBEEQBEEQBEEQBEEQBEEQ5EcjpBAEQRAEQRAEQRAEQRAEQRAkF0ABGkEQBEEQBEEQBEEQBEEQBMkVUIBGEARBEARBEARBEARBEARBcgUUoBEEQRAEQRAEQRAEQRAEQZBcAQVoBEEQBEEQBEEQBEEQBEEQJFdAARpBEARBEARBEARBEARBEATJFVCARhAEQRAEQRAEQRAEQRAEQXIFFKARBEEQBEEQBEEQBEEQBEGQXAEFaARBEARBEARBEARBEARBECRXQAEaQRAEQRAEQRAEQRAEQRAEyRVQgEYQBEEQBEEQBEEQBEEQBEFyBRSgEQRBEARBEARBEARBEARBkFwBBWgEQRAEQRAEQRAEQRAEQRAkV0ABGkEQBEGQr2D32S0XXmymjNKFRnLq74DH4zFLPvzD4zNLHp8N8ukwxeczS06YTk+jfS2fkwObc8409PZqe1H94FMKVRruWh5PrqAbRSGXK5RLBcAE6XiyViOsYBIpFNrSlMlTv55vV5FQTKrizbugZp0GMAkUFIIg/59YmJvlcXcZ3r97tcplKQRBEARBkNyHx7yWIAiCIAiCfAGJVDpl/YA4wUPqb4JHVGBY8pSKsAqlEq38RSkX6mk4azXCKsVZexrlPimuHq2RA1sGpdYMMjKjODP/KFT/KJVoJTlT0oqz1jRyRWx89LIe17i1Uatp1+CwSApBkD+CAT07Du7TRSDgUwiCIAiCILkJPm0gCIIgCGIQJ64f/PvUZ2JlzGe03hxhHhumjZDpf0hYlYZP4vmMXkxS8lRbUdlhAaWWW3ZYqSzTaYiNc461fD5rkU3UZ7mc0ZvlxN5ZQRsxkxiZgiIr6bVKS2d

In [65]:
human_msg = HumanMessage([
                    {"type": "text", "text": prompt},
                    {
                        "type": "image",
                        "base64": encoded_image,
                        "mime_type": "image/png",
                    },
                ])
            
system_msg = SystemMessage(SYSTEM_PROMPT_2)

response = await model.ainvoke([system_msg,human_msg])

In [66]:
response

AIMessage(content='`price * (0.89 / 1)`', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 48495, 'total_tokens': 48506, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_3683ee3deb', 'id': 'chatcmpl-D3HTIFLNZfUKBZpPhqgeV7EG9KcO7', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='lc_run--019c08d2-1cae-7070-bdae-bd588652fa30-0', tool_calls=[], invalid_tool_calls=[], usage_metadata={'input_tokens': 48495, 'output_tokens': 11, 'total_tokens': 48506, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [57]:
print(response.content)

FAIL


### STEP3

In [None]:
import ast
import operator as op


# Allowed operators
_ALLOWED_OPERATORS = {
    ast.Add: op.add,
    ast.Sub: op.sub,
    ast.Mult: op.mul,
    ast.Div: op.truediv,
}


class MathExpressionError(Exception):
    pass


def evaluate_math_expression(expression: str) -> float:
    """
    Safely evaluate a mathematical expression consisting of
    numbers, + - * / and parentheses.

    Args:
        expression (str): math expression, e.g. "(1000 / 3.85)"

    Returns:
        float: evaluated result

    Raises:
        MathExpressionError: if expression is invalid or unsafe
    """

    try:
        parsed = ast.parse(expression, mode="eval")
        return _eval_node(parsed.body)

    except Exception as e:
        raise MathExpressionError(f"Invalid math expression: {expression}") from e


def _eval_node(node):
    if isinstance(node, ast.Num):  # Python <3.8
        return node.n

    if isinstance(node, ast.Constant):  # Python 3.8+
        if isinstance(node.value, (int, float)):
            return node.value
        raise MathExpressionError("Only numeric constants allowed")

    if isinstance(node, ast.BinOp):
        if type(node.op) not in _ALLOWED_OPERATORS:
            raise MathExpressionError(f"Operator {type(node.op)} not allowed")

        left = _eval_node(node.left)
        right = _eval_node(node.right)

        return _ALLOWED_OPERATORS[type(node.op)](left, right)

    if isinstance(node, ast.UnaryOp):
        if isinstance(node.op, ast.USub):
            return -_eval_node(node.operand)
        raise MathExpressionError("Unary operator not allowed")

    raise MathExpressionError(f"Unsupported expression element: {type(node)}")


In [49]:
price = 10
result = evaluate_math_expression(f"{price} * (1 / 0.89)")
print(result)

11.235955056179776


  if isinstance(node, ast.Num):  # Python <3.8
  return node.n
