# Setup

In [None]:
import feedparser
import trafilatura
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForSequenceClassification
import torch
from transformers import pipeline
import spacy
from collections import Counter, defaultdict
import re
from pathlib import Path
import csv
from youtube_transcript_api import YouTubeTranscriptApi
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google_auth_oauthlib.flow import InstalledAppFlow
# import io
from dotenv import load_dotenv
from googleapiclient.discovery import build
import os
from youtube_transcript_api.proxies import WebshareProxyConfig

In [26]:
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    print("Installing spaCy model...")
    import subprocess
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
    nlp = spacy.load("en_core_web_sm")


In [27]:
sentiment_analyzer = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    tokenizer="ProsusAI/finbert"
)

def get_text_sentiment_score(text: str, max_chars=512) -> float:
    if not text or not text.strip():
        return 0.0

    chunks = [text[i:i+max_chars] for i in range(0, len(text), max_chars)]
    scores = []

    for chunk in chunks:
        result = sentiment_analyzer(chunk)[0]
        label = result["label"].upper()
        score = float(result["score"])

        if "POS" in label:
            scores.append(score)
        elif "NEG" in label:
            scores.append(-score)
        else:
            scores.append(0.0)

    return sum(scores) / len(scores) if scores else 0.0


Device set to use cpu


# Using RSS Feeds

In [28]:

FEEDS = [
    "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10000664",
]

MAX_ARTICLES_PER_FEED = 30
CONTEXT_SENTENCES = 1
TICKER_LIST_PATH = Path("tickers.csv")  # optional: columns ticker,name

TICKER_RE = re.compile(r"(?<![A-Z])\$?[A-Z]{2,5}(?![A-Z])")
TICKER_STOP = {
    "A", "AN", "AND", "ARE", "AS", "AT", "BE", "BUT", "BY", "CAN", "CO", "FOR",
    "FROM", "HAS", "HAVE", "IN", "IS", "IT", "ITS", "NOT", "OF", "ON", "OR",
    "THE", "TO", "WAS", "WERE", "WILL", "WITH",
}

SECTOR_KEYWORDS = {
    "Technology": ["tech", "software", "technology", "cloud", "ai", "artificial intelligence",
                   "chip", "semiconductor", "digital", "platform", "app", "data", "cyber"],
    "Finance": ["bank", "financial", "finance", "investment", "trading", "market",
                "stock", "equity", "bond", "credit", "lending", "mortgage"],
    "Healthcare": ["health", "medical", "pharmaceutical", "drug", "biotech", "hospital",
                    "treatment", "patient", "fda", "clinical", "therapy"],
    "Energy": ["oil", "gas", "energy", "petroleum", "renewable", "solar", "wind",
               "electric", "power", "fuel", "drilling", "crude"],
    "Retail": ["retail", "store", "shopping", "consumer", "e-commerce", "online shopping",
               "merchandise", "sales", "retailer"],
    "Automotive": ["car", "automotive", "vehicle", "auto", "truck", "electric vehicle",
                   "ev", "manufacturing", "tesla"],
    "Real Estate": ["real estate", "property", "housing", "construction", "mortgage",
                    "development", "reit"],
    "Telecommunications": ["telecom", "communication", "wireless", "5g", "network", "internet"],
    "Aerospace": ["aerospace", "aircraft", "defense", "boeing", "space"],
    "Consumer Goods": ["consumer goods", "packaged goods", "cpg"],
}

def extract_article_text(url: str) -> str | None:
    downloaded = trafilatura.fetch_url(url)
    if not downloaded:
        return None

    text = trafilatura.extract(
        downloaded,
        include_comments=False,
        include_tables=False,
        include_formatting=False
    )
    return text

def load_ticker_map(path: Path):
    ticker_to_name = {}
    name_to_ticker = {}
    if not path.exists():
        return ticker_to_name, name_to_ticker

    with path.open("r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            ticker = (row.get("ticker") or "").strip().upper()
            name = (row.get("name") or "").strip()
            if not ticker or not name:
                continue
            ticker_to_name[ticker] = name
            name_to_ticker[name.lower()] = ticker

    return ticker_to_name, name_to_ticker


ticker_to_name, name_to_ticker = load_ticker_map(TICKER_LIST_PATH)


def fetch_articles(feed_url, max_items=30):
    feed = feedparser.parse(feed_url)
    articles = []
    for entry in feed.entries[:max_items]:
        text = extract_article_text(entry.link)
        if not text:
            continue
        articles.append({
            "title": entry.title,
            "url": entry.link,
            "published": entry.get("published"),
            "text": text,
        })
    return articles


def get_tickers(text):
    tickers = []
    for m in TICKER_RE.findall(text):
        t = m.replace("$", "").upper()
        if t in TICKER_STOP:
            continue
        if ticker_to_name and t not in ticker_to_name:
            continue
        tickers.append(t)
    return tickers


def get_companies(doc):
    companies = [ent.text for ent in doc.ents if ent.label_ == "ORG"]
    mapped = []
    for name in companies:
        ticker = name_to_ticker.get(name.lower())
        if ticker:
            mapped.append(ticker)
        else:
            mapped.append(name)
    return mapped


def get_sectors(text_lower):
    return [
        sector for sector, keywords in SECTOR_KEYWORDS.items()
        if any(kw in text_lower for kw in keywords)
    ]


def sentence_windows(sentences, idx, window=1):
    start = max(0, idx - window)
    end = min(len(sentences), idx + window + 1)
    return " ".join(sentences[start:end])


def analyze_article_entities(article, window=1):
    text = f"{article.get('title','')} {article.get('text','')}"
    if not text.strip():
        return [], [], []

    doc = nlp(text)
    sentences = [sent.text for sent in doc.sents]

    stock_scores = defaultdict(list)
    company_scores = defaultdict(list)
    sector_scores = defaultdict(list)

    cache = {}

    for i, sent in enumerate(sentences):
        sent_doc = nlp(sent)
        tickers = set(get_tickers(sent))
        companies = set(get_companies(sent_doc))
        sectors = set(get_sectors(sent.lower()))

        if not tickers and not companies and not sectors:
            continue

        window_text = sentence_windows(sentences, i, window=window)
        if window_text not in cache:
            cache[window_text] = get_text_sentiment_score(window_text)
        s = cache[window_text]

        for t in tickers:
            stock_scores[t].append(s)
        for c in companies:
            company_scores[c].append(s)
        for sec in sectors:
            sector_scores[sec].append(s)

    return stock_scores, company_scores, sector_scores


def aggregate_entities_with_sentiment(articles, window=1):
    stock_stats = defaultdict(lambda: {"mentions": 0, "scores": []})
    company_stats = defaultdict(lambda: {"mentions": 0, "scores": []})
    sector_stats = defaultdict(lambda: {"mentions": 0, "scores": []})

    for article in articles:
        stock_scores, company_scores, sector_scores = analyze_article_entities(article, window=window)

        for name, scores in stock_scores.items():
            stock_stats[name]["mentions"] += 1
            stock_stats[name]["scores"].append(sum(scores) / len(scores))

        for name, scores in company_scores.items():
            company_stats[name]["mentions"] += 1
            company_stats[name]["scores"].append(sum(scores) / len(scores))

        for name, scores in sector_scores.items():
            sector_stats[name]["mentions"] += 1
            sector_stats[name]["scores"].append(sum(scores) / len(scores))

    def finalize(stats):
        rows = []
        for name, data in stats.items():
            avg = sum(data["scores"]) / len(data["scores"]) if data["scores"] else 0.0
            rows.append({
                "name": name,
                "mentions": data["mentions"],
                "avg_sentiment": avg,
            })
        rows.sort(key=lambda x: x["mentions"], reverse=True)
        return rows

    return {
        "stocks": finalize(stock_stats),
        "companies": finalize(company_stats),
        "sectors": finalize(sector_stats),
    }


## Use

In [None]:
all_articles = []
for feed in FEEDS:
    all_articles.extend(fetch_articles(feed, max_items=MAX_ARTICLES_PER_FEED))

results = aggregate_entities_with_sentiment(all_articles, window=CONTEXT_SENTENCES)

results["stocks"][:10], results["sectors"][:10]

# Youtube Pipeline

## With Transcript

In [28]:
YOUTUBE_FEED = "https://www.youtube.com/feeds/videos.xml?channel_id=UCrp_UI8XtuYfpiqluWLD7Lw"
MAX_VIDEOS = 30

def fetch_youtube_videos(feed_url, max_items=30):
    feed = feedparser.parse(feed_url)
    ytt_api = YouTubeTranscriptApi()
    videos = []
    for entry in feed.entries[:max_items]:
        video_id = entry.get("yt_videoid") or entry.get("id")
        transcript_text = None
        if video_id:
            try:
                transcript = ytt_api.fetch(video_id)
                snippets = getattr(transcript, "snippets", transcript)
                transcript_text = " ".join((s.text or "").strip() for s in snippets if getattr(s, "text", None))
            except Exception:
                transcript_text = None

        combined_text = f"{entry.get('title', '')} {transcript_text or ''}"
        tickers = get_tickers(combined_text)
        doc = nlp(combined_text) if combined_text.strip() else None
        companies = get_companies(doc) if doc else []
        sectors = get_sectors(combined_text.lower())

        videos.append({
            "title": entry.get("title"),
            "url": entry.get("link"),
            "published": entry.get("published"),
            "video_id": video_id,
            "author": entry.get("author"),
            "summary": entry.get("summary"),
            "transcript_text": transcript_text,
            "mention_counts": {
                "stocks": len(tickers),
                "companies": len(companies),
                "sectors": len(sectors),
            },
        })
    return videos

youtube_videos = fetch_youtube_videos(YOUTUBE_FEED, max_items=MAX_VIDEOS)
youtube_videos[:3]


[{'title': 'Mad Money 01/09/26 | Audio Only',
  'url': 'https://www.youtube.com/watch?v=Ctwl6H8f9o8',
  'published': '2026-01-10T00:59:12+00:00',
  'video_id': 'Ctwl6H8f9o8',
  'author': 'CNBC Television',
  'summary': 'Listen to Jim Cramer’s personal guide through the confusing jungle of Wall Street investing, navigating through opportunities and pitfalls with one goal in mind - to help you make money. \n\nFor access to live and exclusive video from CNBC subscribe to CNBC PRO: https://cnb.cx/42d859g\n\n» Subscribe to CNBC TV: https://cnb.cx/SubscribeCNBCtelevision\n» Subscribe to CNBC: https://cnb.cx/SubscribeCNBC\n» Watch CNBC on the go with CNBC+: https://www.cnbc.com/WatchCNBCPlus\n\n\nTurn to CNBC TV for the latest stock market news and analysis. From market futures to live price updates CNBC is the leader in business news worldwide.\n\nConnect with CNBC News Online\nGet the latest news: http://www.cnbc.com/\nFollow CNBC on LinkedIn: https://cnb.cx/LinkedInCNBC\nFollow CNBC News o

## Check Ban

In [24]:
# from youtube_transcript_api import YouTubeTranscriptApi

test_video_id = "dQw4w9WgXcQ"  # Rick Roll - should have captions
try:
    ytt_api = YouTubeTranscriptApi()
    ytt_api.fetch(test_video_id)
    print("✅ Not banned - can fetch transcripts")
except Exception as e:
    print(f"❌ Error: {e}")
    if "429" in str(e) or "banned" in str(e).lower() or "blocked" in str(e).lower():
        print("⚠️ Likely IP banned")

❌ Error: 
Could not retrieve a transcript for the video https://www.youtube.com/watch?v=dQw4w9WgXcQ! This is most likely caused by:

YouTube is blocking requests from your IP. This usually is due to one of the following reasons:
- You have done too many requests and your IP has been blocked by YouTube
- You are doing requests from an IP belonging to a cloud provider (like AWS, Google Cloud Platform, Azure, etc.). Unfortunately, most IPs from cloud providers are blocked by YouTube.

Ways to work around this are explained in the "Working around IP bans" section of the README (https://github.com/jdepoix/youtube-transcript-api?tab=readme-ov-file#working-around-ip-bans-requestblocked-or-ipblocked-exception).


If you are sure that the described cause is not responsible for this error and that a transcript should be retrievable, please create an issue at https://github.com/jdepoix/youtube-transcript-api/issues. Please add which version of youtube_transcript_api you are using and provide the 

## Without Transcript

In [47]:
YOUTUBE_FEED = "https://www.youtube.com/feeds/videos.xml?channel_id=UCrp_UI8XtuYfpiqluWLD7Lw"
MAX_VIDEOS = 30

def fetch_youtube_videos_no_transcript(feed_url, max_items=30):
    """Fetch YouTube videos without transcripts - uses only title and summary from RSS feed"""
    feed = feedparser.parse(feed_url)
    videos = []
    
    for entry in feed.entries[:max_items]:
        video_id = entry.get("yt_videoid") or entry.get("id")
        
        # Use only title and summary (no transcript fetching)
        combined_text = f"{entry.get('title', '')} {entry.get('summary', '')}"
        
        tickers = get_tickers(combined_text)
        doc = nlp(combined_text) if combined_text.strip() else None
        companies = get_companies(doc) if doc else []
        sectors = get_sectors(combined_text.lower())

        videos.append({
            "title": entry.get("title"),
            "url": entry.get("link"),
            "published": entry.get("published"),
            "video_id": video_id,
            "author": entry.get("author"),
            "summary": entry.get("summary"),
            "transcript_text": None,  # No transcript
            "mention_counts": {
                "stocks": len(tickers),
                "companies": len(companies),
                "sectors": len(sectors),
            },
        })
    return videos

youtube_videos_no_transcript = fetch_youtube_videos_no_transcript(YOUTUBE_FEED, max_items=MAX_VIDEOS)
youtube_videos_no_transcript[:3]

[{'title': 'Mad Money 01/09/26 | Audio Only',
  'url': 'https://www.youtube.com/watch?v=Ctwl6H8f9o8',
  'published': '2026-01-10T00:59:12+00:00',
  'video_id': 'Ctwl6H8f9o8',
  'author': 'CNBC Television',
  'summary': 'Listen to Jim Cramer’s personal guide through the confusing jungle of Wall Street investing, navigating through opportunities and pitfalls with one goal in mind - to help you make money. \n\nFor access to live and exclusive video from CNBC subscribe to CNBC PRO: https://cnb.cx/42d859g\n\n» Subscribe to CNBC TV: https://cnb.cx/SubscribeCNBCtelevision\n» Subscribe to CNBC: https://cnb.cx/SubscribeCNBC\n» Watch CNBC on the go with CNBC+: https://www.cnbc.com/WatchCNBCPlus\n\n\nTurn to CNBC TV for the latest stock market news and analysis. From market futures to live price updates CNBC is the leader in business news worldwide.\n\nConnect with CNBC News Online\nGet the latest news: http://www.cnbc.com/\nFollow CNBC on LinkedIn: https://cnb.cx/LinkedInCNBC\nFollow CNBC News o

In [7]:
def analyze_youtube_video_entities(video, window=1):
    """Analyze YouTube video for entities with sentiment (similar to analyze_article_entities)"""
    # Combine title, summary, and transcript if available
    text_parts = [
        video.get('title', ''),
        video.get('summary', ''),
        video.get('transcript_text', '')
    ]
    text = ' '.join([part for part in text_parts if part])
    
    if not text.strip():
        return defaultdict(list), defaultdict(list), defaultdict(list)

    doc = nlp(text)
    sentences = [sent.text for sent in doc.sents]

    stock_scores = defaultdict(list)
    company_scores = defaultdict(list)
    sector_scores = defaultdict(list)

    cache = {}

    for i, sent in enumerate(sentences):
        sent_doc = nlp(sent)
        tickers = set(get_tickers(sent))
        companies = set(get_companies(sent_doc))
        sectors = set(get_sectors(sent.lower()))

        if not tickers and not companies and not sectors:
            continue

        window_text = sentence_windows(sentences, i, window=window)
        if window_text not in cache:
            cache[window_text] = get_text_sentiment_score(window_text)
        s = cache[window_text]

        for t in tickers:
            stock_scores[t].append(s)
        for c in companies:
            company_scores[c].append(s)
        for sec in sectors:
            sector_scores[sec].append(s)

    return stock_scores, company_scores, sector_scores


def aggregate_youtube_entities_with_sentiment(videos, window=1):
    """Aggregate YouTube video entities with sentiment (similar to aggregate_entities_with_sentiment)"""
    stock_stats = defaultdict(lambda: {"mentions": 0, "scores": []})
    company_stats = defaultdict(lambda: {"mentions": 0, "scores": []})
    sector_stats = defaultdict(lambda: {"mentions": 0, "scores": []})

    for video in videos:
        stock_scores, company_scores, sector_scores = analyze_youtube_video_entities(video, window=window)

        for name, scores in stock_scores.items():
            stock_stats[name]["mentions"] += 1
            stock_stats[name]["scores"].append(sum(scores) / len(scores) if scores else 0.0)

        for name, scores in company_scores.items():
            company_stats[name]["mentions"] += 1
            company_stats[name]["scores"].append(sum(scores) / len(scores) if scores else 0.0)

        for name, scores in sector_scores.items():
            sector_stats[name]["mentions"] += 1
            sector_stats[name]["scores"].append(sum(scores) / len(scores) if scores else 0.0)

    def finalize(stats):
        rows = []
        for name, data in stats.items():
            avg = sum(data["scores"]) / len(data["scores"]) if data["scores"] else 0.0
            rows.append({
                "name": name,
                "mentions": data["mentions"],
                "avg_sentiment": avg,
            })
        rows.sort(key=lambda x: x["mentions"], reverse=True)
        return rows

    return {
        "stocks": finalize(stock_stats),
        "companies": finalize(company_stats),
        "sectors": finalize(sector_stats),
    }

In [51]:
# Analyze YouTube videos and show stocks/companies/sectors with sentiment
youtube_results = aggregate_youtube_entities_with_sentiment(youtube_videos_no_transcript, window=CONTEXT_SENTENCES)

print("YouTube Videos - Most Talked About Stocks:")
for stock in youtube_results["stocks"][:10]:
    print(f"  {stock['name']}: {stock['mentions']} mentions, avg sentiment: {stock['avg_sentiment']:.3f}")

print("\nYouTube Videos - Most Talked About Companies:")
for company in youtube_results["companies"][:10]:
    print(f"  {company['name']}: {company['mentions']} mentions, avg sentiment: {company['avg_sentiment']:.3f}")

print("\nYouTube Videos - Most Talked About Sectors:")
for sector in youtube_results["sectors"][:10]:
    print(f"  {sector['name']}: {sector['mentions']} mentions, avg sentiment: {sector['avg_sentiment']:.3f}")

# Show top results
youtube_results["stocks"][:10], youtube_results["companies"][:10], youtube_results["sectors"][:10]

YouTube Videos - Most Talked About Stocks:
  BA: 1 mentions, avg sentiment: 0.000
  SLB: 1 mentions, avg sentiment: 0.000

YouTube Videos - Most Talked About Companies:
  CNBC: 3 mentions, avg sentiment: 0.000
  CNBC News Online: 1 mentions, avg sentiment: 0.000
  WhatsApp: 1 mentions, avg sentiment: 0.000
  FSLR: 1 mentions, avg sentiment: 0.000
  Radiant Logistics: 1 mentions, avg sentiment: 0.000
  Talen Energy: 1 mentions, avg sentiment: 0.000
  SailPoint: 1 mentions, avg sentiment: 0.000
  BA: 1 mentions, avg sentiment: 0.000
  SLB: 1 mentions, avg sentiment: 0.000
  SPY: 1 mentions, avg sentiment: 0.000

YouTube Videos - Most Talked About Sectors:
  Finance: 10 mentions, avg sentiment: 0.033
  Technology: 9 mentions, avg sentiment: -0.032
  Energy: 2 mentions, avg sentiment: 0.000
  Automotive: 1 mentions, avg sentiment: 0.000
  Healthcare: 1 mentions, avg sentiment: 0.834
  Real Estate: 1 mentions, avg sentiment: -0.504


([{'name': 'BA', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'SLB', 'mentions': 1, 'avg_sentiment': 0.0}],
 [{'name': 'CNBC', 'mentions': 3, 'avg_sentiment': 0.0},
  {'name': 'CNBC News Online', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'WhatsApp', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'FSLR', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'Radiant Logistics', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'Talen Energy', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'SailPoint', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'BA', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'SLB', 'mentions': 1, 'avg_sentiment': 0.0},
  {'name': 'SPY', 'mentions': 1, 'avg_sentiment': 0.0}],
 [{'name': 'Finance', 'mentions': 10, 'avg_sentiment': 0.03299798965454102},
  {'name': 'Technology', 'mentions': 9, 'avg_sentiment': -0.0315802428457472},
  {'name': 'Energy', 'mentions': 2, 'avg_sentiment': 0.0},
  {'name': 'Automotive', 'mentions': 1, 'avg_sentiment': 0.0},
 

In [56]:
feed = feedparser.parse(YOUTUBE_FEED)
print(f"Feed provides {len(feed.entries)} videos")

# If it's only 15, you're already getting the max from RSS
if len(feed.entries) < MAX_VIDEOS:
    print(f"Feed only has {len(feed.entries)} videos, but you set MAX_VIDEOS={MAX_VIDEOS}")
    print("   You're already getting all available videos from this RSS feed")

Feed provides 15 videos
Feed only has 15 videos, but you set MAX_VIDEOS=30
   You're already getting all available videos from this RSS feed


## Youtube with Google API

In [29]:
# YouTube Data API Configuration
load_dotenv()
YOUTUBE_API_KEY = os.getenv("API_KEY")
CHANNEL_ID = "UCrp_UI8XtuYfpiqluWLD7Lw"  # CNBC channel
MAX_VIDEOS = 100

def fetch_youtube_videos_with_api(channel_id, api_key, max_results=100):
    """Fetch YouTube videos using Data API (no transcripts needed)"""
    youtube = build('youtube', 'v3', developerKey=api_key)
    videos = []
    next_page_token = None
    uploads_playlist_id = None
    
    print(f"Fetching videos from channel {channel_id}...")
    
    while len(videos) < max_results:
        try:
            # First, get the uploads playlist ID for the channel
            if uploads_playlist_id is None:  # Only need to do this once
                channel_response = youtube.channels().list(
                    part='contentDetails',
                    id=channel_id
                ).execute()
                
                if not channel_response.get('items'):
                    print(f"❌ Channel {channel_id} not found")
                    break
                
                uploads_playlist_id = channel_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            
            # Get videos from uploads playlist
            if uploads_playlist_id:
                request = youtube.playlistItems().list(
                    part='snippet,contentDetails',
                    playlistId=uploads_playlist_id,
                    maxResults=min(max_results, max_results - len(videos)),
                    pageToken=next_page_token
                )
            else:
                # Fallback: search for videos from channel
                request = youtube.search().list(
                    part='snippet',
                    channelId=channel_id,
                    type='video',
                    maxResults=min(max_results, max_results - len(videos)),
                    pageToken=next_page_token,
                    order='date'
                )
            
            response = request.execute()
            
            # Get video IDs
            video_ids = []
            for item in response['items']:
                if 'contentDetails' in item:
                    video_ids.append(item['contentDetails']['videoId'])
                elif 'id' in item and 'videoId' in item['id']:
                    video_ids.append(item['id']['videoId'])
            
            # Get detailed video information
            if video_ids:
                video_details = youtube.videos().list(
                    part='snippet,statistics',
                    id=','.join(video_ids)
                ).execute()
                
                for item in video_details['items']:
                    snippet = item['snippet']
                    videos.append({
                        'title': snippet.get('title', ''),
                        'video_id': item['id'],
                        'url': f"https://www.youtube.com/watch?v={item['id']}",
                        'published': snippet.get('publishedAt', ''),
                        'published_date': snippet.get('publishedAt', ''),
                        'author': snippet.get('channelTitle', ''),
                        'summary': snippet.get('description', ''),  # Full description
                        'transcript_text': None,  # No transcript (IP banned)
                        'view_count': item['statistics'].get('viewCount', 0),
                        'like_count': item['statistics'].get('likeCount', 0),
                    })
            
            next_page_token = response.get('nextPageToken')
            if not next_page_token:
                break
                
            print(f"  Fetched {len(videos)} videos so far...")
            
        except Exception as e:
            print(f"❌ Error fetching videos: {e}")
            break
    
    print(f"✅ Total videos fetched: {len(videos)}")
    return videos

# Fetch videos using API
youtube_videos_api = fetch_youtube_videos_with_api(CHANNEL_ID, YOUTUBE_API_KEY, max_results=MAX_VIDEOS)

# Show first few videos
youtube_videos_api[:3]

Fetching videos from channel UCrp_UI8XtuYfpiqluWLD7Lw...
  Fetched 50 videos so far...
  Fetched 100 videos so far...
✅ Total videos fetched: 100


[{'title': 'Mad Money 01/09/26 | Audio Only',
  'video_id': 'Ctwl6H8f9o8',
  'url': 'https://www.youtube.com/watch?v=Ctwl6H8f9o8',
  'published': '2026-01-10T00:59:12Z',
  'published_date': '2026-01-10T00:59:12Z',
  'author': 'CNBC Television',
  'summary': 'Listen to Jim Cramer’s personal guide through the confusing jungle of Wall Street investing, navigating through opportunities and pitfalls with one goal in mind - to help you make money. \n\nFor access to live and exclusive video from CNBC subscribe to CNBC PRO: https://cnb.cx/42d859g\n\n» Subscribe to CNBC TV: https://cnb.cx/SubscribeCNBCtelevision\n» Subscribe to CNBC: https://cnb.cx/SubscribeCNBC\n» Watch CNBC on the go with CNBC+: https://www.cnbc.com/WatchCNBCPlus\n\n\nTurn to CNBC TV for the latest stock market news and analysis. From market futures to live price updates CNBC is the leader in business news worldwide.\n\nConnect with CNBC News Online\nGet the latest news: http://www.cnbc.com/\nFollow CNBC on LinkedIn: https://

# Caching Transcript

In [None]:
# Transcript fetching with caching and conservative throttling
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
import json
import time
import random
from pathlib import Path

TRANSCRIPT_CACHE_PATH = Path('daily_transcripts.json')

def load_transcript_cache(path):
    if path.exists():
        return json.loads(path.read_text(encoding='utf-8'))
    return {}

def save_transcript_cache(path, cache):
    path.write_text(json.dumps(cache, ensure_ascii=False, indent=2), encoding='utf-8')

def fetch_transcript_with_backoff(video_id, max_retries=3):
    delay = 1.0
    for attempt in range(1, max_retries + 1):
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            # Join transcript segments into a single string
            return ' '.join([seg.get('text', '') for seg in transcript])
        except (TranscriptsDisabled, NoTranscriptFound):
            return None
        except Exception:
            if attempt == max_retries:
                return None
            time.sleep(delay + random.random())
            delay *= 2

def attach_transcripts(videos, cache_path=TRANSCRIPT_CACHE_PATH):
    # Latest IDs in order; used for pruning
    latest_ids = [v.get('video_id') for v in videos if v.get('video_id')]

    cache = load_transcript_cache(cache_path)
    # Drop anything not in the latest 100
    cache = {vid: cache.get(vid) for vid in latest_ids if vid in cache}

    for idx, video in enumerate(videos, start=1):
        vid = video.get('video_id')
        if not vid:
            continue
        if vid in cache:
            video['transcript_text'] = cache[vid]
            continue
        # Conservative pacing with jitter
        time.sleep(0.75 + random.random() * 0.75)
        transcript_text = fetch_transcript_with_backoff(vid)
        video['transcript_text'] = transcript_text
        cache[vid] = transcript_text
        if idx % 10 == 0:
            save_transcript_cache(cache_path, cache)

    # Final prune to ensure cache only has latest 100 IDs
    cache = {vid: cache.get(vid) for vid in latest_ids}
    save_transcript_cache(cache_path, cache)
    return videos

# Attach transcripts for the latest 100 videos
youtube_videos_api = attach_transcripts(youtube_videos_api)


In [14]:
CONTEXT_SENTENCES = 1
result = aggregate_youtube_entities_with_sentiment(youtube_videos_api, window=CONTEXT_SENTENCES)

In [20]:
result['sectors']

[{'name': 'Technology', 'mentions': 54, 'avg_sentiment': -0.05197525889049342},
 {'name': 'Finance', 'mentions': 52, 'avg_sentiment': -0.029143351058547314},
 {'name': 'Automotive', 'mentions': 29, 'avg_sentiment': -0.16226489379488188},
 {'name': 'Energy', 'mentions': 23, 'avg_sentiment': -0.05071821420089058},
 {'name': 'Real Estate', 'mentions': 6, 'avg_sentiment': 0.06632298231124878},
 {'name': 'Healthcare', 'mentions': 5, 'avg_sentiment': -0.026582205295562746},
 {'name': 'Retail', 'mentions': 3, 'avg_sentiment': 0.13232562939325967},
 {'name': 'Aerospace', 'mentions': 2, 'avg_sentiment': 0.0},
 {'name': 'Telecommunications', 'mentions': 1, 'avg_sentiment': 0.0}]

# Check youtube IP ban

In [46]:
from youtube_transcript_api import YouTubeTranscriptApi

test_video_id = "dQw4w9WgXcQ"  # Rick Roll - should have captions
try:
    ytt_api = YouTubeTranscriptApi()
    transcript = ytt_api.fetch(test_video_id)
    print("✅ Not banned - can fetch transcripts")
except Exception as e:
    print(f"❌ Error: {e}")
    if "429" in str(e) or "banned" in str(e).lower() or "blocked" in str(e).lower():
        print("⚠️ Likely IP banned")

❌ Error: 
Could not retrieve a transcript for the video https://www.youtube.com/watch?v=dQw4w9WgXcQ! This is most likely caused by:

YouTube is blocking requests from your IP. This usually is due to one of the following reasons:
- You have done too many requests and your IP has been blocked by YouTube
- You are doing requests from an IP belonging to a cloud provider (like AWS, Google Cloud Platform, Azure, etc.). Unfortunately, most IPs from cloud providers are blocked by YouTube.

Ways to work around this are explained in the "Working around IP bans" section of the README (https://github.com/jdepoix/youtube-transcript-api?tab=readme-ov-file#working-around-ip-bans-requestblocked-or-ipblocked-exception).


If you are sure that the described cause is not responsible for this error and that a transcript should be retrievable, please create an issue at https://github.com/jdepoix/youtube-transcript-api/issues. Please add which version of youtube_transcript_api you are using and provide the 