In [1]:
!pip install selenium webdriver-manager



In [8]:
import json
import random
import re
import socket
import subprocess
import sys
import tempfile
import time
import urllib.parse
import urllib.request
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

In [13]:
# json 불러오기
with open('categories_dict_v3.json', 'r', encoding='utf-8-sig') as f:
    data = json.load(f)

data.values()

dict_values([195008, 195010, 195013, 497920, 195080, 195082, 195083, 195084, 195085, 445905, 445906, 445907, 445908, 445909, 195088, 311357, 508266, 194379, 194383, 194384, 194385, 497900, 497906, 497907, 194390, 194394, 194395, 194396, 194397, 194398, 197086, 224574, 194403, 194404, 194415, 194417, 194419, 194420, 194421, 194422, 194423, 194424, 194427, 194429, 194430, 394584, 432482, 432485, 432486, 432487, 432490, 432491, 432492, 432495, 432496, 432498, 432499, 432500, 432501, 432502, 432503, 432504, 486604, 486605, 432508, 432509, 432510, 432524, 432526, 432527, 432528, 432529, 432533, 432534, 432535, 432537, 432539, 432540, 432541, 432542, 508743, 432552, 432553, 432556, 432557, 432558, 432559, 432561, 432562, 432563, 432564, 432569, 432570, 432571, 486616, 486618, 486619, 486620, 486622, 432577, 432578, 432579, 432580, 432583, 432584, 432586, 432587, 432590, 432593, 432596, 432597, 432598, 432599, 432600, 432601, 432602, 432603, 432604, 432606, 432607, 432608, 432612, 432613, 432

In [14]:
# ==============================================================================
# [설정] 카테고리 리스트 (감자, 고구마 등)
# ==============================================================================
CATEGORY_LIST = data.values() # 리스트로 번호 받기
# ==============================================================================

# ---------------------------------------------------
# 1) 크롬 실행
# ---------------------------------------------------
CHROME_EXE = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

def pick_free_port(start=9222, end=9350):
    for p in range(start, end+1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", p))
                return p
            except OSError:
                continue
    raise RuntimeError("포트 찾기 실패")

def wait_devtools(port, timeout=12):
    url = f"http://127.0.0.1:{port}/json/version"
    t0 = time.time()
    while time.time() - t0 < timeout:
        try:
            with urllib.request.urlopen(url, timeout=1) as r:
                js = json.loads(r.read().decode("utf-8", "ignore"))
                if js.get("Browser"): return True
        except: time.sleep(0.2)
    return False

tmp_profile = tempfile.mkdtemp(prefix="chrome_profile_")
port = pick_free_port()

args = [
    CHROME_EXE,
    f"--remote-debugging-port={port}",
    f"--user-data-dir={tmp_profile}",
    "--lang=ko-KR",
    "--window-size=1400,950",
    "--disable-http2",
    "--disable-blink-features=AutomationControlled",
    "about:blank"
]
subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

if not wait_devtools(port):
    raise SystemExit("크롬 실행 실패")

print(f"[OK] 크롬 실행됨 (Port: {port})")

# ---------------------------------------------------
# 2) Selenium 연결
# ---------------------------------------------------
opts = Options()
opts.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}")
driver = webdriver.Chrome(options=opts)
wait = WebDriverWait(driver, 20)

# ---------------------------------------------------
# 3) 사용자 준비 대기
# ---------------------------------------------------
print("\n" + "="*70)
print("1. 브라우저가 열리면 '첫 번째 카테고리 페이지'로 직접 이동하세요.")
print("2. 접속이 잘 되었는지 확인하세요.")
print("3. 준비가 완료되면 콘솔에 'y'를 입력하고 엔터를 치세요.")
print("="*70)

ready = input("시작할까요? (y 입력): ").strip().lower()
if ready != 'y':
    driver.quit()
    sys.exit()

try:
    if not driver.window_handles: raise Exception("탭 없음")
    driver.switch_to.window(driver.window_handles[-1])
except Exception as e:
    print(f"[FATAL] 브라우저 연결 실패: {e}")
    sys.exit()

# ---------------------------------------------------
# 4) [목록용] JS 스니펫 (전체 스캔)
# ---------------------------------------------------
SCRAPE_JS_ASYNC = r"""
var done = arguments[0];
function sleep(ms){ return new Promise(r=>setTimeout(r, ms)); }
const CARD_SELS = ['li[class^="ProductUnit_productUnit"]', 'li.search-product', 'ul#productList > li'];
const TITLE_SELS = ['div.name', 'div[class*="productName"]'];
const PRICE_SELS = ['strong.price-value', 'strong[class*="priceValue"]'];
const UNIT_SELS  = ['span.unit-price', 'span[class*="unitPrice"]'];
const STAR_SELS  = ['em.rating', 'div[class*="star"]'];
const REV_SELS   = ['span.rating-total-count', 'span[class*="ratingCount"]'];
function intVal(t){ if(!t) return null; const n=(t+'').replace(/[^\d]/g,''); return n?parseInt(n,10):null; }
function floatVal(t){ if(!t) return 0.0; const m=(t+'').replace(/,/g,'').match(/\d+(\.\d+)?/); return m?parseFloat(m[0]):0.0; }
function pick(el, sels){ for(const s of sels){ const x = el.querySelector(s); if(x) return x.textContent.trim(); } return ""; }

async function loadAll(){
  let prev = -1;
  for(let i=0; i<10; i++){
    let cards = document.querySelectorAll(CARD_SELS.join(','));
    let now = cards.length;
    if(now > 0 && now === prev) break;
    prev = now;
    window.scrollBy(0, 800);
    await sleep(200);
  }
  return document.querySelectorAll(CARD_SELS.join(','));
}

(async () => {
  try {
    const cards = await loadAll();
    const rows = [];
    cards.forEach(el => {
      let linkEl = el.querySelector('a[href*="/vp/products/"]');
      let link = linkEl ? linkEl.href : "";
      let pid = el.getAttribute('data-product-id') || el.getAttribute('data-item-id') || "";
      if(!pid && link) { let m = link.match(/\/vp\/products\/(\d+)/); if(m) pid = m[1]; }
      const title = pick(el, TITLE_SELS);
      const price = intVal(pick(el, PRICE_SELS));
      const unit_price = intVal(pick(el, UNIT_SELS));
      const rating = floatVal(pick(el, STAR_SELS));
      const review_count = intVal(pick(el, REV_SELS));
      rows.push({product_id: pid, title, price, unit_price, rating, review_count, product_url: link});
    });
    done(rows);
  } catch(e){ done([]); }
})();
"""

# ---------------------------------------------------
# 5) [핵심 수정] 상세 페이지 이미지 수집 함수
# ---------------------------------------------------
def scrape_detail_in_current_tab():
    result = {"main_thumbnail": "", "detail_images": ""}

    try:
        print("    -> 상세 페이지 로딩 대기 중...")

        # 1. 페이지 로딩 확인 (제목이나 썸네일)
        try:
            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "h2.prod-buy-header__title, img.prod-image__detail, img[alt='Product image']")))
            print("    -> 진입 확인")
        except TimeoutException:
            print("    [WARN] 로딩 시간 초과 (스크롤 시도)")

        # 2. 메인 썸네일 찾기
        main_img = None
        # 우선순위: 신규 UI -> 구형 UI
        for sel in ["img[alt='Product image']", "img.prod-image__detail"]:
            try:
                main_img = driver.find_element(By.CSS_SELECTOR, sel)
                if main_img: break
            except: continue

        if main_img:
            src = main_img.get_attribute("src")
            if src:
                if src.startswith("//"): src = "https:" + src
                result['main_thumbnail'] = src
                print(f"    -> 메인 썸네일 확보")

        # 3. 상세 이미지 찾기 (스크롤 필수)
        print("    -> 상세 이미지 스캔을 위해 스크롤 다운...")
        for _ in range(6):
            driver.execute_script("window.scrollBy(0, 800);")
            time.sleep(0.5)

        # [수정된 부분] 사용자 제공 태그 반영 + 기존 선택자 혼합
        # .product-detail-content-inside img : 사용자가 제공한 신규 UI
        # #productDetail img : 구형 UI
        # .product-detail-content img : 범용
        detail_selectors = [
            ".product-detail-content-inside img",
            "#productDetail img",
            ".product-detail-content img"
        ]

        detail_imgs = []
        for sel in detail_selectors:
            detail_imgs = driver.find_elements(By.CSS_SELECTOR, sel)
            if detail_imgs:
                print(f"    [Info] 선택자 '{sel}'로 이미지 발견")
                break

        img_urls = []
        for img in detail_imgs:
            src = img.get_attribute("src")
            if not src: src = img.get_attribute("data-src") # Lazy Load 대응

            if src and "blank.gif" not in src:
                if src.startswith("//"): src = "https:" + src
                img_urls.append(src)

        result['detail_images'] = "\n".join(img_urls)
        print(f"    -> 상세 이미지 {len(img_urls)}장 확보")

    except Exception as e:
        print(f"    [ERROR] 상세 이미지 파싱 중 오류: {e}")

    return result

# ---------------------------------------------------
# 6) 메인 로직
# ---------------------------------------------------
all_data = []
collected_categories = set()

# --- [Phase 1] 현재 페이지(수동 진입) 수집 ---
try:
    current_url = driver.current_url
    print(f"\n[Step 1] 현재 페이지 수집: {current_url}")
    match = re.search(r"/categories/(\d+)", current_url)
    current_cat_id = match.group(1) if match else "manual_start"

    # 목록 수집
    print("  데이터 수집 시작 (스크롤 중)...")
    rows = driver.execute_async_script(SCRAPE_JS_ASYNC)
    print(f"  -> {len(rows)}개 상품 스캔 완료")

    # 최저가 선정
    valid_items = [r for r in rows if r.get('price') is not None and r['price'] > 0]

    if valid_items:
        cheapest_item = min(valid_items, key=lambda x: x['price'])
        print(f"  ★ 최저가 선정: {cheapest_item['title']} ({cheapest_item['price']}원)")

        if cheapest_item.get('product_url'):
            print(f"    상세 페이지로 이동 중...")
            driver.get(cheapest_item['product_url'])

            # 상세 이미지 수집
            images = scrape_detail_in_current_tab()
            cheapest_item.update(images)

            cheapest_item['category_id'] = current_cat_id
            cheapest_item['crawling_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            all_data.append(cheapest_item)
            collected_categories.add(current_cat_id)
            print("    수집 완료.")
    else:
        print("  [WARN] 유효한 상품이 없습니다.")

except Exception as e:
    print(f"[ERROR] 초기 페이지 수집 실패: {e}")


# --- [Phase 2] 리스트 순회 ---
for idx, cat_id in enumerate(CATEGORY_LIST):
    if cat_id in collected_categories: continue

    sleep_time = random.uniform(5, 8)
    print(f"\n[{idx+1}/{len(CATEGORY_LIST)}] 대기 {sleep_time:.1f}초 후 이동: {cat_id}")
    time.sleep(sleep_time)

    target_url = f"https://www.coupang.com/np/categories/{cat_id}?listSize=60&sorter=saleCountDesc"

    try:
        print(f"  카테고리 이동 시도...")
        js_click_fake = f"""
            var a = document.createElement('a');
            a.href = '{target_url}';
            a.id = 'temp_crawler_link';
            document.body.appendChild(a);
            a.click();
        """
        driver.execute_script(js_click_fake)

        try:
            wait.until(EC.url_contains(cat_id))
        except TimeoutException:
            print("  [WARN] 카테고리 URL 변경 감지 실패")

        try:
            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "li[class^='ProductUnit'], ul#productList")))
        except TimeoutException:
            print(f"  [WARN] {cat_id} 목록 로딩 타임아웃!")
            continue

        rows = driver.execute_async_script(SCRAPE_JS_ASYNC)
        if not rows:
            print("  [WARN] 상품 0개")
            continue

        valid_items = [r for r in rows if r.get('price') is not None and r['price'] > 0]
        if not valid_items:
            print("  [WARN] 가격 정보 없음")
            continue

        cheapest_item = min(valid_items, key=lambda x: x['price'])
        print(f"  ★ 최저가 선정: {cheapest_item['title']} ({cheapest_item['price']}원)")

        if cheapest_item.get('product_url'):
            print(f"    상세 페이지로 이동 중...")
            driver.get(cheapest_item['product_url'])

            images = scrape_detail_in_current_tab()
            cheapest_item.update(images)

            cheapest_item['category_id'] = cat_id
            cheapest_item['crawling_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            all_data.append(cheapest_item)
            collected_categories.add(cat_id)
            print("    수집 완료.")

    except Exception as e:
        print(f"  [ERROR] {cat_id} 처리 중 오류: {e}")
        continue

# ---------------------------------------------------
# 7) 저장
# ---------------------------------------------------
print(f"\n[완료] 총 {len(all_data)}개 수집됨")

if all_data:
    df = pd.DataFrame(all_data)
    cols = ["category_id", "product_id", "title", "price", "unit_price", "main_thumbnail", "detail_images", "product_url", "crawling_dt"]
    df = df.reindex(columns=[c for c in cols if c in df.columns] + [c for c in df.columns if c not in cols])

    filename = f"coupang_final_{datetime.now().strftime('%H%M%S')}.csv"
    df.to_csv(filename, index=False, encoding="utf-8-sig")
    print(f"파일 저장 완료: {filename}")
else:
    print("수집된 데이터가 없습니다.")

[OK] 크롬 실행됨 (Port: 9222)

1. 브라우저가 열리면 '첫 번째 카테고리 페이지'로 직접 이동하세요.
2. 접속이 잘 되었는지 확인하세요.
3. 준비가 완료되면 콘솔에 'y'를 입력하고 엔터를 치세요.

[Step 1] 현재 페이지 수집: https://www.coupang.com/np/categories/195011?listSize=60&sorter=saleCountDesc
  데이터 수집 시작 (스크롤 중)...
  -> 60개 상품 스캔 완료
  ★ 최저가 선정: 풀무원샘물 퓨어 생수, 2L, 12개 (6900원)
    상세 페이지로 이동 중...
    -> 상세 페이지 로딩 대기 중...
    -> 진입 확인
    -> 메인 썸네일 확보
    -> 상세 이미지 스캔을 위해 스크롤 다운...
    [Info] 선택자 '.product-detail-content-inside img'로 이미지 발견
    -> 상세 이미지 4장 확보
    수집 완료.

[1/767] 대기 6.3초 후 이동: 195008
  카테고리 이동 시도...
  [ERROR] 195008 처리 중 오류: 'in <string>' requires string as left operand, not int

[2/767] 대기 5.8초 후 이동: 195010


KeyboardInterrupt: 