In [4]:
# Notebook: 02_build_macro_and_parquet.ipynb

import pandas as pd, numpy as np, json
from pathlib import Path

# I/O
OUTDIR = Path("macro_retrieval")
OUTDIR.mkdir(parents=True, exist_ok=True)

# Base CSV from Step 1 (your cleaned retrieval-ready file), relative to the CWD.
# Plain assignment on purpose: chaining it with OUTDIR (`BASE_CSV = OUTDIR = "..."`)
# would rebind OUTDIR to this str and break every later `OUTDIR / ...` path.
BASE_CSV = "base_sp500_2007_2023.csv"   # <- change if your filename differs

# Switch to True if you want to fetch via FRED API now
# NOTE(review): the FRED fetch cell visible in this notebook does not check this flag and
# always calls the API; confirm the CSV-based path (see Cell 3) actually exists.
USE_FRED = False   # otherwise expects four small CSVs (see Cell 3)

def bdays(start, end):
    """Return a Monday-Friday DatetimeIndex covering ``start``..``end`` inclusive.

    Start/end are normalized to midnight first. Only weekends are skipped; exchange
    holidays are NOT removed, so this merely approximates a trading calendar.
    """
    weekday_freq = "B"  # pandas BusinessDay: Mon-Fri, no holiday calendar attached
    return pd.date_range(start=start, end=end, freq=weekday_freq, normalize=True)


In [5]:
# Load the Step-1 table (Date parsed as datetime) and order rows chronologically.
base = (
    pd.read_csv(BASE_CSV, parse_dates=["Date"])
      .sort_values(by="Date")
)

def parse_emb(s):
    """Decode one JSON-encoded embedding (a JSON list of numbers) into a float32 ndarray."""
    return np.asarray(json.loads(s), dtype=np.float32)

# Decode the JSON-encoded embeddings into float32 vectors, then drop the raw string column.
base["text_embed"] = base["prev_day_embedding_json"].map(parse_emb)
base = base.drop(columns="prev_day_embedding_json")

# The training window is whatever span the data covers (no hard-coded dates).
TRAIN_START, TRAIN_END = (ts.normalize() for ts in base["Date"].agg(["min", "max"]))
print("Train window:", TRAIN_START.date(), "→", TRAIN_END.date())

# Quick sanity checks
assert base["Date"].is_monotonic_increasing, "Dates not sorted"
assert base["Movement"].isin({0, 1}).all(), "Movement must be 0/1"
print("Base rows:", len(base), "Columns:", list(base.columns))


Train window: 2007-08-07 → 2023-07-14
Base rows: 2799 Columns: ['Date', 'Movement', 'Open', 'Close_lag1', 'High_lag1', 'Volume_lag1', 'Daily_Return_lag1', 'Volatility_lag1', 'sentiment_volatility_lag1', 'aggregate_sentiment_score_lag1', 'text_embed']


In [6]:
# Macro fetch window: CPI YoY needs 12 prior monthly observations (+1 spare month), plus a
# further 45-day cushion because macro values are published some time after their date.
# NOTE(review): FRED dates monthly/quarterly observations at the period start, so values
# become public well after that date; confirm this cushion covers the publication offsets
# used when the series are aligned to business days.
train_start = pd.to_datetime(TRAIN_START)
train_end = pd.to_datetime(TRAIN_END)

macro_fetch_start = (
    train_start
    - pd.DateOffset(months=13)   # 12 months for YoY + 1 spare
    - pd.DateOffset(days=45)     # publication-lag cushion
).normalize()
macro_fetch_end = train_end
print("Fetch macro from:", macro_fetch_start.date(), "to:", macro_fetch_end.date())


Fetch macro from: 2006-05-23 to: 2023-07-14


In [10]:
import os
from fredapi import Fred

# Credentials come from the environment, never from notebook source (source, outputs and
# history get shared). If a key was ever committed to this notebook, revoke and rotate it.
FRED_API_KEY = os.environ.get("FRED_API_KEY")
if not FRED_API_KEY:
    raise RuntimeError("FRED_API_KEY is not set; export it before running this cell.")
fred = Fred(api_key=FRED_API_KEY)


def fetch_fred(series_id: str, name: str) -> pd.DataFrame:
    """Download one FRED series over [macro_fetch_start, macro_fetch_end].

    Returns a DataFrame with a ``Date`` column (FRED observation date) and one value
    column called ``name``. ``rename_axis`` makes the ``Date`` column independent of
    whatever name (if any) the returned index carries.
    """
    series = fred.get_series(series_id,
                             observation_start=macro_fetch_start,
                             observation_end=macro_fetch_end)
    return series.rename(name).rename_axis("Date").reset_index()


# CPI (monthly index level): YoY % over 12 monthly observations.
# fill_method=None: never pad gaps silently before differencing.
cpi = fetch_fred("CPIAUCSL", "cpi")
cpi["cpi_yoy"] = cpi["cpi"].pct_change(12, fill_method=None) * 100

# Unemployment (monthly, %)
unrate = fetch_fred("UNRATE", "unrate")

# 10Y-2Y spread (daily, pp)
t10y2y = fetch_fred("T10Y2Y", "t10y2y")

# Real GDP (quarterly): QoQ %
gdp = fetch_fred("GDPC1", "gdp")
gdp["gdp_qoq"] = gdp["gdp"].pct_change(fill_method=None) * 100


In [11]:
# Align the native-frequency macro series on a business-day grid, keyed by the first day
# each value was actually public (strict causality).
#
# FRED dates monthly/quarterly observations at the FIRST day of the period (July CPI ->
# 2007-07-01), so a release lag must be counted from the END of the period:
#     available_on = observation_date + period_length + release_lag
# Shifting the period-start date by a few business days would expose values weeks to
# months before they were published. The offsets below approximate release calendars.
# NOTE(review): for exact causality use ALFRED real-time/vintage data; GDPC1 fetched here
# is the latest revised vintage, not the estimate that was public at the time.
RELEASE_OFFSETS = {
    "cpi_yoy": pd.DateOffset(months=1, days=15),  # CPI: ~mid following month
    "unrate":  pd.DateOffset(months=1, days=10),  # Employment Situation: ~first Friday of following month
    "t10y2y":  pd.offsets.BDay(1),                # end-of-day value -> usable next business day (like the *_lag1 base features)
    "gdp_qoq": pd.DateOffset(months=3, days=30),  # GDP advance estimate: ~end of month after the quarter
}
native_series = {"cpi_yoy": cpi, "unrate": unrate, "t10y2y": t10y2y, "gdp_qoq": gdp}

wide_idx = bdays(macro_fetch_start, macro_fetch_end)
macro_w = pd.DataFrame({"Date": wide_idx})
for col, frame in native_series.items():
    # Drop missing observations (e.g. the leading pct_change NaNs) before re-dating.
    released = frame[["Date", col]].dropna()
    released = (released
                .assign(Date=released["Date"] + RELEASE_OFFSETS[col])
                .rename(columns={col: col + "_lagged"})
                .sort_values("Date"))
    # As-of join: each business day takes the latest value already released. Unlike
    # reindexing onto business days, this keeps observations dated on weekends/holidays
    # (e.g. a month that starts on a Sunday) instead of silently dropping them.
    macro_w = pd.merge_asof(macro_w, released, on="Date", direction="backward")

print("Wide macro rows (b-days):", len(macro_w))


Wide macro rows (b-days): 4474


In [13]:
# Trim to actual training window
macro_train = macro_w[(macro_w["Date"] >= TRAIN_START) & (macro_w["Date"] <= TRAIN_END)].copy()

lag_cols = ["cpi_yoy_lagged","unrate_lagged","t10y2y_lagged","gdp_qoq_lagged"]

# Population (ddof=0) mean/std over every Mon-Fri date in the window, not only the rows
# present in `base`. Zero std is replaced by 1.0 so constant columns don't divide by zero.
# NOTE(review): these stats span the whole window; if the data is later split into
# train/test, refit them on the train split only to avoid leaking test-period statistics.
mu = macro_train[lag_cols].mean()
sd = macro_train[lag_cols].std(ddof=0).replace(0, 1.0)

for c in lag_cols:
    macro_train[c + "_z"] = (macro_train[c] - mu[c]) / sd[c]

macro_daily = macro_train[["Date"] + [c+"_z" for c in lag_cols]]
print("Macro (train) rows:", len(macro_daily), "Cols:", macro_daily.columns.tolist())

# Save stats for reproducibility; the context manager guarantees the file is flushed and closed.
with open("macro_zscore_stats.json", "w") as stats_file:
    json.dump({"mu": mu.to_dict(), "sd": sd.to_dict()}, stats_file)


Macro (train) rows: 4159 Cols: ['Date', 'cpi_yoy_lagged_z', 'unrate_lagged_z', 't10y2y_lagged_z', 'gdp_qoq_lagged_z']


In [15]:
# Defensive: restrict the base table to the same inclusive [TRAIN_START, TRAIN_END] window.
base_train = base.loc[base["Date"].between(TRAIN_START, TRAIN_END)].copy()

# Left join keeps every base row; macro columns are NaN wherever no macro date matches.
final = (
    base_train
    .merge(macro_daily, on="Date", how="left")
    .sort_values("Date")
)

# Convert each ndarray embedding to a plain Python list before writing to Parquet.
final["text_embed"] = final["text_embed"].map(lambda vec: vec.tolist())

final_path = "sp500_features.parquet"
final.to_parquet(final_path, index=False)

for label, value in (("Saved:", final_path), ("Rows:", len(final)), ("Columns:", list(final.columns))):
    print(label, value)


Saved: sp500_features.parquet
Rows: 2799
Columns: ['Date', 'Movement', 'Open', 'Close_lag1', 'High_lag1', 'Volume_lag1', 'Daily_Return_lag1', 'Volatility_lag1', 'sentiment_volatility_lag1', 'aggregate_sentiment_score_lag1', 'text_embed', 'cpi_yoy_lagged_z', 'unrate_lagged_z', 't10y2y_lagged_z', 'gdp_qoq_lagged_z']


In [19]:
# === Sanity prints (no asserts) ===
# Reports problems in `final` without stopping the notebook.
import numpy as np

dates = final["Date"]
print(f"Rows: {len(final)}")
print(f"Date range: {dates.min().date()} → {dates.max().date()}")

# 1) Missing values in the macro z-score columns
required = [f"{name}_lagged_z" for name in ("cpi_yoy", "unrate", "t10y2y", "gdp_qoq")]
nan_counts = final[required].isna().sum()
print("\nNaNs per macro z-col:\n", nan_counts)

# 2) Every embedding should have the same length as the first one
first_dim = len(final["text_embed"].iloc[0])
dims = final["text_embed"].map(len)
print(f"\nEmbedding first row dim: {first_dim}")
if not (dims.nunique() == 1 and dims.iloc[0] == first_dim):
    print("Embeddings: Inconsistencies found!")
    print("Dim counts:", dims.value_counts().to_dict())
    print("Sample problematic indices:", list(dims[dims != first_dim].index[:5]))
else:
    print(f"Embeddings: OK (all dims == {first_dim})")

# 3) The target must be binary 0/1
movement = final["Movement"]
valid_mask = movement.isin([0, 1])
invalid_count = (~valid_mask).sum()
print("\nMovement unique values:", np.sort(movement.unique()))
if invalid_count:
    print(f"Movement: {invalid_count} invalid rows detected.")
    print("Sample invalid row indices:", list(final.index[~valid_mask][:5]))
else:
    print("Movement: OK (only 0/1)")

# 4) Dates must be non-decreasing
is_mono = dates.is_monotonic_increasing
print("\nDate monotonic increasing:", is_mono)
if not is_mono:
    backwards = dates.diff().dt.days.fillna(0) < 0
    print("First monotonic violations at indices:", list(final.index[backwards][:5]))

# 5) No date may appear more than once (keep=False counts every copy)
dup_mask = dates.duplicated(keep=False)
dup_count = int(dup_mask.sum())
if dup_count:
    print(f"Duplicates: Found {dup_count} duplicate rows.")
    print("Sample duplicate dates and counts:\n", dates[dup_mask].value_counts().head(5))
else:
    print("Duplicates: OK (no duplicate dates)")


Rows: 2799
Date range: 2007-08-07 → 2023-07-14

NaNs per macro z-col:
 cpi_yoy_lagged_z    0
unrate_lagged_z     0
t10y2y_lagged_z     0
gdp_qoq_lagged_z    0
dtype: int64

Embedding first row dim: 384
Embeddings: OK (all dims == 384)

Movement unique values: [0 1]
Movement: OK (only 0/1)

Date monotonic increasing: True
Duplicates: OK (no duplicate dates)


In [1]:
import numpy as np, pandas as pd, json
from pathlib import Path

# Import FAISS first so a missing install fails before any artifact is written.
try:
    import faiss
except ImportError:
    print("FAISS not found. Install with: pip install faiss-cpu")
    raise

OUTDIR = Path("macro_retrieval")
OUTDIR.mkdir(parents=True, exist_ok=True)  # this cell may run in a fresh kernel / clean checkout
train_path = "sp500_features.parquet"
final = pd.read_parquet(train_path)

# Macro z-score columns written into the parquet by the feature-building cells.
MACRO_Z_COLS = ["cpi_yoy_lagged_z", "unrate_lagged_z", "t10y2y_lagged_z", "gdp_qoq_lagged_z"]

# Stack train matrices
text_train  = np.vstack(final["text_embed"].to_numpy()).astype("float32")   # [N, d]
macro_train = final[MACRO_Z_COLS].to_numpy().astype("float32")             # [N, 4]

# Joint = L2-normalize([ text ; alpha * macro ]); alpha scales only the macro dims.
# The 1e-9 guards against division by zero for an all-zero row.
alpha = 0.5
joint_train = np.concatenate([text_train, alpha * macro_train], axis=1)
joint_train /= (np.linalg.norm(joint_train, axis=1, keepdims=True) + 1e-9)

# Save artifacts
np.save(OUTDIR / "text_train.npy", text_train)
pd.DataFrame({"date": final["Date"], "joint_embed": list(joint_train)}).to_parquet(OUTDIR / "train_joint_vectors.parquet", index=False)
# FAISS ids below are row positions 0..N-1, so row i of index_meta holds the date of id i.
final[["Date"]].rename(columns={"Date": "date"}).to_parquet(OUTDIR / "index_meta.parquet", index=False)
print("Saved joint + meta. Shapes:", text_train.shape, joint_train.shape)

# Exact inner-product search; on L2-normalized vectors this equals cosine similarity.
d = joint_train.shape[1]
flat_index = faiss.IndexFlatIP(d)           # keep a reference to the wrapped index
index = faiss.IndexIDMap2(flat_index)
ids = np.arange(joint_train.shape[0], dtype="int64")
index.add_with_ids(joint_train.astype("float32"), ids)
faiss.write_index(index, str(OUTDIR / "index_flat_ip.faiss"))
print("FAISS index built. Vectors:", index.ntotal, "Dim:", d)



Saved joint + meta. Shapes: (2799, 384) (2799, 388)
FAISS index built. Vectors: 2799 Dim: 388
