In [21]:
import yfinance as yf
import snscrape.modules.twitter as sntwitter
import pandas as pd
from datetime import datetime, timedelta

import pprint


def _to_date(d):
    """Normalize input to a date object.

    Accepts a string in ISO format 'YYYY-MM-DD', a datetime, or a date
    and returns a datetime.date.
    """
    from datetime import date as _date

    if isinstance(d, str):
        # Try ISO format first (YYYY-MM-DD)
        try:
            return datetime.fromisoformat(d).date()
        except Exception:
            # Fallback to common format
            return datetime.strptime(d, "%Y-%m-%d").date()
    if isinstance(d, datetime):
        return d.date()
    if isinstance(d, _date):
        return d
    raise TypeError(f"Unsupported date type: {type(d)}")


def _parse_timestamp(item):
    """Return a datetime (or None) from several possible fields in a yfinance news item."""
    ts = item.get('providerPublishTime')
    if ts:
        try:
            ts = int(ts)
            if ts > 1e12:
                return datetime.fromtimestamp(ts / 1000.0)
            else:
                return datetime.fromtimestamp(ts)
        except Exception:
            pass

    for key in ('pubDate',):
        val = item.get(key) or (item.get('content') or {}).get(key)
        if val and isinstance(val, str):
            try:
                s = val.rstrip('Z')
                return datetime.fromisoformat(s)
            except Exception:
                try:
                    return datetime.strptime(val, '%Y-%m-%dT%H:%M:%SZ')
                except Exception:
                    pass

    val = (item.get('content') or {}).get('displayTime')
    if val and isinstance(val, str):
        try:
            s = val.rstrip('Z')
            return datetime.fromisoformat(s)
        except Exception:
            try:
                return datetime.strptime(val, '%Y-%m-%dT%H:%M:%SZ')
            except Exception:
                pass

    return None


def _news_row(pub_dt, item):
    """Flatten one yfinance news item into a {'date', 'title', 'content'} row."""
    nested = item.get('content')
    block = nested if isinstance(nested, dict) else {}
    title = item.get('title') or block.get('title') or block.get('headline') or ''
    # Only a plain-string `content` is usable as text. When `content` is a
    # nested dict, the prose is read from its 'summary'/'description' keys so
    # the column never ends up holding the whole dict.
    text = nested if isinstance(nested, str) else ''
    text = text or block.get('summary') or block.get('description') or ''
    return {'date': pub_dt.date(), 'title': title, 'content': text}


def get_stock_news(ticker, start_date, end_date, verbose=False, include_latest_if_empty=False, latest_n=10):
    """Get news headlines for a stock from ``yfinance`` within a date range.

    Parameters
    - ticker: symbol passed to ``yf.Ticker``.
    - start_date / end_date: inclusive bounds; anything ``_to_date`` accepts.
    - verbose: print progress and per-item diagnostics.
    - include_latest_if_empty: when no item falls inside the range, return the
      ``latest_n`` most recent parseable items instead of nothing.
    - latest_n: size of that fallback selection.

    Returns
    - pandas.DataFrame with columns ``['date', 'title', 'content']``. The
      columns are present even when no rows were collected.

    Raises
    - TypeError / ValueError from ``_to_date`` for unusable bounds; these are
      checked before the news feed is requested.
    """
    sd = _to_date(start_date)
    ed = _to_date(end_date)

    # Read `.news` once and reuse it for both the count and the loop.
    items = yf.Ticker(ticker).news or []
    if verbose:
        print('fetched items:', len(items))

    # Items whose publish time could be determined, as (datetime, item) pairs.
    dated = []
    for item in items:
        try:
            pub_dt = _parse_timestamp(item)
        except Exception as e:
            # Best-effort: a malformed item must not abort the whole fetch.
            if verbose:
                print('error processing news item:', e)
            continue
        if pub_dt is None:
            if verbose:
                print('could not parse timestamp for item; keys:', list(item.keys()))
            continue
        dated.append((pub_dt, item))

    selected = [pair for pair in dated if sd <= pair[0].date() <= ed]

    if not selected and include_latest_if_empty and dated:
        if verbose:
            print('No items in range — returning latest', latest_n, 'items instead')
        # tzinfo is stripped in the sort key so naive and aware values can be
        # ordered together without raising TypeError.
        newest_first = sorted(dated, key=lambda pair: pair[0].replace(tzinfo=None), reverse=True)
        selected = newest_first[:latest_n]

    rows = [_news_row(pub_dt, item) for pub_dt, item in selected]
    return pd.DataFrame(rows, columns=['date', 'title', 'content'])


def get_twitter_posts(query, start_date, end_date, max_tweets=1000):
    """Collect tweets matching the cashtag ``$<query>`` via snscrape.

    Parameters
    - query: ticker text; a ``$`` is prepended to form the search term.
    - start_date / end_date: used for the ``since:`` / ``until:`` operators.
      Strings are inserted verbatim; other values go through ``_to_date``.
    - max_tweets: stop after this many tweets have been collected.

    Returns
    - pandas.DataFrame with one row per tweet: ``date``, ``content``, ``url``.
    """
    def _as_search_date(value):
        # Strings are forwarded untouched; everything else becomes YYYY-MM-DD.
        if isinstance(value, (str,)):
            return value
        return _to_date(value).isoformat()

    since_part = _as_search_date(start_date)
    until_part = _as_search_date(end_date)
    scraper = sntwitter.TwitterSearchScraper(f"${query} since:{since_part} until:{until_part}")

    rows = []
    for position, tweet in enumerate(scraper.get_items()):
        if position >= max_tweets:
            break
        # Different snscrape versions expose the text under different names.
        text = getattr(tweet, 'rawContent', getattr(tweet, 'content', ''))
        rows.append({
            'date': tweet.date.date(),
            'content': text,
            'url': getattr(tweet, 'url', None),
        })

    return pd.DataFrame(rows)


In [24]:
# Fetch AAPL headlines dated 2020-01-01 through 2025-10-01 (both inclusive),
# printing the fetch diagnostics from get_stock_news.
news_df = get_stock_news('AAPL', '2020-01-01', '2025-10-01', verbose=True)
print('Collected:', len(news_df))
# Last expression of the cell: rendered as the cell's output.
news_df.head()

fetched items: 10
Collected: 0


# News sources added

This notebook now includes two alternative methods to fetch historical news for a ticker:

1. NewsAPI (recommended) — reliable and paginated, requires an API key. Use `fetch_news_newsapi(...)`.
2. Yahoo Finance HTML scraper (best-effort) — no API key but fragile and may miss or break over time. Use `fetch_news_yahoo(...)`.

Run the appropriate cell(s) below and provide an API key for the NewsAPI cell if you choose that option.

In [25]:
# NewsAPI fetcher (replace YOUR_API_KEY)
import requests
import pandas as pd
from datetime import datetime

def fetch_news_newsapi(q, from_date, to_date, api_key, page_size=100, max_pages=10, verbose=True):
    """Fetch news via NewsAPI (https://newsapi.org/). Returns a DataFrame.

    Parameters
    - q: query string (e.g., 'AAPL OR Apple')
    - from_date/to_date: 'YYYY-MM-DD' strings
    - api_key: your NewsAPI key
    - page_size: articles requested per page
    - max_pages: upper bound on the number of pages requested
    - verbose: print one progress line per page

    Returns
    - pandas.DataFrame with columns ``date``, ``title``, ``content``,
      ``source``, ``url`` (no columns when nothing was collected).

    Raises
    - requests.exceptions.HTTPError: when the first page fails. A failure on a
      later page stops pagination and returns what was already collected.
    """
    url = "https://newsapi.org/v2/everything"
    all_articles = []

    for page in range(1, max_pages + 1):
        params = {
            'q': q,
            'from': from_date,
            'to': to_date,
            'pageSize': page_size,
            'page': page,
            'apiKey': api_key,
            'language': 'en'
        }
        r = requests.get(url, params=params, timeout=30)
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            if not all_articles:
                raise
            # Keep earlier pages instead of discarding them, e.g. when the
            # account's result cap is hit part-way through pagination.
            if verbose:
                print(f"page {page}: stopping after HTTP error: {exc}")
            break

        data = r.json()
        articles = data.get('articles') or []
        if verbose:
            print(f"page {page}: got {len(articles)} articles")
        if not articles:
            break

        for a in articles:
            published = a.get('publishedAt')
            try:
                dt = datetime.fromisoformat(published.rstrip('Z')) if published else None
            except (AttributeError, TypeError, ValueError):
                # Unparseable or non-string timestamp: keep the article, undated.
                dt = None
            all_articles.append({
                'date': dt.date() if dt else None,
                'title': a.get('title'),
                'content': a.get('content') or a.get('description'),
                # `source` may be present but null, so guard before .get().
                'source': (a.get('source') or {}).get('name'),
                'url': a.get('url')
            })

        total = data.get('totalResults')
        # Stop once everything reported has been collected, or on a short page.
        if total and len(all_articles) >= total:
            break
        if len(articles) < page_size:
            break

    return pd.DataFrame(all_articles)


In [30]:
# Yahoo Finance HTML scraper (best-effort)
# NOTE: fragile; Yahoo may change their layout. Use responsibly and respect robots.txt/TOS.
import requests
from bs4 import BeautifulSoup
import pandas as pd

HEADERS = {"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"}

def _yahoo_links_from_soup(soup):
    """Extract de-duplicated ``{'title', 'url'}`` records from a parsed Yahoo page."""
    base = 'https://finance.yahoo.com'

    def _absolute(href):
        # Site-relative links are anchored to the Yahoo Finance host.
        return base + href if href and href.startswith('/') else href

    pairs = []
    # Primary strategy: stream headlines, any h3 anchor, any '/news/' link.
    for node in soup.select('li.js-stream-content h3 a, h3 a, a[href*="/news/"]'):
        href = node.get('href')
        if href:
            pairs.append((node.get_text(strip=True), _absolute(href)))

    # Fallback: first anchor inside each <article>. A separate scan for
    # '/news/' anchors is not needed because the selector above already
    # matches every such anchor.
    if not pairs:
        for article in soup.find_all('article'):
            anchor = article.find('a')
            if anchor and anchor.get('href'):
                pairs.append((anchor.get_text(strip=True), _absolute(anchor.get('href'))))

    # De-duplicate by URL, keeping first-seen order. If the first occurrence
    # had no text (e.g. an image link), adopt the title of a later duplicate.
    title_by_url = {}
    for title, href in pairs:
        if href not in title_by_url or (title and not title_by_url[href]):
            title_by_url[href] = title
    return [{'title': title, 'url': href} for href, title in title_by_url.items()]


def fetch_news_yahoo(ticker, max_pages=3, verbose=True, session=None):
    """Scrape news items from Yahoo Finance quote/news pages (best-effort).

    Several URL shapes are tried in order; a 404 or any other error on one of
    them moves on to the next. The first page that yields links wins.

    Parameters
    - ticker: symbol inserted into the Yahoo URLs.
    - max_pages: accepted for backward compatibility; currently unused.
    - verbose: print progress and error messages.
    - session: optional ``requests.Session``. It is used as-is (its headers
      are not modified) and is left open for the caller. When omitted, a
      temporary session is created and closed before returning.

    Returns
    - pandas.DataFrame with columns ``['title', 'url']``; empty when nothing
      was found.
    """
    owns_session = session is None
    if owns_session:
        session = requests.Session()

    # Try a few URL formats that Yahoo has used
    tried_urls = [
        f"https://finance.yahoo.com/quote/{ticker}/news?p={ticker}",
        f"https://finance.yahoo.com/quote/{ticker}/news",
        f"https://finance.yahoo.com/quote/{ticker}",
    ]

    articles = []
    try:
        for url in tried_urls:
            try:
                if verbose:
                    print('fetching', url)
                # Headers are sent per request so a caller-supplied session
                # is not mutated.
                r = session.get(url, headers=HEADERS, timeout=20)
                if r.status_code == 404:
                    if verbose:
                        print('404 for', url)
                    continue
                r.raise_for_status()
                soup = BeautifulSoup(r.text, 'html.parser')
                articles = _yahoo_links_from_soup(soup)
                # If we found any articles on this page, stop trying other URL shapes
                if articles:
                    break
            except requests.exceptions.HTTPError as e:
                if verbose:
                    print('HTTP error while fetching', url, e)
                continue
            except Exception as e:
                # Deliberately broad: scraping is best-effort and one bad page
                # should not prevent trying the remaining URLs.
                if verbose:
                    print('Error fetching/parsing', url, e)
                continue
    finally:
        if owns_session:
            session.close()

    if not articles and verbose:
        print('No articles found on Yahoo for', ticker, '- try network/headers or use NewsAPI as a fallback')

    return pd.DataFrame(articles, columns=['title', 'url'])


In [31]:
# Examples / quick tests (run locally)
# 1) NewsAPI (requires API key)
# df = fetch_news_newsapi('AAPL OR Apple', '2024-01-01', '2025-10-01', api_key='YOUR_API_KEY', page_size=100)
# print('newsapi rows:', len(df))

# 2) Yahoo scraper (best-effort)
# Scrape whatever headlines Yahoo currently serves for AAPL (verbose by default).
df_y = fetch_news_yahoo('AAPL')
print('yahoo items:', len(df_y))
# NOTE(review): this is not the last expression of the cell, so the preview is
# not rendered; use display(df_y.head()) or move it to the end of the cell.
df_y.head()

print('Cells added: NewsAPI fetcher and Yahoo scraper.\nUse the commented example calls to test locally.')


fetching https://finance.yahoo.com/quote/AAPL/news?p=AAPL
404 for https://finance.yahoo.com/quote/AAPL/news?p=AAPL
fetching https://finance.yahoo.com/quote/AAPL/news
404 for https://finance.yahoo.com/quote/AAPL/news
fetching https://finance.yahoo.com/quote/AAPL
404 for https://finance.yahoo.com/quote/AAPL/news?p=AAPL
fetching https://finance.yahoo.com/quote/AAPL/news
404 for https://finance.yahoo.com/quote/AAPL/news
fetching https://finance.yahoo.com/quote/AAPL
yahoo items: 36
Cells added: NewsAPI fetcher and Yahoo scraper.
Use the commented example calls to test locally.
yahoo items: 36
Cells added: NewsAPI fetcher and Yahoo scraper.
Use the commented example calls to test locally.


# Sentiment Analysis

In [64]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline
import torch
import numpy as np
import sys

# Robust model loader: prefer safetensors (avoids torch.load vulnerability),
# otherwise require torch >= 2.6.0 so transformers can safely call torch.load.
def _torch_version_ok(min_ver="2.6.0"):
    try:
        # packaging is the most reliable; fall back to simple parsing if missing
        from packaging.version import parse as _parse
        v = getattr(torch, '__version__', '0.0.0').split('+')[0]
        return _parse(v) >= _parse(min_ver)
    except Exception:
        try:
            v = getattr(torch, '__version__', '0.0.0').split('+')[0]
            major, minor = v.split('.')[:2]
            return (int(major), int(minor)) >= tuple(int(x) for x in min_ver.split('.')[:2])
        except Exception:
            return False


def _try_load_finbert():
    """Load the ProsusAI/finbert sentiment pipeline and return it.

    Strategy:
    1. Load the model with ``use_safetensors=True``.
    2. If that fails, require torch >= 2.6.0 and load the standard pipeline.

    Side effects
    - Publishes the underlying model and tokenizer as the module globals
      ``finbert_model`` and ``finbert_tokenizer`` for logit-based scoring.
    - Prints progress messages.

    Raises
    - RuntimeError: safetensors loading failed and torch is older than 2.6.0.
    - Any exception raised while building the standard pipeline.
    """
    def _publish(model, tokenizer):
        # Expose raw model/tokenizer for probability-based scoring.
        globals()['finbert_model'] = model
        globals()['finbert_tokenizer'] = tokenizer

    # First attempt: load safetensors weights (if present on HF) — avoids torch.load
    try:
        print('Trying to load FinBERT using safetensors (use_safetensors=True) ...')
        tokenizer = AutoTokenizer.from_pretrained('ProsusAI/finbert')
        model = AutoModelForSequenceClassification.from_pretrained('ProsusAI/finbert', use_safetensors=True)
        pipe = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)
        _publish(model, tokenizer)
        print('Loaded FinBERT with safetensors.')
        return pipe
    except Exception as e_s:
        # Best-effort first attempt: report and continue with the fallback.
        print('safetensors load failed or not available:', e_s)

    # If safetensors failed, ensure torch is new enough for transformers to call torch.load safely
    if not _torch_version_ok('2.6.0'):
        print('Current torch version:', getattr(torch, '__version__', None))
        print('Transformer loading requires torch >= 2.6.0 due to a security fix.')
        raise RuntimeError('Please upgrade torch to >= 2.6.0 to load FinBERT safely, or ensure safetensors are available for this model.')

    # Try loading the standard HF pipeline (this will use torch.load internally)
    try:
        print('Attempting to load FinBERT pipeline using standard weights...')
        pipe = pipeline('sentiment-analysis', model='ProsusAI/finbert', tokenizer='ProsusAI/finbert')
        # Missing attributes become None, which downstream code treats the
        # same as "not loaded".
        _publish(getattr(pipe, 'model', None), getattr(pipe, 'tokenizer', None))
        print('Loaded FinBERT pipeline (standard weights).')
        return pipe
    except Exception as e_p:
        print('Failed to load FinBERT pipeline with standard weights:', e_p)
        raise

# Create pipeline object (finbert) or leave None with helpful message
# Build the module-level `finbert` pipeline. Any loader failure is swallowed
# here on purpose so the notebook keeps running; `finbert` is then None.
try:
    finbert = _try_load_finbert()
except Exception as e_final:
    # NOTE(review): `e_final` is not printed; the loader prints its own details.
    finbert = None
    print('FinBERT is not available in this environment. See messages above to upgrade torch or enable safetensors.')


Trying to load FinBERT using safetensors (use_safetensors=True) ...


Device set to use cuda:0


Loaded FinBERT with safetensors.


In [68]:
def analyze_sentiment_batch(texts, backend='auto'):
    """Score a batch of texts on an integer sentiment scale of -10..10.

    Parameters
    - texts: list[str]
    - backend: 'auto'|'finbert'|'nlptown' ;
        'auto' resolves to nlptown (1-5 star labels mapped to -10/-5/0/5/10).
        FinBERT is used when requested explicitly, or as the fallback when the
        nlptown backend raises.

    Returns
    - list[int]: one score per input text, each in [-10..10]; ``[]`` for an
      empty batch.

    Raises
    - RuntimeError: the FinBERT path was reached but no ``finbert`` pipeline
      exists in the module globals.

    Module globals used: ``nlptown_pipe`` (created lazily and cached here),
    ``finbert``, ``finbert_model``, ``finbert_tokenizer``.
    """
    # Nothing to score: avoid loading models or tokenizing an empty batch.
    if hasattr(texts, '__len__') and len(texts) == 0:
        return []

    def _clamp(value):
        return max(-10, min(10, int(value)))

    def map_nlptown_label_to_score(label):
        # '1 star' .. '5 stars' -> -10, -5, 0, 5, 10; anything else -> 0.
        import re
        m = re.search(r'([1-5])', (label or ''))
        if not m:
            return 0
        stars = int(m.group(1))
        return int(round(((stars - 3) / 2.0) * 10.0))

    chosen = 'nlptown' if backend == 'auto' else backend

    # NLPTOWN backend
    if chosen == 'nlptown':
        try:
            nlptown_pipe = globals().get('nlptown_pipe')
            if nlptown_pipe is None:
                # lazily create and cache
                from transformers import pipeline as _pipeline
                nlptown_pipe = _pipeline('sentiment-analysis', model='nlptown/bert-base-multilingual-uncased-sentiment', tokenizer='nlptown/bert-base-multilingual-uncased-sentiment')
                globals()['nlptown_pipe'] = nlptown_pipe

            try:
                dist = nlptown_pipe(texts, return_all_scores=True)
            except TypeError:
                # Retry one text at a time if the batched call is rejected.
                dist = [nlptown_pipe(t, return_all_scores=True) if isinstance(t, str) else [] for t in texts]

            mapped = []
            for d in dist:
                if isinstance(d, list) and len(d) > 0:
                    top = max(d, key=lambda x: x.get('score', 0.0))
                    mapped.append(map_nlptown_label_to_score(top.get('label', '')))
                else:
                    mapped.append(0)
            return mapped
        except Exception as e:
            print('nlptown backend failed, falling back to FinBERT logic:', e)
            # fall through to finbert logic below

    # FINBERT - looked up through globals() so a missing loader cell produces
    # the RuntimeError below rather than a NameError.
    finbert_pipe = globals().get('finbert')
    if finbert_pipe is None:
        raise RuntimeError('FinBERT pipeline is not loaded and nlptown backend failed. Upgrade torch or enable safetensors or run the nlptown diagnostic cell.')

    # Keywords that, when present in a model's label names, carry an explicit
    # intensity in [0, 1].
    intensity_kw = {
        'very': 1.0,
        'strong': 1.0,
        'strongly': 1.0,
        'high': 0.95,
        'heavy': 0.95,
        'severe': 0.95,
        'major': 0.9,
        'sharp': 0.9,
        'plunge': 0.95,
        'plummets': 1.0,
        'surge': 0.95,
        'record': 0.9,
        'huge': 1.0,
        'massive': 1.0,
        'slight': 0.35,
        'slightly': 0.35,
        'modest': 0.45,
        'minor': 0.4,
        'mild': 0.4,
        'nominal': 0.3,
        'small': 0.35,
        'possible': 0.5,
        'likely': 0.7,
        'announces layoffs': 0.95,
        'layoff': 0.95,
        'layoffs': 0.95
    }

    def _label_contains_intensity(label_str):
        s = (label_str or '').lower()
        for kw, val in intensity_kw.items():
            if kw in s:
                return float(val)
        return None

    def _label_sign(label_str):
        # 'positive'/'negative' contain 'pos'/'neg', so one test covers both.
        s = (label_str or '').lower()
        if 'pos' in s:
            return 1
        if 'neg' in s:
            return -1
        return 0

    finbert_model_obj = globals().get('finbert_model', None)
    finbert_tokenizer_obj = globals().get('finbert_tokenizer', None)

    if finbert_model_obj is not None and finbert_tokenizer_obj is not None:
        cfg = getattr(finbert_model_obj, 'config', None)
        raw_labels = getattr(cfg, 'id2label', None) or {}
        id2label = {int(k): str(v).lower() for k, v in raw_labels.items()}
        labels_carry_intensity = any(
            _label_contains_intensity(name) is not None for name in id2label.values()
        )

        # Single forward pass shared by both scoring strategies below.
        inputs = finbert_tokenizer_obj(texts, return_tensors='pt', padding=True, truncation=True)
        try:
            device = next(finbert_model_obj.parameters()).device
        except Exception:
            device = None
        if device is not None:
            inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = finbert_model_obj(**inputs).logits

        if labels_carry_intensity:
            # Intensity comes from the winning label when it names one,
            # otherwise from the probability margin between the top two classes.
            probs = torch.nn.functional.softmax(logits, dim=1).cpu().numpy()
            scores = []
            for p in probs:
                top_idx = int(p.argmax())
                top_label = id2label.get(top_idx, '')
                intensity = _label_contains_intensity(top_label)
                if intensity is None:
                    if p.size >= 2:
                        runner_up, best = np.partition(p, -2)[-2:]
                        margin = float(best - runner_up)
                    else:
                        margin = float(p[top_idx])
                    intensity = float(np.tanh(margin))
                scores.append(_clamp(round(_label_sign(top_label) * intensity * 10.0)))
            return scores

        # Default: tanh of the logit margin between the top two classes,
        # signed by the winning label.
        top_vals, top_idx = torch.topk(logits, k=2, dim=1)
        margins = (top_vals[:, 0] - top_vals[:, 1]).cpu().numpy()
        signs = [_label_sign(id2label.get(int(idx), '')) for idx in top_idx[:, 0].cpu().numpy()]
        return [
            _clamp(round(float(sgn) * float(n) * 10.0))
            for sgn, n in zip(signs, np.tanh(margins))
        ]

    # Last fallback: only the pipeline is available, so use its top label and
    # confidence.
    scores = []
    for r in finbert_pipe(texts):
        label = (r.get('label', '') or '').lower()
        conf = float(r.get('score', 0.0) or 0.0)
        scores.append(_clamp(round(_label_sign(label) * np.tanh((conf - 0.5) * 2.0) * 10.0)))
    return scores


In [69]:
# Status check: report whether the module-level `finbert` pipeline was created
# by the loader cell (it is set to None there when loading fails).
print('FinBERT loaded:', finbert is not None)
if finbert is None:
    print('FinBERT pipeline is not available. Upgrade torch to >=2.6 or enable safetensors as instructed in the previous cell.')

FinBERT loaded: True


In [71]:
# Very simple examples: two negative headlines (one mild, one strong) and one
# neutral headline, each paired with the sentiment a reader would expect.
print('finbert available:', finbert is not None)

# (expected sentiment, headline text)
examples = [
    ('negative', 'Company posts record a slight loss in profits.'),
    ('negative', 'Company reports heavy losses and announces layoffs.'),
    ('neutral', 'Company schedules conference call to discuss quarterly results.')
]

if finbert is None:
    print('FinBERT pipeline not loaded — see previous cell for instructions to upgrade torch or enable safetensors.')
else:
    texts = [t for _, t in examples]
    # Called with the default backend ('auto').
    scores = analyze_sentiment_batch(texts)
    # Print expected label next to the computed -10..10 score for comparison.
    for (label, text), score in zip(examples, scores):
        print(f"Sentiment: {label:>8} | Score: {score:>5} | Text: {text}")

# End of simple examples

# --- Diagnostic: try alternative open-source sentiment models and map to -10..10 ---
# This cell attempts several publicly-available models to see whether any give a more
# intensity-sensitive output for the example headlines. Run this cell locally (it will
# download models from Hugging Face) and inspect printed scores to decide which to use.

from transformers import pipeline
import numpy as _np

# Hugging Face model ids to compare; each is loaded as a 'sentiment-analysis'
# pipeline in the loop below.
candidate_models = [
    'nlptown/bert-base-multilingual-uncased-sentiment',
    'cardiffnlp/twitter-roberta-base-sentiment',
    'finiteautomata/bertweet-base-sentiment-analysis',
]

# Headline strings only (the expected labels in `examples` are dropped).
texts = [t for _, t in examples]


def map_nlptown_label_to_score(label):
    """Map a star-rating label to an integer score in -10..10.

    The first digit 1-5 found in ``label`` is taken as the star count and
    mapped linearly: 1 -> -10, 2 -> -5, 3 -> 0, 4 -> 5, 5 -> 10. Labels
    without such a digit (including ``None``) map to 0.
    """
    import re

    found = re.search(r'([1-5])', (label or ''))
    if found is None:
        return 0
    star_count = int(found.group(1))
    # Centre on 3 stars, scale the +/-2 span to +/-1, then to +/-10.
    centred = (star_count - 3) / 2.0
    return int(round(centred * 10.0))


def map_prob_to_score(label, prob):
    """Map a (label, probability) pair to an integer score in -10..10.

    The sign comes from the label: names containing 'pos' are positive, names
    containing 'neg' are negative, anything else scores 0. The magnitude grows
    linearly from 0 at probability 0.5 to 10 at probability 1.0; probabilities
    below 0.5 give 0.
    """
    name = (label or '').lower()
    # 'positive'/'negative' contain 'pos'/'neg', so the short form covers both.
    if 'pos' in name:
        direction = 1
    elif 'neg' in name:
        direction = -1
    else:
        return 0

    strength = max(0.0, (float(prob) - 0.5) / 0.5)
    return int(round(direction * strength * 10.0))

# For every candidate model: load it, score the example texts, and print the
# scores mapped onto the -10..10 scale. A model that fails to load is skipped.
for mid in candidate_models:
    print('\nModel:', mid)
    try:
        pipe = pipeline('sentiment-analysis', model=mid, tokenizer=mid)
    except Exception as e:
        print('  Could not load model:', e)
        continue

    # Try return_all_scores first
    try:
        dist = pipe(texts, return_all_scores=True)
        mapped = []
        for d in dist:
            if isinstance(d, list) and len(d) > 0:
                # Highest-scoring label for this text.
                top = sorted(d, key=lambda x: x.get('score', 0.0), reverse=True)[0]
                lab = top.get('label', '')
                scr = top.get('score', 0.0)
                # nlptown labels are star ratings; the others are mapped from
                # (label, probability).
                if 'nlptown' in mid:
                    mapped.append(map_nlptown_label_to_score(lab))
                else:
                    mapped.append(map_prob_to_score(lab, scr))
            else:
                mapped.append(0)
        print('  mapped scores:', mapped)
        continue
    except TypeError:
        # some pipeline variants don't accept return_all_scores for multi-inputs; fall back
        pass
    except Exception as e:
        # Reported, then the plain pipeline call below is still attempted.
        print('  return_all_scores not available or failed:', e)

    # Fallback: plain pipeline call, which yields one top label per text.
    try:
        res = pipe(texts)
        mapped = []
        for r in res:
            # NOTE(review): `lab` is assigned but not used below; the mapping
            # helpers receive the raw label instead.
            lab = (r.get('label', '') or '').lower()
            scr = float(r.get('score', 0.0) or 0.0)
            if 'nlptown' in mid:
                mapped.append(map_nlptown_label_to_score(r.get('label', '')))
            else:
                mapped.append(map_prob_to_score(r.get('label', ''), scr))
        print('  mapped scores:', mapped)
    except Exception as e:
        print('  single-shot pipeline call failed:', e)


finbert available: True
Sentiment: negative | Score:    -5 | Text: Company posts record a slight loss in profits.
Sentiment: negative | Score:   -10 | Text: Company reports heavy losses and announces layoffs.
Sentiment:  neutral | Score:     5 | Text: Company schedules conference call to discuss quarterly results.

Model: nlptown/bert-base-multilingual-uncased-sentiment


Device set to use cuda:0


  mapped scores: [-5, -10, 5]

Model: cardiffnlp/twitter-roberta-base-sentiment
  Could not load model: Could not load model cardiffnlp/twitter-roberta-base-sentiment with any of the following classes: (<class 'transformers.models.auto.modeling_auto.AutoModelForSequenceClassification'>, <class 'transformers.models.roberta.modeling_roberta.RobertaForSequenceClassification'>). See the original errors:

while loading with AutoModelForSequenceClassification, an error is thrown:
Traceback (most recent call last):
  File "/home/pham156/.conda/envs/rocky9/2024.09/CS373/lib/python3.11/site-packages/transformers/pipelines/base.py", line 293, in infer_framework_load_model
    model = model_class.from_pretrained(model, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/pham156/.conda/envs/rocky9/2024.09/CS373/lib/python3.11/site-packages/transformers/models/auto/auto_factory.py", line 604, in from_pretrained
    return model_class.from_pretrained(
           ^^^^^^^^

In [72]:
# Diagnostic: print raw model outputs (probs/logits) for nlptown and FinBERT
# Run this cell to see exactly what each model returns for the example texts.

# Fixed example headlines used by both diagnostics in this cell.
texts = [
    'Company posts record a slight loss in profits.',
    'Company reports heavy losses and announces layoffs.',
    'Company schedules conference call to discuss quarterly results.'
]

print('\n--- NLPTOWN (distribution and mapped score) ---')
try:
    # Reuse the cached pipeline if an earlier cell created it; otherwise build
    # it and cache it in the module globals.
    nlptown_pipe = globals().get('nlptown_pipe')
    if nlptown_pipe is None:
        from transformers import pipeline as _pipeline
        nlptown_pipe = _pipeline('sentiment-analysis', model='nlptown/bert-base-multilingual-uncased-sentiment', tokenizer='nlptown/bert-base-multilingual-uncased-sentiment')
        globals()['nlptown_pipe'] = nlptown_pipe

    # Ask for the score of every label; retry per text if the batched call
    # raises TypeError.
    try:
        dist = nlptown_pipe(texts, return_all_scores=True)
    except TypeError:
        dist = [nlptown_pipe(t, return_all_scores=True) for t in texts]

    for t, d in zip(texts, dist):
        print('\nText:', t)
        if not isinstance(d, list):
            print('  Unexpected dist format:', d)
            continue
        # Print the distribution ordered by label name.
        for entry in sorted(d, key=lambda x: x.get('label','')):
            print(f"   {entry.get('label')}: {entry.get('score'):.4f}")
        # mapped score
        top = sorted(d, key=lambda x: x.get('score', 0.0), reverse=True)[0]
        import re
        # First digit 1-5 in the top label is the star count; 1..5 stars map
        # linearly to -10..10, and a label without such a digit maps to 0.
        m = re.search(r'([1-5])', (top.get('label','') or ''))
        mapped = int(round(((int(m.group(1)) - 3) / 2.0) * 10.0)) if m else 0
        print('  MAPPED (nlptown->-10..10):', mapped)

except Exception as e:
    # Diagnostic only: report the failure and let the cell continue.
    print('NLPTOWN diagnostic failed:', e)

print('\n--- FINBERT (pipeline/probs or raw logits if available) ---')
try:
    # Raw model/tokenizer are read from the module globals; both may be absent.
    finbert_model_obj = globals().get('finbert_model', None)
    finbert_tokenizer_obj = globals().get('finbert_tokenizer', None)
    if finbert_model_obj is not None and finbert_tokenizer_obj is not None:
        # raw logits route
        inputs = finbert_tokenizer_obj(texts, return_tensors='pt', padding=True, truncation=True)
        # Move the inputs to whichever device the model parameters live on.
        try:
            device = next(finbert_model_obj.parameters()).device
        except Exception:
            device = None
        if device is not None:
            inputs = {k: v.to(device) for k, v in inputs.items()}
        import torch as _torch
        with _torch.no_grad():
            outputs = finbert_model_obj(**inputs)
            logits = outputs.logits
            # Softmax over dim=1 turns each row of logits into probabilities.
            probs = _torch.nn.functional.softmax(logits, dim=1).cpu().numpy()
        cfg = getattr(finbert_model_obj, 'config', None)
        id2label = {int(k): v for k, v in getattr(cfg, 'id2label', {}).items()} if cfg is not None else {}
        for i, t in enumerate(texts):
            print('\nText:', t)
            row = probs[i]
            for idx, p in enumerate(row):
                # Fall back to the numeric index when the config has no name.
                lbl = id2label.get(idx, str(idx))
                print(f"   {lbl}: {p:.4f}")
            # margin
            import numpy as _np
            top_idx = int(row.argmax())
            # Second-largest probability: first of the two largest values.
            second = _np.partition(row, -2)[-2:][0]
            margin = float(row[top_idx] - second)
            print('  Margin:', margin)
    else:
        # pipeline fallback
        res = finbert(texts, return_all_scores=True)
        for t, d in zip(texts, res):
            print('\nText:', t)
            for entry in sorted(d, key=lambda x: x.get('label','')):
                print(f"   {entry.get('label')}: {entry.get('score'):.4f}")

except Exception as e:
    # Diagnostic only: report the failure instead of raising.
    print('FinBERT diagnostic failed or FinBERT not available:', e)



--- NLPTOWN (distribution and mapped score) ---

Text: Company posts record a slight loss in profits.
   1 star: 0.2466
   2 stars: 0.3074
   3 stars: 0.3072
   4 stars: 0.1093
   5 stars: 0.0295
  MAPPED (nlptown->-10..10): -5

Text: Company reports heavy losses and announces layoffs.
   1 star: 0.7153
   2 stars: 0.1938
   3 stars: 0.0627
   4 stars: 0.0168
   5 stars: 0.0115
  MAPPED (nlptown->-10..10): -10

Text: Company schedules conference call to discuss quarterly results.
   1 star: 0.0303
   2 stars: 0.0480
   3 stars: 0.2041
   4 stars: 0.4360
   5 stars: 0.2816
  MAPPED (nlptown->-10..10): 5

--- FINBERT (pipeline/probs or raw logits if available) ---

Text: Company posts record a slight loss in profits.
   positive: 0.0117
   negative: 0.9723
   neutral: 0.0160
  Margin: 0.9562507271766663

Text: Company reports heavy losses and announces layoffs.
   positive: 0.0077
   negative: 0.9687
   neutral: 0.0235
  Margin: 0.945213794708252

Text: Company schedules conference call