# Momentum-Based Signal Validation



In [156]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
import yfinance as yf

In [157]:
# Statistical libraries
from scipy import stats
from scipy.stats import jarque_bera, shapiro, normaltest
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
np.random.seed(42)

In [158]:
data = yf.download("AMZN AAPL GOOG",start="2020-01-01", end="2023-01-01").head()
missing_values = data.isnull().sum().sum()
print(missing_values)


  data = yf.download("AMZN AAPL GOOG",start="2020-01-01", end="2023-01-01").head()
[*********************100%***********************]  3 of 3 completed

0





*** Data Collection ***

In [159]:

def get_stock_data(tickers, start_date = '2020-01-01', end_date = '2024-07-01'):
    stock_data = {}
    for ticker in tickers:
        data = yf.download(ticker, start= start_date, end = end_date, auto_adjust=False)

        if len(data) > 0:
            stock_data[ticker] = data
        else:
            print("No data found")

    return stock_data

def quality_check(data, ticker):
    # Check quality of data for one stock
    # data = data.xs(ticker, axis=1, level=1)
    total_days = len(data)
    missing_values = data.isnull().sum().sum()

    zero_volume_days = (data['Volume'] == 0).sum()

    daily_returns = data['Close'].pct_change()
    extreme_moves = (abs(daily_returns) > 0.2).sum()

    print(f"{ticker}:")
    print(f"Total observations: {total_days}")
    print(f"Missing values: {missing_values}")
    print(f"Zero volume days: {zero_volume_days}")
    print(f"Extreme moves (>20%): {extreme_moves}")

    return {
        'total_days': total_days,
        'missing_values': missing_values,
        'zero_volume': zero_volume_days,
        'extreme_moves': extreme_moves
    }


def analyze_returns_stats(data, ticker):
    print("Return Statistics for " + ticker)
    returns = data['Close'].pct_change().dropna()

    # basic stats

    mean_return = float(returns.mean())
    volatility = float(returns.std())
    annual_return = mean_return * 252
    annual_vol = volatility * np.sqrt(252)


    print(f"Daily return: {mean_return:.4f} ({annual_return:.2%} annualized)")
    print(f"Daily volatility: {volatility:.4f} ({annual_vol:.2%} annualized)")
    print(f"Skewness: {float(returns.skew()):.3f}")
    print(f"Kurtosis: {float(returns.kurtosis()):.3f}")

    
    #Test for normality
    jb_stat, jb_pvalue = jarque_bera(returns)

    print(f"*** Normality Test ***")
    print(f"Jarque Bera p-value: {jb_pvalue:.6f}")
    if jb_pvalue >= 0.05:
        print("Returns are normal")
    else:
        print('Returns are NOT normal')

    return returns



    




*** Feature Engineering, Creating Momentum Signals ***
- Main Idea: stocks that have been going up, might continue going up

In [160]:
def create_momentum_features(data):
    df = data.copy()
    #one day returns
    df['returns_1d'] = df['Close'].pct_change()

    #momentum periods of one week, 2 weeks, 1 month, 3 months
    momentum_periods = [5, 10, 20, 60]

    for period in momentum_periods:
        df[f'momentum_{period}d'] = df['Close'].pct_change(period)

    # moving averages
    ma_periods = [10, 20, 50]
    for period in ma_periods:
        df[f'sma_{period}d'] = df['Close'].pct_change(period).mean()
        df[f'above_sma_{period}'] =  (df['Close'] > df[f'sma_{period}']).astype(int)
        print(period + "--day moving average signal")

    # volatility

    df['vol_10d'] = df['returns_1d'].rolling(10).std()
    df['vol_20d'] = df['returns_1d'].rolling(20).std()
    print("***Volatility features***")

    # RSI - Relative Strength Index - momentum oscillator
    def calculate_rsi(prices, window = 14):
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window = window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window = window).mean()
        rs = gain/loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    df['rsi'] = calculate_rsi(df['Close'])
    df['rsi_oversold'] = calculate_rsi(df['Close'] < 30).astype(int)
    df['rsi_undersold'] = calculate_rsi(df['close'] > 70).astype(int)

    print("*** RSI momentum indicator ***")

    df['vol_sma_20'] = df['Volume'].rolling(20).mean()
    df['volume_ratio'] = df['Volume'] / df['volume_sma_20']
    df['high_volume'] = (df['volume_ratio'] > 1.5).astype(int)
    print("*** Volume Features ***")

    df['high_20d'] = df['High'].rolling(20).max()
    df['low_20d'] = df['Low'].rolling(20).min()
    df['price_position'] = (df['Close'] - df['low_20d']) / (df['high_20d'] - df['low_20d'])
    print("*** Price Position Feature ***")
    
    print(f"\n There are {len([col for col in df.columns if col not in data.columns])} new features created\n")

    return df


    


In [161]:
def create_target_variable(data, forward_days = 5):
    print("Creating target: {forward_days}-day forward return")
    target = data['Close'].pct_change(forward_days).shift(-forward_days)

    print(f"Target mean: {target.mean():.4f}")
    print(f"Target std: {target.std():.4f}")
    print(f"Valid predictions: {target.count()}\n")

    return target

*** ML - Test if signals work ***

In [162]:
def prep_ml_dataset(feature_data, target, train_ratio = 0.7):
    og_columns = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
    feature_columns = [col for col in feature_data.columns if col not in og_columns]

    X = feature_data[feature_columns].copy()
    y = target.copy()

    ml_data = pd.concat([X, y.rename('target')], axis = 1).dropna()

    print(f"Original data points: {len(feature_data)}")
    print(f"Features: {len(feature_columns)}")
    print(f"First five feature names: {feature_columns[:5]}")

    if len(ml_data) < 100:
        print(f"Not enough data for ML: ({len(ml_data)} < 100)")
        return None, None, None, None, None
    
    X_clean = ml_data[feature_columns]
    y_clean = ml_data['target']

    # Time series split!!!!
    split = int(len(ml_data) * train_ratio)

    X_train = X_clean.iloc[:split]
    X_test = X_clean.iloc[split:]
    y_train = y_clean.iloc[:split]
    y_test = y_clean.iloc[split:]

    print(f"Training set has {len(X_train)} observations")
    print(f"Testing set has {len(X_test)} observations\n")

    return X_train, X_test, y_train, y_test, feature_columns
    

In [163]:
def train_test_models(X_train, X_test, y_train, y_test, feature_names):
    print("Training and testing models...")

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    models = {
        'Linear Regression' : LinearRegression(),
        'Random Forest': RandomForestRegressor(n_estimator = 100, max_depth = 5, random_state = 42)
    }

    results = {}

    for name, model  in models.items():
        if name == 'Linear Regression':
            model.fit(X_train_scaled, y_train)
            train_pred = model.predict(X_train_scaled)
            test_pred = model.predict(X_test_scaled)
        else:
            model.fit(X_train, y_train)
            train_pred = model.predict(X_train)
            test_pred = model.predict(X_test)
    
        train_r2 = r2_score(y_train, train_pred)
        test_r2 = r2_score(y_test, test_pred)

        results[name] = {
            'train_r2': train_r2,
            'test_r2' : test_r2,
            'overfitting': train_r2 - test_r2
        }

        print(f"Train R^2: {train_r2:.4f}")
        print(f"Test R^2: {test_r2:.4f}")
        print(f"Overfitting: {train_r2 - test_r2:.4}")

        #random forest feature importance
        if hasattr(model, 'feature_importances_'):
            importance_df = pd.DataFrame({
                'feature' : feature_names,
                'importance': model.feature_importances_
            })
            importance_df.sort_values('importance', ascending = False)
            print(f"Top 5 important features:")
            for _, row in importance_df.head().iterrows():
                print(f"- {row['feature']}: {row['importance']:.4f}")



In [164]:
def cross_validation(X, y):
    print(f"Cross validation")

    tscv = TimeSeriesSplit(n_splits = 3)

    model = RandomForestRegressor(n_estimators = 50, max_depth = 3, random_state = 42)
    cv_scores = cross_val_score(model, X, y, cv = tscv, scoring = 'r^2')
    print(f"CV results:")
    print(f"- Mean R^2:{cv_scores.mean():.4f}")
    print(f"- Std R^2: {cv_scores.std():.4f}")
    print(f"- Individual Scores:{cv_scores.round(4)}")

    if cv_scores.mean() > 0.02:
        print(f"Model shows predictive power")
    else:
        print("Weak predictive power")

    if cv_scores.std() < 0.05:
        print("Stable performance across time periods")
    else:
        print("Unstable - performance varies")

    return cv_scores

In [154]:
tickers = ['AAPL', 'TSLA', 'GOOG', 'NVDA', 'MSFT']

stock_data = get_stock_data(tickers)


main_ticker = list(stock_data.keys())[3]

data = stock_data[main_ticker]
print(data)

quality_report = quality_check(data, main_ticker)
    
returns = analyze_returns_stats(data, main_ticker)


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

Ticker            NVDA
Date                  
2020-01-02    5.971747
2020-01-03    5.876164
2020-01-06    5.900805
2020-01-07    5.972246
2020-01-08    5.983446
...                ...
2024-06-24  118.072693
2024-06-25  126.050171
2024-06-26  126.360077
2024-06-27  123.950844
2024-06-28  123.500984

[1130 rows x 1 columns]
NVDA:
Total observations: 1130
Missing values: 0
Zero volume days: Ticker
NVDA    0
dtype: int64
Extreme moves (>20%): Ticker
NVDA    1
dtype: int64
Return Statistics for NVDA
Daily return: 0.0033 (82.08% annualized)
Daily volatility: 0.0341 (54.07% annualized)
Skewness: 0.446
Kurtosis: 4.267
*** Normality Test ***
Jarque Bera p-value: 0.000000
Returns are NOT normal



  mean_return = float(returns.mean())
  volatility = float(returns.std())
  print(f"Skewness: {float(returns.skew()):.3f}")
  print(f"Kurtosis: {float(returns.kurtosis()):.3f}")


Rolling [window=2,center=False,axis=0,method=single]
