In [30]:
from selenium import webdriver as wd
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import time
import pandas as pd
import requests
import re


def check_comment_count_is_zero(html_source, css_selector):
    """Report whether the comment-count element of a page reads "0 Comments".

    Args:
        html_source: HTML text of the page to inspect.
        css_selector: CSS selector that locates the comment-count element.

    Returns:
        True when the first element matched by ``css_selector`` has the text
        "0 Comments" (surrounding whitespace ignored); False otherwise,
        including when the selector matches nothing.
    """
    soup = BeautifulSoup(html_source, 'lxml')
    matches = soup.select(css_selector)

    if not matches:
        return False

    # Only the first match is consulted. The text is stripped so padding
    # whitespace around the label does not defeat the comparison.
    return matches[0].text.strip() == "0 Comments"

def scroll(driver, height=700):
    """Scroll the browser window to a vertical offset.

    Args:
        driver: Object exposing ``execute_script``; it receives the scroll
            command.
        height: Vertical offset handed to ``window.scrollTo``. Defaults to 700.
    """
    scroll_command = "window.scrollTo(0, {});".format(height)
    driver.execute_script(scroll_command)

def scroll_page(driver, target='essential'):
    """Scroll a page to the bottom so lazily loaded content gets rendered.

    Args:
        driver: Object exposing ``execute_script``; used to read the document
            height and to issue scroll commands.
        target: Scrolling strategy.
            ``'essential'`` scrolls to the bottom three times, pausing one
            second after each scroll.
            ``'comment'`` keeps scrolling to the bottom, pausing two seconds
            after each scroll, until the document height stops changing.
            Any other value performs no scrolling.

    Returns:
        The same ``driver`` object that was passed in.
    """
    read_height_js = "return document.documentElement.scrollHeight"
    jump_to_bottom_js = "window.scrollTo(0, document.documentElement.scrollHeight);"

    previous_height = driver.execute_script(read_height_js)

    if target == 'comment':
        height_settled = False
        while not height_settled:
            driver.execute_script(jump_to_bottom_js)
            time.sleep(2.0)
            current_height = driver.execute_script(read_height_js)
            # Stop once a scroll no longer makes the document taller.
            height_settled = current_height == previous_height
            previous_height = current_height
    elif target == 'essential':
        remaining_passes = 3
        while remaining_passes > 0:
            driver.execute_script(jump_to_bottom_js)
            time.sleep(1.0)
            # The height is read after every pass but does not affect the
            # fixed number of passes.
            previous_height = driver.execute_script(read_height_js)
            remaining_passes -= 1

    return driver


def get_url_title_in_html_source(html_source, css_selector):
    """Extract link titles and absolute YouTube URLs from a page.

    Args:
        html_source: HTML text to parse.
        css_selector: CSS selector matching the link elements of interest.

    Returns:
        A ``(titles, urls)`` pair of equal-length lists. Each title is the
        element's text with newlines removed; each URL is the element's
        ``href`` prefixed with ``https://www.youtube.com``. Matched elements
        that carry no ``href`` attribute are skipped.
    """
    titles, urls = [], []

    soup = BeautifulSoup(html_source, 'lxml')

    for link in soup.select(css_selector):
        href = link.get('href')
        if href is None:
            # No target to build a URL from; concatenating None to the host
            # prefix would raise TypeError, so leave this element out.
            continue

        titles.append(link.text.replace('\n', ''))
        urls.append("https://www.youtube.com" + href)

    return titles, urls

        
def get_channel_video_url_list(channel_url):
    """Collect the titles and URLs of the videos listed on a channel page.

    Opens ``channel_url`` in a Chrome session whose driver binary is obtained
    through ``ChromeDriverManager``, scrolls the page with the default
    ``scroll_page`` strategy, and parses the resulting HTML.

    Args:
        channel_url: URL of the channel page to load.

    Returns:
        The ``(titles, urls)`` pair produced by
        ``get_url_title_in_html_source`` for the video link selector.
    """
    # Pass the driver path through Service, matching the other Chrome
    # constructions in this file.
    driver = wd.Chrome(service=Service(ChromeDriverManager().install()))
    try:
        driver.maximize_window()
        driver.get(channel_url)
        scroll_page(driver=driver)
        html_source = driver.page_source
    finally:
        # Always close the browser, even when loading or scrolling fails.
        driver.quit()

    # Anchor elements (tag ``a``) with the video-link classes. A leading '#'
    # would turn this into an id selector and match nothing.
    url_title_css_selector = "a.yt-simple-endpoint.focus-on-expand.style-scope.ytd-rich-grid-media"

    return get_url_title_in_html_source(
        html_source=html_source,
        css_selector=url_title_css_selector
    )

def crawl_youtube_page_html_sources(urls, driver_dir):
    """Load each video page in its own Chrome session and capture its HTML.

    For every entry the page is opened, the ``#expand`` element is clicked,
    the window is scrolled via ``scroll`` and ``scroll_page``, and the page
    source is recorded.

    Args:
        urls: Iterable of dicts; each must provide the page address under the
            ``'url'`` key.
        driver_dir: Path to the chromedriver executable handed to ``Service``.

    Returns:
        List of page-source strings, one per entry of ``urls``, in order.

    Raises:
        selenium.common.exceptions.TimeoutException: If ``#expand`` does not
            become clickable within 10 seconds on some page.
    """
    html_sources = []

    for url_info in urls:
        driver = wd.Chrome(service=Service(driver_dir))
        try:
            driver.maximize_window()
            driver.get(url_info['url'])

            time.sleep(2.0)
            # Expand detail. Wait until the element can actually be clicked,
            # not merely until it exists in the DOM.
            expand_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, '#expand'))
            )
            expand_button.click()
            scroll(driver)

            scroll_page(driver=driver)

            html_sources.append(driver.page_source)
        finally:
            # Close this page's browser even when the wait/click/scroll fails,
            # so a bad page does not leave a Chrome process behind.
            driver.quit()

    return html_sources

def post_processing_text(text):
    """Strip layout characters from a piece of scraped text.

    Args:
        text: String to clean, or None.

    Returns:
        ``text`` with every newline, every tab, and every occurrence of the
        long space-run literal below removed; an empty string when ``text``
        is None.
    """
    if text is None:
        return ""

    without_newlines = text.replace('\n', '')
    without_tabs = without_newlines.replace('\t', '')
    return without_tabs.replace('                ', '')

def pack_space(text):
    """Collapse whitespace runs in ``text`` to single spaces.

    Args:
        text: String to normalise.

    Returns:
        The whitespace-separated pieces of ``text`` joined by one space each,
        with no leading or trailing whitespace.
    """
    pieces = text.split()
    return " ".join(pieces)

def divide_watch_shorts(titles, urls):
    """Split videos into regular ("watch") and Shorts entries by URL path.

    The video type is taken from the first path segment of each URL (the text
    after the host, up to the next ``/`` or ``?``).

    Args:
        titles: Video titles, aligned with ``urls``.
        urls: Absolute video URLs such as
            ``https://www.youtube.com/watch?v=...`` or
            ``https://www.youtube.com/shorts/...``.

    Returns:
        A ``(watch_url, shorts_url)`` pair of lists. Each item is a dict with
        ``"title"`` and ``"url"`` keys. URLs of any other type, or without a
        path segment after the host, are left out of both lists.
    """
    watch_url, shorts_url = [], []

    for title, url in zip(titles, urls):
        segments = url.split("/")
        if len(segments) < 4:
            # No path after the host, so there is no type to classify.
            continue

        url_type = segments[3].split("?")[0]
        entry = {
            "title": title,
            "url": url
        }

        if url_type == "watch":
            watch_url.append(entry)
        elif url_type == "shorts":
            shorts_url.append(entry)

    return watch_url, shorts_url

def get_title_view_date(url_dict, html_source):
    """Extract title, view count and upload date from a video page's HTML.

    Args:
        url_dict: Dict describing the video; its ``'title'`` and ``'url'``
            values are copied into the result.
        html_source: HTML text of the video page.

    Returns:
        Dict with keys ``"title"``, ``"video_url"`` and ``"comment"``. Despite
        its name, ``"comment"`` holds a list of
        ``{"title", "views", "dates"}`` dicts, one per title/view/date element
        triple found on the page. ``views`` is the text before the first
        space of the view element; ``dates`` is the date element's raw text.
    """
    video_info_result_dict = {
        "title": url_dict['title'],
        "video_url": url_dict['url'],
        "comment": []
    }

    title_selector = "#title > h1 > yt-formatted-string"
    view_selector = "#info > span:nth-child(1)"
    date_selector = "#info > span:nth-child(3)"

    soup = BeautifulSoup(html_source, 'lxml')

    title_list = soup.select(title_selector)
    view_count_list = soup.select(view_selector)
    date_list = soup.select(date_selector)

    # zip stops at the shortest list, so a page missing any of the three
    # elements contributes no entry.
    for title_tag, view_tag, date_tag in zip(title_list, view_count_list, date_list):
        print(f"title: {title_tag.text}, views: {view_tag.text}, date:{date_tag.text}")

        video_info_result_dict['comment'].append({
            "title": title_tag.text,
            'views': view_tag.text.split(' ')[0],
            'dates': date_tag.text,
        })

    return video_info_result_dict

In [31]:
def youtube_crawling(url_num,
                     driver_dir='/Users/jyh/Downloads/chromedriver-mac-arm64/chromedriver',
                     channel_url="https://www.youtube.com/@snubh/videos"):
    """Crawl a channel's regular videos and save title/views/date to CSV.

    Steps: load the channel's video listing and scroll until it stops
    growing, collect the video links, keep the first ``url_num`` regular
    ("watch") videos, fetch each video page, extract its details, and write
    them to a CSV file in the current working directory.

    Args:
        url_num: Maximum number of regular videos to crawl. Values below 10
            write to ``SNUBH_Youtube_crawling_test.csv``; otherwise the output
            goes to ``SNUBH_Youtube_crawling.csv``.
        driver_dir: Path to the chromedriver executable.
        channel_url: URL of the channel's video listing page.

    Returns:
        None. The result is written to disk; when no video details could be
        extracted a message is printed and no file is written.
    """
    css_selector = "a.yt-simple-endpoint.focus-on-expand.style-scope.ytd-rich-grid-media"

    driver = wd.Chrome(service=Service(driver_dir))
    try:
        driver.get(channel_url)
        scroll_page(driver=driver, target='comment')
        listing_html = driver.page_source
    finally:
        # The listing browser is not needed once its HTML is captured; close
        # it so it does not stay open while the video pages are crawled.
        driver.quit()

    # Crawling titles and urls of videos
    titles, urls = get_url_title_in_html_source(css_selector=css_selector, html_source=listing_html)
    print("Crawling the title, url of all videos completed!")
    print(f"No. of videos: {len(titles)}")

    urls, shorts_url = divide_watch_shorts(titles, urls)

    # Crawling page sources
    urls = urls[:url_num]
    html_sources = crawl_youtube_page_html_sources(urls, driver_dir)

    # Extract the per-video details; pages that yielded nothing are skipped.
    video_frames = []
    for url_dict, page_html in zip(urls, html_sources):
        video_info_result_dict = get_title_view_date(url_dict, page_html)
        if video_info_result_dict['comment']:
            video_frames.append(pd.DataFrame(video_info_result_dict['comment']))

    if not video_frames:
        # Without rows there are no 'dates'/'views' columns to convert.
        print("No video details were extracted; nothing to save.")
        return

    # One concat over all frames instead of growing a DataFrame in a loop.
    result_df = pd.concat(video_frames, axis=0)
    result_df['dates'] = pd.to_datetime(result_df['dates']).dt.strftime('%Y-%m-%d')
    result_df['views'] = result_df['views'].str.replace(",", "").astype(int)

    if url_num < 10:
        result_df.to_csv('SNUBH_Youtube_crawling_test.csv')
    else:
        result_df.to_csv('SNUBH_Youtube_crawling.csv')
  
      

Crawling the title, url of all videos completed!
No. of videos: 350
title: 무서운 급성심근경색! 살기 위해 꼭 알아야 하는 심근경색 증상 1가지 [숫자로 보는 건강], views: 1,441 views, date:Sep 14, 2023
title: 분당서울대학교병원 라이브 건강 강좌, views: 1,374 views, date:Sep 4, 2023
title: Pioneering Digital Healthcare: Seoul National University Bundang Hospital's Smart Medical Revolution, views: 778 views, date:Aug 28, 2023
