In [1]:
!pip install openai



In [2]:
# Required packages to install
# pip install openai requests pydantic

# Required imports
import json
import requests
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from openai import OpenAI  # For chat completions API

# Initialize the OpenAI client
client = OpenAI(api_key="your-openai-api-key")  

# For the vision model, you need either:
# Option 1: If using OpenAI's Vision model
# (already included in the OpenAI package)

# Option 2: If using another vision model like Google's Vertex AI
# pip install google-cloud-aiplatform
# from vertexai.vision_models import ImageCaptioningModel
# vision_model = ImageCaptioningModel.from_pretrained("your-model-name")

In [3]:
# Define the schema for product search parameters
class AmazonSearchQuery(BaseModel):
    product_name: str = Field(..., description="Main product name extracted from the image")
    brand: Optional[str] = Field(None, description="Brand name if visible in the image")
    product_size: Optional[str] = Field(None, description="Size/amount/weight information (e.g., '16oz', '500ml')")
    identifiers: Optional[Dict[str, str]] = Field(None, description="Product identifiers such as UPC, EAN, ASIN if visible")
    key_features: Optional[List[str]] = Field(None, description="Key product features or descriptors visible in the image")
    
# Define the schema for Amazon search results verification
class ProductVerification(BaseModel):
    match_found: bool = Field(..., description="Whether a matching product was found on Amazon")
    confidence_score: float = Field(..., description="Confidence score for the match (0-1)")
    matched_product: Optional[Dict] = Field(None, description="Details of the matched product")
    discrepancies: Optional[List[str]] = Field(None, description="List of discrepancies between image and found product")
    amazon_link: Optional[str] = Field(None, description="Link to the Amazon product page if found")

In [4]:
def amazon_search(search_parameters):
    """
    Search Amazon for products matching parameters extracted from a product image.
    
    Input: Structured data from VLM image analysis
    Output: Top matching Amazon products with detailed information
    """
    import requests
    
    api_key = "FB0C32E1DCB3433C997C0A0FB1E70608"
    base_url = "https://api.rainforestapi.com/request"
    
    # Configure request parameters
    params = {
        "api_key": api_key,
        "amazon_domain": "amazon.com"
    }
    
    # Determine search approach based on available information
    if "asin" in search_parameters and search_parameters["asin"]:
        # Direct ASIN lookup if available
        params["type"] = "product"
        params["asin"] = search_parameters["asin"]
    elif "upc" in search_parameters or "ean" in search_parameters or "isbn" in search_parameters:
        # GTIN/UPC/EAN lookup
        params["type"] = "product"
        params["gtin"] = search_parameters.get("upc") or search_parameters.get("ean") or search_parameters.get("isbn")
    else:
        # Text-based search using extracted information
        params["type"] = "search"
        
        # Construct search term from product name and attributes
        search_term = search_parameters["product_name"]
        if "brand" in search_parameters:
            search_term = f"{search_parameters['brand']} {search_term}"
        if "attributes" in search_parameters:
            for key, value in search_parameters["attributes"].items():
                search_term += f" {value}"
        
        params["search_term"] = search_term
    
    # Make API request
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # Raise exception for HTTP errors
        data = response.json()
        
        # Process results based on request type
        results = []
        if params["type"] == "product":
            if "product" in data:
                product = data["product"]
                results.append({
                    "asin": product.get("asin"),
                    "title": product.get("title"),
                    "brand": product.get("brand"),
                    "price": product.get("buybox_winner", {}).get("price"),
                    "rating": product.get("rating"),
                    "ratings_total": product.get("ratings_total"),
                    "main_image": product.get("main_image", {}).get("link"),
                    "link": product.get("link"),
                    "dimensions": product.get("dimensions"),
                    "weight": product.get("weight")
                })
        elif params["type"] == "search":
            for item in data.get("search_results", [])[:5]:  # Limit to top 5 results
                results.append({
                    "asin": item.get("asin"),
                    "title": item.get("title"),
                    "link": item.get("link"),
                    "image": item.get("image"),
                    "price": item.get("price")
                })
        
        return {
            "query_info": search_parameters,
            "results": results
        }
    except Exception as e:
        # Handle any errors
        return {
            "query_info": search_parameters,
            "error": str(e),
            "results": []
        }

In [5]:
def extract_product_details_from_image_mixtral(image_path, client, model_id="accounts/fireworks/models/mixtral-8x7b-instruct", timing=None):
    """
    Extract product details from an image using document inlining and a specified model.
    
    Parameters:
    image_path (str): Path to the product image file
    client: API client (OpenAI or compatible)
    model_id (str): Model ID to use for extraction
    timing (dict, optional): Timing dictionary to update with performance metrics
    
    Returns:
    dict: Extracted product details
    """
    import base64
    import time
    import json
    
    # Track timing if requested
    if timing is not None:
        image_prep_start = time.time()
    
    # Prepare image for document inlining
    with open(image_path, "rb") as image_file:
        image_content = image_file.read()
        
    base64_image = base64.b64encode(image_content).decode('utf-8')
    
    # Determine MIME type based on file extension
    if image_path.lower().endswith('.jpg') or image_path.lower().endswith('.jpeg'):
        mime_type = "image/jpeg"
    elif image_path.lower().endswith('.png'):
        mime_type = "image/png"
    else:
        mime_type = "image/jpeg"  # Default to JPEG
    
    # Create image URL with document inlining transform
    image_url = f"data:{mime_type};base64,{base64_image}#transform=inline"
    
    if timing is not None:
        timing['image_preparation'] = time.time() - image_prep_start
        extract_start = time.time()
    
    # Extract product details from image using document inlining
    extract_messages = [
        {"role": "system", "content": "Extract detailed product information from the image. Include product name, brand, size/weight, and any visible identifiers like UPC or model numbers."},
        {"role": "user", "content": [
            {"type": "image_url", "image_url": {"url": image_url}},
            {"type": "text", "text": "What product is shown in this image? Extract all visible details including product name, brand, size/weight, and any identifiers."}
        ]}
    ]
    
    # Use document inlining to extract product details with JSON output
    extract_response = client.chat.completions.create(
        model=model_id,
        messages=extract_messages,
        response_format={"type": "json_object", "schema": AmazonSearchQuery.model_json_schema()},
        max_tokens=1000
    )
    
    # Parse extracted product details
    product_details = json.loads(extract_response.choices[0].message.content)
    
    if timing is not None:
        timing['product_extraction'] = time.time() - extract_start
    
    return product_details

In [6]:
def extract_product_details_from_image_firesearch(image_path, client, timing=None):
    """
    Extract product details from an image using the firesearch-ocr-v6 model.
    
    Parameters:
    image_path (str): Path to the product image file
    client: API client (OpenAI compatible)
    timing (dict, optional): Timing dictionary to update with performance metrics
    
    Returns:
    dict: Extracted product details
    """
    import base64
    import time
    import json
    
    # Track timing if requested
    if timing is not None:
        image_prep_start = time.time()
    
    # Encode image to base64
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')
    
    # Determine MIME type based on file extension
    if image_path.lower().endswith('.jpg') or image_path.lower().endswith('.jpeg'):
        mime_type = "image/jpeg"
    elif image_path.lower().endswith('.png'):
        mime_type = "image/png"
    else:
        mime_type = "image/jpeg"  # Default to JPEG
        
    if timing is not None:
        timing['image_preparation'] = time.time() - image_prep_start
        extract_start = time.time()
    
    # Extract product details using firesearch-ocr-v6 model
    extract_messages = [
        {"role": "system", "content": "You are an OCR and product information extraction assistant. Extract detailed product information from the image. Include product name, brand, size/weight, and any visible identifiers like UPC, EAN, or model numbers."},
        {"role": "user", "content": [
            {"type": "text", "text": "Extract all product information visible in this image. Include the product name, brand, size/weight, and any identifiers like UPC, EAN, or model numbers. Format your response as JSON."},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
        ]}
    ]
    
    # Make API call to firesearch-ocr model
    extract_response = client.chat.completions.create(
        model="accounts/fireworks/models/phi-3-vision-128k-instruct",
        messages=extract_messages,
        response_format={"type": "json_object", "schema": AmazonSearchQuery.model_json_schema()},
        max_tokens=1000
    )
    
    # Parse extracted product details
    product_details = json.loads(extract_response.choices[0].message.content)
    
    if timing is not None:
        timing['product_extraction'] = time.time() - extract_start
    
    return product_details

In [14]:
def process_product_image(image_path, api_key):
    """
    Process a product image using document inlining and JSON mode
    
    Parameters:
    image_path (str): Path to the product image file
    api_key (str): Fireworks API key
    
    Returns:
    dict: Product verification results
    """
    import time
    import json
    
    # Track timing information
    timing = {}
    start_total = time.time()
    
    print(f"\n🔍 Processing image: {image_path}...")
    
    # Initialize client
    client_start = time.time()
    client = OpenAI(
        base_url="https://api.fireworks.ai/inference/v1",
        api_key=api_key
    )
    timing['client_initialization'] = time.time() - client_start
    
    # Extract product details from the image
    extract_start = time.time()
    print(f"📷 Analyzing image with AI... ")
    product_details = extract_product_details_from_image_mixtral(
        image_path=image_path,
        client=client,
        timing=timing
    )
    extract_time = time.time() - extract_start
    
    # Display extracted information
    print(f"✅ Image analyzed in {extract_time:.2f}s")
    print(f"   Product: {product_details.get('brand', 'Unknown brand')} {product_details.get('product_name', 'Unknown product')}")
    if product_details.get('product_size'):
        print(f"   Size: {product_details.get('product_size')}")
    
    # Execute Amazon search using Rainforest API
    search_start = time.time()
    print(f"\n🔎 Searching Amazon for matching products...")
    amazon_results = amazon_search(product_details)
    search_time = time.time() - search_start
    print(f"✅ Amazon search completed in {search_time:.2f}s")
    timing['amazon_search'] = search_time
    
    # Verify product match with LLM using JSON mode
    verify_start = time.time()
    print(f"\n⚖️ Verifying matches...")
    verification_messages = [
        {"role": "system", "content": "Analyze the Amazon product results and determine how well they match the original product details. Assign a confidence score (0-1) to each result based on how closely it matches."},
        {"role": "user", "content": f"Compare these product details:\n\nOriginal product: {json.dumps(product_details)}\n\nAmazon results: {json.dumps(amazon_results)}\n\nFor each result, provide a confidence score and list any discrepancies. Rank them from best match to worst."}
    ]
    
    verification_response = client.chat.completions.create(
        model="accounts/fireworks/models/mixtral-8x7b-instruct",
        messages=verification_messages,
        response_format={"type": "json_object"},
        max_tokens=1000
    )
    
    # Parse verification results
    llm_verification = json.loads(verification_response.choices[0].message.content)
    verification_time = time.time() - verify_start
    print(f"✅ Verification completed in {verification_time:.2f}s")
    timing['verification'] = verification_time
    
    # Process and enhance the results with direct information from the Amazon API
    processing_start = time.time()
    print(f"\n🏁 Processing final results...")
    
    # Create the final output structure
    final_results = {
        "match_found": False,
        "top_match": None,
        "discrepancies": [],
        "timing": timing
    }
    
    # If we have matches to process
    if amazon_results.get("results") and len(amazon_results["results"]) > 0:
        # Extract confidence scores from LLM verification
        matches = []
        for idx, result in enumerate(amazon_results["results"]):
            # Get or estimate the confidence score 
            confidence = 0.0
            discrepancies = []
            
            # Try to find this result in the LLM verification
            if "ranked_matches" in llm_verification:
                for match in llm_verification["ranked_matches"]:
                    if match.get("asin") == result.get("asin"):
                        confidence = match.get("confidence_score", 0.0)
                        discrepancies = match.get("discrepancies", [])
                        break
            
            matches.append({
                "confidence_score": confidence,
                "product_data": result,
                "discrepancies": discrepancies
            })
        
        # Sort matches by confidence score in descending order
        matches.sort(key=lambda x: x["confidence_score"], reverse=True)
        
        # Set match_found if we have at least one high-confidence match
        if matches and matches[0]["confidence_score"] >= 0.7:
            final_results["match_found"] = True
            
        # Add only the top match to final results
        if matches:
            top_match = matches[0]
            match_info = {
                "title": top_match["product_data"].get("title", ""),
                "asin": top_match["product_data"].get("asin", ""),
                "price": top_match["product_data"].get("price", ""),
                "discrepancies": top_match["discrepancies"],
                "link": top_match["product_data"].get("link", ""),
                "image": top_match["product_data"].get("main_image", top_match["product_data"].get("image", ""))
            }
            
            # Add additional fields if available
            for field in ["brand", "rating", "ratings_total", "dimensions", "weight"]:
                if field in top_match["product_data"] and top_match["product_data"][field]:
                    match_info[field] = top_match["product_data"][field]
                    
            final_results["top_match"] = match_info
            
        # Add overall discrepancies from LLM
        if "general_discrepancies" in llm_verification:
            final_results["discrepancies"] = llm_verification["general_discrepancies"]
    
    timing['results_processing'] = time.time() - processing_start
    timing['total_time'] = time.time() - start_total
    
    # Print final match result
    if final_results["top_match"]:
        print("\n✨ Found a match on Amazon!")
        print(f"   Title: {final_results['top_match']['title']}")
        if 'brand' in final_results['top_match']:
            print(f"   Brand: {final_results['top_match']['brand']}")
        if 'price' in final_results['top_match']:
            print(f"   Price: {final_results['top_match']['price']['raw']}")
        print(f"   Link: {final_results['top_match']['link']}")
    else:
        print("\n❌ No suitable match found on Amazon")
    
    # Print overall timing
    print(f"\n⏱️ Total processing time: {timing['total_time']:.2f} seconds")
    
    return final_results

In [15]:
results = process_product_image("tea.png", "fw_3ZYLXEz2c3YWcN3CAkyaVedA")
print(json.dumps(results, indent=2))


🔍 Processing image: tea.png...
📷 Analyzing image with AI... 
✅ Image analyzed in 2.06s
   Product: ITO EN Golden Oolong Tea
   Size: 16.9 fl oz (500 mL)

🔎 Searching Amazon for matching products...
✅ Amazon search completed in 11.94s

⚖️ Verifying matches...


JSONDecodeError: Expecting ',' delimiter: line 3 column 19 (char 1531)

## Amazon Search Test

In [9]:
# Sample input parameters (as if from VLM analyzing a product image)
search_parameters = {
    "product_name": "Pro-V Daily Moisture Renewal Shampoo",
    "brand": "Pantene",
    "attributes": {
        "size": "25.4 fl oz",
        "color": "white bottle with gold cap",
        "description": "For dry hair"
    },
}

In [10]:
# # Import necessary libraries
# import json
# import requests

# # Call the function with sample parameters
# results = amazon_search(search_parameters)

# # Print the results in a readable format
# print(json.dumps(results, indent=2))

In [11]:
import requests
import json

def simple_rainforest_test(api_key):
    """
    Simple test of the Rainforest API using a basic search query
    """
    # Base URL for Rainforest API
    base_url = "https://api.rainforestapi.com/request"
    
    # Simple search parameters
    params = {
        "api_key": api_key,
        "type": "search",
        "amazon_domain": "amazon.com",
        "search_term": "iPhone charger",  # Simple, common product
        "sort_by": "featured"
    }
    
    # Make the request
    response = requests.get(base_url, params=params)
    
    # Check if request was successful
    if response.status_code == 200:
        data = response.json()
        
        # Return basic info about the results
        result_summary = {
            "request_info": data.get("request_info", {}),
            "total_results": len(data.get("search_results", [])),
            "first_result": data.get("search_results", [{}])[0] if data.get("search_results") else None
        }
        
        return result_summary
    else:
        return {
            "error": f"Request failed with status code {response.status_code}",
            "response": response.text
        }

# # Example usage:
# api_key = "FB0C32E1DCB3433C997C0A0FB1E70608"  # Replace with your actual API key
# results = simple_rainforest_test(api_key)
# print(json.dumps(results, indent=2))