# FinBERT Sentiment Pipeline (Updated)

This notebook fixes the earlier critical issues and marks every major change inline with `# CHANGED (Issue X)` comments.

## Fixed Issues
- Issue 1: Articles are grouped by New York (`America/New_York`) calendar date instead of UTC date
- Issue 2: Long-text truncation reduced by scoring up to three 512-token windows per article and averaging the results
- Issue 3: Removed extreme-score daily capping bias
- Issue 4: Added validation diagnostics
- Issue 5: Reproducibility improvements (seed, stable outputs, metadata)
- Issue 6: Duplicate/syndicated news deduplication


In [None]:
# --- FinBERT scoring + robust daily sentiment features (S&P 500 news)
# CHANGED (Issue 1): Align article timestamps to NY market day instead of raw UTC date.
# CHANGED (Issue 2): Reduce truncation error with 512-token window + multi-chunk scoring.
# CHANGED (Issue 3): Remove extreme-score daily capping bias (use all articles by default).
# CHANGED (Issue 4): Add validation diagnostics (provider-label agreement + optional returns check).
# CHANGED (Issue 5): Improve reproducibility with deterministic seed, fixed output dirs, run metadata.
# CHANGED (Issue 6): Deduplicate repeated/syndicated news to avoid overweighting duplicate stories.

import json
import os
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from tqdm.auto import tqdm
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# -----------------------
# CONFIG
# -----------------------
# CHANGED (Issue 5): Use explicit project root to avoid path ambiguity.
# The default below is an absolute path on one specific machine. Setting the
# SENTIMENT_PROJECT_ROOT environment variable overrides it, so the pipeline can run
# elsewhere without editing the code; an unset or empty variable keeps the default.
_DEFAULT_PROJECT_ROOT = '/Users/petarnikodimov/Documents/diploma/sentiment-enhanced-risk-engine'
PROJECT_ROOT = Path(os.environ.get('SENTIMENT_PROJECT_ROOT') or _DEFAULT_PROJECT_ROOT).expanduser().resolve()
DATA_DIR = (PROJECT_ROOT / 'data').resolve()
PROCESSED_DIR = (DATA_DIR / 'processed').resolve()
# Output directory is created eagerly so later CSV/JSON writes cannot fail on a missing folder.
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Input and output locations.
IN_CSV = DATA_DIR / 'news_2017-2025.csv'
OUT_SCORED = PROCESSED_DIR / 'finbert_scored_news_2017_2025.csv'
OUT_DAILY = PROCESSED_DIR / 'daily_sentiment_news_2017_2025.csv'
OUT_RUN_META = PROCESSED_DIR / 'finbert_run_metadata.json'

# Column names expected in (or optionally read from) the input CSV.
TEXT_COL = 'content'
TITLE_COL = 'title'
DATE_COL = 'date'
LINK_COL = 'link'
PROVIDER_LABEL_COL = 'sentiment'

# CHANGED (Issue 1): S&P 500 market-day alignment timezone.
MARKET_TZ = 'America/New_York'

# CHANGED (Issue 2): Increase context and add chunking for long articles.
MAX_TOKENS = 512  # window size including the two special tokens
MAX_CHUNKS_PER_ARTICLE = 3
BATCH_SIZE = 24  # articles per batch; one batch may expand to more model rows after chunking

# CHANGED (Issue 3): Keep all articles by default to avoid selection bias.
USE_ALL_ARTICLES_PER_DAY = True
MAX_ARTICLES_PER_DAY = None

RANDOM_SEED = 42
MODEL_NAME = 'ProsusAI/finbert'

# Seed every random number generator this script uses.
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(RANDOM_SEED)

# -----------------------
# Load data
# -----------------------
# Fail early with an explicit message when the input file is absent.
if not IN_CSV.exists():
    raise FileNotFoundError(f'Input CSV not found: {IN_CSV}')

raw_df = pd.read_csv(IN_CSV)

# The text and date columns are mandatory; report the first one that is missing.
missing_col = next(
    (name for name in (TEXT_COL, DATE_COL) if name not in raw_df.columns),
    None,
)
if missing_col is not None:
    raise ValueError(f"Expected '{missing_col}' column. Found: {raw_df.columns.tolist()}")

# Row count before any filtering, kept for the summary printout and run metadata.
raw_rows = raw_df.shape[0]

# -----------------------
# Preprocess + deduplicate
# -----------------------
# Drop rows with missing content BEFORE casting to str: astype(str) turns NaN into the
# literal string 'nan', which no later filter removes and which would be scored as an article.
df = raw_df.dropna(subset=[TEXT_COL]).copy()
df[TEXT_COL] = df[TEXT_COL].astype(str)
if TITLE_COL in df.columns:
    df[TITLE_COL] = df[TITLE_COL].fillna('').astype(str)

# CHANGED (Issue 1): Parse timestamps as UTC and convert to NY market date.
dt_utc = pd.to_datetime(df[DATE_COL], errors='coerce', utc=True)
df['timestamp_utc'] = dt_utc
df['timestamp_ny'] = dt_utc.dt.tz_convert(MARKET_TZ)
df['market_date'] = df['timestamp_ny'].dt.strftime('%Y-%m-%d')

# Rows whose timestamp could not be parsed (coerced to NaT above) are removed here.
df = df.dropna(subset=['timestamp_utc', 'market_date', TEXT_COL]).copy()
# Stable sort: rows sharing a timestamp keep their input order, so the row kept by the
# keep='first' deduplication below is determined by the input file alone.
df = df.sort_values('timestamp_utc', kind='stable').reset_index(drop=True)

# CHANGED (Issue 6): Deduplicate by normalized link first (when available).
dropped_by_link = 0
if LINK_COL in df.columns:
    df['_link_norm'] = df[LINK_COL].fillna('').astype(str).str.strip().str.lower()
    before = len(df)
    # Rows without a link are always kept; among rows with a link the earliest one wins.
    keep_mask = (df['_link_norm'] == '') | (~df['_link_norm'].duplicated(keep='first'))
    df = df.loc[keep_mask].copy()
    dropped_by_link = before - len(df)

# CHANGED (Issue 6): Secondary removal of exact duplicates on (market_date, title, content).
dropped_by_text = 0
dedup_cols = ['market_date', TEXT_COL]
if TITLE_COL in df.columns:
    dedup_cols.insert(1, TITLE_COL)
before = len(df)
df = df.drop_duplicates(subset=dedup_cols, keep='first').copy()
dropped_by_text = before - len(df)

# Build model text using title + content.
# The separator is a real blank line; the previous '\\n\\n' literal inserted the
# characters backslash and 'n' into every text sent to the model.
if TITLE_COL in df.columns:
    df['model_text'] = (df[TITLE_COL].str.strip() + '\n\n' + df[TEXT_COL].str.strip()).str.strip()
else:
    df['model_text'] = df[TEXT_COL].str.strip()

df = df[df['model_text'].str.len() > 0].reset_index(drop=True)

# Stop here with a clear message instead of failing later inside the scoring step.
if df.empty:
    raise ValueError(f'No usable articles left after preprocessing {IN_CSV}')

print('Input file:', IN_CSV)
print('Rows loaded:', raw_rows)
print('Rows after preprocessing:', len(df))
print('Dropped duplicate links:', dropped_by_link)
print('Dropped duplicate text/title/date rows:', dropped_by_text)
print('Market date range:', df['market_date'].min(), '->', df['market_date'].max())
print('Output scored file:', OUT_SCORED)
print('Output daily file:', OUT_DAILY)

# -----------------------
# Device
# -----------------------
# Preference order: CUDA, then Apple MPS (only probed when this torch build exposes it), then CPU.
has_cuda = torch.cuda.is_available()
has_mps = (
    not has_cuda
    and bool(getattr(torch.backends, 'mps', None))
    and torch.backends.mps.is_available()
)
device = 'cuda' if has_cuda else ('mps' if has_mps else 'cpu')
print('Device:', device)

# -----------------------
# Load FinBERT
# -----------------------
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model = model.to(device)
model.eval()  # inference mode

# Map class indices to lower-cased label names and back, as declared by the model config.
id2label = {int(idx): name.lower() for idx, name in model.config.id2label.items()}
label2id = {name: idx for idx, name in id2label.items()}

# All three sentiment classes must be present; report the first missing one.
required_labels = ('positive', 'negative', 'neutral')
missing_labels = [name for name in required_labels if name not in label2id]
if missing_labels:
    raise ValueError(f"Missing label '{missing_labels[0]}' in model labels: {id2label}")

pos_id, neg_id, neu_id = (label2id[name] for name in required_labels)


def choose_chunk_windows(token_count, max_body_tokens, max_chunks):
    """Return (start, end) token windows covering an article.

    A text that fits in one window yields a single window over all of it. A longer
    text yields either just its leading window (when max_chunks <= 1) or up to
    max_chunks windows of max_body_tokens tokens whose starts are evenly spaced
    from the beginning of the text to the last position where a full window fits.
    Coinciding starts are collapsed, so fewer than max_chunks windows may be returned.
    """
    fits_in_one_window = token_count <= max_body_tokens
    if fits_in_one_window:
        return [(0, token_count)]

    if max_chunks <= 1:
        return [(0, max_body_tokens)]

    last_start = token_count - max_body_tokens
    spaced_starts = np.linspace(0, last_start, num=max_chunks, dtype=int).tolist()
    unique_starts = sorted(set(spaced_starts))

    windows = []
    for offset in unique_starts:
        windows.append((offset, offset + max_body_tokens))
    return windows


@torch.no_grad()
def finbert_batch_chunked(texts):
    """
    Score a batch of texts with FinBERT, averaging class probabilities over token windows.

    CHANGED (Issue 2): For long texts, score multiple windows and average probabilities.
    This keeps runtime manageable while reducing single-window truncation bias.

    CHANGED (Runtime fix): Avoid tokenizer.prepare_for_model for compatibility with older
    tokenizer classes that do not expose this method.

    Args:
        texts: sequence of article strings; may be empty.

    Returns:
        A 7-tuple ``(p_pos, p_neg, p_neu, sent_score, token_lengths, chunked_articles, n_rows)``:
        the first four are float32 arrays with one entry per text (``sent_score`` is
        ``p_pos - p_neg``), ``token_lengths`` holds the token count of each text without
        special tokens, ``chunked_articles`` counts texts split into more than one window,
        and ``n_rows`` is the number of windows sent through the model.

    Raises:
        ValueError: if the tokenizer does not define CLS and SEP token ids.
    """
    # An empty batch has nothing to score; without this guard max() below raises on an
    # empty sequence.
    if len(texts) == 0:
        return (
            np.zeros(0, dtype=np.float32),
            np.zeros(0, dtype=np.float32),
            np.zeros(0, dtype=np.float32),
            np.zeros(0, dtype=np.float32),
            np.zeros(0, dtype=np.int64),
            0,
            0,
        )

    max_body_tokens = MAX_TOKENS - 2  # reserve [CLS]/[SEP]

    cls_id = tokenizer.cls_token_id
    sep_id = tokenizer.sep_token_id
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
    if cls_id is None or sep_id is None:
        raise ValueError('Tokenizer must define cls_token_id and sep_token_id')

    flat_input_ids = []  # one token-id list per window, across all texts
    article_index = []  # article_index[row] = position in `texts` that the window belongs to
    token_lengths = []
    chunked_articles = 0

    for article_i, text in enumerate(texts):
        # Tokenize without special tokens so windows can be cut from the raw token stream;
        # [CLS]/[SEP] are added per window below.
        token_ids = tokenizer(text, add_special_tokens=False)['input_ids']
        token_lengths.append(len(token_ids))

        windows = choose_chunk_windows(len(token_ids), max_body_tokens, MAX_CHUNKS_PER_ARTICLE)
        if len(windows) > 1:
            chunked_articles += 1

        for start, end in windows:
            flat_input_ids.append([cls_id] + token_ids[start:end] + [sep_id])
            article_index.append(article_i)

    # Right-pad every window to the longest one in this batch; padding gets attention 0.
    n_rows = len(flat_input_ids)
    max_len = max(len(ids) for ids in flat_input_ids)
    input_tensor = torch.full((n_rows, max_len), pad_id, dtype=torch.long)
    attention_tensor = torch.zeros((n_rows, max_len), dtype=torch.long)

    for row_i, ids in enumerate(flat_input_ids):
        seq_len = len(ids)
        input_tensor[row_i, :seq_len] = torch.tensor(ids, dtype=torch.long)
        attention_tensor[row_i, :seq_len] = 1

    # NOTE: all windows of the batch go through the model in a single forward pass.
    logits = model(
        input_ids=input_tensor.to(device),
        attention_mask=attention_tensor.to(device),
    ).logits
    probs = torch.softmax(logits, dim=-1).detach().cpu().numpy()

    p_pos = np.zeros(len(texts), dtype=np.float32)
    p_neg = np.zeros(len(texts), dtype=np.float32)
    p_neu = np.zeros(len(texts), dtype=np.float32)
    counts = np.zeros(len(texts), dtype=np.float32)

    # Sum window probabilities per article, then divide by the window count to average.
    for row_i, article_i in enumerate(article_index):
        p_pos[article_i] += probs[row_i, pos_id]
        p_neg[article_i] += probs[row_i, neg_id]
        p_neu[article_i] += probs[row_i, neu_id]
        counts[article_i] += 1.0

    counts[counts == 0.0] = 1.0  # defensive: avoid division by zero
    p_pos /= counts
    p_neg /= counts
    p_neu /= counts

    sent_score = p_pos - p_neg
    return p_pos, p_neg, p_neu, sent_score, np.array(token_lengths), chunked_articles, n_rows


# -----------------------
# Score all articles
# -----------------------
# Per-batch result arrays are collected here and concatenated once after the loop.
p_pos_all, p_neg_all, p_neu_all, score_all = [], [], [], []
token_lengths_all = []
chunked_articles_total = 0
total_chunks = 0

batch_starts = range(0, len(df), BATCH_SIZE)
for i in tqdm(batch_starts, desc=f'FinBERT scoring ({device})'):
    batch_texts = df['model_text'].iloc[i : i + BATCH_SIZE].tolist()
    ppos, pneg, pneu, sc, tok_lens, chunked_n, chunk_rows = finbert_batch_chunked(batch_texts)

    for sink, values in (
        (p_pos_all, ppos),
        (p_neg_all, pneg),
        (p_neu_all, pneu),
        (score_all, sc),
    ):
        sink.append(values)

    token_lengths_all.extend(tok_lens.tolist())
    chunked_articles_total += chunked_n
    total_chunks += chunk_rows

# Attach the concatenated per-article scores as new columns.
for column, parts in (
    ('p_pos', p_pos_all),
    ('p_neg', p_neg_all),
    ('p_neu', p_neu_all),
    ('sent_score', score_all),
):
    df[column] = np.concatenate(parts)

# Robust label assignment from max probability among named classes.
# 'positive' is tested first, then 'negative'; everything else is 'neutral'.
positive_wins = df['p_pos'] >= df[['p_neg', 'p_neu']].max(axis=1)
negative_wins = df['p_neg'] >= df[['p_pos', 'p_neu']].max(axis=1)
df['sent_label'] = np.where(
    positive_wins,
    'positive',
    np.where(negative_wins, 'negative', 'neutral'),
)

# Save article-level output
# Title, link and provider label are optional columns and are written only when present.
candidate_cols = [
    'market_date',
    'timestamp_utc',
    'timestamp_ny',
    TITLE_COL,
    TEXT_COL,
    LINK_COL,
    PROVIDER_LABEL_COL,
    'p_pos',
    'p_neg',
    'p_neu',
    'sent_score',
    'sent_label',
]
article_cols = [name for name in candidate_cols if name in df.columns]
df[article_cols].to_csv(OUT_SCORED, index=False)
print(f'Saved scored articles -> {OUT_SCORED} (rows={len(df)})')

# -----------------------
# Optional per-day cap (unbiased)
# -----------------------
# CHANGED (Issue 3): No default cap. If enabled, use random sampling, not extreme |score| selection.
if USE_ALL_ARTICLES_PER_DAY or MAX_ARTICLES_PER_DAY is None:
    df_day = df.copy()
else:
    # Sample each market day explicitly rather than via groupby(...).apply(...): applying on
    # the grouping column is deprecated in pandas 2.2, and releases that exclude it would
    # drop 'market_date' from df_day. Iterating the groups yields the same rows, in the same
    # order, with every column kept.
    sampled_days = [
        day_rows.sample(n=min(len(day_rows), MAX_ARTICLES_PER_DAY), random_state=RANDOM_SEED)
        for _day, day_rows in df.groupby('market_date')
    ]
    df_day = pd.concat(sampled_days).reset_index(drop=True) if sampled_days else df.copy()

# Guard the average against a zero day count so the diagnostic print cannot raise.
n_market_days = df_day['market_date'].nunique()
print(
    'Rows used in daily aggregation:',
    len(df_day),
    '| avg/day:',
    len(df_day) / max(n_market_days, 1),
)

# -----------------------
# Daily aggregation
# -----------------------
def _label_fraction(label):
    """Build an aggregator returning the share of rows whose label equals *label*."""
    return lambda labels: float(np.mean(labels == label))


# One output column per entry: name -> (source column, aggregation).
daily_spec = {
    'sent_mean': ('sent_score', 'mean'),
    'sent_median': ('sent_score', 'median'),
    'sent_std': ('sent_score', 'std'),
    'news_count': ('sent_score', 'size'),
    'frac_neg': ('sent_label', _label_fraction('negative')),
    'frac_pos': ('sent_label', _label_fraction('positive')),
    'frac_neu': ('sent_label', _label_fraction('neutral')),
}
daily = df_day.groupby('market_date').agg(**daily_spec).reset_index()

# Add sampling uncertainty feature for downstream risk models.
# A missing standard deviation is replaced by 0.0 before the standard error is derived.
daily['sent_std'] = daily['sent_std'].fillna(0.0)
articles_per_day = daily['news_count'].clip(lower=1)
daily['sent_se'] = daily['sent_std'] / np.sqrt(articles_per_day)

daily.to_csv(OUT_DAILY, index=False)
print(f'Saved daily sentiment -> {OUT_DAILY} (days={len(daily)})')

# -----------------------
# Validation diagnostics
# -----------------------
# CHANGED (Issue 4): Add weak-label validation against provider sentiment if available.


def normalize_provider_label(x):
    """Map a raw provider sentiment value to 'positive', 'negative' or 'neutral'.

    The value is stringified, stripped and lower-cased, then matched by substring
    in the order 'pos', 'neg', 'neu'. Missing values and values matching none of
    the fragments yield NaN.
    """
    if pd.isna(x):
        return np.nan

    lowered = str(x).strip().lower()
    fragment_to_label = (
        ('pos', 'positive'),
        ('neg', 'negative'),
        ('neu', 'neutral'),
    )
    for fragment, canonical in fragment_to_label:
        if fragment in lowered:
            return canonical
    return np.nan


# Compare FinBERT labels with the provider's own sentiment labels, when the column exists.
if PROVIDER_LABEL_COL in df.columns:
    df['provider_label_norm'] = df[PROVIDER_LABEL_COL].apply(normalize_provider_label)
    # Only rows where both labels are available take part in the comparison.
    val = df.dropna(subset=['provider_label_norm', 'sent_label']).copy()

    if len(val) > 0:
        agreement = float((val['provider_label_norm'] == val['sent_label']).mean())
        conf = pd.crosstab(val['provider_label_norm'], val['sent_label'], rownames=['provider'], colnames=['finbert'])

        # A real newline gives a blank separator line; the previous '\\n' literal printed
        # the two characters backslash and 'n'.
        print('\nValidation vs provider labels')
        print('rows:', len(val), '| agreement:', f'{agreement:.4f}')
        print(conf)

        # Correlate the continuous FinBERT score with the provider label coded as -1/0/+1.
        provider_num = val['provider_label_norm'].map({'negative': -1, 'neutral': 0, 'positive': 1}).values
        corr = np.corrcoef(val['sent_score'].values, provider_num)[0, 1]
        print('score-provider ordinal correlation:', round(float(corr), 4))

        # The sklearn report is best-effort: any failure (including sklearn not being
        # installed) is reported and skipped rather than aborting the run.
        try:
            from sklearn.metrics import classification_report, f1_score

            f1_macro = f1_score(val['provider_label_norm'], val['sent_label'], average='macro')
            print('macro F1:', round(float(f1_macro), 4))
            print(classification_report(val['provider_label_norm'], val['sent_label'], digits=4))
        except Exception as e:
            print('sklearn report skipped:', e)
    else:
        print('No overlapping rows with normalized provider labels for validation.')
else:
    print(f"Provider label column '{PROVIDER_LABEL_COL}' not found; skipping weak-label validation.")

# CHANGED (Issue 4): Optional market-response check if returns file exists.
RETURNS_CSV = DATA_DIR / 'sp500_returns_daily.csv'
if RETURNS_CSV.exists():
    r = pd.read_csv(RETURNS_CSV)
    date_col = 'date' if 'date' in r.columns else ('Date' if 'Date' in r.columns else None)

    if date_col is not None:
        # pct_change() and shift(-1) below work on row position, so the rows must be in
        # chronological order first; otherwise sentiment is paired with the wrong day's
        # return. Rows with an unparseable date are dropped because they cannot be ordered.
        r['_parsed_date'] = pd.to_datetime(r[date_col], errors='coerce')
        r = (
            r.dropna(subset=['_parsed_date'])
            .sort_values('_parsed_date', kind='stable')
            .reset_index(drop=True)
        )
        r['market_date'] = r['_parsed_date'].dt.strftime('%Y-%m-%d')

        # Prefer a ready-made return column; otherwise derive returns from closing prices.
        # 'Close' is accepted alongside 'close', mirroring the 'date'/'Date' handling above.
        return_col = next((name for name in ('return', 'ret') if name in r.columns), None)
        close_col = next((name for name in ('close', 'Close') if name in r.columns), None)
        if return_col is not None:
            r['ret_t'] = pd.to_numeric(r[return_col], errors='coerce')
        elif close_col is not None:
            r['ret_t'] = pd.to_numeric(r[close_col], errors='coerce').pct_change()
        else:
            r['ret_t'] = np.nan

        # The return of the following row, i.e. the next date present in the returns file.
        r['ret_t_plus_1'] = r['ret_t'].shift(-1)
        # Only the two columns used in the correlation decide which rows are dropped.
        merged = daily.merge(
            r[['market_date', 'ret_t_plus_1']], on='market_date', how='inner'
        ).dropna(subset=['sent_mean', 'ret_t_plus_1'])

        if len(merged) > 10:
            ic = np.corrcoef(merged['sent_mean'], merged['ret_t_plus_1'])[0, 1]
            # A real newline here; the previous '\\n' literal printed backslash and 'n'.
            print('\nOptional next-day return validation')
            print('rows:', len(merged), '| corr(sent_mean, next_day_return):', round(float(ic), 4))
        else:
            print('Optional returns validation skipped: not enough merged rows.')
    else:
        print('Optional returns validation skipped: no date/Date column in returns file.')
else:
    print(f'Optional returns file not found ({RETURNS_CSV}); skipped market-response check.')

# -----------------------
# Run metadata for reproducibility
# -----------------------
# CHANGED (Issue 5): Save run metadata with versions/config/diagnostics.

# Token-length statistics; every statistic is 0.0 when no lengths were recorded.
if token_lengths_all:
    mean_tokens = float(np.mean(token_lengths_all))
    p50_tokens, p90_tokens, p99_tokens = (
        float(np.quantile(token_lengths_all, q)) for q in (0.50, 0.90, 0.99)
    )
else:
    mean_tokens = p50_tokens = p90_tokens = p99_tokens = 0.0

config_meta = {
    'max_tokens': MAX_TOKENS,
    'max_chunks_per_article': MAX_CHUNKS_PER_ARTICLE,
    'batch_size': BATCH_SIZE,
    'use_all_articles_per_day': USE_ALL_ARTICLES_PER_DAY,
    'max_articles_per_day': MAX_ARTICLES_PER_DAY,
    'random_seed': RANDOM_SEED,
}

library_versions_meta = {
    'torch': torch.__version__,
    'transformers': __import__('transformers').__version__,
    'pandas': pd.__version__,
    'numpy': np.__version__,
}

row_counts_meta = {
    'raw_rows': int(raw_rows),
    'scored_rows': int(len(df)),
    'daily_rows': int(len(daily)),
    'dropped_duplicate_links': int(dropped_by_link),
    'dropped_duplicate_text_rows': int(dropped_by_text),
}

token_diagnostics_meta = {
    'mean_tokens': mean_tokens,
    'p50_tokens': p50_tokens,
    'p90_tokens': p90_tokens,
    'p99_tokens': p99_tokens,
    'articles_with_multiple_chunks': int(chunked_articles_total),
    # max(..., 1) keeps the ratio defined when no rows were scored.
    'pct_articles_with_multiple_chunks': float(chunked_articles_total / max(len(df), 1)),
    'total_chunk_inferences': int(total_chunks),
}

outputs_meta = {
    'scored_csv': str(OUT_SCORED),
    'daily_csv': str(OUT_DAILY),
    'run_metadata_json': str(OUT_RUN_META),
}

meta = {
    'run_utc': datetime.now(timezone.utc).isoformat(),
    'model_name': MODEL_NAME,
    'device': device,
    'market_tz': MARKET_TZ,
    'config': config_meta,
    'library_versions': library_versions_meta,
    'row_counts': row_counts_meta,
    'token_diagnostics': token_diagnostics_meta,
    'outputs': outputs_meta,
}

serialized_meta = json.dumps(meta, indent=2)
OUT_RUN_META.write_text(serialized_meta)
print(f'Saved run metadata -> {OUT_RUN_META}')

# Last expression of the cell: the notebook displays the first rows of the daily table.
daily.head()


