In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import time, tqdm
from src import bigquery, utils, models

  from tqdm.autonotebook import tqdm


In [3]:
secrets = utils.load_json("secrets.json")

In [4]:
bq_client = bigquery.init_bigquery_client(secrets.get("GCP_CREDENTIALS"))

In [5]:
from typing import Optional, Tuple
from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Response
import asyncio


class PlaywrightSession:
    USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/131.0.0.0 Safari/537.36"
    )

    VALID_STATUS_CODES = [200, 400]

    def __init__(
        self,
        headless: bool = True,
        proxy_config: Optional[models.ProxyConfig] = None,
        residential_proxy: bool = True,
        user_agent: Optional[str] = None,
        locale: str = "fr-FR",
        timeout_sec: int = 30,
        max_retries: int = 2,
    ):
        self.headless = headless
        self.proxy_config = proxy_config
        self.residential_proxy = residential_proxy

        self.user_agent = user_agent or self.USER_AGENT
        self.locale = locale
        self.timeout_sec = timeout_sec
        self.max_retries = max_retries

        self._client = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None

    async def start(self):
        self._client = await async_playwright().start()
        launch_args = {"headless": self.headless}

        if self.proxy_config:
            launch_args["proxy"] = {"server": self.proxy_config.url(self.residential_proxy)}

        self._browser = await self._client.chromium.launch(**launch_args)

        self._context = await self._browser.new_context(
            user_agent=self.user_agent,
            locale=self.locale,
            viewport={"width": 1400, "height": 900},
        )

        await self._context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        )

    async def close(self):
        if self._context:
            await self._context.close()
        if self._browser:
            await self._browser.close()
        if self._client:
            await self._client.stop()

    async def restart(self):
        await self.close()
        await asyncio.sleep(1)
        await self.start()

    async def fetch(self, url: str) -> Tuple[Optional[Response], Optional[str]]:
        if not self._context:
            raise RuntimeError("Browser not started. Call await start() first.")

        response: Optional[Response] = None
        content: Optional[str] = None
        retries = 0

        while retries <= self.max_retries:
            page: Page = await self._context.new_page()

            response: Response = await page.goto(
                url=url,
                wait_until="domcontentloaded",
                timeout=self.timeout_sec * 1000,
            )
            
            content = await page.content()
            await page.close()

            if response.status in self.VALID_STATUS_CODES:
                return response, content

            retries += 1
            await self.restart()

        return response, content

In [6]:
from bs4 import BeautifulSoup


def can_be_purchased(status_code: int, html: str) -> bool:
    if status_code != 200:
        return False

    soup = BeautifulSoup(html, "html.parser")
    button = soup.find("button", {"data-testid": "item-buy-button"})

    return button is not None

In [9]:
query_kwargs = {
    "n": 100,
    "is_women": True,
    "sort_by_date": False,
    "catalog_score": 1
}

loader = bigquery.run_query(
    client=bq_client, 
    query=bigquery.query_items(**query_kwargs), 
    to_list=True
)

In [10]:
session = PlaywrightSession(headless=True)
await session.start()

response_list, item_status_list, elapsed_list, error_list = [], [], [], []
n, n_success = 0, 0
loop = tqdm.tqdm(iterable=loader, total=len(loader))

for entry in loop:
    n += 1

    try:
        start_time = time.time()
        response, content = await session.fetch(url=entry["url"])
        elapsed = time.time() - start_time
        item_status = can_be_purchased(response.status, content)

        response_list.append(response)
        item_status_list.append(item_status)
        elapsed_list.append(elapsed)
        n_success += response is not None and item_status is not None

    except Exception as e:
        error_list.append(str(e))

    loop.set_description(f"Success: {n_success/n:.2f}")

await session.close()

Success: 1.00: 100%|██████████| 100/100 [04:17<00:00,  2.58s/it]


In [13]:
from collections import Counter

status_code_counter = Counter([response.status for response in response_list])
item_status_counter = Counter(item_status_list)
avg_elapsed = sum(elapsed_list) / len(elapsed_list)

print(f"Avg time: {avg_elapsed:.2f}s")
print(f"Status codes: {status_code_counter}")
print(f"Item status: {item_status_counter}")

Avg time: 2.56s
Status codes: Counter({200: 74, 404: 26})
Item status: Counter({True: 58, False: 42})


In [16]:
from typing import Optional, Tuple, List
from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Response
import asyncio


class PlaywrightSession:
    USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/131.0.0.0 Safari/537.36"
    )

    VALID_STATUS_CODES = [200, 400]

    def __init__(
        self,
        headless: bool = True,
        proxy_config: Optional["models.ProxyConfig"] = None,
        residential_proxy: bool = True,
        user_agent: Optional[str] = None,
        locale: str = "fr-FR",
        timeout_sec: int = 30,
        max_retries: int = 2,
        max_concurrency: int = 5,
    ):
        self.headless = headless
        self.proxy_config = proxy_config
        self.residential_proxy = residential_proxy

        self.user_agent = user_agent or self.USER_AGENT
        self.locale = locale
        self.timeout_sec = timeout_sec
        self.max_retries = max_retries
        self.default_concurrency = max_concurrency

        self._client = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None

        self._restart_lock = asyncio.Lock()

    async def start(self):
        self._client = await async_playwright().start()
        launch_args = {"headless": self.headless}

        if self.proxy_config:
            launch_args["proxy"] = {
                "server": self.proxy_config.url(self.residential_proxy)
            }

        self._browser = await self._client.chromium.launch(**launch_args)

        self._context = await self._browser.new_context(
            user_agent=self.user_agent,
            locale=self.locale,
            viewport={"width": 1400, "height": 900},
        )

        await self._context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        )

    async def close(self):
        if self._context:
            await self._context.close()
        if self._browser:
            await self._browser.close()
        if self._client:
            await self._client.stop()
        self._context = None
        self._browser = None
        self._client = None

    async def restart(self):
        await self.close()
        await asyncio.sleep(0.5)
        await self.start()

    async def _safe_restart(self):
        async with self._restart_lock:
            if self._context is not None and self._browser is not None:
                return

            await self.restart()

    async def _fetch_with_retries(self, url: str) -> Tuple[Optional[Response], Optional[str]]:
        if not self._context:
            raise RuntimeError("Browser not started. Call await start() first.")

        response: Optional[Response] = None
        content: Optional[str] = None
        retries = 0

        while retries <= self.max_retries:
            page: Page = await self._context.new_page()
            try:
                response = await page.goto(
                    url=url,
                    wait_until="domcontentloaded",
                    timeout=self.timeout_sec * 1000,
                )
                content = await page.content()

                if response and response.status in self.VALID_STATUS_CODES:
                    return response, content

                retries += 1
                await page.close()
                await self._safe_restart()
                continue

            except Exception:
                retries += 1
                await page.close()
                await self._safe_restart()
                continue

        return response, content

    async def fetch(self, url: str) -> Tuple[Optional[Response], Optional[str]]:
        return await self._fetch_with_retries(url)

    async def fetch_many(
        self,
        urls: List[str],
        max_concurrency: Optional[int] = None,
    ) -> List[Tuple[Optional[Response], Optional[str]]]:
        if not self._context:
            raise RuntimeError("Browser not started. Call await start() first.")

        sem = asyncio.Semaphore(max_concurrency or self.default_concurrency)
        results: List[Tuple[Optional[Response], Optional[str]]] = [ (None, None) ] * len(urls)

        async def worker(idx: int, url: str):
            async with sem:
                results[idx] = await self._fetch_with_retries(url)

        tasks = [asyncio.create_task(worker(i, u)) for i, u in enumerate(urls)]
        await asyncio.gather(*tasks)
        
        return results