# Stock LSTM — Unified Backtest + 10‑day Forecast

This notebook is parameterized for Papermill.

**Inputs:** `TICKER, LOOKBACK, CONTEXT, BACKTEST_HORIZON, HORIZON, OUTPUT_JSON`

**Output file format (JSON)**:
```json
{
  "ticker": "AAPL",
  "look_back": 60,
  "context": 100,
  "backtest_horizon": 20,
  "horizon": 10,
  "metrics": {
    "rmse": 0.0,
    "mape": 0.0,
    "accuracy_pct": 0.0,
    "expected_10d_move_pct": 0.0
  },
  "forecast": [
    {"date": "YYYY-MM-DD", "actual": 123.45, "part": "context"},
    {"date": "YYYY-MM-DD", "actual": 123.45, "pred": 123.45, "part": "backtest"},
    {"date": "YYYY-MM-DD", "pred": 123.45, "part": "forecast"}
  ]
}
```


In [None]:
import os, math, json
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import tensorflow as tf

print("TensorFlow:", tf.__version__)


In [None]:
# Papermill parameters (will be overridden when executed by Papermill)
# These are the defaults used when the notebook is run without overrides.
TICKER = "AAPL"  # symbol handed to fetch_prices()
LOOKBACK = 60  # number of rows in each model input window
CONTEXT = 100  # number of trailing rows emitted as "context" actuals in the output
BACKTEST_HORIZON = 20  # number of trailing rows held out for the 1-step backtest
HORIZON = 10  # number of future steps predicted by the multi-step model
OUTPUT_JSON = "forecast.json"  # path the result JSON is written to


In [None]:
# -------------------------------
# Helper functions & model
# -------------------------------

def fetch_prices(ticker: str, start="2016-01-01", end=None, interval="1d") -> pd.DataFrame:
    """Download adjusted OHLCV bars for one ticker via ``yf.download``.

    Args:
        ticker: Symbol forwarded to ``yf.download``.
        start: First date to request.
        end: Last date to request; ``None`` is forwarded unchanged.
        interval: Bar interval forwarded to ``yf.download``.

    Returns:
        A frame indexed by ``Date`` with flat ``Open``, ``High``, ``Low``,
        ``Close`` and ``Volume`` columns; rows containing NaN are removed.

    Raises:
        ValueError: If the download yields no rows.
        KeyError: If one of the five expected columns is missing.
    """
    df = yf.download(ticker, start=start, end=end, interval=interval, auto_adjust=True, progress=False)
    if df is None or df.empty:
        raise ValueError("No data returned. Check ticker/interval or your network.")

    # Recent yfinance releases can return two-level columns (field, ticker) even
    # for a single symbol. Keep only the field level so that df["Close"] is a
    # Series downstream. Assumes level 0 carries the field names, which is the
    # layout for the default group_by="column" -- confirm if group_by changes.
    if isinstance(df.columns, pd.MultiIndex):
        df = df.copy()
        df.columns = df.columns.get_level_values(0)

    df.index = pd.to_datetime(df.index)
    df.index.name = "Date"
    return df[["Open", "High", "Low", "Close", "Volume"]].dropna()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Append return, rolling, momentum and volatility features derived from ``Close``.

    The input frame is not modified. Non-positive or non-numeric closes are
    turned into NaN, and every row that ends up with a NaN in any column
    (including indicator warm-up rows) is dropped from the result.

    Args:
        df: Price frame with at least a ``Close`` column.

    Returns:
        A copy of ``df`` with the added columns ``log_ret``, ``ret``,
        ``roll_mean_7``, ``roll_std_7``, ``roll_mean_21``, ``roll_std_21``,
        ``rsi_14``, ``macd``, ``macd_signal``, ``macd_diff``, ``bb_width``,
        ``ret_lag1``, ``ret_lag3``, ``ret_lag5``, ``vol_7``, ``vol_21`` and
        ``z_close_21``, restricted to rows without NaN.
    """
    out = df.copy()

    close = out["Close"]
    if isinstance(close, pd.DataFrame):
        # A one-column frame (e.g. two-level columns) becomes a Series. Squeezing
        # only the column axis keeps a one-row input from collapsing to a scalar.
        close = close.squeeze(axis=1)
    close = pd.to_numeric(close, errors="coerce")
    close = close.where(close > 0, np.nan)  # log() below needs strictly positive prices
    out["Close"] = close

    # base returns
    out["log_ret"] = np.log(close).diff()
    out["ret"] = close.pct_change()

    # rolling stats
    out["roll_mean_7"] = close.rolling(7).mean()
    out["roll_std_7"] = close.rolling(7).std()
    out["roll_mean_21"] = close.rolling(21).mean()
    out["roll_std_21"] = close.rolling(21).std()

    # RSI(14) with exponential smoothing (alpha = 1/14)
    delta = close.diff()
    gains = delta.clip(lower=0)
    losses = -delta.clip(upper=0)
    avg_gain = gains.ewm(alpha=1 / 14, min_periods=14, adjust=False).mean()
    avg_loss = losses.ewm(alpha=1 / 14, min_periods=14, adjust=False).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    rsi = 100 - (100 / (1 + rs))
    # With gains but no losses the ratio is unbounded and RSI saturates at 100.
    # Leaving it NaN would make the final dropna() discard those rows.
    rsi = rsi.mask((avg_loss == 0) & (avg_gain > 0), 100.0)
    out["rsi_14"] = rsi

    # MACD (12,26,9)
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    macd = ema12 - ema26
    out["macd"] = macd
    out["macd_signal"] = macd.ewm(span=9, adjust=False).mean()
    out["macd_diff"] = out["macd"] - out["macd_signal"]

    # Bollinger width (20,2): (upper band - lower band) relative to price
    ma20 = close.rolling(20).mean()
    sd20 = close.rolling(20).std()
    out["bb_width"] = (ma20 + 2 * sd20 - (ma20 - 2 * sd20)) / close

    # lags & volatility
    out["ret_lag1"] = out["log_ret"].shift(1)
    out["ret_lag3"] = out["log_ret"].shift(3)
    out["ret_lag5"] = out["log_ret"].shift(5)
    out["vol_7"] = out["log_ret"].rolling(7).std()
    out["vol_21"] = out["log_ret"].rolling(21).std()
    out["z_close_21"] = (close - close.rolling(21).mean()) / close.rolling(21).std()

    return out.dropna()

# Columns selected from the engineered frame as model inputs. The list order
# defines the order of the feature axis in the arrays fed to the scaler and
# the models. "Close" and "Volume" come from the downloaded prices; every
# other name is a column created by add_features().
FEATURES = [
    "Close","Volume","log_ret","ret",
    "roll_mean_7","roll_std_7","roll_mean_21","roll_std_21",
    "rsi_14","macd","macd_signal","macd_diff",
    "bb_width","ret_lag1","ret_lag3","ret_lag5",
    "vol_7","vol_21","z_close_21",
]

def make_windows(X: np.ndarray, y: np.ndarray, lookback: int, horizon: int):
    """Cut aligned feature/target arrays into supervised-learning windows.

    For every anchor ``i`` in ``range(lookback, len(X) - horizon + 1)`` the
    sample pairs feature rows ``X[i - lookback : i]`` with the targets
    ``y[i : i + horizon]``; the anchor row itself is not part of the input.

    Args:
        X: Feature matrix with one row per time step.
        y: Target values aligned row-for-row with ``X``.
        lookback: Number of rows in each input window.
        horizon: Number of consecutive targets per sample.

    Returns:
        ``(xs, ys)`` as float32 arrays shaped ``(n, lookback, n_features)`` and
        ``(n, horizon)``. When no window fits, ``n`` is 0 and the trailing
        dimensions are still present, so callers can read ``xs.shape[2]``.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    anchors = range(lookback, len(X) - horizon + 1)

    if len(anchors) == 0:
        # np.array([]) would have shape (0,), which loses the window geometry.
        empty_x = np.empty((0, max(lookback, 0)) + X.shape[1:], dtype="float32")
        empty_y = np.empty((0, max(horizon, 0)) + y.shape[1:], dtype="float32")
        return empty_x, empty_y

    xs = np.array([X[i - lookback : i] for i in anchors], dtype="float32")
    ys = np.array([y[i : i + horizon] for i in anchors], dtype="float32")
    return xs, ys

def build_model(input_steps: int, n_features: int, horizon: int) -> tf.keras.Model:
    """Assemble and compile the Conv1D -> BiLSTM -> LSTM -> Dense regressor.

    Args:
        input_steps: Number of time steps in each input window.
        n_features: Number of features per time step.
        horizon: Number of output units, one per predicted step.

    Returns:
        A Keras model compiled with Adam (learning rate 1e-3) and MSE loss.
    """
    kl = tf.keras.layers

    inputs = tf.keras.Input(shape=(input_steps, n_features))

    # Causal convolution: each output step only sees current and earlier steps.
    conv = kl.Conv1D(filters=48, kernel_size=5, padding="causal", activation="relu")
    hidden = conv(inputs)
    hidden = kl.Dropout(rate=0.2)(hidden)

    # The first recurrent stage keeps the full sequence for the second LSTM.
    seq_lstm = kl.LSTM(units=160, return_sequences=True)
    hidden = kl.Bidirectional(seq_lstm)(hidden)
    hidden = kl.Dropout(rate=0.3)(hidden)

    # The second LSTM emits only its final state, which the dense head maps to
    # `horizon` outputs.
    hidden = kl.LSTM(units=96)(hidden)
    outputs = kl.Dense(units=horizon)(hidden)

    net = tf.keras.Model(inputs, outputs)
    adam = tf.keras.optimizers.Adam(learning_rate=1e-3)
    net.compile(optimizer=adam, loss="mse")
    return net

def infer_freq_has_weekends(idx: pd.Index) -> bool:
    """Report whether any timestamp in ``idx`` falls on a Saturday or Sunday.

    A weekend timestamp is taken as a sign that the series trades every
    calendar day (e.g., crypto).

    Args:
        idx: Iterable of values accepted by ``pd.Timestamp``.

    Returns:
        ``True`` if at least one entry is a Saturday or Sunday. ``False``
        otherwise, including for empty input and for input that cannot be
        iterated or converted to timestamps.
    """
    try:
        # weekday(): Monday == 0 ... Sunday == 6. NaT yields NaN, which compares
        # False. any() stops at the first weekend hit and returns a builtin bool.
        return any(pd.Timestamp(x).weekday() >= 5 for x in idx)
    except (TypeError, ValueError, OverflowError):
        # Best effort: unparseable input is treated as "no weekends".
        return False

def next_dates(last_date, n: int, use_weekends: bool) -> list:
    """Return the ``n`` session dates that follow ``last_date``.

    Args:
        last_date: Last known date; anything ``pd.Timestamp`` accepts.
        n: Number of future dates to produce.
        use_weekends: If true, step by calendar day; otherwise by business day
            (Monday to Friday, no holiday calendar).

    Returns:
        A list of ``pd.Timestamp`` values strictly after ``last_date``.
    """
    last_ts = pd.Timestamp(last_date)
    if use_weekends:
        # The first element is last_ts itself, so drop it.
        return list(pd.date_range(last_ts, periods=n + 1, freq="D")[1:])

    # bdate_range rolls a weekend start forward to Monday. Dropping the first
    # element unconditionally would then skip that Monday. Instead keep the
    # dates strictly after last_date's calendar day and take the first n.
    rng = pd.bdate_range(last_ts, periods=n + 1)
    return list(rng[rng > last_ts.normalize()][:n])


In [None]:
# -------------------------------
# Main logic
# -------------------------------
# End-to-end run: download prices, engineer features, fit a HORIZON-step model
# and a 1-step model on the training slice, backtest the 1-step model on the
# last BACKTEST_HORIZON rows, forecast HORIZON future sessions, and write one
# JSON document to OUTPUT_JSON.

# 1) Pull enough data
buffer_days = 320  # extra calendar days on top of the rows the models need
span_days = CONTEXT + LOOKBACK + BACKTEST_HORIZON + buffer_days
# Timestamp.utcnow() is deprecated in recent pandas; now(tz="UTC") replaces it.
start_date = (pd.Timestamp.now(tz="UTC") - pd.Timedelta(days=span_days)).date().isoformat()
raw = fetch_prices(TICKER, start=start_date)
df = add_features(raw)

# The target for row i is the log return realised on row i, i.e. close[i-1] ->
# close[i]. make_windows() pairs feature rows [i-LOOKBACK, i) with y[i], so each
# window is trained on the return of the session right after it. The backtest
# and forecast below apply predictions with that same convention.
# Shifting the target by -1 would train on the return one session later, leak
# the first backtest return into the training targets and scaler, and discard
# the newest session.
df["target_ret"] = df["log_ret"]
df = df.dropna()
if len(df) < LOOKBACK + BACKTEST_HORIZON + 5:
    raise ValueError("Not enough rows after feature engineering.")

# 2) Keep just the window we need for context + train + backtest
df_tail = df.tail(CONTEXT + LOOKBACK + BACKTEST_HORIZON)
X_df = df_tail[FEATURES].astype("float32")
y = df_tail["target_ret"].astype("float32").values
dates = df_tail.index
close_col = df_tail["Close"]

# Split point for the backtest: rows [0, train_end_idx) train, the rest test.
train_end_idx = len(df_tail) - BACKTEST_HORIZON
if train_end_idx <= LOOKBACK:
    # Too few rows for the requested backtest. Keep the smallest training slice
    # that still exceeds one window and shrink the backtest accordingly.
    train_end_idx = LOOKBACK + 1
    BACKTEST_HORIZON = max(1, len(df_tail) - train_end_idx)

X_train_df = X_df.iloc[:train_end_idx]
y_train = y[:train_end_idx]

# 3) Scalers on train only (no backtest rows enter the statistics)
scaler_X = StandardScaler().fit(X_train_df.values)
scaler_y = StandardScaler().fit(y_train.reshape(-1, 1))
X_train = scaler_X.transform(X_train_df.values)
y_train_s = scaler_y.transform(y_train.reshape(-1, 1)).ravel()

# 4) Train model for HORIZON-step forecast
Xw, yw = make_windows(X_train, y_train_s, LOOKBACK, HORIZON)
if len(Xw) == 0:
    raise ValueError("No training windows for multi-step model.")
m_multi = build_model(LOOKBACK, Xw.shape[2], HORIZON)
# No validation split is used, so early stopping monitors the training loss.
cbs = [tf.keras.callbacks.EarlyStopping(monitor="loss", patience=6, restore_best_weights=True)]
m_multi.fit(Xw, yw, epochs=24, batch_size=32, verbose=0, callbacks=cbs)

# 5) Train a second model for 1-step backtest
Xw1, yw1 = make_windows(X_train, y_train_s, LOOKBACK, 1)
if len(Xw1) == 0:
    raise ValueError("No training windows for 1-step model.")
m_one = build_model(LOOKBACK, Xw1.shape[2], 1)
m_one.fit(Xw1, yw1, epochs=16, batch_size=32, verbose=0, callbacks=cbs)

# 6) Backtest last BACKTEST_HORIZON days (1-step each)
backtest_dates = []
backtest_pred_prices = []
backtest_actual_prices = []
backtest_pred_rets = []
actual_rets = []
for k in range(int(BACKTEST_HORIZON)):
    end_idx = train_end_idx + k  # index of the day we're predicting
    # Features strictly before end_idx; the predicted row is never an input.
    xb_raw = X_df.values[end_idx - LOOKBACK:end_idx, :]
    xb = scaler_X.transform(xb_raw).reshape(1, LOOKBACK, Xw1.shape[2])
    pred_ret_s = m_one.predict(xb, verbose=0)[0][0]
    pred_ret = scaler_y.inverse_transform([[pred_ret_s]])[0, 0]

    # Apply the predicted log return to the previous actual close.
    last_price = float(close_col.iloc[end_idx - 1])
    pred_price = float(last_price * np.exp(pred_ret))
    actual_price = float(close_col.iloc[end_idx])

    backtest_dates.append(pd.Timestamp(dates[end_idx]).strftime("%Y-%m-%d"))
    backtest_pred_prices.append(pred_price)
    backtest_actual_prices.append(actual_price)
    backtest_pred_rets.append(float(pred_ret))
    # Realised simple return over the same step; only its sign is used below.
    actual_rets.append((actual_price - last_price) / (last_price if last_price != 0 else 1.0))

# Metrics on backtest
rmse = float(np.sqrt(mean_squared_error(backtest_actual_prices, backtest_pred_prices)))
mape = float(mean_absolute_percentage_error(backtest_actual_prices, backtest_pred_prices) * 100)

# directional accuracy (% of correct up/down)
if len(backtest_pred_rets) and len(actual_rets):
    acc = float((np.sign(backtest_pred_rets) == np.sign(actual_rets)).mean() * 100.0)
else:
    acc = 0.0

# 7) Multi-step forward forecast of next HORIZON sessions
last_block_raw = X_df.values[-LOOKBACK:]
last_block = scaler_X.transform(last_block_raw).reshape(1, LOOKBACK, Xw.shape[2])
next_rets_s = m_multi.predict(last_block, verbose=0)[0]
next_rets = scaler_y.inverse_transform(next_rets_s.reshape(-1, 1)).ravel()
last_price = float(close_col.iloc[-1])
# Cumulative log returns compound into a price path starting at the last close.
future_pred_prices = (last_price * np.exp(np.cumsum(next_rets))).astype(float)
expected_move_pct = float((float(future_pred_prices[-1]) - last_price) / (last_price if last_price != 0 else 1.0) * 100.0)

# 8) Build unified series for frontend (context actuals + backtest preds + future preds)
series_map = {}  # keyed by "YYYY-MM-DD" so the segments merge per date

# Context actuals
ctx_tail = df_tail.tail(int(CONTEXT))
for d, p in zip(ctx_tail.index, ctx_tail["Close"].astype(float).values):
    date_str = pd.Timestamp(d).strftime("%Y-%m-%d")
    row = series_map.get(date_str, {"date": date_str})
    row["actual"] = float(p)
    # Default label; rows that are also backtest dates are relabelled below.
    row.setdefault("part", "context")
    series_map[date_str] = row

# Backtest predictions (merged into the context row when the date overlaps)
for date_str, pred_price in zip(backtest_dates, backtest_pred_prices):
    row = series_map.get(date_str, {"date": date_str})
    row["pred"] = float(pred_price)
    row["part"] = "backtest"  # mark segment
    series_map[date_str] = row

# Future predictions
has_weekends = infer_freq_has_weekends(df_tail.index)
future_dates = next_dates(df_tail.index[-1], int(HORIZON), use_weekends=has_weekends)
for d, pred_price in zip(future_dates, future_pred_prices):
    date_str = pd.Timestamp(d).strftime("%Y-%m-%d")
    row = series_map.get(date_str, {"date": date_str})
    row["pred"] = float(pred_price)
    row["part"] = "forecast"
    series_map[date_str] = row

# Final series sorted by date (ISO date strings sort chronologically)
series = sorted(series_map.values(), key=lambda r: r["date"])

metrics = {
    "rmse": float(rmse),
    "mape": float(mape),
    "accuracy_pct": float(acc),
    # The key name is fixed for consumers; the value spans HORIZON sessions.
    "expected_10d_move_pct": float(expected_move_pct),
}

result = {
  "ticker": str(TICKER).upper(),
  "look_back": int(LOOKBACK),
  "context": int(CONTEXT),
  "backtest_horizon": int(BACKTEST_HORIZON),
  "horizon": int(HORIZON),
  "metrics": metrics,
  "forecast": series,
}

# Write JSON
out_path = str(OUTPUT_JSON)
out_dir = os.path.dirname(out_path) or "."
os.makedirs(out_dir, exist_ok=True)
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(result, f)
print("WROTE JSON =>", out_path)
