# Metro Online Scraper
This notebook scrapes product data from Metro Online Pakistan using Playwright.

### Instructions:
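1. Run the **Setup Cell** to install the dependencies and the Playwright Chromium browser.
2. Run the **Base Infrastructure** and **Metro Scraper Logic** cells, in that order, to define the scraper classes.
3. Run the **Execution** cell to start scraping; the results are saved as a CSV file in the `data/` directory.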

In [None]:
# @title Setup Cell
# Install the Python packages used by this notebook, then download the
# Chromium build that Playwright launches. These are notebook shell magics,
# so this cell only runs inside IPython/Jupyter/Colab.
!pip install playwright nest_asyncio pandas
!playwright install chromium
import os
# Create the directory the scraper classes use as their default output_dir.
os.makedirs('data', exist_ok=True)
import nest_asyncio
# nest_asyncio patches asyncio so an already-running event loop can be
# re-entered; presumably needed because the notebook kernel runs its own
# loop while the scraper is awaited -- confirm for your environment.
nest_asyncio.apply()

In [None]:
# @title Base Infrastructure
import csv
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from abc import ABC, abstractmethod
import time
import random

import re

# Spellings accepted directly after a number, mapped to the unit token that
# is reported. Whole-word lookup prevents prefixes of unrelated words from
# being read as units (e.g. "5 large" -> litres, "2 lbs" -> litres).
_UNIT_ALIASES = {
    'kg': 'kg', 'kgs': 'kg', 'kilogram': 'kg', 'kilograms': 'kg',
    'g': 'g', 'gm': 'g', 'gms': 'g', 'gr': 'g', 'gram': 'g', 'grams': 'g',
    'l': 'l', 'lt': 'l', 'ltr': 'l', 'ltrs': 'l',
    'liter': 'l', 'liters': 'l', 'litre': 'l', 'litres': 'l',
    'ml': 'ml', 'mls': 'ml',
    'piece': 'piece', 'pieces': 'piece',
    'pcs': 'pcs',
    'pack': 'pack', 'packs': 'pack', 'packet': 'pack', 'packets': 'pack',
}

# A number (optionally decimal), optional whitespace, then one whole word.
_QUANTITY_WORD_RE = re.compile(r'(\d+\.?\d*)\s*([a-z]+)')


class BaseScraper(ABC):
    """Shared plumbing for store scrapers: logging, parsing helpers, CSV export.

    Subclasses implement ``scrape`` and ``get_categories``. Products are plain
    dicts keyed by ``csv_headers``; use ``create_product_dict`` to build them.
    """

    def __init__(self, store_name: str, base_url: str, output_dir: str = "data"):
        """Store the scraper settings and make sure the output directory exists.

        Args:
            store_name: Name written to the ``store_name`` column and used in
                the logger name and the default CSV filename.
            base_url: Root URL of the store site.
            output_dir: Directory CSV files are written to. It is created,
                including missing parent directories, if it does not exist.
        """
        self.store_name = store_name
        self.base_url = base_url
        self.output_dir = Path(output_dir)
        # parents=True so a nested path such as "data/metro" works too.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.logger = self._setup_logger()
        self.csv_headers = [
            'store_name', 'product_name', 'brand', 'category', 'subcategory',
            'price', 'discounted_price', 'unit', 'quantity', 'url', 'image_url', 'last_updated'
        ]

    def _setup_logger(self) -> logging.Logger:
        """Return the ``<store_name>_scraper`` logger with one stream handler.

        The handler is attached only when the logger has none yet, so creating
        several scrapers for the same store does not duplicate log lines.
        """
        logger = logging.getLogger(f"{self.store_name}_scraper")
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setLevel(logging.INFO)
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            logger.addHandler(handler)
        return logger

    def _rate_limit(self, min_delay: float = 1.0, max_delay: float = 3.0):
        """Sleep for a random duration between ``min_delay`` and ``max_delay`` seconds.

        This uses ``time.sleep`` and therefore blocks the calling thread.
        """
        time.sleep(random.uniform(min_delay, max_delay))

    def _clean_price(self, price_str: str) -> Optional[float]:
        """Convert a displayed price such as ``"Rs. 1,250"`` to a float.

        The ``Rs.``, ``PKR`` and ``Rs`` markers and thousands separators are
        removed before parsing.

        Returns:
            The numeric price, or ``None`` when the input is empty/falsy or
            what remains is not a valid number.
        """
        if not price_str:
            return None
        cleaned = str(price_str)
        # 'Rs.' must be removed before 'Rs' so no stray '.' is left behind.
        for token in ('Rs.', 'PKR', 'Rs', ','):
            cleaned = cleaned.replace(token, '')
        try:
            return float(cleaned.strip())
        except ValueError:
            return None

    def _parse_unit_quantity(self, text: str) -> tuple[Optional[str], Optional[float]]:
        """Extract the first ``<number><unit>`` pair from free text.

        Matching is case-insensitive and the unit must be a whole word, so
        ``"Milk 1.5 Liter"`` gives ``('l', 1.5)`` while ``"5 Large Eggs"``
        gives ``(None, None)``.

        Returns:
            ``(unit, quantity)`` where ``unit`` is one of ``kg``, ``g``, ``l``,
            ``ml``, ``piece``, ``pcs`` or ``pack``; ``(None, None)`` when the
            text is empty or contains no recognised pair.
        """
        if not text:
            return None, None
        for match in _QUANTITY_WORD_RE.finditer(text.lower()):
            unit = _UNIT_ALIASES.get(match.group(2))
            if unit is not None:
                return unit, float(match.group(1))
        return None, None

    def save_to_csv(self, products: List[Dict], filename: Optional[str] = None):
        """Write ``products`` to a CSV file inside ``output_dir``.

        Nothing is written when ``products`` is empty. Only the keys listed in
        ``csv_headers`` are exported; missing keys become empty cells.

        Args:
            products: Product dicts to export.
            filename: Target file name. Defaults to
                ``<store_name>_<YYYYmmdd_HHMMSS>.csv``.
        """
        if not products:
            return
        if filename is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"{self.store_name.lower().replace(' ', '_')}_{timestamp}.csv"
        filepath = self.output_dir / filename
        # utf-8-sig writes a BOM, presumably so spreadsheet tools detect UTF-8.
        with open(filepath, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=self.csv_headers)
            writer.writeheader()
            writer.writerows(
                {header: product.get(header) for header in self.csv_headers}
                for product in products
            )
        print(f"\u2713 Saved {len(products)} products to {filepath}")

    def validate_product(self, product: Dict) -> bool:
        """Return True when the product has a non-empty name and a truthy price."""
        return bool(product.get('product_name') and product.get('price'))

    @abstractmethod
    async def scrape(self) -> List[Dict]:
        """Scrape the store and return the collected product dicts."""

    @abstractmethod
    async def get_categories(self) -> List[Dict]:
        """Return the store's categories as dicts."""

    def create_product_dict(self, **kwargs) -> Dict:
        """Build a product dict containing exactly the ``csv_headers`` keys.

        Keyword arguments that are not CSV columns are ignored and columns
        without a value are ``None``. ``store_name`` and ``last_updated`` are
        always set by this method, overriding any values passed in.
        """
        product = {header: kwargs.get(header) for header in self.csv_headers}
        product['store_name'] = self.store_name
        product['last_updated'] = datetime.now().isoformat()
        return product

In [None]:
# @title Metro Scraper Logic
import asyncio
from playwright.async_api import async_playwright

class MetroScraper(BaseScraper):
    """Playwright scraper for the Metro Online Pakistan storefront.

    The crawl walks categories -> subcategories -> product cards. The CSS
    class names used as selectors are copied from the site's generated
    markup and will need updating whenever the site is rebuilt.
    """

    # Buttons that some listing pages show in addition to lazy scrolling.
    _LOAD_MORE_SELECTORS = ('button:has-text("Load More")', 'button:has-text("Show More")')

    def __init__(self, output_dir: str = "data",
                 max_categories: Optional[int] = 5,
                 max_subcategories: Optional[int] = 5):
        """Create the scraper; no browser is started until ``scrape`` runs.

        Args:
            output_dir: Directory CSV files are written to.
            max_categories: Maximum number of categories to crawl, or ``None``
                for all. Defaults to 5, the original demo limit.
            max_subcategories: Maximum number of subcategories crawled per
                category, or ``None`` for all. Defaults to 5.
        """
        super().__init__(store_name="Metro", base_url="https://www.metro-online.pk", output_dir=output_dir)
        self.max_categories = max_categories
        self.max_subcategories = max_subcategories
        self.playwright = None
        self.browser = None
        self.context = None

    async def setup_browser(self):
        """Start Playwright, launch headless Chromium and open a browser context."""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-dev-shm-usage'],
        )
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0',
        )

    async def close_browser(self):
        """Close the context, the browser and Playwright, whichever exist.

        Each later step still runs if an earlier one raises, and the
        attributes are reset so the method is safe to call more than once.
        """
        try:
            if self.context:
                await self.context.close()
        finally:
            try:
                if self.browser:
                    await self.browser.close()
            finally:
                try:
                    if self.playwright:
                        await self.playwright.stop()
                finally:
                    self.context = None
                    self.browser = None
                    self.playwright = None

    async def scroll_to_load_all_products(self, page, max_attempts: int = 15):
        """Scroll to the bottom repeatedly until the page stops growing.

        Each round scrolls to the bottom, waits for lazy content, clicks any
        visible "Load More"/"Show More" button and then measures the page
        height. The loop ends when the height is unchanged from the previous
        round or after ``max_attempts`` rounds.

        Args:
            page: The Playwright page showing the product listing.
            max_attempts: Upper bound on scroll rounds.
        """
        previous_height = 0
        for _ in range(max_attempts):
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(2)
            for selector in self._LOAD_MORE_SELECTORS:
                button = await page.query_selector(selector)
                if not (button and await button.is_visible()):
                    continue
                try:
                    await button.click()
                except Exception as exc:
                    # The button can detach or be covered between the lookup
                    # and the click; that must not abort the whole page.
                    self.logger.debug("Could not click %s: %s", selector, exc)
                    continue
                await asyncio.sleep(3)
            # Measured after the waits so newly loaded content is counted.
            current_height = await page.evaluate("document.body.scrollHeight")
            if current_height == previous_height:
                break
            previous_height = current_height

    async def get_categories(self) -> List[Dict]:
        """Read the category tiles from the home page.

        Returns:
            A list of ``{'name', 'url'}`` dicts. ``name`` is the tile image's
            alt text (``'No name'`` when absent). Tiles without a link are
            skipped because they cannot be navigated to.
        """
        page = await self.context.new_page()
        try:
            await page.goto(f"{self.base_url}/home", wait_until='networkidle')
            await page.wait_for_selector('.CategoryGrid_grid_item__FXimL', timeout=15000)
            tiles = await page.evaluate('''() => {
                return Array.from(document.querySelectorAll('.CategoryGrid_grid_item__FXimL')).map(el => {
                    const link = el.querySelector('a');
                    const image = el.querySelector('img');
                    return { name: image?.alt || 'No name', url: link?.href || null };
                });
            }''')
        finally:
            await page.close()
        categories = [tile for tile in tiles if tile.get('url')]
        if len(categories) != len(tiles):
            self.logger.info("Skipped %d category tiles without a link", len(tiles) - len(categories))
        return categories

    async def get_subcategories(self, cat: Dict) -> List[Dict]:
        """List the subcategories shown on a category page.

        Args:
            cat: A category dict with ``name`` and ``url`` keys.

        Returns:
            A list of ``{'name', 'url', 'main_category'}`` dicts. When the
            page shows no subcategory links, the category itself is returned
            as its only subcategory so its products are still scraped.
        """
        fallback = [{'name': cat['name'], 'url': cat['url'], 'main_category': cat['name']}]
        page = await self.context.new_page()
        try:
            await page.goto(cat['url'], wait_until='networkidle')
            sub_sel = '.sc-gKPRtg.jJzJeK'
            try:
                await page.wait_for_selector(sub_sel, timeout=5000)
            except Exception:
                # No subcategory strip appeared. Exception (not a bare
                # except) lets task cancellation and Ctrl-C propagate.
                return fallback
            links = await page.evaluate('''(sel) => {
                return Array.from(document.querySelectorAll(sel + " a")).map(l => ({
                    name: l.querySelector("h6, .sc-cwSeag")?.textContent?.trim(),
                    url: l.href
                }));
            }''', sub_sel)
            subcategories = [link for link in links if link.get('url')]
            if not subcategories:
                return fallback
            for sub in subcategories:
                sub['main_category'] = cat['name']
            return subcategories
        finally:
            await page.close()

    async def scrape_subcategory(self, sub: Dict) -> List[Dict]:
        """Scrape every product card on one subcategory page.

        Args:
            sub: A dict with ``name``, ``url`` and ``main_category`` keys.

        Returns:
            The product dicts that pass ``validate_product``; an empty list
            when no product card appears within 10 seconds.
        """
        page = await self.context.new_page()
        try:
            await page.goto(sub['url'], wait_until='networkidle')
            try:
                await page.wait_for_selector('.CategoryGrid_product_card__FUMXW', timeout=10000)
            except Exception:
                self.logger.info("No product cards found at %s", sub['url'])
                return []
            await self.scroll_to_load_all_products(page)
            cards = await page.evaluate('''() => {
                return Array.from(document.querySelectorAll('.CategoryGrid_product_card__FUMXW')).map(el => ({
                    name: el.querySelector('.CategoryGrid_product_name__3nYsN')?.textContent?.trim(),
                    price: el.querySelector('.CategoryGrid_product_price__Svf8T')?.textContent?.trim(),
                    url: el.querySelector('a')?.href,
                    img: el.querySelector('img')?.src
                }));
            }''')
            products = []
            for card in cards:
                # The pack size is only available as part of the product name.
                unit, quantity = self._parse_unit_quantity(card['name'])
                product = self.create_product_dict(
                    product_name=card['name'],
                    price=self._clean_price(card['price']),
                    url=card['url'],
                    image_url=card['img'],
                    category=sub['main_category'],
                    subcategory=sub['name'],
                    unit=unit,
                    quantity=quantity,
                )
                if self.validate_product(product):
                    products.append(product)
            return products
        finally:
            await page.close()

    async def scrape(self) -> List[Dict]:
        """Crawl categories and subcategories and return all valid products.

        The crawl is bounded by ``max_categories`` and ``max_subcategories``.
        A category or subcategory that fails is logged and skipped, so
        products gathered earlier are not lost. The browser is always closed,
        including when start-up itself fails.
        """
        all_products: List[Dict] = []
        try:
            await self.setup_browser()
            categories = await self.get_categories()
            for category in categories[:self.max_categories]:
                try:
                    subcategories = await self.get_subcategories(category)
                except Exception as exc:
                    self.logger.warning("Skipping category %r: %s", category.get('name'), exc)
                    continue
                for sub in subcategories[:self.max_subcategories]:
                    try:
                        all_products.extend(await self.scrape_subcategory(sub))
                    except Exception as exc:
                        self.logger.warning("Skipping subcategory %r: %s", sub.get('name'), exc)
                    # Short pause between listing pages.
                    await asyncio.sleep(1)
        finally:
            await self.close_browser()
        return all_products

In [None]:
# @title Execution
scraper = MetroScraper()
products = await scraper.scrape()
if products:
    scraper.save_to_csv(products)
import pandas as pd
if products:
    df = pd.DataFrame(products)
    display(df.head())