In [17]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup

import js2py
import time
import json
import re
from datetime import datetime

In [18]:
# Location of the ChromeDriver executable, relative to the notebook.
chrome_driver_path = './utils/chromedriver.exe'

# Chrome launch flags, applied in their original order; the final entry
# overrides the browser's user-agent string.
options = Options()
for chrome_flag in (
    '--headless',
    '--no-sandbox',
    '--disable-dev-shm-usage',
    "--start-maximized",
    "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
):
    options.add_argument(chrome_flag)

# Start the browser session that the scraping functions use through `driver`.
service = Service(chrome_driver_path)
driver = webdriver.Chrome(service=service, options=options)

In [None]:
def get_info(url):
    """Scrape summary fields from an Amazon product page.

    Loads ``url`` in the module-level ``driver`` and reads the title, price
    block, rating, review count and ASIN. Each field is looked up
    independently: when a lookup fails, the error is printed and a Korean
    placeholder message is stored for that field, so one missing element does
    not abort the scrape.

    Args:
        url: Product page URL to open.

    Returns:
        dict with the keys ``title``, ``price_info``, ``rating``,
        ``review_count`` and ``ASIN``. ``price_info`` is a list of text lines
        on success and a placeholder string on failure; the other values are
        strings.
    """
    driver.get(url)

    # Fixed pause before querying, followed by an explicit wait for the title.
    time.sleep(3)
    wait = WebDriverWait(driver, 10)

    def _lookup(label, fallback, read):
        """Return read(); on any error log it under `label` and return fallback."""
        try:
            return read()
        except Exception as e:
            # The label names the field that actually failed (the price and
            # rating lookups previously logged misleading labels).
            print(f"Error finding {label}: {e}")
            return fallback

    # Product title
    title = _lookup(
        "title",
        "제목을 찾을 수 없습니다.",
        lambda: wait.until(
            EC.presence_of_element_located((By.ID, 'productTitle'))
        ).text.strip(),
    )

    # Price block: the element text split into its individual lines
    saving = _lookup(
        "price info",
        "할인률를 찾을 수 없습니다.",
        lambda: driver.find_element(By.CLASS_NAME, 'aok-relative').text.strip().split('\n'),
    )

    # Rating
    rating = _lookup(
        "rating",
        "평점을 찾을 수 없습니다.",
        lambda: driver.find_element(
            By.XPATH, '//*[@id="acrPopover"]/span[1]/a/span'
        ).text.strip(),
    )

    # Review count: keep only the digits of the review text
    review_count = _lookup(
        "review count",
        "리뷰 수를 찾을 수 없습니다.",
        lambda: re.sub(
            r'[^0-9]', '',
            driver.find_element(By.ID, 'acrCustomerReviewText').text.strip(),
        ),
    )

    # ASIN code: the table cell following a header that contains "ASIN"
    asin = _lookup(
        "ASIN",
        "ASIN 코드를 찾을 수 없습니다.",
        lambda: driver.find_element(
            By.XPATH,
            '//th[contains(text(), "ASIN")]/following-sibling::td'
        ).text.strip(),
    )

    return {
        "title": title,
        "price_info": saving,
        "rating": rating,
        "review_count": review_count,
        "ASIN": asin,
    }

In [None]:
# Load the ASIN records. The encoding is given explicitly so the read does not
# depend on the platform's default text encoding.
with open('./ASIN/asin.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Keep only the ASIN code of each record.
asin_list = [record['ASIN'] for record in data]

In [None]:
# Scrape the first ten ASINs, one result dict per product.
data = [
    get_info(f'https://www.amazon.com/dp/{asin_code}')
    for asin_code in asin_list[:10]
]

# Save the collected results as pretty-printed UTF-8 JSON.
with open('product_info.json', 'w', encoding='utf-8') as out_file:
    out_file.write(json.dumps(data, indent=4, ensure_ascii=False))

# Close the browser session.
driver.quit()

상세정보 전체 추출

In [26]:
def product_detail_info(url):
    """Scrape the detail tables and derived fields of an Amazon product page.

    Loads ``url`` in the module-level ``driver``, waits for a product-details
    table, then parses the page source once with BeautifulSoup and builds a
    flat dict from it.

    Collected fields, in insertion order:
      * every ``th``/``td`` row of tables with class ``a-keyvalue`` or
        ``prodDetTable`` (key = header text with spaces replaced by ``_``);
      * ``url`` and ``product_name``;
      * every two-cell row of the ``a-normal a-spacing-micro`` overview table
        (key = first cell text, unchanged);
      * ``Style``, ``board_name``, ``division``, ``image_url`` and ``date``.

    Args:
        url: Product page URL to open.

    Returns:
        dict of the scraped and derived fields. The dict is also printed as
        JSON before returning.

    Raises:
        selenium.common.exceptions.TimeoutException: if no element with class
            ``prodDetTable`` appears within 10 seconds.
    """
    driver.get(url)
    # Fixed pause, then block until a details table is present.
    time.sleep(5)
    wait = WebDriverWait(driver, 10)
    wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'prodDetTable')))

    # product_name: resolved before parsing so the single parse below sees the
    # page after both waits have completed.
    try:
        title = wait.until(EC.presence_of_element_located((By.ID, 'productTitle'))).text.strip()
    except Exception as e:
        title = "제목을 찾을 수 없습니다."
        print(f"Error finding title: {e}")

    # One parse serves every lookup below.
    soup = BeautifulSoup(driver.page_source, 'html.parser')

    def _clean(text):
        """Remove invisible direction marks and surrounding whitespace."""
        # The recorded output of this function shows values (including the
        # ASIN) prefixed with an invisible character, presumably U+200E
        # LEFT-TO-RIGHT MARK; such marks break exact string comparisons.
        return text.replace('\u200e', '').replace('\u200f', '').strip()

    product_info = {}

    # Technical-details / additional-information tables.
    for table in soup.find_all('table', attrs={'class': ['a-keyvalue', 'prodDetTable']}):
        for row in table.find_all('tr'):
            header = row.find('th')
            cells = row.find_all('td')

            if header is not None and cells:
                field_name = header.get_text(strip=True).replace(' ', '_')
                product_info[field_name] = _clean(cells[0].get_text(strip=True))

    # NOTE: data_gbn is not derived here. Earlier, now disabled, logic marked a
    # product 'BEST' when its Best Sellers Rank was <= 100 and 'NORMAL'
    # otherwise. The 'BEST_' prefix below therefore only applies if
    # 'data_gbn' ends up in product_info some other way.

    # url
    product_info['url'] = url

    # product_name
    product_info['product_name'] = title

    # Overview ("expand detail") table: two-cell rows of key / value.
    overview = soup.find('table', {'class': 'a-normal a-spacing-micro'})
    if overview is None:
        # Same log prefix as before, with an explicit reason.
        print("poExpander 추출 실패: overview table not found")
    else:
        for row in overview.find_all('tr'):
            tds = row.find_all('td')

            if len(tds) >= 2:
                key = tds[0].get_text(strip=True)
                value = _clean(tds[1].get_text(strip=True))

                if key and value:
                    product_info[key] = value

    # style: empty string when the page has no style selector text.
    style_span = soup.find('span', id='inline-twister-expanded-dimension-text-style_name')
    product_info['Style'] = style_span.get_text(strip=True) if style_span is not None else ''

    def _ssd_kind(text):
        """Map text mentioning external/internal to a board type, else None."""
        lowered = text.lower()
        if 'external' in lowered:
            return 'External SSD'
        if 'internal' in lowered:
            return 'Internal SSD'
        return None

    # board_name: prefer the 'Installation Type' field, then the product name,
    # and fall back to 'Micro SD' when neither mentions external/internal.
    board_type = (
        _ssd_kind(product_info.get('Installation Type', ''))
        or _ssd_kind(product_info.get('product_name', ''))
        or 'Micro SD'
    )

    if product_info.get('data_gbn') == 'BEST':
        board_name = f'BEST_{board_type}'
    else:
        board_name = board_type

    product_info['board_name'] = board_name

    # division
    division_by_board = {
        'External SSD': 'PSSD',
        'Internal SSD': 'SSD',
        'Micro SD': 'microSD',
    }
    product_info['division'] = division_by_board.get(board_type, 'Unknown')

    # image_url: prefer the high-resolution attribute, fall back to src when
    # that attribute is missing or empty, and to '' when there is no image tag.
    img_tag = soup.find('img', id='landingImage')
    if img_tag is None:
        image_url = ''
    else:
        image_url = img_tag.get('data-old-hires') or img_tag.get('src') or ''
    product_info['image_url'] = image_url

    # Timestamp of the scrape.
    product_info['date'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    print(json.dumps(product_info, indent=4, ensure_ascii=False))
    return product_info

In [27]:
# Run the detail scraper on a single product page and keep the result in `info`.
info = product_detail_info('https://www.amazon.com/dp/B0B89L2WNV')

{
    "Standing_screen_display_size": "‎32 Inches",
    "Screen_Resolution": "‎3840x2160",
    "Max_Screen_Resolution": "‎3840x2160 Pixels",
    "Number_of_USB_3.0_Ports": "‎1",
    "Brand": "SAMSUNG",
    "Series": "‎32\" M8 Smart Monitor",
    "Item_model_number": "‎LS32BM805UNXGO",
    "Item_Weight": "‎14.8 pounds",
    "Product_Dimensions": "‎8 x 28.1 x 22.6 inches",
    "Item_Dimensions__LxWxH": "‎8 x 28.1 x 22.6 inches",
    "Color": "‎Warm White",
    "Voltage": "‎110 Volts",
    "Manufacturer": "‎Samsung",
    "ASIN": "‎B0B89L2WNV",
    "Date_First_Available": "‎October 26, 2022",
    "Customer_Reviews": "3.93.9 out of 5 stars387 ratings3.9 out of 5 stars",
    "Best_Sellers_Rank": "#109,344 in Electronics (See Top 100 in Electronics)#470 inLED & LCD TVs",
    "url": "https://www.amazon.com/dp/B0B89L2WNV",
    "product_name": "SAMSUNG 32\" M80B 4K UHD HDR Smart Computer Monitor Screen with Streaming TV, SlimFit Camera Included, Wireless Remote PC Access, Alexa Built-In, LS32BM8

In [6]:
# Collect the distinct category labels found in the crawler output's
# 'best_sellers_rank' strings, in first-seen order. The encoding is explicit so
# the read does not depend on the platform's default text encoding.
with open('./amazon_crawler/output.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

category = []
for item in data:
    rank_text = item['best_sellers_rank']
    if '#' in rank_text:
        parts = rank_text.split('#')
        # Two candidates: the text before the first '#', and the first rank
        # entry with its two leading tokens (such as "109,344 in") removed.
        candidates = [parts[0], ' '.join(parts[1].split()[2:])]
    else:
        candidates = [rank_text]

    for label in candidates:
        if label not in category:
            category.append(label)

In [None]:
# Show every collected category label, one per line.
for category_label in category:
    print(category_label)

In [None]:
# Load the ASIN records. The encoding is given explicitly so the read does not
# depend on the platform's default text encoding.
with open('./ASIN/asin.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Keep only the ASIN code of each record.
asin_list = [record['ASIN'] for record in data]

In [None]:
# B08GDCKQ8C, B0CGHQT47M <-- ASIN codes known to return a 404 page
data = []
try:
    for asin_code in asin_list:
        try:
            data.append(product_detail_info(f'https://www.amazon.com/dp/{asin_code}'))
        except Exception as e:
            # product_detail_info raises when the details table never appears
            # (for example on a 404 page). Skip that ASIN instead of losing
            # every result collected so far.
            print(f"[SKIP] {asin_code}: {e}")
finally:
    # Always persist what was collected and always release the browser, even
    # if the loop is interrupted.
    try:
        with open('product_info_master.json', 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
    finally:
        driver.quit()

In [None]:
driver.get('https://www.amazon.com/dp/B08HN37XC1')

soup = BeautifulSoup(driver.page_source, 'html.parser')
script_tags = soup.find_all('script')

# Start from an empty result so that a failed extraction can never leave
# `result` undefined or holding data from an earlier run of this cell.
result = {}

# Find the first inline script that declares `dataToReturn`.
target_script = None
for tag in script_tags:
    if tag.string and 'var dataToReturn' in tag.string:
        target_script = tag.string
        break

if target_script:
    # Extract only the `var dataToReturn = { ... };` statement.
    match = re.search(r'var\s+dataToReturn\s*=\s*({.*?});', target_script, re.DOTALL)
    if match:
        js_code = match.group(0)  # the whole matched statement, including `var`

        try:
            # Evaluate the statement with js2py.
            # NOTE(review): this executes script text taken from a remote
            # page; treat it as untrusted input.
            context = js2py.EvalJs()
            context.execute(js_code)

            # Keys to copy out of dataToReturn.
            keys_to_extract = [
                'currentAsin', 'landingAsin', 'parentAsin',
                'dimensionToAsinMap', 'variationValues',
                'num_total_variations', 'dimensionValuesDisplayData',
                'variationDisplayLabels'
            ]

            for key in keys_to_extract:
                try:
                    value = getattr(context.dataToReturn, key)
                    # Convert js2py wrapper objects to plain dict / list.
                    if hasattr(value, 'to_dict'):
                        result[key] = value.to_dict()
                    elif hasattr(value, 'to_list'):
                        result[key] = value.to_list()
                    else:
                        result[key] = value
                except Exception:
                    # Best effort: a key that cannot be read is stored as None.
                    result[key] = None

            # Print the extracted data as JSON.
            print(json.dumps(result, indent=4, ensure_ascii=False))

        except Exception as e:
            print(f"❌ JavaScript 실행 실패: {e}")
    else:
        print("❌ dataToReturn 객체를 찾을 수 없음")
else:
    print("❌ dataToReturn 스크립트를 찾을 수 없음")

In [26]:
def enrich_with_variation_data(product_info, variant_data):
    """Copy variation fields from ``variant_data`` onto ``product_info``.

    ``product_info`` is modified in place and then printed as JSON. A key that
    is missing from ``variant_data`` and a key that is present with the value
    ``None`` are treated the same way for the mapping-valued fields: both
    become an empty dict, so ``available_asins`` is an empty list instead of
    raising on ``None.keys()``.

    Args:
        product_info: dict to enrich (mutated).
        variant_data: dict with the keys ``currentAsin``, ``parentAsin``,
            ``variationValues``, ``variationDisplayLabels``,
            ``dimensionValuesDisplayData`` and ``dimensionToAsinMap``; any of
            them may be absent or ``None``.

    Returns:
        None.
    """
    def _mapping(key):
        """Return variant_data[key], or {} when it is absent or None."""
        value = variant_data.get(key)
        return {} if value is None else value

    display_data = _mapping('dimensionValuesDisplayData')

    product_info['current_asin'] = variant_data.get('currentAsin')
    product_info['parent_asin'] = variant_data.get('parentAsin')
    product_info['variations'] = _mapping('variationValues')
    product_info['variation_display_labels'] = _mapping('variationDisplayLabels')
    product_info['asin_variation_mapping'] = display_data
    product_info['available_asins'] = list(display_data.keys())
    product_info['variation_matrix'] = _mapping('dimensionToAsinMap')

    print(json.dumps(product_info, indent=4, ensure_ascii=False))


In [None]:
# Add the variation data held in `result` to the scraped `info` record (mutates `info` and prints it).
enrich_with_variation_data(info, result)

##### date, series, image_url
##### expand detail 항목 필요없는 것 삭제

In [26]:
import os

import mysql.connector

# Connect to the MySQL server. Settings are read from the environment when
# present; the fallbacks are the previous hard-coded local values, so the cell
# behaves as before when no variables are set. Set MYSQL_PASSWORD rather than
# relying on the fallback outside a throwaway local database.
conn = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    port=int(os.environ.get("MYSQL_PORT", "3306")),
    user=os.environ.get("MYSQL_USER", "root"),            # user name configured at install time
    password=os.environ.get("MYSQL_PASSWORD", "1111"),    # password configured at install time
    database=os.environ.get("MYSQL_DATABASE", "testdb"),  # database created beforehand
)

try:
    # Create a cursor
    cursor = conn.cursor()
    try:
        # Run the query
        cursor.execute("SELECT * FROM users")

        # Fetch the results
        rows = cursor.fetchall()

        # Print them
        for row in rows:
            print(row)
    finally:
        cursor.close()
finally:
    # Close the connection even if the query or the fetch failed.
    conn.close()
# 70000  26000

(1, 'Alice', 'alice@example.com')
(2, 'Bob', 'bob@example.com')


In [21]:
# List for exercising combined handling when a valid and a deleted ASIN code arrive together
asin_list = ['B0BQX6NNVC', 'B0CGHQT47M']

In [25]:
def check_page_validity(driver):
    """Report whether the page currently loaded in ``driver`` looks valid.

    The lower-cased title is checked for 'page not found' or '404' first;
    only if neither is present is the lower-cased page source checked for
    'page not found'. Any error raised while reading the page is printed and
    the page is then reported as valid.

    Args:
        driver: object exposing ``title`` and ``page_source`` attributes.

    Returns:
        tuple ``(is_valid, message)``.
    """
    verdict = (True, "Page is valid and product is available")
    try:
        page_title = driver.title.lower()
        if any(marker in page_title for marker in ('page not found', '404')):
            verdict = (False, "Page Not Found")
        # The page source is only inspected when the title looked fine.
        elif 'page not found' in driver.page_source.lower():
            verdict = (False, "Page Not Found in page content")
    except Exception as e:
        print(f"페이지 타이틀 확인 중 오류: {e}")

    return verdict

# Check every ASIN in the list: invalid pages get a DELETE record, valid pages
# a minimal 'valid' record.
for code in asin_list:
    url = f'https://www.amazon.com/dp/{code}'
    driver.get(url)
    time.sleep(5)

    product_info = {}
    product_info['url'] = url

    # Page validity check
    valid, status = check_page_validity(driver)

    if not valid:
        product_info['asin'] = url.split('/')[-1]
        product_info['data_gbn'] = 'DELETE'
        product_info['error'] = status
        product_info['status'] = 'invalid'
        product_info['date'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        print(f"유효하지 않은 페이지: {status}")
        print(json.dumps(product_info, indent=4, ensure_ascii=False))
        # Move on to the next ASIN. Stopping here (as `break` did) would
        # silently skip every ASIN listed after the first invalid one.
        continue

    product_info['status'] = 'valid'
    print(json.dumps(product_info, indent=4, ensure_ascii=False))

{
    "url": "https://www.amazon.com/dp/B0BQX6NNVC",
    "status": "valid"
}
유효하지 않은 페이지: Page Not Found
{
    "url": "https://www.amazon.com/dp/B0CGHQT47M",
    "asin": "B0CGHQT47M",
    "data_gbn": "DELETE",
    "error": "Page Not Found",
    "status": "invalid",
    "date": "2025/05/14 13:18:03"
}


In [55]:
def get_bsk(url):
    """Return the 'Best Sellers Rank' text of a product page.

    Loads ``url`` in the module-level ``driver`` and scans the rows of the
    ``prodDetails`` section for a header that is exactly
    'Best Sellers Rank'.

    The two fallback strings start with two placeholder tokens ('1 1')
    because the caller in this notebook drops the first two
    whitespace-separated tokens of the returned text.

    Args:
        url: Product page URL to open.

    Returns:
        The rank cell text when found; '1 1 not best sellers' when the
        details section has no such row; '1 1 page not found' when the
        details section itself cannot be located.
    """
    driver.get(url)
    try:
        product_info_section = driver.find_element(By.ID, "prodDetails")
        product_info_rows = product_info_section.find_elements(By.XPATH, ".//tr")
    except Exception as e:
        print(f"{e}")
        # Only a missing details section maps to this fallback; previously it
        # overwrote every result unconditionally.
        return '1 1 page not found'

    for item in product_info_rows:
        try:
            header = item.find_element(By.XPATH, './/th').text.strip()
            value = item.find_element(By.XPATH, './/td').text.strip()
        except Exception as e:
            print(f"Error parsing row: {str(e)}")
            continue

        if header == 'Best Sellers Rank':
            # Return immediately so later rows cannot overwrite the match.
            return value

    # The space after the placeholder tokens is required: without it the
    # caller's token-dropping turned this into "best sellers".
    return '1 1 not best sellers'

In [56]:
# 받은 ASIN 목록 리스트
import pandas as pd

# Read the product spreadsheet and keep each distinct, non-missing ASIN.
file_path = './product_list/amazon_review_open.xlsx'
df = pd.read_excel(file_path)

asin_list = (
    df['ASIN']
    .dropna()
    .unique()
    .tolist()
)

In [None]:
# Collect the distinct rank categories across all ASINs, in first-seen order.
category = []
for asin_code in asin_list:
    rank_text = get_bsk(f'https://www.amazon.com/dp/{asin_code}')
    # Drop the two leading tokens of the rank text and keep the rest as the label.
    label = ' '.join(rank_text.split()[2:])
    if label not in category:
        category.append(label)

In [7]:
import requests

# Proxy used for both plain and TLS requests. Each proxy URL carries an
# explicit scheme, as Requests requires for entries in `proxies`.
proxies = {
    "http": "http://90.156.194.71:8080",
    "https": "http://90.156.194.71:8080"
}

try:
    response = requests.get("https://www.amazon.com/dp/B0BQX6NNVC", proxies=proxies, timeout=10)
    print(response.status_code)
    # Show only the start of the body as a quick sanity check.
    print(response.text[:300])
except Exception as e:
    print(f"Error: {e}")

Error: HTTPSConnectionPool(host='www.amazon.com', port=443): Max retries exceeded with url: /dp/B0BQX6NNVC (Caused by ProxyError('Unable to connect to proxy', ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x000001F64D40DC90>, 'Connection to 90.156.194.71 timed out. (connect timeout=10)')))


In [2]:
import pandas as pd
# Read the spreadsheet and drop rows flagged as DELETE.
file_path = '../amazon_crawler/data/amazon_review_open.xlsx'
df = pd.read_excel(file_path)
is_active = df['DATA_GBN'] != 'DELETE'
df = df[is_active]

# Distinct, non-missing ASINs and their product page URLs.
asin_list = df['ASIN'].dropna().unique().tolist()
urls = [f'https://www.amazon.com/dp/{asin_code}' for asin_code in asin_list]

In [22]:
# Number of product URLs in `urls`.
len(urls)

8932

In [1]:
import json
# List the distinct, non-empty board_name values found in one result file.
file_path = '../amazon_crawler/data/result/board_type/0-1000.json'
with open(file_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

board_names = {
    item.get('board_name')
    for item in data
    if item.get('board_name')
}

for name in sorted(board_names):
    print(name)


BEST_Unknown
External SSD
Internal SSD
Micro SD
Unknown
null


In [3]:
import json

file_path = '../amazon_crawler/data/result/board_type/199_test2.json'
output_path = '../amazon_crawler/data/result/board_type/199_unknown_only.json'

# Load the original JSON file.
with open(file_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Keep only the items whose 'board_name' is exactly 'Unknown'.
unknown_items = list(filter(lambda item: item.get('board_name') == 'Unknown', data))

# Write the filtered items to the output file.
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(unknown_items, f, ensure_ascii=False, indent=2)

print(f"Unknown board_name 항목 {len(unknown_items)}개를 저장했습니다.")



Unknown board_name 항목 59개를 저장했습니다.


In [None]:
file_path = '../amazon_crawler/data/result/board_type/199_test2.json'

# Load the JSON file.
with open(file_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Collect the ASIN of every item whose 'board_name' is 'Unknown' and that has an 'asin' key.
unknown_asin_list = []
for item in data:
    if item.get('board_name') == 'Unknown' and 'asin' in item:
        unknown_asin_list.append(item['asin'])

# Write one ASIN per line.
output_path = 'unknown_asins.txt'
with open(output_path, 'w', encoding='utf-8') as f:
    f.writelines(asin + '\n' for asin in unknown_asin_list)

# Print the result.
print(f"Unknown ASIN 개수: {len(unknown_asin_list)}")
print(unknown_asin_list)

Unknown ASIN 개수: 59
['B08YCYS58M', 'B08JV6PP9B', 'B0BH71X8GH', 'B07VYVT88H', 'B00T8IZFUK', 'B0DVDCHL51', 'B0DVD8GBYL', 'B0DVDKLLKX', 'B07MDL3XFM', 'B0CQD377LS', 'B07J33LFXV', 'B00J62AGW0', 'B073GNRSDH', 'B07S4N6ZCR', 'B0BFH9QZ49', 'B0DW7P5FB2', 'B0D3PNR3XD', 'B0DVDKSPT1', 'B07XQXF444', 'B07DM3S5QB', 'B01LOOJ8Z8', 'B009NHAEXE', 'B0C28S6N61', 'B0DYQH22PW', 'B0DYQH1N48', 'B00S89F9QK', 'B0F31WPWSP', 'B0DW8ZW47C', 'B00EEJP1N4', 'B0057Y8CM2', 'B00SOL9Z1M', 'B0DMBVGSZZ', 'B0DW92YSB6', 'B0CP67GQXZ', 'B010QBTTPY', 'B00HZ0RWY0', 'B0DYQN227W', 'B0DYQQX3TJ', 'B0DYQN7ZWH', 'B0DY8JVNC8', 'B0DYQKB51W', 'B005LXC1FI', 'B0084JFLUS', 'B0DYVTCP7L', 'B0DWFHFCTV', 'B0DYVNMJRW', 'B0DY8WNN74', 'B08VH6Y8P3', 'B0B7NTJS1T', 'B01E0DOC1U', 'B09MJ1H6ZB', 'B07JGJ175D', 'B0DSQVBSHG', 'B0CZS9VFF6', 'B0CNTXS1M2', 'B0DSR9D4PL', 'B0CZS9NLSH', 'B0CNTYTHCH', 'B00ND1H51A']


In [4]:
import nest_asyncio
nest_asyncio.apply()

import asyncio

async def main():
    """Print a greeting; a minimal coroutine for checking that awaiting works in this notebook."""
    print("Hello async!")

await main()  # can simply be awaited at the top level of the cell

Hello async!


In [None]:
import asyncio
from playwright.async_api import async_playwright

async def run():
    """Scroll an Amazon best-sellers page and save its HTML.

    Opens a visible Chromium window, loads the page, scrolls down up to 20
    times (stopping early once the document height stops changing), prints
    the number of elements with id ``gridItemRoot`` and writes the page HTML
    to ``amazon_bestsellers.html``.
    """
    scroll_js = "window.scrollBy(0, document.body.scrollHeight)"
    height_js = "document.body.scrollHeight"
    count_js = "document.querySelectorAll('[id=\"gridItemRoot\"]').length"

    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=False)
        page = await browser.new_page()
        await page.goto("https://www.amazon.com/gp/bestsellers/pc/3015433011")
        await page.wait_for_load_state("networkidle")

        last_height = 0
        attempts_left = 20
        while attempts_left > 0:
            attempts_left -= 1
            await page.evaluate(scroll_js)
            # Pause so newly loaded content can change the document height.
            await asyncio.sleep(2)
            height = await page.evaluate(height_js)
            print(f"스크롤 높이: {height}")
            if height == last_height:
                print("더 이상 로딩되지 않음")
                break
            last_height = height

        card_total = await page.evaluate(count_js)
        print("카드 개수:", card_total)

        page_html = await page.content()
        with open("amazon_bestsellers.html", "w", encoding="utf-8") as out_file:
            out_file.write(page_html)

        await browser.close()

# Run the coroutine to completion.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already running; in this
# notebook it presumably relies on nest_asyncio.apply() having been executed first - confirm.
asyncio.run(run())


In [5]:
import json
from pymongo import MongoClient

# Read the documents first, then bulk-insert them into the "best" collection of "mydb".
with open("../pwtest/123.json", "r", encoding="utf-8") as f:
    data = json.load(f)

client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]
collection = db["best"]

collection.insert_many(data)
print("finished")

finished


In [None]:
import json
from pymongo import MongoClient

# Create a client for the local MongoDB server and select the "best_products" collection of "mydb".
client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]
collection = db["best_products"]

def get_next_sequence(db, name):
    """Atomically increment and return the counter stored under ``name``.

    The counter lives in the ``counters`` collection of ``db`` as a document
    ``{"_id": name, "seq": <int>}``. ``upsert=True`` creates that document on
    first use, so the first call returns 1. Without it, a missing counter
    made the update return ``None`` and the subscript below raise
    ``TypeError``.

    Args:
        db: database object exposing a ``counters`` collection.
        name: identifier of the counter document.

    Returns:
        The counter value after the increment.
    """
    ret = db.counters.find_one_and_update(
        {"_id": name},
        {"$inc": {"seq": 1}},
        upsert=True,
        # True requests the document as it is after the update
        # (the value of pymongo's ReturnDocument.AFTER).
        return_document=True,
    )
    return ret["seq"]

# Load the documents to store.
with open("../pwtest/123.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Give each document the next "productid" sequence number, then insert it.
for product_doc in data:
    product_doc.update(seq=get_next_sequence(db, "productid"))
    collection.insert_one(product_doc)

print("저장 완료")
