## Install Libraries
#### TA-Lib
**Outside Docker Jupyter**
```bash
curl -L http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz -o ta-lib-0.4.0-src.tar.gz && tar -xzf ta-lib-0.4.0-src.tar.gz && cd ta-lib/ && ./configure && make && sudo make install
TA_LIBRARY_PATH=/usr/local/lib TA_INCLUDE_PATH=/usr/local/include pip install ta-lib
```

**Within Docker Jupyter**
```bash
!curl -L http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz -o ta-lib-0.4.0-src.tar.gz
!tar -xzf ta-lib-0.4.0-src.tar.gz
%cd ta-lib/
!./configure --prefix=$HOME --build=aarch64-unknown-linux-gnu
# !./configure --build=x86_64-unknown-linux-gnu
!make
!make install
!TA_LIBRARY_PATH=~/lib TA_INCLUDE_PATH=~/include pip install ta-lib
```

pip install numpy==1.26.0
pip install keras=3.7.0
pip install tensorflow=2.18.0
pip install pyfolio=0.9.2
pip install hurst=0.0.5

## Importing Libraries

In [None]:
import numpy as np
import pandas as pd
import talib

# Building the Artificial Neural Network
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout

from itertools import combinations
import itertools as itt
import pyfolio as pf
from tqdm.notebook import tqdm
from hurst import compute_Hc

from sklearn.preprocessing import StandardScaler

import random

import matplotlib.pyplot as plt
from matplotlib.pyplot import cm

In [None]:
# Setting the random seed to a fixed number
random.seed(42)

## Helper Functions for CCV

In [None]:
def cpcv_generator(t_span, num_groups, k, verbose=True):
    # split data into N groups, with N << T
    # this will assign each index position to a group position
    group_num = np.arange(t_span) // (t_span // num_groups)
    group_num[group_num == num_groups] = num_groups-1
    
    # generate the combinations 
    test_groups = np.array(list(itt.combinations(np.arange(num_groups), k))).reshape(-1, k) # 15×2 matrix: [0,1],[0,2]...[0,5]...[3 5],[4 5]]
    C_nk = len(test_groups)
    n_paths = C_nk * k // num_groups 
    
    if verbose:
        print('n_sim:', C_nk)
        print('n_paths:', n_paths)
    
    # is_test is a T x C(n, k) array where each column is a logical array 
    # indicating which observation is in the test set
    is_test_group = np.full((num_groups, C_nk), fill_value=False) # 6×15 matrix
    is_test = np.full((t_span, C_nk), fill_value=False) # 100×15 matrix or 7566×15 matrix

    # assign test folds for each of the C(n, k) simulations
    for sim_idx, pair in enumerate(test_groups):
        i, j = pair # [0,1], [0,2],... [0,5], [1,2],..., [4,5]
        is_test_group[[i, j], sim_idx] = True # is_test_group is 6×15 matrix. 
                                              # Since test_groups is [0,1], [0,2], [0,3], [0,4],... [0,5], [1,2],..., [4,5], simulation with idx 3 will consists of ticks in group 0 and 4
        # assigning the test folds
        mask = (group_num == i) | (group_num == j) # group_num = [0, 0, 0, 0, ..., 1, 1, ..., 2, 2, 2, 2, ..., 3, 3, ..., 4, 4, 4, 4, ...] of length 100 or 7566 (number of ticks)
                                                   # for [i,j]=[0,4] mask becomes [True, True, True, True, ..., False, False, ..., False, False, False, False, ..., False, False, ..., True, True, True, True, ...]
        is_test[mask, sim_idx] = True # Mark the rows that belong to group i,j so that they belong to sim_idx and are for testing and backtesting.
                                      # For example, since groups [0,4] belong to simulation 3, we set simulation 3 to True for all the rows that belong to group 0 or 4 to indicate that they should be used
                                      # for testing in simulation 3

    # for each path, connect the folds from different simulations to form a test path
    # the fold coordinates are: the fold number, and the simulation index e.g. simulation 0, fold 0 etc
    path_folds = np.full((num_groups, n_paths), fill_value=np.nan)
    for p in range(n_paths):
        for group in range(num_groups):
            # If is_test_group[group, :]=[F, F, F, T, F, F, F, T, F, F, T, F, T, F, T] then sim_idx=3
            sim_idx = is_test_group[group, :].argmax().astype(int)
            path_folds[group, p] = sim_idx # Considering the above example where sim_idx=3 is associated with groups [0,4]: path_folds[0,0], path_folds[0,1],..., path_folds[0,4] will be set to sim_idx=3
                                           # Same for group 4: path_folds[4,0], path_folds[4,1],..., path_folds[4,4] will be set to sim_idx=3

            is_test_group[group, sim_idx] = False # Mark it as False so on the next iteration we get the next sim_idx when doing a "...argmax().astype(int)" (e.g. sim_idx=7)
            
    
    # finally, for each path we indicate which simulation we're building the path from and the time indices
    paths = np.full((t_span, n_paths), fill_value= np.nan) # 100×15 matrix or 7566×5 matrix
    for p in range(n_paths):
        for g in range(num_groups):
            mask = (group_num == g) # Get all the ticks that belong to group g
            paths[mask, p] = int(path_folds[g, p])
    # paths = paths_# .astype(int)

    # Once done the matrices will look like so:
    # is_test[99] = [F, F, F, F, True, F, F, F, True, F, F, True, F, True, True]
    # paths[99] = [4, 8, 11, 13, 14]
    # path_folds[5] = [4, 8, 11, 13, 14]
    return (is_test, paths, path_folds)

# AFML, snippet 7.1
# purging removes from the training set those samples that are build with information that 
# overlaps samples in the testing set.
def purge(t1, test_times): # whatever is not in the train set should be in the test set
    train = t1.copy(deep=True) # copy of the index
    for test_start, test_end in test_times.items():
        df_0 = train[(test_start <= train.index) & (train.index <= test_end)].index # train starts within test
        df_1 = train[(test_start <= train) & (train <= test_end)].index # train ends within test
        df_2 = train[(train.index <= test_start) & (test_end <= train)].index # train envelopes test
        train = train.drop(df_0.union(df_1).union(df_2))
    return train

# AFML, snippet 7.2
def embargo_(times, pct_embargo):
    step = int(times.shape[0] * pct_embargo) # more complicated logic if needed to use a time delta
    if step == 0:
        ans = pd.Series(times, index=test_times)
    else:
        ans = pd.Series(times[step:].values, index=times[:-step].index)
        ans = pd.concat([ans, pd.Series(times.iloc[-1], index=times[-step:].index)])
    return ans

# embargo removes a number of observations from the end of the test set.
def embargo(test_times, t1, pct_embargo=0.01): # done before purging
    # embargoed t1
    t1_embargo = embargo_(t1, pct_embargo)
    test_start, test_end = test_times.index[0], test_times.index[-1]
    #print(f"Test start/end times:{test_start}/{test_end}")
    test_times_embargoed = t1_embargo.loc[test_times.index]
    return test_times_embargoed

## Read the data

In [None]:
# Read the data from CSV file
data = pd.read_csv('aapl_prices.csv', parse_dates=True, sep=',', index_col=[0])
data.index = pd.to_datetime(data.index)
data = data.dropna()
data.head()

In [None]:
spy_data = pd.read_csv('spy_prices.csv', parse_dates=True, sep=',', index_col=[0])
spy_data = spy_data.loc[data.index]
spy_data.head()

## Preparing the dataset

In [None]:
# Preparing the dataset
data['H-L'] = data['High'] - data['Low']
data['O-C'] = data['Close'] - data['Open']
data['3day MA'] = data['Close'].shift(1).rolling(window = 3).mean()
data['10day MA'] = data['Close'].shift(1).rolling(window = 10).mean()
data['30day MA'] = data['Close'].shift(1).rolling(window = 30).mean()
data['21day MA'] = data['Close'].shift(1).rolling(21).mean()
data['263day MA'] = data['Close'].shift(1).rolling(262).mean()
data['Std_dev']= data['Close'].rolling(5).std()
data['RSI'] = talib.RSI(data['Close'].values, timeperiod = 9)
data['Williams %R'] = talib.WILLR(data['High'].values, data['Low'].values, data['Close'].values, 7)
data['zscore'] = (data.Close - data.Close.rolling(126).mean())/data.Close.rolling(126).std()
data['hurst'] = data['Close'].rolling(window=126).apply(lambda x: compute_Hc(x)[0])

data['High_std'] = (data['High']-data['High'].mean())/data['High'].std() 
spy_data['High_std'] = (spy_data['High']-spy_data['High'].mean())/spy_data['High'].std() 
data['aapl-spy'] = data['High_std'] - spy_data['High_std']

# 21 days holding period
data['returns'] = data.Close.shift(-21).ffill().pct_change(21)

## Defining input features from dataset

In [None]:
data['Price_Rise'] = np.where(data['Close'].shift(-21) > data['Close'], 1, 0)
data = data.dropna()

feats = ['zscore', 'aapl-spy']
#feats = ['zscore', 'Std_dev', '21day MA', '263day MA']
X = data[feats]
y = data['Price_Rise']

## Building the artificial neural network model

### is_test
Has _tick-number_ of rows, with each row pointing to a _number-of-sims_ long array, marking the simulation to be used as True. 
For example assuming: 
```
...
is_test[50] = [F, F, T, F, F, F, T, F, F, F, F, F, T, T, F]
...
is_test[99] = [F, F, F, F, T, F, F, F, T, F, F, T, F, T, T]
...
```
above, we read columnwise for each of the 15 simulations. In simulation index 2, tick number 50 will be considered for testing/backtesting, while tick 99 will not. On the other hand in simulation 4 tick 99 will be used for test while tick 50 will not.

### paths
Has _tick-number_ of rows and 5 columns. Each column represents one path (again, we read column wise) . A path consist of simulation indices. Example: `paths[99] = [4, 8, 11, 13, 14]`. In this example, tick 99 is part of 5 paths. In the first path we will consider the prediction of simulation 4. In the second path we will consider the prediction of simulation 8, and so on. It will be used for the backtesting after we have trained (using training set) and predicted using the test set. In other words, if our prediction (`pred`) matches the real signal (`old_signal`) then `backtest_paths[tick, p]` will hold the return that we would gain. This logic is depicted below:
```
for p in range(paths.shape[1]):
    for t, sim in enumerate(paths[:, p]): # index, value
        backtest_paths[t, p] = ( bool(pred[t, int(sim)]) &  bool(old_signal.iloc[t]) ) * ret.iloc[t]
```
`pred` has _tick-number_ of rows and _number-of-sims_ columns filled as follows: 
```
for sim in range(num_sim):
    test_idx = is_test[:, sim] 
    ...
    pred[test_idx, sim] = classifier.predict(X_test)
```

<img src="CCV.png" width="1000">

In [None]:
# prediction and evaluation times
# using business days, but the index is not holidays aware -- it can be fixed
t1_ = data.index

# we are holding our position for 21 days
t1 = pd.Series(t1_[21:], index=t1_[:-21]) # t1 is both the trade time and the event time

# realign data
data = data.loc[t1.index]

data = data.dropna()
# realign t1
t1 = t1.loc[data.index]

num_paths = 5
num_groups_test = 2
num_groups = num_paths + 1 
num_ticks = len(data)
is_test, paths, _ = cpcv_generator(num_ticks, num_groups, num_groups_test)
pred = np.full(is_test.shape, np.nan)

num_sim = is_test.shape[1] # num of simulations needed to generate all backtest paths
for sim in tqdm(range(num_sim)):
    # get train set|
    test_idx = is_test[:, sim] 
    
    # convert numerical indices into time stamps
    test_times = t1.loc[test_idx]
    
    # embargo
    test_times_embargoed = embargo(test_times, t1, pct_embargo=0.01)
    
    # purge
    train_times = purge(t1, test_times_embargoed)
    
    # split training / test sets
    X_test = X.loc[test_times.index, :]
    y_test = y.loc[X_test.index]
    
    X_train = X.loc[train_times.index, :]
    y_train = y.loc[X_train.index]

    sc = StandardScaler()
    
    X_train = sc.fit_transform(X_train)
    X_test = sc.transform(X_test)

    # reconstructing the backtest paths
    print(f'training classifier for simulation %s' % sim)
    np.random.seed(42)

    # https://stackoverflow.com/questions/45393429/keras-how-to-save-model-and-continue-training
    classifier = Sequential()
    classifier.add(Dense(units = 128, kernel_initializer = 'uniform', activation = 'relu', input_dim = X.shape[1]))
    classifier.add(Dense(units = 128, kernel_initializer = 'uniform', activation = 'relu')) # second layer
    classifier.add(Dense(units = 1, kernel_initializer = 'uniform', activation = 'sigmoid')) # output layer
    classifier.compile(optimizer = 'adam', loss = 'mean_squared_error', metrics = ['accuracy']) #  ‘adam’ is stochastic gradient descent.
    classifier.fit(X_train, y_train, batch_size = 10, epochs = 100, verbose=0)

    pred_ = classifier.predict(X_test)
    pred_ = np.concatenate(pred_, axis=0)

    pred_ = (pred_ > 0.5)
    
    # fill the backtesting prediction matrix
    pred[test_idx, sim] = pred_

## Computation of strategy returns and determine trade positions

In [None]:
backtest_paths = np.full((paths.shape[0], paths.shape[1]), np.nan)

for p in range(paths.shape[1]):
    for t, sim in enumerate(paths[:, p]): # index, value
        backtest_paths[t, p] =  np.where(pred[t, int(sim)] == True, data.iloc[t]['returns'], -data.iloc[t]['returns'])
        
import pyfolio as pf
perf_func = pf.timeseries.perf_stats

perf_list = []
for s in range(paths.shape[1]):
    perf_table = perf_func(backtest_paths[:, s])
    perf_list.append(perf_table)

perf_paths = pd.concat(perf_list, axis=1)
perf_paths

In [None]:
perf_paths.loc['Sharpe ratio', :].mean()

In [None]:
perf_paths.loc['Sharpe ratio', :].std()

In [None]:
perf_paths.loc['Sharpe ratio', :].hist()

In [None]:
perf_paths.loc['Max drawdown', :].mean() # slightly better than the base strategy -- but still very risky

In [None]:
perf_paths.loc['Max drawdown', :].std()

In [None]:
perf_paths.loc['Max drawdown', :].hist()

In [None]:
data['cumulative_market_returns'] = np.cumsum(data['returns'])

strategy_returns = np.full((paths.shape[0], paths.shape[1]), np.nan)
for p in range(paths.shape[1]):
    for t, sim in enumerate(paths[:, p]): # index, value
        strategy_returns[t, p] =  np.where(pred[t, int(sim)] == True, data.iloc[t]['returns'], -data.iloc[t]['returns'])

for p in range(paths.shape[1]):
    data[f'cumulative_strategy_returns_{p}'] = np.cumsum(strategy_returns[:, p])

In [None]:
plt.figure(figsize=(10,5))
plt.plot(data['cumulative_market_returns'], color='g', label='Market Returns')

color = iter(plt.colormaps.get_cmap('viridis').resampled(10).colors)
for p in range(paths.shape[1]):
    c = next(color)
    plt.plot(data[f'cumulative_strategy_returns_{p}'], color=c, label=f'Strategy Returns [path {p}]',linewidth=0.5)

plt.title('Market Returns and Strategy Returns', color='purple', size=15)

# Setting axes labels for close prices plot
plt.xlabel('Dates', {'color': 'orange', 'fontsize':15})
plt.ylabel('Returns(%)', {'color': 'orange', 'fontsize':15})

plt.legend()
plt.show()