# Notebook 1 — Prices (AAPL, XOM) + News → FinBERT → Daily sentiment index `I_t` + Features

Что делает ноутбук:
1) скачивает дневные OHLCV за последние **5 лет** (можно поменять `YEARS_BACK`);  
2) скачивает новости (по умолчанию **Alpha Vantage News & Sentiment**; нужен API key; есть fallback на GDELT, но у него ограничение по истории);  
3) прогоняет тексты новостей через **FinBERT (ProsusAI/finbert)**, считает `s(x)` и дневной индекс `I_t`;  
4) считает `returns`, `RSI`, `MACD`;  
5) собирает финальную таблицу `date, returns, RSI, MACD, I_t` (и сохраняет в файлы).


In [1]:
# Если нужно — установите зависимости
import sys, subprocess, importlib

def pip_install(pkgs):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)

required = [
    "pandas>=2.0", "numpy>=1.24", "requests>=2.31", "tqdm>=4.66",
    "yfinance>=0.2.30",
    "transformers>=4.40", "torch",  # torch может быть уже установлен
    "pyarrow>=14.0",  # для parquet
    "feedparser>=6.0",  # RSS (опционально)
]

missing = []
for pkg in ["pandas","numpy","requests","tqdm","yfinance","transformers","torch","pyarrow"]:
    try:
        importlib.import_module(pkg)
    except Exception:
        missing.append(pkg)

if missing:
    print("Installing missing:", missing)
    pip_install(required)
else:
    print("All dependencies look installed.")

  from .autonotebook import tqdm as notebook_tqdm


All dependencies look installed.


In [None]:
import os, math, time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd
import requests
from tqdm.auto import tqdm

DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

TICKERS = ["AAPL", "XOM"]
YEARS_BACK = 5  # поменяйте на 3..5 по задаче

END_DATE = pd.Timestamp.utcnow().normalize()
START_DATE = END_DATE - pd.DateOffset(years=YEARS_BACK)

print("Date range:", START_DATE.date(), "→", END_DATE.date())

Date range: 2020-12-29 → 2025-12-29


In [3]:
import pandas as pd

# 1) выберем движок
try:
    import fastparquet  # noqa: F401
    _PARQUET_ENGINE = "fastparquet"
except Exception:
    _PARQUET_ENGINE = "pyarrow"

def safe_to_parquet_auto(df: pd.DataFrame, path):
    """
    Универсальная запись parquet:
    - работает независимо от того, какая версия safe_to_parquet у вас уже определена
    - использует fastparquet если есть, иначе pyarrow
    """
    # если у вас уже есть safe_to_parquet (из предыдущих ячеек) — попробуем его
    if "safe_to_parquet" in globals() and callable(globals()["safe_to_parquet"]):
        fn = globals()["safe_to_parquet"]
        try:
            # версия без engine
            return fn(df, path)
        except TypeError:
            # версия с engine
            return fn(df, path, engine=_PARQUET_ENGINE)

    # fallback: прямой to_parquet
    df.to_parquet(path, index=False, engine=_PARQUET_ENGINE)

print("Parquet engine (auto):", _PARQUET_ENGINE)

Parquet engine (auto): pyarrow


## 1) Данные по ценам (daily OHLCV)

In [None]:
import pandas as pd
import yfinance as yf

# --- 0) DATA_DIR (на всякий случай) ---
DATA_DIR.mkdir(parents=True, exist_ok=True)

# --- 1) Выбор engine: fastparquet -> предпочтительно ---
_PARQUET_ENGINE = None
try:
    import fastparquet  # noqa: F401
    _PARQUET_ENGINE = "fastparquet"
    print("Parquet engine:", _PARQUET_ENGINE)
except Exception:
    _PARQUET_ENGINE = "pyarrow"
    print("Parquet engine:", _PARQUET_ENGINE, "(fastparquet not found)")

# --- 2) Safe Parquet writer ---
def _reset_pyarrow_pandas_ext_types():
    # безопасно: если pyarrow нет/другая версия — просто пропустим
    try:
        import pyarrow as pa
    except Exception:
        return

    # точечные самые частые
    for name in ("pandas.period", "pandas.interval"):
        try:
            pa.unregister_extension_type(name)
        except Exception:
            pass

    # попытка подчистить всё pandas.* если API доступен
    for attr in ("registered_extension_types", "get_registered_extension_types", "list_registered_extension_types"):
        try:
            fn = getattr(pa, attr)
        except Exception:
            continue
        try:
            reg = fn()
            # pyarrow может вернуть dict или list
            if isinstance(reg, dict):
                names = list(reg.keys())
            else:
                names = list(reg)
            for n in names:
                n = str(n)
                if n.startswith("pandas."):
                    try:
                        pa.unregister_extension_type(n)
                    except Exception:
                        pass
            break
        except Exception:
            pass


def safe_to_parquet(df: pd.DataFrame, path, engine: str):
    """
    Пишем Parquet устойчиво.
    - Если ловим ArrowKeyError про pandas.* already defined — чистим registry и повторяем 1 раз.
    """
    try:
        df.to_parquet(path, index=False, engine=engine)
    except Exception as e:
        msg = str(e)
        if ("type extension with name pandas." in msg and "already defined" in msg) or "ArrowKeyError" in msg:
            _reset_pyarrow_pandas_ext_types()
            df.to_parquet(path, index=False, engine=engine)  # retry once
        else:
            raise


def download_prices(ticker: str, start: pd.Timestamp, end: pd.Timestamp) -> pd.DataFrame:
    df = yf.download(
        ticker, start=start.date(), end=(end + pd.Timedelta(days=1)).date(),
        interval="1d", auto_adjust=False, progress=False
    )
    if df.empty:
        raise RuntimeError(f"No price data for {ticker}")
    df = df.rename(columns=str.lower)
    df.index = pd.to_datetime(df.index).tz_localize(None)
    df = df.reset_index().rename(columns={"Date":"date", "index":"date"})
    df["ticker"] = ticker
    return df[["date","open","high","low","close","adj close","volume","ticker"]]

# --- 3) Скачивание и сохранение ---
prices_list = []

for t in TICKERS:
    dft = download_prices(t, START_DATE, END_DATE)  # ваша функция из предыдущей ячейки
    prices_list.append(dft)

    out_path = DATA_DIR / f"prices_{t}.parquet"
    safe_to_parquet(dft, out_path, engine=_PARQUET_ENGINE)

    print(
        t,
        dft.shape,
        dft["date"].min().date(),
        dft["date"].max().date(),
        "->",
        out_path.name,
    )

prices = pd.concat(prices_list, ignore_index=True)

all_path = DATA_DIR / "prices_all.parquet"
safe_to_parquet(prices, all_path, engine=_PARQUET_ENGINE)
print("Saved:", all_path)

prices.head()


Parquet engine: pyarrow (fastparquet not found)
AAPL (1256, 8) 2020-12-29 2025-12-29 -> prices_AAPL.parquet
XOM (1256, 8) 2020-12-29 2025-12-29 -> prices_XOM.parquet
Saved: data/prices_all.parquet


Price,date,open,high,low,close,adj close,volume,ticker,open,high,low,close,adj close,volume
Ticker,Unnamed: 1_level_1,aapl,aapl,aapl,aapl,aapl,aapl,Unnamed: 8_level_1,xom,xom,xom,xom,xom,xom
0,2020-12-29,138.050003,138.789993,134.339996,134.869995,131.28949,121047300.0,AAPL,,,,,,
1,2020-12-30,135.580002,135.990005,133.399994,133.720001,130.170029,96452100.0,AAPL,,,,,,
2,2020-12-31,134.080002,134.740005,131.720001,132.690002,129.167389,99116600.0,AAPL,,,,,,
3,2021-01-04,133.520004,133.610001,126.760002,129.410004,125.974472,143301900.0,AAPL,,,,,,
4,2021-01-05,128.889999,131.740005,128.429993,131.009995,127.532013,97664900.0,AAPL,,,,,,


## 1b) Intraday цены (1h) для event-study (Yahoo / yfinance)

In [5]:
import pandas as pd
import numpy as np
import yfinance as yf

def _to_utc_naive(ts) -> pd.Timestamp:
    """Convert anything timestamp-like to UTC-naive pandas Timestamp."""
    ts = pd.Timestamp(ts)
    if ts.tzinfo is not None:
        ts = ts.tz_convert("UTC").tz_localize(None)
    return ts

def download_prices_intraday_1h_yahoo(ticker: str, start, end, period_days: int = 729) -> pd.DataFrame:
    start = _to_utc_naive(start)
    end = _to_utc_naive(end)

    df = yf.download(
        ticker,
        interval="1h",
        period=f"{period_days}d",
        auto_adjust=False,
        progress=False,
        prepost=True,
    )
    if df is None or df.empty:
        raise RuntimeError(f"No intraday price data for {ticker} via Yahoo (period={period_days}d)")

    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df = df.rename(columns=lambda c: str(c).strip().lower().replace(" ", "_"))

    idx = pd.to_datetime(df.index)
    # make UTC-aware then UTC-naive
    if getattr(idx, "tz", None) is None:
        idx = idx.tz_localize("UTC")
    else:
        idx = idx.tz_convert("UTC")
    idx = idx.tz_localize(None)

    df = df.reset_index().rename(columns={"Datetime": "datetime", "Date": "datetime", "index": "datetime"})
    df["datetime"] = idx
    df["ticker"] = ticker

    if "adj_close" not in df.columns:
        df["adj_close"] = df.get("close", np.nan)

    out = df[["datetime", "open", "high", "low", "close", "adj_close", "volume", "ticker"]].copy()
    out["volume"] = pd.to_numeric(out["volume"], errors="coerce")

    # ✅ clip safely (all UTC-naive)
    end_inclusive = end + pd.Timedelta(days=1) - pd.Timedelta(nanoseconds=1)
    out = out[(out["datetime"] >= start) & (out["datetime"] <= end_inclusive)].copy()
    out = out.sort_values("datetime")

    return out


# ----- use it -----
INTRADAY_PERIOD_DAYS = 729

intr_end = _to_utc_naive(END_DATE)
intr_start = _to_utc_naive(max(pd.Timestamp(START_DATE), pd.Timestamp(END_DATE) - pd.Timedelta(days=INTRADAY_PERIOD_DAYS)))

intraday_prices = []
for t in TICKERS:
    out_path = DATA_DIR / f"prices_intraday_1h_{t}.parquet"
    if out_path.exists():
        dfi = safe_read_parquet(out_path)
        # на всякий: нормализуем если в кеше вдруг tz-aware
        dfi["datetime"] = pd.to_datetime(dfi["datetime"]).dt.tz_localize(None)
        print("Loaded cached intraday:", out_path.name, dfi.shape, dfi["datetime"].min(), dfi["datetime"].max())
    else:
        dfi = download_prices_intraday_1h_yahoo(t, intr_start, intr_end, period_days=INTRADAY_PERIOD_DAYS)
        if dfi.empty:
            print(f"WARNING: {t} intraday empty after clipping to [{intr_start}..{intr_end}]")
        else:
            safe_to_parquet_auto(dfi, out_path)
            print("Saved intraday:", out_path.name, dfi.shape, dfi["datetime"].min(), dfi["datetime"].max())

    intraday_prices.append(dfi)

intraday_prices = pd.concat(intraday_prices, ignore_index=True)
safe_to_parquet_auto(intraday_prices, DATA_DIR / "prices_intraday_1h_all.parquet")
intraday_prices.head()


Saved intraday: prices_intraday_1h_AAPL.parquet (8274, 8) 2024-01-02 09:00:00 2025-12-29 14:30:00
Saved intraday: prices_intraday_1h_XOM.parquet (8298, 8) 2024-01-02 09:00:00 2025-12-29 14:30:00


Price,datetime,open,high,low,close,adj_close,volume,ticker
0,2024-01-02 09:00:00,191.68,191.75,189.75,190.07,190.07,0,AAPL
1,2024-01-02 10:00:00,190.07,190.25,189.8,190.06,190.06,0,AAPL
2,2024-01-02 11:00:00,190.08,190.2,188.87,188.87,188.87,0,AAPL
3,2024-01-02 12:00:00,188.92,189.65,188.79,189.19,189.19,0,AAPL
4,2024-01-02 13:00:00,189.18,192.2,187.78,187.99,187.99,0,AAPL


## 2) Новости + маппинг по датам

### Источник новостей
- **Основной (рекомендуется): Alpha Vantage `NEWS_SENTIMENT`** — поддерживает `time_from/time_to` (можно брать 3–5 лет), но нужен API key. citeturn3view0  
- **Fallback: GDELT DOC API** — без ключа, но по сути ограничен коротким окном истории (не подойдет для 3–5 лет). citeturn1view0

В коде ниже:
- если `ALPHAVANTAGE_API_KEY` задан, используем Alpha Vantage;
- иначе используем GDELT на коротком промежутке (чтобы ноутбук всё равно работал).

In [6]:
# TB228JYIOYDWU5Q9

ALPHAVANTAGE_API_KEY = os.getenv("ALPHAVANTAGE_API_KEY", "").strip()
ALPHAVANTAGE_API_KEY = "TB228JYIOYDWU5Q9"
USE_ALPHA_VANTAGE = bool(ALPHAVANTAGE_API_KEY)


print("USE_ALPHA_VANTAGE =", USE_ALPHA_VANTAGE)
if not USE_ALPHA_VANTAGE:
    print("⚠️  Нет ALPHAVANTAGE_API_KEY. Будет fallback на GDELT/RSS (история GDELT обычно ограничена последними ~3 месяцами).")


USE_ALPHA_VANTAGE = True


In [7]:
def yyyymmddThhmm(ts: pd.Timestamp) -> str:
    # Alpha Vantage: YYYYMMDDTHHMM (UTC)
    ts = ts.tz_localize(timezone.utc) if ts.tzinfo is None else ts.tz_convert(timezone.utc)
    return ts.strftime("%Y%m%dT%H%M")

def alpha_vantage_news_window(ticker: str, time_from: pd.Timestamp, time_to: pd.Timestamp, limit: int = 1000, sort: str="EARLIEST") -> pd.DataFrame:
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "NEWS_SENTIMENT",
        "tickers": ticker,
        "time_from": yyyymmddThhmm(time_from),
        "time_to": yyyymmddThhmm(time_to),
        "limit": limit,
        "sort": sort,
        "apikey": ALPHAVANTAGE_API_KEY,
    }
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    data = r.json()

    # возможные ошибки: {'Information': '...'} / {'Note': '...'}
    if "feed" not in data:
        raise RuntimeError(f"Alpha Vantage response without 'feed': {list(data.keys())}")

    rows = []
    for it in data["feed"]:
        rows.append({
            "ticker": ticker,
            "time_published": it.get("time_published"),
            "title": it.get("title"),
            "summary": it.get("summary"),
            "url": it.get("url"),
            "source": it.get("source"),
        })
    df = pd.DataFrame(rows)
    return df

def fetch_alpha_vantage_news(ticker: str, start: pd.Timestamp, end: pd.Timestamp, window_days: int = 30) -> pd.DataFrame:
    # chunk по окнам, чтобы не упираться в limit и проще переживать rate-limit
    all_parts = []
    cur = start
    pbar = tqdm(total=int((end-start).days/window_days)+1, desc=f"AV news {ticker}")
    while cur < end:
        nxt = min(cur + pd.Timedelta(days=window_days), end)
        try:
            part = alpha_vantage_news_window(ticker, cur, nxt, limit=1000, sort="EARLIEST")
            all_parts.append(part)
        except Exception as e:
            print("Window failed:", cur.date(), "→", nxt.date(), ":", repr(e))
        # free-tier rate limit обычно 5 запросов/мин → пауза; при необходимости уменьшите
        time.sleep(12)
        cur = nxt
        pbar.update(1)
    pbar.close()

    if not all_parts:
        return pd.DataFrame(columns=["ticker","time_published","title","summary","url","source"])

    df = pd.concat(all_parts, ignore_index=True).drop_duplicates(subset=["url"])
    return df

# GDELT fallback (короткое окно истории)
def fetch_gdelt_artlist(query: str, timespan: str = "3m", maxrecords: int = 250) -> pd.DataFrame:
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    params = {
        "query": query,
        "mode": "artlist",
        "format": "json",
        "timespan": timespan,
        "maxrecords": maxrecords,
        "sort": "datedesc",
    }
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    data = r.json()
    arts = data.get("articles", [])
    rows = []
    for a in arts:
        rows.append({
            "time_published": a.get("seendate"),
            "title": a.get("title"),
            "summary": a.get("snippet"),
            "url": a.get("url"),
            "source": a.get("sourceCountry"),
        })
    return pd.DataFrame(rows)

In [10]:
# --- Новости: Alpha Vantage (если есть ключ) + GDELT (последние ~3 месяца) + RSS (последние недели) ---
# Результат: единый news DataFrame с полями:
# ticker, source, title, url, published_at (UTC-naive), date (UTC day, UTC-naive), text

import pandas as pd
import numpy as np
import requests
from datetime import datetime
import feedparser

DATA_DIR.mkdir(parents=True, exist_ok=True)

# ---- helpers: unify everything to UTC-naive ----
def to_datetime_utc_naive(x) -> pd.Timestamp:
    """Parse datetime-like and return UTC-naive pandas Timestamp (or NaT)."""
    if x is None or (isinstance(x, float) and pd.isna(x)) or (isinstance(x, str) and not x.strip()):
        return pd.NaT

    # already a datetime/timestamp
    if isinstance(x, (pd.Timestamp, datetime)):
        ts = pd.Timestamp(x)
        if ts.tzinfo is not None:
            ts = ts.tz_convert("UTC").tz_localize(None)
        return ts

    s = str(x).strip()

    # Alpha Vantage style: YYYYMMDDTHHMMSS / YYYYMMDDTHHMM
    try:
        if "T" in s and len(s) >= 13 and s[:8].isdigit():
            if len(s) >= 15:
                ts = pd.to_datetime(s[:15], format="%Y%m%dT%H%M%S", utc=True, errors="coerce")
            else:
                ts = pd.to_datetime(s[:13], format="%Y%m%dT%H%M", utc=True, errors="coerce")
        else:
            ts = pd.to_datetime(s, utc=True, errors="coerce")
    except Exception:
        ts = pd.to_datetime(s, utc=True, errors="coerce")

    if ts is pd.NaT or pd.isna(ts):
        return pd.NaT
    return ts.tz_convert("UTC").tz_localize(None)

def to_utc_naive_day(x) -> pd.Timestamp:
    """Convert anything timestamp-like to UTC-naive day (00:00)."""
    ts = pd.Timestamp(x)
    if ts.tzinfo is not None:
        ts = ts.tz_convert("UTC").tz_localize(None)
    return ts.normalize()

# normalize START/END once here (fixes tz-aware vs tz-naive comparisons everywhere below)
START_DAY = to_utc_naive_day(START_DATE)
END_DAY   = to_utc_naive_day(END_DATE)

def add_date_cols(df: pd.DataFrame) -> pd.DataFrame:
    df["published_at"] = df["published_at"].apply(to_datetime_utc_naive)
    # ensure dtype and UTC-naive
    df["published_at"] = pd.to_datetime(df["published_at"]).dt.tz_localize(None)
    df["date"] = df["published_at"].dt.floor("D").dt.tz_localize(None)
    return df

# ---- sources ----
def fetch_google_news_rss(query: str, max_items: int = 200) -> pd.DataFrame:
    """
    RSS history is short (weeks/months). Helps to increase news density recently.
    """
    url = "https://news.google.com/rss/search"
    params = {"q": query, "hl": "en-US", "gl": "US", "ceid": "US:en"}
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    feed = feedparser.parse(r.text)

    rows = []
    for e in feed.entries[:max_items]:
        rows.append({
            "title": getattr(e, "title", None),
            "url": getattr(e, "link", None),
            "published_at": getattr(e, "published", None) or getattr(e, "updated", None),
            "source": "google_news_rss",
        })
    return pd.DataFrame(rows)

def fetch_gdelt_artlist(query: str, timespan: str = "3m", maxrecords: int = 250, sort: str = "datedesc") -> pd.DataFrame:
    """
    GDELT DOC 2.0 API. timespan is relative (e.g., 3m).
    """
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    params = {
        "query": query,
        "mode": "artlist",
        "format": "json",
        "maxrecords": maxrecords,
        "sort": sort,
        "timespan": timespan,
    }
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    js = r.json()
    arts = js.get("articles", []) or []

    rows = []
    for a in arts:
        rows.append({
            "title": a.get("title"),
            "url": a.get("url"),
            "published_at": a.get("seendate") or a.get("date") or a.get("datetime"),
            "source": "gdelt",
        })
    return pd.DataFrame(rows)

# ---- normalizers ----
def normalize_alpha_vantage(df: pd.DataFrame, ticker: str) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=["ticker","source","title","url","published_at","text","date"])
    out = df.copy()
    out["ticker"] = ticker
    out["source"] = "alpha_vantage"
    out["published_at"] = out.get("time_published")
    summary = out.get("summary", out.get("description", ""))
    out["text"] = (out.get("title", "").astype(str) + ". " + summary.astype(str)).str.strip()
    out = out[["ticker","source","title","url","published_at","text"]]
    out = add_date_cols(out)
    return out

def normalize_generic(df: pd.DataFrame, ticker: str, source_default: str) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=["ticker","source","title","url","published_at","text","date"])
    out = df.copy()
    out["ticker"] = ticker
    out["source"] = out.get("source", source_default)
    out["published_at"] = out.get("published_at")
    out["title"] = out.get("title")
    out["url"] = out.get("url")
    out["text"] = out.get("text", out.get("title", "")).astype(str)
    out = out[["ticker","source","title","url","published_at","text"]]
    out = add_date_cols(out)
    return out

# ---- build dataset ----
news_all = []

for t in TICKERS:
    out_path = DATA_DIR / f"news_raw_{t}.parquet"

    if out_path.exists():
        df = safe_read_parquet(out_path)
        # ensure date dtype is UTC-naive for safe comparisons later
        df["published_at"] = pd.to_datetime(df["published_at"]).dt.tz_localize(None)
        df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)
        print("Loaded cached:", out_path.name, df.shape)
    else:
        parts = []

        if USE_ALPHA_VANTAGE:
            df_av = fetch_alpha_vantage_news(t, START_DATE, END_DATE, window_days=30)
            parts.append(normalize_alpha_vantage(df_av, t))
        else:
            df_g = fetch_gdelt_artlist(query=f"{t} stock", timespan="3m", maxrecords=250)
            parts.append(normalize_generic(df_g, t, "gdelt"))

        # RSS (short history) — best effort
        try:
            df_rss = fetch_google_news_rss(f"{t} stock", max_items=200)
            parts.append(normalize_generic(df_rss, t, "google_news_rss"))
        except Exception as e:
            print(f"RSS skipped for {t}: {e}")

        df = pd.concat([p for p in parts if p is not None and not p.empty], ignore_index=True)

        # basic cleanup + safe types
        df["published_at"] = pd.to_datetime(df["published_at"]).dt.tz_localize(None)
        df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)

        df = df.dropna(subset=["published_at", "title"]).copy()
        df = df.drop_duplicates(subset=["url", "title", "published_at"]).copy()

        # ✅ tz-safe filter by day range
        df = df[(df["date"] >= START_DAY) & (df["date"] <= END_DAY)].copy()

        safe_to_parquet_auto(df, out_path)
        print("Saved:", out_path.name, df.shape)

    news_all.append(df)

news_all = (
    pd.concat(news_all, ignore_index=True)
      .sort_values(["ticker", "published_at"])
      .reset_index(drop=True)
)

safe_to_parquet_auto(news_all, DATA_DIR / "news_raw_all.parquet")
news_all.head()


AV news AAPL: 100%|██████████| 61/61 [13:12<00:00, 12.99s/it]


Saved: news_raw_AAPL.parquet (3602, 7)


AV news XOM: 100%|██████████| 61/61 [13:08<00:00, 12.92s/it]


Saved: news_raw_XOM.parquet (2754, 7)


Unnamed: 0,ticker,source,title,url,published_at,text,date
0,AAPL,alpha_vantage,Intel shares rise after Third Point urges chip...,https://www.cnbc.com/2020/12/29/third-point-ur...,2020-12-29 12:56:00,Intel shares rise after Third Point urges chip...,2020-12-29
1,AAPL,alpha_vantage,Exclusive: Hedge fund Third Point urges Intel ...,https://www.reuters.com/business/retail-consum...,2020-12-30 02:24:00,Exclusive: Hedge fund Third Point urges Intel ...,2020-12-30
2,AAPL,alpha_vantage,Apple loses copyright claims in lawsuit agains...,https://www.reuters.com/business/apple-loses-c...,2020-12-30 05:28:00,Apple loses copyright claims in lawsuit agains...,2020-12-30
3,AAPL,alpha_vantage,Apple Veterans’ Lidar Startup Adds $200 Millio...,https://www.bloomberg.com/news/articles/2021-0...,2021-01-04 12:00:00,Apple Veterans’ Lidar Startup Adds $200 Millio...,2021-01-04
4,AAPL,alpha_vantage,The TDVG ETF Is a Stellar Choice for Dividend ...,https://etfdb.com/active-etf-channel/tdvg-etf-...,2021-01-04 21:22:02,The TDVG ETF Is a Stellar Choice for Dividend ...,2021-01-04


## 3) FinBERT → `s(x)` и дневной индекс `I_t`

Модель: **ProsusAI/finbert** (3 класса: positive/negative/neutral). citeturn0search1  

Считаем для каждой новости:

- `p_pos, p_neg, p_neu` — softmax вероятности
- `s(x) = p_pos - p_neg` (в диапазоне [-1, 1]) — удобная непрерывная метрика citeturn0search13

Далее агрегируем по дню и тикеру:

- `I_t = mean(s(x))` по всем новостям в этот день.

In [11]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

MODEL_NAME = "ProsusAI/finbert"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model.eval()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print("device:", device)
print("labels:", model.config.id2label)

device: cpu
labels: {0: 'positive', 1: 'negative', 2: 'neutral'}


In [12]:
def finbert_scores(texts, batch_size: int = 16, max_length: int = 256):
    # returns np.array shape [n,3] aligned with model.config.id2label
    probs_all = []
    for i in tqdm(range(0, len(texts), batch_size), desc="FinBERT", leave=False):
        batch = texts[i:i+batch_size]
        enc = tokenizer(
            batch, truncation=True, padding=True, max_length=max_length,
            return_tensors="pt"
        )
        enc = {k: v.to(device) for k, v in enc.items()}
        with torch.no_grad():
            logits = model(**enc).logits
            probs = torch.softmax(logits, dim=-1).detach().cpu().numpy()
        probs_all.append(probs)
    return np.vstack(probs_all) if probs_all else np.zeros((0,3), dtype=float)

def build_text(row) -> str:
    title = row.get("title") or ""
    summary = row.get("summary") or ""
    txt = (title + ". " + summary).strip()
    return txt[:5000]  # safety cap

news = news_all.copy()
news["text"] = news.apply(build_text, axis=1)

# ограничим пустые/короткие тексты
news = news[news["text"].str.len() >= 5].copy()
news = news.sort_values(["ticker","date","url"]).reset_index(drop=True)

print("News rows:", len(news))
news.head()

News rows: 6355


Unnamed: 0,ticker,source,title,url,published_at,text,date
0,AAPL,alpha_vantage,Intel shares rise after Third Point urges chip...,https://www.cnbc.com/2020/12/29/third-point-ur...,2020-12-29 12:56:00,Intel shares rise after Third Point urges chip...,2020-12-29
1,AAPL,alpha_vantage,Apple loses copyright claims in lawsuit agains...,https://www.reuters.com/business/apple-loses-c...,2020-12-30 05:28:00,Apple loses copyright claims in lawsuit agains...,2020-12-30
2,AAPL,alpha_vantage,Exclusive: Hedge fund Third Point urges Intel ...,https://www.reuters.com/business/retail-consum...,2020-12-30 02:24:00,Exclusive: Hedge fund Third Point urges Intel ...,2020-12-30
3,AAPL,alpha_vantage,The TDVG ETF Is a Stellar Choice for Dividend ...,https://etfdb.com/active-etf-channel/tdvg-etf-...,2021-01-04 21:22:02,The TDVG ETF Is a Stellar Choice for Dividend ...,2021-01-04
4,AAPL,alpha_vantage,Apple Veterans’ Lidar Startup Adds $200 Millio...,https://www.bloomberg.com/news/articles/2021-0...,2021-01-04 12:00:00,Apple Veterans’ Lidar Startup Adds $200 Millio...,2021-01-04


In [None]:
import numpy as np
import pandas as pd

DATA_DIR.mkdir(parents=True, exist_ok=True)

# --- 1) Parquet safe writer (самодостаточно) ---
_PARQUET_ENGINE = None
try:
    import fastparquet  # noqa: F401
    _PARQUET_ENGINE = "fastparquet"
except Exception:
    _PARQUET_ENGINE = "pyarrow"

def _reset_pyarrow_pandas_ext_types():
    try:
        import pyarrow as pa
    except Exception:
        return
    for name in ("pandas.period", "pandas.interval"):
        try:
            pa.unregister_extension_type(name)
        except Exception:
            pass
    for attr in ("registered_extension_types", "get_registered_extension_types", "list_registered_extension_types"):
        try:
            fn = getattr(pa, attr)
        except Exception:
            continue
        try:
            reg = fn()
            names = list(reg.keys()) if isinstance(reg, dict) else list(reg)
            for n in names:
                n = str(n)
                if n.startswith("pandas."):
                    try:
                        pa.unregister_extension_type(n)
                    except Exception:
                        pass
            break
        except Exception:
            pass

def safe_to_parquet(df: pd.DataFrame, path):
    try:
        df.to_parquet(path, index=False, engine=_PARQUET_ENGINE)
    except Exception as e:
        msg = str(e)
        if ("type extension with name pandas." in msg and "already defined" in msg) or "ArrowKeyError" in msg:
            _reset_pyarrow_pandas_ext_types()
            df.to_parquet(path, index=False, engine=_PARQUET_ENGINE)  # retry once
        else:
            raise

print("Parquet engine:", _PARQUET_ENGINE)

# --- 2) FinBERT label mapping (robust к разным вариантам id2label) ---
id2label = {int(k): str(v).lower() for k, v in model.config.id2label.items()}

pos_id = next((i for i, lab in id2label.items() if "pos" in lab), None)
neg_id = next((i for i, lab in id2label.items() if "neg" in lab), None)
neu_id = next((i for i, lab in id2label.items() if "neu" in lab), None)

if pos_id is None or neg_id is None:
    raise RuntimeError(f"Unexpected FinBERT labels: {id2label}")

# --- 3) Готовим тексты: гарантируем str и без NaN ---
if "text" not in news.columns:
    raise RuntimeError("news must have a 'text' column")

texts = news["text"].fillna("").astype(str).tolist()

# --- 4) Скоринг ---
probs = finbert_scores(texts, batch_size=16, max_length=256)

# probs может быть list -> в ndarray
probs = np.asarray(probs, dtype="float32")

news["p_pos"] = probs[:, pos_id]
news["p_neg"] = probs[:, neg_id]
news["p_neu"] = probs[:, neu_id] if neu_id is not None else np.nan
news["s_x"] = news["p_pos"] - news["p_neg"]

# --- 5) Сохранение ---
out_path = DATA_DIR / "news_scored_all.parquet"
safe_to_parquet(news, out_path)
print("Saved:", out_path)

news.head()


Parquet engine: pyarrow


                                                          

Saved: data/news_scored_all.parquet




Unnamed: 0,ticker,source,title,url,published_at,text,date,p_pos,p_neg,p_neu,s_x
0,AAPL,alpha_vantage,Intel shares rise after Third Point urges chip...,https://www.cnbc.com/2020/12/29/third-point-ur...,2020-12-29 12:56:00,Intel shares rise after Third Point urges chip...,2020-12-29,0.214336,0.74141,0.044254,-0.527073
1,AAPL,alpha_vantage,Apple loses copyright claims in lawsuit agains...,https://www.reuters.com/business/apple-loses-c...,2020-12-30 05:28:00,Apple loses copyright claims in lawsuit agains...,2020-12-30,0.027705,0.906648,0.065647,-0.878942
2,AAPL,alpha_vantage,Exclusive: Hedge fund Third Point urges Intel ...,https://www.reuters.com/business/retail-consum...,2020-12-30 02:24:00,Exclusive: Hedge fund Third Point urges Intel ...,2020-12-30,0.64967,0.015036,0.335294,0.634633
3,AAPL,alpha_vantage,The TDVG ETF Is a Stellar Choice for Dividend ...,https://etfdb.com/active-etf-channel/tdvg-etf-...,2021-01-04 21:22:02,The TDVG ETF Is a Stellar Choice for Dividend ...,2021-01-04,0.902477,0.009486,0.088037,0.892991
4,AAPL,alpha_vantage,Apple Veterans’ Lidar Startup Adds $200 Millio...,https://www.bloomberg.com/news/articles/2021-0...,2021-01-04 12:00:00,Apple Veterans’ Lidar Startup Adds $200 Millio...,2021-01-04,0.438414,0.012534,0.549052,0.425879


In [15]:

# --- Дневные и часовые индексы новостного фона ---
# Требуется: news (после скоринга, с колонками: ticker, published_at, date, s_x)

import pandas as pd
import numpy as np

TAU_STRONG = 0.5  # порог "сильной" новости по |s(x)| (можете менять)

def _mean_pos(x):
    x = x[x > 0]
    return x.mean() if len(x) else np.nan

def _mean_neg(x):
    x = x[x < 0]
    return x.mean() if len(x) else np.nan

# 1) Daily
daily_idx = (
    news.groupby(["ticker","date"], as_index=False)
        .agg(
            I_t=("s_x","mean"),
            I_t_max=("s_x", lambda v: np.nanmax(np.abs(v.values)) if len(v) else np.nan),
            N_t_strong=("s_x", lambda v: float(np.sum(np.abs(v.values) >= TAU_STRONG))),
            n_news=("s_x","size"),
            I_t_pos=("s_x", _mean_pos),
            I_t_neg=("s_x", _mean_neg),
        )
)

safe_to_parquet(daily_idx, DATA_DIR / "daily_sentiment_indices.parquet")

# 2) Hourly (1h bins, UTC)
news["hour"] = news["published_at"].dt.floor("H")
hourly_idx = (
    news.dropna(subset=["hour"])
        .groupby(["ticker","hour"], as_index=False)
        .agg(
            I_h=("s_x","mean"),
            I_h_max=("s_x", lambda v: np.nanmax(np.abs(v.values)) if len(v) else np.nan),
            N_h_strong=("s_x", lambda v: float(np.sum(np.abs(v.values) >= TAU_STRONG))),
            n_news=("s_x","size"),
        )
        .rename(columns={"hour":"datetime"})
)

safe_to_parquet(hourly_idx, DATA_DIR / "hourly_sentiment_indices_1h.parquet")

daily_idx.head(), hourly_idx.head()


  news["hour"] = news["published_at"].dt.floor("H")


(  ticker       date       I_t   I_t_max  N_t_strong  n_news   I_t_pos  \
 0   AAPL 2020-12-29 -0.527073  0.527073         1.0       1       NaN   
 1   AAPL 2020-12-30 -0.122154  0.878942         2.0       2  0.634633   
 2   AAPL 2021-01-04  0.659435  0.892991         1.0       2  0.659435   
 3   AAPL 2021-01-05 -0.162034  0.510277         1.0       3  0.060650   
 4   AAPL 2021-01-06 -0.073685  0.262288         0.0       3  0.075242   
 
     I_t_neg  
 0 -0.527073  
 1 -0.878942  
 2       NaN  
 3 -0.273376  
 4 -0.148149  ,
   ticker            datetime       I_h   I_h_max  N_h_strong  n_news
 0   AAPL 2020-12-29 12:00:00 -0.527073  0.527073         1.0       1
 1   AAPL 2020-12-30 02:00:00  0.634633  0.634633         1.0       1
 2   AAPL 2020-12-30 05:00:00 -0.878942  0.878942         1.0       1
 3   AAPL 2021-01-04 12:00:00  0.425879  0.425879         0.0       1
 4   AAPL 2021-01-04 21:00:00  0.892991  0.892991         1.0       1)

## 4) Returns + RSI + MACD

In [16]:
import pandas as pd
import numpy as np

def yf_multiindex_to_long(prices: pd.DataFrame) -> pd.DataFrame:
    if not isinstance(prices.columns, pd.MultiIndex):
        raise TypeError(f"Expected MultiIndex columns, got {type(prices.columns)}")

    p = prices.copy()

    def norm(x) -> str:
        return str(x).strip().lower().replace(" ", "_")

    # 1) нормализуем имена полей (level 0) и тикеры (level 1)
    p.columns = pd.MultiIndex.from_tuples([
        (norm(a), str(b).strip().upper() if b is not None else "")
        for a, b in p.columns
    ])

    # 2) дата: у тебя она колонкой ('date','') -> делаем индексом
    if ("date", "") in p.columns:
        p = p.set_index(("date", ""))
        p.index.name = "date"
    else:
        # если даты нет колонкой, считаем что индекс и есть дата
        p.index.name = p.index.name or "date"

    # 3) ВАЖНО: выкидываем служебный ('ticker',''), чтобы не было конфликта при reset_index()
    if ("ticker", "") in p.columns:
        p = p.drop(columns=[("ticker", "")])

    # 4) stack по тикеру (level 1)
    out = (
        p.stack(level=1)
         .rename_axis(index=["date", "ticker"])
         .reset_index()
    )

    # 5) финальная чистка
    out.columns = [norm(c) for c in out.columns]
    out["date"] = pd.to_datetime(out["date"], errors="coerce")
    out["ticker"] = out["ticker"].astype("string").str.strip().str.upper()

    # оставим основные поля, если есть
    keep = [c for c in ["date","ticker","open","high","low","close","adj_close","volume"] if c in out.columns]
    out = (
        out[keep]
        .dropna(subset=["date","ticker"])
        .sort_values(["ticker","date"])
        .reset_index(drop=True)
    )
    return out

# --- usage ---
prices = yf_multiindex_to_long(prices)

# тикеры в том же виде, что в prices
TICKERS = [str(t).strip().upper() for t in TICKERS]

print(prices.columns)
print(prices.head())
print(prices["ticker"].value_counts().head())


Index(['date', 'ticker', 'open', 'high', 'low', 'close', 'adj_close',
       'volume'],
      dtype='object')
        date ticker        open        high         low       close  \
0 2020-12-29   AAPL  138.050003  138.789993  134.339996  134.869995   
1 2020-12-30   AAPL  135.580002  135.990005  133.399994  133.720001   
2 2020-12-31   AAPL  134.080002  134.740005  131.720001  132.690002   
3 2021-01-04   AAPL  133.520004  133.610001  126.760002  129.410004   
4 2021-01-05   AAPL  128.889999  131.740005  128.429993  131.009995   

    adj_close       volume  
0  131.289490  121047300.0  
1  130.170029   96452100.0  
2  129.167389   99116600.0  
3  125.974472  143301900.0  
4  127.532013   97664900.0  
ticker
AAPL    1256
XOM     1256
Name: count, dtype: Int64


  p.stack(level=1)


In [17]:
import numpy as np
import pandas as pd

# --- 0) sanity: нормализуем названия колонок (если вдруг где-то иначе) ---
prices = prices.copy()
prices.columns = [str(c).strip().lower().replace(" ", "_") for c in prices.columns]

# --- 1) выбираем правильную колонку цены для расчётов ---
# приоритет: adj_close -> close
price_col = "adj_close" if "adj_close" in prices.columns else ("close" if "close" in prices.columns else None)
if price_col is None:
    raise RuntimeError(f"prices must contain adj_close or close. Got columns: {list(prices.columns)}")

# --- 2) индикаторы ---
def ema(series: pd.Series, span: int) -> pd.Series:
    return series.ewm(span=span, adjust=False).mean()

def rsi(close: pd.Series, period: int = 14) -> pd.Series:
    delta = close.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    avg_gain = gain.ewm(alpha=1/period, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/period, adjust=False).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    return 100 - (100 / (1 + rs))

def macd(close: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9):
    macd_line = ema(close, fast) - ema(close, slow)
    signal_line = ema(macd_line, signal)
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

# --- 3) Фичи по каждому тикеру ---
feat_parts = []

for t in TICKERS:
    p = prices.loc[prices["ticker"] == t].copy()

    if p.empty:
        print(f"WARNING: no rows for {t}")
        continue

    p = p.sort_values("date")

    px = pd.to_numeric(p[price_col], errors="coerce")
    p["returns"] = px.pct_change()  # close-to-close
    p["RSI"] = rsi(px, period=14)
    macd_line, signal_line, hist = macd(px)
    p["MACD"] = macd_line
    p["MACD_signal"] = signal_line
    p["MACD_hist"] = hist

    feat_parts.append(p)

feat_prices = pd.concat(feat_parts, ignore_index=True)

# (опционально) оставим только ключевые колонки + индикаторы
keep_cols = [c for c in ["date", "ticker", price_col, "returns", "RSI", "MACD", "MACD_signal", "MACD_hist"] if c in feat_prices.columns]
feat_prices_out = feat_prices[keep_cols].copy()

safe_to_parquet(feat_prices_out, DATA_DIR / "price_features.parquet")
feat_prices_out.head()


Unnamed: 0,date,ticker,adj_close,returns,RSI,MACD,MACD_signal,MACD_hist
0,2020-12-29,AAPL,131.28949,,,0.0,0.0,0.0
1,2020-12-30,AAPL,130.170029,-0.008527,0.0,-0.089302,-0.01786,-0.071441
2,2020-12-31,AAPL,129.167389,-0.007703,0.0,-0.238232,-0.061935,-0.176298
3,2021-01-04,AAPL,125.974472,-0.024719,0.0,-0.606907,-0.170929,-0.435978
4,2021-01-05,AAPL,127.532013,0.012364,8.684289,-0.76459,-0.289661,-0.474928


## 5) Финальная таблица `date, returns, RSI, MACD, I_t`

Склеиваем price-features и `I_t` по (`ticker`, `date`). Если в какой-то день новостей нет — `I_t` будет NaN.

In [18]:

# --- Финальная дневная таблица: returns + RSI + MACD + новостные индексы ---
# Требуется: feat_prices (цены+фичи) и daily_idx (из предыдущей секции)

try:
    d_idx = daily_idx.copy()
except NameError:
    d_idx = safe_read_parquet(DATA_DIR / "daily_sentiment_indices.parquet")

final = feat_prices.merge(
    d_idx,
    on=["ticker","date"],
    how="left"
)

# минимальный набор (как в ТЗ)
final_small = final[["ticker","date","returns","RSI","MACD","I_t"]].copy()
safe_to_parquet(final_small, DATA_DIR / "final_features_all.parquet")

# расширенный набор (для анализа порогов/интенсивности)
final_ext = final[["ticker","date","returns","RSI","MACD","I_t","I_t_max","N_t_strong","n_news","I_t_pos","I_t_neg"]].copy()
safe_to_parquet(final_ext, DATA_DIR / "final_features_all_extended.parquet")

# также отдельные файлы по тикерам
for t in TICKERS:
    safe_to_parquet(final_small[final_small["ticker"] == t].copy(), DATA_DIR / f"final_features_{t}.parquet")
    safe_to_parquet(final_ext[final_ext["ticker"] == t].copy(), DATA_DIR / f"final_features_extended_{t}.parquet")

final_ext.tail()


Unnamed: 0,ticker,date,returns,RSI,MACD,I_t,I_t_max,N_t_strong,n_news,I_t_pos,I_t_neg
2507,XOM,2025-12-22,0.012512,54.44325,0.438767,0.181875,0.906059,9.0,24.0,0.308234,-0.702637
2508,XOM,2025-12-23,0.010749,57.885258,0.579341,0.179612,0.901233,7.0,20.0,0.31518,-0.36266
2509,XOM,2025-12-24,-0.001675,57.152934,0.66692,0.548885,0.927096,9.0,15.0,0.588975,-0.012382
2510,XOM,2025-12-26,-0.000923,56.727837,0.719162,0.062667,0.832491,2.0,17.0,0.141384,-0.30468
2511,XOM,2025-12-29,0.009572,60.044899,0.842844,-0.134916,0.952407,2.0,4.0,0.534848,-0.35817


## 6) Intraday (1h) фичи + часовой новостной индекс

Собираем датасет на **часовой** частоте: `datetime, ticker, r_fwd1h, r_fwd3h, I_h, I_h_max, N_h_strong, n_news`  
(используется в Notebook 2 для intraday-корреляций и event-study).

In [19]:

# --- Merge intraday prices with hourly sentiment index ---
import pandas as pd
import numpy as np

# 1) Ensure we have intraday_prices (from section 1b) and hourly_idx (from indices section)
try:
    intr = intraday_prices.copy()
except NameError:
    intr = safe_read_parquet(DATA_DIR / "prices_intraday_1h_all.parquet")

try:
    hidx = hourly_idx.copy()
except NameError:
    hidx = safe_read_parquet(DATA_DIR / "hourly_sentiment_indices_1h.parquet")

intr = intr.sort_values(["ticker","datetime"]).copy()

# forward returns from hour t close -> hour t+1/t+3 close
intr["r_fwd1h"] = intr.groupby("ticker")["adj_close"].pct_change().shift(-1)
intr["r_fwd3h"] = intr.groupby("ticker")["adj_close"].pct_change(3).shift(-3)

# merge
intr_merged = intr.merge(hidx, on=["ticker","datetime"], how="left")

# keep useful columns
keep_cols = ["datetime","ticker","adj_close","volume","r_fwd1h","r_fwd3h","I_h","I_h_max","N_h_strong","n_news"]
for col in keep_cols:
    if col not in intr_merged.columns:
        intr_merged[col] = np.nan
intr_merged = intr_merged[keep_cols].sort_values(["ticker","datetime"])

safe_to_parquet(intr_merged, DATA_DIR / "final_features_intraday_1h.parquet")
intr_merged.head()


Unnamed: 0,datetime,ticker,adj_close,volume,r_fwd1h,r_fwd3h,I_h,I_h_max,N_h_strong,n_news
0,2024-01-02 09:00:00,AAPL,190.07,0,-5.3e-05,-0.00463,,,,
1,2024-01-02 10:00:00,AAPL,190.06,0,-0.006261,-0.010891,,,,
2,2024-01-02 11:00:00,AAPL,188.87,0,0.001694,-0.003653,,,,
3,2024-01-02 12:00:00,AAPL,189.19,0,-0.006343,-0.015858,,,,
4,2024-01-02 13:00:00,AAPL,187.99,0,0.001011,-0.006304,,,,
