In [50]:
# --- macOS 유니패스 100행/페이지 크롤러 (1→403, 숫자 클릭 + 안정화, 중복 최소화, 대기 상향) ---

import os, time, re, atexit, subprocess, random, hashlib
import pandas as pd

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException, NoSuchElementException, ElementClickInterceptedException,
    StaleElementReferenceException, WebDriverException
)

# ============== User settings ==============
URL                  = "https://unipass.customs.go.kr/clip/prlstclsfsrch/openULS0203002S.do"
START_PAGE           = 1
END_PAGE             = 403
TARGET_ROWS          = 100          # target number of rows per page
MIN_OK_ROWS          = 80           # minimum listed rows for the 100-rows setting to count as verified
SAVE_DIR             = "/Users/zoohunn/Desktop/4-2/데사캡디/크롤링"
OUT_CSV              = os.path.join(SAVE_DIR, "unipass_100rows_p1-403_clean.csv")
FAILED_LOG_CSV       = os.path.join(SAVE_DIR, "unipass_failed_rows.csv")
HEADLESS             = False        # switch to True once the crawl runs without problems
WAIT_SEC             = 25           # explicit-wait timeout passed to WebDriverWait (raised)
ROW_CLICK_RETRIES    = 4            # click retries per row (raised)
ROUNDS_PER_PAGE      = 5            # re-collection rounds within one page (raised)
CHECKPOINT_STEP      = 10           # a checkpoint message is printed every this many pages
# =======================================

# --- Wait tuning (raised values) ---
PAGE_CHANGE_WAIT_SEC  = 12.0  # how long to poll for a page change after clicking a page number
BLOCK_CHANGE_WAIT_SEC = 1.6   # sleep after clicking a pagination block button
STABILIZE_TICKS       = 4     # consecutive ticks with identical active page / first-row text
STABILIZE_TIMEOUT     = 10.0  # maximum time to wait for stabilization
CLICK_GAP_RANGE       = (0.12, 0.35)  # random sleep range between clicks, to avoid desync

# Import-time side effect: the output directory is created if it is missing.
os.makedirs(SAVE_DIR, exist_ok=True)

# Leading run of whitespace, explicitly including NBSP and U+2000..U+200B.
_ws_leading = re.compile(r'^[\s\u00A0\u2000-\u200B]+')


def lstrip_all(s: str) -> str:
    """Return *s* with its leading whitespace removed; falsy input yields ''."""
    if not s:
        return ''
    return _ws_leading.sub('', s)

def start_caffeinate():
    """Spawn ``caffeinate -dimsu`` and register its shutdown at interpreter exit.

    Returns:
        The ``subprocess.Popen`` handle, or ``None`` (after printing a warning)
        when the ``caffeinate`` executable cannot be found.
    """
    try:
        proc = subprocess.Popen(["caffeinate", "-dimsu"])
    except FileNotFoundError:
        print("[경고] caffeinate 없음(수면방지 비활성).")
        return None

    def _stop():
        # Ask the child to exit, then reap it unless it has already finished.
        proc.terminate()
        if proc.poll() is None:
            proc.wait()

    atexit.register(_stop)
    return proc

def make_driver():
    """Create a Chrome WebDriver configured from the module-level settings.

    ``--headless=new`` is added only when ``HEADLESS`` is true. The page-load
    timeout is set to 60 seconds.
    """
    options = webdriver.ChromeOptions()
    for flag in (
        "--lang=ko-KR",
        "--disable-gpu",
        "--window-size=1440,1000",
        "--disable-blink-features=AutomationControlled",
    ):
        options.add_argument(flag)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)
    if HEADLESS:
        options.add_argument("--headless=new")

    # No driver path is given; Selenium Manager locates the binary.
    browser = webdriver.Chrome(options=options)
    browser.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
    browser.set_page_load_timeout(60)
    return browser

def W(driver):
    """Return a ``WebDriverWait`` bound to *driver* with the ``WAIT_SEC`` timeout."""
    return WebDriverWait(driver, WAIT_SEC)
def wait_present(wait, xp):
    """Wait until the element at XPath *xp* is present and return it."""
    locator = (By.XPATH, xp)
    return wait.until(EC.presence_of_element_located(locator))

def ensure_search(wait):
    """Clear the ``srchSrwr`` input, type "-" into it and submit with ENTER."""
    box = wait_present(wait, '//*[@id="srchSrwr"]')
    box.clear()
    box.send_keys("-")
    box.send_keys(Keys.ENTER)

def list_rows(driver):
    """Return the ``<tr>`` elements of the list table body, or ``[]`` on any error."""
    rows_xpath = '//*[@id="ULS0203002S_T1_table1"]/tbody/tr'
    try:
        rows = driver.find_elements(By.XPATH, rows_xpath)
    except Exception:
        rows = []
    return rows

def detail_ready(wait):
    """Wait until the detail table (id ``ULS0203037S_T1_table1``) is present."""
    detail_table_xpath = '//*[@id="ULS0203037S_T1_table1"]'
    wait_present(wait, detail_table_xpath)

# ---------- 100행 보기 ----------
def set_rows_100(driver):
    """Try to switch the result list to 100 rows per page.

    Three strategies are attempted in order, each only if the previous ones
    did not succeed:

    1. a ``<select>`` element matched by one of several XPaths,
    2. a link/button whose text mentions 100 (clicked through JavaScript),
    3. a JavaScript sweep over every ``<select>`` in the document.

    Afterwards the number of listed rows is compared with ``MIN_OK_ROWS``.

    Returns:
        ``True`` when at least ``MIN_OK_ROWS`` rows are listed; otherwise
        whether any strategy reported success.
    """
    ok = False

    # Strategy 1: native <select> control.
    for xp in [
        "//select[@name='pageUnit']",
        "//select[contains(@id,'pageUnit')]",
        "//select[contains(@name,'page') and (option[@value='100'] or option[contains(.,'100')])]",
        "//div[contains(@class,'page') or contains(@class,'paging')]//select"
    ]:
        try:
            sel = driver.find_element(By.XPATH, xp)
            Select(sel).select_by_value("100")
            time.sleep(1.1)
            ok = True
            print("[행수설정] select로 100 설정:", xp)
            break
        except Exception:
            # Best effort: locator not found or no option with value "100".
            continue

    # Strategy 2: a link or button labelled with 100.
    if not ok:
        for xp in [
            "//a[contains(.,'100개') or contains(.,'100건') or contains(.,'100')]",
            "//button[contains(.,'100개') or contains(.,'100건') or contains(.,'100')]",
            "//li/a[contains(.,'100')]"
        ]:
            try:
                el = driver.find_element(By.XPATH, xp)
                driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
                driver.execute_script("arguments[0].click();", el)
                time.sleep(0.8)
                ok = True
                print("[행수설정] 버튼/링크로 100 설정:", xp)
                break
            except Exception:
                # Best effort: try the next locator.
                continue

    # Strategy 3: set the value from JavaScript on any <select> offering 100.
    if not ok:
        # execute_script runs the snippet as a function body, so the IIFE's
        # result must be returned explicitly; without `return` the call always
        # yields None and this fallback could never report success.
        js = """
        return (function(){
          var selects = document.querySelectorAll('select');
          for (const s of selects){
            let found = Array.from(s.options).some(o => (o.text||'').includes('100') || (o.value||'')=='100');
            if(found){ s.value='100'; s.dispatchEvent(new Event('change',{bubbles:true})); return true; }
          }
          return false;
        })();
        """
        try:
            if driver.execute_script(js):
                time.sleep(1.0)
                ok = True
                print("[행수설정] JS로 100 설정 성공")
        except Exception:
            # Best effort: fall through to the row-count verification.
            pass

    # Verification: a sufficiently long list counts as success regardless of
    # which strategy (if any) reported it.
    try:
        time.sleep(1.0)
        n = len(list_rows(driver))
        if n >= MIN_OK_ROWS:
            print(f"[검증] 목록 행 수={n} (OK)")
            return True
        print(f"[검증] 목록 행 수={n} (부족하지만 진행)")
        return ok
    except Exception:
        print("[검증] 행 수 확인 실패(진행)")
        return ok

# ---------- 페이지네이션 유틸 ----------
def get_paging_container(driver):
    """Return the first pagination container found, or *driver* itself if none matches."""
    candidates = (
        "//*[@id='ULS0203002S_T1_container']",
        "//div[contains(@class, 'paging')]",
        "//div[contains(@class, 'pagination')]",
    )
    for candidate in candidates:
        try:
            container = driver.find_element(By.XPATH, candidate)
        except Exception:
            continue
        return container
    return driver

def get_active_page_no(driver):
    """Return the page number shown as active in the pagination, or ``None``.

    A list of increasingly generic XPaths is tried; for each, the first
    element whose stripped text is all digits wins. As a last resort every
    non-``<a>`` descendant of the container is scanned the same way.
    """
    container = get_paging_container(driver)

    def first_numeric(elements, skip_anchors=False):
        # First element (optionally ignoring <a>) whose text is purely digits.
        for element in elements:
            if skip_anchors and element.tag_name.lower() == "a":
                continue
            label = (element.text or "").strip()
            if label.isdigit():
                return int(label)
        return None

    highlighted = (
        ".//li[contains(@class,'on') or contains(@class,'active')]/*[self::span or self::strong]",
        ".//span[contains(@class,'on') or contains(@class,'active') or contains(@class,'current')]",
        ".//strong[contains(@class,'on') or contains(@class,'active') or contains(@class,'current')]",
        ".//li//*[self::span or self::strong][not(self::a)]",
        ".//*[self::span or self::strong][not(self::a)]",
    )
    for xpath in highlighted:
        try:
            page = first_numeric(container.find_elements(By.XPATH, xpath))
        except Exception:
            # Any lookup/read error abandons this XPath and moves to the next.
            continue
        if page is not None:
            return page

    try:
        return first_numeric(container.find_elements(By.XPATH, ".//*"), skip_anchors=True)
    except Exception:
        return None

def find_page_element_any(driver, page_no):
    """Return the first ``a``/``span``/``strong`` in the pagination whose text equals *page_no*.

    Returns ``None`` when no candidate XPath matches.
    """
    container = get_paging_container(driver)
    text_is_page = f"[normalize-space(text())='{page_no}']"
    for prefix in (".//li//a", ".//li//span", ".//a", ".//span", ".//strong"):
        try:
            return container.find_element(By.XPATH, prefix + text_is_page)
        except Exception:
            pass
    return None

def find_page_link(driver, page_no):
    """Return the ``<a>`` in the pagination whose text equals *page_no*, or ``None``."""
    container = get_paging_container(driver)
    text_is_page = f"[normalize-space(text())='{page_no}']"
    for prefix in (".//li//a", ".//a"):
        try:
            link = container.find_element(By.XPATH, prefix + text_is_page)
        except Exception:
            continue
        return link
    return None

def click_block(driver, which="next"):
    """Click a pagination block button, then sleep ``BLOCK_CHANGE_WAIT_SEC``.

    Args:
        driver: the WebDriver.
        which: ``"next"`` selects the forward locators; any other value
            selects the backward ones.

    Returns:
        ``True`` after the first locator whose element was clicked,
        ``False`` when none of the locators worked.
    """
    container = get_paging_container(driver)
    if which == "next":
        locators = [
            ".//a[normalize-space(text())='>>' or normalize-space(text())='»' or contains(@title,'다음') or contains(@aria-label,'다음')]",
            ".//li[contains(@class,'next')]/a",
            ".//a[contains(@class,'next')]",
        ]
    else:
        locators = [
            ".//a[normalize-space(text())='<<' or normalize-space(text())='«' or contains(@title,'이전') or contains(@aria-label,'이전')]",
            ".//li[contains(@class,'prev')]/a",
            ".//a[contains(@class,'prev')]",
        ]

    for locator in locators:
        try:
            button = container.find_element(By.XPATH, locator)
            try:
                button.click()
            except (ElementClickInterceptedException, WebDriverException):
                # Native click failed; fall back to a JavaScript click.
                driver.execute_script("arguments[0].click();", button)
            time.sleep(BLOCK_CHANGE_WAIT_SEC)
        except Exception:
            continue
        else:
            return True
    return False

def stabilize_on_page(driver, expected_page, stable_ticks=STABILIZE_TICKS, timeout=STABILIZE_TIMEOUT):
    """Poll until the list looks settled on *expected_page*.

    Every 0.3 s the active page number and the first-row text are sampled.
    A tick counts when the active page equals *expected_page*, the first-row
    text is non-empty, and both values match the previous sample. Returns
    ``True`` after *stable_ticks* consecutive such ticks, ``False`` once
    *timeout* seconds have elapsed.
    """
    print(f"    [안정화] p.{expected_page} 고정 대기 (ticks={stable_ticks}, timeout={timeout}s)")
    prev_first = None
    prev_active = None
    streak = 0
    started = time.time()
    while time.time() - started < timeout:
        time.sleep(0.3)
        active = get_active_page_no(driver)
        first = get_first_row_text_safe(driver)
        unchanged = first == prev_first and active == prev_active
        if active == expected_page and first and unchanged:
            streak += 1
        else:
            streak = 0
        prev_first, prev_active = first, active
        if streak >= stable_ticks:
            print("        → 안정화 완료")
            return True
    print("        → 시간초과(그래도 진행)")
    return False

def go_to_page_exact(driver, page_no):
    """
    Navigate strictly by page number ``page_no``:
      1) If the number is not shown in the pagination, switch block exactly
         once with the next/previous block button.
      2) If it is shown: click its <a> when one exists; when there is no <a>
         the number is treated as the already-active page and left alone.
      3) After a click, poll for a change of active page / first-row text,
         then wait for stabilization.
    Returns: ("ok"|"active"|"fail")
    """
    el_any = find_page_element_any(driver, page_no)
    if el_any is None:
        # Switch block a single time to try to bring page_no into view.
        cont = get_paging_container(driver)
        nums = []
        # Gather every numeric label currently shown in the pagination.
        for e in cont.find_elements(By.XPATH, ".//a|.//span|.//strong"):
            try:
                t = e.text.strip()
                if t.isdigit():
                    nums.append(int(t))
            except Exception:
                pass
        if nums:
            mn, mx = min(nums), max(nums)
            # Direction is chosen by comparing page_no with the visible range.
            if page_no > mx:
                click_block(driver, "next")
            elif page_no < mn:
                click_block(driver, "prev")
        time.sleep(0.7)
        el_any = find_page_element_any(driver, page_no)
        if el_any is None:
            return "fail"

    el_link = find_page_link(driver, page_no)
    if el_link is None:
        # Shown but not as a link: treated as already active; confirm stabilization once more.
        print(f"    → {page_no}는 활성 표시(클릭 불가) → 안정화 확인")
        stabilize_on_page(driver, page_no)
        return "active"

    # Clickable case.
    first_before = get_first_row_text_safe(driver)
    try:
        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el_link)
        time.sleep(random.uniform(*CLICK_GAP_RANGE))
        try:
            el_link.click()
        except (ElementClickInterceptedException, WebDriverException):
            # Native click failed; fall back to a JavaScript click.
            driver.execute_script("arguments[0].click();", el_link)

        # Change check: first-row text differs OR the active page number equals page_no.
        t0 = time.time(); changed = False
        while time.time() - t0 < PAGE_CHANGE_WAIT_SEC:
            time.sleep(0.35)
            first_after = get_first_row_text_safe(driver)
            active_now  = get_active_page_no(driver)
            if (first_before and first_after and first_after != first_before) or (active_now == page_no):
                changed = True; break
        if not changed:
            print("    → 경고: 페이지 변경 신호 미감지(그래도 안정화 시도)")
        # NOTE: "ok" is returned even when no change signal was detected and
        # regardless of the stabilization result.
        stabilize_on_page(driver, page_no)
        return "ok"
    except Exception as e:
        print(f"    → 페이지 숫자 클릭 실패: {e}")
        return "fail"

def get_first_row_text_safe(driver):
    """Return the stripped link text in the third cell of the first list row.

    Any error (for example a missing element) yields ``None``.
    """
    first_link_xpath = '//*[@id="ULS0203002S_T1_table1"]/tbody/tr[1]/td[3]/a'
    try:
        anchor = driver.find_element(By.XPATH, first_link_xpath)
        text = anchor.text
        return (text or "").strip()
    except Exception:
        return None

# ---------- 상세/수집 ----------
def _detail_row_text(driver, row_xpath, max_len=None):
    """Join the non-empty cell texts found at *row_xpath* into one string.

    Each cell's text is read exactly once, left-stripped with ``lstrip_all``
    and, when *max_len* is given, truncated to that many characters. The
    pieces are joined with single spaces. Returns ``None`` when nothing
    remains.
    """
    parts = []
    for cell in driver.find_elements(By.XPATH, row_xpath):
        raw = cell.text  # read once: each access is a separate WebDriver call
        if not raw:
            continue
        cleaned = lstrip_all(raw)
        parts.append(cleaned if max_len is None else cleaned[:max_len])
    return lstrip_all(" ".join(parts)) or None


def extract_detail(driver):
    """Read the five collected fields from the detail table.

    Waits for the detail table to be present, then reads body rows 2, 4, 5,
    6 and 7. Cells of row 4 are truncated to 12 characters each.

    Returns:
        ``(eff, hs, name, desc, reason)``; each item is a string, or ``None``
        when the corresponding row has no text.

    Raises:
        TimeoutException: if the detail table does not appear in time.
    """
    detail_xpath = '//*[@id="ULS0203037S_T1_table1"]'
    W(driver).until(EC.presence_of_element_located((By.XPATH, detail_xpath)))
    eff = _detail_row_text(driver, f'{detail_xpath}/tbody/tr[2]/td')
    hs = _detail_row_text(driver, f'{detail_xpath}/tbody/tr[4]/td', max_len=12)
    name = _detail_row_text(driver, f'{detail_xpath}/tbody/tr[5]/td')
    desc = _detail_row_text(driver, f'{detail_xpath}/tbody/tr[6]/td')
    reason = _detail_row_text(driver, f'{detail_xpath}/tbody/tr[7]/td')
    return eff, hs, name, desc, reason

def row_key_tuple(eff, hs, name, desc, reason):
    """Return a content key for in-page duplicate detection.

    The five fields (falsy values become "") are joined with "||" and the
    MD5 hex digest of the UTF-8 encoding is returned.
    """
    fields = [value if value else "" for value in (eff, hs, name, desc, reason)]
    payload = "||".join(fields).encode("utf-8")
    return hashlib.md5(payload).hexdigest()

def try_click_row(driver, i):
    """Click the detail link of list row *i* (1-based), retrying on failure.

    Up to ``ROW_CLICK_RETRIES`` attempts are made, with a pause that grows
    with the attempt number. Returns ``True`` as soon as a click was issued,
    ``False`` when every attempt failed.
    """
    link_xpath = f'//*[@id="ULS0203002S_T1_table1"]/tbody/tr[{i}]/td[3]/a'
    retryable = (TimeoutException, NoSuchElementException,
                 StaleElementReferenceException, WebDriverException)
    attempt = 0
    while attempt < ROW_CLICK_RETRIES:
        attempt += 1
        try:
            anchor = W(driver).until(EC.element_to_be_clickable((By.XPATH, link_xpath)))
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", anchor)
            time.sleep(0.05 + 0.08 * attempt + random.uniform(0, 0.05))
            try:
                anchor.click()
            except (ElementClickInterceptedException, WebDriverException):
                # Native click failed; fall back to a JavaScript click.
                driver.execute_script("arguments[0].click();", anchor)
            return True
        except retryable:
            time.sleep(0.15 * attempt + random.uniform(0.0, 0.12))
    return False

def collect_page_with_retries(driver, page_no):
    """Collect the detail record of every row on the current list page.

    Rows are clicked one by one and the detail table is read after each
    click. Rows that fail stay pending and are retried for up to
    ``ROUNDS_PER_PAGE`` rounds, followed by one final "bonus" sweep.

    Args:
        driver: the WebDriver, already positioned on the page to collect.
        page_no: page number, used for log output and stabilization.

    Returns:
        ``(results, failed)``: ``results`` is a list with one dict per row
        (keys "시행일자", "HSCode", "이름", "설명", "사유"; all ``None`` for
        rows that were never read). ``failed`` lists the 1-based indices of
        rows whose record is missing or entirely empty.
    """
    trs = list_rows(driver)
    # With no rows visible, a full page is assumed.
    n = min(TARGET_ROWS, len(trs)) if trs else TARGET_ROWS

    results = [None] * n
    pending = set(range(1, n + 1))
    seen_hashes = set()  # content hashes already stored on this page

    # Repeat collection for the configured number of rounds.
    for round_idx in range(1, ROUNDS_PER_PAGE + 1):
        if not pending:
            break
        print(f"    [라운드 {round_idx}] 미수집 {len(pending)}건 재시도")
        # sorted(): visit rows top to bottom; set iteration order is not guaranteed.
        for i in sorted(pending):
            if not try_click_row(driver, i):
                continue
            try:
                eff, hs, name, desc, reason = extract_detail(driver)
                rec_hash = row_key_tuple(eff, hs, name, desc, reason)
                # Skip when this content was already seen and the slot already
                # holds a record (avoids redundant duplicate output/storage).
                if rec_hash in seen_hashes and results[i-1] is not None:
                    continue
                seen_hashes.add(rec_hash)
                results[i-1] = {"시행일자": eff, "HSCode": hs, "이름": name, "설명": desc, "사유": reason}
                print(f"[p{page_no:>4} r{i:>3}] HS={hs} | 시행일자={eff}", flush=True)
                # A record with at least one non-empty field counts as collected.
                if any([eff, hs, name, desc, reason]):
                    pending.discard(i)
            except Exception as exc:
                # Still best effort (the row stays pending), but say why it failed.
                print(f"[p{page_no:>4} r{i:>3}] 상세 수집 실패: {type(exc).__name__}", flush=True)
            time.sleep(random.uniform(*CLICK_GAP_RANGE))

        # Short stabilization after each round.
        stabilize_on_page(driver, page_no, stable_ticks=2, timeout=4.0)

    # Bonus sweep: one more attempt for whatever is still pending.
    if pending:
        print(f"    [보너스] 남은 {len(pending)}건 최종 시도")
        for i in sorted(pending):
            if not try_click_row(driver, i):
                continue
            try:
                eff, hs, name, desc, reason = extract_detail(driver)
                results[i-1] = {"시행일자": eff, "HSCode": hs, "이름": name, "설명": desc, "사유": reason}
                print(f"[p{page_no:>4} r{i:>3}] (보너스) HS={hs} | 시행일자={eff}", flush=True)
            except Exception as exc:
                print(f"[p{page_no:>4} r{i:>3}] (보너스) 상세 수집 실패: {type(exc).__name__}", flush=True)
            time.sleep(0.12)

    # Rows with no record, or with a record whose fields are all empty.
    failed = [i for i, rec in enumerate(results, start=1)
              if rec is None or not any([rec["시행일자"], rec["HSCode"], rec["이름"], rec["설명"], rec["사유"]])]
    # Fill missing slots with an all-None skeleton (convenient for post-processing).
    for i in failed:
        if results[i-1] is None:
            results[i-1] = {"시행일자": None, "HSCode": None, "이름": None, "설명": None, "사유": None}

    print(f"    → 성공 {n-len(failed)} / 실패 {len(failed)}")
    return results, failed

def append_page(out_csv, page_no, records):
    """Append one page of records to *out_csv*.

    A "페이지" column (the page number) and a "행" column (1-based row
    number) are placed in front of the five data columns. The header row is
    written only when the file does not exist yet.
    """
    data_columns = ["시행일자", "HSCode", "이름", "설명", "사유"]
    frame = pd.DataFrame(records, columns=data_columns)
    frame.insert(0, "페이지", page_no)
    frame.insert(1, "행", range(1, len(frame) + 1))
    is_new_file = not os.path.exists(out_csv)
    frame.to_csv(
        out_csv,
        mode="a",
        header=is_new_file,
        index=False,
        encoding="utf-8-sig",
    )

def append_failed_log(failed_csv, page_no, failed_rows):
    """Append one (page, row) line per failed row to *failed_csv*.

    Nothing is written when *failed_rows* is empty. The header row is
    written only when the file does not exist yet.
    """
    if not failed_rows:
        return
    row_numbers = list(failed_rows)
    frame = pd.DataFrame({"페이지": [page_no] * len(row_numbers), "행": row_numbers})
    is_new_file = not os.path.exists(failed_csv)
    frame.to_csv(
        failed_csv,
        mode="a",
        header=is_new_file,
        index=False,
        encoding="utf-8-sig",
    )

# ---------- 메인 (기대 페이지 p 기준, 안정화 포함) ----------
def main():
    """Run the whole crawl from ``START_PAGE`` to ``END_PAGE``.

    Opens ``URL``, submits the search, forces the 100-rows view, deletes any
    previous output files, then for each page navigates to it, collects its
    rows and appends the records and failed rows to the two CSV files. The
    loop stops early when a page cannot be reached after one retry. The
    browser is always closed on exit.
    """
    _ = start_caffeinate()
    driver = make_driver()
    wait = W(driver)

    try:
        print("[시작] 접속")
        driver.get(URL)

        print("[검색] '-' 로 조회")
        ensure_search(wait)
        # Wait for both the list table and the detail table to be present.
        wait_present(wait, '//*[@id="ULS0203002S_T1_table1"]'); detail_ready(wait)

        print("[설정] 100행 보기 강제")
        set_rows_100(driver)

        # Delete existing output files and start fresh.
        for p in [OUT_CSV, FAILED_LOG_CSV]:
            if os.path.exists(p): os.remove(p)
        print(f"[저장] 결과 CSV → {OUT_CSV}")
        print(f"[로그] 실패 CSV → {FAILED_LOG_CSV}")

        for page_no in range(START_PAGE, END_PAGE+1):
            print(f"\n[페이지] {page_no}/{END_PAGE} 이동·수집 시작")

            # 1) Bring the page number into view, navigate to it and stabilize.
            res = go_to_page_exact(driver, page_no)
            if res == "fail":
                # Retry once: switch block here, then ask go_to_page_exact again.
                print("    → 재시도: 블록 전환 후 다시 확인")
                cont = get_paging_container(driver)
                nums = []
                for e in cont.find_elements(By.XPATH, ".//a|.//span|.//strong"):
                    try:
                        t = e.text.strip()
                        if t.isdigit(): nums.append(int(t))
                    except Exception:
                        pass
                if nums:
                    mn, mx = min(nums), max(nums)
                    if page_no > mx: click_block(driver, "next")
                    elif page_no < mn: click_block(driver, "prev")
                    time.sleep(0.8)
                res = go_to_page_exact(driver, page_no)
                if res == "fail":
                    # Fatal: give up on the remaining pages.
                    print(f"[치명] p.{page_no} 노출/이동 실패 → 종료")
                    break

            # 2) The list is now taken to be page page_no: start collecting.
            stabilize_on_page(driver, page_no)   # pin the page once more
            detail_ready(wait)
            records, failed_rows = collect_page_with_retries(driver, page_no)
            append_page(OUT_CSV, page_no, records)
            append_failed_log(FAILED_LOG_CSV, page_no, failed_rows)
            print(f"[저장] p.{page_no} 완료 (실패 {len(failed_rows)}건)")

            # Informational only: every page is already appended to the CSV above.
            if page_no % CHECKPOINT_STEP == 0:
                print(f"[체크포인트] p.{page_no}까지 저장됨")

        # NOTE: these lines are also printed after the fatal `break` above.
        print("\n[완료] 순차 크롤링 종료")
        print(f"→ 최종 파일: {OUT_CSV}")
        print(f"→ 실패 로그: {FAILED_LOG_CSV} (비어있으면 전부 성공)")

    finally:
        # Always close the browser; errors while quitting are ignored.
        try: driver.quit()
        except Exception: pass

# NOTE: `or True` makes this condition always true, so main() also runs when
# the file is imported rather than executed directly.
if __name__ == "__main__" or True:
    main()


[시작] 접속
[검색] '-' 로 조회
[설정] 100행 보기 강제
[행수설정] select로 100 설정: //select[contains(@name,'page') and (option[@value='100'] or option[contains(.,'100')])]
[검증] 목록 행 수=0 (부족하지만 진행)
[저장] 결과 CSV → /Users/zoohunn/Desktop/4-2/데사캡디/크롤링/unipass_100rows_p1-403_clean.csv
[로그] 실패 CSV → /Users/zoohunn/Desktop/4-2/데사캡디/크롤링/unipass_failed_rows.csv

[페이지] 1/403 이동·수집 시작
    → 1는 활성 표시(클릭 불가) → 안정화 확인
    [안정화] p.1 고정 대기 (ticks=4, timeout=10.0s)
        → 안정화 완료
    [안정화] p.1 고정 대기 (ticks=4, timeout=10.0s)
        → 안정화 완료
    [라운드 1] 미수집 100건 재시도
[p   1 r  1] HS=8543.70-2090 | 시행일자=2025-08-28
[p   1 r  2] HS=8543.70-2090 | 시행일자=2025-08-28
[p   1 r  4] HS=3212.90-1000 | 시행일자=2025-08-28
[p   1 r  5] HS=8543.70-2090 | 시행일자=2025-08-28
[p   1 r  6] HS=3212.90-1000 | 시행일자=2025-08-28
[p   1 r  7] HS=7007.19-1000 | 시행일자=2025-08-28
[p   1 r  8] HS=3212.90-1000 | 시행일자=2025-08-28
[p   1 r  9] HS=8536.50-4000 | 시행일자=2025-08-28
[p   1 r 10] HS=8536.50-4000 | 시행일자=2025-08-28
[p   1 r 11] HS=8536.50-4000 | 시행일자=2025-08

In [51]:
data1 = pd.read_csv('/Users/zoohunn/Desktop/4-2/데사캡디/크롤링/unipass_crawling_0929.csv')
data1.shape

(37000, 7)