<a href="https://colab.research.google.com/github/mightyoctopus/amazon-pricer-model-open-source-fine-tuned-models/blob/main/w7_d5_test_finetuned_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q --upgrade torch==2.5.1+cu124 torchvision==0.20.1+cu124 torchaudio==2.5.1+cu124 --index-url https://download.pytorch.org/whl/cu124
!pip install -q --upgrade requests==2.32.3 bitsandbytes==0.46.0 transformers==4.48.3 accelerate==1.3.0 datasets==3.2.0 peft==0.14.0 trl==0.14.0 matplotlib wandb

In [2]:
import os
import re
import math

from google.colab import userdata, drive
from huggingface_hub import login
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, set_seed
from datasets import load_dataset, Dataset, DatasetDict
from peft import PeftModel, PeftConfig, get_peft_model

from tqdm import tqdm
from datetime import datetime
import matplotlib.pyplot as plt


In [3]:
BASE_MODEL = "MightyOctopus/pricer-merged-model-A-v1"
# The merged base model above does not ship a tokenizer, so the tokenizer is
# loaded from the original Llama 3.1 base model instead.
BASE_MODEL_TOKENIZER = "meta-llama/Llama-3.1-8B"


# Where to load the fine-tuned adapter from (the first flag that is True wins):
#   LOAD_FROM_HUGGING_FACE        -> Hugging Face repo named "<project>-<run name>"
#   LOAD_FROM_HF_WITHOUT_RUNNAME  -> Hugging Face repo that was pushed without a run name
#   both False                    -> checkpoint directory on Google Drive
LOAD_FROM_HUGGING_FACE = False
LOAD_FROM_HF_WITHOUT_RUNNAME = True
# Passed as `revision=` when loading the adapter; False means "do not pin a revision".
REVISION = False

if LOAD_FROM_HUGGING_FACE:
    HF_USER = "MightyOctopus"
    PROJECT_NAME = "product-pricer"
    RUN_NAME = "2025-11-29_11.36.40"
    PROJECT_RUN_NAME = f"{PROJECT_NAME}-{RUN_NAME}"
    FINETUNED_MODEL = f"{HF_USER}/{PROJECT_RUN_NAME}"
# This HF repo was pushed without a (datetime based) run name, so it needs its
# own branch with an explicit model name.
elif LOAD_FROM_HF_WITHOUT_RUNNAME:
    HF_USER = "MightyOctopus"
    MODEL_NAME = "pricer-lora-ft-v3"
    FINETUNED_MODEL = f"{HF_USER}/{MODEL_NAME}"
else:
    PROJECT_NAME = "product-pricer"
    RUN_NAME = "2025-12-13_12.35.49"
    PROJECT_RUN_NAME = f"{PROJECT_NAME}-{RUN_NAME}"
    CHECKPOINT_NAME = "checkpoints/checkpoint-4500"
    # On Drive the run folder sits inside the project folder:
    #   MyDrive/<project>/<project>-<run name>/checkpoints/checkpoint-N
    FINETUNED_MODEL = f"/content/drive/MyDrive/{PROJECT_NAME}/{PROJECT_RUN_NAME}/{CHECKPOINT_NAME}"


DATASET_NAME = "MightyOctopus/amazon-pricer-dataset-v2-0"

# True -> 4-bit quantization, False -> 8-bit quantization.
QUANT_4_BIT = True

%matplotlib inline


# ANSI escape sequences for coloured console output; RESET restores the default colour.
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"

# Colour label -> escape sequence ("orange" is rendered with the yellow code).
COLOR_MAP = dict(red=RED, orange=YELLOW, green=GREEN)

In [None]:
# Mount Google Drive at /content/drive so files under /content/drive/MyDrive are readable.
drive.mount("/content/drive")

In [None]:
# Sanity check: print the file names inside the checkpoint-4500 directory on Drive.
# The path is hard-coded and this cell requires Drive to be mounted first.
print(os.listdir("/content/drive/MyDrive/product-pricer/product-pricer-2025-12-13_12.35.49/checkpoints/checkpoint-4500/"))

In [None]:
# Prefer an HF_TOKEN environment variable when one is set. In Colab, secrets
# are read through `userdata` rather than the process environment, so fall
# back to the Colab secret named HF_TOKEN.
hf_token = os.getenv("HF_TOKEN") or userdata.get("HF_TOKEN")
login(hf_token, add_to_git_credential=True)

In [None]:
# Load the pricing dataset and bind its two splits to module-level names.
dataset = load_dataset(DATASET_NAME)
train, test = dataset["train"], dataset["test"]

In [None]:
# Display the first test example; the Tester class reads its "text" and "price" fields.
test[0]

## Load the tokenizer and models

In [9]:
### Build the bitsandbytes quantization config: 4-bit NF4 with double
### quantization when QUANT_4_BIT is set, otherwise 8-bit.
quant_config = (
    BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    if QUANT_4_BIT
    else BitsAndBytesConfig(
        load_in_8bit=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
)

In [None]:
### Tokenizer: loaded from BASE_MODEL_TOKENIZER, reusing EOS as the pad token
### and padding on the right.
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_TOKENIZER, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

### Quantized base model placed on the GPU.
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL,
    device_map="cuda",
    trust_remote_code=True,
    quantization_config=quant_config,
)
base_model.generation_config.pad_token_id = tokenizer.pad_token_id

### Wrap the base model with the fine-tuned PEFT adapter; a revision is only
### passed along when REVISION is set to something truthy.
fine_tuned_model = PeftModel.from_pretrained(
    base_model,
    FINETUNED_MODEL,
    **({"revision": REVISION} if REVISION else {}),
)


print(f"Memory Footprint: {fine_tuned_model.get_memory_footprint() / 1e9:.1f} GB")

In [None]:
def extract_price(s):
    """
    Parse the predicted price out of a decoded model response.

    The text after the LAST occurrence of "Price is $" is searched, so a
    product description that happens to contain the same phrase cannot shadow
    the model's answer at the end of the prompt. Thousands separators are
    removed and the first number found is returned.

    A leading sign is ignored: a price cannot be negative, and a negative
    value would later break log-based error metrics.

    Args:
        s: Decoded text containing the prompt followed by the generated tokens.

    Returns:
        The price as a float, or 0.0 when the marker phrase or a number is missing.
    """
    filter_phrase = "Price is $"
    _, marker, content = s.rpartition(filter_phrase)
    if not marker:
        return 0.0

    content = content.replace(",", "")
    match = re.search(r"\d*\.\d+|\d+", content)
    return float(match.group()) if match else 0.0

In [None]:
# Manual check: words after the marker are skipped and the first number (899.99) is returned.
extract_price("Price is $a fabulous 899.99 or so")

In [None]:
# Pipeline: encode the prompt onto the GPU -> build an all-ones attention mask
# -> generate a few tokens -> decode the result and parse the price out of it.
def model_predict(prompt):
    """
    Predict a price by letting the fine-tuned model generate up to 3 new tokens.

    The seed is fixed before every call. The decoded sequence (prompt plus
    generated tokens) is handed to ``extract_price``.
    """
    set_seed(42)

    input_ids = tokenizer.encode(prompt, return_tensors="pt").to("cuda")
    generated = fine_tuned_model.generate(
        input_ids,
        attention_mask=torch.ones(input_ids.shape, device="cuda"),
        max_new_tokens=3,
        num_return_sequences=1,
    )

    return extract_price(tokenizer.decode(generated[0]))


In [11]:
def extract_number_from_token(s):
    """
    Return the first number found in a decoded token string as a float.

    Decimals (optionally signed, e.g. "-2.5" or ".5") are tried first at each
    position, then plain digit runs. Returns None when the string contains no
    digits at all (letters, symbols, empty string, ...).
    """
    found = re.search(r"[-+]?\d*\.\d+|\d+", s)
    return None if found is None else float(found.group())

# Default number of next-token candidates considered by improved_model_predict.
top_K = 3


def improved_model_predict(prompt, device="cuda", temperature=1, top_k=None):
    """
    Predict a price as the probability-weighted average of the numeric values
    among the top-K next-token candidates.

    Steps:
      1. Encode the prompt and run a forward pass to obtain next-token logits.
      2. Convert logits to probabilities using softmax (optionally temperature-scaled).
      3. Extract the top-K most probable token predictions.
      4. Decode tokens and keep only those that contain a positive number.
      5. Compute a probability-weighted average of the numeric token values,
         with weights renormalised over the kept tokens.

    Args:
        prompt: Text to feed the model; the price is expected as the next token.
        device: Device used for the forward pass.
        temperature: Positive softmax temperature; 1 leaves the logits untouched.
        top_k: Number of candidates to inspect; defaults to the module-level ``top_K``.

    Returns:
        A single floating-point price prediction, or 0.0 when none of the
        candidates is a positive number.

    Raises:
        ValueError: If ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        # Dividing the logits by zero (or a negative value) would produce
        # inf/NaN or inverted probabilities instead of a usable distribution.
        raise ValueError("temperature must be a positive number")

    k = top_K if top_k is None else top_k

    set_seed(42)

    inputs = tokenizer.encode(prompt, return_tensors="pt").to(device)  ## Forward pass runs on `device`
    attention_mask = torch.ones(inputs.shape, device=device)

    with torch.no_grad():
        outputs = fine_tuned_model(inputs, attention_mask=attention_mask)
        # Keep only the logits of the last position, upcast to float32 so the
        # softmax below is not computed in reduced precision (the model may
        # emit half-precision logits), then move them to the CPU for the
        # plain-Python post-processing.
        next_token_logits = outputs.logits[:, -1, :].float().cpu()

    ### Apply temperature:
    if temperature != 1.0:
        next_token_logits = next_token_logits / temperature

    next_token_prob = F.softmax(next_token_logits, dim=-1)
    top_prob, top_token_id = next_token_prob.topk(k)

    prices, weights = [], []

    # Row 0 is the only sequence in the batch.
    for token_id, probability in zip(top_token_id[0].tolist(), top_prob[0].tolist()):
        price = extract_number_from_token(tokenizer.decode(token_id))

        if price is not None and price > 0:
            prices.append(price)
            weights.append(probability)

    if not prices:
        return 0.0

    # Renormalise so the weights of the kept (numeric) candidates sum to 1.
    total_weights = sum(weights)
    return sum(price * (weight / total_weights) for price, weight in zip(prices, weights))

In [12]:
class Tester:
    """
    Evaluate a price predictor on the first ``size`` datapoints and chart the results.

    Every datapoint must expose a "text" prompt and a numeric "price". For each
    one the guess, truth, absolute error, squared log error and a colour grade
    are recorded; ``report`` summarises them in the title of a scatter plot of
    truth against guess.
    """

    def __init__(self, predictor, data, title=None, size=250):
        """
        Args:
            predictor: Callable that takes a datapoint's text and returns a numeric guess.
            data: Indexable, sized collection of datapoints.
            title: Chart title prefix; defaults to a prettified ``predictor.__name__``.
            size: Maximum number of datapoints to evaluate.
        """
        self.predictor = predictor
        self.data = data
        self.title = title or predictor.__name__.replace("_", " ").title()
        self.size = size
        self.guesses = []
        self.truths = []
        self.errors = []
        self.sles = []
        self.colors = []

    def color_for(self, error, truth):
        """
        Grade a prediction: "green" (error < 40 or < 20% of truth),
        "orange" (error < 80 or < 40% of truth), otherwise "red".
        """
        # A truth of 0 has no meaningful relative error; treat it as unbounded
        # instead of raising ZeroDivisionError, so only the absolute thresholds apply.
        relative_error = error / truth if truth else float("inf")
        if error < 40 or relative_error < 0.2:
            return "green"
        if error < 80 or relative_error < 0.4:
            return "orange"
        return "red"

    def run_datapoint(self, i):
        """Predict datapoint ``i``, record its metrics and print a coloured summary line."""
        datapoint = self.data[i]
        text = datapoint["text"]
        guess = self.predictor(text)
        truth = datapoint["price"]
        error = abs(guess - truth)
        # Clamp the guess at 0 for the log only: log(guess + 1) is undefined
        # for guesses <= -1 and would abort the whole run.
        log_error = math.log(truth + 1) - math.log(max(guess, 0) + 1)
        sle = log_error ** 2
        color = self.color_for(error, truth)
        # The item name is taken from the second blank-line-separated section
        # of the text; fall back to the first section when there is only one.
        sections = text.split("\n\n")
        title = (sections[1] if len(sections) > 1 else sections[0])[:20] + "..."
        self.guesses.append(guess)
        self.truths.append(truth)
        self.errors.append(error)
        self.sles.append(sle)
        self.colors.append(color)
        print(f"{COLOR_MAP[color]}{i+1}: Guess: ${guess:,.2f} Truth: ${truth:,.2f} Error: ${error:,.2f} SLE: {sle:,.2f} Item: {title}{RESET}")

    def chart(self, title):
        """Scatter-plot guesses against truths, with the y = x line as a reference."""
        plt.figure(figsize=(12, 8))
        max_val = max(max(self.truths), max(self.guesses))
        plt.plot([0, max_val], [0, max_val], color='deepskyblue', lw=2, alpha=0.6)
        plt.scatter(self.truths, self.guesses, s=3, c=self.colors)
        plt.xlabel('Ground Truth')
        plt.ylabel('Model Estimate')
        plt.xlim(0, max_val)
        plt.ylim(0, max_val)
        plt.title(title)
        plt.show()

    def report(self):
        """Compute mean absolute error, RMSLE and the share of green hits, then chart them."""
        # Average over what was actually evaluated, which can be fewer than
        # ``size`` when the dataset is short.
        count = len(self.errors)
        if count == 0:
            print(f"{self.title}: no datapoints were evaluated")
            return
        average_error = sum(self.errors) / count
        rmsle = math.sqrt(sum(self.sles) / count)
        hits = self.colors.count("green")
        title = f"{self.title} Error=${average_error:,.2f} RMSLE={rmsle:,.2f} Hits={hits/count*100:.1f}%"
        self.chart(title)

    def run(self):
        """Evaluate up to ``size`` datapoints from scratch and show the report."""
        self.error = 0  # Never updated; kept only so the attribute still exists after run().
        # Start from empty accumulators so a repeated run() does not mix results.
        self.guesses, self.truths, self.errors = [], [], []
        self.sles, self.colors = [], []
        # Never index past the end of a dataset shorter than ``size``.
        for i in range(min(self.size, len(self.data))):
            self.run_datapoint(i)
        self.report()

    @classmethod
    def test(cls, function, data):
        """Convenience entry point: build a Tester with default settings and run it."""
        cls(function, data).run()

In [None]:
# Evaluate the generate-and-parse predictor on the test split.
Tester.test(function=model_predict, data=test)

In [None]:
# Evaluate the top-K probability-weighted predictor on the test split.
Tester.test(function=improved_model_predict, data=test)