# Linear Models for Stock Prediction

Following the quantitative approach:
- Phase 1: Single best feature model (z-scored)
- Phase 2: Multi-feature linear regression
- Phase 3: Regularized models (Ridge, Lasso)

Compare all models on out-of-sample performance.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from scipy.stats import pearsonr
import joblib

sns.set_style('darkgrid')
plt.rcParams['figure.figsize'] = (12, 6)

In [None]:
# config
API_KEY = "vFDjkUVRfPnedLrbRjm75BZ9CJHz3dfv"
TICKER = "AAPL"
START_DATE = "2025-10-01"
END_DATE = "2025-11-01"

In [None]:
def pull_polygon_data(ticker, start, end, api_key):
    url = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/minute/{start}/{end}?apiKey={api_key}"
    response = requests.get(url)
    data = response.json()
    
    if 'results' not in data or len(data['results']) < 2:
        raise ValueError("not enough data")
    
    df = pd.DataFrame(data['results'])
    df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
    df = df.rename(columns={'o':'open','h':'high','l':'low','c':'close','v':'volume'})
    df = df[['timestamp','open','high','low','close','volume']]
    return df

In [None]:
def calculate_features(df):
    df = df.copy()
    
    df['momentum_1min'] = df['close'].pct_change()
    df['volatility_1min'] = df['momentum_1min'] ** 2
    df['price_direction'] = (df['close'] > df['open']).astype(int)
    df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()
    df['vwap_dev'] = (df['close'] - df['vwap']) / df['vwap']
    df['hour'] = df['timestamp'].dt.hour
    df['minute'] = df['timestamp'].dt.minute
    
    # target: next minute return (continuous, not binary)
    df['next_return'] = df['close'].shift(-1) / df['close'] - 1
    
    df = df.dropna()
    return df

In [None]:
# load data
print(f"loading data for {TICKER}...")
df = pull_polygon_data(TICKER, START_DATE, END_DATE, API_KEY)
df = calculate_features(df)
print(f"loaded {len(df)} bars")

features = ['momentum_1min', 'volatility_1min', 'price_direction', 'vwap_dev', 'hour', 'minute']
X = df[features]
y = df['next_return']

# chronological split
split_idx = int(len(X) * 0.8)
X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

print(f"\ntrain size: {len(X_train)}")
print(f"test size: {len(X_test)}")

## Phase 1: Single Best Feature Model

Use only the feature with highest IC, z-scored

In [None]:
from scipy.stats import pearsonr

# find best feature by IC on training data
ic_scores = {}
for feat in features:
    ic, _ = pearsonr(X_train[feat], y_train)
    ic_scores[feat] = abs(ic)

best_feature = max(ic_scores, key=ic_scores.get)
print(f"best feature: {best_feature} (IC={ic_scores[best_feature]:.4f})")

# z-score the feature
scaler = StandardScaler()
X_train_single = scaler.fit_transform(X_train[[best_feature]])
X_test_single = scaler.transform(X_test[[best_feature]])

# fit simple linear regression
model_single = LinearRegression()
model_single.fit(X_train_single, y_train)

# predictions
y_pred_train_single = model_single.predict(X_train_single)
y_pred_test_single = model_single.predict(X_test_single)

# metrics
train_corr_single, _ = pearsonr(y_train, y_pred_train_single)
test_corr_single, _ = pearsonr(y_test, y_pred_test_single)
train_mse_single = mean_squared_error(y_train, y_pred_train_single)
test_mse_single = mean_squared_error(y_test, y_pred_test_single)

print(f"\nPhase 1 Results (Single Feature: {best_feature}):")
print(f"train correlation: {train_corr_single:.4f}")
print(f"test correlation: {test_corr_single:.4f}")
print(f"train MSE: {train_mse_single:.8f}")
print(f"test MSE: {test_mse_single:.8f}")

## Phase 2: Multi-Feature Linear Regression

In [None]:
# standardize all features
scaler_multi = StandardScaler()
X_train_scaled = scaler_multi.fit_transform(X_train)
X_test_scaled = scaler_multi.transform(X_test)

# fit linear regression
model_linear = LinearRegression()
model_linear.fit(X_train_scaled, y_train)

# predictions
y_pred_train_linear = model_linear.predict(X_train_scaled)
y_pred_test_linear = model_linear.predict(X_test_scaled)

# metrics
train_corr_linear, _ = pearsonr(y_train, y_pred_train_linear)
test_corr_linear, _ = pearsonr(y_test, y_pred_test_linear)
train_mse_linear = mean_squared_error(y_train, y_pred_train_linear)
test_mse_linear = mean_squared_error(y_test, y_pred_test_linear)

print(f"\nPhase 2 Results (Multi-Feature Linear):")
print(f"train correlation: {train_corr_linear:.4f}")
print(f"test correlation: {test_corr_linear:.4f}")
print(f"train MSE: {train_mse_linear:.8f}")
print(f"test MSE: {test_mse_linear:.8f}")

# feature coefficients
print("\nFeature Coefficients:")
for feat, coef in zip(features, model_linear.coef_):
    print(f"  {feat}: {coef:.6f}")

## Phase 3: Ridge Regression (L2 Regularization)

In [None]:
# try different alpha values
alphas = [0.001, 0.01, 0.1, 1.0, 10.0]
ridge_results = []

for alpha in alphas:
    model_ridge = Ridge(alpha=alpha)
    model_ridge.fit(X_train_scaled, y_train)
    
    y_pred_train_ridge = model_ridge.predict(X_train_scaled)
    y_pred_test_ridge = model_ridge.predict(X_test_scaled)
    
    train_corr, _ = pearsonr(y_train, y_pred_train_ridge)
    test_corr, _ = pearsonr(y_test, y_pred_test_ridge)
    test_mse = mean_squared_error(y_test, y_pred_test_ridge)
    
    ridge_results.append({
        'alpha': alpha,
        'train_corr': train_corr,
        'test_corr': test_corr,
        'test_mse': test_mse
    })

ridge_df = pd.DataFrame(ridge_results)
print("\nRidge Regression Results:")
print(ridge_df.to_string(index=False))

# best ridge model
best_ridge_alpha = ridge_df.loc[ridge_df['test_corr'].idxmax(), 'alpha']
print(f"\nbest ridge alpha: {best_ridge_alpha}")

model_ridge_best = Ridge(alpha=best_ridge_alpha)
model_ridge_best.fit(X_train_scaled, y_train)
y_pred_test_ridge_best = model_ridge_best.predict(X_test_scaled)

## Phase 4: Lasso Regression (L1 Regularization)

In [None]:
# try different alpha values
lasso_results = []

for alpha in alphas:
    model_lasso = Lasso(alpha=alpha, max_iter=10000)
    model_lasso.fit(X_train_scaled, y_train)
    
    y_pred_train_lasso = model_lasso.predict(X_train_scaled)
    y_pred_test_lasso = model_lasso.predict(X_test_scaled)
    
    train_corr, _ = pearsonr(y_train, y_pred_train_lasso)
    test_corr, _ = pearsonr(y_test, y_pred_test_lasso)
    test_mse = mean_squared_error(y_test, y_pred_test_lasso)
    
    # count non-zero coefficients
    non_zero = np.sum(model_lasso.coef_ != 0)
    
    lasso_results.append({
        'alpha': alpha,
        'train_corr': train_corr,
        'test_corr': test_corr,
        'test_mse': test_mse,
        'non_zero_features': non_zero
    })

lasso_df = pd.DataFrame(lasso_results)
print("\nLasso Regression Results:")
print(lasso_df.to_string(index=False))

# best lasso model
best_lasso_alpha = lasso_df.loc[lasso_df['test_corr'].idxmax(), 'alpha']
print(f"\nbest lasso alpha: {best_lasso_alpha}")

model_lasso_best = Lasso(alpha=best_lasso_alpha, max_iter=10000)
model_lasso_best.fit(X_train_scaled, y_train)
y_pred_test_lasso_best = model_lasso_best.predict(X_test_scaled)

print("\nLasso Feature Selection:")
for feat, coef in zip(features, model_lasso_best.coef_):
    print(f"  {feat}: {coef:.6f}")

## Model Comparison

In [None]:
# compare all models
test_corr_ridge_best, _ = pearsonr(y_test, y_pred_test_ridge_best)
test_corr_lasso_best, _ = pearsonr(y_test, y_pred_test_lasso_best)

comparison = pd.DataFrame([
    {
        'model': 'Single Feature',
        'train_corr': train_corr_single,
        'test_corr': test_corr_single,
        'test_mse': test_mse_single,
        'overfitting_gap': train_corr_single - test_corr_single
    },
    {
        'model': 'Linear (All Features)',
        'train_corr': train_corr_linear,
        'test_corr': test_corr_linear,
        'test_mse': test_mse_linear,
        'overfitting_gap': train_corr_linear - test_corr_linear
    },
    {
        'model': f'Ridge (α={best_ridge_alpha})',
        'train_corr': ridge_df.loc[ridge_df['alpha'] == best_ridge_alpha, 'train_corr'].values[0],
        'test_corr': test_corr_ridge_best,
        'test_mse': ridge_df.loc[ridge_df['alpha'] == best_ridge_alpha, 'test_mse'].values[0],
        'overfitting_gap': ridge_df.loc[ridge_df['alpha'] == best_ridge_alpha, 'train_corr'].values[0] - test_corr_ridge_best
    },
    {
        'model': f'Lasso (α={best_lasso_alpha})',
        'train_corr': lasso_df.loc[lasso_df['alpha'] == best_lasso_alpha, 'train_corr'].values[0],
        'test_corr': test_corr_lasso_best,
        'test_mse': lasso_df.loc[lasso_df['alpha'] == best_lasso_alpha, 'test_mse'].values[0],
        'overfitting_gap': lasso_df.loc[lasso_df['alpha'] == best_lasso_alpha, 'train_corr'].values[0] - test_corr_lasso_best
    }
])

print("\n=== MODEL COMPARISON ===")
print(comparison.to_string(index=False))

## Prediction vs Actual Plots

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

models_to_plot = [
    ('Single Feature', y_pred_test_single, test_corr_single),
    ('Linear (All)', y_pred_test_linear, test_corr_linear),
    ('Ridge', y_pred_test_ridge_best, test_corr_ridge_best),
    ('Lasso', y_pred_test_lasso_best, test_corr_lasso_best)
]

for idx, (name, y_pred, corr) in enumerate(models_to_plot):
    row, col = idx // 2, idx % 2
    axes[row, col].scatter(y_test, y_pred, alpha=0.3, s=10)
    axes[row, col].set_xlabel('actual return')
    axes[row, col].set_ylabel('predicted return')
    axes[row, col].set_title(f'{name} (corr={corr:.4f})')
    axes[row, col].axhline(y=0, color='red', linestyle='--', alpha=0.3)
    axes[row, col].axvline(x=0, color='red', linestyle='--', alpha=0.3)
    
    # add diagonal line (perfect prediction)
    lims = [min(y_test.min(), y_pred.min()), max(y_test.max(), y_pred.max())]
    axes[row, col].plot(lims, lims, 'k--', alpha=0.5, label='perfect prediction')
    axes[row, col].legend()

plt.tight_layout()
plt.show()

## Save Best Model

In [None]:
# determine best model by test correlation
best_idx = comparison['test_corr'].idxmax()
best_model_name = comparison.loc[best_idx, 'model']

print(f"\nbest model: {best_model_name}")
print(f"test correlation: {comparison.loc[best_idx, 'test_corr']:.4f}")

# save the best linear model
if 'Ridge' in best_model_name:
    joblib.dump(model_ridge_best, 'trained_linear_model.pkl')
    joblib.dump(scaler_multi, 'feature_scaler.pkl')
elif 'Lasso' in best_model_name:
    joblib.dump(model_lasso_best, 'trained_linear_model.pkl')
    joblib.dump(scaler_multi, 'feature_scaler.pkl')
elif 'Linear' in best_model_name:
    joblib.dump(model_linear, 'trained_linear_model.pkl')
    joblib.dump(scaler_multi, 'feature_scaler.pkl')
else:
    joblib.dump(model_single, 'trained_linear_model.pkl')
    joblib.dump(scaler, 'feature_scaler.pkl')

print(f"\nsaved best model to trained_linear_model.pkl")