In [1]:
# Show the kernel's working directory; the relative paths used in later cells
# (e.g. "data/...") are resolved against it.
pwd

'/home/phuoc/Data_Engineer/project_02'

In [1]:
import pandas as pd
# Peek at the input CSV to check its columns before crawling.
# NOTE(review): the "Unnamed: 0" column shown in the output looks like a saved
# DataFrame index; only the "id" column is read by the crawler — confirm it
# can be ignored.
df_test = pd.read_csv("data/products-0-200000.csv")
df_test.head()

Unnamed: 0,id
0,1391347
1,74897599
2,154155413
3,253117062
4,130978358


In [15]:
import asyncio
import aiohttp
import pandas as pd
import json
from pathlib import Path
from bs4 import BeautifulSoup
import unicodedata
import re
from typing import List

# =========================
# CONFIG
# =========================
INPUT_CSV = "data/products-0-200000.csv"  # CSV whose "id" column lists the product ids
OUTPUT_DIR = Path("tiki_products")  # per-batch product and error JSON files are written here

BATCH_SIZE = 1000  # number of product ids handed to one fetch_batch call

# Concurrency levels: main() starts at INITIAL_CONCURRENCY and moves the level
# between batches depending on the observed error rate.
INITIAL_CONCURRENCY = 8
MAX_CONCURRENCY = 25
MIN_CONCURRENCY = 3

# Number of attempts per product, and the delay (looked up by attempt index)
# waited before the next attempt; keep one backoff entry per attempt.
MAX_RETRIES = 4
RETRY_BACKOFF = [1, 2, 4, 8]  # seconds

TIMEOUT = 30  # seconds; used as aiohttp.ClientTimeout(total=TIMEOUT)

# Headers sent with every product request.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 "
        "(KHTML, like Gecko) "
        "Chrome/120.0 Safari/537.36"
    ),
    "Accept": "application/json",
    "Accept-Language": "vi-VN,vi;q=0.9,en;q=0.8",
}

# Create the output directory up front; exist_ok avoids an error on re-runs.
OUTPUT_DIR.mkdir(exist_ok=True)

# =========================
# Normalize description
# =========================
def normalize_description(text: str | None) -> str:
    if not text:
        return ""
    soup = BeautifulSoup(text, "html.parser")
    clean_text = soup.get_text(separator=" ")
    clean_text = unicodedata.normalize("NFKC", clean_text)
    clean_text = re.sub(r"\s+", " ", clean_text).strip()
    return clean_text


# =========================
# Fetch single product (with retry)
# =========================
async def fetch_product(session, product_id: int, sem: asyncio.Semaphore):
    """Fetch one product from the Tiki API, retrying transient failures.

    Up to ``MAX_RETRIES`` attempts are made. HTTP 429/500/502/503, timeouts and
    other exceptions are retried after a backoff delay; any other non-200
    status is returned immediately as a failure.

    Args:
        session: Client session exposing ``get(url, headers=...)`` as an async
            context manager (an ``aiohttp.ClientSession`` in this notebook).
        product_id: Id interpolated into the product API URL.
        sem: Semaphore limiting concurrent fetches; it is held for the whole
            call, including the backoff sleeps between attempts.

    Returns:
        ``{"ok": True, "data": {...}}`` with the selected product fields on
        HTTP 200; otherwise ``{"ok": False, "product_id": ..., "error_type":
        ...}`` where ``error_type`` is ``HTTP_<status>``, ``TIMEOUT``,
        ``EXCEPTION`` (with an extra ``message`` key) or ``RETRY_EXCEEDED``.
    """
    url = f"https://tiki.vn/api/v2/products/{product_id}"

    def backoff_delay(attempt: int) -> float:
        # Clamp to the last configured delay so MAX_RETRIES may exceed
        # len(RETRY_BACKOFF) without raising IndexError.
        if not RETRY_BACKOFF:
            return 0
        return RETRY_BACKOFF[min(attempt, len(RETRY_BACKOFF) - 1)]

    async with sem:
        for attempt in range(MAX_RETRIES):
            is_last_attempt = attempt == MAX_RETRIES - 1
            try:
                async with session.get(url, headers=HEADERS) as resp:
                    status = resp.status

                    if status == 200:
                        data = await resp.json()
                        return {
                            "ok": True,
                            "data": {
                                "id": data.get("id"),
                                "name": data.get("name"),
                                "url_key": data.get("url_key"),
                                "price": data.get("price"),
                                "description": normalize_description(data.get("description")),
                                "images": [
                                    img.get("base_url")
                                    # `or []` also covers an explicit null
                                    # "images" value, which a .get() default
                                    # would not.
                                    for img in (data.get("images") or [])
                                    if img.get("base_url")
                                ],
                            },
                        }

                    # Anything other than the retryable statuses is final.
                    if status not in (429, 500, 502, 503):
                        return {
                            "ok": False,
                            "product_id": product_id,
                            "error_type": f"HTTP_{status}",
                        }

            except asyncio.TimeoutError:
                if is_last_attempt:
                    return {
                        "ok": False,
                        "product_id": product_id,
                        "error_type": "TIMEOUT",
                    }

            except Exception as e:
                if is_last_attempt:
                    return {
                        "ok": False,
                        "product_id": product_id,
                        "error_type": "EXCEPTION",
                        # Some exceptions have an empty str(); fall back to
                        # repr() so the log still identifies them.
                        "message": str(e) or repr(e),
                    }

            else:
                # Reached only for a retryable HTTP status. On the final
                # attempt there is nothing left to wait for.
                if is_last_attempt:
                    break

            # Back off outside the response context so the connection is
            # released while waiting.
            await asyncio.sleep(backoff_delay(attempt))

    return {
        "ok": False,
        "product_id": product_id,
        "error_type": "RETRY_EXCEEDED",
    }


# =========================
# Fetch one batch at a fixed concurrency (the level is adapted between batches in main)
# =========================
async def fetch_batch(
    product_ids: List[int],
    batch_idx: int,
    concurrency: int,
):
    """Fetch a batch of products concurrently and write the results to disk.

    A fresh ``aiohttp.ClientSession`` is opened for the batch; every id is
    fetched through ``fetch_product`` with at most ``concurrency`` requests in
    flight. Successful payloads go to ``products_batch_<idx>.json`` and failure
    records to ``errors_batch_<idx>.json`` inside ``OUTPUT_DIR``.

    Args:
        product_ids: Ids to fetch in this batch.
        batch_idx: Batch number, used (zero-padded to 4 digits) in file names.
        concurrency: Limit applied to both the semaphore and the connector.

    Returns:
        Dict with ``success`` and ``failed`` counts and ``error_rate``
        (failed / requested; ``0.0`` for an empty batch).
    """
    sem = asyncio.Semaphore(concurrency)

    timeout = aiohttp.ClientTimeout(total=TIMEOUT)
    # NOTE(review): ssl=False skips TLS certificate validation for every
    # request in this session — confirm this is intentional.
    connector = aiohttp.TCPConnector(limit=concurrency, ssl=False)

    async with aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
    ) as session:

        tasks = [
            fetch_product(session, pid, sem)
            for pid in product_ids
        ]
        results = await asyncio.gather(*tasks)

    # Split outcomes: payloads of successful fetches vs. full failure records.
    success_products = [r["data"] for r in results if r["ok"]]
    error_logs = [r for r in results if not r["ok"]]

    # Save success
    with open(
        OUTPUT_DIR / f"products_batch_{batch_idx:04d}.json",
        "w",
        encoding="utf-8",
    ) as f:
        json.dump(success_products, f, ensure_ascii=False)

    # Save errors
    with open(
        OUTPUT_DIR / f"errors_batch_{batch_idx:04d}.json",
        "w",
        encoding="utf-8",
    ) as f:
        json.dump(error_logs, f, ensure_ascii=False, indent=2)

    # An empty batch has no meaningful rate; report 0.0 rather than raising
    # ZeroDivisionError after the files were already written.
    error_rate = len(error_logs) / len(product_ids) if product_ids else 0.0

    return {
        "success": len(success_products),
        "failed": len(error_logs),
        "error_rate": error_rate,
    }


# =========================
# MAIN
# =========================
NUMBER_OF_ID = 1000  # how many ids from the top of the CSV are crawled


def next_concurrency(current: int, error_rate: float, lowest: int, highest: int) -> int:
    """Pick the concurrency for the next batch from the last batch's error rate.

    Below 5% errors the level rises by 2 but never above ``highest``; above
    20% it drops by 3 but never below ``lowest``; otherwise it is unchanged.
    """
    if error_rate < 0.05 and current < highest:
        # Clamp: a bare "+ 2" could step past the configured maximum.
        return min(highest, current + 2)
    if error_rate > 0.2:
        return max(lowest, current - 3)
    return current


async def main():
    """Crawl the first ``NUMBER_OF_ID`` product ids from ``INPUT_CSV`` in batches.

    Ids are processed ``BATCH_SIZE`` at a time through ``fetch_batch``. After
    each batch the concurrency is adjusted from its error rate, a one-line
    summary is printed, and totals are reported at the end.
    """
    df = pd.read_csv(INPUT_CSV)

    # small test run first
    df = df.head(NUMBER_OF_ID)

    product_ids = df["id"].astype(int).tolist()

    concurrency = INITIAL_CONCURRENCY

    total_success = 0
    total_failed = 0

    for i in range(0, len(product_ids), BATCH_SIZE):
        batch = product_ids[i : i + BATCH_SIZE]
        batch_idx = i // BATCH_SIZE

        stats = await fetch_batch(
            batch,
            batch_idx,
            concurrency,
        )

        total_success += stats["success"]
        total_failed += stats["failed"]

        # Adaptive concurrency, kept within [MIN_CONCURRENCY, MAX_CONCURRENCY]
        concurrency = next_concurrency(
            concurrency,
            stats["error_rate"],
            MIN_CONCURRENCY,
            MAX_CONCURRENCY,
        )

        # The CONCURRENCY value printed is the level chosen for the next batch.
        print(
            f"[BATCH {batch_idx:04d}] "
            f"SUCCESS={stats['success']}/{len(batch)} | "
            f"FAILED={stats['failed']} | "
            f"ERROR_RATE={stats['error_rate']:.2%} | "
            f"CONCURRENCY={concurrency}"
        )

        # Pause between batches; nothing to wait for after the last one.
        if i + BATCH_SIZE < len(product_ids):
            await asyncio.sleep(1)

    print("\n=== DONE ===")
    print(f"TOTAL SUCCESS: {total_success}")
    print(f"TOTAL FAILED : {total_failed}")


# =========================
# ENTRY POINT
# =========================
# Top-level ``await`` works here because the notebook kernel already runs an
# event loop; in a plain script the equivalent is ``asyncio.run(main())``.
await main()

[BATCH 0000] SUCCESS=809/1000 | FAILED=191 | ERROR_RATE=19.10% | CONCURRENCY=8

=== DONE ===
TOTAL SUCCESS: 809
TOTAL FAILED : 191


In [16]:
import json
from pathlib import Path
from collections import Counter, defaultdict
import pandas as pd

ERROR_DIR = Path("tiki_products")
TOTAL_REQUEST = NUMBER_OF_ID  # update this if more ids were crawled

# -----------------------
# Collect every failure record from the per-batch error files
# -----------------------
all_errors = []
for error_path in ERROR_DIR.glob("errors_batch_*.json"):
    with error_path.open("r", encoding="utf-8") as fh:
        all_errors.extend(json.load(fh))

# Tally failures by error type; HTTP_* types are also tracked on their own.
error_counter = Counter(
    record.get("error_type", "UNKNOWN") for record in all_errors
)
http_status_counter = Counter(
    {
        error_type: count
        for error_type, count in error_counter.items()
        if error_type.startswith("HTTP_")
    }
)

total_errors = len(all_errors)
# Successes are inferred: every requested id that has no error record.
total_success = TOTAL_REQUEST - total_errors

print("====== OVERALL ======")
print(f"TOTAL REQUEST : {TOTAL_REQUEST}")
print(f"SUCCESS       : {total_success}")
print(f"FAILED        : {total_errors}")
print(f"SUCCESS RATE  : {total_success / TOTAL_REQUEST * 100:.2f}%")
print(f"FAIL RATE     : {total_errors / TOTAL_REQUEST * 100:.2f}%")

print("\n====== ERROR BREAKDOWN ======")
# Most frequent error types first, each as a share of all requests.
for error_type, count in error_counter.most_common():
    print(f"{error_type:15s}: {count:5d} ({count / TOTAL_REQUEST * 100:.2f}%)")

TOTAL REQUEST : 1000
SUCCESS       : 809
FAILED        : 191
SUCCESS RATE  : 80.90%
FAIL RATE     : 19.10%

EXCEPTION      :   187 (18.70%)
HTTP_404       :     4 (0.40%)
