<div style="text-align: center; font-size: 16px;">
    <strong>Course:</strong> Machine Learning Operations |
    <strong>Lecturer:</strong> Prof. Dr. Klotz |
    <strong>Date:</strong> 17.05.2025 |
    <strong>Name:</strong> Sofie Pischl
</div>

# <center>Data Collection </center>

Konzept & Inhalt:

Daten von den größten Social media Apps sollen abgegriffen werden. besonderer Fokus auf Texten.

1. Setup & Imports
2. Reddit: Hot Posts aus Subreddits
3. Instagram: Top Posts per Scraping/API (light)
4. Twitter: Aktuelle Tweets via snscrape oder Tweepy
5. TikTok: Trending Videos
6. YouTube: Trending Videos (API/Scraping)
7. Fazit & Learnings

----
# 1. Setup & Imports

Zunächst werden alle benötigten Libraries importiert:
- `praw` für den Reddit-Zugriff
- `pandas` für Datenverarbeitung
- `datetime` für Timestamps
- `dotenv` für Umgebungsvariablen
- `pathlib` für saubere Pfadangaben
- `logging` für Fehlerprotokollierung

In [1]:
import os
import praw
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path
import logging

# Load from .env file
load_dotenv()

True

# 2. Reddit

### Funktionen:
- Authentifizierung über OAuth2 via `praw`
- Abruf der `hot`-Beiträge aus ausgewählten Subreddits
- Speicherung als `.csv` unter `/data/raw/reddit_data.csv`
- Fehler-Handling und Logging integriert

**Authentifizierung**

Zur Authentifizierung an der Reddit-API wird ein Reddit-Objekt der Bibliothek praw initialisiert. Die benötigten Zugangsdaten – client_id, client_secret und user_agent – werden aus einer .env-Datei geladen, um die Trennung von Code und Konfiguration zu gewährleisten und Sicherheitsrisiken zu minimieren.

Diese Parameter dienen der eindeutigen Identifikation der Anwendung gegenüber der API und sind notwendig, um Zugriff auf Reddit-Inhalte zu erhalten. Der user_agent ermöglicht zudem die Rückverfolgbarkeit von API-Anfragen seitens Reddit. Ohne diese Authentifizierung ist ein reguliertes, automatisiertes Scraping nicht zulässig.

In [2]:
reddit = praw.Reddit(
    client_id=(os.getenv("REDDIT_ID")),
    client_secret=(os.getenv("REDDIT_SECRET")),
    user_agent=(os.getenv("USER_AGENT"))
)

**Logging-Konfiguration**

Bevor das Reddit-Scraping startet, wird ein Logging-System eingerichtet. Dazu wird zunächst ein Pfad zur Log-Datei definiert – in diesem Fall `logs/reddit.log`. Falls das Verzeichnis `logs/` noch nicht existiert, wird es automatisch erstellt. Anschließend wird das Python-Logging so konfiguriert, dass alle Log-Meldungen in diese Datei geschrieben werden.

Die Konfiguration legt fest, dass nur Meldungen ab dem Schweregrad `INFO` gespeichert werden. Außerdem wird das Format der Einträge so definiert, dass jeder Log-Eintrag einen Zeitstempel, den Log-Level (wie `INFO` oder `ERROR`) sowie die eigentliche Nachricht enthält. So entsteht eine nachvollziehbare Chronik über den Ablauf und mögliche Fehler des Scripts.

Ein typischer Eintrag könnte zum Beispiel so aussehen:

```
2025-04-19 14:33:07,512 - INFO - Starte Reddit-Scraping...
```

In [4]:
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Logging einrichten
log_path = Path("../logs/reddit.log")
log_path.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
print(f" Logging aktiv unter: {log_path.resolve()}")

 Logging aktiv unter: C:\Users\SofiePischl\Documents\01_HdM\10_ML_OPS\TrendAnalyse Social Media\logs\reddit.log


**Reddit-Datensammlung mittels Python und PRAW**

Die Funktion `scrape_reddit()` dient der systematischen Erhebung textbasierter Inhalte aus der Social-Media-Plattform **Reddit**. Ziel ist es, strukturierte Daten zur Analyse von Trendthemen zu generieren. Zur Umsetzung wird die Bibliothek `praw` (Python Reddit API Wrapper) verwendet, die eine komfortable Schnittstelle zur Reddit-API bereitstellt.

Nach der Initialisierung der Protokollierung via `logging` erfolgt die Authentifizierung an der Reddit-API. Hierfür wird ein `Reddit`-Objekt instanziiert, wobei sensible Zugangsdaten wie `client_id`, `client_secret` und `user_agent` aus einer `.env`-Datei geladen werden. Dieses Vorgehen ermöglicht eine sichere Trennung von Konfiguration und Codebasis und schützt vor dem unbeabsichtigten Leaken von API-Schlüsseln.

Im Anschluss wird eine Liste von Subreddits definiert, die sowohl populäre als auch als „trending“ markierte Communities umfasst. Aus diesen Subreddits werden jeweils bis zu 100 Beiträge aus dem Hot-Feed abgerufen. Dieses Verfahren stellt sicher, dass aktuelle, stark diskutierte Inhalte gesammelt werden, die ein hohes Relevanzpotenzial für Trendanalysen aufweisen.

Die Datenextraktion erfolgt über eine doppelte Schleife: Für jedes Subreddit werden Hot-Beiträge iteriert, wobei ausschließlich „self-posts“ berücksichtigt werden. Diese beinhalten keine externen Links und ermöglichen dadurch eine fokussierte Analyse des vom Nutzer selbst verfassten Textinhalts. Pro Beitrag werden zentrale Metriken wie Titel, Text, Anzahl der Kommentare, Upvotes, Erstellungszeitpunkt sowie die URL gespeichert. Zur besseren zeitlichen Einordnung wird außerdem ein einheitlicher Zeitstempel für alle Einträge vergeben.

Die gesammelten Beiträge werden in einem `pandas.DataFrame` strukturiert und anschließend unter `data/raw/reddit_data.csv` abgespeichert. Dabei wird sichergestellt, dass benötigte Verzeichnisse automatisch erstellt werden. Falls bereits Daten vorhanden sind, werden die neuen Einträge angehängt und anschließend Duplikate basierend auf Titel, Textinhalt und Subreddit entfernt. Die finale Version wird ohne Index in die CSV-Datei geschrieben.

Abschließend wird die Anzahl der gespeicherten Beiträge im Logfile vermerkt. Etwaige Fehler werden während der Ausführung abgefangen und entsprechend protokolliert. Die Funktion kann sowohl als Modul importiert als auch direkt per Skriptausführung genutzt werden.


In [5]:
def scrape_reddit():
    try:
        logging.info("Starte Reddit-Scraping...")

        subreddits = ["all", "popular", "trendingreddits", "trendingsubreddits"]
        post_limit = 100
        all_posts = []
        scrape_time = datetime.now()

        for sub in subreddits:
            subreddit = reddit.subreddit(sub)
            for post in subreddit.hot(limit=post_limit):
                if post.is_self:
                    all_posts.append({
                        "subreddit": sub,
                        "title": post.title,
                        "text": post.selftext,
                        "score": post.score,
                        "comments": post.num_comments,
                        "created": datetime.fromtimestamp(post.created),
                        "url": post.url,
                        "scraped_at": scrape_time
                    })

        df = pd.DataFrame(all_posts)
        csv_path = Path("../app/data/raw/reddit_data.csv")
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        if csv_path.exists():
            df_existing = pd.read_csv(csv_path)
            df = pd.concat([df_existing, df], ignore_index=True)

        df.drop_duplicates(subset=["title", "text", "subreddit"], inplace=True)
        df.to_csv(csv_path, index=False)
        print(f" Gespeichert unter: {csv_path.resolve()}")

        logging.info(f"{len(df)} Einträge gespeichert unter {csv_path}")

    except Exception as e:
        logging.error(f"Fehler beim Reddit-Scraping: {e}")

# Ausführung bei direktem Aufruf
if __name__ == "__main__":
    scrape_reddit()

 Gespeichert unter: C:\Users\SofiePischl\Documents\01_HdM\10_ML_OPS\TrendAnalyse Social Media\app\data\raw\reddit_data.csv


In [None]:
# Zeuge neuste Einträge
df = pd.read_csv("../app/data/raw/reddit_data.csv")
df.tail()

Unnamed: 0,subreddit,title,text,score,comments,created,url,scraped_at
162,popular,Is anyone else getting irritated with the new ...,"I get it, it’s something I can put in my prefe...",2474,643,2025-04-19 01:53:11,https://www.reddit.com/r/ChatGPT/comments/1k2j...,2025-04-19 14:20:21.514955
163,popular,What is the first thing you’d buy if you get f...,,5116,7975,2025-04-18 21:51:33,https://www.reddit.com/r/AskReddit/comments/1k...,2025-04-19 14:23:21.324902
164,popular,"Under current law, the Social Security payroll...",,9256,1062,2025-04-18 18:47:13,https://www.reddit.com/r/SocialSecurity/commen...,2025-04-19 14:23:21.324902
165,trendingreddits,We are open again!,,4,2,2024-09-27 20:14:11,https://www.reddit.com/r/TrendingReddits/comme...,2025-04-19 14:23:21.324902
166,trendingreddits,Hi,,1,4,2025-04-02 21:13:09,https://www.reddit.com/r/TrendingReddits/comme...,2025-04-19 14:23:21.324902


In [8]:
len(df)

167

---

# 2. Instagram

was nicht funktioniert hat: instaloader, playwright

In [9]:
import os
import time
import logging
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from datetime import datetime

load_dotenv()

True

In [10]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

In [11]:
INSTA_USERNAME= os.getenv("INSTA_USERNAME")
INSTA_PASSWORD= os.getenv("INSTA_PASSWORD")

In [12]:
# Logging initialisieren
log_path = Path("../logs/instagram.log")
log_path.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

print(f"Logging aktiv unter: {log_path.resolve()}")

Logging aktiv unter: C:\Users\SofiePischl\Documents\01_HdM\10_ML_OPS\TrendAnalyse Social Media\logs\instagram.log


In [13]:

def scrape_instagram_explore():
    try:
        logging.info("Starte Instagram-Explore-Scraping...")

        # Headless-Browser konfigurieren
        options = Options()
        options.add_argument("--headless=new")
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1920x1080")

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get("https://www.instagram.com/accounts/login/")

        # Login
        username = os.getenv("INSTA_USERNAME")
        password = os.getenv("INSTA_PASSWORD")

        time.sleep(3)
        driver.find_element(By.NAME, "username").send_keys(username)
        driver.find_element(By.NAME, "password").send_keys(password)
        driver.find_element(By.XPATH, "//button[@type='submit']").click()

        time.sleep(7)  # Warte auf Login

        # Gehe zur Explore-Seite
        driver.get("https://www.instagram.com/explore/")
        time.sleep(5)

        # Seite scrollen, um mehr Posts zu laden
        for _ in range(3):  # 3x scrollen → kannst du erhöhen
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(3)

        # Beiträge extrahieren
        posts = driver.find_elements(By.XPATH, '//a[contains(@href, "/p/")]')
        post_data = []

        for post in posts:
            try:
                url = post.get_attribute("href")
                img = post.find_element(By.TAG_NAME, "img")
                img_url = img.get_attribute("src")
                description = img.get_attribute("alt")

                post_data.append({
                    "timestamp": datetime.now().isoformat(),
                    "url": url,
                    "image": img_url,
                    "description": description
                })
            except Exception as e:
                logging.warning(f"Fehler beim Extrahieren eines Posts: {e}")
                continue

        # In CSV speichern
        csv_path = Path("../app/data/raw/instagram_data.csv")
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        df = pd.DataFrame(post_data)

        if csv_path.exists():
            df_existing = pd.read_csv(csv_path)
            df = pd.concat([df_existing, df], ignore_index=True)

        df.drop_duplicates(subset=["url", "image", "description"], inplace=True)
        df.to_csv(csv_path, index=False)

        logging.info(f"{len(df)} Einträge gespeichert unter {csv_path}")
        driver.quit()

    except Exception as e:
        logging.error(f"Fehler beim Instagram-Explore-Scraping: {e}")

In [14]:
# Jetzt ausführen
scrape_instagram_explore()

In [15]:
def scrape_instagram_post(url):
    try:
        logging.info(f"Starte Instagram-Scraping für URL: {url}")
        
        # Zielpfad vorbereiten
        csv_path = Path("../app/data/raw/instagram_data.csv")
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        # Browser initialisieren
        options = Options()
        options.add_argument("--headless=new")  # im Hintergrund ausführen
        options.add_argument("--disable-gpu")
        options.add_argument("--window-size=1920x1080")
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        wait = WebDriverWait(driver, 15)

        driver.get(url)

        # Beitrag laden
        wait.until(EC.presence_of_element_located((By.TAG_NAME, "article")))

        # Login-Popup schließen (falls vorhanden)
        try:
            close_btn = wait.until(EC.element_to_be_clickable((By.XPATH, "//div[@role='dialog']//button")))
            close_btn.click()
        except:
            pass

        # Username extrahieren
        try:
            username_elem = wait.until(EC.visibility_of_element_located(
                (By.XPATH, "//a[contains(@href, '/') and contains(@class, 'notranslate')]//span[1]")
            ))
            username = username_elem.text.strip()
        except:
            username = ""

        # Caption extrahieren
        try:
            caption_elem = wait.until(EC.visibility_of_element_located(
                (By.XPATH, "//div[@data-testid='post-comment-root']//span")
            ))
            caption = caption_elem.text.strip()
        except:
            caption = ""

        # Likes (optional)
        try:
            likes_elem = wait.until(EC.visibility_of_element_located(
                (By.XPATH, "//section//span[contains(text(), 'Gefällt')]")
            ))
            likes = likes_elem.text.strip()
        except:
            likes = ""

        # Timestamp + Struktur
        data = {
            "timestamp": datetime.now().isoformat(),
            "url": url,
            "username": username,
            "caption": caption,
            "likes": likes
        }

        df_new = pd.DataFrame([data])

        if csv_path.exists():
            df_existing = pd.read_csv(csv_path)
            df_new = pd.concat([df_existing, df_new], ignore_index=True)

        df_new.drop_duplicates(subset=["url", "caption", "username"], inplace=True)
        df_new.to_csv(csv_path, index=False)

        logging.info(f"Erfolgreich gespeichert unter {csv_path}")
        driver.quit()

    except Exception as e:
        logging.error(f"Fehler beim Scrapen von {url}: {e}")



In [5]:
# Beispielaufruf
scrape_instagram_post("https://www.instagram.com/p/DICWDtYNq7E/")

# 1. TWITTER

In [2]:
import os

In [9]:
!pip install python-dotenv



In [4]:
import requests
import pandas as pd
from datetime import datetime

# Your bearer token from Twitter Developer Portal
BEARER_TOKEN = os.getenv("X_BEARER_TOKEN")

# Twitter API endpoint for recent tweets
search_url = "https://api.twitter.com/2/tweets/search/recent"

# Search parameters – open topic
query_params = {
    'query': 'Twitter lang:de -is:retweet',  # No keyword, just German tweets
    'max_results': 50,  # Max per request (10–100)
    'tweet.fields': 'created_at,public_metrics,text,author_id',
    'expansions': 'author_id',
    'user.fields': 'username,name'
}

# Set headers
headers = {
    "Authorization": f"Bearer {BEARER_TOKEN}"
}

# Send request
response = requests.get(search_url, headers=headers, params=query_params)

# Check response
if response.status_code != 200:
    raise Exception(f"Request failed: {response.status_code}\n{response.text}")

data = response.json()

# Extract data
tweets = data.get("data", [])
users = {u["id"]: u for u in data.get("includes", {}).get("users", [])}

# Prepare data rows
results = []
for tweet in tweets:
    user = users.get(tweet["author_id"], {})
    metrics = tweet.get("public_metrics", {})
    results.append({
        "url": f"https://twitter.com/{user.get('username')}/status/{tweet['id']}",
        "timestamp": datetime.now().isoformat(),
        "datum": tweet.get("created_at", ""),
        "username": user.get("username", ""),
        "name": user.get("name", ""),
        "caption": tweet.get("text", ""),
        "likes": metrics.get("like_count", 0),
        "retweets": metrics.get("retweet_count", 0),
        "replies": metrics.get("reply_count", 0)
    })

# Save to CSV
df = pd.DataFrame(results)
df.sort_values(by="likes", ascending=False, inplace=True)  # Sort by popularity
df.to_csv("../raw/twitter_api_top_tweets.csv", index=False, encoding="utf-8")

print(f"Saved {len(df)} tweets to twitter_api_top_tweets.csv")


Exception: Request failed: 429
{"title":"Too Many Requests","detail":"Too Many Requests","type":"about:blank","status":429}

In [5]:
print(response.headers.get("x-rate-limit-remaining"))
print(response.headers.get("x-rate-limit-reset"))

0
1744658724


In [6]:
pip install ntscraper

Collecting ntscraper
  Downloading ntscraper-0.3.18-py3-none-any.whl.metadata (7.4 kB)
Downloading ntscraper-0.3.18-py3-none-any.whl (12 kB)
Installing collected packages: ntscraper
Successfully installed ntscraper-0.3.18
Note: you may need to restart the kernel to use updated packages.


In [7]:
from ntscraper import Nitter

scraper = Nitter(log_level=1, skip_instance_check=False)

Testing instances: 100%|██████████| 5/5 [00:01<00:00,  2.86it/s]


In [12]:
pip install snscrape


Note: you may need to restart the kernel to use updated packages.


In [15]:
import snscrape.modules.twitter as sntwitter
import pandas as pd

query = 'Klima lang:de since:2024-01-01'
max_tweets = 50
tweets = []

try:
    for i, tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
        if i >= max_tweets:
            break
        tweets.append({
            'Datum': tweet.date,
            'User': tweet.user.username,
            'Name': tweet.user.displayname,
            'Text': tweet.content,
            'Likes': tweet.likeCount,
            'Retweets': tweet.retweetCount,
            'Replies': tweet.replyCount,
            'URL': tweet.url
        })
    print(f"{len(tweets)} Tweets erfolgreich gesammelt.")
except Exception as e:
    print(f"Fehler beim Scrapen: {e}")
finally:
    df = pd.DataFrame(tweets)
    output_path = "tweets_scrape_output.csv"
    df.to_csv(output_path, index=False)
    print(f"Tweets gespeichert in: {output_path}")


14-Apr-25 21:22:10 - Retrieving scroll page None
14-Apr-25 21:22:10 - Retrieving https://twitter.com/i/api/graphql/7jT5GT59P8IFjgxwqnEdQw/SearchTimeline?variables=%7B%22rawQuery%22%3A%22Klima%20lang%3Ade%20since%3A2024-01-01%22%2C%22count%22%3A20%2C%22product%22%3A%22Latest%22%2C%22withDownvotePerspective%22%3Afalse%2C%22withReactionsMetadata%22%3Afalse%2C%22withReactionsPerspective%22%3Afalse%7D&features=%7B%22rweb_lists_timeline_redesign_enabled%22%3Afalse%2C%22blue_business_profile_image_shape_enabled%22%3Afalse%2C%22responsive_web_graphql_exclude_directive_enabled%22%3Atrue%2C%22verified_phone_label_enabled%22%3Afalse%2C%22creator_subscriptions_tweet_preview_api_enabled%22%3Afalse%2C%22responsive_web_graphql_timeline_navigation_enabled%22%3Atrue%2C%22responsive_web_graphql_skip_user_profile_image_extensions_enabled%22%3Afalse%2C%22tweetypie_unmention_optimization_enabled%22%3Atrue%2C%22vibe_api_enabled%22%3Atrue%2C%22responsive_web_edit_tweet_api_enabled%22%3Atrue%2C%22graphql_is_t

In [16]:
from playwright.sync_api import sync_playwright
import pandas as pd
import time

def scrape_tweets(keyword="Klima", max_tweets=20):
    tweets_data = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        search_url = f"https://x.com/search?q={keyword}%20lang%3Ade&f=live"
        page.goto(search_url)
        time.sleep(5)

        last_height = 0
        while len(tweets_data) < max_tweets:
            tweet_elements = page.query_selector_all('article')

            for tweet in tweet_elements:
                try:
                    content = tweet.inner_text()
                    lines = content.split('\n')
                    username = lines[0] if lines else ""
                    text = '\n'.join(lines[2:-4]) if len(lines) > 6 else content
                    timestamp = tweet.query_selector('time').get_attribute('datetime') if tweet.query_selector('time') else ''
                    tweet_url = tweet.query_selector('a:has(time)').get_attribute('href') if tweet.query_selector('a:has(time)') else ''
                    full_url = f"https://x.com{tweet_url}" if tweet_url else ''

                    if any(d['url'] == full_url for d in tweets_data):
                        continue  # Already captured

                    tweets_data.append({
                        "username": username,
                        "text": text,
                        "timestamp": timestamp,
                        "url": full_url
                    })

                    if len(tweets_data) >= max_tweets:
                        break
                except Exception as e:
                    continue

            # Scroll down
            page.mouse.wheel(0, 2000)
            time.sleep(2)

        browser.close()

    return tweets_data

# 🔄 Ausführen & speichern
data = scrape_tweets("Klima", max_tweets=30)
df = pd.DataFrame(data)
df.to_excel("tweets_playwright_scrape.xlsx", index=False)
print(f"{len(df)} Tweets gespeichert in 'tweets_playwright_scrape.xlsx'")


Error: It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead.

# 4. TikTok

In [None]:
from TikTokApi import TikTokApi
import asyncio
import os
import csv
from dotenv import load_dotenv

# Load .env values
load_dotenv()

ms_token = os.getenv("MS_TOKEN")
csv_path = os.getenv("OUTPUT_PATH", "trending_videos.csv")


async def trending_videos():
    async with TikTokApi() as api:
        await api.create_sessions(
            ms_tokens=[ms_token],
            num_sessions=1,
            sleep_after=3,
            browser=os.getenv("TIKTOK_BROWSER", "chromium")
        )

        data = []

        async for video in api.trending.videos(count=30):
            info = video.as_dict
            data.append({
                "id": info.get("id"),
                "description": info.get("desc"),
                "author_username": info.get("author", {}).get("uniqueId"),
                "author_id": info.get("author", {}).get("id"),
                "likes": info.get("stats", {}).get("diggCount"),
                "shares": info.get("stats", {}).get("shareCount"),
                "comments": info.get("stats", {}).get("commentCount"),
                "plays": info.get("stats", {}).get("playCount"),
                "video_url": info.get("video", {}).get("downloadAddr"),
                "created_time": info.get("createTime"),
            })

        os.makedirs(os.path.dirname(csv_path), exist_ok=True)

        file_exists = os.path.isfile(csv_path)
        with open(csv_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            if not file_exists:
                writer.writeheader()
            writer.writerows(data)

        print(f"\n✅ Erfolgreich {len(data)} Videos in '{csv_path}' gespeichert.")


if __name__ == "__main__":
    asyncio.run(trending_videos())


# YouTube

In [22]:
import os
import logging
from pathlib import Path
from dotenv import load_dotenv
from googleapiclient.discovery import build
import pandas as pd
from datetime import datetime

# === ENV & Logging ===
load_dotenv()

log_path = Path("../logs/youtube.log")
log_path.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# === YouTube API Setup ===
API_KEY = os.getenv("YT_KEY")
youtube = build("youtube", "v3", developerKey=API_KEY)

def scrape_youtube_trending(region="DE", max_results=50):
    try:
        logging.info("Starte YouTube-Scraping...")

        request = youtube.videos().list(
            part="snippet,statistics",
            chart="mostPopular",
            regionCode=region,
            maxResults=max_results
        )
        response = request.execute()

        videos = []
        scrape_time = datetime.now()

        for item in response.get("items", []):
            snippet = item["snippet"]
            stats = item.get("statistics", {})

            videos.append({
                "video_id": item["id"],
                "title": snippet.get("title"),
                "description": snippet.get("description"),
                "channel_title": snippet.get("channelTitle"),
                "published_at": snippet.get("publishedAt"),
                "view_count": stats.get("viewCount"),
                "like_count": stats.get("likeCount"),
                "comment_count": stats.get("commentCount"),
                "url": f"https://www.youtube.com/watch?v={item['id']}",
                "scraped_at": scrape_time
            })

        df = pd.DataFrame(videos)
        csv_path = Path("../app/data/raw/youtube_data.csv")
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        if csv_path.exists():
            df_existing = pd.read_csv(csv_path)
            df = pd.concat([df_existing, df], ignore_index=True)

        df.drop_duplicates(subset=["video_id"], inplace=True)
        df.to_csv(csv_path, index=False)

        display(df.head())

        logging.info(f"{len(df)} Videos gespeichert unter {csv_path}")

    except Exception as e:
        logging.error(f"Fehler beim YouTube-Scraping: {e}")
        print(f" Fehler: {e}")

# Direkter Aufruf
if __name__ == "__main__":
    scrape_youtube_trending()

Unnamed: 0,video_id,title,description,channel_title,published_at,view_count,like_count,comment_count,url,scraped_at
0,4sHs9ujo1eg,Die Heuchelei der Stars & Influencer auf dem C...,Das Coachella Festival ist grade bei vielen In...,Alicia Joe,2025-04-21T16:02:49Z,322656,19853,1352,https://www.youtube.com/watch?v=4sHs9ujo1eg,2025-04-22 22:06:12.302112
1,gTA6gQIC39A,Warum ausgerechnet Katy Perry jetzt ins All fl...,Anzeige | Ladet euch für eure nächste Reise Ai...,Desy,2025-04-21T17:30:01Z,172254,10729,1183,https://www.youtube.com/watch?v=gTA6gQIC39A,2025-04-22 22:06:12.302112
2,ncXAUBGV8JI,"Problemmotoren von VW, Ford, Stellantis und Co...","Ein nasser, in Öl geführter Zahnriemen schien ...",auto motor und sport,2025-04-21T15:01:16Z,159413,2657,768,https://www.youtube.com/watch?v=ncXAUBGV8JI,2025-04-22 22:06:12.302112
3,dFKCoqPjJ28,Fynn Kliemanns Comeback ist ein Fiebertraum,Fynn Kliemann ist zurück. Der DIY-Künstler und...,Der Dunkle Parabelritter,2025-04-21T17:30:02Z,169827,10718,1113,https://www.youtube.com/watch?v=dFKCoqPjJ28,2025-04-22 22:06:12.302112
4,gzCXZID3AlY,Streit mit PAULA ? Kontakt mit leiblichen ELTE...,"Anzeige | \nHey, wie versprochen kommen hier a...",Beauty Benzz,2025-04-21T14:00:58Z,70563,2880,293,https://www.youtube.com/watch?v=gzCXZID3AlY,2025-04-22 22:06:12.302112
