In [None]:
import os, re, math, hashlib, warnings
from pathlib import Path
from datetime import datetime
from typing import Dict, Tuple, List

import numpy as np
import pandas as pd

from tqdm.auto import tqdm
import regex as re2
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline
from sklearn.feature_extraction.text import TfidfVectorizer

# Silence every Python warning for the rest of the session.
# NOTE(review): this is a blanket filter, so deprecation notices raised by the
# imported libraries are hidden as well — confirm that is intended.
warnings.filterwarnings("ignore")
# Let pandas render up to 120 rows when a frame is displayed.
pd.set_option("display.max_rows", 120)
# setdefault: only applied when the variable is not already set in the environment.
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

# Paths (all relative to the current working directory)
INPUT_CLEAN   = "./final_view_cleaned.parquet"   # input frame read by the run cell
OUT_8K        = "./features_event_8k.parquet"    # destination of the 8-K feature table
OUT_10KQ      = "./features_event_10kq.parquet"  # destination of the 10-K/10-Q feature table
CHECKPOINT_DIR= "./_checkpoints"                 # only passed to ensure_dirs in the visible code
CACHE_DIR     = "./_cache"                       # parent folder of the FinBERT cache file
FINBERT_CACHE = os.path.join(CACHE_DIR, "finbert_cache.parquet")

# Settings
USE_GPU     = True    # GPU is used only when torch also reports CUDA as available
BATCH_SIZE  = 64      # forwarded as batch_size to the FinBERT pipeline call
PRINT_EVERY = 20000   # add_finbert_feats flushes the FinBERT cache to disk every PRINT_EVERY rows
# Model identifier; can be overridden through the FINBERT_MODEL environment variable.
FINBERT_MODEL = os.getenv("FINBERT_MODEL", "ProsusAI/finbert")

In [None]:
# Utils
def ts() -> str:
    """Return the current local wall-clock time formatted as ``HH:MM:SS``."""
    now = datetime.now()
    return f"{now:%H:%M:%S}"

def pbar(it, **kw):
    """Wrap an iterable in a tqdm progress bar using the notebook's shared display settings.

    Args:
        it: Iterable to wrap.
        **kw: Extra keyword arguments forwarded to ``tqdm`` (e.g. ``total``, ``desc``).

    Returns:
        The tqdm iterator. The bar resizes with the output width, is not left
        on screen once finished, and refreshes at most every 0.2 s.
    """
    bar = tqdm(
        it,
        dynamic_ncols=True,
        leave=False,
        mininterval=0.2,
        **kw,
    )
    return bar

def ensure_dirs(*paths):
    """Create the directories needed for the given file or directory paths.

    A path is treated as a *file* when it has a suffix (``out.parquet``) or
    already exists as a regular file; only its parent directory is created.
    Any other path is treated as a *directory* and is created itself.
    Previously only ``Path(p).parent`` was created, so directory arguments
    such as ``"./_checkpoints"`` were never actually created.

    Args:
        *paths: File or directory paths (``str`` or ``Path``).
    """
    for p in paths:
        target = Path(p)
        looks_like_file = bool(target.suffix) or target.is_file()
        folder = target.parent if looks_like_file else target
        folder.mkdir(parents=True, exist_ok=True)

def sha1_full(s: str) -> str:
    """Build a cache key of the form ``<sha1 hex digest>:<byte length>``.

    The text is encoded as UTF-8 (characters that cannot be encoded are
    dropped); ``None`` and the empty string hash identically.
    """
    payload = (s if s else "").encode("utf-8", "ignore")
    digest = hashlib.sha1(payload).hexdigest()
    return "{}:{}".format(digest, len(payload))

# Tokenization
# WORD_RE: a letter followed by at least one more letter or apostrophe, so a
# token is always two or more characters and never contains a digit.
WORD_RE       = re2.compile(r"(?:[A-Za-z][A-Za-z']+)")
# SENT_SPLIT_RE: split on a run of . ! ? followed by whitespace, except when the
# text ending at that punctuation is a two-letter capitalised abbreviation
# ("Mr.", "Co.") or one of the listed abbreviations ("U.S.", "Inc.", "Ltd.").
# The lookbehinds end in "\." so they must sit AFTER the punctuation run; when
# placed before it they inspected the character preceding the period and could
# never match, which split "Mr. Smith" / "U.S. economy" into two sentences.
SENT_SPLIT_RE = re2.compile(r"[.!?]+(?<!\b[A-Z][a-z]\.)(?<!\b(?:U\.S|Inc|Co|Ltd|Mr|Ms|Dr)\.)[\s\n]+")

def tokenize_words(text: str) -> List[str]:
    """Return every ``WORD_RE`` match in ``text``, lower-cased, in order of appearance.

    ``None`` and the empty string yield an empty list.
    """
    source = text or ""
    words = []
    for match in WORD_RE.finditer(source):
        words.append(match.group(0).lower())
    return words

def split_sentences(text: str) -> List[str]:
    """Split ``text`` on ``SENT_SPLIT_RE`` and return the non-blank pieces, stripped.

    ``None`` and the empty string yield an empty list.
    """
    sentences = []
    for piece in SENT_SPLIT_RE.split(text or ""):
        if not piece:
            continue
        trimmed = piece.strip()
        if trimmed:
            sentences.append(trimmed)
    return sentences

# Readability
# Runs of consecutive vowels (a, e, i, o, u, y; case-insensitive).
# syllables() counts each run as one syllable.
VOWEL_RE = re2.compile(r"[aeiouy]+", re2.I)

def syllables(word: str) -> int:
    """Estimate the syllable count of ``word`` from its vowel groups.

    The word is lower-cased and stripped of leading/trailing apostrophes.
    Each run of vowels counts once (minimum 1 for a non-empty word), and one
    syllable is removed for a trailing "e" when more than one was counted.
    Returns 0 for empty or ``None`` input.
    """
    cleaned = (word or "").lower().strip("'")
    if cleaned == "":
        return 0
    groups = VOWEL_RE.findall(cleaned)
    count = len(groups) if groups else 1
    if count > 1 and cleaned.endswith("e"):
        count -= 1
    return count

def fog_proxy(text: str) -> float:
    """Compute a Gunning-Fog style readability score for ``text``.

    The score is ``0.4 * (words per sentence + 100 * share of complex words)``,
    where a complex word has at least three estimated syllables. Returns NaN
    when the text has no words or no sentences.
    """
    words = tokenize_words(text)
    n_words = len(words)
    n_sents = len(split_sentences(text))
    if not (n_words and n_sents):
        return np.nan
    n_complex = 0
    for word in words:
        if syllables(word) >= 3:
            n_complex += 1
    return 0.4 * (n_words/n_sents + 100.0 * n_complex / max(1, n_words))

# FinBERT
# Model, tokenizer and pipeline are created once, when this cell runs.
# device=0 selects the first CUDA device, -1 selects the CPU.
_device    = 0 if (USE_GPU and torch.cuda.is_available()) else -1
_tokenizer = AutoTokenizer.from_pretrained(FINBERT_MODEL)
_model     = AutoModelForSequenceClassification.from_pretrained(FINBERT_MODEL)
# top_k=None makes the pipeline return the score of every label.
_finbert   = TextClassificationPipeline(model=_model, tokenizer=_tokenizer, device=_device, top_k=None)


def _resolve_model_max_len(tokenizer, model, fallback: int = 512, ceiling: int = 4096) -> int:
    """Pick the largest sequence length that is safe for both tokenizer and model.

    Uses the smaller of the tokenizer's ``model_max_length`` and the model
    config's ``max_position_embeddings`` (each capped at ``ceiling``). Values
    that are missing or not positive are ignored; ``fallback`` is returned
    when neither limit is usable. Previously only the tokenizer limit was
    read, so an unset limit resolved to the 4096 cap, and the ``<= 0``
    fallback could never trigger because the value was clamped to >= 1 first.
    """
    raw_limits = (
        getattr(tokenizer, "model_max_length", None),
        getattr(getattr(model, "config", None), "max_position_embeddings", None),
    )
    usable = [
        int(min(raw, ceiling))
        for raw in raw_limits
        if isinstance(raw, (int, float)) and not isinstance(raw, bool) and raw > 0
    ]
    return min(usable) if usable else fallback


_MODEL_MAX_LEN = _resolve_model_max_len(_tokenizer, _model)
# Chunk window: at most 256 tokens and at most half of the model limit (never below 1).
_CHUNK_TOKENS  = max(1, min(256, _MODEL_MAX_LEN // 2))
# Overlap between consecutive chunks: an eighth of the window, kept within 8..32 tokens.
_CHUNK_STRIDE  = min(32, max(8, _CHUNK_TOKENS // 8))

def chunk_by_tokens(text: str, max_tokens=_CHUNK_TOKENS, stride=_CHUNK_STRIDE) -> List[str]:
    """Split ``text`` into overlapping windows of at most ``max_tokens`` tokens.

    The text is tokenized without special tokens, sliced into windows that
    overlap by ``stride`` tokens, and each window is decoded back to a string.

    Args:
        text: Input text; ``None`` or empty text yields ``[]``.
        max_tokens: Window size in tokens; must be >= 1.
        stride: Number of tokens shared by consecutive windows. When it is
            negative or not smaller than ``max_tokens`` the windows are made
            non-overlapping; the previous update rule could move the window
            backwards in that case and never terminate.

    Returns:
        Decoded chunk strings in document order.

    Raises:
        ValueError: If ``max_tokens`` is smaller than 1.
    """
    if max_tokens < 1:
        raise ValueError("max_tokens must be >= 1")
    enc = _tokenizer(text or "", add_special_tokens=False)
    ids = enc.get("input_ids", [])
    if not ids:
        return []
    overlap = stride if 0 <= stride < max_tokens else 0
    step = max_tokens - overlap  # always >= 1, so the window always advances
    total = len(ids)
    chunks = []
    for start in range(0, total, step):
        end = min(start + max_tokens, total)
        chunks.append(_tokenizer.decode(ids[start:end], clean_up_tokenization_spaces=True))
        if end == total:
            # The last window reached the end; later starts would only repeat the tail.
            break
    return chunks

def _finbert_call(text_list: List[str]):
    return _finbert(text_list, return_all_scores=True, truncation=True,
                    padding=True, max_length=_MODEL_MAX_LEN, batch_size=BATCH_SIZE)

def _load_finbert_cache(path: str):
    if not Path(path).exists(): return {}
    try:
        dfc = pd.read_parquet(path)
        return {str(r.key):(float(r.neg),float(r.neu),float(r.pos)) for r in dfc.itertuples(index=False)}
    except Exception: return {}

def _save_finbert_cache(cache, path: str):
    if not cache: return
    p = Path(path); p.parent.mkdir(parents=True, exist_ok=True)
    df = pd.DataFrame([(k,*v) for k,v in cache.items()], columns=["key","neg","neu","pos"])
    df.drop_duplicates("key").to_parquet(p, index=False)

# In-memory FinBERT result cache (key -> (neg, neu, pos)), seeded from the
# parquet file at FINBERT_CACHE; starts empty when that file is missing or unreadable.
_FINBERT_CACHE = _load_finbert_cache(FINBERT_CACHE)

def finbert_probs_one(text: str) -> Tuple[float,float,float]:
    """Return the ``(negative, neutral, positive)`` FinBERT scores for one text.

    The text is split into token chunks, every chunk is scored, and the
    per-label scores are averaged with equal weight per chunk. Results are
    memoised in ``_FINBERT_CACHE`` under ``sha1_full(text)``. Text that
    produces no chunks yields ``(nan, nan, nan)``, which is cached as well.
    """
    body = text or ""
    key = sha1_full(body)
    if key in _FINBERT_CACHE:
        return _FINBERT_CACHE[key]

    missing = (np.nan, np.nan, np.nan)
    pieces = chunk_by_tokens(body)
    if pieces:
        totals = {"negative": 0, "neutral": 0, "positive": 0}
        n_chunks = 0
        for label_scores in _finbert_call(pieces):
            by_label = {entry["label"].lower(): float(entry["score"]) for entry in label_scores}
            for label in totals:
                totals[label] += by_label.get(label, 0)
            n_chunks += 1
        if n_chunks == 0:
            result = missing
        else:
            result = (
                totals["negative"] / n_chunks,
                totals["neutral"] / n_chunks,
                totals["positive"] / n_chunks,
            )
    else:
        result = missing

    _FINBERT_CACHE[key] = result
    return result

def add_finbert_feats(df: pd.DataFrame, text_col: str, prefix: str = "") -> pd.DataFrame:
    """Score ``df[text_col]`` with FinBERT and append sentiment features.

    Added columns (each prefixed with ``prefix``): ``finbert_neg``,
    ``finbert_neu``, ``finbert_pos``, ``fb_polarity`` (pos - neg),
    ``fb_intensity`` (pos + neg), ``fb_entropy`` (Shannon entropy of the three
    scores) and ``fb_margin`` (largest minus second-largest score).

    Rows whose text is missing or empty get NaN in every added column. Two
    defects are fixed here: missing values used to be stringified to
    ``"nan"``/``"None"`` and scored as real text, and ``fb_entropy`` used to
    be 0.0 (perfect certainty) for rows without probabilities.

    The FinBERT cache is flushed to disk every ``PRINT_EVERY`` rows and once
    more at the end.

    Args:
        df: Input frame; it is not modified.
        text_col: Name of the column holding the text to score.
        prefix: Optional prefix for the generated column names.

    Returns:
        A new frame: ``df`` with the feature columns concatenated on the right.
    """
    # fillna before astype(str), otherwise NaN/None become literal text.
    texts = df[text_col].fillna("").astype(str)
    triples = []
    for i, txt in enumerate(pbar(texts, total=len(df), desc=f"FinBERT:{prefix}"), 1):
        triples.append(finbert_probs_one(txt))
        if PRINT_EVERY and (i % PRINT_EVERY == 0):
            _save_finbert_cache(_FINBERT_CACHE, FINBERT_CACHE)
    _save_finbert_cache(_FINBERT_CACHE, FINBERT_CACHE)

    # reshape keeps the (n, 3) layout even when the frame has no rows.
    probs = np.asarray(triples, dtype=float).reshape(-1, 3)
    neg, neu, pos = probs[:, 0], probs[:, 1], probs[:, 2]
    polarity, intensity = pos - neg, pos + neg
    with np.errstate(divide='ignore', invalid='ignore'):
        entropy = -np.nansum(np.where(probs > 0, probs * np.log(np.maximum(probs, 1e-12)), 0), axis=1)
    # nansum reports 0 for all-NaN rows; restore NaN so "missing" stays missing.
    entropy[np.isnan(probs).any(axis=1)] = np.nan
    ranked = np.sort(probs, axis=1)
    margin = ranked[:, -1] - ranked[:, -2]
    out = pd.DataFrame({
        f"{prefix}finbert_neg": neg, f"{prefix}finbert_neu": neu, f"{prefix}finbert_pos": pos,
        f"{prefix}fb_polarity": polarity, f"{prefix}fb_intensity": intensity,
        f"{prefix}fb_entropy": entropy, f"{prefix}fb_margin": margin
    }, index=df.index)
    return pd.concat([df, out], axis=1)

# Novelty
def tfidf_cosine_novelty(curr: str, prev: str) -> float:
    """Return ``1 - cosine similarity`` between the TF-IDF vectors of two texts.

    The vectorizer is fitted on just these two documents, using unigrams and
    bigrams.

    Args:
        curr: Current text.
        prev: Previous text to compare against.

    Returns:
        Novelty in ``[0, 1]`` (0 = identical term profile). NaN when either
        text is empty/blank/``None``, when neither text yields a usable token
        (scikit-learn raises ``ValueError`` for an empty vocabulary, which
        previously aborted the whole run), or when a vector has zero norm.
    """
    if not (curr or "").strip() or not (prev or "").strip():
        return np.nan
    vect = TfidfVectorizer(min_df=1, max_df=1.0, ngram_range=(1,2))
    try:
        X = vect.fit_transform([prev, curr])
    except ValueError:
        # e.g. "empty vocabulary; perhaps the documents only contain stop words"
        return np.nan
    v0, v1 = X[0].toarray()[0], X[1].toarray()[0]
    n0, n1 = np.linalg.norm(v0), np.linalg.norm(v1)
    if n0 == 0 or n1 == 0:
        return np.nan
    cosine = float(np.dot(v0, v1) / (n0 * n1))
    # Rounding can push the cosine a hair above 1; keep the novelty non-negative.
    return 1.0 - min(1.0, cosine)

# 8-K regex & topics
# Each item pattern has two case-insensitive alternatives:
#   1. "item", then any characters on the same line ("." does not cross
#      newlines here), then the item number;
#   2. the bare item number on its own.
# "0*" tolerates a missing or repeated zero, so "2.2", "2.02" and "2.002" all match.
# NOTE(review): the bare-number alternative also matches plain figures such as
# "2.02" inside prose — confirm these false positives are acceptable.
ITEM_202_RE = re2.compile(r"\bitem.*2\.0*2\b|\b2\.0*2\b", re2.I)
ITEM_502_RE = re2.compile(r"\bitem.*5\.0*2\b|\b5\.0*2\b", re2.I)
# Topic name -> case-insensitive whole-word patterns; the topic name becomes
# part of the "topic_<name>_per1k" feature key built in anatomy_topics_8k.
TOPIC_PATTERNS = {
    "supply_chain": [re2.compile(r"\bsupply\s+chain\b", re2.I), re2.compile(r"\blogistic(s)?\b", re2.I)],
    "cyber":        [re2.compile(r"\bcyber\b", re2.I), re2.compile(r"\bransomware\b", re2.I)],
    "climate":      [re2.compile(r"\bclimate\b", re2.I), re2.compile(r"\bemission(s)?\b", re2.I)]
}

def count_topic_hits(text: str, patterns: List[re2.Pattern]) -> int:
    """Return the total number of matches of all ``patterns`` in ``text``.

    ``None`` is treated as the empty string.
    """
    haystack = text or ""
    hits = 0
    for pattern in patterns:
        hits += len(pattern.findall(haystack))
    return hits

def anatomy_topics_8k(text: str) -> Dict[str, float]:
    """Extract 8-K item flags and topic intensities from ``text``.

    Args:
        text: Filing text. ``None`` is treated as empty text; previously it
            raised ``AttributeError`` on ``text.lower()`` although every
            other use in this function already tolerated ``None``.

    Returns:
        Dict with ``has_item_202_earnings`` and ``has_item_502_exec_chg``
        (bool) plus one ``topic_<name>_per1k`` entry per key of
        ``TOPIC_PATTERNS``: the topic's match count divided by
        ``max(1, words / 1000)``. Texts shorter than 1000 words therefore
        report raw counts.
    """
    body = text or ""
    # Tokenise once; the word count only feeds the denominator.
    denom = max(1.0, len(tokenize_words(body)) / 1000.0)
    out = dict(
        has_item_202_earnings=bool(ITEM_202_RE.search(body)),
        has_item_502_exec_chg=bool(ITEM_502_RE.search(body)),
    )
    lowered = body.lower()
    for k, pats in TOPIC_PATTERNS.items():
        out[f"topic_{k}_per1k"] = count_topic_hits(lowered, pats) / denom
    return out

# Features
def readability_feats(text: str) -> Dict[str,float]:
    """Compute readability features for ``text``.

    Returns a dict with ``fog_proxy``, ``avg_words_per_sent`` (NaN without
    sentences), ``type_token_ratio`` (distinct words / words, NaN without
    words) and ``len_content_words`` (word count as a float).
    """
    words = tokenize_words(text)
    sentences = split_sentences(text)
    n_words = len(words)

    feats = {"fog_proxy": fog_proxy(text)}
    if sentences:
        feats["avg_words_per_sent"] = n_words / len(sentences)
    else:
        feats["avg_words_per_sent"] = np.nan
    if words:
        feats["type_token_ratio"] = len(set(words)) / n_words
    else:
        feats["type_token_ratio"] = np.nan
    feats["len_content_words"] = float(n_words)
    return feats

def structure_feats(content: str, rf: str) -> Dict[str,float]:
    """Compare the size of the ``rf`` text with the main content.

    Returns ``risk_to_content_ratio`` (rf words / content words, NaN when the
    content has no words) and ``len_rf_words`` (rf word count as a float).
    """
    content_words = len(tokenize_words(content))
    rf_words = len(tokenize_words(rf))
    if content_words:
        ratio = rf_words / content_words
    else:
        ratio = np.nan
    return {"risk_to_content_ratio": ratio, "len_rf_words": float(rf_words)}

def novelty_feats_same_form(curr_rf: str, prev_rf: str) -> Dict[str,float]:
    """Return ``{"novelty_rf": ...}``: TF-IDF cosine novelty of ``curr_rf`` against ``prev_rf``.

    Falsy inputs are replaced by the empty string before the comparison.
    """
    current = curr_rf or ""
    previous = prev_rf or ""
    score = tfidf_cosine_novelty(current, previous)
    return {"novelty_rf": score}

def timing_feats(dt: pd.Timestamp) -> Dict[str,float]:
    """Return ``{"filing_dow": ...}`` with the day of week (Monday = 0.0) of ``dt``.

    The value is NaN when ``dt`` is missing (``NaT``/``None``/NaN).
    """
    if pd.isna(dt):
        return {"filing_dow": np.nan}
    weekday = pd.Timestamp(dt).dayofweek
    return {"filing_dow": float(int(weekday))}

def add_nan_flags(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with missing-value indicators added.

    For every numeric column ``c`` an ``int8`` column ``c__was_nan`` is
    appended (1 where ``c`` is missing). Missing values in object-dtype
    columns are replaced by the empty string. The input frame is untouched.
    """
    flagged = df.copy()
    numeric_cols = list(flagged.select_dtypes(include=[np.number]).columns)
    for col in numeric_cols:
        indicator = flagged[col].isna()
        flagged[f"{col}__was_nan"] = indicator.astype("int8")
    object_cols = [col for col in flagged.columns if pd.api.types.is_object_dtype(flagged[col])]
    for col in object_cols:
        flagged[col] = flagged[col].fillna("")
    return flagged

# Builders
def build_features_8k(df8: pd.DataFrame) -> pd.DataFrame:
    """Build the per-filing feature table for 8-K rows.

    Args:
        df8: Frame with at least ``ticker``, ``date``, ``file_type`` and
            ``content`` columns. It is not modified.

    Returns:
        One row per input filing with a usable ticker and date, sorted by
        ticker and date, holding the identifying columns, the FinBERT
        features, readability features, 8-K item/topic features and
        ``filing_dow``, followed by the ``__was_nan`` indicator columns. An
        empty input yields an empty frame with only the identifying columns.

    Fixes over the previous version: ``date`` is parsed before rows without a
    date are dropped (values that fail to parse no longer survive as NaT
    rows), and missing ``content`` is normalised to ``""`` up front. A NaN
    content is truthy, so ``r.content or ""`` used to hand a float to the
    regex helpers and crash.
    """
    if df8.empty:
        return pd.DataFrame(columns=["ticker","date","file_type"])
    out = df8.assign(date=pd.to_datetime(df8["date"], errors="coerce"))
    out = out.dropna(subset=["ticker","date"]).sort_values(["ticker","date"]).reset_index(drop=True)
    out["content"] = out["content"].fillna("").astype(str)
    out = add_finbert_feats(out, "content")
    rows = []
    for r in pbar(out.itertuples(index=False), total=len(out), desc="8-K derived"):
        feat = {}
        feat.update(readability_feats(r.content))
        feat.update(anatomy_topics_8k(r.content))
        feat.update(timing_feats(r.date))
        rows.append(feat)
    fe = pd.DataFrame(rows, index=out.index)
    res = pd.concat([out[["ticker","date","file_type"]], out.filter(regex=r"^finbert_|^fb_"), fe], axis=1)
    return add_nan_flags(res)

def build_features_10kq(df10: pd.DataFrame) -> pd.DataFrame:
    """Build the feature table for 10-K / 10-Q rows.

    Args:
        df10: Frame with at least ``ticker``, ``date``, ``file_type`` and
            ``content`` columns; ``rf`` is optional and defaults to empty
            text. It is not modified.

    Returns:
        One row per ``(ticker, date, file_type)`` holding the mean of the
        FinBERT, readability, structure, novelty and timing features of the
        filings in that group, followed by the ``__was_nan`` indicator
        columns. ``novelty_rf`` compares each ``rf`` with the previous ``rf``
        of the same ticker and form type in date order. An empty input yields
        an empty frame with only the identifying columns.

    Fixes over the previous version: ``date`` is parsed before rows without a
    date are dropped, and missing ``content`` / ``rf`` values are normalised
    to ``""``. NaN is truthy, so ``r.rf or ""`` used to pass a float on and
    crash in the tokenizer and novelty helpers.
    """
    if df10.empty:
        return pd.DataFrame(columns=["ticker","date","file_type"])
    out = df10.assign(date=pd.to_datetime(df10["date"], errors="coerce"))
    out = out.dropna(subset=["ticker","date","file_type"]).sort_values(["ticker","date"]).reset_index(drop=True)
    out["file_type"] = out["file_type"].str.lower().str.strip().replace({"10-k":"10k","10-q":"10q","8-k":"8k"})
    if "rf" not in out.columns:
        out["rf"] = ""
    for text_col in ("content", "rf"):
        out[text_col] = out[text_col].fillna("").astype(str)
    out = add_finbert_feats(out, "content")
    prev_rf_k, rows = {}, []
    for r in pbar(out.itertuples(index=False), total=len(out), desc="10-K/Q derived"):
        form_key = (r.ticker, r.file_type)
        feat = {}
        feat.update(readability_feats(r.content))
        feat.update(structure_feats(r.content, r.rf))
        feat.update(novelty_feats_same_form(r.rf, prev_rf_k.get(form_key, "")))
        # Remember this rf for the next filing of the same ticker and form.
        prev_rf_k[form_key] = r.rf
        feat.update(timing_feats(r.date))
        rows.append(feat)
    fe = pd.DataFrame(rows, index=out.index)
    fe = pd.concat([out[["ticker","date","file_type"]], out.filter(regex=r"^finbert_|^fb_"), fe], axis=1)
    # No has_item_* column is produced above, so bool_cols is empty today; the
    # "max" rule is kept for any boolean flag added later.
    bool_cols = [c for c in fe.columns if c.startswith("has_item_")]
    agg = {c: "mean" for c in fe.columns if c not in ["ticker","date","file_type"]}
    for b in bool_cols:
        agg[b] = "max"
    fe = fe.groupby(["ticker","date","file_type"], as_index=False).agg(agg).sort_values(["ticker","date","file_type"]).reset_index(drop=True)
    return add_nan_flags(fe)


In [None]:
# Run
# Prepare output locations before any heavy work starts.
ensure_dirs(OUT_8K, OUT_10KQ, CHECKPOINT_DIR, CACHE_DIR, FINBERT_CACHE)

# Load the cleaned filings and normalise column names, form types and dates.
df = pd.read_parquet(INPUT_CLEAN)
df.columns = [name.lower() for name in df.columns]
df["file_type"] = (
    df["file_type"]
    .str.lower()
    .str.strip()
    .replace({"10-k":"10k","10-q":"10q","8-k":"8k"})
)
df["date"] = pd.to_datetime(df["date"], errors="coerce")

# Split by form type: 8-K rows on one side, 10-K and 10-Q rows on the other.
df_8k = df[df["file_type"] == "8k"]
df_10kq = df[df["file_type"].isin({"10k","10q"})]
print(f"[{ts()}] 8-K rows: {len(df_8k):,} | 10-K/Q rows: {len(df_10kq):,}")

# Build each feature table and write it out before starting the next one.
fe8 = build_features_8k(df_8k)
fe8.to_parquet(OUT_8K, index=False)
fe10 = build_features_10kq(df_10kq)
fe10.to_parquet(OUT_10KQ, index=False)
