In [121]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import duckdb


def bypass_cloudflare():
    options = Options()
    options.add_argument("--disable-blink-features=AutomationControlled")
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument(
        "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    options.add_argument("--headless")
    driver = webdriver.Chrome(options=options)
    driver.execute_script(
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
    )
    return driver


def get_samsung_product_id(name):
    pattern_list = [r"[A-Z]{2}\d{2}[A-Z0-9]+", r"\d{2}[A-Z]{1,3}\d{0,4}"]
    for pattern in pattern_list:
        match = re.search(pattern, name)
        if match:
            return match.group()
    return None


def get_size(text):
    match = re.search(r"""(\d+)\s*[inch”]""", text, re.IGNORECASE)
    return match.group(1) if match else "No info"


def check_if_keyword_in_name(keyword, name):
    keywords = keyword.split(" ")
    have_all_keyword = None
    for keyword in keywords:
        if keyword.lower().replace("[-_ ]", "") in name.lower().replace("[-_ ]", ""):
            if have_all_keyword == None:
                have_all_keyword = True
            else:
                have_all_keyword = have_all_keyword and True
        else:
            have_all_keyword = False
    if have_all_keyword == None:
        return False
    else:
        return have_all_keyword


class BaseScraping:
    def __init__(self, page_limit):
        self.page_limit = page_limit

    def get_detail_from_page(self, page):
        pass

    def scrape_data(self):
        product_list = []
        for i in range(1, self.page_limit + 1):
            product_list.extend(self.get_detail_from_page(i))
        return product_list

    def create_dataframe(self, product_list):
        df = pd.DataFrame(
            product_list, columns=["website", "keyword", "name", "price"]
        ).drop_duplicates()
        df["product_id"] = df["name"].apply(lambda x: get_samsung_product_id(x))
        df["monitor_size"] = df["name"].apply(lambda x: get_size(x))
        return df

    def execute(self):
        product_list = self.scrape_data()
        df = self.create_dataframe(product_list)
        return df

In [122]:
class HomeProProductScraping(BaseScraping):
    HOMEPRO_URL = "https://www.homepro.co.th/search?searchtype=&q={keyword}&page={page}"

    def __init__(self, keyword, page_limit=3):
        super().__init__(page_limit)
        self.keyword = keyword
        self.page_limit = page_limit

    def get_detail_from_page(self, page):
        product_list = []

        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }

        response = requests.get(
            url=self.HOMEPRO_URL.format(
                keyword=self.keyword.replace(" ", "+"), page=page
            ),
            headers=headers,
        )
        soup = BeautifulSoup(response.content, "html.parser")
        products = soup.find_all("div", class_="product-plp-card")
        for i in products:
            _id = i.get("id").replace("product-", "")
            name = i.find("input", id=f"gtmNameEN-{_id}").get("value")
            price = i.find("input", id=f"gtmPrice-{_id}").get("value")
            if check_if_keyword_in_name(self.keyword, name):
                product_list.append(["Homepro", self.keyword, name, price])
        return product_list

In [123]:
homepro = HomeProProductScraping(keyword="TV Samsung", page_limit=3)
homepro.execute()

In [124]:
class PowerBuyProductScraping(BaseScraping):
    POWERBUY_URL = "https://www.powerbuy.co.th/en/search/{keyword}?page={page}"

    def __init__(self, keyword, page_limit=3):
        super().__init__(page_limit)
        self.keyword = keyword
        self.page_limit = page_limit

    def get_detail_from_page(self, page):
        product_list = []
        driver = bypass_cloudflare()
        driver.get(
            self.POWERBUY_URL.format(keyword=self.keyword.replace(" ", "+"), page=page)
        )
        time.sleep(random.uniform(3, 5))

        soup = BeautifulSoup(driver.page_source, "html.parser")
        products = soup.find_all(
            "div",
            class_="MuiGrid-root MuiGrid-item MuiGrid-grid-xs-6 MuiGrid-grid-sm-4 MuiGrid-grid-md-3 MuiGrid-grid-lg-3 MuiGrid-grid-xl-3 css-1nozjar",
        )
        for product in products:
            name = product.find("h2").get_text(strip=True)
            price = (
                product.find(
                    "div",
                    class_="text-redPrice font-bold text-sm leading-3 w-full flex",
                )
                .get_text(strip=True)
                .replace("฿", "")
                .replace(",", "")
            )

            if check_if_keyword_in_name(self.keyword, name):
                product_list.append(["Powerbuy", self.keyword, name, float(price)])
        driver.quit()
        return product_list

In [125]:
powerbuy = PowerBuyProductScraping(keyword="TV Samsung", page_limit=3)
powerbuy.execute()

In [117]:
for i in powerbuy_df.product_id.values:
    if i in homepro_df.product_id.values:
        print(i)

QA85Q65DAKXXT
QA75Q65DAKXXT
QA65Q70DAKXXT
UA65U8500FKXXT
QA75Q7F4AKXXT
QA43Q7F4AKXXT
QA43LS05BBKXXT
QA83S95FAEXXT
QA65S90FAEXXT
QA77S85FAEXXT
QA65S85FAEXXT
QA55S85FAEXXT
QA98Q80CAKXXT
QA85Q7F4AKXXT
QA85Q70DAKXXT
QA77S95DAKXXT
QA65S90DAKXXT


In [126]:
conn = duckdb.connect("products.db")
conn.execute(
    """CREATE TABLE IF NOT EXISTS products  (website string, keyword string, name string, price decimal(15,2),product_id string,monitor_size string)
    """
)
conn.execute("""DELETE FROM products WHERE website = 'Homepro' """)
conn.execute("INSERT INTO products SELECT * FROM homepro_df")

conn.execute("""DELETE FROM products WHERE website = 'Powerbuy' """)
conn.execute("INSERT INTO products SELECT * FROM powerbuy_df")

<duckdb.duckdb.DuckDBPyConnection at 0x11474bbb0>

In [130]:
compare_df = pd.read_sql(
    """SELECT * FROM (PIVOT products 
        ON website 
        USING SUM(price)
        GROUP BY product_id, monitor_size) x
        """,
    con=conn,
)
conn.execute("CREATE TABLE IF NOT EXISTS compare_result AS SELECT * FROM compare_df")
conn.execute("INSERT INTO compare_result SELECT * FROM compare_df")

  compare_df = pd.read_sql(


<duckdb.duckdb.DuckDBPyConnection at 0x11474bbb0>

In [131]:
pd.read_sql("select * from compare_result", con=conn)

  pd.read_sql("select * from compare_result", con=conn)


Unnamed: 0,product_id,monitor_size,Homepro,Powerbuy
0,UA75DU8100KXXT,75,23990.0,
1,UA75U8500FKXXT,75,32990.0,
2,QA85Q65DAKXXT,85,38990.0,38990.0
3,QA85Q8F5AKXXT,85,66990.0,
4,QA65Q8F5AKXXT,65,27990.0,
...,...,...,...,...
249,QN70F,55,,27990.0
250,QA98Q80CAKXXT,80,,169990.0
251,QN90D,98,,269990.0
252,QN800D,65,,99990.0


In [1]:
from helper import HomeProProductScraping, PowerBuyProductScraping