# Setup

In [102]:
import time
from bs4 import BeautifulSoup
import getpass
import os
from langchain.chat_models import init_chat_model

In [83]:
try:
    # load environment variables from .env file (requires `python-dotenv`)
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    pass

In [9]:
os.environ["LANGSMITH_TRACING"] = "true"
if "LANGSMITH_API_KEY" not in os.environ:
    os.environ["LANGSMITH_API_KEY"] = getpass.getpass(
        prompt="Enter your LangSmith API key (optional): "
    )
if "LANGSMITH_PROJECT" not in os.environ:
    os.environ["LANGSMITH_PROJECT"] = getpass.getpass(
        prompt='Enter your LangSmith Project Name (default = "default"): '
    )
    if not os.environ.get("LANGSMITH_PROJECT"):
        os.environ["LANGSMITH_PROJECT"] = "default"

In [10]:
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

# Read product URLs

In [11]:
import csv

product_urls = []

# Open the CSV file
with open('specbook.csv', 'r', newline='') as csvfile:
    # Create a csv.reader object
    reader = csv.reader(csvfile)

    for row in reader:
        if row[1]:
            product_urls.append(row[1])

In [139]:
# Check for duplicates
print(f"Total URLs: {len(product_urls)}")
print(f"Unique URLs: {len(set(product_urls))}")
print(f"Results: {len(results)}")

# Find duplicates
from collections import Counter
url_counts = Counter(product_urls)
duplicates = {url: count for url, count in url_counts.items() if count > 1}
if duplicates:
    print(f"Duplicate URLs: {duplicates}")

Total URLs: 87
Unique URLs: 82
Results: 82
Duplicate URLs: {'https://www.dunnedwards.com/colors/browser/dew340': 3, 'https://www.fireclaytile.com/tile/colors/detail/daisy/tile-field-2-x-2': 2, 'https://www.kraususa.com/kraus-khu100-32-32-undermount-16-gauge-stainless-steel-single-bowl-kitchen-sink.html': 2, 'https://www.subzero-wolf.com/wolf/range-hood/46-inch-pro-hood-liner-22-inch-depth': 2}


# Fetch Product HTML

In [85]:
import logging

# Configure logging for debugging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [125]:
from concurrent.futures import ThreadPoolExecutor
import requests
from collections import defaultdict

# Just add this function above your existing code:
def fetch_url(url):
    try:
        response = requests.get(url, headers=headers, timeout=10)
        return {
            "code": response.status_code,
            "text": response.text,
            "soup": None
        }
    except Exception as e:
        print(f"Error fetching {url}: {e}")  # Add logging
        return {"code": None, "text": None, "soup": None, "error": str(e)}


# Without headers, website responds with 403 b/c it suspects you're a bot
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
}

# fetch urls
results = defaultdict(dict)

with ThreadPoolExecutor(max_workers=10) as executor:
    for url, result in zip(product_urls, executor.map(fetch_url, product_urls)):
        results[url] = result

# Add this right after your current code:
print(f"URLs: {len(product_urls)}")
print(f"Unique URLs: {len(set(product_urls))}")
print(f"Results: {len(results)}")

In [132]:
soups = {}

with open(f'01_llmpipeline/1-requests_failed_{int(time.time())}.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['url', 'status_code', 'text'])

    for url, response in results.items():
        soup = BeautifulSoup(response.get("text"), "html.parser")

        code = response.get("code")
        if code != 200:
            writer.writerow([url, code, soup.text])
        else:
            soups[url] = soup

# Preprocess HTML

In [140]:
REMOVE_TAGS = [
    "script", "style", "noscript", "svg", "footer", "header",
    "nav", "form", "iframe", "aside", "canvas", "button", "input", "select", "option"
]

GARBAGE_KEYWORDS = ["cookie", "newsletter", "subscribe", "banner", "social", "share", "advert"]

preprocessed_html = {}

for url, soup in soups.items():

    # Remove noise tags
    for tag in soup(REMOVE_TAGS):
        tag.decompose()

    # # Remove elements with garbage classes/ids
    # for el in soup.find_all(attrs={"class": True}):
    #     cls = " ".join(el.get("class"))
    #     if any(kw in cls.lower() for kw in GARBAGE_KEYWORDS):
    #         el.decompose()
    #
    # for el in soup.find_all(attrs={"id": True}):
    #     id_ = el.get("id")
    #     if id_ and any(kw in id_.lower() for kw in GARBAGE_KEYWORDS):
    #         el.decompose()

    # Extract visible text
    text = soup.get_text(separator="\n", strip=True)
    text_lines = [line.strip() for line in text.splitlines() if line.strip()]
    visible_text = "\n".join(text_lines)

    # Extract metadata
    metadata = {
        tag.get("property") or tag.get("name"): tag.get("content")
        for tag in soup.find_all("meta")
        if tag.get("content")
    }

    # Extract images with alt text
    images = []
    for img in soup.find_all("img"):
        src = img.get("src")
        alt = img.get("alt", "").strip()
        if src:
            images.append({"src": src, "alt": alt})

    preprocessed_html[url] = {
        "title": soup.title.string.strip() if soup.title and soup.title.string else "",
        "metadata": metadata,
        "text": visible_text,
        "images": images
    }


In [135]:
import json
# Max characters after pre-process
print(max([len(json.dumps(x)) for x in preprocessed_html]))

with open('01_llmpipeline/2-preprocessed_html.json', 'w') as json_file:
    json.dump(preprocessed_html, json_file)

# with open('preprocessed_html.json', 'r') as json_file:
#     preprocessed_html = json.load(json_file)

203


# Create Prompts

In [152]:
from langchain_core.prompts import ChatPromptTemplate

system_template = """
You are a project architect tasked with fetching specification details from the following product website's HTML page. Extract the relevant product information for documentation in a specification book.

If you are not 99.9% sure that the information is correct, return the value with the highest probability, including the probability in the value field.

TITLE:
data['title']

METADATA:
data['metadata']

TEXT CONTENT:
data['text']

IMAGES:
data['images']

Extract the following structured data in JSON format from the provided product web page:

- image_url: Direct URL to the product image.
- type: The product category (e.g. range hood, grill, fireplace, etc.).
- description: Short product description, including brand, size, material, color, and any notable features
- model_no: Manufacturer model number, item no, or sku no.
- product_link: Original product page URL.
- qty: Quantity if specified; otherwise return "unspecified".
- key: A unique reference key (if available).

Return your output in this format. You **don’t add extra formatting instructions yourself**:

```json
{{
  "image_url": "",
  "type": "",
  "description": "",
  "model_no": "",
  "product_link": "",
  "qty": "",
  "key": ""
}}
``````
data:
{data}
"""

prompt_template = ChatPromptTemplate.from_messages(
    [("system", system_template)]
)

prompts = {}

for url, website_data in preprocessed_html.items():
    prompts[url] = prompt_template.invoke({"data": json.dumps(website_data)})

In [155]:
with open('01_llmpipeline/3-prompts.csv', mode='w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['url', 'prompt'])

    for url, prompt in prompts.items():
        writer.writerow([url, prompt.to_string()])


# Invoke Model

In [157]:
gpt_4o_mini = init_chat_model("gpt-4o-mini", model_provider="openai")

In [158]:
llm_responses = {}

for url, prompt in prompts.items():
    llm_responses[url] = gpt_4o_mini.invoke(prompt)

2025-07-01 19:02:22,411 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:25,231 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:29,802 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:32,488 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:35,812 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:39,937 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:42,455 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:48,063 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-07-01 19:02:50,830 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "

In [165]:
with open('01_llmpipeline/4-llm.csv', mode='w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['url', 'llm_message'])

    for url, llm_message in llm_responses.items():
        writer.writerow([url, llm_message.content])

a# Evaluation

In [167]:
import json
import re
import requests
from urllib.parse import urlparse
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class EvalResult:
    """Stores evaluation results for a single extraction"""
    url_valid: bool
    json_parseable: bool
    required_fields_present: bool
    field_quality_scores: Dict[str, float]
    overall_score: float
    issues: List[str]

class ProductExtractionEvaluator:
    """Evaluates LLM product extraction quality"""

    def __init__(self):
        self.required_fields = ["image_url", "type", "description", "product_link"]
        self.optional_fields = ["model_no", "qty", "key"]

    def evaluate_extraction(self, json_str: str, source_url: str = None) -> EvalResult:
        """
        Evaluate a single product extraction

        Args:
            json_str: The JSON string from LLM
            source_url: Original URL that was scraped (optional)

        Returns:
            EvalResult with detailed scoring
        """
        issues = []
        field_scores = {}

        # 1. JSON Parseability Test
        try:
            data = json.loads(json_str)
            json_parseable = True
        except json.JSONDecodeError as e:
            return EvalResult(
                url_valid=False,
                json_parseable=False,
                required_fields_present=False,
                field_quality_scores={},
                overall_score=0.0,
                issues=[f"JSON parsing failed: {e}"]
            )

        # 2. Required Fields Test
        missing_fields = [f for f in self.required_fields if f not in data]
        required_fields_present = len(missing_fields) == 0
        if missing_fields:
            issues.append(f"Missing required fields: {missing_fields}")

        # 3. Field Quality Evaluation
        field_scores["image_url"] = self._evaluate_url(data.get("image_url", ""))
        field_scores["product_link"] = self._evaluate_url(data.get("product_link", ""))
        field_scores["type"] = self._evaluate_type_field(data.get("type", ""))
        field_scores["description"] = self._evaluate_description(data.get("description", ""))
        field_scores["model_no"] = self._evaluate_model_no(data.get("model_no", ""))
        field_scores["qty"] = self._evaluate_quantity(data.get("qty", ""))

        # 4. URL Validation
        urls_valid = all([
            self._is_valid_url(data.get("image_url", "")),
            self._is_valid_url(data.get("product_link", ""))
        ])

        # 5. Content Consistency Checks
        consistency_score = self._check_consistency(data, source_url)
        field_scores["consistency"] = consistency_score

        # 6. Calculate Overall Score
        overall_score = self._calculate_overall_score(field_scores, required_fields_present, urls_valid)

        return EvalResult(
            url_valid=urls_valid,
            json_parseable=json_parseable,
            required_fields_present=required_fields_present,
            field_quality_scores=field_scores,
            overall_score=overall_score,
            issues=issues
        )

    def _evaluate_url(self, url: str) -> float:
        """Score URL quality (0-1)"""
        if not url or url.strip() == "":
            return 0.0

        if not self._is_valid_url(url):
            return 0.2

        # Check if it's a reasonable image/product URL
        if any(ext in url.lower() for ext in ['.jpg', '.png', '.jpeg', '.webp', '.gif']):
            return 1.0
        elif 'image' in url.lower() or 'photo' in url.lower() or 'product' in url.lower():
            return 0.8
        else:
            return 0.6

    def _is_valid_url(self, url: str) -> bool:
        """Check if URL is properly formatted"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception:
            return False

    def _evaluate_type_field(self, type_val: str) -> float:
        """Score product type quality"""
        if not type_val or type_val.strip() == "":
            return 0.0

        # Check for reasonable product categories
        common_types = [
            'furniture', 'electronics', 'clothing', 'kitchen', 'outdoor',
            'fireplace', 'appliance', 'tool', 'decoration', 'lighting'
        ]

        type_lower = type_val.lower()
        if any(cat in type_lower for cat in common_types):
            return 1.0
        elif len(type_val.strip()) > 2:
            return 0.7
        else:
            return 0.3

    def _evaluate_description(self, desc: str) -> float:
        """Score description quality"""
        if not desc or desc.strip() == "":
            return 0.0

        desc_clean = desc.strip()

        # Length check
        if len(desc_clean) < 10:
            return 0.3
        elif len(desc_clean) < 50:
            return 0.6
        elif len(desc_clean) > 500:
            return 0.8  # Might be too verbose
        else:
            return 1.0

    def _evaluate_model_no(self, model: str) -> float:
        """Score model number field"""
        if not model or model.strip() == "":
            return 0.5  # Neutral - not always available

        # Look for typical model patterns
        if re.search(r'[A-Z]{2,}[-\s]?\d+', model):
            return 1.0
        elif len(model.strip()) > 2:
            return 0.7
        else:
            return 0.3

    def _evaluate_quantity(self, qty: str) -> float:
        """Score quantity field"""
        if not qty or qty.strip() == "":
            return 0.5

        qty_lower = qty.lower().strip()
        if any(word in qty_lower for word in ['unspecified', 'unknown', 'n/a']):
            return 0.8  # Honest about not knowing
        elif re.search(r'\d+', qty):
            return 1.0
        else:
            return 0.6

    def _check_consistency(self, data: Dict, source_url: str = None) -> float:
        """Check internal consistency of extracted data"""
        score = 1.0

        # Check if product_link and image_url are from same domain
        try:
            if data.get("product_link") and data.get("image_url"):
                prod_domain = urlparse(data["product_link"]).netloc
                img_domain = urlparse(data["image_url"]).netloc

                if prod_domain and img_domain:
                    # Same domain is good
                    if prod_domain == img_domain:
                        score += 0.1
                    # Different but reasonable domains
                    elif any(common in prod_domain for common in img_domain.split('.')):
                        score += 0.05
        except Exception:
            score -= 0.1

        return min(score, 1.0)

    def _calculate_overall_score(self, field_scores: Dict[str, float],
                               required_present: bool, urls_valid: bool) -> float:
        """Calculate weighted overall score"""
        if not required_present:
            return 0.2

        # Weighted scoring
        weights = {
            "image_url": 0.2,
            "product_link": 0.2,
            "type": 0.15,
            "description": 0.25,
            "model_no": 0.05,
            "qty": 0.05,
            "consistency": 0.1
        }

        weighted_score = sum(field_scores.get(field, 0) * weight
                           for field, weight in weights.items())

        # Penalty for invalid URLs
        if not urls_valid:
            weighted_score *= 0.7

        return round(weighted_score, 3)

    def evaluate_batch(self, extractions: List[Tuple[str, str]]) -> Dict[str, Any]:
        """
        Evaluate multiple extractions and return summary statistics

        Args:
            extractions: List of (json_string, source_url) tuples

        Returns:
            Dictionary with batch evaluation results
        """
        results = []
        for json_str, source_url in extractions:
            result = self.evaluate_extraction(json_str, source_url)
            results.append(result)

        # Calculate batch statistics
        scores = [r.overall_score for r in results]
        field_scores = defaultdict(list)

        for result in results:
            for field, score in result.field_quality_scores.items():
                field_scores[field].append(score)

        # Aggregate statistics
        batch_stats = {
            "total_extractions": len(results),
            "avg_score": sum(scores) / len(scores) if scores else 0,
            "min_score": min(scores) if scores else 0,
            "max_score": max(scores) if scores else 0,
            "json_parse_success_rate": sum(1 for r in results if r.json_parseable) / len(results),
            "required_fields_success_rate": sum(1 for r in results if r.required_fields_present) / len(results),
            "url_validity_rate": sum(1 for r in results if r.url_valid) / len(results),
            "field_avg_scores": {
                field: sum(scores) / len(scores) if scores else 0
                for field, scores in field_scores.items()
            },
            "low_quality_extractions": [
                i for i, result in enumerate(results) if result.overall_score < 0.6
            ],
            "common_issues": self._get_common_issues(results)
        }

        print("=== BATCH EVALUATION RESULTS ===")
        print(f"Total extractions: {batch_stats['total_extractions']}")
        print(f"Average score: {batch_stats['avg_score']:.3f}")
        print(f"JSON parse success rate: {batch_stats['json_parse_success_rate']:.2%}")
        print(f"Required fields success rate: {batch_stats['required_fields_success_rate']:.2%}")
        print(f"URL validity rate: {batch_stats['url_validity_rate']:.2%}")
        print("\nField Average Scores:")
        for field, score in batch_stats['field_avg_scores'].items():
            print(f"  {field}: {score:.3f}")

        if batch_stats['low_quality_extractions']:
            print(f"\nLow quality extractions (indices): {batch_stats['low_quality_extractions']}")

        if batch_stats['common_issues']:
            print("\nCommon issues:")
            for issue, count in batch_stats['common_issues'].items():
                print(f"  {issue}: {count} occurrences")

        return batch_stats

    def _get_common_issues(self, results: List[EvalResult]) -> Dict[str, int]:
        """Find most common issues across extractions"""
        issue_counts = defaultdict(int)
        for result in results:
            for issue in result.issues:
                issue_counts[issue] += 1
        return dict(sorted(issue_counts.items(), key=lambda x: x[1], reverse=True))


In [169]:
import re

evaluations = []

def strip_code_blocks(text):
    text = re.sub(r'^```\w*\n?', '', text, flags=re.MULTILINE)

    # Remove closing code block markers
    text = re.sub(r'\n?```$', '', text, flags=re.MULTILINE)

    # Clean up any extra whitespace at the beginning and end
    return text.strip()

for url, llm_response in llm_responses.items():
    evaluations.append((strip_code_blocks(llm_response.content), url))

In [171]:
evaluator = ProductExtractionEvaluator()
batch_results = evaluator.evaluate_batch(evaluations)

=== BATCH EVALUATION RESULTS ===
Total extractions: 77
Average score: 0.670
JSON parse success rate: 100.00%
Required fields success rate: 100.00%
URL validity rate: 67.53%

Field Average Scores:
  image_url: 0.623
  product_link: 0.561
  type: 0.605
  description: 0.826
  model_no: 0.753
  qty: 0.800
  consistency: 1.000

Low quality extractions (indices): [20, 21, 23, 24, 26, 32, 36, 37, 38, 41, 46, 54, 55, 56, 61, 62, 63, 64, 65, 66, 67, 68, 72, 73, 74]


# Generate Specbook

In [81]:
fieldnames = ['image_url', 'type', 'description', 'model_no']

with open('01_llmpipeline/5-specbook.csv', 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader() # Writes the header row
    writer.writerows(filtered_list)