# Sentiment Analysis

This notebook contains the code for:
- searching for, downloading, and transcribing YouTube videos
- generating sentiment scores from news headlines, X (Twitter) and Reddit posts, and YouTube video transcripts



## Collecting YouTube Videos

In [None]:
# prepping to download the videos as mp3
!pip install yt-dlp
!apt-get install ffmpeg
!pip install thefuzz[speedup]

In [None]:
import json
import requests
import os
from datetime import datetime, timedelta
from google.colab import userdata

# --- Search configuration ---------------------------------------------------
API_KEY = userdata.get('YT_API')  # read from the Colab secret named 'YT_API'
# Quoted cashtags joined with '|', the Data API's OR operator.
SEARCH_QUERY = "\"$AAPL\" | \"$MSFT\" | \"$NVDA\" | \"$AMZN\" | \"$META\" | \"$BRK.B\" | \"$GOOGL\" | \"$AVGO\" | \"$GOOG\" | \"$TSLA\""
MAX_RESULTS_PER_CALL = 50
TOTAL_RESULTS_NEEDED = 8000
OUTPUT_FILE = "youtube_shorts_results.json"
FIRST_YEAR = 2020
LAST_YEAR = 2024
REQUEST_TIMEOUT_SECONDS = 30  # fail fast instead of hanging on a stalled connection


def _save_items(items):
    """Write the collected search results to OUTPUT_FILE as JSON."""
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False, indent=4)


# Resume from a previous run if a results file already exists.
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
        all_items = json.load(f)
else:
    all_items = []

# Video ids already collected, used to drop duplicates across pages and runs.
existing_ids = {
    item["id"]["videoId"]
    for item in all_items
    if "id" in item and "videoId" in item["id"]
}

base_url = "https://www.googleapis.com/youtube/v3/search"

# Query one calendar year at a time.
for year in range(FIRST_YEAR, LAST_YEAR + 1):
    if len(all_items) >= TOTAL_RESULTS_NEEDED:
        break

    start_date = f"{year}-01-01T00:00:00Z"
    end_date = f"{year}-12-31T23:59:59Z"
    next_page_token = None

    print(f"📅 Searching year: {year}")

    while len(all_items) < TOTAL_RESULTS_NEEDED:
        params = {
            "key": API_KEY,
            "part": "snippet",
            "q": SEARCH_QUERY,
            "type": "video",
            "videoDuration": "short",
            "maxResults": MAX_RESULTS_PER_CALL,
            "relevanceLanguage": "en",
            "publishedAfter": start_date,
            "publishedBefore": end_date
        }

        if next_page_token:
            params["pageToken"] = next_page_token

        try:
            response = requests.get(base_url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            print(f"⚠️ Request failed: {exc}")
            break  # Stop this year if the request itself fails

        if response.status_code != 200:
            print(f"⚠️ Error {response.status_code}: {response.text}")
            break  # Stop this year if error occurs

        data = response.json()
        new_count = 0

        for item in data.get("items", []):
            video_id = item.get("id", {}).get("videoId")
            if video_id and video_id not in existing_ids:
                all_items.append(item)
                existing_ids.add(video_id)
                new_count += 1

        print(f"Collected: {len(all_items)} / {TOTAL_RESULTS_NEEDED} (Added {new_count} new)")

        # Checkpoint BEFORE looking at the page token: saving after the
        # "no more pages" break would leave the last page of every year
        # unsaved (and lose the final page of the last year entirely).
        _save_items(all_items)

        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break  # No more results for this year

# Final write so the results file exists and is current however the loops ended.
_save_items(all_items)

print(f"✅ Finished collecting Shorts from {FIRST_YEAR} to {LAST_YEAR}.")

In [None]:
import json
import csv

# Read the raw search results produced by the collection step.
with open("youtube_shorts_results.json", "r", encoding="utf-8") as source:
    data = json.load(source)

# Flatten each result into one CSV row; entries without a video id are skipped.
csv_columns = ["publishTime", "title", "description", "video_url"]
with open("youtube_shorts_results.csv", "w", newline='', encoding="utf-8") as target:
    table = csv.DictWriter(target, fieldnames=csv_columns)
    table.writeheader()

    for entry in data:
        vid = entry.get("id", {}).get("videoId")
        if vid:
            details = entry.get("snippet", {})
            record = {
                "publishTime": details.get("publishTime", ""),
                "title": details.get("title", ""),
                "description": details.get("description", ""),
                "video_url": f"https://www.youtube.com/watch?v={vid}",
            }
            table.writerow(record)

print("✅ JSON data successfully converted to CSV.")

### Downloading Files

In [None]:
import os
import csv
import subprocess
from pathlib import Path
from thefuzz import fuzz

# Paths and config
# CSV_FILE: input CSV; the code below reads its "title" and "video_url" columns.
CSV_FILE = "/content/drive/MyDrive/FYP/filtered_file.csv"
# OUTPUT_DIR: folder the MP3 files are written to (and scanned for existing downloads).
OUTPUT_DIR = "/content/drive/MyDrive/FYP/mp3_audio"
# Symbols a title must mention for the video to be downloaded.
TICKERS = ['AAPL', 'MSFT', 'NVDA', 'AMZN', 'META', 'BRK.B', 'GOOGL', 'AVGO', 'GOOG', 'TSLA', 'SPY', 'SPX']
FUZZY_THRESHOLD = 90  # Matching score to skip similar titles

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

def contains_ticker(text):
    """Return True if *text* mentions any symbol in TICKERS.

    The check is a case-insensitive substring test. Empty or missing text
    (csv.DictReader yields None for cells absent from a short row) counts as
    "no ticker" instead of raising AttributeError on ``None.upper()``.
    """
    if not text:
        return False
    text = str(text).upper()
    return any(ticker in text for ticker in TICKERS)

# Stems (file names without extension) of the MP3s already in OUTPUT_DIR.
existing_titles = [
    Path(name).stem
    for name in os.listdir(OUTPUT_DIR)
    if name.endswith(".mp3")
]

# Read the CSV and keep (url, title) pairs whose title mentions a ticker.
with open(CSV_FILE, "r", encoding="utf-8") as csvfile:
    reader = csv.DictReader(csvfile)
    rows = []
    for row in reader:
        if contains_ticker(row.get("title", "")):
            rows.append(row)
    links = [(row["video_url"], row["title"]) for row in rows]

print(f"\n🎯 Found {len(links)} matching videos. Checking for fuzzy duplicates...\n")

for i, (url, title) in enumerate(links):
    print(f"🎵 [{i+1}/{len(links)}] Checking: {title}")

    # A title counts as downloaded when it is close enough to any known MP3 stem.
    already_downloaded = False
    for existing in existing_titles:
        if fuzz.token_set_ratio(title, existing) >= FUZZY_THRESHOLD:
            already_downloaded = True
            break

    if already_downloaded:
        print(f"✅ Skipped (fuzzy match found): {title}")
        continue

    try:
        # First ask yt-dlp which file name it would produce for this video.
        filename_cmd = ["yt-dlp", "--get-filename", "--output", "%(title).80s.%(ext)s", url]
        result = subprocess.run(
            filename_cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=15
        )
        predicted_filename = result.stdout.strip()
        mp3_name = Path(predicted_filename).with_suffix(".mp3")
        mp3_path = os.path.join(OUTPUT_DIR, mp3_name)

        print(f"⬇️ Downloading to: {mp3_path}")
        download_cmd = [
            "yt-dlp",
            "--extract-audio",
            "--audio-format", "mp3",
            "--output", f"{OUTPUT_DIR}/%(title).80s.%(ext)s",
            url
        ]
        subprocess.run(download_cmd, check=True, timeout=60)

        # Remember the new file so later near-identical titles are skipped.
        existing_titles.append(Path(mp3_path).stem)

    except subprocess.TimeoutExpired:
        print(f"⏱️ Timeout exceeded. Skipping: {url}")
    except subprocess.CalledProcessError:
        print(f"❌ Download failed: {url}")

### Transcribing Audio
Note: This was originally done in a Spyder environment.

Audio transcription was done using WhisperX (https://github.com/m-bain/whisperX)

In [None]:
!pip install whisperx

In [None]:
import whisperx
import gc
import torch
import time
import mysecrets
import pandas as pd
import os
import ffmpeg
from fuzzywuzzy import process

# Config
# Token read from the local `mysecrets` module; not referenced again in the visible code.
hf_token = mysecrets.hf_token
# Device string handed to whisperx.load_model below.
device = "cuda"
# Folder scanned for .mp3/.wav files to transcribe.
audio_folder = r"G:\My Drive\FYP\mp3_audio"
# Output CSV that transcriptions are appended to (also used to skip finished files).
csv_file = r"G:\My Drive\FYP\transcript.csv"
# CSV providing title / publishTime / description / video_url for each video.
metadata_csv = r"G:\My Drive\FYP\youtube_shorts_results.csv"
# Passed to model.transcribe as batch_size.
batch_size = 4
# Passed to whisperx.load_model as compute_type.
compute_type = "int8"

# Build a title -> {date, description, url} lookup from the metadata CSV.
# When a title occurs more than once, the last row wins.
metadata_df = pd.read_csv(metadata_csv)
video_metadata = {}
for _, record in metadata_df.iterrows():
    video_metadata[record["title"]] = {
        "date": record["publishTime"],
        "description": record["description"],
        "url": record["video_url"],
    }

print(f"Loaded metadata for {len(video_metadata)} videos.")
print(f"Sample video titles: {list(video_metadata)[:5]}")

# Load the WhisperX speech-to-text model once; it is reused for every file.
model = whisperx.load_model("small.en", device, compute_type=compute_type)

def transcribe_audio(audio_file):
    """Transcribe *audio_file* with the loaded WhisperX model.

    Returns the text of all segments joined by single spaces.
    """
    waveform = whisperx.load_audio(audio_file)
    output = model.transcribe(waveform, batch_size=batch_size)
    pieces = [part["text"] for part in output["segments"]]
    return " ".join(pieces)

def extract_tickers(title):
    """Return the tracked tickers mentioned in *title* as a ", "-joined string.

    Matching is case-insensitive and requires the symbol to stand alone (not
    be embedded in a longer run of letters or digits), so "GOOGL" is not also
    reported as "GOOG" and "metaverse" does not count as "META". Symbols are
    listed in the order of the internal ticker list. Returns "Unknown" when
    nothing matches.
    """
    import re  # local import: the enclosing cell's import list has no `re`

    stock_list = ['AAPL', 'MSFT', 'NVDA', 'AMZN', 'META', 'BRK.B', 'GOOGL', 'AVGO', 'GOOG', 'TSLA']
    upper_title = title.upper()
    found = [
        s for s in stock_list
        # Lookarounds instead of a plain substring test: reject matches that
        # are glued to another letter or digit on either side.
        if re.search(r'(?<![A-Z0-9])' + re.escape(s) + r'(?![A-Z0-9])', upper_title)
    ]
    return ", ".join(found) if found else "Unknown"

def update_sheet(audio_file, transcription_text):
    """Append one transcription row to the transcript CSV.

    The audio file's stem is fuzzy-matched against the metadata titles. If
    the best score is below 90 nothing is written; otherwise a row with the
    matched title, transcription, date, URL, description and detected tickers
    is appended and the whole CSV is rewritten.
    """
    stem, _ext = os.path.splitext(audio_file)
    title = stem.strip().lower()

    # Closest metadata title for this file name.
    best_match, score = process.extractOne(title, video_metadata.keys())
    print(f"🎬 Audio File: {audio_file}")
    print(f"🔍 Fuzzy matched '{title}' to '{best_match}' with score {score}")

    if not score >= 90:
        print(f"❌ No good match found for: {title} (score: {score})")
        return

    info = video_metadata[best_match]
    new_row = {
        "title": best_match,
        "transcription": transcription_text,
        "date": info.get("date"),
        "video URL": info.get("url"),
        "description": info.get("description"),
        "tickers found": extract_tickers(best_match),
    }

    # Start from the existing sheet, or an empty one with the expected columns.
    columns = ["title", "transcription", "date", "video URL", "description", "tickers found"]
    df = pd.read_csv(csv_file) if os.path.exists(csv_file) else pd.DataFrame(columns=columns)

    addition = pd.DataFrame([new_row])
    df = pd.concat([df, addition], ignore_index=True)
    df.to_csv(csv_file, index=False, encoding='utf-8-sig')
    print(f"✅ Transcription added for '{best_match}' to CSV.")

# Titles already present in the transcript CSV, lower-cased.
# NOTE: update_sheet stores the fuzzy-matched *metadata* title in this column,
# not the audio file name, so a file must be mapped to its metadata title
# before it can be recognised as already processed.
if os.path.exists(csv_file):
    processed_df = pd.read_csv(csv_file)
    processed_titles = set(processed_df['title'].dropna().astype(str).str.lower())
else:
    processed_titles = set()

MATCH_CUTOFF = 90  # same minimum score update_sheet requires before writing a row

# Process each audio file
for audio_file in os.listdir(audio_folder):
    if not audio_file.endswith((".mp3", ".wav")):
        continue

    title_base = os.path.splitext(audio_file)[0].strip().lower()
    audio_file_path = os.path.join(audio_folder, audio_file)

    try:
        # Resolve the metadata title this file would be written under.
        # extractOne returns None when there are no choices to match against.
        match = process.extractOne(title_base, video_metadata.keys())
        matched_title = None
        if match is not None and match[1] >= MATCH_CUTOFF:
            matched_title = str(match[0]).lower()

        # Skip if either the file stem or its metadata title is already done;
        # comparing the stem alone misses files whose name differs from the
        # stored title (truncated or sanitised file names).
        if title_base in processed_titles or matched_title in processed_titles:
            print(f"✅ Skipping already processed file: {audio_file}")
            continue

        print(f"🎧 Processing {audio_file_path}")
        transcription = transcribe_audio(audio_file_path)
        update_sheet(audio_file, transcription)

        # Remember both keys to avoid duplicate processing during this run.
        processed_titles.add(title_base)
        if matched_title is not None:
            processed_titles.add(matched_title)
    except Exception as e:
        # Best-effort batch job: report the failure and move on to the next file.
        print(f"❌ Error processing {audio_file}: {e}")

# Cleanup: drop the model reference first, otherwise the collector and the
# CUDA cache flush have nothing to release.
del model
gc.collect()
torch.cuda.empty_cache()

## Sentiment Analysis

In [None]:
!pip install transformers
!pip install datasets

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F

# Load FinBERT (tokenizer + sequence-classification head) in inference mode.
model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()

# Run on the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# labels[i] must name the class behind logit i. Read the order from the
# checkpoint's own config instead of hard-coding it: the published
# ProsusAI/finbert config maps 0 -> positive, 1 -> negative, 2 -> neutral,
# so a hand-written ["negative", "neutral", "positive"] list mislabels every
# prediction. Lower-cased because downstream code compares against
# 'positive' / 'negative'.
labels = [model.config.id2label[i].lower() for i in range(model.config.num_labels)]

In [None]:
# list of tickers to filter by
# Used by the data-loading cells below for `isin` filters and to build the
# ticker regex. Includes both GOOGL and GOOG.
stocks = ['AAPL', 'MSFT', 'NVDA', 'AMZN', 'META', 'BRK.B', 'GOOGL', 'AVGO', 'GOOG', 'TSLA']

In [None]:
# batch process sentiment scoring
def get_sentiments_batch(texts, batch_size=32, max_length=64):
    """Classify each text with the loaded FinBERT model.

    Args:
        texts: iterable of strings (a pandas Series, NumPy array or plain
            list); it is materialised once so any of these can be passed.
        batch_size: number of texts tokenised and scored per forward pass.
        max_length: token length every text is padded / truncated to.

    Returns:
        (sentiments, scores): two lists aligned with *texts*. Each sentiment
        is the entry of the module-level ``labels`` list at the arg-max
        class; each score is that class's softmax probability.
    """
    # list() makes slicing uniform for Series, arrays and lists alike.
    texts = list(texts)
    sentiments = []
    scores = []

    for i in tqdm(range(0, len(texts), batch_size), desc="Sentiment Scoring"):
        batch_texts = texts[i:i + batch_size]

        inputs = tokenizer(
            batch_texts,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=max_length
        ).to(device)

        # Inference only: no gradients needed.
        with torch.no_grad():
            outputs = model(**inputs)
            probs = F.softmax(outputs.logits, dim=1)

        preds = torch.argmax(probs, dim=1)
        batch_scores = torch.max(probs, dim=1).values

        # One device-to-host transfer per batch rather than one per element.
        sentiments.extend(labels[pred] for pred in preds.tolist())
        scores.extend(batch_scores.tolist())

    return sentiments, scores

### Sentiment Scores without Audio Transcripts

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F
import pandas as pd
from datasets import load_dataset
from tqdm import tqdm
import re

# Two news sources, brought to a shared (ticker, date, headline, time) layout.
news_columns = ['ticker', 'date', 'headline', 'time']

news1 = pd.read_csv("/content/drive/MyDrive/FYP/sp500_news_290k_articles.csv")
news2 = pd.read_csv("/content/drive/MyDrive/FYP/stock_data_articles.csv")

# The second source uses different column names and gets an empty 'time' column.
news2 = news2.rename(columns={
    'symbol': 'ticker',
    'Publishdate': 'date',
    'Title': 'headline'
}).assign(time=None)
news2 = news2[news_columns]
news1 = news1[news_columns]

# Stack both sources, keep only the target tickers, and drop rows lacking a
# headline or a date.
df_news = pd.concat([news1, news2], ignore_index=True)
is_target = df_news['ticker'].isin(stocks)
df_news = df_news[is_target].dropna(subset=['headline', 'date']).reset_index(drop=True)
df_news['source'] = 'news'

# Parse mixed-format date strings; unparseable values become NaT.
parsed_dates = pd.to_datetime(df_news['date'], format='mixed', errors='coerce')
df_news['date'] = parsed_dates.dt.date

# load reddit data (2021-2025)
ds_reddit = load_dataset("johntoro/Reddit-Stock-Sentiment", split="train")
df_reddit = ds_reddit.to_pandas()

# Regex matching any target ticker as a whole word.
pattern = r'\b(?:' + '|'.join(re.escape(ticker) for ticker in stocks) + r')\b'
ticker_regex = re.compile(pattern, flags=re.IGNORECASE)


def _find_tickers(text):
    """Return the target tickers mentioned in *text*, in `stocks` order.

    Uses the same case-insensitive, whole-word regex as the row filter, so
    every row that passes the filter gets a non-empty list. A case-sensitive
    substring test would miss "aapl" and report GOOG for every GOOGL.
    """
    mentioned = {hit.upper() for hit in ticker_regex.findall(text)}
    return [ticker for ticker in stocks if ticker in mentioned]


# Text scored for sentiment = title + body.
df_reddit['headline'] = df_reddit['title'].fillna('') + " " + df_reddit['text'].fillna('')
reddit_posted_at = pd.to_datetime(df_reddit['datetime'])
df_reddit['date'] = reddit_posted_at.dt.date
df_reddit['time'] = reddit_posted_at.dt.time
df_reddit['source'] = 'reddit'

# keep posts that contain a ticker (.copy() so the assignment below targets
# an independent frame rather than a filtered view)
df_reddit = df_reddit[df_reddit['headline'].str.contains(pattern, case=False, na=False)].copy()

# extract tickers
df_reddit['ticker'] = df_reddit['headline'].apply(_find_tickers)

df_reddit = df_reddit[['ticker', 'date', 'headline', 'time', 'source']].dropna(subset=['headline'])

# loading other reddit dataset
df_reddit2 = pd.read_csv('/content/drive/MyDrive/FYP/posts.csv')

# Combine 'title' and 'selftext' to create the 'headline' for sentiment analysis
df_reddit2['headline'] = df_reddit2['title'].fillna('') + " " + df_reddit2['selftext'].fillna('')
reddit2_posted_at = pd.to_datetime(df_reddit2['created_utc'], unit='s')  # Convert from Unix timestamp
df_reddit2['date'] = reddit2_posted_at.dt.date
df_reddit2['time'] = reddit2_posted_at.dt.time
df_reddit2['source'] = 'reddit2'

# filter based on tickers
df_reddit2 = df_reddit2[df_reddit2['headline'].str.contains(pattern, case=False, na=False)].copy()

# extract tickers
df_reddit2['ticker'] = df_reddit2['headline'].apply(_find_tickers)

df_reddit2 = df_reddit2[['ticker', 'date', 'headline', 'time', 'source']].dropna(subset=['headline'])

# Tweets: map the dataset's columns onto the shared layout.
twitter_ds = load_dataset("mjw/stock_market_tweets", split="train")
df_twt = twitter_ds.to_pandas()

# Parse the timestamp once; unparseable values become NaT.
tweet_posted_at = pd.to_datetime(df_twt['post_date'], format='mixed', errors='coerce')
df_twt['headline'] = df_twt['body']
df_twt['ticker'] = df_twt['ticker_symbol']
df_twt['date'] = tweet_posted_at.dt.date
df_twt['time'] = tweet_posted_at.dt.time
df_twt['source'] = 'twitter'

# Keep target tickers only, narrow to the shared columns, and drop rows
# lacking a headline or date.
tweet_is_target = df_twt['ticker'].isin(stocks)
df_twt = df_twt.loc[tweet_is_target, ['ticker', 'date', 'headline', 'time', 'source']]
df_twt = df_twt.dropna(subset=['headline', 'date'])

def _restrict_to_2010_2024(frame):
    """Convert frame['date'] to datetime in place and return the 2010-2024 rows."""
    frame['date'] = pd.to_datetime(frame['date'], errors='coerce')
    in_range = frame['date'].dt.year.between(2010, 2024)
    return frame[in_range]


# Normalise the date column of every source and keep 2010-2024 only.
df_news = _restrict_to_2010_2024(df_news)
df_reddit = _restrict_to_2010_2024(df_reddit)
df_reddit2 = _restrict_to_2010_2024(df_reddit2)
df_twt = _restrict_to_2010_2024(df_twt)

# Stack all sources, then apply the same year window to the combined frame.
df_all = pd.concat([df_news, df_reddit, df_reddit2, df_twt], ignore_index=True)
df_all = df_all[df_all['date'].dt.year.between(2010, 2024)]

# Report how many rows each source contributes.
reddit_total = len(df_reddit) + len(df_reddit2)
print(f"Total News Articles (2010-2024): {len(df_news)}")
print(f"Total Reddit Posts (2010-2024): {reddit_total}")
print(f"Total Twitter Posts (2010-2024): {len(df_twt)}")
print(f"Combined Total (2010-2024): {len(df_all)}")

# Score every headline and attach the label and confidence to the frame.
sentiments, scores = get_sentiments_batch(df_all['headline'])
df_all['sentiment'] = sentiments
df_all['sentiment_score'] = scores


def _signed_score(row):
    """Return the score as-is for 'positive' rows and negated otherwise."""
    if row['sentiment'] == 'positive':
        return row['sentiment_score']
    return -row['sentiment_score']


# Daily aggregate: neutral rows are left out, the rest are signed and averaged per date.
is_polar = df_all['sentiment'].isin(['positive', 'negative'])
daily_sentiment = df_all[is_polar].copy()
daily_sentiment['adjusted_score'] = daily_sentiment.apply(_signed_score, axis=1)

daily_scores = daily_sentiment.groupby('date')['adjusted_score'].mean().reset_index()
daily_scores = daily_scores.rename(columns={'adjusted_score': 'daily_sentiment_score'})

print(daily_scores.tail())

# Persist the per-item results and the daily series.
df_all.to_csv("/content/drive/MyDrive/FYP/all_sentiment_results.csv", index=False)
daily_scores.to_csv("/content/drive/MyDrive/FYP/daily_sentiment_scores.csv", index=False)

### Sentiment Scores with Audio

In [None]:
import pandas as pd
from datasets import load_dataset
from tqdm import tqdm
import re

# load audio transcripts
df_transcripts = pd.read_csv("/content/drive/MyDrive/FYP/transcript.csv")

# preprocess the transcripts data to match our structure
df_transcripts = df_transcripts.rename(columns={
    'tickers found': 'ticker',
    'transcription': 'headline'
})

# convert date column to datetime
df_transcripts['date'] = pd.to_datetime(df_transcripts['date'], errors='coerce').dt.date
df_transcripts['time'] = None
df_transcripts['source'] = 'transcript'

# filtering by target tickers and valid dates.
# The 'ticker' cell is a ", "-joined string (e.g. "AAPL, TSLA") or "Unknown",
# so a plain isin(stocks) would discard every multi-ticker transcript.
# Keep a row when ANY of its listed symbols is a target ticker.
target_tickers = set(stocks)
mentions_target = df_transcripts['ticker'].fillna('').astype(str).apply(
    lambda cell: any(part.strip() in target_tickers for part in cell.split(','))
)
df_transcripts = df_transcripts[mentions_target]
df_transcripts = df_transcripts.dropna(subset=['headline', 'date']).reset_index(drop=True)

# ensuring 'date' columns are in datetime format (transcripts included, so
# the combined column has one datetime dtype and `.dt` works on it)
df_news['date'] = pd.to_datetime(df_news['date'], errors='coerce')
df_reddit['date'] = pd.to_datetime(df_reddit['date'], errors='coerce')
df_reddit2['date'] = pd.to_datetime(df_reddit2['date'], errors='coerce')
df_twt['date'] = pd.to_datetime(df_twt['date'], errors='coerce')
df_transcripts['date'] = pd.to_datetime(df_transcripts['date'], errors='coerce')

# filtering all by date (2010 to 2024)
df_news = df_news[df_news['date'].dt.year.between(2010, 2024)]
df_reddit = df_reddit[df_reddit['date'].dt.year.between(2010, 2024)]
df_reddit2 = df_reddit2[df_reddit2['date'].dt.year.between(2010, 2024)]
df_twt = df_twt[df_twt['date'].dt.year.between(2010, 2024)]
df_transcripts = df_transcripts[df_transcripts['date'].dt.year.between(2010, 2024)]

# combining and filtering by date.
# The transcripts must be part of the concat: without them this "with audio"
# run scores exactly the same rows as the run without audio. Only the shared
# columns are taken so the transcript-only columns do not leak into df_all.
shared_columns = ['ticker', 'date', 'headline', 'time', 'source']
df_all = pd.concat(
    [df_news, df_reddit, df_reddit2, df_twt, df_transcripts[shared_columns]],
    ignore_index=True,
)
df_all = df_all[df_all['date'].dt.year.between(2010, 2024)]  # Apply final date range filter

# checking dataset amount
print(f"Total News Articles (2010-2024): {len(df_news)}")
print(f"Total Reddit Posts (2010-2024): {len(df_reddit) + len(df_reddit2)}")
print(f"Total Twitter Posts (2010-2024): {len(df_twt)}")
print(f"Total Transcripts (2010-2024): {len(df_transcripts)}")
print(f"Combined Total (2010-2024): {len(df_all)}")

# Score every headline and store the label and confidence next to it.
sentiments, scores = get_sentiments_batch(df_all['headline'])
df_all['sentiment'] = sentiments
df_all['sentiment_score'] = scores


def _signed_score_w_audio(row):
    """Return the score unchanged for 'positive' rows, negated for the rest."""
    if row['sentiment'] == 'positive':
        return row['sentiment_score']
    return -row['sentiment_score']


# Daily series: neutral rows are excluded, remaining scores are signed and
# averaged per date.
polar_rows = df_all['sentiment'].isin(['positive', 'negative'])
daily_sentiment = df_all[polar_rows].copy()
daily_sentiment['adjusted_score'] = daily_sentiment.apply(_signed_score_w_audio, axis=1)

daily_scores = daily_sentiment.groupby('date')['adjusted_score'].mean().reset_index()
daily_scores = daily_scores.rename(columns={'adjusted_score': 'daily_sentiment_score'})

print(daily_scores.tail())

# Write the per-item results and the daily series.
df_all.to_csv("/content/drive/MyDrive/FYP/all_sentiment_results_w_audio.csv", index=False)
daily_scores.to_csv("/content/drive/MyDrive/FYP/daily_sentiment_scores_w_audio.csv", index=False)