# 패키지 Import

In [18]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup

In [19]:
# 최신 크롬 드라이버 사용하도록 세팅: 현재 OS에 설치된 크롬 브라우저 버전에 맞게 cache에 드라이버 설치
from selenium.webdriver.chrome.service import Service
service = Service(ChromeDriverManager().install())

In [20]:
import time
import random
import pandas as pd

# 무한 스크롤 함수
## 1) 기능
- 콘텐츠 로딩을 충분히 기다리며 스크롤이 불가할 때까지 스크롤을 무한 반복하는 함수

## 2) 역할
- 유튜브 웹 페이지는 스크롤을 해야 새로운 콘텐츠 정보를 제공하기 때문에, 모든 검색 결과를 확인하기 위해서는 무한 스크롤 기능 필요

In [21]:
def scroll():
    try:        
        # 페이지 내 스크롤 높이 받아오기
        last_page_height = driver.execute_script("return document.documentElement.scrollHeight")
        while True:
            # 임의의 페이지 로딩 시간 설정
            # PC환경에 따라 로딩시간 최적화를 통해 scraping 시간 단축 가능
            pause_time = random.uniform(1, 2)
            # 페이지 최하단까지 스크롤
            driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
            # 페이지 로딩 대기 (스크롤 하면서 데이터 로딩되는 데 시간 대기)
            time.sleep(pause_time)
            # 무한 스크롤 동작을 위해 살짝 위로 스크롤(i.e., 페이지를 위로 올렸다가 내리는 제스쳐)
            driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight-50)")
            time.sleep(pause_time)
            # 페이지 내 스크롤 높이 새롭게 받아오기
            new_page_height = driver.execute_script("return document.documentElement.scrollHeight")
            # 스크롤을 완료한 경우(더이상 페이지 높이 변화가 없는 경우)
            if new_page_height == last_page_height:
                print("스크롤 완료")
                break
                
            # 스크롤 완료하지 않은 경우, 최하단까지 스크롤
            else:
                last_page_height = new_page_height
            
    except Exception as e:
        print("에러 발생: ", e)

# 데이터 Scrap

In [22]:
# 검색 키워드 설정: 키워드 내 띄어쓰기는 URL에서 '+'로 표시되기 때문에 이에 맞게 변환
SEARCH_KEYWORD = '연남동 맛집 추천'.replace(' ', '+')

In [23]:
driver = webdriver.Chrome(service=service) # python으로 크롬을 제어
# 스크래핑 할 URL 세팅
URL = "https://www.youtube.com/results?search_query=" + SEARCH_KEYWORD
# 크롬 드라이버를 통해 지정한 URL의 웹 페이지 오픈
driver.get(URL)
# 웹 페이지 로딩 대기
time.sleep(3)
# 무한 스크롤 함수 실행
scroll()

스크롤 완료


In [24]:
# 페이지 소스 추출
html_source = driver.page_source
soup_source = BeautifulSoup(html_source, 'html.parser')

Service process refused to terminate gracefully with SIGTERM, escalating to SIGKILL.
Traceback (most recent call last):
  File "/Users/rok/workspace/capstone2/youtube-shorts-crawler/venv/lib/python3.12/site-packages/selenium/webdriver/common/service.py", line 172, in _terminate_process
    self.process.wait(60)
  File "/opt/homebrew/Cellar/python@3.12/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/subprocess.py", line 1264, in wait
    return self._wait(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/subprocess.py", line 2045, in _wait
    raise TimeoutExpired(self.args, timeout)
subprocess.TimeoutExpired: Command '['/Users/rok/.wdm/drivers/chromedriver/mac64/125.0.6422.141/chromedriver-mac-arm64/chromedriver', '--port=51402']' timed out after 60 seconds
Service process refused to terminate gracefully with SIGTERM, escalating to SIGKILL.
Traceback (most rece

# 데이터 추출

In [26]:
content_total = soup_source.find_all('a', id='video-title')
# 콘텐츠 제목만 추출
content_total_title = list(map(lambda data: data.get_text().replace("\n", ""), content_total))
# 콘텐츠 링크만 추출
content_total_link = list(map(lambda data: "https://youtube.com" + data["href"], content_total))

# 조회수 & 업로드 날짜 추출
content_record_src = soup_source.find_all(class_ = 'style-scope ytd-video-meta-block')
content_view_cnt = [content_record_src[i].get_text().replace('조회수 ', '') for i in range(5, len(content_record_src), 10)]
content_upload_date = [content_record_src[i].get_text() for i in range(6, len(content_record_src), 10)]

# Ensure all lists are the same length
target_length = len(content_total_title)
min_length = min(len(content_total_title), len(content_total_link), len(content_view_cnt), len(content_upload_date))
content_total_title = content_total_title[:target_length]
content_total_link = content_total_link[:target_length]
content_view_cnt = content_view_cnt[:target_length]
content_upload_date = content_upload_date[:target_length]

# 딕셔너리 포맷팅
content_total_dict = {
    'title': content_total_title, 
    'link': content_total_link, 
    'view': content_view_cnt,
    'upload_date': content_upload_date
}

# Create DataFrame
df = pd.DataFrame(content_total_dict)
df.to_csv("../data/youtube_shorts", encoding='utf-8-sig', index=False)


NameError: name 'content_total' is not defined

In [None]:
# 데이터 추출

# 모든 비디오 요소를 찾음
videos = driver.find_elements(By.CSS_SELECTOR, 'ytd-video-renderer, ytd-grid-video-renderer')

# shorts만 필터링
shorts = [video for video in videos if 'shorts' in video.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')]

# shorts 콘텐츠 제목과 링크 추출
shorts_titles = [short.find_element(By.CSS_SELECTOR, '#video-title').text for short in shorts]
shorts_links = [short.find_element(By.CSS_SELECTOR, 'a').get_attribute('href') for short in shorts]

# shorts의 조회수와 업로드 날짜 추출
content_view_cnt = []
content_upload_date = []
content_video_pk = []

for short in shorts:
    meta_block = short.find_element(By.CSS_SELECTOR, 'ytd-video-meta-block')
    view_count = meta_block.find_element(By.XPATH, './/span[contains(text(), "조회수")]').text.replace('조회수 ', '')
    upload_date = meta_block.find_element(By.XPATH, './/span[contains(text(), "전")]').text  # Assumes the date contains "전" (ago in Korean)
    content_view_cnt.append(view_count)
    content_upload_date.append(upload_date)

    video_link = short.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
    video_id = video_link.split('/')[-1]  # link의 '/'를 기준으로 slicing 해서 리스트로 넣고 마지막 원소 가져오기
    content_video_pk.append(video_id)

# 리스트 길이 맞춤
min_length = min(len(shorts_titles), len(shorts_links), len(content_view_cnt), len(content_upload_date))
shorts_titles = shorts_titles[:min_length]
shorts_links = shorts_links[:min_length]
content_view_cnt = content_view_cnt[:min_length]
content_upload_date = content_upload_date[:min_length]

# 딕셔너리 포맷팅
content_total_dict = {
    'title': shorts_titles,
    'link': shorts_links,
    'view': content_view_cnt,
    'upload_date': content_upload_date,
    'video_pk': content_video_pk
}

: 

# 데이터프레임 저장

In [None]:
df = pd.DataFrame(content_total_dict)
df.to_csv("../data/youtube_shorts.csv", encoding='utf-8-sig')

: 

# 데이터 확인

In [None]:
df

: 

In [None]:
import os
from dotenv import load_dotenv
from openai import OpenAI

# API 키를 환경변수에서 가져오기
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

client = OpenAI(api_key=api_key)

def extract_store_name_from_title(title):
    response = client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt=f"Extract the store name from the following video title: {title}",
        max_tokens=50,
        n=1,
        stop=None,
        temperature=0.5,
    )

    store_name = response.choices[0].text.strip()
    return store_name

# gpt가 추측한 식당 이름 리스트
guess_name = []

for title in shorts_titles:
    store_name = extract_store_name_from_title(title)
    guess_name.append(title)
    print(f"Video Title: {title} -> Store Name: {store_name}")





: 