#### W컨셉 크롤링 및 SQL

In [3]:
from selenium import webdriver   # 최신버전이여야하기 때문에 webdriver맞춰줌
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By   # By.ID / By.CLASS_NAME, By.NAME, By.XPATH
from webdriver_manager.chrome import ChromeDriverManager  # 드라이버 버전 맞추기
import time, re
import pandas as pd

service = Service(ChromeDriverManager().install())
options = Options()  # 인스턴스 객체 = 프로토타입 객체
# options.add_argument("--headless")  # 브라우저 창 띄우지 않고 (그래픽이 낭비됨)
# options.add_argument("--disable-gpu")
options.add_argument("--disable-dev-shm-usage")  # 메모리가 너무 많을 때 로컬컴퓨터 공간 사용하기
options.add_argument("--start-maximized")  # 브라우저 실행 시 화면 최대화
options.add_argument("--window-size=1920x1080")  # 햐상도 수동 지정
options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0")  # 실제브라우저처럼 가장
options.add_argument("--lang=ko_KR")

driver = webdriver.Chrome(service=service, options=options)  # 웹드라이버 안에 가상의 크롬

category_urls = {
    "여성의류" : "https://display.wconcept.co.kr/rn/best?displayCategoryType=10101&displaySubCategoryType=ALL&gnbType=Y",
    "가방" : "https://display.wconcept.co.kr/rn/best?displayCategoryType=10102&displaySubCategoryType=ALL&gnbType=Y",
    "신발" : "https://display.wconcept.co.kr/rn/best?displayCategoryType=10103&displaySubCategoryType=ALL&gnbType=Y",
    "액세서리" : "https://display.wconcept.co.kr/rn/best?displayCategoryType=10104&displaySubCategoryType=ALL&gnbType=Y"
}

all_data = []

def extract_product_id_from_imgsrc(src: str) -> str | None :  # src: srt (하나의 인자를 받는 이름 : src, 문자열 str타입) / srt | None (문자열 혹은 None을 반환)
    if not src :  # src가 빈 문자열이면
        return None  # 함수를 진행하지 않고 
    m = re.search(r"/(\d+)(?:[_\.][^/?]+)?(?:\?.*)?$", src)  # m (matck)
    # /(\d+) : / => url경로 중 슬래시로 시작하는, (\d+) => 하나이상의 숫자
    # (?:[_\.][^\?]+)? : (?:) => 찾아오지 않을 그룹, [_\.] => _또는. 으로 시작, [^/?]+ => /나 ?를 제외한 문자들, ? => 방금 정규표현식한 것들이 없어도됨
    # (?:\?.*)? : (?:\?.*) => 물음표로 시작해서 그 뒤에 어떤 문자열이든 올 수 있음, ? => 이 문자열이 있어도 되고 없어도 됨
    # $ => 문자열 끝
    return m.group(1) if m else None  # 첫번째그룹(\d+)의 값을 가져옴(상품ID), m이 없다면 None반환

def build_product_url(pid: str | None) -> str :  # build_product_url (함수이름), pid: str | None(pid: 상품id, srt일수도있고 값이 없을 수도 있고), ->str (함수는 항상 문자열을 반환)
    return f"https://www.wconcept.co.kr/Product/{pid}" if pid else ""  #pid가 있으면 f~를 반환, 없으면 빈문자열

for cat_name,url in category_urls.items() :  # 키값과 밸류값을 각각 가져와야함)
    driver.get(url)  
    time.sleep(3) # 한페이지당 3초

    for _ in range(10) : # 스크롤10번 반복하기 때문에 값을 찾아올 필요 없어서 _
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")  # 자바스크립트 문법 씀
        time.sleep(2)

    products = driver.find_elements(By.CSS_SELECTOR, "div.product-item.item.type-all")[0:100]

    for product in products :
        try :
            name = product.find_element(By.CSS_SELECTOR, "span.prdc-title span.text.detail").text.strip()
            try : 
                img = product.find_element(By.CSS_SELECTOR, "button.area-img img")
                src = img.get_attribute("src")
                pid = extract_product_id_from_imgsrc(src)
                url = build_product_url(pid)  # 마지막 최종 url
            except:
                url = ""
            try : 
                review_info = product.find_element(By.CSS_SELECTOR, "span.area-info span.text.cnt").text
                review_count = int(re.sub(r"[^\d]", "", review_info)) if review_info else 0  # 리뷰가 있으면 줄거고 없으면 0
            except : 
                review_count = 0
            try :
                rating_text = product.find_element(By.CSS_SELECTOR, "span.area-info em.score").text
                review_avg = float(re.findall(r"[0-9.]+", rating_text)[0])
            except :
                review_avg = 0.0

            all_data.append({
                "카테고리" : cat_name,
                "상품명" : name,
                "url" : url,
                "리뷰수" : review_count,
                "평균평점" : review_avg
            })
            
        except Exception as e :
            print("에러발생 :", e)
            continue

driver.quit()

df = pd.DataFrame(all_data)

df_top100 = df.sort_values(by="리뷰수", ascending=False).head(100).reset_index(drop=True)  # 특정값을 정렬, GROUPBY 개념, reset_index(drop=True) => 기존 index지움

category_summary = df.groupby("카테고리").agg({
    "리뷰수" : "sum",  # 총 리뷰수 더함
    "평균평점" : "mean"  # 평균 평점 구함
}).reset_index()  # agg => 집계, 계산

print(f"""
상위 100개 상품 : 
{df_top100.head(5)}

카테고리별 통계 :
{category_summary}
""")


상위 100개 상품 : 
   카테고리                                    상품명  \
0    신발                OORIGINAL BLACK - 조리 블랙   
1    가방  [14%쿠폰] [단독] Panier Petit Bag_6colors   
2  액세서리   [단독] Round Logo Ball Cap (10 Colors)   
3  액세서리           [단독]BT8032 티디 스퀘어 버클 벨트 18mm   
4  여성의류                       실켓 코튼 티셔츠_8color   

                                            url   리뷰수  평균평점  
0  https://www.wconcept.co.kr/Product/301721294  8478   5.0  
1  https://www.wconcept.co.kr/Product/301756891  3730   5.0  
2  https://www.wconcept.co.kr/Product/301628486  3682   4.9  
3  https://www.wconcept.co.kr/Product/304460283  3131   4.9  
4  https://www.wconcept.co.kr/Product/301218440  2968   4.9  

카테고리별 통계 :
   카테고리    리뷰수   평균평점
0    가방  24218  4.267
1    신발  27342  4.281
2  액세서리  30519  4.016
3  여성의류  31521  4.435



In [5]:
from openpyxl import Workbook

wb = Workbook()

ws_top100 = wb.active
ws_top100.title = "상위100상품"

ws_top100.append(["카테고리", "상품명", "URL", "리뷰수", "평균평점"])

for idx, row in df_top100.iterrows() :
    ws_top100.append([row["카테고리"], row["상품명"], row["url"], row["리뷰수"], row["평균평점"]])

ws_category = wb.create_sheet(title="카테고리별 통계")

ws_category.append(["카테고리", "총 리뷰수", "평균평점"])

for idx, row in category_summary.iterrows() :
    ws_category.append([
        row["카테고리"],  row["리뷰수"],  round(row["평균평점"], 2)
    ])

filename = "wconcept_result.xlsx"
wb.save(filename)
print(f"✅1개의 엑셀파일에 시트 2개 저장 완료 : {filename}")

✅1개의 엑셀파일에 시트 2개 저장 완료 : wconcept_result.xlsx


In [6]:
import pandas as pd
import pymysql
from sqlalchemy import create_engine, text

DB_NAME = "wconcept_db_v1"
TABLE_TOP100 = "top100_products"
TABLE_CATEGORY = "category_summary"
USER = "root"
PASSWORD = "753951DnDn!!"
HOST = "127.0.0.1"
PORT = 3306

xlsx_path = "wconcept_result.xlsx"
SHEET_TOP100 = "상위100상품"
SHEET_CATEGORY = "카테고리별 통계"

df_top100 = pd.read_excel(xlsx_path, sheet_name=SHEET_TOP100)
df_category = pd.read_excel(xlsx_path, sheet_name=SHEET_CATEGORY)

top100_cols = {
    "카테고리" : "category",
    "상품명" : "product_name",
    "URL" : "url",
    "리뷰수" : "review_count",
    "평균평점" : "avg_rating"
}

category_cols = {
    "카테고리" : "category",
    "총 리뷰수" : "total_review_count",
    "평균평점" : "avg_rating"
}

df_top100 = df_top100.rename(columns=top100_cols)[list(top100_cols.values())]  # 재할당
df_category = df_category.rename(columns=category_cols)[list(category_cols.values())]

df_top100 = df_top100.dropna(subset=["category", "product_name", "review_count", "avg_rating"])   # 값이 존재 시 버림, url은 100% 있음 (위 try문)
df_category = df_category.dropna(subset=["category", "total_review_count", "avg_rating"])

df_top100["review_count"] = df_top100["review_count"].astype(int)  # 문자열을 숫자로 다시 안전장치
df_top100["avg_rating"] = df_top100["avg_rating"].astype(float).round(2)
df_category["total_review_count"] = df_category["total_review_count"].astype(int)
df_category["avg_rating"] = df_category["avg_rating"].astype(float).round(2)


conn_admin = pymysql.connect(host=HOST, user=USER, password=PASSWORD, port=PORT, autocommit=True)

with conn_admin.cursor() as cur :
    cur.execute(f"CREATE DATABASE IF NOT EXISTS `{DB_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")  # 특수문자 같은 경우 에러 안뜨도록
conn_admin.close()    

engine = create_engine(
    f"mysql+pymysql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB_NAME}?charset=utf8mb4"  # 다시 접속
)

ddl_top100 = f"""
CREATE TABLE IF NOT EXISTS `{TABLE_TOP100}` (
    `id` BIGINT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
    `category` VARCHAR(50) NOT NULL,
    `product_name` VARCHAR(255) NOT NULL,
    `url` TEXT NULL,
    `review_count` INT NOT NULL,
    `avg_rating` DECIMAL(3,2) UNSIGNED NOT NULL,
    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""

ddl_category = f"""
CREATE TABLE IF NOT EXISTS `{TABLE_CATEGORY}` (
    `id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
    `category` VARCHAR(50) NOT NULL,
    `total_review_count` INT NOT NULL,
    `avg_rating` DECIMAL(3,2) UNSIGNED NOT NULL,
    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""

with engine.begin() as conn :  
    conn.execute(text(ddl_top100))  
    conn.execute(text(ddl_category))

with engine.begin() as conn :
    df_top100[["category", "product_name", "url", "review_count", "avg_rating"]].to_sql(
        TABLE_TOP100, con=conn, if_exists="append", index=False
    )
    df_category[["category", "total_review_count", "avg_rating"]].to_sql(
        TABLE_CATEGORY, con=conn, if_exists="append", index=False
    )

def ensure_index(conn, schema, table, index_name, colum_list) :  # 값이 제대로 생성 안될경우 검증용 함수
    q = text(""" 
        SELECT 1 
        FROM information_schema.statistics
        WHERE table_schema=:schema AND
        table_name=:table AND
        index_name=:index_name
    """)
    # 1이 정상적으로 들어온다면 
    exists = conn.execute(q, {"schema" : schema, "table" : table, "index_name" : index_name}).fetchone()
    

with engine.begin() as conn :  # conn이란 이름으로 바꿈
    ensure_index(conn, DB_NAME, TABLE_TOP100, f"idx_{TABLE_TOP100}_category", ["category"])
    ensure_index(conn, DB_NAME, TABLE_TOP100, f"idx_{TABLE_TOP100}_review_count", ["review_count"])
    ensure_index(conn, DB_NAME, TABLE_TOP100, f"idx_{TABLE_TOP100}_avg_rating", ["avg_rating"])
    ensure_index(conn, DB_NAME, TABLE_CATEGORY, f"idx_{TABLE_CATEGORY}_total_review", ["total_review_count"])
    ensure_index(conn, DB_NAME, TABLE_CATEGORY, f"idx_{TABLE_CATEGORY}_avg_rating", ["avg_rating"])

print("✅MySQL 데이터 생성완료")

✅MySQL 데이터 생성완료
