In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Free MVP: DuckDuckGo 搜索 Amazon 链接 -> 采集商品信息 + 热门好评 -> 生成视频文稿（本地“AI风格”摘要）
仅用免费工具：requests + BeautifulSoup + 规则化摘要，不调用付费 API。

用法：
    python main.py --keyword "wireless earbuds" --products 2 --reviews 3 --market "com"
输出：
    ./output/results.json, ./output/results.csv, ./output/script_<ASIN>.md
'''
import re
import os
import csv
import time
import json
import argparse
import random
import urllib.parse as up
from typing import List, Dict, Any, Optional

import requests
from bs4 import BeautifulSoup

try:
    import pandas as pd
except Exception:
    pd = None

DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Connection": "keep-alive",
}

DDG_HTML = "https://html.duckduckgo.com/html/"
TIMEOUT = 20

def sleep_jitter(a=1.0, b=2.0):
    time.sleep(random.uniform(a, b))

def ddg_search_amazon_links(keyword: str, market: str = "com", max_links: int = 5, headers: Optional[dict]=None) -> List[str]:
    '''
    使用 DuckDuckGo HTML 搜索，返回包含 /dp/ 或 /gp/product/ 的 Amazon 商品链接。
    '''
    if headers is None:
        headers = DEFAULT_HEADERS
    q = f"site:amazon.{market} {keyword}"
    data = {"q": q}
    resp = requests.post(DDG_HTML, data=data, headers=headers, timeout=TIMEOUT)
    resp.raise_for_status()

    soup = BeautifulSoup(resp.text, "lxml")
    links = []
    for a in soup.select("a.result__a"):
        href = a.get("href", "")
        real = up.unquote(href)
        if "http" in real:
            real_url = real
        else:
            real_url = a.get("href", "")
        if ("amazon." in real_url) and ("/dp/" in real_url or "/gp/product/" in real_url):
            clean = real_url.split("/ref=")[0].split("?")[0]
            if clean not in links:
                links.append(clean)
        if len(links) >= max_links:
            break
    return links

def extract_asin(url: str) -> Optional[str]:
    m = re.search(r"/dp/([A-Z0-9]{10})", url)
    if m:
        return m.group(1)
    m = re.search(r"/gp/product/([A-Z0-9]{10})", url)
    if m:
        return m.group(1)
    return None

def get(url: str, headers: Optional[dict]=None) -> requests.Response:
    if headers is None:
        headers = DEFAULT_HEADERS
    resp = requests.get(url, headers=headers, timeout=TIMEOUT)
    resp.raise_for_status()
    return resp

def parse_product_page(html_text: str) -> Dict[str, Any]:
    soup = BeautifulSoup(html_text, "lxml")
    title = soup.select_one("#productTitle")
    title = title.get_text(strip=True) if title else ""

    bullets = [li.get_text(" ", strip=True) for li in soup.select("#feature-bullets ul li:not(.aok-hidden)")]
    bullets = [b for b in bullets if b and "Click to play" not in b]

    price_el = soup.select_one("#corePrice_feature_div .a-offscreen") or soup.select_one(".a-price .a-offscreen")
    price = price_el.get_text(strip=True) if price_el else ""

    rating_el = soup.select_one("span[data-hook='rating-out-of-text']") or soup.select_one(".a-icon-alt")
    rating = rating_el.get_text(strip=True) if rating_el else ""

    review_count_el = soup.select_one("#acrCustomerReviewText") or soup.select_one("span[data-hook='total-review-count']")
    review_count = review_count_el.get_text(strip=True) if review_count_el else ""

    brand = ""
    for th in soup.select("#productDetails_techSpec_section_1 th, #prodDetails tr th"):
        if th.get_text(strip=True).lower() in ["brand", "品牌"]:
            td = th.find_next("td")
            brand = td.get_text(" ", strip=True) if td else ""
            break

    return {
        "title": title,
        "bullets": bullets,
        "price": price,
        "rating": rating,
        "review_count": review_count,
        "brand": brand,
    }

def parse_reviews_page(html_text: str, max_reviews: int = 3) -> List[Dict[str, Any]]:
    soup = BeautifulSoup(html_text, "lxml")
    reviews = []
    for card in soup.select("div[data-hook='review']"):
        if len(reviews) >= max_reviews:
            break
        stars_el = card.select_one("i[data-hook='review-star-rating'] span") or card.select_one("i.a-icon-star span")
        stars = stars_el.get_text(strip=True) if stars_el else ""
        title_el = card.select_one("a[data-hook='review-title'] span")
        title = title_el.get_text(" ", strip=True) if title_el else ""
        body_el = card.select_one("span[data-hook='review-body'] span")
        body = body_el.get_text(" ", strip=True) if body_el else ""
        helpful_el = card.select_one("span.cr-vote span")
        helpful = helpful_el.get_text(" ", strip=True) if helpful_el else ""
        reviews.append({
            "stars": stars,
            "title": title,
            "body": body,
            "helpful": helpful,
        })
    return reviews

def build_reviews_url(asin: str, market: str="com") -> str:
    return f"https://www.amazon.{market}/product-reviews/{asin}/?sortBy=helpful"

def simple_keywords(text: str, topk: int = 8) -> List[str]:
    stop = set("a an the and or for with from have has this that your you our are is was were to of in on at by it as be can will just very more most not only also get got make makes made into over about after before under above between com amazon use using used product".split())
    words = re.findall(r"[A-Za-z][A-Za-z0-9\-\+]{2,}", text.lower())
    freq = {}
    for w in words:
        if w in stop:
            continue
        freq[w] = freq.get(w, 0) + 1
    return [w for w,_ in sorted(freq.items(), key=lambda x: (-x[1], x[0]))[:topk]]

def pick_selling_points(bullets: List[str], fallback_text: str, k: int = 3) -> List[str]:
    points = [b for b in bullets if len(b) > 0][:k]
    if len(points) < k:
        kws = simple_keywords(fallback_text, topk=10)
        while len(points) < k and kws:
            points.append(f"Key feature: {kws.pop(0)}")
    return points[:k]

def make_video_script(prod: Dict[str, Any], reviews: List[Dict[str, Any]]) -> str:
    title = prod.get("title","").strip()
    bullets = prod.get("bullets", [])
    rating = prod.get("rating", "")
    review_count = prod.get("review_count", "")
    price = prod.get("price", "")
    brand = prod.get("brand", "")

    fallback = " ".join([title] + bullets)
    selling_points = pick_selling_points(bullets, fallback_text=fallback, k=3)

    top_quotes = []
    for r in reviews[:3]:
        body = r.get("body","").strip()
        if body:
            body = re.sub(r"\s+", " ", body)
            if len(body) > 180:
                body = body[:180].rstrip() + "..."
            top_quotes.append(f"“{body}”")

    hook = f"还在为挑选 {brand + ' ' if brand else ''}{title[:50]} 犹豫吗？看看这款的真实口碑！"
    social = "\n".join([f"- {q}" for q in top_quotes]) if top_quotes else "- 来自用户的真实好评，口碑在线。"

    script = f'''# 视频脚本（草案）

**Hook（抓注意力）**  
{hook}

**产品亮点**  
- {selling_points[0] if len(selling_points)>0 else ''}
- {selling_points[1] if len(selling_points)>1 else ''}
- {selling_points[2] if len(selling_points)>2 else ''}

**社会证明（好评节选）**  
{social}

**数据背书**  
- 评分：{rating or 'N/A'}，评论数：{review_count or 'N/A'}  
- 价格参考：{price or 'N/A'} （以页面为准）

**结尾 CTA**  
如果你也在找一款靠谱的 {brand or ''}{title[:20]}，不妨点开看看，早点入手！
'''
    return script

def scrape_one_product(url: str, market: str, headers: dict, review_n: int) -> Dict[str, Any]:
    asin = extract_asin(url)
    if not asin:
        raise ValueError(f"无法从链接中提取 ASIN：{url}")
    prod_url = f"https://www.amazon.{market}/dp/{asin}"
    resp = get(prod_url, headers=headers)
    prod = parse_product_page(resp.text)

    time.sleep(random.uniform(1.0, 2.0))
    rurl = build_reviews_url(asin, market=market)
    rresp = get(rurl, headers=headers)
    revs = parse_reviews_page(rresp.text, max_reviews=review_n)

    prod["asin"] = asin
    prod["url"] = prod_url
    prod["reviews"] = revs
    prod["video_script"] = make_video_script(prod, revs)
    return prod

def save_outputs(items: List[Dict[str, Any]], outdir: str):
    os.makedirs(outdir, exist_ok=True)
    with open(os.path.join(outdir, "results.json"), "w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False, indent=2)

    rows = []
    for it in items:
        rows.append({
            "asin": it.get("asin",""),
            "title": it.get("title",""),
            "price": it.get("price",""),
            "rating": it.get("rating",""),
            "review_count": it.get("review_count",""),
            "url": it.get("url",""),
        })
    with open(os.path.join(outdir, "results.csv"), "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else ["asin","title","price","rating","review_count","url"])
        writer.writeheader()
        for r in rows:
            writer.writerow(r)

    for it in items:
        asin = it.get("asin","NA")
        script = it.get("video_script","")
        with open(os.path.join(outdir, f"script_{asin}.md"), "w", encoding="utf-8") as f:
            f.write(script)

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--keyword", type=str, default="best seller gadgets", help="DuckDuckGo 搜索关键词")
    ap.add_argument("--products", type=int, default=2, help="抓取商品数量")
    ap.add_argument("--reviews", type=int, default=3, help="每个商品抓取评论条数")
    ap.add_argument("--market", type=str, default="com", help="Amazon 站点，如 com / co.uk / de / jp")
    ap.add_argument("--user-agent", type=str, default="", help="自定义 UA")
    args = ap.parse_args()

    headers = DEFAULT_HEADERS.copy()
    if args.user_agent:
        headers["User-Agent"] = args.user_agent

    outdir = os.path.join(os.path.dirname(__file__), "output")
    os.makedirs(outdir, exist_ok=True)

    print(f"[1/3] 搜索 Amazon 链接：{args.keyword}")
    links = ddg_search_amazon_links(args.keyword, market=args.market, max_links=args.products*3, headers=headers)
    print(f"搜索到候选链接 {len(links)} 条")

    seen_asin = set()
    picked = []
    for link in links:
        asin = extract_asin(link)
        if asin and asin not in seen_asin:
            seen_asin.add(asin)
            picked.append(link)
        if len(picked) >= args.products:
            break

    items = []
    for idx, url in enumerate(picked, 1):
        try:
            print(f"[2/3] 抓取第 {idx}/{len(picked)} 个商品：{url}")
            item = scrape_one_product(url, market=args.market, headers=headers, review_n=args.reviews)
            items.append(item)
            time.sleep(random.uniform(1.0, 2.0))
        except Exception as e:
            print(f"!! 抓取失败：{e}")

    print(f"[3/3] 保存输出到 {outdir}")
    save_outputs(items, outdir)

    if pd is not None and items:
        try:
            import pandas as pd
            df = pd.DataFrame([{
                "asin": it.get("asin",""),
                "title": it.get("title",""),
                "price": it.get("price",""),
                "rating": it.get("rating",""),
                "review_count": it.get("review_count",""),
                "url": it.get("url",""),
            } for it in items])
            xlsx_path = os.path.join(outdir, "results.xlsx")
            df.to_excel(xlsx_path, index=False)
            print(f"已保存：{xlsx_path}")
        except Exception as e:
            print(f"保存 XLSX 失败：{e}")

    print("完成。")

if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] [--keyword KEYWORD] [--products PRODUCTS] [--reviews REVIEWS] [--market MARKET]
                             [--user-agent USER_AGENT]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\Candy\AppData\Roaming\jupyter\runtime\kernel-102e49d4-923a-4296-a2a0-10944b156a6a.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
