In [2]:
from google.colab import userdata
import os

os.environ["HUGGINGFACE_API_TOKEN"] = userdata.get('hf_token')
os.environ["ICECAT_TOKEN"]          = userdata.get('icecat_content_token')
os.environ["RAPIDAPI_KEY"]          = userdata.get('rapid_api_key')


In [7]:
from pathlib import Path

code = r'''
from __future__ import annotations
import asyncio, os, time, random, json
from dataclasses import dataclass
from typing import Any, Optional, List, Iterable
import httpx, numpy as np
from cachetools import TTLCache
from pydantic import BaseModel, Field
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

# =========================
# Data models
# =========================
class Money(BaseModel):
    amount: float
    currency: str = "USD"

class PricePoint(BaseModel):
    ts: float
    price: Money

class Offer(BaseModel):
    retailer: str
    url: str
    price: Money
    shipping: Optional[Money] = None
    in_stock: bool = True
    price_history: list[PricePoint] = Field(default_factory=list)

class Review(BaseModel):
    source: str
    rating: float | None = None
    title: Optional[str] = None
    text: str

class SentimentSummary(BaseModel):
    overall: float  # -1..1
    pros: list[str] = Field(default_factory=list)
    cons: list[str] = Field(default_factory=list)
    themes: list[str] = Field(default_factory=list)

class Product(BaseModel):
    id: str
    title: str
    brand: Optional[str] = None
    category: Optional[str] = None
    features: dict[str, Any] = Field(default_factory=dict)
    image: Optional[str] = None
    offers: list[Offer] = Field(default_factory=list)
    reviews: list[Review] = Field(default_factory=list)
    sentiment: Optional[SentimentSummary] = None

class UserQuery(BaseModel):
    raw: str
    budget: Optional[Money] = None
    must_have: list[str] = Field(default_factory=list)
    nice_to_have: list[str] = Field(default_factory=list)
    category: Optional[str] = None

class Recommendation(BaseModel):
    product_id: str
    title: str
    rationale: str
    score: float
    best_offer: Optional[Offer] = None

class RecommendationBundle(BaseModel):
    query: UserQuery
    recommendations: list[Recommendation]
    generated_at: float = Field(default_factory=lambda: time.time())

# =========================
# Provider interfaces
# =========================
class ProductSearchProvider:
    name = "base"
    async def search(self, query: UserQuery) -> list[Product]:
        raise NotImplementedError

class PriceProvider:
    name = "base"
    async def offers(self, product: Product) -> list[Offer]:
        raise NotImplementedError

class ReviewProvider:
    name = "base"
    async def fetch_reviews(self, product: Product, limit: int = 50) -> list[Review]:
        raise NotImplementedError

class SentimentProvider:
    name = "base"
    async def summarize(self, reviews: list[Review]) -> SentimentSummary:
        raise NotImplementedError

# =========================
# Safe mocks (work instantly)
# =========================
class MockProductSearch(ProductSearchProvider):
    name = "mock_search"
    async def search(self, query: UserQuery) -> list[Product]:
        random.seed(abs(hash(query.raw)) % (2**32))
        brands = ["Acme","ZenWave","SonicPro","QuietCo","PixelPeak"]
        base_cat = query.category or "electronics"
        items = []
        for i in range(1, 7):
            price = round(max(29.0, min(999.0, abs(random.gauss(150, 80)))), 2)
            items.append(Product(
                id=f"mock-{i}",
                title=f"{random.choice(brands)} {query.raw.title()} #{i}",
                brand=random.choice(brands),
                category=base_cat,
                features={
                    "wireless": "wireless" in query.raw.lower(),
                    "noise_cancelling": "noise" in query.raw.lower(),
                    "battery_hours": random.randint(10, 60),
                    # mock identifiers so Icecat can *try* enrichment
                    "mpn": f"MPN-{i:03d}",
                },
                offers=[Offer(retailer="ExampleMart", url="https://example.com/mock", price=Money(amount=price))]
            ))
        return items

class MockPriceProvider(PriceProvider):
    name = "mock_price"
    async def offers(self, product: Product) -> list[Offer]:
        base = product.offers[0].price.amount if product.offers else 100.0
        retailers = ["ExampleMart","DealBay","ShopNow"]
        out = []
        for r in retailers:
            jitter = random.uniform(-0.15, 0.15)
            p = round(max(19.0, base*(1+jitter)), 2)
            hist = [PricePoint(ts=time.time()-86400*d, price=Money(amount=round(p*(1+random.uniform(-0.1,0.1)),2))) for d in range(1,14)]
            out.append(Offer(retailer=r, url=f"https://example.com/{product.id}/{r}", price=Money(amount=p), price_history=hist))
        return out

class MockReviewProvider(ReviewProvider):
    name = "mock_reviews"
    async def fetch_reviews(self, product: Product, limit: int = 50) -> list[Review]:
        samples = [
            "Great sound, comfy on long flights but clamp can be tight.",
            "Battery life is stellar. ANC decent for the price.",
            "Bass heavy; mids a little recessed.",
            "App felt buggy at first but updates helped.",
            "Build quality exceeded expectations for this budget."
        ]
        out = []
        for i in range(min(limit, 25)):
            out.append(Review(source="mock", rating=random.choice([3,4,5]), title=f"Review {i+1}", text=random.choice(samples)))
        return out

# =========================
# External providers you have
# =========================
class OpenIcecatClient:
    """
    Very light best-effort enrichment using Open Icecat.
    We try a permissive 'search' call by title or MPN. If it fails or returns empty, we noop.
    """
    def __init__(self, token: Optional[str] = None):
        self.token = token or os.getenv("ICECAT_TOKEN")

    async def search_enrich(self, prod: Product) -> Product:
        if not self.token:
            return prod
        query = prod.features.get("mpn") or prod.title
        # Best-effort endpoint pattern; gracefully ignore errors.
        params = {"content": self.token, "language": "en", "search": str(query)}
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                r = await client.get("https://live.icecat.biz/api/?", params=params)
                # Expect JSON; structure varies by account/endpoint, so we guard parsing.
                data = r.json()
                # Heuristic extraction
                maybe = None
                if isinstance(data, dict):
                    # Some payloads include "data" or "products"
                    if "data" in data and isinstance(data["data"], list) and data["data"]:
                        maybe = data["data"][0]
                    elif "products" in data and isinstance(data["products"], list) and data["products"]:
                        maybe = data["products"][0]
                if isinstance(maybe, dict):
                    prod.features.update({
                        "icecat_category": maybe.get("Category") or maybe.get("category"),
                        "icecat_brand": maybe.get("Brand") or maybe.get("brand"),
                    })
                    img = (maybe.get("Image") or maybe.get("image") or {}).get("HighPic") if isinstance(maybe.get("Image") or maybe.get("image"), dict) else maybe.get("HighPic") or maybe.get("thumb")
                    if img: prod.image = img
        except Exception:
            # swallow; enrichment is optional
            pass
        return prod

class HuggingFaceSentiment(SentimentProvider):
    name = "hf_inference"
    def __init__(self, model: str = "finiteautomata/bertweet-base-sentiment-analysis"):
        self.model = model
        self._token = os.getenv("HUGGINGFACE_API_TOKEN")

    @retry(wait=wait_exponential(min=0.5, max=8), stop=stop_after_attempt(3), reraise=True,
           retry=retry_if_exception_type(httpx.HTTPError))
    async def _infer(self, texts: list[str]) -> list[float]:
        if not self._token:
            return []
        headers = {"Authorization": f"Bearer {self._token}"}
        payload = {"inputs": texts}
        url = f"https://api-inference.huggingface.co/models/{self.model}"
        async with httpx.AsyncClient(timeout=30) as client:
            r = await client.post(url, headers=headers, json=payload)
            r.raise_for_status()
            data = r.json()
        scores: list[float] = []
        for item in data:
            if isinstance(item, list) and item:
                labels = {d.get("label"): d.get("score") for d in item}
                pos = labels.get("POSITIVE") or labels.get("LABEL_2") or 0.0
                neg = labels.get("NEGATIVE") or labels.get("LABEL_0") or 0.0
                scores.append(float(pos - neg))
            else:
                scores.append(0.0)
        return scores

    async def summarize(self, reviews: list[Review]) -> SentimentSummary:
        texts = [r.text for r in reviews]
        if not texts:
            return SentimentSummary(overall=0.0)
        scores = await self._infer(texts)
        if not scores:
            return SentimentSummary(overall=0.0)
        overall = float(np.mean(scores))
        def extract(tokens: list[str]) -> list[str]:
            banned = {"the","a","is","and","but","for","this","that","are","with","on"}
            c = {}
            for t in tokens:
                for w in t.lower().replace("."," ").replace(","," ").split():
                    if len(w)>=4 and w not in banned:
                        c[w]=c.get(w,0)+1
            return [w for w,_ in sorted(c.items(), key=lambda x:-x[1])[:5]]
        themes = extract(texts)
        pros = [t for t in themes if t not in {"buggy","tight","recessed"}]
        cons = [t for t in themes if t in {"buggy","tight","recessed"}]
        return SentimentSummary(overall=overall, pros=pros, cons=cons, themes=themes)

class TwinwordSentiment(SentimentProvider):
    """
    Fallback sentiment via RapidAPI Twinword (per-text requests).
    """
    name = "twinword"
    def __init__(self):
        self.key = os.getenv("RAPIDAPI_KEY")

    async def _score_one(self, text: str) -> float:
        if not self.key:
            return 0.0
        headers = {
            "X-RapidAPI-Key": self.key,
            "X-RapidAPI-Host": "twinword-sentiment-analysis.p.rapidapi.com",
        }
        params = {"text": text}
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get("https://twinword-sentiment-analysis.p.rapidapi.com/analyze/", headers=headers, params=params)
            # On errors, treat neutral
            if r.status_code != 200:
                return 0.0
            data = r.json()
        # Twinword returns 'score' in [-1,1]
        return float(data.get("score", 0.0))

    async def summarize(self, reviews: list[Review]) -> SentimentSummary:
        texts = [r.text for r in reviews]
        if not texts:
            return SentimentSummary(overall=0.0)
        # rate-limit friendly: do in small batches
        out = []
        for t in texts:
            try:
                out.append(await self._score_one(t))
            except Exception:
                out.append(0.0)
        overall = float(np.mean(out)) if out else 0.0
        # naive theme extraction:
        banned = {"the","a","is","and","but","for","this","that","are","with","on"}
        c = {}
        for t in texts:
            for w in t.lower().replace("."," ").replace(","," ").split():
                if len(w)>=4 and w not in banned:
                    c[w]=c.get(w,0)+1
        themes = [w for w,_ in sorted(c.items(), key=lambda x:-x[1])[:5]]
        pros = [t for t in themes if t not in {"buggy","tight","recessed"}]
        cons = [t for t in themes if t in {"buggy","tight","recessed"}]
        return SentimentSummary(overall=overall, pros=pros, cons=cons, themes=themes)

# =========================
# Agents
# =========================
@dataclass
class AgentContext:
    search_providers: list[ProductSearchProvider]
    price_providers: list[PriceProvider]
    review_providers: list[ReviewProvider]
    sentiment_provider: SentimentProvider
    cache: TTLCache
    icecat: Optional[OpenIcecatClient] = None

class BaseAgent:
    name = "base"
    def __init__(self, ctx: AgentContext):
        self.ctx = ctx

class ProductSearchAgent(BaseAgent):
    name = "product_search"
    async def run(self, query: UserQuery) -> list[Product]:
        tasks = [p.search(query) for p in self.ctx.search_providers]
        results_nested = await asyncio.gather(*tasks, return_exceptions=True)
        seen: dict[str, Product] = {}
        for r in results_nested:
            if isinstance(r, Exception): continue
            for prod in r:
                if prod.id not in seen:
                    seen[prod.id] = prod
        products = list(seen.values())
        # Optional enrichment via Icecat
        if self.ctx.icecat:
            enriched = []
            for p in products:
                try:
                    enriched.append(await self.ctx.icecat.search_enrich(p))
                except Exception:
                    enriched.append(p)
            products = enriched
        return products

class PriceComparisonAgent(BaseAgent):
    name = "price_comparison"
    async def run(self, products: list[Product]) -> list[Product]:
        async def one(p: Product) -> Product:
            offers_all: list[Offer] = []
            for prov in self.ctx.price_providers:
                try:
                    offers_all.extend(await prov.offers(p))
                except Exception:
                    pass
            # dedupe by retailer, keep lowest price per retailer
            dedup = {}
            for o in offers_all:
                k = o.retailer.lower()
                if k not in dedup or o.price.amount < dedup[k].price.amount:
                    dedup[k] = o
            p.offers = sorted(dedup.values(), key=lambda x: x.price.amount)
            return p
        return await asyncio.gather(*[one(p) for p in products])

class ReviewAnalysisAgent(BaseAgent):
    name = "review_analysis"
    async def run(self, products: list[Product]) -> list[Product]:
        async def one(p: Product) -> Product:
            rv: list[Review] = []
            for rp in self.ctx.review_providers:
                try:
                    rv.extend(await rp.fetch_reviews(p, limit=40))
                except Exception:
                    pass
            p.reviews = rv
            try:
                p.sentiment = await self.ctx.sentiment_provider.summarize(rv)
            except Exception:
                p.sentiment = SentimentSummary(overall=0.0)
            return p
        return await asyncio.gather(*[one(p) for p in products])

class RecommendationEngineAgent(BaseAgent):
    name = "recommendation_engine"
    def _score(self, prod: Product, q: UserQuery) -> tuple[float,str]:
        best = prod.offers[0] if prod.offers else None
        price_score, budget_reason = 0.0, ""
        if q.budget and best:
            if best.price.amount <= q.budget.amount:
                price_score = 1.0 - (best.price.amount/(q.budget.amount+1e-6))*0.5
                budget_reason = f"within budget ({best.price.amount:.2f} ≤ {q.budget.amount:.2f})"
            else:
                over = best.price.amount - q.budget.amount
                price_score = max(-0.5, -over/(q.budget.amount+1e-6))
                budget_reason = f"over budget by ${over:.2f}"
        elif best:
            price_score = 0.2
        sent = prod.sentiment.overall if prod.sentiment else 0.0
        sentiment_score = (sent + 1)/2  # 0..1
        feat_hits = sum(1 for k in q.must_have if prod.features.get(k) is True or k in str(prod.features).lower())
        feat_score = min(1.0, feat_hits/max(1, len(q.must_have))) if q.must_have else 0.5
        deal_score = 0.5
        if best and best.price_history:
            hist = [pp.price.amount for pp in best.price_history]
            if hist:
                pct = best.price.amount / (np.mean(hist)+1e-6)
                deal_score = max(0.0, min(1.0, 1.2 - pct))  # cheaper than history → higher
        availability = 1.0 if (best and best.in_stock) else 0.3
        weights = np.array([0.35, 0.25, 0.2, 0.15, 0.05])
        comps = np.array([price_score, sentiment_score, feat_score, deal_score, availability])
        score = float(np.dot(weights, comps))
        reason = (f"Price {budget_reason or 'considered'}, reviews {sentiment_score:.2f}, "
                  f"features {feat_score:.2f}, deal {deal_score:.2f}, availability {availability:.2f}")
        return score, reason

    async def run(self, products: list[Product], query: UserQuery, top_k: int = 5) -> list[Recommendation]:
        recs: list[Recommendation] = []
        for p in products:
            s, why = self._score(p, query)
            recs.append(Recommendation(product_id=p.id, title=p.title, rationale=why,
                                       score=s, best_offer=p.offers[0] if p.offers else None))
        recs.sort(key=lambda r: r.score, reverse=True)
        return recs[:top_k]

# =========================
# Orchestrator
# =========================
class Orchestrator:
    def __init__(self, ctx: AgentContext):
        self.ctx = ctx
        self.product_search = ProductSearchAgent(ctx)
        self.price_comp = PriceComparisonAgent(ctx)
        self.review_agent = ReviewAnalysisAgent(ctx)
        self.recommend_agent = RecommendationEngineAgent(ctx)

    async def handle(self, query: UserQuery, top_k: int = 5) -> RecommendationBundle:
        products = await self.product_search.run(query)
        products = await self.price_comp.run(products)
        products = await self.review_agent.run(products)
        recs = await self.recommend_agent.run(products, query, top_k=top_k)
        return RecommendationBundle(query=query, recommendations=recs)

# =========================
# Context builder
# =========================
def build_ctx() -> AgentContext:
    # Choose sentiment: HF if token exists else Twinword if RapidAPI key exists else neutral fallback
    use_hf = bool(os.getenv("HUGGINGFACE_API_TOKEN"))
    use_tw = bool(os.getenv("RAPIDAPI_KEY"))
    sentiment = HuggingFaceSentiment() if use_hf else (TwinwordSentiment() if use_tw else TwinwordSentiment())
    ctx = AgentContext(
        search_providers=[MockProductSearch()],         # plug real eBay/BestBuy providers later
        price_providers=[MockPriceProvider()],          # plug Keepa/retailer providers later
        review_providers=[MockReviewProvider()],        # plug real review sources later
        sentiment_provider=sentiment,
        cache=TTLCache(maxsize=1024, ttl=1800),
        icecat=OpenIcecatClient(token=os.getenv("ICECAT_TOKEN")) if os.getenv("ICECAT_TOKEN") else None,
    )
    return ctx

# =========================
# Public API
# =========================
async def generate_recommendations(raw_query: str, budget: Optional[float] = None, currency: str = "USD",
                                   must_have: Optional[list[str]] = None, nice_to_have: Optional[list[str]] = None,
                                   category: Optional[str] = None, top_k: int = 5) -> RecommendationBundle:
    q = UserQuery(
        raw=raw_query,
        budget=Money(amount=budget, currency=currency) if budget else None,
        must_have=must_have or [],
        nice_to_have=nice_to_have or [],
        category=category
    )
    orch = Orchestrator(build_ctx())
    return await orch.handle(q, top_k=top_k)

# =========================
# Demo & tests (optional)
# =========================
DEMO_QUERIES = [
    ("wireless noise cancelling headphones for travel", 200.0, ["noise_cancelling","wireless"], [], "audio"),
    ("robot vacuum for pet hair", 300.0, ["pet"], ["mapping"], "home"),
    ("4k monitor for photo editing", 400.0, ["4k"], ["wide_gamut"], "monitors"),
]

async def _demo(custom: Optional[str] = None):
    if custom:
        b = await generate_recommendations(custom, budget=200.0)
        print(json.dumps(b.model_dump(), indent=2, default=str))
        return
    for raw, budget, must, nice, cat in DEMO_QUERIES:
        b = await generate_recommendations(raw, budget=budget, must_have=must, nice_to_have=nice, category=cat)
        print("\n=== Query:", raw)
        for r in b.recommendations:
            price = f"${r.best_offer.price.amount:.2f}" if r.best_offer else "?"
            print(f" - {r.title} → {price} | score {r.score:.2f}")
            print(f"   why: {r.rationale}")

async def _tests():
    b = await generate_recommendations("noise cancelling headphones under $100", budget=100.0, must_have=["noise_cancelling"])
    assert b.recommendations, "No recs returned"
    b2 = await generate_recommendations("wireless earbuds", budget=250.0, must_have=["wireless"])
    assert len(b2.recommendations) > 0
    b3 = await generate_recommendations("dyson v15 vs samsung jet vacuum", budget=700.0)
    assert len(b3.recommendations) > 0
    print("All scenario tests passed.")

if __name__ == "__main__":
    import argparse, asyncio
    ap = argparse.ArgumentParser()
    ap.add_argument("--demo", nargs="*", help="Run demo with optional custom query")
    ap.add_argument("--run-tests", action="store_true")
    args = ap.parse_args()
    if args.run_tests:
        asyncio.run(_tests())
    else:
        custom = " ".join(args.demo) if args.demo else None
        asyncio.run(_demo(custom))
'''
Path("agentic_commerce.py").write_text(code)
print("agentic_commerce.py written.")


agentic_commerce.py written.


In [13]:
from pathlib import Path
import re

p = Path("agentic_commerce.py")
text = p.read_text()

# 1) Insert utility helpers after the tenacity import (only once)
util_anchor = "from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type"
util_code = """
# ==== Quality/Ranking utilities ====
import math, json

def _parse_budget_from_text(text: str):
    import re
    t = text.lower()
    m = re.search(r'(?:under|<=|less than|below|up to|budget)\\s*\\$?\\s*(\\d+(?:\\.\\d+)?)', t)
    if m: return float(m.group(1))
    m = re.search(r'\\$(\\d+(?:\\.\\d+)?)', text)
    return float(m.group(1)) if m else None

def _soft_budget_score(price: float, budget):
    if budget is None or price is None or price <= 0:
        return 0.35  # neutral baseline if no budget provided
    ratio = price / (budget + 1e-6)
    return 1 / (1 + math.exp(4 * (ratio - 1)))  # 0..1, smooth around ratio=1

def _z_deal_score(current: float, hist: list):
    if not hist:
        return 0.5
    mu = float(np.mean(hist))
    sd = float(np.std(hist)) + 1e-6
    z = (mu - current) / sd
    rel = (mu - current) / (mu + 1e-6)
    return max(0.0, min(1.0, 0.5 + 0.1 * z + 0.4 * max(0.0, rel)))

def _fuzzy_hit(text: str, key: str) -> bool:
    t = text.lower()
    k = key.lower().replace('_', ' ')
    return (k in t) or any(w in t for w in k.split())

def _mmr(items, lambda_=0.8):
    # items: list[(rec, representation_string)]
    selected, cands = [], items[:]
    def sim(a, b):
        sa, sb = set(a.split()), set(b.split())
        inter = len(sa & sb); union = len(sa | sb) or 1
        return inter / union
    while cands:
        best, best_score = None, -1
        for rec, rep in cands:
            rel = rec.score
            div = max((sim(rep, srep) for _, srep in selected), default=0.0)
            s = lambda_ * rel - (1 - lambda_) * div
            if s > best_score:
                best_score, best = s, (rec, rep)
        selected.append(best)
        cands.remove(best)
    return [r for r, _ in selected]
"""

if util_code.strip() not in text:
    text = text.replace(util_anchor, util_anchor + "\n" + util_code)

# 2) Enhance ReviewAnalysisAgent to store avg_rating/review_count
text = re.sub(
    r"p\.reviews = rv\s*?\n\s*?try:\s*?\n\s*?p\.sentiment = await self\.ctx\.sentiment_provider\.summarize\(rv\)\s*?\n\s*?except Exception:\s*?\n\s*?p\.sentiment = SentimentSummary\(overall=0\.0\)\s*?\n\s*?return p",
    "p.reviews = rv\n            try:\n                p.sentiment = await self.ctx.sentiment_provider.summarize(rv)\n            except Exception:\n                p.sentiment = SentimentSummary(overall=0.0)\n            if rv:\n                ratings = [r.rating for r in rv if r.rating is not None]\n                if ratings:\n                    p.features['avg_rating'] = float(np.mean(ratings))\n                p.features['review_count'] = len(rv)\n            return p",
    text
)

# 3) Replace the _score() method with improved logic
score_pattern = r"def _score\(self, prod: Product, q: UserQuery\) -> tuple\[float, str\]:[\s\S]*?return score, reason"
score_replacement = """def _score(self, prod: Product, q: UserQuery) -> tuple[float, str]:
        best = prod.offers[0] if prod.offers else None
        price = best.price.amount if best else None
        budget = q.budget.amount if q.budget else None

        # Auto-parse budget from text if missing
        if budget is None and q.raw:
            auto = _parse_budget_from_text(q.raw)
            if auto:
                budget = auto

        # 1) Price vs budget (smooth)
        pscore = _soft_budget_score(price, budget)

        # 2) Sentiment with confidence
        n_reviews = int(prod.features.get('review_count', len(prod.reviews) if prod.reviews else 0))
        sent = float(prod.sentiment.overall) if prod.sentiment else 0.0  # -1..1
        conf = min(1.0, math.log(1 + n_reviews, 10) / math.log(1 + 40, 10)) if n_reviews > 0 else 0.0
        sscore = ((sent + 1) / 2) * (0.5 + 0.5 * conf)  # 0..1, scaled by confidence

        # 3) Feature coverage (must-have strict, nice-to-have partial)
        import json as _json
        text_blob = (prod.title + ' ' + _json.dumps(prod.features)).lower()
        mh = q.must_have or []
        nh = q.nice_to_have or []
        must_hits = sum(1 for k in mh if prod.features.get(k) is True or _fuzzy_hit(text_blob, k))
        nice_hits = sum(1 for k in nh if prod.features.get(k) is True or _fuzzy_hit(text_blob, k))
        if mh:
            fscore = 0.2 * (must_hits / len(mh)) + 0.1 * (nice_hits / max(1, len(nh))) + (0.7 if must_hits == len(mh) else 0.0)
        else:
            fscore = 0.4 + 0.2 * (nice_hits / max(1, len(nh)))

        # 4) Deal score from price history
        dscore = 0.5
        if best and best.price_history:
            hist = [pp.price.amount for pp in best.price_history]
            dscore = _z_deal_score(best.price.amount, hist)

        # 5) Availability
        avail = 1.0 if (best and best.in_stock) else 0.2

        # Final weighted score
        weights = np.array([0.38, 0.22, 0.22, 0.13, 0.05])
        comps = np.array([pscore, sscore, fscore, dscore, avail])
        score = float(np.dot(weights, comps))

        why = (f\"Price {'within' if (budget and price and price <= budget) else 'considered'}, \"
               f\"reviews {sscore:.2f} (n={n_reviews}), features {fscore:.2f}, deal {dscore:.2f}, availability {avail:.2f}\")
        return score, why"""
text, n1 = re.subn(score_pattern, score_replacement, text)

# 4) Apply MMR diversity in final ranking (RecommendationEngineAgent.run)
run_pattern = r"async def run\(self, products: list\[Product\], query: UserQuery, top_k: int = 5\) -> list\[Recommendation\]:[\s\S]*?return recs\[:top_k\]"
run_replacement = """async def run(self, products: list[Product], query: UserQuery, top_k: int = 5) -> list[Recommendation]:
        recs: list[Recommendation] = []
        reps = []
        for p in products:
            s, why = self._score(p, query)
            rec = Recommendation(product_id=p.id, title=p.title, rationale=why, score=s,
                                 best_offer=p.offers[0] if p.offers else None)
            recs.append(rec)
            reps.append((rec, (p.title + ' ' + (p.brand or '') + ' ' + (p.category or '')).lower()))
        recs_sorted = sorted(recs, key=lambda r: r.score, reverse=True)
        diverse = _mmr([(r, r.title.lower()) for r in recs_sorted], lambda_=0.8)
        return diverse[:top_k]"""
text, n2 = re.subn(run_pattern, run_replacement, text)

# 5) Parse budget from text in generate_recommendations when budget is None
gen_pattern = r"q = UserQuery\(\s*raw=raw_query,\s*budget=Money\(amount=budget, currency=currency\) if budget else None,"
gen_replacement = """if budget is None:
        auto_budget = _parse_budget_from_text(raw_query)
        if auto_budget:
            budget = auto_budget
    q = UserQuery(
        raw=raw_query,
        budget=Money(amount=budget, currency=currency) if budget else None,"""
text, n3 = re.subn(gen_pattern, gen_replacement, text)

p.write_text(text)


print(f"Replaced _score(): {bool(n1)} | MMR run(): {bool(n2)} | Budget parse insertion: {bool(n3)}")


Replaced _score(): False | MMR run(): False | Budget parse insertion: True


In [14]:
!python agentic_commerce.py --demo "Robot vacuum for pet hair with mapping under $300"

{
  "query": {
    "raw": "Robot vacuum for pet hair with mapping under 300",
    "budget": {
      "amount": 200.0,
      "currency": "USD"
    },
    "must_have": [],
    "nice_to_have": [],
    "category": null
  },
  "recommendations": [
    {
      "product_id": "mock-4",
      "title": "Acme Robot Vacuum For Pet Hair With Mapping Under 300 #4",
      "rationale": "Price within budget (95.66 \u2264 200.00), reviews 0.50, features 0.50, deal 0.20, availability 1.00",
      "score": 0.5707648917231849,
      "best_offer": {
        "retailer": "DealBay",
        "url": "https://example.com/mock-4/DealBay",
        "price": {
          "amount": 95.66,
          "currency": "USD"
        },
        "shipping": null,
        "in_stock": true,
        "price_history": [
          {
            "ts": 1763854698.008151,
            "price": {
              "amount": 97.32,
              "currency": "USD"
            }
          },
          {
            "ts": 1763768298.0081549,
       

In [10]:
!python agentic_commerce.py --demo "wireless noise cancelling headphones under $200 for travel"

{
  "query": {
    "raw": "wireless noise cancelling headphones under 200 for travel",
    "budget": {
      "amount": 200.0,
      "currency": "USD"
    },
    "must_have": [],
    "nice_to_have": [],
    "category": null
  },
  "recommendations": [
    {
      "product_id": "mock-6",
      "title": "PixelPeak Wireless Noise Cancelling Headphones Under 200 For Travel #6",
      "rationale": "Price within budget (99.56 \u2264 200.00), reviews 0.50, features 0.50, deal 0.20, availability 1.00",
      "score": 0.5682664813038381,
      "best_offer": {
        "retailer": "ExampleMart",
        "url": "https://example.com/mock-6/ExampleMart",
        "price": {
          "amount": 99.56,
          "currency": "USD"
        },
        "shipping": null,
        "in_stock": true,
        "price_history": [
          {
            "ts": 1763854605.4399264,
            "price": {
              "amount": 102.07,
              "currency": "USD"
            }
          },
          {
            

In [15]:
!python agentic_commerce.py --run-tests

All scenario tests passed.


In [16]:
import asyncio
from agentic_commerce import generate_recommendations
import nest_asyncio, asyncio


async def run_scenarios_verbose():
    scenarios = [
        ("S1 • strict budget",
         lambda: generate_recommendations("noise cancelling headphones under $100", budget=100.0, must_have=["noise_cancelling"])),
        ("S2 • features matter",
         lambda: generate_recommendations("wireless earbuds", budget=250.0, must_have=["wireless"])),
        ("S3 • comparative shopping",
         lambda: generate_recommendations("dyson v15 vs samsung jet vacuum", budget=700.0)),
    ]
    for name, fn in scenarios:
        b = await fn()
        assert len(b.recommendations) > 0, f"{name} returned no recommendations"
        top = b.recommendations[0]
        print(f"[PASS] {name}: {len(b.recommendations)} recs | top='{top.title}' | score={top.score:.2f}")
    print("All scenario tests passed.")

nest_asyncio.apply()
asyncio.get_event_loop().run_until_complete(run_scenarios_verbose())
asyncio.run(run_scenarios_verbose())
await run_scenarios_verbose()

[PASS] S1 • strict budget: 5 recs | top='PixelPeak Noise Cancelling Headphones Under $100 #1' | score=0.40
[PASS] S2 • features matter: 5 recs | top='PixelPeak Wireless Earbuds #4' | score=0.68
[PASS] S3 • comparative shopping: 5 recs | top='ZenWave Dyson V15 Vs Samsung Jet Vacuum #6' | score=0.64
All scenario tests passed.
[PASS] S1 • strict budget: 5 recs | top='PixelPeak Noise Cancelling Headphones Under $100 #1' | score=0.40
[PASS] S2 • features matter: 5 recs | top='PixelPeak Wireless Earbuds #4' | score=0.68
[PASS] S3 • comparative shopping: 5 recs | top='ZenWave Dyson V15 Vs Samsung Jet Vacuum #6' | score=0.64
All scenario tests passed.
[PASS] S1 • strict budget: 5 recs | top='PixelPeak Noise Cancelling Headphones Under $100 #1' | score=0.40
[PASS] S2 • features matter: 5 recs | top='PixelPeak Wireless Earbuds #4' | score=0.68
[PASS] S3 • comparative shopping: 5 recs | top='ZenWave Dyson V15 Vs Samsung Jet Vacuum #6' | score=0.64
All scenario tests passed.


In [24]:
!pip install streamlit

Collecting streamlit
  Downloading streamlit-1.51.0-py3-none-any.whl.metadata (9.5 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.51.0-py3-none-any.whl (10.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/10.2 MB[0m [31m62.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m45.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pydeck, streamlit
Successfully installed pydeck-0.9.1 streamlit-1.51.0


In [25]:
!which streamlit && streamlit --version

/usr/local/bin/streamlit
Streamlit, version 1.51.0


In [26]:
from pathlib import Path

app_code = r"""
import os, asyncio
import pandas as pd
import streamlit as st

# Import your agent pipeline
from agentic_commerce import generate_recommendations

st.set_page_config(page_title='Agentic AI E-Commerce Assistant', page_icon='🛍️', layout='wide')
st.title('Agentic AI E-Commerce Assistant')
st.caption('Search → Compare → Analyze Reviews → Recommend')


def csv_list(s):
    return [x.strip() for x in s.split(',') if x.strip()]

# Query form
with st.form('query_form', clear_on_submit=False):
    q = st.text_input('What are you shopping for?', 'wireless noise cancelling headphones for travel')
    c1, c2, c3 = st.columns(3)
    with c1: budget = st.number_input('Budget (0 = none)', min_value=0.0, value=200.0, step=10.0)
    with c2: currency = st.selectbox('Currency', ['USD','EUR','GBP','JPY'], index=0)
    with c3: top_k = st.slider('Top K', 1, 10, 5, 1)
    must = st.text_input('Must-have (comma separated)', 'noise_cancelling, wireless')
    nice = st.text_input('Nice-to-have (comma separated)', '')
    cat = st.text_input('Category (optional)', 'audio')
    go = st.form_submit_button('🔎 Find recommendations')

# Async helper
async def _call_agent():
    return await generate_recommendations(
        raw_query=q,
        budget=(budget if budget > 0 else None),
        currency=currency,
        must_have=csv_list(must),
        nice_to_have=csv_list(nice),
        category=(cat or None),
        top_k=top_k
    )

def run_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    return loop.run_until_complete(coro)

if go:
    with st.spinner('Contacting agents…'):
        bundle = run_async(_call_agent())
    st.success('Done!')

    st.markdown('---')
    for i, rec in enumerate(bundle.recommendations, start=1):
        with st.container(border=True):
            st.subheader(f'#{i} • {rec.title}')

            # Two-column layout WITHOUT images
            left, right = st.columns([1,2], vertical_alignment="top")

            with left:
                st.markdown(f"**Score:** {rec.score:.2f}")
                if rec.best_offer:
                    st.markdown(f"**Best price:** {rec.best_offer.price.amount:.2f} {rec.best_offer.price.currency}")
                    st.markdown(f"**Retailer:** {rec.best_offer.retailer}")
                    st.markdown(f"[Open listing]({rec.best_offer.url})")
                else:
                    st.caption("No offer data available.")

            with right:
                st.markdown("**Why this made the list**")
                st.write(rec.rationale)

else:
    st.info('Enter your query and click **Find recommendations** to start.')
"""
Path("streamlit_app.py").write_text(app_code)


2688

In [21]:
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -O /usr/local/bin/cloudflared
!chmod +x /usr/local/bin/cloudflared
!cloudflared --version

cloudflared version 2025.11.1 (built 2025-11-07-16:59 UTC)


In [27]:
import subprocess, time, re, os, signal

# Try to clean up any stale processes
try:
    subprocess.run(["pkill","-f","cloudflared.*tunnel"], check=False)
    subprocess.run(["pkill","-f","streamlit run streamlit_app.py"], check=False)
except Exception:
    pass

# Start Streamlit (headless on port 8501)
streamlit_proc = subprocess.Popen(
    ["streamlit", "run", "streamlit_app.py", "--server.port", "8501", "--server.headless", "true"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)

time.sleep(4)  # give Streamlit a moment to boot

# Start Cloudflared tunnel
cloudflared_proc = subprocess.Popen(
    ["cloudflared", "tunnel", "--url", "http://localhost:8501", "--no-autoupdate"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)

public_url = None
deadline = time.time() + 120  # up to ~2 minutes
while time.time() < deadline:
    line = cloudflared_proc.stdout.readline()
    if not line:
        time.sleep(0.2); continue
    if "trycloudflare.com" in line:
        m = re.search(r"https?://[^\s]+trycloudflare\.com", line)
        if m:
            public_url = m.group(0)
            break

print("Streamlit is live at:", public_url if public_url else "Failed to capture URL (scroll logs above).")
print("Keep this cell running to keep the tunnel alive.")

print("streamlit_app.py updated. Reload the Streamlit page.")



Streamlit is live at: https://created-associate-gentle-dominant.trycloudflare.com
Keep this cell running to keep the tunnel alive.
streamlit_app.py updated. Reload the Streamlit page.
