# Linear Model Training - Finance Sector
**Purpose**: Train and tune a Ridge (L2-regularized linear) regression model for the Finance sector

**Expected Performance**: Sharpe ~1.5 (Finance is mean-reverting, ideal for Linear models)

**Date**: 2026-02-22

## Setup

In [None]:
import sys
from pathlib import Path
import sqlite3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, r2_score
import warnings
warnings.filterwarnings('ignore')

# Set up paths
# NOTE(review): assumes the notebook is started from a directory one level
# below the project root (e.g. notebooks/) — confirm.
project_root = Path.cwd().parent

# Make project modules importable. Guarded so that re-running this cell does
# not keep prepending duplicate entries to sys.path.
project_root_str = str(project_root)
if project_root_str not in sys.path:
    sys.path.insert(0, project_root_str)

# Set random seed for reproducibility
np.random.seed(42)

print(f"Project root: {project_root}")
print("Python path updated")

## 1. Load Data

Load price data for the Finance sector tickers: the training period (2019-01-01 to 2024-06-30) and the hold-out test period (2024 H2)

In [None]:
# Ticker universe for the Finance sector model
finance_tickers = [
    'JPM', 'BAC', 'GS', 'MS', 'WFC',
    'C', 'BLK', 'AXP', 'USB', 'PNC',
]

# In-sample window (used for fitting and cross-validation)
train_start, train_end = '2019-01-01', '2024-06-30'

# Out-of-sample window (held back for the final evaluation)
test_start, test_end = '2024-07-01', '2024-12-31'

print(f"Finance sector: {len(finance_tickers)} tickers")
print(f"Training: {train_start} to {train_end}")
print(f"Hold-out: {test_start} to {test_end}")

In [None]:
# Load data from database
db_path = project_root / 'data' / 'financial_data.db'
if not db_path.is_file():
    # sqlite3.connect() would otherwise create an empty database file here and
    # the query would fail later with a confusing "no such table" error.
    raise FileNotFoundError(f"Price database not found: {db_path}")

# Build query: one '?' placeholder per ticker, plus two for the date range
placeholders = ','.join(['?'] * len(finance_tickers))

query = f"""
SELECT
    d.date,
    c.ticker,
    s.open_price as open,
    s.high_price as high,
    s.low_price as low,
    s.close_price as close,
    s.adjusted_close,
    s.volume
FROM fact_stock_price s
JOIN dim_date d ON s.date_id = d.date_id
JOIN dim_company c ON s.company_id = c.company_id
WHERE
    c.ticker IN ({placeholders})
    AND d.date BETWEEN ? AND ?
ORDER BY d.date, c.ticker
"""


def _load_prices(connection, start, end):
    """Return price rows for `finance_tickers` with start <= date <= end.

    Rows come back ordered by date, then ticker (see ORDER BY in `query`);
    the `date` column is converted to pandas datetimes.
    """
    frame = pd.read_sql_query(
        query,
        connection,
        params=tuple(finance_tickers) + (start, end),
    )
    frame['date'] = pd.to_datetime(frame['date'])
    return frame


conn = sqlite3.connect(db_path)
try:
    # Load training data
    train_df = _load_prices(conn, train_start, train_end)
    # Load test data
    test_df = _load_prices(conn, test_start, test_end)
finally:
    # Always release the connection, even if one of the queries raises.
    conn.close()

print(f"Training data: {len(train_df)} rows")
print(f"Test data: {len(test_df)} rows")
print(f"\nTraining tickers: {train_df['ticker'].nunique()}")
print(f"Test tickers: {test_df['ticker'].nunique()}")

## 2. Feature Engineering

Create simple technical indicators to use as inputs for the linear model

In [None]:
def create_features(
    df,
    *,
    return_lags=(1, 5, 10, 20),
    ma_windows=(5, 10, 20, 50),
    volatility_windows=(5, 10, 20),
    momentum_windows=(5, 10, 20),
    rsi_period=14,
    volume_window=20,
):
    """
    Create per-ticker technical features for the Linear model.

    Every time-series calculation is grouped by ``ticker``. The function does
    not sort: rows must already be in chronological order within each ticker.
    The input frame is not modified; a copy with extra columns is returned.

    Columns added (names shown for the default arguments):
    - returns: 1-row percentage change of adjusted_close
    - returns_lag_{1,5,10,20}: `returns` shifted back by that many rows
    - ma_{5,10,20,50}: rolling mean of adjusted_close
    - price_vs_ma_{5,10,20,50}: adjusted_close / ma - 1
    - volatility_{5,10,20}: rolling std of `returns`
    - rsi_14: RSI built from simple rolling means of gains and losses
    - volume_ma_20: rolling mean of volume
    - volume_ratio: volume / volume_ma_20
    - momentum_{5,10,20}: percentage change of adjusted_close over that many rows
    - hl_spread: (high - low) / close

    Args:
        df: DataFrame with columns ticker, adjusted_close, volume, high, low
            and close.
        return_lags: Row offsets for the lagged-return columns.
        ma_windows: Window lengths for the moving-average columns.
        volatility_windows: Window lengths for the rolling-std columns.
        momentum_windows: Periods for the momentum columns.
        rsi_period: Window length of the RSI; also used in the column name.
        volume_window: Window length of the volume moving average; also used
            in the column name.

    Returns:
        A copy of ``df`` with the feature columns appended. Rows where a
        rolling window is not yet full hold NaN in the affected columns.
    """
    features_df = df.copy()

    def by_ticker(column):
        # Re-group on each call so that columns added below are visible.
        return features_df.groupby('ticker')[column]

    def calculate_rsi(prices, period):
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        # A window without losses gives an infinite ratio and an RSI of 100;
        # a window with neither gains nor losses gives NaN.
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    # Calculate returns
    features_df['returns'] = by_ticker('adjusted_close').pct_change()

    # Lagged returns (features)
    for lag in return_lags:
        features_df[f'returns_lag_{lag}'] = by_ticker('returns').shift(lag)

    # Moving averages. The window is bound as a default argument so the
    # lambda does not depend on the loop variable after the iteration ends.
    for window in ma_windows:
        features_df[f'ma_{window}'] = by_ticker('adjusted_close').transform(
            lambda x, w=window: x.rolling(window=w).mean()
        )
        # MA crossover (price vs MA)
        features_df[f'price_vs_ma_{window}'] = (
            features_df['adjusted_close'] / features_df[f'ma_{window}'] - 1
        )

    # Volatility (rolling std of returns)
    for window in volatility_windows:
        features_df[f'volatility_{window}'] = by_ticker('returns').transform(
            lambda x, w=window: x.rolling(window=w).std()
        )

    # RSI (Relative Strength Index)
    features_df[f'rsi_{rsi_period}'] = by_ticker('adjusted_close').transform(
        lambda x: calculate_rsi(x, rsi_period)
    )

    # Volume indicators
    volume_ma_col = f'volume_ma_{volume_window}'
    features_df[volume_ma_col] = by_ticker('volume').transform(
        lambda x: x.rolling(window=volume_window).mean()
    )
    features_df['volume_ratio'] = features_df['volume'] / features_df[volume_ma_col]

    # Price momentum
    for window in momentum_windows:
        features_df[f'momentum_{window}'] = by_ticker('adjusted_close').transform(
            lambda x, w=window: x.pct_change(periods=w)
        )

    # High-Low spread
    features_df['hl_spread'] = (features_df['high'] - features_df['low']) / features_df['close']

    return features_df

# Apply feature engineering
print("Creating features for training data...")
train_features = create_features(train_df)

print("Creating features for test data...")
# Build the hold-out features on top of the training history. Computing them
# on the hold-out rows alone restarts every rolling window at test_start, so
# the longest look-back (the 50-row moving average) would blank out the first
# 49 rows of each ticker, which are then discarded by the later dropna().
# All features look backwards only, so no hold-out information reaches the
# training frame, which is still built from train_df alone.
history_df = pd.concat([train_df, test_df], ignore_index=True)
history_features = create_features(history_df)
test_features = history_features[
    history_features['date'] >= pd.Timestamp(test_start)
].reset_index(drop=True)

print(f"\nTotal features created: {len(train_features.columns)}")
print(f"\nSample features:")
print(train_features.head())

## 3. Prepare Data for Training

Select the feature columns and build the target (each ticker's next-day return)

In [None]:
# Select feature columns (exclude date, ticker, price columns and the label).
# 'target' is listed because it is written into train_features below: without
# it, re-running this cell would pick the label up as an input feature.
exclude_cols = ['date', 'ticker', 'open', 'high', 'low', 'close', 'adjusted_close', 'volume', 'returns', 'target']

feature_cols = [col for col in train_features.columns if col not in exclude_cols]

print(f"Feature columns ({len(feature_cols)}):")
for col in feature_cols:
    print(f"  - {col}")

# Target: next day returns (shift(-1) within each ticker)
train_features['target'] = train_features.groupby('ticker')['returns'].shift(-1)
test_features['target'] = test_features.groupby('ticker')['returns'].shift(-1)

# Drop rows with NaN (from rolling windows and target shift). Ratios with a
# zero denominator produce +/-inf, which dropna() keeps; map them to NaN first
# so they are removed as well.
train_clean = train_features.replace([np.inf, -np.inf], np.nan).dropna()
test_clean = test_features.replace([np.inf, -np.inf], np.nan).dropna()

if train_clean.empty:
    raise ValueError("No usable training rows remain after dropping NaN/inf values")

print(f"\nClean training data: {len(train_clean)} rows")
print(f"Clean test data: {len(test_clean)} rows")

# Extract X and y
X_train = train_clean[feature_cols]
y_train = train_clean['target']

X_test = test_clean[feature_cols]
y_test = test_clean['target']

print(f"\nX_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")

## 4. Hyperparameter Tuning with Time Series Cross-Validation

Find best alpha (L2 regularization strength) using walk-forward validation

In [None]:
# Alpha values to test
alpha_range = [0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]

# Time series cross-validation (5 splits)
tscv = TimeSeriesSplit(n_splits=5)

# The fold boundaries and the per-fold scaling do not depend on alpha, so
# build them once instead of once per alpha. The scaler is fitted on the
# training part of each fold only and then applied to the validation part.
# Fold-local names (fold_scaler / fold_model) keep this cell from rebinding
# the notebook-level `scaler` used for the final model.
cv_folds = []
for train_idx, val_idx in tscv.split(X_train):
    fold_scaler = StandardScaler()
    cv_folds.append((
        fold_scaler.fit_transform(X_train.iloc[train_idx]),
        y_train.iloc[train_idx],
        fold_scaler.transform(X_train.iloc[val_idx]),
        y_train.iloc[val_idx],
    ))

# Store results
cv_results = []

print("Running hyperparameter tuning...\n")

for alpha in alpha_range:
    fold_scores = []

    for X_fold_train_scaled, y_fold_train, X_fold_val_scaled, y_fold_val in cv_folds:
        # Train model
        fold_model = Ridge(alpha=alpha, random_state=42)
        fold_model.fit(X_fold_train_scaled, y_fold_train)

        # Validate
        val_pred = fold_model.predict(X_fold_val_scaled)
        fold_scores.append(r2_score(y_fold_val, val_pred))

    mean_r2 = np.mean(fold_scores)
    std_r2 = np.std(fold_scores)

    cv_results.append({
        'alpha': alpha,
        'mean_r2': mean_r2,
        'std_r2': std_r2
    })

    print(f"Alpha={alpha:7.3f}: R2={mean_r2:.4f} (+/- {std_r2:.4f})")

# Convert to DataFrame
cv_df = pd.DataFrame(cv_results)

# Find best alpha (highest mean validation R2)
best_row = cv_df.loc[cv_df['mean_r2'].idxmax()]
best_alpha = best_row['alpha']
best_r2 = best_row['mean_r2']

print(f"\n{'='*60}")
print(f"Best alpha: {best_alpha}")
print(f"Best CV R2: {best_r2:.4f}")
print(f"{'='*60}")

In [None]:
# Plot mean CV R² per alpha (error bars: std_r2) on a logarithmic alpha axis
# and mark the selected alpha with a dashed vertical line.
fig, ax = plt.subplots(figsize=(10, 6))
ax.errorbar(
    cv_df['alpha'],
    cv_df['mean_r2'],
    yerr=cv_df['std_r2'],
    marker='o',
    capsize=5,
    capthick=2,
)
ax.set_xscale('log')
ax.set_xlabel('Alpha (L2 Regularization)', fontsize=12)
ax.set_ylabel('Mean R² Score', fontsize=12)
ax.set_title('Cross-Validation Results: Alpha Tuning', fontsize=14)
ax.grid(True, alpha=0.3)
ax.axvline(x=best_alpha, color='red', linestyle='--', label=f'Best alpha={best_alpha}')
ax.legend()
fig.tight_layout()
plt.show()

## 5. Train Final Model

Train with best alpha on full training set

In [None]:
# Standardize: learn mean/std from the training features only, then apply the
# same transform to both the training and the hold-out features.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Fit the final Ridge model on the whole training set with the tuned alpha
print(f"Training final model with alpha={best_alpha}...")
final_model = Ridge(alpha=best_alpha, random_state=42).fit(X_train_scaled, y_train)

# In-sample fit quality
y_train_pred = final_model.predict(X_train_scaled)
train_mse = mean_squared_error(y_train, y_train_pred)
train_r2 = r2_score(y_train, y_train_pred)

print("\nTraining Performance:")
print(f"  R² Score: {train_r2:.4f}")
print(f"  MSE: {train_mse:.6f}")
print(f"  RMSE: {np.sqrt(train_mse):.6f}")

## 6. Feature Importance

Analyze which features are most important

In [None]:
# Get feature coefficients, ranked by absolute size
feature_importance = pd.DataFrame({
    'feature': feature_cols,
    'coefficient': final_model.coef_
})
feature_importance['abs_coefficient'] = np.abs(feature_importance['coefficient'])
feature_importance = feature_importance.sort_values('abs_coefficient', ascending=False)

# Show at most 15 features. Clamping to the number available keeps the bar
# chart below from failing with a length mismatch when there are fewer.
top_n = min(15, len(feature_importance))
top_features = feature_importance.head(top_n)

print(f"Top {top_n} Most Important Features:")
print(top_features)

# Visualize top features (signed coefficients, largest magnitude at y=0)
plt.figure(figsize=(10, 8))
plt.barh(range(top_n), top_features['coefficient'].values)
plt.yticks(range(top_n), top_features['feature'].values)
plt.xlabel('Coefficient Value', fontsize=12)
plt.title(f'Top {top_n} Feature Coefficients', fontsize=14)
plt.axvline(x=0, color='black', linestyle='-', linewidth=0.5)
plt.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

## 7. Evaluate on Hold-Out Test Set

Test on completely unseen data (2024 H2)

In [None]:
# Score the final model on the hold-out features
y_test_pred = final_model.predict(X_test_scaled)

test_mse = mean_squared_error(y_test, y_test_pred)
test_r2 = r2_score(y_test, y_test_pred)

print("Hold-Out Test Performance:")
print(f"  R² Score: {test_r2:.4f}")
print(f"  MSE: {test_mse:.6f}")
print(f"  RMSE: {np.sqrt(test_mse):.6f}")

# Safety check: share of the training R² that survives out of sample.
# Reported as 0 when the training R² is not positive.
if train_r2 > 0:
    retention_rate = test_r2 / train_r2
else:
    retention_rate = 0
retention_verdict = '✓ PASS' if retention_rate >= 0.80 else '✗ FAIL'

print(f"\nRetention Rate: {retention_rate:.2%}")
print(f"Safety Check: {retention_verdict} (≥80% required)")

In [None]:
# Predicted vs actual returns, side by side: training set (left) and
# hold-out set (right). The dashed diagonal marks a perfect prediction.
fig, (train_ax, test_ax) = plt.subplots(1, 2, figsize=(12, 5))

scatter_panels = (
    (train_ax, y_train, y_train_pred, f'Training Set (R²={train_r2:.3f})', {}),
    (test_ax, y_test, y_test_pred, f'Test Set (R²={test_r2:.3f})', {'color': 'orange'}),
)

for panel_ax, actual, predicted, panel_title, point_style in scatter_panels:
    panel_ax.scatter(actual, predicted, alpha=0.3, s=10, **point_style)
    diagonal = [actual.min(), actual.max()]
    panel_ax.plot(diagonal, diagonal, 'r--', lw=2, label='Perfect prediction')
    panel_ax.set_xlabel('Actual Returns', fontsize=11)
    panel_ax.set_ylabel('Predicted Returns', fontsize=11)
    panel_ax.set_title(panel_title, fontsize=12)
    panel_ax.legend()
    panel_ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

## 8. Trading Strategy Simulation

Simulate a long-only strategy on the hold-out set: hold a ticker on the days for which the model predicts a positive next-day return

In [None]:
# Add predictions to test data (one row per ticker per day)
test_results = test_clean.copy()
test_results['predicted_return'] = y_test_pred
test_results['actual_return'] = y_test.values

# Simple strategy: Go long if predicted return > threshold
threshold = 0.0  # Can optimize this
test_results['signal'] = (test_results['predicted_return'] > threshold).astype(int)

# Per-position strategy return: the ticker's return when long, 0 when flat
test_results['strategy_return'] = test_results['signal'] * test_results['actual_return']

# Cumulative returns.
# test_results holds several rows per date (one per ticker). Compounding over
# the rows would chain the tickers of a single day as if they were sequential
# trades with the whole capital. Instead, form an equal-weight portfolio per
# date (flat positions contribute a 0 return) and compound over dates.
daily_returns = test_results.groupby('date')[['actual_return', 'strategy_return']].mean()
daily_cumulative = (1 + daily_returns).cumprod() - 1

# Broadcast the portfolio curve back onto the rows so the existing column
# names stay available; all rows of a date share the same value.
test_results['cumulative_return'] = test_results['date'].map(daily_cumulative['actual_return'])
test_results['cumulative_strategy'] = test_results['date'].map(daily_cumulative['strategy_return'])

print(f"Strategy Performance (Test Set):")
print(f"  Total trades: {test_results['signal'].sum()}")
print(f"  Win rate: {(test_results[test_results['signal']==1]['actual_return'] > 0).mean():.2%}")
print(f"  Buy-and-hold return: {daily_cumulative['actual_return'].iloc[-1]:.2%}")
print(f"  Strategy return: {daily_cumulative['strategy_return'].iloc[-1]:.2%}")

In [None]:
# Calculate Sharpe ratio
# test_results has one row per ticker per day, while the sqrt(252) factor
# annualizes a *daily* series. Collapse to one equal-weight portfolio return
# per date before taking mean / std.
strategy_returns = test_results.groupby('date')['strategy_return'].mean()
daily_std = strategy_returns.std()

if daily_std > 0:
    sharpe_ratio = (strategy_returns.mean() / daily_std) * np.sqrt(252)  # Annualized
else:
    # Undefined when the daily returns have no variation (e.g. no trades)
    # or when there are too few days to estimate a standard deviation.
    sharpe_ratio = np.nan

print(f"\nSharpe Ratio (Annualized): {sharpe_ratio:.3f}")
print(f"Target Sharpe for Finance: ~1.5")
print(f"Performance: {'✓ Good' if sharpe_ratio >= 1.0 else '✗ Needs improvement'}")

In [None]:
# Plot cumulative returns
# test_results holds several rows per date (one per ticker), so it cannot be
# drawn directly as a single line over time. Build one equal-weight portfolio
# return per date from the per-row returns and compound that over dates.
daily_curve = test_results.groupby('date')[['actual_return', 'strategy_return']].mean()
daily_curve = (1 + daily_curve).cumprod() - 1

plt.figure(figsize=(14, 6))

plt.plot(daily_curve.index, daily_curve['actual_return'],
         label='Buy and Hold', alpha=0.7)
plt.plot(daily_curve.index, daily_curve['strategy_return'],
         label='Strategy (Model-based)', alpha=0.7)

plt.xlabel('Date', fontsize=12)
plt.ylabel('Cumulative Return', fontsize=12)
plt.title('Strategy Performance vs Buy-and-Hold (Test Period)', fontsize=14)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.axhline(y=0, color='black', linestyle='-', linewidth=0.5)
plt.tight_layout()
plt.show()

## 9. Save Model

Save trained model and scaler for production use

In [None]:
import pickle

# Target location: <project_root>/models/linear_finance_model.pkl
models_dir = project_root / 'models'
models_dir.mkdir(exist_ok=True)
model_path = models_dir / 'linear_finance_model.pkl'

# Everything needed to reuse the model: the estimator, the scaler, the
# feature column order, and the headline metrics.
model_bundle = {
    'model': final_model,
    'scaler': scaler,
    'feature_cols': feature_cols,
    'best_alpha': best_alpha,
    'train_r2': train_r2,
    'test_r2': test_r2,
    'sharpe_ratio': sharpe_ratio,
}

with model_path.open('wb') as f:
    pickle.dump(model_bundle, f)

print(f"✓ Model saved to: {model_path}")
print("\nModel info:")
print(f"  Features: {len(feature_cols)}")
print(f"  Alpha: {best_alpha}")
print(f"  Train R²: {train_r2:.4f}")
print(f"  Test R²: {test_r2:.4f}")
print(f"  Sharpe: {sharpe_ratio:.3f}")

## 10. Summary & Next Steps

In [None]:
# Assemble the run summary as a list of lines and print it in one go.
banner = "=" * 80
retention_mark = '✓' if retention_rate >= 0.80 else '✗'
next_steps = (
    "Try different feature engineering approaches",
    "Optimize trading strategy threshold",
    "Test on other sectors (commodities)",
    "Compare with XGBoost model",
    "Integrate into Sentinel pipeline",
)

summary_lines = [
    banner,
    "LINEAR MODEL TRAINING SUMMARY - FINANCE SECTOR",
    banner,
    "\nData:",
    f"  Training period: {train_start} to {train_end}",
    f"  Test period: {test_start} to {test_end}",
    f"  Tickers: {len(finance_tickers)} ({', '.join(finance_tickers[:5])}, ...)",
    f"  Features: {len(feature_cols)}",
    "\nModel:",
    "  Type: Ridge Regression (L2 regularization)",
    f"  Best alpha: {best_alpha}",
    "\nPerformance:",
    f"  Train R²: {train_r2:.4f}",
    f"  Test R²: {test_r2:.4f}",
    f"  Retention: {retention_rate:.2%} {retention_mark}",
    f"  Sharpe Ratio: {sharpe_ratio:.3f} (Target: ~1.5)",
    "\nNext Steps:",
    *(f"  {number}. {step}" for number, step in enumerate(next_steps, start=1)),
    banner,
]

print("\n".join(summary_lines))