In [1]:
import time, random, json, os, re, csv, uuid, hashlib, pathlib, requests

import pandas as pd
from io import StringIO

from selenium import webdriver
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import WebDriverException, TimeoutException, StaleElementReferenceException

from urllib.parse import urlparse
from datetime import datetime
import requests
from bs4 import BeautifulSoup

In [2]:
## Build the list of search-result URLs from the dong-name / dong-ID text file

with open("dong-number.txt", "r", encoding="utf-8") as f:
    data = f.read()

locations = json.loads(data)['locations']
keyword = '유모차+스토케'

# One search URL per location entry: "<name3>-<id>" goes into the `in` query parameter.
search_list_urls = [
    f"https://www.daangn.com/kr/buy-sell/?in={lo['name3']}-{lo['id']}&search={keyword}"
    for lo in locations
]

len(search_list_urls) # 917 dongs

917

In [3]:
## 검색목록 리스트에서 상세링크 리스트 추출

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException, TimeoutException, StaleElementReferenceException
import time, random, json, os

# ====== Runtime parameters ======
OUT_PATH = "detail_links.json"   # output file for the collected detail links
MAX_SCROLLS = 80                 # upper bound on scroll iterations per search page
STABLE_ROUNDS_TARGET = 3         # stop after this many consecutive scroll rounds with no new link
SCROLL_PAUSE = (0.8, 1.6)        # (min, max) seconds slept after each scroll
URL_COOLDOWN = (5, 12)           # (min, max) seconds slept after each search URL

# 403 back-off (minutes)
BACKOFF_BASE_MIN = 5
BACKOFF_MAX_MIN  = 60

# ====== Driver setup ======
# A single module-level Chrome instance; the helper functions below use this `driver` global.
opts = webdriver.ChromeOptions()
opts.add_argument("--window-size=1280,900")
opts.add_argument("--lang=ko-KR")
opts.add_argument("user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36")
driver = webdriver.Chrome(options=opts)

def polite_sleep(a, b):
    """Sleep for a random number of seconds drawn uniformly between ``a`` and ``b``."""
    delay = random.uniform(a, b)
    time.sleep(delay)

def looks_like_403(html: str) -> bool:
    """Heuristically decide whether ``html`` is a block page.

    The check is case-insensitive and is true when the text contains
    "access denied", or contains both "403" and "forbidden".
    Falsy input (``None`` or ``""``) is treated as an empty page.
    """
    lowered = html.lower() if html else ""
    if "access denied" in lowered:
        return True
    return "403" in lowered and "forbidden" in lowered

def backoff_sleep(cur_min):
    """Sleep for ``cur_min`` minutes (capped at BACKOFF_MAX_MIN) and return the next back-off.

    The returned value is twice the wait that was actually used, again
    capped at BACKOFF_MAX_MIN.
    """
    wait_min = BACKOFF_MAX_MIN if BACKOFF_MAX_MIN < cur_min else cur_min
    print(f"[403 감지] {wait_min}분 대기 후 재시도")
    time.sleep(wait_min * 60)
    doubled = wait_min * 2
    return BACKOFF_MAX_MIN if BACKOFF_MAX_MIN < doubled else doubled

def collect_search_article_links_on_page():
    """Return the non-empty ``href`` of every ``a[data-gtm='search_article']`` on the current page.

    Anchors that go stale while being read are skipped.
    """
    links = []
    for anchor in driver.find_elements(By.CSS_SELECTOR, "a[data-gtm='search_article']"):
        try:
            link = anchor.get_attribute("href")
        except StaleElementReferenceException:
            continue
        if link:
            links.append(link)
    return links

def load_with_optional_backoff(url, backoff_min):
    """Load ``url``; if the page looks like a 403, back off once and retry.

    Returns ``(ok, backoff_min)``: ``ok`` is False when the retry is still
    blocked, and ``backoff_min`` is the (possibly increased) back-off value
    to pass to the next call.
    """
    driver.get(url)
    if not looks_like_403(driver.page_source):
        return True, backoff_min

    # Blocked: wait, then give the same URL exactly one more chance.
    backoff_min = backoff_sleep(backoff_min)
    driver.get(url)
    if looks_like_403(driver.page_source):
        print(" - 403 지속: 이 URL은 건너뜁니다.")
        return False, backoff_min
    return True, backoff_min

def scroll_and_collect_all():
    """Scroll the current search page and return the set of detail links seen.

    Scrolling stops after MAX_SCROLLS rounds, after STABLE_ROUNDS_TARGET
    consecutive rounds that add no new link, or as soon as the page looks
    like a 403 response.
    """
    collected = set()
    stable_rounds = 0
    for _ in range(MAX_SCROLLS):
        size_before = len(collected)
        collected.update(collect_search_article_links_on_page())
        stable_rounds = stable_rounds + 1 if len(collected) == size_before else 0
        if stable_rounds >= STABLE_ROUNDS_TARGET:
            break

        # Jump to the bottom to trigger loading of the next batch.
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        polite_sleep(*SCROLL_PAUSE)
        if looks_like_403(driver.page_source):
            print(" - 스크롤 중 403 감지: 현재 페이지 수집을 중단합니다.")
            break
    return collected

def save_links(path, links_set):
    """Write the links, sorted, as a JSON list to ``path``.

    The data is first written to ``path + ".tmp"`` and then moved over
    ``path`` with ``os.replace``.
    """
    ordered = sorted(links_set)
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as out:
        json.dump(ordered, out, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)

# ====== Main ======
# Restore previously saved links so that a rerun keeps accumulating into the same file.
all_detail_links = set()
if os.path.exists(OUT_PATH):
    try:
        with open(OUT_PATH, "r", encoding="utf-8") as f:
            prev = json.load(f)
        all_detail_links.update(prev)
        print(f"[복구] 기존 {len(prev)}개 상세 링크 로드")
    except (OSError, ValueError, TypeError) as e:
        # An unreadable or malformed file must not abort the crawl, but it
        # should not be ignored silently either.
        print(f"[복구 실패] {OUT_PATH} 을(를) 읽지 못했습니다: {e}")

backoff_min = BACKOFF_BASE_MIN

try:
    for idx, url in enumerate(search_list_urls, start=1):
        print(f"\n[{idx}/{len(search_list_urls)}] 수집 시작: {url}")
        ok, backoff_min = load_with_optional_backoff(url, backoff_min)
        if not ok:
            continue
        try:
            page_links = scroll_and_collect_all()
            all_detail_links.update(page_links)
            print(f" - 이번 페이지 수집: {len(page_links)}개 / 누적: {len(all_detail_links)}개")
            save_links(OUT_PATH, all_detail_links)  # intermediate save
        except WebDriverException as e:
            print(f" - 드라이버 오류: {e}")
        polite_sleep(*URL_COOLDOWN)
finally:
    # Also runs on KeyboardInterrupt or an unexpected error, so the links
    # gathered so far are persisted and the Chrome process is not left behind.
    try:
        save_links(OUT_PATH, all_detail_links)
    finally:
        driver.quit()

print(f"\n완료. 총 {len(all_detail_links)}개 상세 링크 저장 → {OUT_PATH}")

# Print a small sample
for x in sorted(all_detail_links)[:10]:
    print(" -", x)


[복구] 기존 5898개 상세 링크 로드

[1/917] 수집 시작: https://www.daangn.com/kr/buy-sell/?in=역삼동-6035&search=유모차+스토케


KeyboardInterrupt: 

In [60]:
## 상품상세 크롤링

"""
Daangn detail crawler (final, with robust title/price/location/detail)
- 이미지: 화살표 클릭 없이, 메인 이미지가 들어있는 갤러리 컨테이너의 형제 이미지들을 DOM 순서대로 수집 (최대 10장)
- webp 자동 변환: 알파 있으면 PNG, 없으면 JPEG(quality=95)  (Pillow 미지원 시 imageio 폴백)
- detail: '더보기/펼치기' 클릭 후 설명 컨테이너 우선 추출(노이즈 제거). detail 로직은 유지
- location: 지역명(…동/…구) 우선, 없을 때만 '유아동'으로 폴백
- title: 카테고리로 오탐 방지 (명시 셀렉터 > 메타 > <title> 순, 파이프/카테고리 제거)
- price: 다양한 DOM 케이스를 커버하는 견고한 추출
- 저장 구조:
    sample_data/raw/
      ├─ daangn.csv
      └─ <post_uuid>/
           ├─ daangn_<post_uuid>_1.jpg (또는 .png)
           └─ ...
- CSV row: id,title,detail,condition,uploaded_date,is_completed,price,location
"""

import os
import re
import csv
import json
import time
import uuid
import random
from io import BytesIO
from urllib.parse import urlparse
from datetime import datetime, timedelta

import requests

# Pillow (webp 지원 체크 및 변환)
from PIL import Image, ImageFile, features
ImageFile.LOAD_TRUNCATED_IMAGES = True
WEBP_SUPPORTED = features.check('webp')
try:
    import imageio.v3 as iio  # 폴백
    IMAGEIO_AVAILABLE = True
except Exception:
    IMAGEIO_AVAILABLE = False

# Selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException, NoSuchElementException, StaleElementReferenceException, WebDriverException
)

# ========= User settings =========
LINKS_JSON   = "empty_links.json"                     # JSON file with the detail-page links to crawl
BASE_RAW_DIR = os.path.join("sample_data", "raw")     # root folder for the CSV and the per-post image folders
CSV_PATH     = os.path.join(BASE_RAW_DIR, "daangn.csv")
HEADLESS     = True                                   # passed to make_driver() by main()

# Request headers (used for image downloads)
BASE_REFERER = "https://www.daangn.com/"
REQ_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    "Accept-Language": "ko-KR,ko;q=0.9,en;q=0.8",
    "Accept": "image/avif,image/*;q=0.8,*/*;q=0.5",
    "Referer": BASE_REFERER,
    "Connection": "keep-alive",
}

# Waits / retries
PAGE_TIMEOUT = 20            # seconds; used as page-load timeout and explicit-wait timeout
RETRY        = 3             # attempts per page load and per image download
SLEEP_RANGE  = (1.2, 2.2)    # (min, max) seconds slept between posts

os.makedirs(BASE_RAW_DIR, exist_ok=True)

# ========= 유틸 =========
def read_links(json_path):
    """Load the list of links from ``json_path``.

    The file may hold either a bare JSON list or an object whose ``"links"``
    value is a list. Any other shape raises ``ValueError``.
    """
    with open(json_path, "r", encoding="utf-8") as fh:
        payload = json.load(fh)

    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        links = payload.get("links")
        if isinstance(links, list):
            return links
    raise ValueError("JSON은 리스트 또는 {'links': [...]} 형식이어야 합니다.")

def clean_text(x: str) -> str:
    """Collapse each whitespace run in ``x`` to one space and trim both ends.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    if x:
        collapsed = re.sub(r"\s+", " ", x)
        return collapsed.strip()
    return ""

def guess_deal_done(*fragments) -> str:
    """Classify a listing from text fragments as ``"True"``, ``"False"`` or ``"unknown"``.

    Falsy fragments are ignored. ``"unknown"`` is returned when no text is
    left; ``"True"`` when any of the completion/reservation keywords occurs.
    """
    blob = " ".join(frag for frag in fragments if frag).lower()
    if not blob:
        return "unknown"
    done_markers = ("거래완료", "판매완료", "거래 완료", "판매 완료", "예약중")
    for marker in done_markers:
        if marker in blob:
            return "True"
    return "False"

def infer_ext_from_url(u: str) -> str:
    """Return the image extension found at the end of the URL path (lower-cased).

    Only ``.jpg``, ``.jpeg``, ``.png``, ``.gif`` and ``.webp`` are recognised;
    anything else falls back to ``".jpg"``.
    """
    path = urlparse(u).path.lower()
    known = (".jpg", ".jpeg", ".png", ".gif", ".webp")
    return next((ext for ext in known if path.endswith(ext)), ".jpg")

def now_string():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S.%f}"

def normalize_uploaded_at(raw: str) -> str:
    """Normalise an upload-time string to ``YYYY-MM-DD HH:MM:SS.ffffff``.

    Tried in order:
      1. ISO-8601 (a ``Z`` is rewritten to ``+00:00`` before parsing);
      2. Korean relative expressions: "N초 전", "N분 전", "N시간 전",
         "어제", "N일 전" - resolved against the current local time;
      3. absolute ``YYYY.MM.DD`` / ``YYYY-MM-DD`` followed by ``HH:MM[:SS]``.

    Empty input, or text that matches none of the above, yields the current
    time via ``now_string()``.
    """
    out_fmt = "%Y-%m-%d %H:%M:%S.%f"
    if not raw:
        return now_string()
    txt = raw.strip()

    # Only ValueError means "not this format"; a bare except here would also
    # swallow KeyboardInterrupt / SystemExit.
    try:
        return datetime.fromisoformat(txt.replace("Z", "+00:00")).strftime(out_fmt)
    except ValueError:
        pass

    def ago(**delta):
        return (datetime.now() - timedelta(**delta)).strftime(out_fmt)

    m = re.search(r"(\d+)\s*초\s*전", txt)
    if m:
        return ago(seconds=int(m.group(1)))
    m = re.search(r"(\d+)\s*분\s*전", txt)
    if m:
        return ago(minutes=int(m.group(1)))
    m = re.search(r"(\d+)\s*시간\s*전", txt)
    if m:
        return ago(hours=int(m.group(1)))
    if "어제" in txt:
        return ago(days=1)
    m = re.search(r"(\d+)\s*일\s*전", txt)
    if m:
        return ago(days=int(m.group(1)))

    for fmt in ("%Y.%m.%d %H:%M", "%Y.%m.%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(txt, fmt).strftime(out_fmt)
        except ValueError:
            continue
    return now_string()

# ========= Selenium =========
def make_driver(headless=True):
    """Create a Chrome WebDriver configured for the detail crawl.

    ``headless`` adds ``--headless=new``. The user agent is taken from
    ``REQ_HEADERS`` and the page-load timeout is set to ``PAGE_TIMEOUT``.
    """
    chrome_opts = webdriver.ChromeOptions()
    if headless:
        chrome_opts.add_argument("--headless=new")
    for flag in (
        "--window-size=1280,1600",
        "--lang=ko-KR",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--no-sandbox",
        "--disable-blink-features=AutomationControlled",
    ):
        chrome_opts.add_argument(flag)
    chrome_opts.add_argument("--user-agent=" + REQ_HEADERS["User-Agent"])

    browser = webdriver.Chrome(options=chrome_opts)
    browser.set_page_load_timeout(PAGE_TIMEOUT)
    return browser

def get_meta(driver, prop):
    """Return the ``content`` of ``<meta property="{prop}">``, or ``""`` when the tag is absent or empty."""
    selector = f"meta[property='{prop}']"
    try:
        content = driver.find_element(By.CSS_SELECTOR, selector).get_attribute("content")
    except NoSuchElementException:
        return ""
    return content or ""

def find_first_text(driver, selectors):
    """Return the cleaned text of the first CSS selector that matches an element with non-empty text.

    Selectors are tried in order; ``""`` is returned when none yields text.
    """
    for css in selectors:
        try:
            text = clean_text(driver.find_element(By.CSS_SELECTOR, css).text)
        except NoSuchElementException:
            continue
        if text:
            return text
    return ""

def find_by_xpath_text(driver, xpaths):
    """Return the cleaned text of the first XPath that matches an element with non-empty text.

    XPaths are tried in order; ``""`` is returned when none yields text.
    """
    for expression in xpaths:
        try:
            text = clean_text(driver.find_element(By.XPATH, expression).text)
        except NoSuchElementException:
            continue
        if text:
            return text
    return ""

def gentle_scroll(driver, steps=6, pause=(0.4, 0.8)):
    """Scroll down in up to ``steps`` increments of one sixth of the page height.

    After each increment the function sleeps for a random time within
    ``pause`` and stops early once ``window.scrollY`` no longer changes.
    """
    previous_y = 0
    for _ in range(steps):
        driver.execute_script("window.scrollBy(0, document.body.scrollHeight/6);")
        time.sleep(random.uniform(*pause))
        current_y = driver.execute_script("return window.scrollY;")
        if current_y == previous_y:
            return
        previous_y = current_y

# ========= 이미지 URL (형제 수집) =========
def _extract_url_from_style(style: str) -> str:
    if not style: return ""
    m = re.search(r'url\((["\']?)(.+?)\1\)', style)
    return m.group(2) if m else ""

def _best_srcset(srcset: str) -> str:
    pairs = []
    for part in (srcset or "").split(","):
        p = part.strip()
        if not p: continue
        toks = p.split()
        url = toks[0]; w = 0
        if len(toks) > 1 and toks[1].endswith("w"):
            try: w = int(toks[1][:-1])
            except: w = 0
        pairs.append((w, url))
    if not pairs: return ""
    pairs.sort(key=lambda x: x[0], reverse=True)
    return pairs[0][1]

def _get_url(el):
    """Return the best absolute (``http…``) image URL for a DOM element, or ``""``.

    * ``<img>``: ``currentSrc``, then the widest ``srcset`` candidate, then
      ``src`` and the lazy-loading ``data-*`` attributes.
    * ``<picture>``: the best ``<source>`` candidate, then the nested ``<img>``.
    * anything else (and as a last resort for the above): a ``url(...)``
      found in the inline ``style`` attribute.
    """
    tag = (el.tag_name or "").lower()
    if tag == "img":
        cur = el.get_attribute("currentSrc") or ""
        if cur.startswith("http"):
            return cur
        srcset = el.get_attribute("srcset") or ""
        if "http" in srcset:
            u = _best_srcset(srcset)
            if u.startswith("http"):
                return u
        for attr in ["src", "data-src", "data-original", "data-lazy", "data-image"]:
            v = el.get_attribute(attr) or ""
            if v.startswith("http"):
                return v
    elif tag == "picture":
        # Best effort: a failure falls through to the next strategy.
        # `except Exception` (not a bare except) keeps Ctrl-C working.
        try:
            best, bestw = "", -1
            for s in el.find_elements(By.TAG_NAME, "source"):
                ss = s.get_attribute("srcset") or ""
                u = _best_srcset(ss)
                w = 0
                # NOTE(review): the width is read from the LAST token of the
                # srcset, which is not necessarily the candidate chosen above.
                if " " in ss:
                    last = ss.strip().split()[-1]
                    if last.endswith("w"):
                        try:
                            w = int(last[:-1])
                        except ValueError:
                            w = 0
                if u.startswith("http") and w >= bestw:
                    best, bestw = u, w
            if best:
                return best
        except Exception:
            pass
        try:
            img = el.find_element(By.TAG_NAME, "img")
            return _get_url(img)
        except Exception:
            pass
    bg = _extract_url_from_style(el.get_attribute("style") or "")
    if bg.startswith("http"):
        return bg
    return ""

def _largest_visual(scope):
    """Return the image-like element under ``scope`` with the largest rendered area.

    Considers ``img``, ``picture`` and ``div`` elements with an inline
    ``background-image``; elements with zero area are ignored. Returns
    ``None`` when nothing qualifies.
    """
    cands = []
    for css in ["img", "picture", "div[style*='background-image']"]:
        try:
            for e in scope.find_elements(By.CSS_SELECTOR, css):
                r = e.rect or {}
                area = (r.get("width", 0) or 0) * (r.get("height", 0) or 0)
                if area > 0:
                    cands.append((area, e))
        except Exception:
            # Best effort per selector; a bare except here would also
            # swallow KeyboardInterrupt.
            pass
    if not cands:
        return None
    # Keyed max() never compares the elements themselves and keeps the first
    # of equal areas, like the stable descending sort it replaces.
    return max(cands, key=lambda c: c[0])[1]

def _gallery_container_from(main_el, max_up=6):
    """Walk up from ``main_el`` to the nearest ancestor holding at least two image-like elements.

    At most ``max_up`` levels are climbed. ``main_el`` itself is returned
    when no such ancestor is found or the parent lookup fails.
    """
    node = main_el
    for _ in range(max_up):
        try:
            parent = node.find_element(By.XPATH, "./..")
        except Exception:
            # No reachable parent: stop climbing.
            break
        try:
            els = parent.find_elements(By.CSS_SELECTOR, "img, picture, div[style*='background-image']")
        except Exception:
            els = []
        if len(els) >= 2:
            return parent
        node = parent
    return main_el

def collect_gallery_images_from_dom(driver, max_images=10):
    """Collect up to ``max_images`` distinct image URLs for the current page, in DOM order.

    The largest visible image is taken as the main picture, and the images
    inside its gallery container are gathered. When the page has no visible
    image at all, the first absolute ``og:image`` meta URL is used instead.
    """
    main = _largest_visual(driver)
    if not main:
        urls = []
        for m in driver.find_elements(By.CSS_SELECTOR, "meta[property='og:image']"):
            c = m.get_attribute("content") or ""
            if c.startswith("http"):
                urls.append(c)
                break
        return urls[:max_images]

    container = _gallery_container_from(main)
    try:
        cands = container.find_elements(By.CSS_SELECTOR, "img, picture, div[style*='background-image']")
    except Exception:
        # Container lookup failed: fall back to the main image only.
        # (Not a bare except, so Ctrl-C still interrupts the crawl.)
        cands = [main]

    urls, seen = [], set()
    for el in cands:
        try:
            u = _get_url(el)
        except StaleElementReferenceException:
            continue
        if u and u.startswith("http") and u not in seen:
            urls.append(u)
            seen.add(u)
        if len(urls) >= max_images:
            break
    if not urls:
        u = _get_url(main)
        if u and u.startswith("http"):
            urls.append(u)
    return urls[:max_images]

# ========= detail / location / title =========
NOISE_TOKENS = ("매너온도","°C","관심","채팅","프로필","신고","온도","판매자","후기","안전","택배","예약중","끌올","당근페이")
def _strip_noise(txt):
    """Drop every line that contains a NOISE_TOKENS entry and return the rest as one cleaned string.

    Input is split on CR/LF runs, blank lines are discarded, surviving lines
    are joined with spaces, any remaining "<number>°C" fragment is removed
    and whitespace is normalised with ``clean_text``.
    """
    stripped_lines = (piece.strip() for piece in re.split(r"[\r\n]+", txt or ""))
    kept = [
        line
        for line in stripped_lines
        if line and not any(tok in line for tok in NOISE_TOKENS)
    ]
    merged = re.sub(r"\d+(?:\.\d+)?\s*°C", "", " ".join(kept))
    return clean_text(merged)

def expand_description(driver):
    """Click every "more"/"expand" control on the page so the full description is in the DOM.

    Up to three passes are made. A pass that clicks nothing ends the loop.
    Clicks are issued through JavaScript and failures are ignored.
    """
    xps = [
        "//button[contains(.,'더보기') or contains(.,'더 보기') or contains(.,'펼치기') or contains(.,'more')]",
        "//*[self::a or self::span or self::div][contains(.,'더보기') or contains(.,'더 보기') or contains(.,'펼치기') or contains(.,'more')]",
    ]
    for _ in range(3):
        clicked = False
        for xp in xps:
            try:
                for b in driver.find_elements(By.XPATH, xp):
                    driver.execute_script("arguments[0].click();", b)
                    clicked = True
                    time.sleep(0.15)
            except Exception:
                # Best effort (stale or detached nodes are expected). A bare
                # except would also swallow KeyboardInterrupt in this loop.
                pass
        if not clicked:
            break

def extract_full_description(driver):
    """Return the listing's description text with noise lines removed.

    Strategies, in order:
      1. explicit description containers (``data-testid`` / class selectors),
         accepted when the cleaned text has at least 10 characters;
      2. the first non-empty node following a "상품설명" heading;
      3. the longest cleaned ``p``/``div`` text (>= 20 characters) inside
         ``article``, ``main`` or ``section``.
    Returns ``""`` when nothing qualifies.
    """
    sels = [
        "[data-testid='post-description']",
        "[data-testid*='description']",
        "[class*='description']",
    ]
    for s in sels:
        try:
            el = driver.find_element(By.CSS_SELECTOR, s)
            txt = _strip_noise((el.get_attribute("textContent") or el.text or ""))
            if len(txt) >= 10:
                return txt
        except NoSuchElementException:
            pass

    # The fallbacks below are best effort. `except Exception` replaces the
    # former bare excepts so KeyboardInterrupt / SystemExit still propagate.
    try:
        hdrs = driver.find_elements(By.XPATH, "//*[self::h1 or self::h2 or self::h3 or self::div][contains(normalize-space(.),'상품설명') or contains(normalize-space(.),'상품 설명')]")
        for h in hdrs:
            try:
                cand = h.find_element(By.XPATH, "following::*[normalize-space(string())!=''][1]")
                txt = _strip_noise(cand.get_attribute("textContent") or cand.text or "")
                if len(txt) >= 10 and "매너온도" not in txt:
                    return txt
            except Exception:
                pass
    except Exception:
        pass

    long_txt = ""
    for scope_css in ["article", "main", "section"]:
        try:
            scope = driver.find_element(By.CSS_SELECTOR, scope_css)
            nodes = scope.find_elements(By.CSS_SELECTOR, "p,div")
        except Exception:
            nodes = []
        for n in nodes:
            t = _strip_noise(n.get_attribute("textContent") or n.text or "")
            if len(t) > len(long_txt) and len(t) >= 20:
                long_txt = t
    return long_txt

def extract_location(driver):
    """Return the first region name (…동 / …구) found on the page, or ``""``.

    Search order: explicit region/location selectors, the container around
    the '매너온도' label and its neighbours, then the whole body text.
    There is no category fallback: when nothing matches, the empty string
    is returned.
    """
    REGION_RE = re.compile(r"(?<![가-힣0-9])([가-힣0-9]{1,20}(?:제?\d{1,2})?동|[가-힣0-9]{1,20}구)(?![가-힣0-9])")

    def find_region_in(text: str):
        if not text:
            return None
        m = REGION_RE.search(clean_text(text))
        return m.group(1) if m else None

    # Explicit selectors
    for sel in ["[data-testid*='region']", "[data-testid*='location']", "[class*='region']", "[class*='location']"]:
        try:
            el = driver.find_element(By.CSS_SELECTOR, sel)
            v = find_region_in(el.get_attribute("textContent") or el.text or "")
            if v:
                return v
        except NoSuchElementException:
            pass

    # Container around the '매너온도' label. Best effort: `except Exception`
    # replaces the former bare excepts so Ctrl-C is not swallowed.
    try:
        cont = driver.find_element(By.XPATH, "//*[contains(normalize-space(.),'매너온도')]/ancestor::*[self::section or self::div][1]")
        v = find_region_in(cont.text)
        if v:
            return v
        for xp in ["preceding::*[self::span or self::div][1]", "following::*[self::span or self::div][1]"]:
            try:
                sib = cont.find_element(By.XPATH, xp)
                v = find_region_in(sib.text)
                if v:
                    return v
            except Exception:
                pass
    except Exception:
        pass

    # Whole-body fallback
    try:
        body = driver.find_element(By.TAG_NAME, "body").text or ""
    except Exception:
        body = ""
    v = find_region_in(body)
    if v:
        return v

    return ""

# === Title 전용 ===
CATEGORY_TOKENS = {
    "유아동","디지털기기","생활가전","가구/인테리어","생활/가공식품","스포츠/레저",
    "여성의류","남성의류","게임/취미","뷰티/미용","반려동물용품",
    "도서/티켓/음반","식물","기타 중고물품","티켓/교환권","지역생활","당근 중고거래"
}
def _sanitize_title(text: str) -> str:
    """Reduce a raw page/meta title to the listing title itself.

    The trailing "| 당근 중고거래" suffix is removed and the rest is split on
    ``|``. The first segment that is neither a known category nor contains
    "당근" wins; if every segment is filtered out, the first one is returned.
    """
    cleaned = clean_text(text)
    if not cleaned:
        return ""
    cleaned = re.sub(r"\s*\|\s*당근\s*중고거래\s*$", "", cleaned)

    segments = [seg for seg in (piece.strip() for piece in cleaned.split("|")) if seg]
    if not segments:
        return cleaned
    preferred = next(
        (seg for seg in segments if seg not in CATEGORY_TOKENS and "당근" not in seg),
        None,
    )
    return segments[0] if preferred is None else preferred

def extract_title(driver) -> str:
    """Return the listing title, avoiding category names picked up by mistake.

    Sources are tried in order: explicit title/heading selectors, the
    ``og:title`` / ``twitter:title`` meta tags, then the document title.
    A candidate is accepted when it is at least 4 characters long and is not
    a known category. If nothing is accepted, the cleaned document title is
    returned as is.
    """
    def acceptable(candidate: str) -> bool:
        s = clean_text(candidate)
        return bool(s) and (s not in CATEGORY_TOKENS) and len(s) >= 4

    for css in ["[data-testid='post-title']", "article h1", "main h1", "header h1", "h1"]:
        try:
            node_text = driver.find_element(By.CSS_SELECTOR, css).get_attribute("textContent") or ""
        except NoSuchElementException:
            continue
        title = _sanitize_title(node_text)
        if acceptable(title):
            return title

    for prop in ["og:title", "twitter:title"]:
        title = _sanitize_title(get_meta(driver, prop))
        if acceptable(title):
            return title

    doc_title = getattr(driver, "title", "") or ""
    title = _sanitize_title(doc_title)
    return title if acceptable(title) else clean_text(doc_title)

# ========= 가격 (견고 추출) =========
def _parse_price_from_text(txt: str):
    if not txt: return None
    t = txt.strip()
    if any(k in t for k in ["무료", "나눔"]): return "0"
    m = re.search(r"(\d[\d,]*)\s*원", t)
    if m: return m.group(1).replace(",", "")
    m = re.search(r"^\s*(\d[\d,]*)\s*$", t)
    if m: return m.group(1).replace(",", "")
    return None

def extract_price(driver):
    """Return the listing price as a digit string (``"0"`` for giveaways), or ``""``.

    Sources, in order: the ``product:price:amount`` meta tag, explicit price
    selectors, nodes containing '가격' / '원' (and the node right after them),
    and finally the first 2000 characters of the body text.
    """
    try:
        el = driver.find_element(By.CSS_SELECTOR, "meta[property='product:price:amount']")
        v = (el.get_attribute("content") or "").strip()
        if v:
            m = re.search(r"(\d[\d,]*)", v)
            if m:
                return m.group(1).replace(",", "")
    except NoSuchElementException:
        pass

    selectors = [
        "[data-testid='price']",
        "[data-testid*='price']",
        "span[class*='price']",
        "div[class*='price']",
    ]
    for sel in selectors:
        try:
            txt = (driver.find_element(By.CSS_SELECTOR, sel).text or "").strip()
            val = _parse_price_from_text(txt)
            if val is not None:
                return val
        except NoSuchElementException:
            pass

    xpaths = ["//*[contains(normalize-space(.),'가격')]", "//*[contains(normalize-space(.),'원')]"]
    for xp in xpaths:
        try:
            node = driver.find_element(By.XPATH, xp)
            # Skip nodes that also contain the seller's manner temperature.
            if "매너온도" in (node.text or ""):
                continue
            val = _parse_price_from_text(node.text or "")
            if val is not None:
                return val
            for sibxp in [".//following-sibling::*[1]", ".//following::*[1]"]:
                try:
                    sib = node.find_element(By.XPATH, sibxp)
                    val = _parse_price_from_text(sib.text or "")
                    if val is not None:
                        return val
                except NoSuchElementException:
                    pass
        except NoSuchElementException:
            pass

    try:
        body_text = (driver.find_element(By.TAG_NAME, "body").text or "")[:2000]
    except Exception:
        # Best effort; not a bare except, so Ctrl-C still stops the crawl.
        body_text = ""
    m = re.search(r"(\d[\d,]{3,})\s*원", body_text)
    if m:
        return m.group(1).replace(",", "")
    if re.search(r"(무료나눔|무료|나눔)", body_text):
        return "0"
    return ""

# ========= 상세 파서 =========
def parse_detail_with_selenium(driver, url):
    """Load one listing page and scrape its fields into a dict.

    Returns a dict with the keys ``post_id`` (a freshly generated UUID4, not
    derived from the page), ``url``, ``title``, ``description``, ``price``,
    ``uploaded_date``, ``is_completed``, ``condition``, ``image_urls`` and
    ``location``.

    Raises the last ``TimeoutException`` / ``WebDriverException`` when the
    page cannot be loaded within ``RETRY`` attempts.

    NOTE(review): nothing here checks for a block page; the notebook output
    further down shows a CSV row whose title is "403 Forbidden".
    """
    # Load the page; retry with a growing pause and re-raise on the last attempt.
    for attempt in range(RETRY):
        try:
            driver.get(url)
            WebDriverWait(driver, PAGE_TIMEOUT).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            break
        except (TimeoutException, WebDriverException):
            if attempt == RETRY - 1: raise
            time.sleep(1 + attempt)

    # Short random pause, then scroll the page before reading anything.
    time.sleep(random.uniform(0.6, 1.2))
    gentle_scroll(driver, steps=6)

    title = extract_title(driver)

    # Expand collapsed text first so the description extractor sees all of it.
    expand_description(driver)
    description = extract_full_description(driver)

    price = extract_price(driver)

    # Upload time: <time> element first, then the published-time meta tag,
    # then any node mentioning a registration/post/upload label.
    uploaded_at = ""
    try:
        t = driver.find_element(By.TAG_NAME, "time")
        uploaded_at = t.get_attribute("datetime") or clean_text(t.get_attribute("title") or t.text)
    except NoSuchElementException: pass
    uploaded_at = uploaded_at or get_meta(driver, "article:published_time") or \
                  find_by_xpath_text(driver, ["//*[contains(normalize-space(.), '등록일') or contains(normalize-space(.), '게시일') or contains(normalize-space(.), '업로드')]"])

    # Completion status is guessed from keywords anywhere in the page text.
    full_text = clean_text(driver.find_element(By.TAG_NAME, "body").text)
    is_deal_done = guess_deal_done(title, description, full_text)

    used_condition = find_by_xpath_text(driver, ["//*[contains(normalize-space(.), '사용감') or contains(normalize-space(.), '상태') or contains(normalize-space(.), '컨디션')]/following::*[1]"]) or \
                     find_first_text(driver, ["[aria-label*='상태']","[aria-label*='컨디션']"])

    # Scroll once more before collecting the gallery image URLs.
    gentle_scroll(driver, steps=4)
    image_urls = collect_gallery_images_from_dom(driver, max_images=10)

    location = extract_location(driver)

    # The id is random per call, so crawling the same URL twice yields two ids.
    post_id = str(uuid.uuid4())

    return {
        "post_id": post_id,
        "url": url,
        "title": clean_text(title),
        "description": clean_text(description),
        "price": price,
        "uploaded_date": normalize_uploaded_at(clean_text(uploaded_at)),
        "is_completed": is_deal_done,
        "condition": clean_text(used_condition),
        "image_urls": image_urls,
        "location": clean_text(location),
    }

# ========= webp 변환 저장 =========
def _save_with_pillow(img_bytes: bytes, base_path_noext: str):
    """Decode ``img_bytes`` with Pillow and save it next to ``base_path_noext``.

    Images with an alpha channel or a ``transparency`` entry are written as
    PNG; everything else as JPEG (quality 95). Returns the saved file's base name.
    """
    image = Image.open(BytesIO(img_bytes))

    if image.mode in ("RGBA", "LA") or "transparency" in image.info:
        target = base_path_noext + ".png"
        image.save(target, format="PNG", optimize=True)
        return os.path.basename(target)

    target = base_path_noext + ".jpg"
    if image.mode in ("RGBA", "LA", "P"):
        image = image.convert("RGB")
    image.save(target, format="JPEG", quality=95, optimize=True)
    return os.path.basename(target)

def _save_with_imageio(img_bytes: bytes, base_path_noext: str):
    """Decode ``img_bytes`` with imageio, then save through Pillow next to ``base_path_noext``.

    Images with an alpha channel or a ``transparency`` entry are written as
    PNG; everything else as JPEG (quality 95). Returns the saved file's base name.
    """
    pixels = iio.imread(BytesIO(img_bytes))
    image = Image.fromarray(pixels)

    if image.mode in ("RGBA", "LA") or "transparency" in image.info:
        target = base_path_noext + ".png"
        image.save(target, "PNG", optimize=True)
        return os.path.basename(target)

    target = base_path_noext + ".jpg"
    if image.mode in ("RGBA", "LA", "P"):
        image = image.convert("RGB")
    image.save(target, "JPEG", quality=95, optimize=True)
    return os.path.basename(target)

def download_images_to_dir(image_urls, target_dir, prefix, start_index=1):
    """Download ``image_urls`` into ``target_dir`` and return the saved file names.

    Files are named ``{prefix}_{n}{ext}`` with ``n`` counting up from
    ``start_index`` for every image actually saved. WebP images are
    converted (Pillow, then imageio) and stored raw as ``.webp`` if the
    conversion is unavailable or fails. Each URL gets up to ``RETRY``
    attempts; a URL that keeps failing is skipped.

    ``target_dir`` is created even when nothing is downloaded.
    """
    os.makedirs(target_dir, exist_ok=True)
    saved, idx = [], start_index

    # Close the session on exit: this function runs once per post, and an
    # unclosed Session keeps its pooled connections open.
    with requests.Session() as session:
        session.headers.update(REQ_HEADERS)

        for u in image_urls:
            if not u.startswith("http"):
                continue
            for attempt in range(RETRY):
                try:
                    r = session.get(u, timeout=15)
                    if r.status_code in (403, 429, 503):
                        # Throttled or blocked: wait a little longer each time.
                        time.sleep(1.0 + attempt)
                        continue
                    r.raise_for_status()

                    ct = (r.headers.get("Content-Type") or "").lower()
                    ext = infer_ext_from_url(u)
                    base = os.path.join(target_dir, f"{prefix}_{idx}")

                    is_webp = (ext == ".webp") or ("image/webp" in ct)
                    if is_webp:
                        try:
                            if WEBP_SUPPORTED:
                                name = _save_with_pillow(r.content, base)
                            elif IMAGEIO_AVAILABLE:
                                name = _save_with_imageio(r.content, base)
                            else:
                                name = os.path.basename(base + ".webp")
                                with open(base + ".webp", "wb") as f:
                                    f.write(r.content)
                        except Exception:
                            # Conversion failed: keep the original bytes.
                            name = os.path.basename(base + ".webp")
                            with open(base + ".webp", "wb") as f:
                                f.write(r.content)
                        saved.append(name)
                        idx += 1
                        break

                    # Unknown URL extension: fall back to the Content-Type.
                    if ext not in (".jpg", ".jpeg", ".png", ".gif"):
                        if "jpeg" in ct or "jpg" in ct:
                            ext = ".jpg"
                        elif "png" in ct:
                            ext = ".png"
                        elif "gif" in ct:
                            ext = ".gif"
                        else:
                            ext = ".jpg"

                    path = base + ext
                    with open(path, "wb") as f:
                        f.write(r.content)
                    saved.append(os.path.basename(path))
                    idx += 1
                    break

                except requests.RequestException:
                    # Give up silently after the last attempt; otherwise back off.
                    if attempt < RETRY - 1:
                        time.sleep(1.0 + attempt)
            time.sleep(random.uniform(0.15, 0.35))
    return saved

# ========= 메인 =========
def main():
    """Crawl every link in LINKS_JSON and append one CSV row per post.

    For each link the detail page is parsed, its images are downloaded into
    ``BASE_RAW_DIR/<post_id>/`` and a row is appended to ``CSV_PATH``.
    A failure on one link is printed and the crawl moves on to the next.
    The driver is always quit at the end.
    """
    print(f"[info] Pillow WEBP supported: {WEBP_SUPPORTED} | imageio available: {IMAGEIO_AVAILABLE}")
    links = read_links(LINKS_JSON)

    fields = ["id","title","detail","condition","uploaded_date","is_completed","price","location"]
    # Write the header when the CSV is missing or exists but is still empty
    # (e.g. a previous run stopped before its first row).
    is_new = not os.path.exists(CSV_PATH) or os.path.getsize(CSV_PATH) == 0

    driver = make_driver(HEADLESS)
    try:
        with open(CSV_PATH, "a", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields)
            if is_new:
                writer.writeheader()

            for i, url in enumerate(links, 1):
                print(f"[{i}/{len(links)}] {url}")
                try:
                    detail = parse_detail_with_selenium(driver, url)
                    post_id = detail["post_id"]
                    post_dir = os.path.join(BASE_RAW_DIR, post_id)
                    prefix = f"daangn_{post_id}"

                    download_images_to_dir(detail["image_urls"], post_dir, prefix, start_index=1)

                    writer.writerow({
                        "id": post_id,
                        "title": detail["title"],
                        "detail": detail["description"],
                        "condition": detail["condition"],
                        "uploaded_date": detail["uploaded_date"],
                        "is_completed": detail["is_completed"],
                        "price": detail["price"],
                        "location": detail["location"],
                    })
                    # Flush per row so a killed process loses at most the
                    # row in flight during a crawl of thousands of pages.
                    f.flush()
                except Exception as e:
                    print("  ! 실패:", e)
                time.sleep(random.uniform(*SLEEP_RANGE))
    finally:
        driver.quit()

if __name__ == "__main__":
    main()


[info] Pillow WEBP supported: True | imageio available: False
[1/1991] https://www.daangn.com/kr/buy-sell/%EC%8A%A4%ED%86%A0%EC%BC%80-%EC%8A%A4%EC%BF%A0%ED%8A%B8%EC%A0%84%EC%9A%A9-%EC%9C%A0%EB%AA%A8%EC%B0%A8-%EB%B0%A9%ED%92%8D%EC%BB%A4%EB%B2%84-wizqdo9dvrun/?in=%ED%99%94%EB%B4%89%EB%8F%99-6458
[2/1991] https://www.daangn.com/kr/buy-sell/%EC%8A%A4%ED%86%A0%EC%BC%80-%EC%9D%B5%EC%8A%A4%ED%94%8C%EB%A1%9C%EB%A6%AC-v6-%EB%B8%94%EB%9E%99%EB%A9%9C%EB%9E%80%EC%A7%80-%EB%B8%8C%EB%9D%BC%EC%9A%B4%ED%95%B8%EB%93%A4-4z2ao1bt7sup/?in=%EC%95%94%EC%82%AC%EC%A0%9C2%EB%8F%99-440
[3/1991] https://www.daangn.com/kr/buy-sell/%EC%8A%A4%ED%86%A0%EC%BC%80-%EC%9C%A0%EB%AA%A8%EC%B0%A8-%EB%B8%94%EB%9E%99-xv84y7r4bhrx/?in=%EC%9D%B8%EB%8F%99-5776
[4/1991] https://www.daangn.com/kr/buy-sell/%EC%9C%A0%EB%AA%A8%EC%B0%A8-%EC%A0%95%ED%92%88-%EC%8A%A4%ED%86%A0%EC%BC%80-%ED%92%8B%EB%A8%B8%ED%94%84-%EC%99%84%EC%A0%84%EC%83%88%EA%B2%83-aunjd5opb933/?in=%EC%8B%A0%EC%9B%90%EB%8F%99-4402
[5/1991] https://www.daangn.com/kr/buy-

In [49]:
## Inspect the collected data

import pandas as pd

df = pd.read_csv('C:/walker/code/SecondHanded-Strollers-PredictedPrice/src/crawling/sample_data/raw/daangn.csv', encoding = 'utf-8')
df.head()

# Look up a single row by its UUID.
df[df['id'] == 'fc88cc42-d83c-4a4e-893d-8d831a8f7c0d']

Unnamed: 0,id,title,detail,condition,uploaded_date,is_completed,price,location
4410,fc88cc42-d83c-4a4e-893d-8d831a8f7c0d,403 Forbidden,,,2025-08-21 05:22:53.408249,False,,


# 403으로 막힌 경우 재수집 process
## 현구조로는 1회만 가능(상세링크 수집 로직을 수정해야 함)

In [None]:
# Build the list of UUIDs of the empty folders, then look each UUID up in daangn.csv to build the list of row indices

from pathlib import Path
import pandas as pd
import json

ROOT_DIR = r"C:\walker\code\SecondHanded-Strollers-PredictedPrice\src\crawling\sample_data\raw"
LINKS_PATH = r"C:\walker\code\SecondHanded-Strollers-PredictedPrice\src\crawling\daangn\detail_links.json"
CSV_PATH = r"C:\walker\code\SecondHanded-Strollers-PredictedPrice\src\crawling\sample_data\raw\daangn.csv"   # csv path (original note: the path shown in the second image)
ID_COL = "id" # name of the CSV column that holds the uuid

def find_empty_uuid_dirs(root: str) -> list[str]:
    """Return the names of the direct sub-folders of ``root`` that contain no file at any depth.

    A folder holding only empty sub-folders counts as empty. Plain files
    directly under ``root`` are ignored. The folder name is the post's uuid.
    """
    return [
        child.name
        for child in Path(root).iterdir()
        if child.is_dir() and not any(entry.is_file() for entry in child.rglob("*"))
    ]

def read_csv_safely(path: str) -> pd.DataFrame:
    """Read a CSV, trying utf-8, then utf-8-sig, then cp949.

    If every attempt fails, a final read with pandas' default encoding is
    made so that the underlying error is raised to the caller.
    """
    for encoding in ("utf-8", "utf-8-sig", "cp949"):
        try:
            frame = pd.read_csv(path, encoding=encoding)
        except Exception:
            continue
        else:
            return frame
    # Still failing: let this call raise.
    return pd.read_csv(path)

def indices_for_uuids(df: pd.DataFrame, uuids: list[str], id_col: str = "id") -> list[int]:
    """Map each uuid, in the given order, to the index of its first row in ``df``.

    A uuid that does not occur in ``id_col`` maps to ``-1``. When an id is
    duplicated, its first row is used.
    """
    # id -> index label of the first row carrying that id
    first_row_of = df.reset_index().groupby(id_col)["index"].first()
    positions = []
    for wanted in uuids:
        positions.append(int(first_row_of.get(wanted, -1)))
    return positions

# === Run ===
empty_uuid_list = find_empty_uuid_dirs(ROOT_DIR)
print(f"빈 폴더 수: {len(empty_uuid_list)}")
# print(empty_uuid_list[:10])  # uncomment to inspect a sample

df = read_csv_safely(CSV_PATH)
# One entry per empty folder; -1 marks a uuid that has no row in the CSV.
index_list = indices_for_uuids(df, empty_uuid_list, ID_COL)

print(f"매칭된 인덱스 리스트 길이: {len(index_list)}")

빈 폴더 수: 62
매칭된 인덱스 리스트 길이: 62


In [None]:
# Build the list of links whose image folder is empty by looking up each
# entry of index_list in the detail-links file (a JSON list).

import json

empty_image_link_list = []

# The links file is written as UTF-8, so read it as UTF-8 instead of
# relying on the platform default encoding.
with open('detail_links.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

for i in index_list:
    # -1 means "uuid not found in the CSV". Used as a list index it would
    # silently pick the LAST link, so skip it (and any out-of-range index).
    if 0 <= i < len(data):
        empty_image_link_list.append(data[i])
    else:
        print(f"[WARN] 인덱스 {i} 는 링크 목록 범위를 벗어나 건너뜁니다.")

In [None]:
# Save the links as a JSON file
with open('empty_links.json', 'w') as f:
    json.dump(empty_image_link_list, f, indent=4)

In [62]:
# 빈폴더와 해당하는 uuid에 id에 해당하는 csv에서 row 삭제
# -*- coding: utf-8 -*-
# pip install pandas
from pathlib import Path
import shutil
import pandas as pd

# === Path settings ===
ROOT_DIR = r"C:\walker\code\SecondHanded-Strollers-PredictedPrice\src\crawling\sample_data\raw"
CSV_PATH = r"C:\walker\code\SecondHanded-Strollers-PredictedPrice\src\crawling\sample_data\raw\daangn.csv"
ID_COL = "id"  # name of the CSV column that holds the uuid

def read_csv_safely(path: str) -> pd.DataFrame:
    """Read a CSV with the first working encoding out of utf-8, utf-8-sig and cp949.

    When none of them works, one more read with pandas' default encoding is
    made, which raises the underlying error.
    """
    remaining = ["utf-8", "utf-8-sig", "cp949"]
    while remaining:
        encoding = remaining.pop(0)
        try:
            return pd.read_csv(path, encoding=encoding)
        except Exception:
            continue
    return pd.read_csv(path)

def list_empty_dirs(root: str) -> list[Path]:
    """Return the direct sub-folders of ``root`` (as ``Path`` objects) that hold no file at any depth."""
    def has_no_files(folder: Path) -> bool:
        for entry in folder.rglob("*"):
            if entry.is_file():
                return False
        return True

    return [child for child in Path(root).iterdir() if child.is_dir() and has_no_files(child)]

# === 1) Collect the empty folders ===
empty_dirs = list_empty_dirs(ROOT_DIR)         # list of Path objects
empty_uuids = [d.name for d in empty_dirs]     # folder names (UUIDs)

# === 2) Load the CSV and prepare the matching ===
df = read_csv_safely(CSV_PATH)

# id -> index of its first row (for a duplicated id only the first row is removed)
id_first_idx = df.reset_index().groupby(ID_COL)["index"].first()

# Keep only the folders whose UUID exists in the CSV, giving a 1:1 match
matched_dirs = [d for d in empty_dirs if d.name in id_first_idx.index]
matched_indices = [int(id_first_idx[d.name]) for d in matched_dirs]

# Statistics (informational only)
skipped_no_row = len(empty_dirs) - len(matched_dirs)
# NOTE: despite its name this is a bool - True when any id occurs more than once.
dup_id_count = (df[ID_COL].duplicated(keep=False).sum() > 0)

# === 3) Delete rows (matched ones only, 1 UUID -> 1 row) ===
# NOTE(review): the CSV is overwritten here, before the folders are removed,
# so a failed rmtree below leaves the CSV already modified.
df2 = df.drop(index=matched_indices).reset_index(drop=True)
df2.to_csv(CSV_PATH, index=False, encoding="utf-8-sig")

# === 4) Delete folders (matched ones only) ===
deleted_uuids = []
for d in matched_dirs:
    try:
        shutil.rmtree(d)
        deleted_uuids.append(d.name)
    except Exception as e:
        print(f"[WARN] 폴더 삭제 실패: {d} -> {e}")

# === 5) Consistency check ===
deleted_rows = len(matched_indices)
deleted_folders = len(deleted_uuids)
assert deleted_folders == deleted_rows, f"개수 불일치: 폴더 {deleted_folders}, row {deleted_rows}"

print(f"[완료] 삭제된 폴더: {deleted_folders}개, 삭제된 row: {deleted_rows}개 (1:1 일치)")
print(f"[참고] 빈폴더(전체): {len(empty_dirs)}개, CSV에 id가 없어 스킵한 폴더: {skipped_no_row}개")
if dup_id_count:
    print("[참고] CSV에 중복 id가 있어도 각 UUID당 첫 번째 행만 삭제했습니다.")


[완료] 삭제된 폴더: 62개, 삭제된 row: 62개 (1:1 일치)
[참고] 빈폴더(전체): 62개, CSV에 id가 없어 스킵한 폴더: 0개
