In [38]:
import logging
import re
from datetime import datetime
from functools import partial
from urllib.parse import urljoin

import boto3
import pandas as pd
from bs4 import BeautifulSoup, ResultSet, Tag
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# Emit INFO-level records so the scraper's progress messages show up in the cell output.
logging.basicConfig(level=logging.INFO)

## Local Environment Setup (On Mac)
1. Please have Google Chrome installed on your laptop
2. Create a Python virtual environment with `python -m venv venv`
3. Prepare the working environment with 
   1. `source venv/bin/activate`
   2. `pip install -r requirements.txt`



In [2]:
class MKBag(object):
    """Scraper for the Michael Kors HK women's handbag listing.

    A headless Chrome session (Selenium) loads each page and BeautifulSoup parses
    the rendered HTML. The instance owns the browser session: call ``cleanup()``
    when done, or use the object as a context manager (``with MKBag() as mk:``)
    so the browser is closed even when scraping fails.
    """

    # Site root that the hrefs found on product tiles are resolved against.
    BASE_URL = 'https://www.michaelkors.global/'

    def __init__(self):
        """Start a headless Chrome session that sends a desktop user-agent."""
        driver_options = Options()
        driver_options.add_argument("--headless=new")
        driver_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36")
        self.driver = webdriver.Chrome(options=driver_options)

    def __enter__(self):
        """Return the scraper itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the browser on leaving the ``with`` block; exceptions still propagate."""
        self.cleanup()
        return False

    def browse_web(
        self,
        url: str = 'https://www.michaelkors.global/hk/en/women/handbags/?pmin=1.00&start=0&sz={max_item}',
        item_count: int = 50,
    ) -> "BeautifulSoup":
        """Load a page in the browser and return its parsed HTML.

        Args:
            url: Address to open. A ``{max_item}`` placeholder, if present, is
                filled with ``item_count`` (the default URL uses it as page size).
            item_count: Value substituted for ``{max_item}``.

        Returns:
            The page source parsed with BeautifulSoup's ``html.parser``.
        """
        self.driver.get(url.format(max_item=item_count))
        return BeautifulSoup(self.driver.page_source, 'html.parser')

    def cleanup(self):
        """Quit the browser session started in ``__init__``."""
        self.driver.quit()

    @staticmethod
    def extract_item_info(raw_result: "Tag", path: str, get_item: str) -> list:
        """Collect one piece of data from every element matching a CSS selector.

        Args:
            raw_result: Parsed element to search in.
            path: CSS selector passed to ``raw_result.select``.
            get_item: ``'text'`` to read each element's text; any other value is
                treated as an attribute name and read with ``.get``.

        Returns:
            One entry per matched element (``None`` where an attribute is
            absent); an empty list when nothing matches.
        """
        found = raw_result.select(path)
        if not found:
            return []
        if get_item == 'text':
            return [txt.text for txt in found]
        return [item.get(get_item) for item in found]

    @staticmethod
    def _price_bounds(raw_prices: list) -> tuple:
        """Return ``(highest, lowest)`` of the scraped prices, compared numerically.

        The prices arrive as strings, and comparing strings orders them
        lexicographically ('990.00' > '1290.00'). Each value is therefore parsed
        as a number for the comparison only; the original string is returned.
        Unparseable or missing values are ignored, and ``(None, None)`` is
        returned when no usable price exists.
        """
        parsed = []
        for raw in raw_prices:
            try:
                parsed.append((float(str(raw).replace(',', '')), raw))
            except (TypeError, ValueError):
                continue
        if not parsed:
            return None, None
        highest = max(parsed, key=lambda pair: pair[0])[1]
        lowest = min(parsed, key=lambda pair: pair[0])[1]
        return highest, lowest

    def item_data_constructor(self, raw_result: "Tag", get_all_details: bool) -> dict:
        """Build the record for a single product tile.

        Args:
            raw_result: Parsed product tile.
            get_all_details: When true, the product page is also visited and the
                fields from ``get_item_details`` are merged into the record.

        Returns:
            Dict with ``brand``, ``item_name``, ``product_link``,
            ``default_price``, ``current_price``, ``colors``,
            ``product_images`` and ``timestamp`` (plus the detail fields when
            requested).

        Raises:
            IndexError: If the tile has no ``div.pdp-link a`` element.
        """
        extract_info_partial = partial(self.extract_item_info, raw_result=raw_result)

        # urljoin copes with both root-relative ('/hk/...') and relative hrefs, so
        # the link never ends up with a doubled slash after the host name.
        product_href = extract_info_partial(path='div.pdp-link a', get_item='href')[0]
        product_link = urljoin(self.BASE_URL, product_href)

        brand = extract_info_partial(path='div.product-brand a', get_item='text')
        item_name = extract_info_partial(path='div.pdp-link a', get_item='text')[0]

        logging.info(f'Pulling Basic information of item: {item_name}')

        # The highest listed value is the default price, the lowest the current one.
        default_price, current_price = self._price_bounds(
            extract_info_partial(path='span.default-price .value', get_item='content')
        )

        basic_details = dict(
            brand=brand[0] if brand else 'MICHAEL Michael Kors',
            item_name=item_name,
            product_link=product_link,
            default_price=default_price,
            current_price=current_price,
            colors=extract_info_partial(path='div.swatches img', get_item='title'),
            product_images=extract_info_partial(path='div.image-container img', get_item='src'),
            timestamp=datetime.now(),
        )

        if get_all_details:
            logging.info(f'Pulling dtailed information of item: {item_name}')
            basic_details.update(self.get_item_details(product_link))

        return basic_details

    def get_all_bags(self, get_all_details: bool = False) -> list:
        """Scrape every bag on the listing page.

        The listing is loaded once to read the total result count, then loaded
        again with that count as the page size so all tiles are on one page.

        Args:
            get_all_details: Forwarded to ``item_data_constructor``.

        Returns:
            A list with one record dict per product tile.
        """
        count_text = self.browse_web().select_one('span.results-count-value').text

        # Keep only the digits so stray whitespace or thousands separators do not
        # end up in the request URL; fall back to the raw text if none are found.
        digits = re.sub(r'\D', '', count_text)
        total_bags = int(digits) if digits else count_text

        logging.info(f'Start pulling total {total_bags} bags data')

        lst_bags = self.browse_web(item_count=total_bags).find_all('div', class_='product-tile')

        bags_data = []
        for position, bag_tile in enumerate(lst_bags, start=1):
            logging.info(f'Working on {position}/{total_bags} bags')
            bags_data.append(self.item_data_constructor(bag_tile, get_all_details))

        return bags_data

    def get_item_details(self, product_link: str) -> dict:
        """Scrape the extra fields from a product page.

        Args:
            product_link: URL of the product page.

        Returns:
            Dict with ``desceiption`` (key spelling kept as-is for compatibility
            with existing output), ``availability``, ``product_details`` (list of
            lines) and ``dimension`` (mapping of ``W``/``H``/``D`` to the number
            found). Sections missing from the page yield ``None`` / an empty
            list / an empty dict instead of raising.
        """
        raw_product_detail = self.browse_web(product_link)

        description_nodes = raw_product_detail.select('div.col-12.value.content')
        availability_nodes = raw_product_detail.select('div.availability')
        detail_nodes = raw_product_detail.select('div.col-sm-12.col-md-8.col-lg-12.value.content')

        extra_data = dict(
            desceiption=description_nodes[0].text.strip() if description_nodes else None,
            availability=availability_nodes[0].get('data-available') if availability_nodes else None,
            product_details=detail_nodes[0].text.strip().split('\n') if detail_nodes else [],
        )

        # A number, one non-space character (e.g. a unit mark), an optional space,
        # then one of the letters W, H or D.
        dim_expression = r'(\d+(?:\.\d+)?)\S\s?([WHD])'

        # Use the first detail line that contains any dimension match.
        dimension = []
        for details in extra_data['product_details']:
            dimension = re.findall(dim_expression, details)
            if dimension:
                break

        logging.debug(f'Dimension matches: {dimension}')

        extra_data['dimension'] = {axis: size for size, axis in dimension}

        return extra_data

        

## Start Scraping

- The following attributes are scraped by default:
  - item name
  - brand
  - default price
  - current price
  - colors
  - product link
  - product image
  - timestamp
- Pass `get_all_details=True` to also retrieve the extra details below (this takes about 10x longer, because every product page is opened):
  - dimension
  - description (returned under the key `desceiption`)
  - availability
- It returns a list of dictionaries, one per bag

In [3]:
mk_obj = MKBag()
try:
    bags_data = mk_obj.get_all_bags(get_all_details=False)
finally:
    # Always shut the headless Chrome session down, even if scraping fails midway;
    # otherwise the browser process is left running.
    mk_obj.cleanup()

INFO:root:Start pulling total 400 bags data
INFO:root:Working on 1/400 bags
INFO:root:Pulling Basic information of item: Ruthie Large Pebbled Leather Satchel
INFO:root:Working on 2/400 bags
INFO:root:Pulling Basic information of item: Ruthie Large Signature Logo Satchel
INFO:root:Working on 3/400 bags
INFO:root:Pulling Basic information of item: Jet Set Large Leather Crossbody Bag
INFO:root:Working on 4/400 bags
INFO:root:Pulling Basic information of item: Jet Set Large Smartphone Convertible Crossbody Bag
INFO:root:Working on 5/400 bags
INFO:root:Pulling Basic information of item: Chantal Medium Pebbled Leather Satchel
INFO:root:Working on 6/400 bags
INFO:root:Pulling Basic information of item: Colby Medium Leather Shoulder Bag
INFO:root:Working on 7/400 bags
INFO:root:Pulling Basic information of item: Colby Medium Two-Tone Neoprene Shoulder Bag
INFO:root:Working on 8/400 bags
INFO:root:Pulling Basic information of item: Colby Medium Leather Shoulder Bag
INFO:root:Working on 9/400 ba

In [35]:
import pandas as pd
import boto3

def get_df(data: dict) -> pd.DataFrame:
    """Wrap ``data`` in a pandas DataFrame using the plain DataFrame constructor."""
    frame = pd.DataFrame(data)
    return frame


def get_json(data, file_name: str = 'bags-data.json'):
    """Convert ``data`` to a DataFrame and write it as JSON to ``file_name``.

    Returns whatever ``DataFrame.to_json`` returns for the given path.
    """
    frame = get_df(data)
    outcome = frame.to_json(file_name)
    return outcome


def upload_to_s3(
    s3_bucket: str, 
    s3_prefix: str,
    file_path: str = 'bags-data.json',
    aws_profile: str = None
):
    """Upload a local file to S3.

    Args:
        s3_bucket: Name of the destination bucket.
        s3_prefix: Object key the file is stored under.
        file_path: Local file to upload.
        aws_profile: Named AWS profile to build the session from; when falsy,
            the default boto3 client is used instead.
    """
    if not aws_profile:
        client = boto3.client('s3')
    else:
        client = boto3.Session(profile_name=aws_profile).client('s3')

    client.upload_file(file_path, s3_bucket, s3_prefix)



In [18]:
# Write the scraped records to the default output file, 'bags-data.json'.
get_json(bags_data)

In [37]:
# Destination of the export; the 'sit' AWS profile supplies the credentials.
bucket_name = 'efsg-data-analytics-ap-southeast-1-215702702661-sit'
file_name = 'test-data/bags-data.json'

# Reuse the upload helper rather than repeating its session/client setup inline.
upload_to_s3(
    s3_bucket=bucket_name,
    s3_prefix=file_name,
    file_path='bags-data.json',
    aws_profile='sit',
)


INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
