This cell imports the required libraries and defines the basic configuration: ticker symbols, input file names, sequence length, and the feature and target column names.

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import shap
import matplotlib.pyplot as plt
import seaborn as sns

# Default size for matplotlib figures that do not pass their own figsize.
plt.rcParams["figure.figsize"] = (10, 6)

# Symbols to process; each one is substituted into NEWS_TEMPLATE.
TICKERS = ["AAPL", "MSFT", "GOOGL"]
# File name pattern of the per-ticker CSV ({ticker} is replaced by a symbol).
NEWS_TEMPLATE = "news_with_prices_{ticker}.csv"
# CSV with the daily sentiment values; it is filtered per ticker when loaded.
SENTIMENT_PATH = "daily_sentiment.csv"
# Number of consecutive rows in each model input window.
SEQ_LEN = 60

# Columns used as model inputs, in this order.
FEATURE_COLS = [
    "Open", "High", "Low", "Close", "Volume",
    "daily_return",
    "sentiment_score", "sent_lag1", "sent_lag2", "sent_lag3",
    "sent_roll3", "sent_roll7"
]
# Column the models are trained to predict.
TARGET_COL = "next_day_return"


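This cell defines a function that loads one ticker's price data, aggregates it to one row per date, merges the daily sentiment scores, and engineers the return and sentiment features; it then applies the function to every ticker.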

In [None]:
def load_and_prepare_ticker(ticker):
    """Load one ticker's price CSV and return a daily feature table.

    Steps:
      1. Read ``NEWS_TEMPLATE`` formatted with ``ticker`` and collapse it to
         one row per ``date_price`` (mean of Open/High/Low/Close/Volume).
      2. Derive ``daily_return`` (percentage change of ``Close``) and
         ``next_day_return`` (the following row's ``daily_return``).
      3. Left-join this ticker's rows from ``SENTIMENT_PATH`` on the date,
         forward-fill missing sentiment, and use 0.0 where nothing precedes.
      4. Add lagged (1-3 rows) and rolling-mean (3 and 7 rows) sentiment
         columns.

    Rows containing any NaN (the first and last price rows and the warm-up
    rows of the lag/rolling features) are dropped.

    Args:
        ticker: Symbol used both to build the CSV file name and to filter the
            ``Stock_symbol`` column of the sentiment file.

    Returns:
        A DataFrame sorted by ``date_price`` with the columns ``date_price``,
        ``Open``, ``High``, ``Low``, ``Close``, ``Volume``, ``daily_return``,
        ``next_day_return``, ``sentiment_score``, ``sent_lag1``..``sent_lag3``,
        ``sent_roll3`` and ``sent_roll7``.
    """
    price_cols = ["Open", "High", "Low", "Close", "Volume"]

    raw = pd.read_csv(NEWS_TEMPLATE.format(ticker=ticker))
    raw["date_price"] = pd.to_datetime(raw["date_price"])

    # The aggregation keeps only the grouping key and the price columns, so
    # the text/news columns need no explicit drop; groupby also returns the
    # dates in ascending order.
    df = raw.groupby("date_price", as_index=False).agg(
        {col: "mean" for col in price_cols}
    )
    df["daily_return"] = df["Close"].pct_change()
    df["next_day_return"] = df["daily_return"].shift(-1)
    df = df.dropna().reset_index(drop=True)

    sent = pd.read_csv(SENTIMENT_PATH)
    sent = sent[sent["Stock_symbol"] == ticker][["news_date", "daily_sentiment"]]
    sent = sent.rename(
        columns={"news_date": "date_price", "daily_sentiment": "sentiment_score"}
    )
    sent["date_price"] = pd.to_datetime(sent["date_price"])
    # Collapse to one sentiment value per date so the left join below can
    # never duplicate a price row (a no-op when dates are already unique).
    sent = sent.groupby("date_price", as_index=False)["sentiment_score"].mean()

    df = df.merge(sent, on="date_price", how="left")
    # Sort before filling so the forward-fill always runs in date order.
    df = df.sort_values("date_price").reset_index(drop=True)
    df["sentiment_score"] = df["sentiment_score"].ffill().fillna(0.0)

    for lag in (1, 2, 3):
        df[f"sent_lag{lag}"] = df["sentiment_score"].shift(lag)
    for window in (3, 7):
        df[f"sent_roll{window}"] = df["sentiment_score"].rolling(window).mean()

    return df.dropna().reset_index(drop=True)

# Prepared feature table for every configured ticker, keyed by symbol.
ticker_dfs = {t: load_and_prepare_ticker(t) for t in TICKERS}
# Bare expression as the last statement of the cell (displays the dict in a notebook).
ticker_dfs


This cell defines a function that scales the features, builds sliding-window sequences, and splits them chronologically into train, validation, and test sets; it then applies the function to each ticker.

In [None]:
def build_sequences_from_df(df, seq_len=SEQ_LEN):
    """Scale features, cut sliding windows, and split them chronologically.

    Window ``i`` holds rows ``i .. i + seq_len - 1`` of the scaled
    ``FEATURE_COLS`` and is paired with ``TARGET_COL`` of its *last* row.
    ``TARGET_COL`` is already the next-day value for each row, so taking it
    one row later would skip a day between the window and its target.

    The windows are split in order into 70% train, 15% validation and 15%
    test. The scaler is fit only on the rows covered by training windows so
    that validation/test statistics do not leak into the training inputs.
    If the split leaves no training window, the scaler falls back to being
    fit on all rows.

    Args:
        df: Frame containing ``FEATURE_COLS`` and ``TARGET_COL``, in time
            order.
        seq_len: Number of rows per window; must be at least 1.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test, scaler)``. Each
        ``X_*`` has shape ``(n, seq_len, len(FEATURE_COLS))`` (``n`` may be 0)
        and ``scaler`` is the fitted ``StandardScaler``.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        raise ValueError(f"seq_len must be >= 1, got {seq_len}")

    features = df[FEATURE_COLS].to_numpy(dtype=float)
    target = df[TARGET_COL].to_numpy()
    n_rows, n_features = features.shape

    n_windows = max(n_rows - seq_len + 1, 0)
    train_end = int(n_windows * 0.7)
    val_end = int(n_windows * 0.85)

    # Training windows 0 .. train_end - 1 touch rows 0 .. train_end + seq_len - 2.
    fit_rows = train_end + seq_len - 1 if train_end > 0 else n_rows
    scaler = StandardScaler().fit(features[:fit_rows])
    scaled = scaler.transform(features)

    if n_windows > 0:
        X = np.stack([scaled[i:i + seq_len] for i in range(n_windows)])
        # Target of window i sits on the window's last row, i + seq_len - 1.
        y = target[seq_len - 1:].copy()
    else:
        # Keep the 3-D layout so callers can still read X.shape[2].
        X = np.empty((0, seq_len, n_features), dtype=scaled.dtype)
        y = target[:0].copy()

    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]
    return X_train, y_train, X_val, y_val, X_test, y_test, scaler

# Per-ticker tuple returned by build_sequences_from_df, keyed by symbol.
seq_data = {}
for ticker in TICKERS:
    seq_data[ticker] = build_sequences_from_df(ticker_dfs[ticker])
    # Only the first five tuple entries are arrays being reported here;
    # the slice leaves out the last two (y_test and the scaler).
    shapes = [arr.shape for arr in seq_data[ticker][:5]]
    print(ticker, "shapes:", shapes)


This cell defines the positional-encoding layer and the helper functions that build and train the Transformer model, the LSTM model, the linear-regression baseline, and the naive zero-return baseline.

In [None]:
class PositionalEncoding(layers.Layer):
    """Add a sinusoidal position signal to the input tensor.

    The signal is rebuilt on every call from the input's runtime shape
    (axis 1 is treated as the step axis, axis 2 as the channel axis); the
    layer defines no weights of its own.
    """

    def call(self, x):
        runtime_shape = tf.shape(x)
        n_steps = runtime_shape[1]
        width = runtime_shape[2]

        # Column vector of step indices and row vector of channel indices.
        step_idx = tf.cast(tf.range(n_steps)[:, tf.newaxis], tf.float32)
        channel_idx = tf.cast(tf.range(width)[tf.newaxis, :], tf.float32)

        # Channels 2k and 2k + 1 share the exponent 2k / width.
        exponent = (2 * (channel_idx // 2)) / tf.cast(width, tf.float32)
        angles = step_idx * (1.0 / tf.pow(10000.0, exponent))

        # Sines of the even-indexed angle columns, then cosines of the
        # odd-indexed ones, concatenated along the channel axis.
        sin_part = tf.sin(angles[:, 0::2])
        cos_part = tf.cos(angles[:, 1::2])
        signal = tf.concat([sin_part, cos_part], axis=-1)

        # Leading axis of size 1 lets the signal broadcast over the batch.
        return x + signal[tf.newaxis, ...]

def build_transformer(seq_len, n_features, *, d_model=64, num_heads=4,
                      key_dim=64, ff_dim=128, dropout=0.1, learning_rate=1e-3):
    """Build and compile a single-block Transformer encoder regressor.

    Architecture: dense projection to ``d_model`` -> positional encoding ->
    self-attention with residual + layer norm -> two-layer feed-forward with
    residual + layer norm -> average over the sequence axis -> one linear
    output unit. All keyword defaults reproduce the previously hard-coded
    values.

    Args:
        seq_len: Number of time steps per input window.
        n_features: Number of features per time step.
        d_model: Width of the projected sequence representation.
        num_heads: Number of attention heads.
        key_dim: Per-head key/query size passed to ``MultiHeadAttention``.
        ff_dim: Hidden width of the feed-forward sub-block.
        dropout: Dropout rate applied after attention and feed-forward.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        A compiled Keras model (MSE loss, MAE metric).
    """
    inputs = layers.Input(shape=(seq_len, n_features))
    x = layers.Dense(d_model)(inputs)
    x = PositionalEncoding()(x)

    # Self-attention sub-block with residual connection.
    attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(x, x)
    attn = layers.Dropout(dropout)(attn)
    x = layers.Add()([x, attn])
    x = layers.LayerNormalization()(x)

    # Position-wise feed-forward sub-block with residual connection; the
    # second Dense maps back to d_model so the residual Add shapes match.
    ff = layers.Dense(ff_dim, activation="relu")(x)
    ff = layers.Dense(d_model)(ff)
    ff = layers.Dropout(dropout)(ff)
    x = layers.Add()([x, ff])
    x = layers.LayerNormalization()(x)

    x = layers.GlobalAveragePooling1D()(x)
    outputs = layers.Dense(1)(x)
    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss="mse",
        metrics=["mae"],
    )
    return model

def build_lstm(seq_len, n_features, *, units=64, learning_rate=1e-3):
    """Build and compile a single-layer LSTM regressor.

    The keyword defaults reproduce the previously hard-coded values.

    Args:
        seq_len: Number of time steps per input window.
        n_features: Number of features per time step.
        units: Size of the LSTM layer.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        A compiled Keras model (MSE loss, MAE metric) with one output unit.
    """
    inputs = layers.Input(shape=(seq_len, n_features))
    x = layers.LSTM(units)(inputs)
    outputs = layers.Dense(1)(x)
    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss="mse",
        metrics=["mae"],
    )
    return model

def train_keras_model(model, X_train, y_train, X_val, y_val, epochs=80, batch_size=16):
    """Fit ``model`` silently with early stopping and learning-rate decay.

    Both callbacks watch ``val_loss``: training stops after 10 epochs without
    improvement (restoring the best weights), and the learning rate is halved
    after 5 such epochs.

    Returns:
        ``(model, history)`` where ``history`` is the object returned by
        ``model.fit``.
    """
    callbacks = [
        tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=10, restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor="val_loss", factor=0.5, patience=5
        ),
    ]
    fit_options = {
        "validation_data": (X_val, y_val),
        "epochs": epochs,
        "batch_size": batch_size,
        "callbacks": callbacks,
        "verbose": 0,
    }
    history = model.fit(X_train, y_train, **fit_options)
    return model, history

def train_linear_baseline(X_train, y_train):
    """Fit a linear regression on the flattened training windows.

    Each 3-D window ``(steps, features)`` is flattened into one row of
    ``steps * features`` values before fitting.

    Returns:
        The fitted ``LinearRegression`` estimator.
    """
    n_samples, n_steps, n_feats = X_train.shape
    flattened = X_train.reshape(n_samples, n_steps * n_feats)
    return LinearRegression().fit(flattened, y_train)

def predict_linear(reg, X):
    """Predict with ``reg`` after flattening each 3-D window into one row.

    Args:
        reg: Fitted estimator exposing ``predict``.
        X: Array of shape ``(samples, steps, features)``.

    Returns:
        Whatever ``reg.predict`` returns for the ``(samples, steps * features)``
        matrix.
    """
    n_samples, n_steps, n_feats = X.shape
    flattened = X.reshape((n_samples, n_steps * n_feats))
    return reg.predict(flattened)

def predict_naive_zero(X):
    """Baseline that predicts zero for every sample in ``X``.

    Returns:
        A 1-D float array of zeros with one entry per element of ``X``.
    """
    n_samples = len(X)
    return np.zeros(n_samples)


This cell defines functions that compute the evaluation metrics (MAE, RMSE, R², correlation, and directional accuracy) and print them as a small table.

In [None]:
def evaluate_predictions(y_true, y_pred):
    """Compute regression and direction metrics for one set of predictions.

    Args:
        y_true: Actual values (any array-like; flattened to 1-D).
        y_pred: Predicted values (any array-like; flattened to 1-D).

    Returns:
        Dict with the keys ``"MAE"``, ``"RMSE"``, ``"R2"``, ``"Corr"``
        (Pearson correlation, NaN when it is undefined) and ``"DirAcc"``
        (fraction of samples where both values agree on being ``> 0``).
    """
    # Accept lists as well as arrays; the comparisons below need ndarrays.
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()

    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred) ** 0.5
    r2 = r2_score(y_true, y_pred)

    # Correlation is undefined when either series is constant (e.g. an
    # all-zero baseline); np.corrcoef would divide 0/0 and emit a
    # RuntimeWarning, so report NaN explicitly instead.
    if len(y_true) < 2 or np.ptp(y_true) == 0 or np.ptp(y_pred) == 0:
        corr = np.nan
    else:
        corr = np.corrcoef(y_true, y_pred)[0, 1]

    # A value of exactly zero counts as "not up" on both sides.
    direction_accuracy = ((y_true > 0) == (y_pred > 0)).mean()

    return {
        "MAE": mae,
        "RMSE": rmse,
        "R2": r2,
        "Corr": corr,
        "DirAcc": direction_accuracy
    }

def print_metrics_table(ticker, metrics_dict):
    """Print a header for ``ticker`` and display its metrics as a table.

    ``metrics_dict`` maps a model name to its metrics dict; the table has one
    row per model, with ``DirAcc`` rendered as a percentage string.
    """
    table = pd.DataFrame(metrics_dict).T
    table["DirAcc"] = [f"{value*100:.2f}%" for value in table["DirAcc"]]
    print(f"=== {ticker} ===")
    display(table)


This cell trains the Transformer, LSTM, and linear models for each ticker, evaluates them together with the naive zero baseline on the test set, and stores the metrics, predictions, and trained Transformer.

In [None]:
# Per-ticker metrics plus the Transformer artifacts reused by later cells.
results = {}

for ticker in TICKERS:
    X_train, y_train, X_val, y_val, X_test, y_test, scaler = seq_data[ticker]
    # Feature count is the last axis of the window array.
    n_features = X_train.shape[2]

    # Neural models: a fresh model is built and trained for every ticker.
    transformer = build_transformer(SEQ_LEN, n_features)
    transformer, _ = train_keras_model(transformer, X_train, y_train, X_val, y_val)

    lstm = build_lstm(SEQ_LEN, n_features)
    lstm, _ = train_keras_model(lstm, X_train, y_train, X_val, y_val)

    # Linear baseline is fit on the training windows only (no validation set).
    lin_reg = train_linear_baseline(X_train, y_train)

    # Test-set predictions; Keras outputs are flattened to 1-D.
    y_pred_trans = transformer.predict(X_test).flatten()
    y_pred_lstm = lstm.predict(X_test).flatten()
    y_pred_lin = predict_linear(lin_reg, X_test)
    y_pred_naive = predict_naive_zero(X_test)

    metrics = {
        "NaiveZero": evaluate_predictions(y_test, y_pred_naive),
        "Linear": evaluate_predictions(y_test, y_pred_lin),
        "LSTM": evaluate_predictions(y_test, y_pred_lstm),
        "Transformer": evaluate_predictions(y_test, y_pred_trans),
    }
    # Only the Transformer's predictions and model are kept for the
    # plotting and SHAP cells.
    results[ticker] = {
        "metrics": metrics,
        "y_test": y_test,
        "y_pred_trans": y_pred_trans,
        "X_test": X_test,
        "transformer": transformer
    }
    print_metrics_table(ticker, metrics)


This cell runs a randomization test: the training targets are shuffled before a new Transformer is trained, to check whether the model scores similarly when fit on noise.

In [None]:
def randomization_test_transformer(X_train, y_train, X_val, y_val, X_test, y_test):
    """Train a Transformer on permuted training targets and score it.

    Only ``y_train`` is shuffled; the validation and test targets keep their
    pairing with their inputs.

    Returns:
        The metrics dict from ``evaluate_predictions`` on the test set.
    """
    shuffled_targets = np.random.permutation(y_train)
    net = build_transformer(SEQ_LEN, X_train.shape[2])
    net, _history = train_keras_model(
        net, X_train, shuffled_targets, X_val, y_val, epochs=60
    )
    test_preds = net.predict(X_test).flatten()
    return evaluate_predictions(y_test, test_preds)

# Metrics of the shuffled-target Transformer, keyed by ticker.
randomized_results = {}
for ticker in TICKERS:
    # The scaler (last tuple entry) is not needed here.
    X_train, y_train, X_val, y_val, X_test, y_test, _ = seq_data[ticker]
    randomized_results[ticker] = randomization_test_transformer(
        X_train, y_train, X_val, y_val, X_test, y_test
    )

# Report in a second pass, after the training loop above has finished.
for ticker in TICKERS:
    print(f"=== randomization test: {ticker} ===")
    # One-row table: metric names become columns after the transpose.
    dfm = pd.DataFrame({"Randomized_Transformer": randomized_results[ticker]}, index=randomized_results[ticker].keys()).T
    display(dfm)


This cell plots, for a chosen ticker, the actual versus predicted returns, their cumulative sums, and the rolling directional accuracy of the Transformer on the test set.

In [None]:
# Ticker whose stored Transformer test predictions are plotted below.
ticker_to_plot = "AAPL"

y_test = results[ticker_to_plot]["y_test"]
y_pred = results[ticker_to_plot]["y_pred_trans"]

# Figure 1: actual and predicted values per test sample.
plt.figure(figsize=(12,6))
plt.plot(y_test, label="actual", marker="o")
plt.plot(y_pred, label="transformer predicted", marker="x")
plt.title(f"{ticker_to_plot} next-day returns: actual vs predicted")
plt.xlabel("test sample")
plt.ylabel("return")
plt.grid(True)
plt.legend()
plt.show()

# Running sums of the per-sample returns (additive, not compounded).
actual_cum = np.cumsum(y_test)
pred_cum = np.cumsum(y_pred)

# Figure 2: cumulative sums of actual and predicted returns.
plt.figure(figsize=(12,6))
plt.plot(actual_cum, label="actual cumulative return", linewidth=2)
plt.plot(pred_cum, label="predicted cumulative return", linewidth=2)
plt.title(f"{ticker_to_plot} cumulative actual vs predicted returns")
plt.xlabel("test sample")
plt.ylabel("cumulative return")
plt.grid(True)
plt.legend()
plt.show()

# 1 where actual and predicted agree on being > 0, averaged over a
# 10-sample rolling window (the first 9 entries are NaN).
dir_true = (y_test > 0).astype(int)
dir_pred = (y_pred > 0).astype(int)
rolling_acc = pd.Series((dir_true == dir_pred).astype(int)).rolling(10).mean()

# Figure 3: rolling directional accuracy against a 50% reference line.
plt.figure(figsize=(12,6))
plt.plot(rolling_acc, label="rolling 10-day direction accuracy", linewidth=2)
plt.axhline(0.5, color="gray", linestyle="--", label="50% baseline")
plt.title(f"{ticker_to_plot} rolling directional accuracy")
plt.xlabel("test sample")
plt.ylabel("accuracy")
plt.grid(True)
plt.legend()
plt.show()


This cell computes Kernel SHAP feature importances for a chosen ticker's Transformer model and plots the mean absolute SHAP value per feature.

In [None]:
def kernel_shap_importance_for_ticker(ticker, sample_size=40, background_size=40):
    """Plot per-feature Kernel SHAP importance for a ticker's Transformer.

    Windows are flattened to one row each for the explainer and reshaped back
    to ``(SEQ_LEN, features)`` inside the prediction wrapper. Importance is
    the mean absolute SHAP value over the explained samples and time steps.

    Args:
        ticker: Key into ``results`` and ``seq_data``.
        sample_size: Maximum number of test windows to explain.
        background_size: Maximum number of training windows used as the
            explainer's background set.
    """
    stored = results[ticker]
    X_test = stored["X_test"]
    model = stored["transformer"]
    X_train = seq_data[ticker][0]
    n_feats = X_train.shape[2]

    # One flat row per window, as expected by the explainer.
    train_flat = X_train.reshape(len(X_train), -1)
    test_flat = X_test.reshape(len(X_test), -1)

    n_background = min(background_size, len(train_flat))
    background_rows = np.random.choice(len(train_flat), n_background, replace=False)
    background = train_flat[background_rows]

    def pred_wrapper(x):
        # Restore the (samples, steps, features) layout the model expects.
        windows = x.reshape(-1, SEQ_LEN, n_feats)
        return model.predict(windows, verbose=0).flatten()

    explainer = shap.KernelExplainer(pred_wrapper, background)

    n_explained = min(sample_size, len(test_flat))
    explained_rows = np.random.choice(len(test_flat), n_explained, replace=False)
    X_sample = test_flat[explained_rows]

    raw_values = explainer.shap_values(X_sample)
    per_step = np.array(raw_values).reshape(len(X_sample), SEQ_LEN, n_feats)
    # Collapse samples and time steps, leaving one value per feature.
    importance = np.abs(per_step).mean(axis=(0, 1))

    plt.figure(figsize=(10,6))
    plt.barh(FEATURE_COLS, importance)
    plt.title(f"{ticker} kernel shap feature importance")
    plt.xlabel("mean |shap|")
    plt.grid(True)
    plt.show()

# Ticker whose Transformer is explained; must be a key of `results`.
ticker_to_explain = "AAPL"
kernel_shap_importance_for_ticker(ticker_to_explain)
