In [4]:
import os
import time
import re
import pickle
import zipfile
import shutil
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
import time
from datetime import datetime 

In [5]:
# Base directory that all output paths below are built from.
LOCAL_ROOT = './'
# Pickle file holding the dict of scraped items (read and written by ItemScraper).
ITEMS_PATH = LOCAL_ROOT + 'data/items.pkl'
# Text file that log() appends every message to.
LOG_PATH = LOCAL_ROOT + 'log.txt'
# Zip archive in which FileCache stores the HTML of every downloaded page.
FILE_CACHE_PATH = LOCAL_ROOT + 'data/pages.zip'
# Compression method passed to zipfile.ZipFile for the page cache.
FILE_CACHE_ALGO = zipfile.ZIP_LZMA
# Category path segments; each is appended to ZALANDO_URL to form a category URL.
CATEGORY_IDS = [
    'damenbekleidung-kleider',
    'damenbekleidung-shirts',
    'damenbekleidung-jeans',
    'damenbekleidung-jacken',
    'damenbekleidung-jacken-maentel',
    'damenbekleidung-blusen-tuniken',
]
# Upper bound on the number of listing pages visited per category.
PAGE_LIMIT = 60
# Filesystem path of the chromedriver executable handed to selenium's Service.
# NOTE(review): machine-specific absolute path - adjust per environment.
DRIVER_PATH = '/Users/kofmanya/Desktop/HSE/My assignments/Diploma/Dataset/chromedriver'
# Site root used to build category and item URLs.
ZALANDO_URL = 'https://www.zalando.ch'


def log(pattern: str, *args):
    """Print a timestamped message and append it to the log file.

    The message is built with ``str.format``: the current local time fills
    the first placeholder and ``args`` fill the placeholders of ``pattern``.
    """
    template = '{} ' + pattern + '\n'
    line = template.format(datetime.now(), *args)
    # The line already ends with a newline, so suppress print's own.
    print(line, end='')
    with open(LOG_PATH, 'a', encoding='utf-8') as log_file:
        log_file.write(line)


class Driver:
    """Context manager owning a Chrome WebDriver session.

    On construction a Chrome instance is started through the chromedriver at
    ``DRIVER_PATH`` and a ``language-preference=en`` cookie is installed for
    ``www.zalando.ch`` via the Chrome DevTools Protocol. Leaving the ``with``
    block quits the browser.
    """

    def __init__(self):
        options = webdriver.ChromeOptions()
        service = Service(DRIVER_PATH)
        self.driver = webdriver.Chrome(service=service, options=options)
        try:
            # The cookie is set through CDP so it exists before any page of
            # the site has been opened.
            self.driver.execute_cdp_cmd('Network.enable', {})
            self.driver.execute_cdp_cmd(
                'Network.setCookie',
                {'name': 'language-preference', 'value': 'en', 'domain': 'www.zalando.ch'},
            )
            self.driver.execute_cdp_cmd('Network.disable', {})
        except BaseException:
            # __exit__ never runs for an object whose constructor failed, so
            # the browser must be closed here to avoid leaking the process.
            self.driver.quit()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.driver.quit()

    def load_complete_page(self, url: str) -> str:
        """Open ``url``, scroll to the bottom and return the page source.

        The fixed sleeps (4 s after navigation, 1 s after scrolling) give the
        page time to settle - presumably so lazily loaded content is present
        in the returned HTML.
        """
        self.driver.get(url)
        time.sleep(4)
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
        time.sleep(1)
        return self.driver.page_source


class FileRotator:
    """Keeps up to four rolling backups (``.bak1`` .. ``.bak4``) of one file.

    ``path`` is the file being protected and ``rotate_interval`` the minimum
    number of seconds between two rotations. At construction time the first
    existing file among ``.bak4``, ``.bak3``, ``.bak2``, ``.bak1`` and the
    file itself (checked in that order) is copied to ``<path>.old``.
    """

    def __init__(self, path: str, rotate_interval: float):
        self.path = path
        self.rotate_interval = rotate_interval
        self.last_rotate_ts = time.time()
        candidates = ('.bak4', '.bak3', '.bak2', '.bak1', '')
        first_existing = next(
            (suffix for suffix in candidates if os.path.exists(self.path + suffix)),
            None,
        )
        if first_existing is not None:
            self.copy_file(first_existing, '.old')

    def before_wrte(self):
        """Rotate the backups if the rotation interval has elapsed.

        Call this right before overwriting the file. After a rotation the
        previous content lives in ``.bak1`` and a copy of it is left in place.
        """
        now = time.time()
        due = now > self.last_rotate_ts + self.rotate_interval
        if not due:
            return
        log('[info] rotating {}', self.path)
        self.last_rotate_ts = now
        # Shift from the oldest slot downwards so nothing is overwritten early.
        shifts = (('.bak3', '.bak4'), ('.bak2', '.bak3'), ('.bak1', '.bak2'), ('', '.bak1'))
        for newer, older in shifts:
            self.move_file(newer, older)
        self.copy_file('.bak1', '')

    def copy_file(self, src: str, dst: str):
        """Copy ``path + src`` to ``path + dst``; do nothing if the source is absent."""
        source = self.path + src
        if not os.path.exists(source):
            return
        shutil.copy(source, self.path + dst)

    def move_file(self, src: str, dst: str):
        """Move ``path + src`` to ``path + dst``; do nothing if the source is absent."""
        source = self.path + src
        if not os.path.exists(source):
            return
        shutil.move(source, self.path + dst)


class FileCache:
    """Page cache stored as members of the zip archive at ``FILE_CACHE_PATH``.

    Each URL maps to one archive member whose name is derived by
    ``url_to_path``. The archive is backed up through a ``FileRotator`` with
    a 300 second rotation interval.
    """

    def __init__(self):
        self.rotator = FileRotator(FILE_CACHE_PATH, 300.0)
        log('[info] testing cache integrity')
        if not os.path.exists(FILE_CACHE_PATH):
            log('[info] cache does not exist')
            return
        try:
            with zipfile.ZipFile(FILE_CACHE_PATH, 'r', compression=FILE_CACHE_ALGO, compresslevel=9) as zipf:
                bad_file = zipf.testzip()
        except Exception as exc:
            # Best effort: a broken archive is reported, not fatal. Catching
            # Exception (not a bare except) keeps Ctrl+C working.
            log('[error] failed to test cache integrity: {}', exc)
            return
        if bad_file:
            log('[error] bad file in cache: {}, please remove it', bad_file)
        else:
            log('[info] cache is ok')

    def get(self, url: str) -> 'bytes | None':
        """Return the cached bytes for ``url``, or ``None`` on any cache miss.

        A missing archive, a missing member, a corrupt archive and an
        unmappable URL are all treated as a miss.
        """
        try:
            with zipfile.ZipFile(FILE_CACHE_PATH, 'r', compression=FILE_CACHE_ALGO, compresslevel=9) as zipf:
                with zipf.open(self.url_to_path(url), 'r') as f:
                    return f.read()
        except Exception:
            # Deliberately broad, but KeyboardInterrupt/SystemExit propagate.
            return None

    def put(self, url: str, data: bytes):
        """Append ``data`` to the archive under the member name for ``url``.

        Raises:
            ValueError: if ``url`` maps to an empty path.
        """
        # Make sure the target directory exists so the first write on a
        # fresh checkout does not fail.
        cache_dir = os.path.dirname(FILE_CACHE_PATH)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)
        self.rotator.before_wrte()
        with zipfile.ZipFile(FILE_CACHE_PATH, 'a', compression=FILE_CACHE_ALGO, compresslevel=9) as zipf:
            with zipf.open(self.url_to_path(url), 'w') as f:
                f.write(data)

    def url_to_path(self, url: str) -> str:
        """Convert ``url`` into an archive member name ending in ``.txt``.

        The scheme and a leading ``www.`` are dropped, ``?`` becomes ``/``,
        repeated slashes are collapsed and leading/trailing slashes removed.
        In every path component containing at least four ``-`` (or, failing
        that, four ``_``), the fourth separator is replaced by ``/`` so the
        remainder becomes a nested entry.

        Raises:
            ValueError: if nothing is left of the URL after normalisation.
        """
        # Drop prefixes one after another (scheme first, then "www.").
        for prefix in ('http://', 'https://', 'www.'):
            if url.startswith(prefix):
                url = url[len(prefix):]
        # Normalise slashes.
        url = url.replace('?', '/')
        while '//' in url:
            url = url.replace('//', '/')
        if url.endswith('/'):
            url = url[:-1]
        if url.startswith('/'):
            url = url[1:]
        if not url:
            raise ValueError('empty url')
        # Introduce "/" in place of the 4th separator of long components.
        parts = url.split('/')
        for i, part in enumerate(parts):
            for sep in ('-', '_'):
                if part.count(sep) >= 4:
                    groups = part.split(sep, maxsplit=4)
                    parts[i] = sep.join(groups[:-1]) + '/' + groups[-1]
                    break
        return '/'.join(parts) + '.txt'


class Item:
    """One product parsed from the HTML of its detail page.

    Construction extracts images, brand, name, price, colour and material
    from ``html`` and validates them. When validation finds a problem the
    instance additionally gets an ``error`` or a ``warning`` attribute (these
    attributes are absent otherwise) and the problem is logged.
    """

    # Substrings stripped from the raw price text before parsing it.
    PRICE_CLEANUP_TOKENS = ['&nbsp;', 'From', 'CHF', '\u2019']
    # Opening-tag texts (without "<" and ">") of elements that may hold the price.
    PRICE_ATTRS = [
        'span class="voFjEy _4sa1cA m3OCL3 HlZ_Tf _65i7kZ"',
        'span class="voFjEy _4sa1cA m3OCL3 Yb63TQ ZiDB59 _65i7kZ"',
    ]

    def __init__(self, category_id: str, category_page: int, item_id: str, url: str, html: str):
        self.category = category_id
        self.category_page = category_page
        self.id = item_id
        self.url = url
        self.images = Item.find_images(html)
        self.brand = Item.find_brand(html)
        self.name = Item.find_name(html)
        self.price = Item.find_price(html)
        self.color = Item.find_color(html)
        self.material = Item.find_material(html)
        error, warning = self.check_fields()
        if error:
            self.error = error
            log('[error] {} - {}', url, error)
        elif warning:
            self.warning = warning
            log('[warning] {} - {}', url, warning)
        else:
            log('[debug] {} | {} | {} | {} | {} | {} | {} img(s)', self.url, self.brand, self.name, self.price, self.color, self.material, len(self.images))

    @staticmethod
    def find_images(html: str) -> list[str]:
        """Return the product image URLs found in ``html`` without duplicates.

        Two sources are scanned (thumbnail images and "Enlarge" images) and
        their results interleaved position by position, keeping the first
        occurrence of every URL.
        """
        thumbnails = [
            match.group(1)
            for match in re.finditer(
                r'"Thumbnail Image ." src="(https://img01.ztat.net/article/[^"]+\.[a-zA-Z]{3,4})', html)
        ]
        enlarged = [
            match.group(2)
            for match in re.finditer(
                r', Enlarge"( fetchpriority="[a-z]+")? src="(https://img01.ztat.net/article/[^"]+\.[a-zA-Z]{3,4})', html)
        ]
        # Merge the two lists preserving order and removing duplicates.
        res = []
        for i in range(max(len(thumbnails), len(enlarged))):
            for source in (thumbnails, enlarged):
                if i < len(source) and source[i] not in res:
                    res.append(source[i])
        return res

    @staticmethod
    def find_brand(html: str) -> str:
        """Return the brand text, or ``''`` when its element is not found."""
        return Item.find_attr(html, 'h5 class="voFjEy YZziZ- m3OCL3 HlZ_Tf q84f1m snL7ze"')

    @staticmethod
    def find_name(html: str) -> str:
        """Return the product name, or ``''`` when its element is not found."""
        return Item.find_attr(html, 'span class="EKabf7 R_QwOV"')

    @staticmethod
    def find_price(html: str) -> str:
        """Return the first parseable price as ``str(float)``, or ``''``.

        For each candidate element in ``PRICE_ATTRS`` up to two occurrences
        are examined.
        """
        for attr in Item.PRICE_ATTRS:
            for raw_price in Item.find_attrs(html, attr, 2):
                price = Item.cleanup_price(raw_price)
                if price:
                    return price
        return ''

    @staticmethod
    def cleanup_price(price: str) -> str:
        """Strip currency decorations from ``price`` and normalise it.

        Returns ``str(float(...))`` of the cleaned text, or ``''`` when the
        text is empty or not a number.
        """
        for token in Item.PRICE_CLEANUP_TOKENS:
            price = price.replace(token, '')
        # str.strip returns a new string; the result must be kept.
        price = price.strip()
        if not price:
            return ''
        try:
            return str(float(price))
        except ValueError:
            return ''

    @staticmethod
    def find_color(html: str) -> str:
        """Return the colour text, or ``''`` when its element is not found."""
        return Item.find_attr(html, 'span class="voFjEy lystZ1 Sb5G3D HlZ_Tf zN9KaA"')

    @staticmethod
    def find_material(html: str) -> str:
        """Return the material text, or ``''`` when its element is not found."""
        return Item.find_attr(html, 'dd class="voFjEy lystZ1 m3OCL3 HlZ_Tf zN9KaA" role="definition"')

    @staticmethod
    def find_attrs(html: str, attr: str, limit: int) -> list[str]:
        """Return the contents of up to ``limit`` elements opened by ``attr``.

        ``attr`` is the opening tag text without ``<`` and ``>``. The content
        of an element is everything between the closing ``>`` of that tag and
        the next ``</``.
        """
        needle = attr + '>'
        res = []
        start = 0
        for _ in range(limit):
            start = html.find(needle, start)
            if start < 0:
                break
            content_start = start + len(needle)
            end = html.find('</', content_start)
            if end < 0:
                break
            res.append(html[content_start:end])
            start = end
        return res

    @staticmethod
    def find_attr(html: str, attr: str) -> str:
        """Return the content of the first element opened by ``attr``, or ``''``."""
        found = Item.find_attrs(html, attr, 1)
        return found[0] if found else ''

    def check_fields(self) -> tuple[str, str]:
        """Validate the parsed fields and return ``(error, warning)``.

        At most one of the two strings is non-empty: an error names the first
        empty field; a warning reports fewer than three images or the first
        image whose extension is not ``.jpg``.
        """
        for field in ['brand', 'name', 'price', 'color', 'material', 'images']:
            if not getattr(self, field):
                return ('no {}'.format(field), '')
        if 0 < len(self.images) < 3:
            return ('', 'only {} image(s)'.format(len(self.images)))
        for image in self.images:
            dot = image.rfind('.')
            if dot >= 0 and (image[dot:].lower() != '.jpg'):
                return ('', 'non-jpg image {}'.format(image[dot:]))
        return ('', '')


class ItemScraper:
    """Scrapes product items category by category and persists them.

    Pages are fetched through ``driver`` and cached in a ``FileCache``; the
    parsed items are kept in ``items_dict`` (item id -> ``Item.__dict__``)
    and pickled to ``ITEMS_PATH``. Used as a context manager, the items are
    dumped once more when the ``with`` block is left.
    """

    def __init__(self, driver: 'Driver'):
        self.driver = driver
        self.cache = FileCache()
        self.items_dict_rotator = FileRotator(ITEMS_PATH, 60)
        self.items_dict = dict()
        if os.path.exists(ITEMS_PATH):
            # NOTE: unpickling can execute arbitrary code. This is acceptable
            # only because the file is produced by save_items(); never point
            # ITEMS_PATH at a file from an untrusted source.
            with open(ITEMS_PATH, 'rb') as f:
                self.items_dict = pickle.load(f)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.save_items()

    def save_items(self):
        """Pickle ``items_dict`` to ``ITEMS_PATH``, rotating backups first."""
        # Make sure the target directory exists so the first dump on a fresh
        # checkout does not fail.
        items_dir = os.path.dirname(ITEMS_PATH)
        if items_dir:
            os.makedirs(items_dir, exist_ok=True)
        self.items_dict_rotator.before_wrte()
        with open(ITEMS_PATH, 'wb') as f:
            pickle.dump(self.items_dict, f)
        log('[info] dumped {} items', len(self.items_dict))

    def scrape_category(self, category_id: str):
        """Load every not-yet-known item of ``category_id``.

        Items are dumped to disk each time the total count reaches a multiple
        of 100 and once more at the end of the category.
        """
        category_url = '{}/{}/'.format(ZALANDO_URL, category_id)
        item_ids = self.get_item_ids_in_category(category_url)
        for item_id, category_page in item_ids.items():
            if item_id in self.items_dict:
                continue
            item = self.load_item(category_id, category_page, item_id)
            self.items_dict[item_id] = item.__dict__
            if len(self.items_dict) % 100 == 0:
                self.save_items()
        self.save_items()

    def get_item_ids_in_category(self, category_url: str) -> dict[str, int]:
        """Return ``{item_id: page_number}`` for the listing pages of a category.

        At most ``PAGE_LIMIT`` pages are visited. An item that appears on
        several pages is recorded with the last page it was seen on.
        """
        item_ids = dict()
        num_pages = self.get_num_pages_in_category(category_url)
        log('[info] category {} has {} pages', category_url, num_pages)
        for i in range(1, min(num_pages, PAGE_LIMIT) + 1):
            url = '{}?p={}'.format(category_url, i)
            html = self.load_complete_page(url)
            on_page = {
                match.group(1)
                for match in re.finditer(r'href="https://www.zalando.ch/([^"]+)\.html"', html)
            }
            # Fewer than 60 links is reported as a warning - presumably a
            # full listing page is expected to show at least that many items.
            log_level = '[warning]' if len(on_page) < 60 else '[debug]'
            log('{} page {}?p={} has {} items', log_level, category_url, i, len(on_page))
            for item_id in on_page:
                item_ids[item_id] = i
        log('[info] category {} has {} items', category_url, len(item_ids))
        return item_ids

    def get_num_pages_in_category(self, category_url: str) -> int:
        """Return the page count read from the "Page 1 of N" pagination label.

        Raises:
            ValueError: if the label is missing from the page or N is not an
                integer.
        """
        marker = '>Page 1 of '
        html = self.load_complete_page(category_url)
        begin_pos = html.find(marker)
        end_pos = html.find('</span>', begin_pos) if begin_pos >= 0 else -1
        if begin_pos < 0 or end_pos < 0:
            # Without this check the slice below would silently be taken
            # from an unrelated part of the page.
            raise ValueError('pagination marker not found on {}'.format(category_url))
        return int(html[begin_pos + len(marker):end_pos])

    def load_item(self, category_id: str, category_page: int, item_id: str) -> 'Item':
        """Fetch the detail page of ``item_id`` and parse it into an ``Item``."""
        url = '{}/{}.html'.format(ZALANDO_URL, item_id)
        html = self.load_complete_page(url)
        return Item(category_id, category_page, item_id, url, html)

    def load_complete_page(self, url: str) -> str:
        """Return the HTML of ``url``, from the cache when available.

        On a cache miss (including an empty cached entry) the page is loaded
        through the driver and stored in the cache as UTF-8.
        """
        cached = self.cache.get(url)
        if cached:
            return cached.decode('utf-8')
        html = self.driver.load_complete_page(url)
        self.cache.put(url, html.encode('utf-8'))
        return html

In [None]:
# Scrape every configured category. ItemScraper is entered as a context
# manager so that the items collected so far are dumped to disk even when
# scraping aborts midway; the browser is closed afterwards.
with Driver() as driver, ItemScraper(driver) as scrapper:
    for category_id in CATEGORY_IDS:
        scrapper.scrape_category(category_id)