Skript zum Scrapen der YT-Trends

Beispiel eines Datensatzes:

    "rank": "#1",
    "channel": "DAVE",
    "subs": "1,16 Mio.",
    "title": "THE RACE - Die erste Nacht bricht an - Folge 03",
    "views": "612.386",
    "likes": "38613",
    "dislikes": "457",
    "comments": "1.474",
    "publication_date": "07.07.2024",
    "description": ---
    "video_url": "https://www.youtube.com/watch?v=iVXllgfjmeQ",
    "channel_url": "https://www.youtube.com/@dave_"

In [None]:
from selenium import webdriver 
import pandas as pd 
from selenium.webdriver.common.by import By 
from selenium.webdriver.support.ui import WebDriverWait 
from selenium.webdriver.support import expected_conditions as EC
from selenium.common import TimeoutException, StaleElementReferenceException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from utils import extract_likes
import pandas as pd
import os
import json
from datetime import datetime
import time
from tqdm import tqdm  # Import tqdm for progress bar


Funktion zum Speichern der Daten in eine JSON-Datei mit einem Timestamp

In [None]:
def save_videos_with_timestamp(videos, output_dir):
    """Write the scraped videos to a timestamped JSON file in *output_dir*.

    The file is named ``trends_tracking_<YYYY-MM-DD_HH-MM>.json`` and holds a
    single object with the keys ``scraped_at`` (the same timestamp string
    used in the file name) and ``videos`` (the list passed in).

    Any error while writing is reported on stdout and swallowed, so the
    function always returns ``None``.

    Args:
        videos: JSON-serialisable collection of video records.
        output_dir: Directory the file is written into.
    """
    stamp = datetime.now().strftime('%Y-%m-%d_%H-%M')
    filename = f'trends_tracking_{stamp}.json'
    target = os.path.join(output_dir, filename)

    # One shared scraped_at for the whole batch instead of one per video.
    payload = dict(scraped_at=stamp, videos=videos)

    try:
        with open(target, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=4)
            print(f'Data saved to {target}')
    except Exception as err:
        print(f"Error saving {filename}: {str(err)}")

Initialisierung des Webdrivers und Navigation zur Trends-Seite

Es müssen zuerst die Cookies akzeptiert werden

In [None]:

chrome_options = webdriver.ChromeOptions()
# Extension that displays the dislike count
chrome_options.add_extension('./add_dislike_addon.crx')
# Uncomment to make the browser invisible (headless mode)
# chrome_options.add_argument('--headless=new')

driver = webdriver.Chrome(options=chrome_options)
driver.get('https://www.youtube.com/feed/trending')

try:
    # Wait explicitly for the consent button. A plain find_element raises
    # NoSuchElementException right away, which the TimeoutException handler
    # below would not catch; WebDriverWait raises TimeoutException instead.
    accept_all = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable(
            (By.XPATH, '/html/body/c-wiz/div/div/div/div[2]/div[1]/div[3]/div[1]/form[2]/div/div/button')
        )
    )
    accept_all.click()
except TimeoutException:
    # No consent dialog appeared within the timeout; continue without it.
    print('Cookie Modal not found')


Logik zum Scrapen der Informationen des jeweiligen Videos

In [None]:
def scrape_video_data(driver, rank, video_link):
    """Scrape the metadata of the watch page currently loaded in *driver*.

    The function does not navigate; it only reads the page the driver is
    showing. The text clean-up (' Abonnenten', ' Aufrufe') and the like-button
    selector ("Ich mag das Video") match the German YouTube UI.

    Args:
        driver: Selenium WebDriver showing the video's watch page.
        rank: Position of the video in the trends list; stored as ``#<rank>``.
        video_link: URL of the video; stored unchanged as ``video_url``.

    Returns:
        dict with the keys ``rank``, ``channel``, ``subs``, ``title``,
        ``views``, ``likes``, ``dislikes``, ``comments``,
        ``publication_date``, ``description``, ``video_url`` and
        ``channel_url``. Apart from ``likes`` (whatever ``extract_likes``
        returns) the scraped values are the element texts as displayed.

    Raises:
        RuntimeError: If any element is missing or a wait times out. The
            message is the text of the underlying error, which is chained
            as ``__cause__``.
    """
    try:
        WebDriverWait(driver, 30).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1.ytd-watch-metadata'))
        )

        # Channel information
        channel_element = driver.find_element(By.ID, 'owner')
        channel_url = channel_element.find_element(By.CSS_SELECTOR, 'a.yt-simple-endpoint').get_attribute('href')
        channel = channel_element.find_element(By.ID, 'channel-name').text
        subs = channel_element.find_element(By.ID, 'owner-sub-count').text.replace(' Abonnenten', '')

        # Expand the description so its full text and the info container
        # can be read below.
        # NOTE(review): the fixed one-second pause presumably lets the page
        # settle before the click - confirm before replacing it with a wait.
        time.sleep(1)
        driver.find_element(By.CSS_SELECTOR, '#description-inline-expander #expand').click()

        # Video information
        title = driver.find_element(By.CSS_SELECTOR, 'h1.ytd-watch-metadata').text

        info_container_elements = driver.find_elements(By.CSS_SELECTOR, '#info-container span')
        views = info_container_elements[0].text.replace(' Aufrufe', '')
        publication_date = info_container_elements[2].text
        description = driver.find_element(By.CSS_SELECTOR, '#description-inline-expander .ytd-text-inline-expander span').text

        # The like count is read from the button's aria-label.
        like_button = driver.find_element(By.CSS_SELECTOR, '[aria-label*="Ich mag das Video"]')
        likes = extract_likes(like_button.get_attribute('aria-label'))

        # NOTE(review): presumably only populated when the dislike extension
        # is loaded in the browser - verify against the driver setup.
        dislikes = driver.find_element(By.CSS_SELECTOR, '.YtDislikeButtonViewModelHost button').text

        # Scroll down so that the comment section gets loaded
        body = driver.find_element(By.TAG_NAME, 'body')
        for _ in range(7):
            body.send_keys(Keys.PAGE_DOWN)

        # The wait returns the first matching element once it is visible,
        # so it does not have to be looked up a second time.
        comments_header = WebDriverWait(driver, 30).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, '#comments ytd-comments-header-renderer span'))
        )
        num_comments = comments_header.text

        return {
            'rank': f'#{rank}',
            'channel': channel,
            'subs': subs,
            'title': title,
            'views': views,
            'likes': likes,
            'dislikes': dislikes,
            'comments': num_comments,
            'publication_date': publication_date,
            'description': description,
            'video_url': video_link,
            'channel_url': channel_url,
        }

    except Exception as e:
        # Re-raise with the plain message (previously it was wrapped in a
        # set literal) and keep the original error as the explicit cause.
        raise RuntimeError(str(e)) from e


Hole alle Video-Links auf der Seite und scrape für jedes Video die Infos.


Speichern der Daten und Exportieren in den Ordner scraped_data

In [None]:
def scrape_trends(output_dir, num_videos):
    """Scrape the first *num_videos* trending videos and save them as JSON.

    Uses the module-level ``driver``, which must already show the trends
    page. Videos are visited in list order, starting with rank 1. A video
    whose scrape fails is reported and skipped; the remaining videos are
    still processed. The driver is always quit at the end, even on error.

    Args:
        output_dir: Directory passed on to ``save_videos_with_timestamp``.
        num_videos: Maximum number of videos to scrape, counted from the top
            of the list.
    """
    try:
        # Wait until the video elements are loaded
        WebDriverWait(driver, 15).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'ytd-video-renderer'))
        )

        video_elements = driver.find_elements(By.CSS_SELECTOR, 'ytd-video-renderer')[:num_videos]

        # Collect every link before navigating: the element handles belong
        # to the trends page, which is left as soon as a video is opened.
        video_links = [
            video_element.find_element(By.ID, 'video-title').get_attribute('href')
            for video_element in video_elements
        ]

        # Scrape one video after the other, starting with rank 1
        all_videos = []
        for rank, video_link in tqdm(enumerate(video_links, start=1), total=len(video_links), desc="Scraping Videos"):
            try:
                driver.get(video_link)
                video_data = scrape_video_data(driver, rank, video_link)
            except Exception as e:
                # tqdm.write prints without breaking the progress bar.
                tqdm.write(f'Error scraping data for {video_link}: {str(e)}')
                # Skip this video. Without this, video_data would be unbound
                # (first video) or still hold the previous video's record.
                continue

            if video_data:
                all_videos.append(video_data)

        # Save the data together with the current timestamp
        save_videos_with_timestamp(all_videos, output_dir)

    finally:
        driver.quit()


Starten des Scrapens

In [None]:
output_dir = 'scraped_data'
# exist_ok=True creates the directory if needed without a separate
# existence check, so there is no gap between checking and creating.
os.makedirs(output_dir, exist_ok=True)

# Rank down to which the trends are scraped (counting starts at #1)
num_videos = 20

scrape_trends(output_dir, num_videos)
