In [45]:
import numpy as np 
import pandas as pd

In [None]:
def calculate_overnight_delta(df, date_col='date', open_col='open', close_col='close'): 
    """ 
    Calculate the overnight delta for each trading day in the DataFrame.

    Parametesrs:
        1) df (pd.DataFrame): DataFrame containing stock data with 'Date', 'Open', and 'Close' columns.
        2) date_col (str): Name of the column containing dates. Default is 'date'.
        3) open_col (str): Name of the column containing opening prices. Default is 'open'.
        4) close_col (str): Name of the column containing closing prices. Default is 'close'.
    
    Returns:
        1) df_mod (pd.DataFrame): DataFrame with an additional 'Overnight_Delta' column.
    """

    df_mod = df.copy()  # copy the original DataFrame to avoid modifying it directly
    df_mod[date_col] = pd.to_datetime(df_mod[date_col])  # confirm date column as datetime type
    df_mod = df_mod.sort_values(by=date_col).reset_index(drop=True)  # sort by date and reset index

    # calculate overnight change and percentage change
    df_mod['overnight_delta'] = df_mod[open_col] - df_mod[close_col].shift(1)  
    df_mod['overnight_delta_pct'] = (df_mod['overnight_delta'] / df_mod[close_col].shift(1)) * 100  

    return df_mod


def identify_abnormal_delta(df, threshold): 
    """ 
    Identify abnormal overnight price changes.

    Parameters:
        1) df (pd.DataFrame): DataFrame containing stock data with 'Overnight_Delta' column.
        2) threshold (float): Threshold value for identifying abnormal overnight delta.
    
    Returns:
        1) df_mod (pd.DataFrame): DataFrame containing days with abnormal overnight delta.
    """

    df_mod = df.copy()  # copy the original DataFrame to avoid modifying it directly

    mean_delta_pct = df_mod['overnight_delta_pct'].mean()  # calculate mean overnight change percentage
    std_dev = df_mod['overnight_delta_pct'].std()  # repeate for std dev

    # calculate z score and label anomalies
    df_mod['z_score'] = (df_mod['overnight_delta_pct'] - mean_delta_pct) / std_dev
    df_mod['abnormal'] = df_mod['z_score'].abs() > threshold 

    return df_mod


def calculate_late_day_momentum(df): 
    """ 
    Calculate late day momentum. 

    Parameters: 
        1) df (pd.DataFrame): DataFrame containing stock data with 'close' column 
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with new indicator column 
    """

    df_mod = df.copy()  # copy input DataFrame to avoid direct modification 

    # 1) late day momentum 
    df_mod['intraday_delta'] = df_mod['close'] - df_mod['open']
    df_mod['close_position'] = (df_mod['close'] - df_mod['low']) / df_mod['intraday_delta']
    df_mod['close_position'] = df_mod['close_position'].fillna(0.5)  # handle division by 0 error 

    return df_mod


def calculate_volume_spike(df): 
    """ 
    Calculate volume spike. 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame with stock data and 'volume' column
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with new indicator column
    """

    # copy input DataFrame to avoid direct modification
    df_mod = df.copy()   

    # calculate 20d moving average 
    df_mod['avg_volume_20'] = df_mod['volume'].rolling(window=20).mean()
    # divide volume count by mean 
    df_mod['volume_ratio'] = df_mod['volume'] / df_mod['avg_volume_20']
    # label spikes greater than 2.0 
    df_mod['volume_spike'] = df_mod['volume_ratio'] > 2.0 

    return df_mod 


def calculate_rsi(df): 
    """ 
    Calculate relative strength indicator (RSI). 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame with stock data and 'close' column 
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with additional technical indicator column 
    """

    # copy input DataFrame to avoid direct modification
    df_mod = df.copy() 

    # calculate daily price change (close, close)
    delta = df_mod['close'].diff()

    # calculate positive, negative price changes 
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() 
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()

    # calculate relative strength and add to DataFrame
    relative_strength = gain / loss 

    df_mod['rsi'] = 100 - (100 / (1 + relative_strength))
    df_mod['rsi_oversold'] = df_mod['rsi'] < 30
    df_mod['rsi_overbought'] = df_mod['rsi'] > 70

    return df_mod 


def calculate_average_true_range(df): 
    """ 
    Calculate average true range (ATR). 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame with 'high', 'low, 'close' columns 
    
    Returns: 
        1) df_mod (pd.DataFrame): Dataframe with new technical indicator column
    """
    
    # copy input DataFrame to avoid direct modification
    df_mod = df.copy() 

    # calculate intraday max (high, low) range 
    price_range_high_low = df_mod['high'] - df_mod['low']
    # calculate intraday (high, close) range 
    price_range_high_close = np.abs(df_mod['high'] - df_mod['close'].shift())
    # calculate intraday (low, close) range 
    price_range_low_close = np.abs(df_mod['low'] - df_mod['close'].shift())

    # concatenate intraday ranges into Series object 
    price_ranges = pd.concat(
        [price_range_high_low, price_range_high_close, price_range_low_close], 
        axis=1
    )
    # store max range per day 
    daily_max_range = np.max(price_ranges, axis=1)

    # calculate 14d rolling max price change 
    df_mod['atr'] = daily_max_range.rolling(window=14).mean() 
    df_mod['atr_pct'] = (df_mod['atr'] / df_mod['close']) * 100 

    return df_mod


def calculate_annual_min_max_proximity(df): 
    """ 
    Calculate daily proximity to 52 wk high/low. 
    
    Parameters: 
        1) df (pd.DataFrame): Contains 'high', 'low' columns 
        
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with new technical indicator
    """

    # copy input DataFrame to avoid direct modification
    df_mod = df.copy()

    # store rolling min, max values
    df_mod['52_week_high'] = df_mod['high'].rolling(window=252).max()
    df_mod['52_week_low'] = df_mod['low'].rolling(window=252).min()

    # calculate relative value from daily close to 52 week high, low 
    df_mod['52_week_high_proximity'] = ( (df_mod['52_week_high'] - df_mod['close']) / df_mod['52_week_high']) * 100 
    df_mod['52_week_low_proximity'] = ( (df_mod['close'] - df_mod['52_week_low']) / df_mod['52_week_low']) * 100 

    # label when within threshold (5%) of 52 wk high, low 
    df_mod['52_week_high_threshold'] = df_mod['52_week_high_proximity'] < 5  
    df_mod['52_week_low_threshold'] = df_mod['52_week_low_proximity'] < 5

    return df_mod 


def calculate_intraday_momentum(df): 
    """ 
    Calculate intraday momentum. 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame with 'open', 'close' columns
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with new techincal indicator
    """

    # copy input DataFrame to avoid direct modification
    df_mod = df.copy() 

    # calculate relative intraday price change 
    df_mod['intraday_return'] = ( (df_mod['close'] - df_mod['open']) / df_mod['open']) * 100 

    # label strong, weak (+/- 2%) relative intraday price change 
    df_mod['intraday_return_strong_positive'] = df_mod['intraday_return'] > 2.0
    df_mod['intraday_return_strong_negative'] = df_mod['intraday_return'] < 2.0 

    return df_mod 


def label_day_of_week(df): 
    """ 
    Label day of week and one-hot encode. 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame containing 'date' column 
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with days of week labeled 
    """

    # copy the original DataFrame to avoid direct modification
    df_mod = df.copy()

    # label day of week 
    df_mod['day_of_week'] = df_mod['date'].dt.day_of_week

    # one-hot encode day of week 
    df_mod['is_monday'] = df_mod['day_of_week'] == 0
    df_mod['is_tuesday'] = df_mod['day_of_week'] == 1
    df_mod['is_wednesday'] = df_mod['day_of_week'] == 2
    df_mod['is_thursday'] = df_mod['day_of_week'] == 3
    df_mod['is_friday'] = df_mod['day_of_week'] == 4

    return df_mod 


def label_month_of_year(df): 
    """ 
    Label month of year and one-hot encode.
     
    Parameters: 
        1) df (pd.DataFrame): DataFrame with 'date' column
        
    Returns: 
        1) df (pd.DataFrame): DataFrame with months labeled
    """

    # copy the original DataFrame to avoid direct modification
    df_mod = df.copy() 

    # label months 
    df_mod['month_of_year'] = df_mod['date'].dt.month

    # one hot encode month of year 
    df_mod['is_jan'] = df_mod['month_of_year'] == 1 
    df_mod['is_feb'] = df_mod['month_of_year'] == 2
    df_mod['is_mar'] = df_mod['month_of_year'] == 3
    df_mod['is_apr'] = df_mod['month_of_year'] == 4 
    df_mod['is_may'] = df_mod['month_of_year'] == 5 
    df_mod['is_jun'] = df_mod['month_of_year'] == 6 
    df_mod['is_jul'] = df_mod['month_of_year'] == 7 
    df_mod['is_aug'] = df_mod['month_of_year'] == 8 
    df_mod['is_sep'] = df_mod['month_of_year'] == 9 
    df_mod['is_oct'] = df_mod['month_of_year'] == 10 
    df_mod['is_nov'] = df_mod['month_of_year'] == 11 
    df_mod['is_dec'] = df_mod['month_of_year'] == 12 

    return df_mod 


def label_start_end_of_month(df): 
    """ 
    Label first, final 5 days of each month. 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame with 'date' column
    
    Returns: 
        1) df_mod (pd.DataFrame): DataFrame with start, end months labeled 
    """

    # copy the original DataFrame to avoid direct modification
    df_mod = df.copy()  

    # extract date within month  
    df_mod['day_of_month'] = df_mod['date'].dt.day

    # label first, final 5d of month 
    df_mod['first_5d_month'] = df_mod['day_of_month'] <= 5 
    df_mod['final_5d_month'] = df_mod['day_of_month'] >= 25

    return df_mod 


def correlate_two_indicators(df, indicator1, indicator2): 
    """ 
    Test correlation between two indicators. 
    
    Parameters: 
        1) df (pd.DataFrame): DataFrame containing stock data with techincal and calendar indicators.
    
    Returns: 
        1) correlation (float): Spearman correlation coefficient between input indicators
    """

    correlation = np.correlate(df[indicator1], df[indicator2], method='spearman')

    return correlation


def test_indicators_to_overnight_delta(df): 
    """
    Test correlation between technical/calendar indicators and overnight delta.

    Parameters:
        1) df (pd.DataFrame): DataFrame containing stock data with technical and calendar
    
    Returns:
        1) results (pd.DataFrame): DataFrame summarizing indicator performance
    """
    df_mod = df.copy()
    df_mod = df_mod.dropna(subset=['abnormal'])
    
    results = []
    
    # Binary indicators
    indicator_cols = [
        'volume_spike', 'rsi_oversold', 'rsi_overbought',
        '52_week_high_threshold', '52_week_low_threshold', 
        'intraday_return_strong_positive', 'intraday_return_strong_negative',
        'is_monday', 'is_tuesday', 'is_wednesday', 'is_thursday', 'is_friday',
        'is_jan', 'is_dec', 'first_5d_month', 'final_5d_month'
    ]
    
    base_rate = df_mod['abnormal'].mean()
    
    # iterate across indicator columns 
    for indicator in indicator_cols:
        # skip if indicator not in input DataFrame
        if indicator not in df_mod.columns:
            continue
        
        # filter DataFrame for rows (days) where indicator is True
        indicator_found = df_mod[df_mod[indicator] == True]

        if len(indicator_found) > 0:
            # calculate conditional probability P(abnormal | indicator)
            abnormal_rate_given_indicator = indicator_found['abnormal'].mean()

            # calculate lift
            lift = abnormal_rate_given_indicator / base_rate if base_rate > 0 else 0
            
            results.append({
                'indicator': indicator,
                'occurrences': len(indicator_found),
                'base_abnormal_rate': base_rate * 100,
                'abnormal_rate_with_indicator': abnormal_rate_given_indicator * 100,
                'lift': lift,
                'signal_strength': 'Strong' if lift > 1.5 else 'Moderate' if lift > 1.2 else 'Weak'
            })
    
    # Continuous indicators comparison
    continuous_indicators = ['close_position', 'volume_ratio', 'rsi', 'atr_pct']
    
    
    for indicator in continuous_indicators:
        if indicator in df_mod.columns:
            abnormal_mean = df_mod[df_mod['abnormal'] == True][indicator].mean()
            normal_mean = df_mod[df_mod['abnormal'] == False][indicator].mean()
            difference = abnormal_mean - normal_mean
            
            results.append({
                'indicator': indicator + ' (continuous)',
                'abnormal_mean': abnormal_mean,
                'normal_mean': normal_mean,
                'difference': difference
            })
    
    return pd.DataFrame(results)


In [137]:
data_path = '/Users/sidrana/Projects/data/AAPL_2020-01-01_2024-12-23.csv.csv'
data = pd.read_csv(data_path)

data = calculate_overnight_delta(data)
data = identify_abnormal_delta(data, threshold=2)
data = calculate_late_day_momentum(data)
data = calculate_volume_spike(data) 
data = calculate_rsi(data) 
data = calculate_average_true_range(data) 
data = calculate_annual_min_max_proximity(data) 
data = calculate_intraday_momentum(data)

data = label_day_of_week(data)
data = label_month_of_year(data)
data = label_start_end_of_month(data)
results = test_indicators_to_overnight_delta(data)

# Modification notes 

1. When rolling functions (min, max, mean, etc.) have less than input days, then use min/max/mean seen value to date. 

In [140]:
500*0.038

19.0