In [None]:
#20250516 FRI, jeri
"""
250516
1. 코드 함수화
2. 코드 간결화
3. 여러 개의 영상 처리 코드 추가
4. 여러 개의 제품 처리 코드 추가
"""
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from typing import List, Tuple, Union
from bs4 import BeautifulSoup
from datetime import datetime
from tqdm import tqdm
from pprint import pprint
import pandas as pd
import json
import time
import os
import re


# 옵션 설정
options = webdriver.ChromeOptions()
# options.add_argument('--headless')
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--start-maximized') # 유튜브 더보기 클릭 및 정보 가져오기
# 크롬 드라이버 실행(자동 버전 관리)
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

# 오늘 날짜 문자열 (YYYYMMDD)
today_str = datetime.today().strftime('%y%m%d')
today_str_four = datetime.today().strftime('%Y%m%d')

# 250515 여러 영상을 처리할 경우를 대비해서 video_ids를 리스트로 바꿈
video_ids = ['w7uwgwj6aZM', '0V92duTzsVw']
df_list = []

# 250516 추가
"""
==================
모든 영상 순회하는 코드
==================
"""
for vid in tqdm(video_ids, desc="영상 처리 중"):
    try:
        df = collect_video_data(driver, vid)
        df_list.append(df)
    except Exception as e:
        print(f"오류 발생 - {vid}: {e}")


"""
=================
영상 하나 처리하는 함수
=================
"""
# 250516 추가
def collect_video_data(driver, video_id):
    video_url = f'https://www.youtube.com/watch?v={video_id}'
    driver.get(video_url)
    # 페이지 로딩 대기
    time.sleep(5)

    # "더보기" 버튼 클릭
    # 250515 추가
    try:
        # '더보기' 버튼이 포함된 설명 섹션이 로드될 때까지 최대 20초 대기
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.ID, "description-inline-expander"))
        )
        # '더보기' 버튼이 클릭 가능 상태가 될 때까지 최대 10초 대기
        expand_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "tp-yt-paper-button#expand"))
        )
        # JavaScript를 사용하여 '더보기' 버튼 클릭 (Selenium 기본 클릭으로 안 되는 경우 대비)
        driver.execute_script("arguments[0].click();", expand_button)
        # 클릭 후 콘텐츠가 로드될 시간을 조금 기다림
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "description-inline-expander")))
        
    except Exception as e:
        print("더보기 버튼 클릭 실패:", e)

    """
    =====================
    더보기란에 있는 텍스트 추출
    =====================
    """
    try:
        description = driver.find_element(By.ID, 'description-inline-expander').text
        # 설명이 비어있을 경우, 기본 메시지로 대체
        if not description.strip():
            description = "더보기란에 설명 없음"
    except Exception as e:
        # 설명란 찾기 실패 또는 텍스트 추출 중 에러 발생 시
        print("더보기 클릭 또는 설명 추출 실패:", e)
        description = "더보기란에 설명 없음"

    soup = BeautifulSoup(driver.page_source, 'html.parser') # 조회수, 업로드일, 제품 수 추출
    spans = soup.select('span.style-scope.yt-formatted-string.bold')
    info_texts = [span.get_text(strip=True) for span in spans if span.get_text(strip=True)] # 공백 제외하고 실제 텍스트만 추출

    youtube_upload_date, youtube_view_count, youtube_product_count = extract_video_info(info_texts)
    product_info_list = get_product_info(driver)

    """
    ===============================
    채널명, 영상 제목, 구독자 수의 XPath
    ===============================
    """
    # 250514 추가
    base_info_results = {} # 영상에서 들고온 정보들을 담을 딕셔너리

    # XPath
    base_info = {
        "channel": '//*[@id="text"]/a',
        "title": '//*[@id="title"]/h1',
        "subscriber": '//*[@id="owner-sub-count"]',
    }

    for key, xpath in base_info.items():
        try:
            element = WebDriverWait(driver, 60).until(EC.presence_of_element_located((By.XPATH, xpath))) # 요소가 로딩될 때까지 최대 60초 대기
            text = element.text # 요소에서 텍스트 추출
            base_info_results[key] = (parse_subscriber_count(text) if key == "subscriber" else text) # 구독자 수일 경우 숫자 형식으로 변환
                
        except Exception as e:
            # 오류 발생 시 메시지 출력하고 "Not found" 저장
            print(f"Error fetching {key}: {e}")
            base_info_results[key] = "Not found"

    """
    =======================
    컬럼 항목
    =======================
    """
    youtube_upload_date, youtube_view_count, youtube_product_count = extract_video_info(info_texts)

    #250513 추가
    #250515 조회수, 업로드일, 포함된 제품 개수의 변수명 변경, 제품 관련 정보 컬럼 추가
    base_info_results.update({
        '추출일': today_str_four,
        '영상 업로드일': youtube_upload_date,
        '영상 링크': video_url,
        '조회수': youtube_view_count,
        '포함된 제품 개수': youtube_product_count,
        '더보기란 설명': description,
    })

    """
    =========
    컬럼명 매핑
    =========
    """
    # 250513 추가
    column_name_mapping = {
        "channel": "채널명",
        "title": "영상 제목",
        "subscriber": "구독자 수",
    }

    """
    =========================
    원하는 컬럼 순서를 리스트로 지정
    =========================
    """
    # 250513 추가
    column_order = [
        '추출일', '영상 업로드일', '채널명', '영상 제목', '영상 링크', '구독자 수', '조회수',
        '포함된 제품 개수', '제품 이미지 링크', '제품명', '제품 가격(원)', '제품 구매 링크', '더보기란 설명'
        ]

    # 250516 추가
    # 제품 정보도 함께 병합(제품 이미지, 제품명, 가격, 구매 링크)
    rows = []
    for product_info in product_info_list:
        row_data = base_info_results.copy()
        row_data.update(product_info)
        rows.append(row_data)

    # 엑셀 저장 전, 비어있는 제품 정보를 가진 행 제거
    filtered_product_data = []

    for row in rows:
        product_fields = [
            row.get('제품 이미지 링크'),
            row.get('제품명'),
            row.get('제품 가격(원)'),
            row.get('제품 구매 링크'),
        ]
        if any(product_fields):
            filtered_product_data.append(row)

    # 250513 추가
    df = pd.DataFrame(rows) # DataFrame 생성 및 컬럼명 변경 반영
    df = df.rename(columns=column_name_mapping)
    df = df[column_order] # 원하는 컬럼 순서로 재정렬
    return df

"""
===========================
더보기란에 있는 제품 정보 가져오기
===========================
"""
#250515 추가
#250516 코드 수정(간결화)
# 영상 더보기에 상품 영역이 표시되는 경우
def get_product_info(driver, timeout=10):
    wait = WebDriverWait(driver, timeout)
    product_info_list = []

    try:
        # 제품 정보가 담긴 요소가 페이지에 로드될 때까지 대기
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "ytd-merch-shelf-item-renderer")))
        # 상품 영역(여러 개일 수 있음) 요소들을 모두 가져오기
        product_elements = driver.find_elements(By.CSS_SELECTOR, "ytd-merch-shelf-item-renderer")

        # 최소 1개 이상 제품이 존재할 경우
        for product in product_elements:
            try:
                product_img_link = product.find_element(By.CSS_SELECTOR, "#img").get_attribute("src")
                product_name = product.find_element(By.CSS_SELECTOR, ".product-item-title").text.strip()
                product_price = product.find_element(By.CSS_SELECTOR, ".product-item-price").text.replace("₩", "").strip()
                link_raw = product.find_element(By.CSS_SELECTOR, ".product-item-description").text.strip()
                product_link = link_raw if not link_raw.startswith("http") else link_raw

                if any([product_img_link, product_name, product_price, product_link]):
                    product_info_list.append({
                        '제품 이미지 링크': product_img_link,
                        '제품명': product_name,
                        '제품 가격(원)': product_price,
                        '제품 구매 링크': product_link,
                    })
                
            except Exception as inner_e:
                print("🔸 일부 제품 정보 추출 실패:", inner_e)

    except Exception as e:
        print("제품 정보 추출 실패:", e)

    # 유효한 제품 정보가 하나도 없으면 "없음" 표시로
    if not product_info_list:
        product_info_list.append({
            '제품 이미지 링크': "없음",
            '제품명': "없음",
            '제품 가격(원)': "없음",
            '제품 구매 링크': "없음"
        })

    return product_info_list
"""
========================
업로드일, 조회수, 제품 수 추출
========================
"""
#250515 추가
#250516 코드 수정(간결화)
def extract_video_info(info_texts: List[str]) -> Tuple[str, Union[str, None], Union[int, None]]:
    # 기본값 설정
    youtube_upload_date = "업로드 날짜 정보 못 찾음"
    youtube_view_count = None
    youtube_product_count = None

    # 리스트 내 문자열을 하나씩 검사
    for text in info_texts:
        # 형식: 2025. 5. 12.
        if m :=re.search(r'(\d{4})\.\s*(\d{1,2})\.\s*(\d{1,2})\.', text):
            year, month, day = m.groups()
            youtube_upload_date = f"{year}{int(month):02d}{int(day):02d}"
        # 형식: 2025년 5월 12일
        elif m := re.search(r'(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일', text):
            year, month, day = m.groups()
            youtube_upload_date = f"{year}{int(month):02d}{int(day):02d}"


        # 조회수 추출: 쉼표가 포함된 숫자 (예: 472,329)
        match_views = re.search(r'조회수\s*([\d,]+)회', text)
        if match_views:
            youtube_view_count = match_views.group(1) if match_views else "조회수 정보 없음"

        # 제품 수 추출: 예) "1개 제품"
        match_products = re.search(r'(\d+)\s*개\s*제품', text)
        if match_products:
            youtube_product_count = int(match_products.group(1)) if match_products else "포함된 제품 없음"

    # 최종 결과 반환
    return youtube_upload_date, youtube_view_count, youtube_product_count

    # 함수 사용 예
    youtube_upload_date, youtube_view_count, youtube_product_count = extract_video_info(info_texts)

"""
=======================
구독자 수 표기 방식 변환 함수
=======================
"""
#250514 추가
def parse_subscriber_count(text):
    """
    '구독자 281만명' → '2,810,000'
    '구독자 3.45천명' → '3,450'
    '구독자 102명' → '102'
    """
    match = re.search(r'([\d\.]+)([천만]?)명', text) # 숫자+단위 추출
    if not match:
        return "0" # 매칭 안 되면 0 반환

    number = float(match.group(1)) # 숫자 부분 (정수/소수점 포함)
    unit = match.group(2) # 단위: 천/만

    # 단위에 따라 숫자 변환 
    if unit == '천':
        number *= 1_000
    elif unit == '만':
        number *= 10_000

    return f"{int(number):,}"  # 쉼표 넣은 문자열 반환

# 디버깅
product_infos = get_product_info(driver)
pprint({"📦 최종 제품 정보": product_infos})

"""
=================
파일명 중복 방지 저장
=================
"""
# 250513 추가
base_filename = f'침착맨_유튜브_{today_str}'
ext = '.xlsx'
filename = f'{base_filename}{ext}'
counter = 1
while os.path.exists(filename):
    filename = f'{base_filename}_{counter}{ext}'
    counter += 1

# 250516 추가
if df_list:
    df = pd.concat(df_list, ignore_index=True)
    df.to_excel(filename, index=False)
    print(f"\n✅ 엑셀 파일로 저장 완료: {filename}")
else:
    print("❌ 저장할 데이터프레임이 없습니다.")

#250515 수정 (오류 처리 삭제)
driver.quit()



영상 처리 중: 100%|██████████| 2/2 [00:17<00:00,  8.60s/it]


{'📦 최종 제품 정보': [{'제품 가격(원)': '18,000',
                 '제품 구매 링크': 'playwin1.cafe24.com/shop3/product/detail.html?product_no=73&item_code=P00000CV000A',
                 '제품 이미지 링크': 'https://encrypted-tbn0.gstatic.com/shopping?q=tbn:ANd9GcTO9Dsqdb00rCxw1ByR7lqM3AyLWj9q0PfA8vAArk8CoqcKUNTCnQhYxdQ-GSV8CPuiw9Ku7YiU',
                 '제품명': '[침착맨] 혁필 금속 키링'}]}

✅ 엑셀 파일로 저장 완료: 침착맨_유튜브_250516.xlsx
