In [3]:
from curl_cffi import requests as cf
from urllib.parse import urlsplit, urlunsplit, urlencode, parse_qsl
from tqdm import tqdm
import json, time, random, csv, os, re
from typing import Any, Dict, Set, List

# Cache of harvested product ids: read (and, when missing, written) by
# load_or_harvest_best_seller_ids.
IDS_CSV = "best_seller_product_ids.csv"
# Review rows are appended to this file by the script entry point.
OUT_CSV = "dataScraping_sociolla_bestSeller_userReview_140925.csv"  # new file name

# Campaign endpoint scanned for product ids during harvesting.
# The `filter` query value is the URL-encoded JSON {"type":"best_seller"}.
CAMPAIGN_URL = (
    "https://catalog-api4.sociolla.com/v3/campaigns/best-seller"
    "?filter=%7B%22type%22:%22best_seller%22%7D"
    "&fields=_id,is_active,is_display,name,slug,sections,childs,more_info"
)
# First page of the paginated best-seller listing; paginate_best_seller_list
# rewrites `skip` on every request and uses `limit` as the page size.
BEST_SELLER_LIST_URL = (
    "https://catalog-api.sociolla.com/v3/products/reviews/best-seller"
    "?skip=0&limit=10&sort=-created_at"
)

# Endpoint queried by fetch_reviews_for_product (filter/skip/limit/sort params).
REVIEWS_API = "https://soco-api.sociolla.com/reviews"

# Headers sent with every request in this file.
# NOTE(review): presumably these mimic the sociolla.com web front-end so the
# API accepts the calls — confirm which of them are actually required.
BROWSER_HEADERS = {
    "accept": "application/json, text/plain, */*",
    "origin": "https://www.sociolla.com",
    "referer": "https://www.sociolla.com/",
    "soc-platform": "sociolla-web-desktop",
}


# Column order of OUT_CSV (passed to csv.DictWriter by the script entry point).
# to_row() produces exactly these keys; dotted names such as "product.id" are
# plain column labels, not nested lookups.
FIELDNAMES = [
    "review_id", "product_id",
    "product.id", "product.name",
    "name", "details",
    "is_recommended", "is_repurchase", "is_verified_purchase",
    "lang", "duration_of_used",
    "created_at", "updated_at",
    "images", "source",
    "average_rating", "counter_star",
    "star_long_wear", "star_packaging", "star_pigmentation",
    "star_texture", "star_value_for_money",
    "brand.id", "brand.name",
    "product.variant", "product.categories",
    "counter_review_rating", "counter_review_star", "counter_review_user",
    "is_active_in_offline_store", "is_active_in_review", "is_active_in_sociolla",
    "is_buy_one_get_one_free", "is_in_stock", "is_in_stock_sociolla",
    "is_reviewed", "is_reviewed_combination", "is_sale", "is_status",
    "max_price", "max_price_after_discount", "min_price", "min_price_after_discount",
    "star_effectiveness", "star_scent", 
    "total_likes",
    "url_sociolla",
]


def backoff_sleep(attempt: int):
    """Block for an exponentially growing delay before the next retry.

    The delay is ``2 ** attempt`` seconds plus a random jitter drawn from
    ``random.random()``, so attempt 0 waits between one and two seconds.
    """
    exponential_part = 1.0 * (2 ** attempt)
    jitter = random.random()
    time.sleep(exponential_part + jitter)

def get_json(sess: cf.Session, url: str, headers: Dict[str, str], tries: int = 5):
    """GET ``url`` and return the decoded JSON body, retrying on any error.

    Every failure (request error, HTTP error status, undecodable body) is
    followed by ``backoff_sleep`` and another attempt, up to ``tries``
    attempts in total; the exception of the final attempt is re-raised.
    With ``tries <= 0`` no request is made and ``None`` is returned.
    """
    final_attempt = tries - 1
    for attempt in range(tries):
        try:
            response = sess.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            payload = response.json()
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == final_attempt:
                raise
            backoff_sleep(attempt)
        else:
            return payload

# Product id embedded in a storefront path such as "/p/12345-some-slug".
_PID_IN_URL = re.compile(r"/p/(\d+)(?:[-/]|$)")


def deep_collect_product_ids(obj: Any, ids: Set[int]):
    """Recursively walk a decoded JSON value and add product ids to ``ids``.

    In every dict (at any depth) an id is taken from:
      * the keys ``product_id``, ``productId`` and ``id`` when the value is a
        positive int or a string of decimal digits, and
      * the keys ``url``, ``link``, ``href``, ``path`` and ``slug`` when the
        string value contains a ``/p/<digits>`` segment.
    Lists are traversed element by element; other values are ignored.

    Args:
        obj: Any decoded JSON value (dict, list, scalar or None).
        ids: Set mutated in place; only positive plain ints are added.

    NOTE(review): the bare ``id`` key matches *any* positive id in the
    payload, not only products — presumably the source of stray ids such as
    3 that later yield no reviews. Confirm against the API schema before
    narrowing it.
    """
    def to_pid(v):
        # bool is a subclass of int: ``True`` must not be read as product 1.
        if isinstance(v, bool):
            return None
        if isinstance(v, int):
            return v if v > 0 else None
        # isdecimal() (unlike isdigit()) only accepts characters int() can
        # parse, so values such as "²" are skipped instead of raising.
        if isinstance(v, str) and v.isdecimal():
            return int(v) or None  # "0" is not a valid product id
        return None

    if isinstance(obj, dict):
        for k in ("product_id", "productId", "id"):
            pid = to_pid(obj.get(k))
            if pid:
                ids.add(pid)
        for k in ("url", "link", "href", "path", "slug"):
            v = obj.get(k)
            if isinstance(v, str):
                m = _PID_IN_URL.search(v)
                pid = to_pid(m.group(1)) if m else None
                if pid:
                    ids.add(pid)
        # Nested dicts (including a "product" sub-object) are covered by the
        # recursion, which applies the same key checks to them.
        for v in obj.values():
            deep_collect_product_ids(v, ids)
    elif isinstance(obj, list):
        for it in obj:
            deep_collect_product_ids(it, ids)

def paginate_best_seller_list(sess: cf.Session, base_url: str, headers: Dict[str, str]) -> Set[int]:
    """Page through a skip/limit listing and return every product id found.

    ``skip`` and ``limit`` are read from ``base_url``'s query string
    (defaults 0 and 50); ``sort`` defaults to ``-created_at``. Paging stops
    when a page holds fewer than ``limit`` items, or when a page after the
    first contributes no new ids.
    """
    def count_items(payload):
        # Size of the page: the payload itself when it is a list, otherwise
        # the first list found under one of the usual container keys.
        if isinstance(payload, list):
            return len(payload)
        if isinstance(payload, dict):
            for key in ("items", "products", "data", "result", "results", "records"):
                candidate = payload.get(key)
                if isinstance(candidate, list):
                    return len(candidate)
        return 0

    split_url = urlsplit(base_url)
    query = dict(parse_qsl(split_url.query, keep_blank_values=True))
    offset = int(query.get("skip", "0") or "0")
    page_size = int(query.get("limit", "50") or "50")
    query.setdefault("limit", str(page_size))
    query.setdefault("sort", "-created_at")

    collected: Set[int] = set()
    pages_done = 0
    while True:
        query["skip"] = str(offset)
        page_url = urlunsplit(split_url._replace(query=urlencode(query)))
        payload = get_json(sess, page_url, headers)

        known_before = len(collected)
        deep_collect_product_ids(payload, collected)

        short_page = count_items(payload) < page_size
        stalled = pages_done > 0 and len(collected) == known_before
        if short_page or stalled:
            break

        offset += page_size
        pages_done += 1
        # Small randomized pause between pages.
        time.sleep(0.25 + random.random() * 0.25)

    return collected

def load_or_harvest_best_seller_ids(sess: cf.Session) -> List[int]:
    """Return the sorted, de-duplicated best-seller product ids.

    Ids are loaded from ``IDS_CSV`` when that file exists and contains at
    least one id. Otherwise they are harvested from ``CAMPAIGN_URL`` and the
    paginated ``BEST_SELLER_LIST_URL`` (each source is best-effort: an error
    is printed and the source skipped) and then cached in ``IDS_CSV``.

    An empty harvest is *not* cached, so a transient outage cannot leave a
    header-only file that every later run would load as "0 ids".

    Returns:
        Sorted list of unique product ids; empty when nothing could be found.
    """
    if os.path.exists(IDS_CSV):
        with open(IDS_CSV, newline="", encoding="utf-8") as f:
            next(f, None)  # skip header; an empty file simply has none
            # isdecimal() only accepts characters that int() can parse.
            ids = [int(row.strip()) for row in f if row.strip().isdecimal()]
        if ids:
            print(f"[IDs] Loaded {len(ids)} product_ids from {IDS_CSV}")
            return sorted(set(ids))
        print(f"[IDs] {IDS_CSV} contains no product_ids; harvesting again")

    all_ids: Set[int] = set()
    try:
        campaign_json = get_json(sess, CAMPAIGN_URL, BROWSER_HEADERS)
        deep_collect_product_ids(campaign_json, all_ids)
        print(f"[campaigns] collected {len(all_ids)} ids so far")
    except Exception as e:
        print("[campaigns] skipped due to error:", e)

    try:
        ids_from_list = paginate_best_seller_list(sess, BEST_SELLER_LIST_URL, BROWSER_HEADERS)
        all_ids |= ids_from_list
        print(f"[products/reviews/best-seller] collected {len(ids_from_list)} ids; total {len(all_ids)}")
    except Exception as e:
        print("[products/reviews/best-seller] skipped due to error:", e)

    if not all_ids:
        print(f"[IDs] Nothing harvested; {IDS_CSV} not written so the next run retries")
        return []

    with open(IDS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["product_id"])
        for pid in sorted(all_ids):
            w.writerow([pid])
    print(f"[IDs] Saved to {os.path.abspath(IDS_CSV)} (count={len(all_ids)})")
    return sorted(all_ids)


def _request_with_handling(sess, url, headers, params, max_retries=4):
    """
    Make a GET with retries for transient errors.
    Returns (response or None, fatal_bad_request: bool).
    fatal_bad_request=True means 4xx bad filter (400/422) -> try next variant.
    """
    for attempt in range(max_retries):
        try:
            r = sess.get(url, headers=headers, params=params, timeout=30)
            if r.status_code in (400, 422):
                return None, True
            if r.status_code in (429, 500, 502, 503, 504):
                backoff_sleep(attempt)
                continue
            r.raise_for_status()
            return r, False
        except Exception:
            if attempt == max_retries - 1:
                return None, True
            backoff_sleep(attempt)
    return None, True

def fetch_reviews_for_product(sess, product_id: int, limit=50, max_reviews=100000):
    """
    Fetch the reviews of one product, paging with skip/limit.

    Several filter variants are tried in order because some products are
    rejected (400/422) with the default filter. The first variant that
    yields any reviews wins. A variant that answers successfully but with no
    reviews ends the search, since the product simply has none.

    A response whose body is not valid JSON, or is not a JSON object, is
    treated like a failed request for that variant instead of crashing the
    whole run. A ``data``/``items`` value that is not a list is treated as
    "no reviews".

    Args:
        sess: Session object exposing ``get``.
        product_id: Product whose reviews are requested.
        limit: Page size sent to the API.
        max_reviews: Upper bound on the ``skip`` offset that is requested.

    Returns:
        List of review dicts (possibly partial if a later page failed), or an
        empty list when no variant produced reviews.
    """
    variants = [
        {"is_published": True, "elastic_search": True,  "product_id": product_id},
        {"is_published": True,                          "product_id": product_id},
        {"is_published": True, "elastic_search": True,  "product_id": str(product_id)},
        {"is_published": True,                          "product_id": str(product_id)},
        {"is_published": True, "product_id": product_id, "is_highlight": True},  # subset fallback
    ]

    for flt in variants:
        out = []
        hit_any_page = False

        for skip in range(0, max_reviews, limit):
            params = {
                "filter": json.dumps(flt, ensure_ascii=False),
                "skip": skip,
                "limit": limit,
                "sort": "most_relevant",
            }

            resp, fatal_bad = _request_with_handling(sess, REVIEWS_API, BROWSER_HEADERS, params)
            if fatal_bad:
                break

            try:
                data = resp.json()
            except ValueError:
                # Body is not JSON (json.JSONDecodeError is a ValueError).
                break
            if not isinstance(data, dict):
                break

            hit_any_page = True
            items = data.get("data") or data.get("items") or []
            if not isinstance(items, list) or not items:
                break

            out.extend(items)
            if len(items) < limit:
                break

            # polite pacing
            time.sleep(0.35 + random.random() * 0.25)

        if out:
            return out
        if hit_any_page:
            break

    print(f"[skip] product_id={product_id} returned 400/empty across all filter variants")
    return []


def _list_to_csv(items, key_candidates=None):
    if not items: return ""
    out = []
    for it in items:
        if isinstance(it, str):
            out.append(it)
        elif isinstance(it, dict):
            picked = None
            if key_candidates:
                for k in key_candidates:
                    v = it.get(k)
                    if v not in (None, ""):
                        picked = str(v); break
            out.append(picked if picked is not None else json.dumps(it, ensure_ascii=False))
        else:
            out.append(str(it))
    return ",".join(out)

def _get_nested(d: dict, path: List[str], default=None):
    cur = d
    for p in path:
        if not isinstance(cur, dict): return default
        cur = cur.get(p)
        if cur is None: return default
    return cur

def to_row(rv):
    """Flatten one review dict into a flat row keyed like ``FIELDNAMES``.

    Review-level values come from ``rv``; product and brand values come from
    ``rv["product"]`` and its ``brand`` entry (both treated as empty when
    missing or falsy). Where two sources are combined with ``or`` the first
    truthy value wins.
    """
    product = rv.get("product") or {}
    brand_info = product.get("brand") or {}
    star_map = rv.get("stars") or rv.get("star") or {}

    def pick_star(aspect):
        # Lookup order: flat key on the review, nested stars mapping, then
        # the flat key on the product.
        flat_key = "star_" + aspect
        if flat_key in rv:
            return rv.get(flat_key)
        if isinstance(star_map, dict) and aspect in star_map:
            return star_map.get(aspect)
        if flat_key in product:
            return product.get(flat_key)
        return None

    categories_text = _list_to_csv(
        product.get("categories", []),
        key_candidates=["name", "slug", "title"],
    )

    row = {
        "review_id": rv.get("_id"),
        "product_id": rv.get("product_id"),
        "product.id": product.get("id") or rv.get("product_id"),
        "product.name": product.get("name"),
        "name": rv.get("name") or _get_nested(rv, ["user", "name"]),
    }

    for key in (
        "details",
        "is_recommended",
        "is_repurchase",
        "is_verified_purchase",
        "lang",
        "duration_of_used",
        "created_at",
    ):
        row[key] = rv.get(key)

    row["updated_at"] = rv.get("updated_at") or rv.get("edited_at")
    row["images"] = _list_to_csv(rv.get("images", []), key_candidates=["name", "url", "src"])
    row["source"] = rv.get("source")

    # Review value first, product value as fallback.
    for key in ("average_rating", "counter_star"):
        row[key] = rv.get(key) or product.get(key)

    for aspect in ("long_wear", "packaging", "pigmentation", "texture", "value_for_money"):
        row["star_" + aspect] = pick_star(aspect)

    row["brand.id"] = brand_info.get("id") or brand_info.get("_id") or brand_info.get("slug")
    row["brand.name"] = brand_info.get("name") or brand_info.get("title")
    row["product.variant"] = product.get("variant")
    row["product.categories"] = categories_text

    # Product value first, review value as fallback.
    for key in ("counter_review_rating", "counter_review_star", "counter_review_user"):
        row[key] = product.get(key) or rv.get(key)

    # Columns copied straight from the product object.
    for key in (
        "is_active_in_offline_store",
        "is_active_in_review",
        "is_active_in_sociolla",
        "is_buy_one_get_one_free",
        "is_in_stock",
        "is_in_stock_sociolla",
        "is_reviewed",
        "is_reviewed_combination",
        "is_sale",
        "is_status",
        "max_price",
        "max_price_after_discount",
        "min_price",
        "min_price_after_discount",
        "star_effectiveness",
        "star_scent",
        "total_likes",
    ):
        row[key] = product.get(key)

    row["url_sociolla"] = product.get("url_sociolla") or product.get("url") or product.get("link")
    return row


# Script entry point: scrape reviews for every best-seller product id and
# append them to OUT_CSV, skipping products already present in that file.
if __name__ == "__main__":
    sess = cf.Session(impersonate="chrome")

    product_ids = load_or_harvest_best_seller_ids(sess)
    print(f"[IDs] Using {len(product_ids)} Best Seller product_ids")

    # Resume support: a product counts as done when at least one of its
    # rows is already in the output file.
    already = set()
    if os.path.exists(OUT_CSV):
        with open(OUT_CSV, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                if row.get("product_id") and str(row["product_id"]).isdigit():
                    already.add(int(row["product_id"]))
        print(f"[resume] Detected {len(already)} product_ids already written in {OUT_CSV}")

    # An existing but empty file still needs a header; without one the next
    # resume would read the first data row as the header and find no
    # "product_id" column, re-scraping (and duplicating) every product.
    write_header = not os.path.exists(OUT_CSV) or os.path.getsize(OUT_CSV) == 0
    with open(OUT_CSV, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
        if write_header:
            writer.writeheader()

        for pid in tqdm(product_ids, desc="Products"):
            if pid in already:
                continue

            reviews = fetch_reviews_for_product(sess, pid, limit=50)
            if not reviews:
                continue

            for rv in reviews:
                try:
                    writer.writerow(to_row(rv))
                except Exception as e:
                    print("Skip row due to error:", e)

            # Push this product's rows to disk so an interruption does not
            # lose rows still sitting in the write buffer.
            f.flush()

            time.sleep(0.3 + random.random() * 0.3)

    print(f"[done] Saved all reviews to: {os.path.abspath(OUT_CSV)}")


[IDs] Loaded 237 product_ids from best_seller_product_ids.csv
[IDs] Using 237 Best Seller product_ids
[resume] Detected 8 product_ids already written in dataScraping_sociolla_bestSeller_userReview_140925.csv


Products:   1%|▌                                                                       | 2/237 [00:03<06:06,  1.56s/it]

[skip] product_id=3 returned 400/empty across all filter variants


Products:  54%|█████████████████████████████████████▊                                | 128/237 [39:23<03:09,  1.74s/it]

[skip] product_id=108972 returned 400/empty across all filter variants


Products:  69%|████████████████████████████████████████████████▏                     | 163/237 [41:34<05:09,  4.18s/it]

[skip] product_id=112195 returned 400/empty across all filter variants


Products:  98%|████████████████████████████████████████████████████████████████████▊ | 233/237 [43:55<00:02,  1.59it/s]

[skip] product_id=119346 returned 400/empty across all filter variants
[skip] product_id=119348 returned 400/empty across all filter variants


Products: 100%|██████████████████████████████████████████████████████████████████████| 237/237 [43:59<00:00, 11.14s/it]

[done] Saved all reviews to: C:\Users\ASUS\dataScraping_sociolla_bestSeller_userReview_140925.csv



