# Imports

In [1]:
%load_ext watermark
%watermark

%load_ext autoreload
%autoreload 2

# import standard libs
import warnings

warnings.filterwarnings("ignore")
from IPython.display import display
from IPython.core.debugger import set_trace as debug
from pathlib import Path
import itertools
from pprint import pprint


def get_relative_project_dir(project_repo_name=None, partial=True):
    """helper fn to get local project directory"""
    current_working_directory = Path.cwd()
    cwd_parts = current_working_directory.parts
    if partial:
        while project_repo_name not in cwd_parts[-1]:
            current_working_directory = current_working_directory.parent
            cwd_parts = current_working_directory.parts
    else:
        while cwd_parts[-1] != project_repo_name:
            current_working_directory = current_working_directory.parent
            cwd_parts = current_working_directory.parts
    return current_working_directory


# import python scientific stack
import pandas as pd

pd.set_option("display.max_rows", 100)
import numpy as np
import scipy.stats as stats
import statsmodels.api as sm
import numba as nb

# import ffn
import yfinance as yf
import bottleneck as bk
import mlxtend as mlx

from typing import Callable

import numpy_ext as npx
from sklearn.tree import DecisionTreeClassifier
import lightgbm as lgb

import shap

# import visual tools
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

%matplotlib inline
import seaborn as sns

# import util libs
from tqdm import tqdm, tqdm_notebook


# ---------------------------------------------------
# THESE ARE VARIABLES FOR EASILY ACCESSING DIFFERENT
# DIRECTORIES FOR ACCESSING AND SAVING DATA AND IMAGES
# IF NECESSARY. CHANGE THEM TO MATCH YOUR DIRECTORY
# STRUCTURE.
# ---------------------------------------------------

REPO_NAME = "blackarbs_algo_strategy_dev" # CHANGE TO MATCH YOUR REPO NAME
print("\n", REPO_NAME)
project_dir = get_relative_project_dir(REPO_NAME)
data_dir = project_dir / "data"
external = data_dir / "external"
processed = data_dir / "processed"
viz = project_dir / "viz"


def cprint(df: pd.DataFrame, nrows: int = None):
    """
    Custom dataframe print function
    """
    if not isinstance(df, (pd.DataFrame,)):
        try:
            df = df.to_frame()
        except:
            raise ValueError("object cannot be coerced to df")

    if not nrows:
        nrows = 5
    print("*" * 79)
    print("dataframe information")
    print("-" * 79)
    print(f"HEAD num rows: {nrows}")
    print(df.head(nrows))
    print("-" * 25)
    print(f"TAIL num rows: {nrows}")
    print(df.tail(nrows))
    print("-" * 50)
    print(df.info())
    print("*" * 79)
    print()
    return


print()
%watermark -v -m -p numpy,pandas,scipy,sklearn,mlfinlab,seaborn,matplotlib -g

ModuleNotFoundError: No module named 'watermark'

In [None]:
%load_ext nb_black

sns_params = {
    "xtick.major.size": 2,
    "ytick.major.size": 2,
    "font.size": 12,
    "font.weight": "medium",
    "figure.figsize": (10, 7),
    # "font.family": "Ubuntu Mono",
}
sns.set_theme(
    context="talk",
    style="white",
    # palette=sns.color_palette("magma"),
    rc=sns_params,
)
savefig_kwds = dict(dpi=90, bbox_inches="tight", frameon=True, format="png")

# Import Data

In [None]:
def read_tradestation_data(fn: Path) -> pd.DataFrame:
    df = pd.read_csv(fn).rename(str.lower, axis="columns")
    df["datetime"] = pd.to_datetime(
        df["date"] + " " + df["time"], infer_datetime_format=True
    )
    df["volume"] = df["up"] + df["down"]
    df.drop(["date", "time"], axis=1, inplace=True)
    df.set_index("datetime", inplace=True)
    return df


spy = read_tradestation_data(external / "tradestation" / "SPY.txt")
cprint(spy)

# Data Resampling

In [None]:
def time_consolidator(df, period):
    aggregate = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "up": "sum",
        "down": "sum",
        "volume": "sum",
    }
    return df.resample(f"{period}Min").agg(aggregate).dropna()


data = dict()
df = spy.copy()
bars_5m = time_consolidator(df, 5)
bars_30m = time_consolidator(df, 30)
bars_1H = time_consolidator(df, 60)
bars_1D = time_consolidator(df, 60 * 24)
bars_1W = time_consolidator(df, 60 * 24 * 7)

data = {
    "5m": bars_5m,
    "30m": bars_30m,
    "1H": bars_1H,
    "1D": bars_1D,
    "1W": bars_1W,
}

# Feature Functions and Code

In [None]:
#################


def add_log_returns(df: pd.DataFrame, column: str) -> pd.DataFrame:
    df[f"log_{column}_return"] = np.log(df[column]).diff()
    return df


#################
def internal_bar_strength(df: pd.DataFrame) -> float:
    return (df.close - df.low) / (df.high - df.low)


def add_internal_bar_strength(df: pd.DataFrame) -> pd.DataFrame:
    df["ibs"] = internal_bar_strength(df)
    return df


#################
# @nb.jit
def aqr_momentum(array: np.ndarray) -> float:
    """
    Input:  Price time series.
    Output: Annualized exponential regression slope, 
            multiplied by the R2
    """    
    returns = np.diff(np.log(array))  # .diff()
    x = np.arange(len(returns))
    slope, _, rvalue, _, _ = stats.linregress(x, returns)
    return ((1 + slope) ** 252) * (rvalue ** 2)  # annualize slope and multiply by R^2


@nb.njit
def aqr_momo_numba(array: np.ndarray) -> float:
    y = np.diff(np.log(array))
    x = np.arange(y.shape[0])
    A = np.column_stack((x, np.ones(x.shape[0])))
    model, resid = np.linalg.lstsq(A, y)[:2]
    r2 = 1 - resid / (y.size * y.var())
    return (((1 + model[0]) ** 252) * r2)[0]


def add_aqr_momentum(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    df[f"aqr_momo_{column}_{window}"] = npx.rolling_apply(
        aqr_momentum, window, df[column].values, n_jobs=10
    )
    return df


def add_aqr_momentum_numba(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    df[f"aqr_momo_{column}_{window}"] = npx.rolling_apply(
        aqr_momo_numba, window, df[column].values, n_jobs=10
    )
    return df


#################


def get_slope(array: np.ndarray) -> float:
    returns = np.diff(np.log(array))
    x = np.arange(len(returns))
    slope, _, rvalue, _, _ = stats.linregress(x, returns)
    return slope


@nb.njit
def get_slope_numba(array: np.ndarray) -> float:
    y = np.diff(np.log(array))
    # y = y[~np.isnan(y)]
    x = np.arange(y.shape[0])
    A = np.column_stack((x, np.ones(x.shape[0])))
    model, resid = np.linalg.lstsq(A, y)[:2]
    return model[0]


def add_slope_column(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    df[f"slope_{column}_{window}"] = npx.rolling_apply(
        get_slope, window, df[column].values, n_jobs=10
    )
    return df


def add_slope_column_numba(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    df[f"slope_{column}_{window}"] = npx.rolling_apply(
        get_slope_numba, window, df[column].values, n_jobs=1
    )
    return df


#################
def add_average_price(df: pd.DataFrame) -> pd.DataFrame:
    df["average_price"] = (df.high + df.low + df.close + df.open) / 4
    return df


#################
def add_rolling_min(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    array = npx.rolling_apply(np.min, window, df[column].values, n_jobs=10)
    df[f"rmin_{column}_{window}"] = array
    return df


def add_rolling_max(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    array = npx.rolling_apply(np.max, window, df[column].values, n_jobs=10)
    df[f"rmax_{column}_{window}"] = array
    return df


#################

# for some reason njit is generating zerodivision errors whereas numpy is not
@nb.njit
def numba_vwap(
    avg: np.ndarray, v: np.ndarray, idx: np.ndarray, len_df: int, window: int
) -> np.ndarray:
    n = np.shape(np.arange(len_df - window))[0]
    A = np.empty((n, 2))
    for i in np.arange(len_df - window):
        tmp_avg = avg[i : i + window]
        tmp_v = v[i : i + window]
        aa = np.sum(tmp_v * tmp_avg) / np.sum(tmp_v)
        jj = idx[i + window]
        A[i, 0] = jj
        A[i, 1] = aa
    return A


def numpy_vwap(
    avg: np.ndarray, v: np.ndarray, idx: np.ndarray, len_df: int, window: int
) -> np.ndarray:
    n = np.shape(np.arange(len_df - window))[0]
    A = np.empty((n, 2))
    for i in tqdm(np.arange(len_df - window)):
        tmp_avg = avg[i : i + window]
        tmp_v = v[i : i + window]
        aa = np.sum(tmp_v * tmp_avg) / np.sum(tmp_v)
        jj = idx[i + window]
        A[i, 0] = jj
        A[i, 1] = aa
    return A


def add_rolling_vwap(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    v = df.volume.values
    avg = df[column].values
    idx = df.index.asi8
    # A = numba_vwap(avg, v, idx, len(df), window)
    A = numpy_vwap(avg, v, idx, len(df), window)
    outdf = (
        pd.DataFrame(A, columns=["index", f"rvwap_{window}"])
        .assign(datetime=lambda df: pd.to_datetime(df["index"], unit="ns"))
        .drop("index", axis=1)
        .set_index("datetime")
    )
    df = df.join(outdf, how="left")
    return df


#################
def add_rolling_bands(
    df: pd.DataFrame, column: str, dist: int, window: int
) -> pd.DataFrame:
    upper = df[column] + dist * df[column].rolling(window).std()
    lower = df[column] - dist * df[column].rolling(window).std()

    df[f"upper_band_{column}"] = upper
    df[f"lower_band_{column}"] = lower
    return df


#################
def add_acceleration(
    df: pd.DataFrame, column: str = "close", window: int = 10
) -> pd.DataFrame:
    return_diff = df[column].pct_change().diff()
    df[f"racc_{column}_{window}"] = return_diff.rolling(
        window
    ).std()  # standard deviation of second deriv aka acceleration
    return df


def roll_rank_bk(array):
    rank = array.size + 1 - bk.rankdata(array)[-1]
    A = array.shape[0]
    p = rank / A
    return p


#################
def add_volatility(
    df: pd.DataFrame, column: str = "close", window: int = 10
) -> pd.DataFrame:
    returns = df[column].pct_change()
    df[f"rvol_{column}_{window}"] = returns.rolling(window).std()
    return df


def relative_strength_index(df: pd.DataFrame, n: int) -> pd.Series:
    """
    Calculate Relative Strength Index(RSI) for given data.
    https://github.com/Crypto-toolbox/pandas-technical-indicators/blob/master/technical_indicators.py

    :param df: pandas.DataFrame
    :param n:
    :return: pandas.DataFrame
    """
    i = 0
    UpI = [0]
    DoI = [0]
    while i + 1 <= df.index[-1]:
        UpMove = df.loc[i + 1, "high"] - df.loc[i, "high"]
        DoMove = df.loc[i, "low"] - df.loc[i + 1, "low"]
        if UpMove > DoMove and UpMove > 0:
            UpD = UpMove
        else:
            UpD = 0
        UpI.append(UpD)
        if DoMove > UpMove and DoMove > 0:
            DoD = DoMove
        else:
            DoD = 0
        DoI.append(DoD)
        i = i + 1
    UpI = pd.Series(UpI)
    DoI = pd.Series(DoI)
    PosDI = pd.Series(UpI.ewm(span=n, min_periods=n).mean())
    NegDI = pd.Series(DoI.ewm(span=n, min_periods=n).mean())
    RSI = pd.Series(round(PosDI * 100.0 / (PosDI + NegDI)), name="RSI_" + str(n))
    return RSI


def add_rsi(df: pd.DataFrame, column: str = "close", window: int = 14) -> pd.DataFrame:
    out = df.reset_index()
    rsi = relative_strength_index(out, window)
    df[f"rsi_{column}_{window}"] = pd.Series(data=rsi.values, index=df.index)
    return df


#################


def np_racorr(array: np.ndarray, window: int, lag: int) -> np.ndarray:
    """
    rolling autocorrelation
    """
    return npx.rolling_apply(
        lambda array, lag: sm.tsa.acf(array, nlags=lag, fft=True)[lag],
        window,
        array,
        lag=lag,
        n_jobs=10,
    )


def add_rolling_autocorr(
    df: pd.DataFrame, column: str, window: int, lag: int
) -> pd.DataFrame:
    log_changes_array = np.log(df[column]).diff().values
    df[f"racorr_{column}_{window}"] = np_racorr(log_changes_array, window, lag)
    return df


#################
@nb.njit
def custom_percentile(array: np.ndarray) -> float:
    if (array.shape[0] - 1) == 0:
        return np.nan
    return (array[:-1] > array[-1]).sum() / (array.shape[0] - 1)


def add_custom_percentile(df: pd.DataFrame, column: str, window: int) -> pd.DataFrame:
    df[f"rank_{column}_{window}"] = npx.rolling_apply(
        custom_percentile, window, df[column].values, n_jobs=5
    )
    return df

## create data store for features

In [None]:
HDF_FILEPATH = processed / 'spy_features.h5'

store = pd.HDFStore(
    HDF_FILEPATH, mode="a", complevel=1, complib="blosc:lz4"
)
store.close()

### add and save features function

In [None]:
def add_and_save_features(data, data_frequency, periods, hdf_filepath, rank_window=10):
    """
    function to create features according to frequency and periods and save to
    hdf store. store must already be created.

    # Args
        data: dict, keys are frequency, values are dataframes
        data_frequency: str, one of the data frequencies from the data dict
        periods: dict, keys are period labels, values are the integers
        hdf_filepath: pathlib or str object
        rank_window: int to divide window by for ranking features

    """

    log_errors = []
    pprint(periods)

    df = data[data_frequency].copy()

    for key, window in tqdm(periods.items()):
        tqdm._instances.clear()
        try:
            tmp_df = (
                df.pipe(add_average_price)
                .pipe(add_rolling_vwap, column="average_price", window=window)
                .pipe(
                    add_rolling_bands, column=f"rvwap_{window}", dist=2, window=window
                )
                .pipe(add_internal_bar_strength)
                .pipe(add_rolling_min, column="low", window=window)
                .pipe(add_rolling_max, column="high", window=window)
                .dropna()
                .pipe(
                    add_slope_column_numba,
                    column=f"lower_band_rvwap_{window}",
                    window=window,
                )
                .pipe(
                    add_slope_column_numba,
                    column=f"upper_band_rvwap_{window}",
                    window=window,
                )
                .pipe(
                    add_slope_column_numba, column=f"rmin_low_{window}", window=window
                )
                .pipe(
                    add_slope_column_numba, column=f"rmax_high_{window}", window=window
                )
                .pipe(add_acceleration, column="close", window=window)
                .pipe(add_aqr_momentum_numba, column="close", window=window)
                .pipe(add_acceleration, column="average_price", window=window)
                .pipe(add_aqr_momentum_numba, column="average_price", window=window)
                .pipe(add_acceleration, column=f"rvwap_{window}", window=window)
                .pipe(add_acceleration, column="close", window=window)
                .pipe(add_acceleration, column="average_price", window=window)
                .pipe(add_aqr_momentum_numba, column=f"rvwap_{window}", window=window)
                .pipe(add_volatility, column="close", window=window)
                .pipe(add_volatility, column="average_price", window=window)
                .pipe(add_rsi, column="close", window=window)
                .pipe(add_rsi, column="average_price", window=window)
                .pipe(add_rolling_autocorr, column="close", window=window, lag=1)
                .pipe(
                    add_rolling_autocorr, column="average_price", window=window, lag=1
                )
            )
            columns_to_rank = tmp_df.columns[tmp_df.columns.get_loc("up") :]
            for column in tqdm(columns_to_rank):
                tmp_df = add_custom_percentile(
                    tmp_df, column, window=max(int(window // rank_window), rank_window)
                )
                # ^^ make window smaller since this basically a double lag

            # write to store iteratively in case of problems
            with pd.HDFStore(hdf_filepath) as store:
                store.put(
                    value=tmp_df, key=f"spy/{data_frequency}/{key}", format="table"
                )

        except Exception as error:
            log_error = dict(window=window, error=error)
            log_errors.append(log_error)
    pprint(f"{data_frequency} errors:\n{log_errors}")
    return

## 5 min multi day features

In [None]:
one_day_in_minutes = 1440
one_day = int(one_day_in_minutes // 5)
two_days = int(one_day * 2)
three_days = int(one_day * 3)
five_days = int(one_day * 5)
ten_days = int(one_day * 10)
one_month = int(one_day * 21)

period_labels = ["1_day", "2_day", "3_day", "5_day", "10_day", "21_day"]
periods = dict(
    zip(period_labels, [one_day, two_days, three_days, five_days, ten_days, one_month])
)

FREQ = '5m'
add_and_save_features(data, FREQ, periods, HDF_FILEPATH, rank_window=10)

## 30 min multi day features

In [None]:
one_day_in_minutes = 1440
one_day = int(one_day_in_minutes // 30)
two_days = int(one_day * 2)
three_days = int(one_day * 3)
five_days = int(one_day * 5)
ten_days = int(one_day * 10)
one_month = int(one_day * 21)

period_labels = ["1_day", "2_day", "3_day", "5_day", "10_day", "21_day"]
periods = dict(
    zip(period_labels, [one_day, two_days, three_days, five_days, ten_days, one_month])
)

FREQ = "30m"
add_and_save_features(data, FREQ, periods, HDF_FILEPATH, rank_window=10)

## 1 Hour (60 min) multi day features

In [None]:
one_day_in_minutes = 1440
one_day = int(one_day_in_minutes // 60)
two_days = int(one_day * 2)
three_days = int(one_day * 3)
five_days = int(one_day * 5)
ten_days = int(one_day * 10)
one_month = int(one_day * 21)
three_month = int(one_month * 3)

period_labels = ["1_day", "2_day", "3_day", "5_day", "10_day", "21_day", "63_day"]
periods = dict(
    zip(
        period_labels,
        [one_day, two_days, three_days, five_days, ten_days, one_month, three_month],
    )
)

FREQ = "1H"
add_and_save_features(data, FREQ, periods, HDF_FILEPATH, rank_window=10)

## daily features

In [None]:
one_day = 1
two_days = int(one_day * 2)
three_days = int(one_day * 3)
five_days = int(one_day * 5)
ten_days = int(one_day * 10)
one_month = int(one_day * 21)
three_month = int(one_month * 3)
six_month = int(one_month * 6)
twelve_month = int(one_month * 12)

period_labels = [  # "1_day",
    "2_day",
    "3_day",
    "5_day",
    "10_day",
    "21_day",
    "63_day",
    "126_day",
    "252_day",
]
periods = dict(
    zip(
        period_labels,
        [  # one_day,
            two_days,
            three_days,
            five_days,
            ten_days,
            one_month,
            three_month,
            six_month,
            twelve_month,
        ],
    )
)

FREQ = "1D"
add_and_save_features(data, FREQ, periods, HDF_FILEPATH, rank_window=2)

## weekly features

In [None]:
one_week = 1
two_weeks = int(one_week * 2)
three_weeks = int(one_week * 3)
one_month = int(one_week * 4)
three_month = int(one_month * 3)
six_month = int(one_month * 6)
twelve_month = int(one_month * 12)

period_labels = [
    # "1_week",
    "2_week",
    "3_week",
    "1_month",
    "3_month",
    "6_month",
    "12_month",
]
periods = dict(
    zip(
        period_labels,
        [
            # one_week,
            two_weeks,
            three_weeks,
            one_month,
            three_month,
            six_month,
            twelve_month,
        ],
    )
)

FREQ = "1W"
add_and_save_features(data, FREQ, periods, HDF_FILEPATH, rank_window=2)