#### import

In [None]:
import os
import sys
import time

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import pandas as pd

from multiprocessing import Pool

from selenium import webdriver
from selenium.webdriver.common.by import By

from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException


from selenium.webdriver.firefox.options import Options

from bs4 import BeautifulSoup

import re

from datetime import datetime

import csv

#### function

In [None]:
def get_currency(price):  # e.g. 'HK$1,234.00'
    """Convert a price label such as 'HK$1,234.00' into a float of whole dollars.

    The text after the first 'HK$' is taken, everything from the first '.'
    onwards is dropped, thousands separators are removed, and the remaining
    digits are returned as a float (so 'HK$1,234.00' gives 1234.0).

    Raises IndexError when the text contains no 'HK$' marker and ValueError
    when what remains is not numeric.
    """
    after_symbol = price.split('HK$')[1]
    whole_dollars = after_symbol.split('.')[0]
    return float(whole_dollars.replace(',', ''))


#### function for interrupted run

In [None]:
print('Main: Start')
start_time = time.time()

max_workers = os.cpu_count()
print("CPU count:", max_workers)

# Every input and output of this notebook lives in this directory.
dir_name = 'sasa'
os.makedirs(dir_name, exist_ok=True)

# Evaluate the date stamp once so the file filter and the temp-file names
# cannot disagree if the run crosses midnight.
today = datetime.now().strftime('%Y%m%d')  # e.g. '20240227'

# Merge today's listing files ('sasa_*<YYYYMMDD>.csv') into one frame.
# NOTE(review): the promotion export written at the end of the notebook
# ('sasa/sasa_promotion_<date>.csv') also matches this pattern, so a second
# run on the same day would merge it in as well - confirm whether it should
# be excluded here.
ls = os.listdir(dir_name)

frames = []
for file in ls:
    if file.startswith('sasa_') and file.endswith(today + '.csv'):
        print(file)
        frames.append(pd.read_csv(os.path.join(dir_name, file)))

count_of_files = len(frames)
# Concatenate once instead of growing the frame inside the loop.
master_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

# Temp files the scraping workers append to. Later cells read these names,
# so they must be defined here.
promo_filename = "sasa/tmp_promotion_items_" + today + ".csv"
index_filename = "sasa/tmp_index_items_" + today + ".csv"


#### Trim master dataframe

In [None]:
# display(master_df)
print('Before:', master_df.shape)

# Keep one row per product. When no input file matched, master_df is an empty
# frame without a 'Product ID' column and drop_duplicates(subset=...) would
# raise KeyError, so only deduplicate when the column is present.
if 'Product ID' in master_df.columns:
    master_df = master_df.drop_duplicates(subset='Product ID')

print('After:', master_df.shape)

promotion_df = pd.DataFrame()
index_df = pd.DataFrame()

# Save the deduplicated product list.
master_df.to_csv('sasa/tmp_master_df.csv', index=False)
# display(master_df.head())

In [None]:
if count_of_files > 0:
    categories = master_df['Category']
    product_ids = master_df['Product ID']
    sasa_urls = master_df['Link']

    # One (category, product id, url) tuple per product to scrape.
    product_info = list(zip(categories, product_ids, sasa_urls))

    # Resume support: drop products that an earlier, interrupted run already
    # wrote to BOTH temp files. get_product_info appends its rows without a
    # header line and with the product id as the first field, so the ids are
    # read positionally. A zero-byte file (a worker that had nothing to write)
    # would make pd.read_csv raise EmptyDataError, so it is skipped up front.
    temp_files = (promo_filename, index_filename)
    if all(os.path.exists(f) and os.path.getsize(f) > 0 for f in temp_files):
        # Ids are compared as strings so the match does not depend on how
        # pandas infers the column type in each file.
        promo_items = set(pd.read_csv(promo_filename, header=None, usecols=[0], dtype=str)[0])
        index_items = set(pd.read_csv(index_filename, header=None, usecols=[0], dtype=str)[0])

        # Only ids present in both files count as finished.
        id_to_remove = promo_items & index_items

        print('Before:', len(product_info), '\n')
        product_info = [p for p in product_info if str(p[1]) not in id_to_remove]
        print('After:', len(product_info), '\n')

    # Split the work into a multiple of 4 chunks, adding chunks until each
    # holds roughly 50 products or fewer.
    split = 4

    while len(product_info) // split > 50:
        split += 4

    print('Split:', split)

    list_length, remainder = divmod(len(product_info), split)

    tuples = []
    start = 0

    for i in range(split):
        # The last `remainder` chunks take one extra product each.
        end = start + list_length + (1 if i >= split - remainder else 0)
        tuples.append(product_info[start:end])
        start = end

    # Size of the first chunk
    print(len(tuples[0]))
    # print('\n'.join(f'{i+1}: {t}' for i, t in enumerate(map(str, tuples))))

In [None]:
def _open_browser(worker_id, category, sasa_url, options):
    """Start Chrome and load ``sasa_url``; return the driver, or None after 5 failed attempts."""
    for _ in range(5):
        browser = None
        try:
            print(f'{worker_id}: Open browser')
            browser = webdriver.Chrome(options=options)

            # Give the first page up to three loads to come back with a title.
            for j in range(3):
                browser.get(sasa_url)
                time.sleep(3)

                print('{}: Open {}: {}'.format(worker_id, category, sasa_url))

                if browser.title not in ['', 'None', None]:
                    print("{}: Page title was '{}'".format(worker_id, browser.title))
                    break
                print('{}: Retry to open {}: {} Attempt {}'.format(worker_id, category, sasa_url, j+1))
                time.sleep(1)

            return browser

        except Exception:
            # The driver may have failed before it was created, so only quit
            # a driver that actually exists.
            if browser is not None:
                browser.quit()
            print('{}: Error A Failed to open {}: {}'.format(worker_id, category, sasa_url))

    print('{}: Error B Failed to open {}: {}'.format(worker_id, category, sasa_url))
    return None


def _expand_promotions(browser):
    """Click the '查看更多' ("view more") toggle until the promotion list is fully expanded."""
    locator = (By.XPATH, '//div[@class="salepage-promotion-more"]')

    for _ in range(3):
        try:
            promotion_more = browser.find_element(*locator)
            if promotion_more.text == '查看更多':
                while True:
                    browser.execute_script('window.scrollBy(0,100)')  # Scroll down
                    promotion_more.click()
                    promotion_more = browser.find_element(*locator)
                    if promotion_more.text != '查看更多':
                        break
                    time.sleep(1)
                # Expanded: stop retrying, and give the page a moment to
                # render the extra promotions before it is parsed.
                time.sleep(1)
                return
        except Exception:
            # Missing div or failed click: counts as one attempt.
            pass
        time.sleep(1)

    print("Failed to find 'salepage-promotion-more' div after 3 attempts")


def get_product_info(index_tuple):
    """Scrape one worker's share of product pages and append the rows to the temp CSVs.

    A single headless Chrome session is opened for the first product and
    reused for the rest. For each product the page is loaded, the promotion
    list is expanded, and the promotions, rating, prices, brand and feature
    text are parsed from the rendered HTML.

    Args:
        index_tuple: ``(worker_id, products)``. ``worker_id`` is a number used
            to prefix log lines and to stagger browser start-up; ``products``
            is an iterable of ``(category, product_id, sasa_url)`` tuples.

    Returns:
        None. Rows are appended, without a header line, to the files named by
        the module-level ``promo_filename`` (``product_id, tag, title``) and
        ``index_filename`` (``product_id, rate, comment, price,
        original_price, brand, features``).

    Raises:
        Any WebDriver or parsing error raised while handling a product
        propagates to the caller, but only after the browser has been closed
        and the rows collected so far have been written.
    """
    worker_id, products = index_tuple

    # Alternative Firefox configuration; not used while Chrome is the active driver.
    firefox_options = webdriver.FirefoxOptions()
    firefox_options.add_argument("--private")  # Open Firefox in private mode
    firefox_options.add_argument("--lang=zh-TW")  # Set the language to Traditional Chinese (zh-TW)
    firefox_options.add_argument("--window-position=0,0")  # Set the initial window position

    # Chrome configuration used by this worker
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--incognito")  # Open Chrome in incognito mode
    chrome_options.add_argument("--lang=zh-TW")  # Set the language to Traditional Chinese (zh-TW)
    chrome_options.add_argument("--window-position=0,0")  # Set the initial window position

    promotion_items = []
    index_items = []
    browser = None

    try:
        for category, product_id, sasa_url in products:
            if browser is None:
                # Stagger start-up so the workers do not all launch at once.
                time.sleep(worker_id % 4)

                browser = _open_browser(worker_id, category, sasa_url, chrome_options)
                if browser is None:
                    # Could not load this product; the next one retries the start-up.
                    continue

                time.sleep(1)

                # add implicit wait
                browser.implicitly_wait(3)

                # Best effort: dismiss the chat widget and the cookie banner
                # when they are shown; a missing overlay is not an error.
                overlays = (
                    (By.XPATH, '//div[@class="easychat-chat-dismiss-button-mobile"]'),
                    (By.CSS_SELECTOR, 'a.ns-cookie-privacy-agree-btn'),
                )
                for by, locator in overlays:
                    try:
                        browser.find_element(by, locator).click()
                    except Exception:
                        pass
                    time.sleep(1)

            else:  # the browser is already open: just navigate
                success = False
                for i in range(5):
                    try:
                        browser.get(sasa_url)
                        time.sleep(1)

                        if browser.title not in ['', 'None', None]:
                            success = True
                            if i > 0:
                                print("{}: Page title was '{}'".format(worker_id, browser.title))
                            break
                        else:
                            print('{}: Retry to open {}: {} Attempt {}'.format(worker_id, category, sasa_url, i+1))
                            time.sleep(1)

                    except Exception:
                        # Navigation failed; try again immediately.
                        pass

                if not success:
                    print('{}: Error in browser already open, Failed to open {}: {}'.format(worker_id, category, sasa_url))

            # Scroll down before looking for the promotion list.
            browser.execute_script('window.scrollBy(0, 1440)')

            time.sleep(1)

            _expand_promotions(browser)

            # Parse the rendered page.
            soup = BeautifulSoup(browser.page_source, 'html.parser')

            # Promotions are listed in the 'salepage-top-left' div.
            top_left = soup.find('div', {'class': 'salepage-top-left'})

            promotion_tags = []
            promotion_titles = []

            for div in top_left.find_all('div', {'class': 'salepage-promotion'}):
                for li in div.find_all('li', {'class': 'salepage-promotion-li'}):
                    tag = li.find('div', {'class': 'tag-rectangle'})
                    span = li.find('span', {'class': 'salepage-promotion-title'})
                    promotion_tags.append(tag.text)
                    promotion_titles.append(span.text)

            # Rating, prices, brand and features are in the 'salepage-top-right' div.
            top_right = soup.find('div', {'class': 'salepage-top-right'})

            # Collected but not written to either CSV.
            salepage_tag_uls = []
            for ul in top_right.find_all('ul', {'class': 'salepage-tag-ul'}):
                for li in ul.find_all('li'):
                    salepage_tag_uls.append(li.text)

            # Star rate and number of comments; the string 'None' marks a
            # product with no usable rating.
            rate = 'None'
            comment = 'None'
            star_rate = top_right.find('div', {'class': 'star-rate'})
            if star_rate:
                try:
                    numbers = re.findall(r'\d+\.\d+|\d+', star_rate.text)
                    # Exactly two numbers are expected; anything else raises ValueError.
                    rate, comment = map(float, numbers)
                    comment = int(comment)
                except ValueError:
                    rate = 'None'
                    comment = 'None'

            # Price
            price = top_right.find('div', {'class': 'salepage-price'}).text
            price = get_currency(price)

            # Suggested price; fall back to the selling price when it is
            # missing or cannot be parsed.
            try:
                original_price = top_right.find('div', {'class': 'salepage-suggestprice'}).text
                original_price = get_currency(original_price)
            except (AttributeError, IndexError, ValueError):
                original_price = price

            # Brand
            brand_ul = top_right.find('ul', {'class': 'salepage-brand-list'})
            brand = brand_ul.text.strip() if brand_ul else None

            # Feature text: every div inside every feature list item.
            salepage_feature_lis = []
            for feature_ul in top_right.find_all('ul', {'class': 'salepage-feature'}):
                for li in feature_ul.find_all('li', {'class': 'salepage-feature-li'}):
                    for div in li.find_all('div'):
                        salepage_feature_lis.append(div.text.strip())

            # Join the feature texts into a single string
            features = '<br>'.join(salepage_feature_lis)

            promotion_items.extend([(product_id, tag, title) for tag, title in zip(promotion_tags, promotion_titles)])

            index_items.append((product_id, rate, comment, price, original_price, brand, features))

    finally:
        # Always release the browser, even for an empty chunk or after an error.
        if browser is not None:
            browser.quit()
            print('{}: Close browser'.format(worker_id))

        # Persist whatever was scraped so an interrupted run can resume from it.
        with open(promo_filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
            writer.writerows(promotion_items)

        with open(index_filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
            writer.writerows(index_items)

    print('{}: Done'.format(worker_id))

    # NOTE(review): purpose of this pause is not evident from the code - kept as-is.
    time.sleep(10)

In [None]:
# Show the two temp files the workers append to, one path per line.
for temp_path in (promo_filename, index_filename):
    print(temp_path)

### Start main function

In [None]:
master_start_time = time.time()

# get_product_info unpacks its argument as (worker_id, products), so each
# worker receives one numbered chunk rather than the flat product list.
# Empty chunks are skipped: there is nothing to scrape in them.
jobs = [(worker_id, chunk) for worker_id, chunk in enumerate(tuples) if chunk]

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # list() waits for every worker and re-raises the first worker error.
    results = list(executor.map(get_product_info, jobs))


def _read_worker_rows(path, columns):
    """Load a header-less worker CSV; a missing or empty file gives an empty frame."""
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return pd.DataFrame(columns=columns)
    return pd.read_csv(path, header=None, names=columns)


# The workers do not return data; they append rows to the temp CSVs, so the
# master frames are built from those files. The column names mirror the
# fields of the tuples the worker writes.
master_df1 = _read_worker_rows(
    promo_filename,
    ['Product ID', 'Tag', 'Title'],
)
master_df2 = _read_worker_rows(
    index_filename,
    ['Product ID', 'Rate', 'Comment', 'Price', 'Original Price', 'Brand', 'Features'],
)

master_end_time = time.time()
print(f"Elapsed time: {(master_end_time - master_start_time) / 60:,.0f} minutes")

In [None]:
# Write today's promotion table to sasa/, replacing any export already there.
date_stamp = datetime.now().strftime('%Y%m%d')
filename = 'sasa/sasa_promotion_{}.csv'.format(date_stamp)
saved_name = filename.split('/')[1]

already_saved = saved_name in os.listdir('sasa')
if already_saved:
    os.remove(filename)
    print('Old file removed:', filename)

master_df1.to_csv(filename, index=False)
print('Promotion Dataframe saved to:', saved_name)

display(master_df1)

