# 0. Thêm thư viện

Các thư viện cần thiết, trong đó có `yfinance` để lấy data

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import backend as K
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.callbacks import EarlyStopping, ModelCheckpoint, Callback
from keras.regularizers import l1, l2
from keras.optimizers import Adam

from sklearn import tree
import xgboost as xgb
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler

from hypopt import GridSearch
import keras_tuner as kt

import yfinance as yf

from copy import deepcopy
from datetime import datetime, timedelta

import warnings
warnings.filterwarnings("ignore")


# 1. Lấy Data

Hàm lấy dữ liệu giá đóng cửa (Close) của 50 cổ phiếu thuộc chỉ số `EURO STOXX 50`. \
Kết quả trả về là một DataFrame gồm m dòng và tối đa 50 cột, với m là số ngày trong khoảng thời gian yêu cầu (mã nào tải lỗi sẽ bị bỏ qua).

In [None]:
def EU_Stock_data(start_time,end_time, time_range = 'max'):
    """Download Close prices for the EURO STOXX 50 constituents.

    The ticker list is read from the 'Ticker' column of the table at index 4
    of the English Wikipedia page for the EURO STOXX 50; each ticker's price
    history is then requested through yfinance.

    Args:
        start_time: Start of the requested range, forwarded to yfinance as `start`.
        end_time: End of the requested range, forwarded to yfinance as `end`.
        time_range: Forwarded to yfinance as `period` (default 'max').

    Returns:
        DataFrame indexed by a datetime 'Date' index with one Close column per
        ticker that could be downloaded. Tickers whose download or parsing
        fails are skipped.
    """

    stock_list = pd.read_html( 'https://en.wikipedia.org/wiki/EURO_STOXX_50')[4]['Ticker'].to_list()

    futures = pd.DataFrame()

    # Process one ticker at a time
    for symbol in stock_list:
        try:
            df = yf.Ticker(symbol).history(period = time_range, start = start_time, end = end_time)
            df = pd.DataFrame(df['Close'])
            df.columns = [symbol]
            # Keep only the calendar date so all tickers align on the same index
            df.index = df.index.date
            futures = pd.concat([futures,df],axis = 1, join = 'outer').sort_index()
        except Exception:
            # Best effort: skip tickers that cannot be fetched or parsed, but let
            # KeyboardInterrupt / SystemExit propagate (a bare except hid them).
            continue

    futures['Date'] = pd.to_datetime(futures.index, format='%Y-%m-%d')
    futures.set_index('Date', inplace=True)

    return futures

# 2. Classic TSMOM

Hàm tính cumulative return và volatility (biến động) ex-ante đã được annualize (nhân $\sqrt{252}$) theo từng ngày cho mỗi cổ phiếu

In [None]:
def Volatility_scale(data, ignore_na=False, adjust = True, com = 60, min_periods=0):
    """Compute the cumulative-return index and ex-ante annualised volatility per column.

    For every column of `data` (one price series per asset) the missing
    values are dropped, simple daily returns are computed (the first one is
    set to 0), and two output columns are produced:

    * ``<name>``      -- cumulative return index ``prod(1 + r_i)`` (no ``- 1``)
    * ``<name>_Vol``  -- exponentially weighted std of the daily returns,
      multiplied by ``sqrt(252)``

    Args:
        data: DataFrame of prices, one column per asset. It is not modified.
        ignore_na, adjust, com, min_periods: Forwarded to ``Series.ewm``.

    Returns:
        DataFrame on ``data.index`` (outer-joined) with the two columns above
        for every input column. A column with no valid price yields two
        all-NaN columns instead of raising.
    """

    # Keep the original index (the dates) for the result
    std_index = data.index

    # Accumulates the output columns
    daily_index = pd.DataFrame(index=std_index)

    # Process one asset at a time
    for oo in data.columns:
        # dropna() returns a new Series, so the caller's frame is never mutated
        prices = data[oo].dropna()

        if prices.empty:
            # No observation at all: keep the output schema, filled with NaN
            placeholder = pd.DataFrame(np.nan, index=std_index, columns=[oo, oo + '_Vol'])
            daily_index = pd.concat([daily_index, placeholder], join = 'outer' ,axis=1)
            continue

        # Simple daily return p_t / p_{t-1} - 1 (vectorised); first day has no
        # predecessor, so its return is defined as 0
        returns = prices.pct_change()
        returns.iloc[0] =  0

        # Cumulative return index, without the "- 1" term
        ret_index = (1 + returns).cumprod()

        # Daily exponentially weighted volatility
        day_vol = returns.ewm(ignore_na=ignore_na,
                              adjust=adjust,
                              com=com,
                              min_periods=min_periods).std(bias=False)

        vol = day_vol * np.sqrt(252)  # annualise with 252 trading days

        # Put the cumulative return next to its volatility
        ret_index = pd.concat([ret_index, vol], axis=1)
        ret_index.columns = [oo, oo + '_Vol']

        daily_index = pd.concat([daily_index, ret_index], join = 'outer' ,axis=1)

    return daily_index


Hàm `classic_TSMOM` implement phần sinh tín hiệu của chiến lược TSMOM, với logic cụ thể như sau:
Tại ngày t ta so sánh với ngày t - k trước đó; có thể dùng giá close
 hoặc cumulative return (nhưng không có thành phần - 1, tức $\text{cum return}_t = \prod_{i = 0}^{t} (1 + r_i)$).
Ở đây ta so sánh `cum_return_t` với giá trị của k ngày trước,
`cum_return_{t-k}`:
  - Giả sử `cum_return_t` > `cum_return_{t-k}`, tức `sign(cum_return_t - cum_return_{t - k}) = 1` (hàm dấu trả về 1 nếu input > 0), thì ta có signal = 1, tức đó là tín hiệu để vào lệnh long vào ngày mai
(ngày t + 1); ngược lại thì signal = -1, là tín hiệu vào short.
  - Sau đó hold thêm h - 1 ngày tiếp theo (ngày t + 1 vào lệnh đã được tính là ngày hold đầu tiên).
  - Trong các ngày này (tức t + i với i từ 1 đến h) đều sinh ra Profit and Loss (PnL), tính theo công thức:\
 `target_vol / vol_t * return_{t + i}`, với `return_{t + i}` là return của ngày t + i, tính tùy vào trường hợp long hay short:
      - nếu long, `return_{t + i}` = `cum_return_{t + i} / cum_return_{t + i - 1}` - 1
      - nếu short, `return_{t + i}` = 1 - `cum_return_{t + i} / cum_return_{t + i - 1}`

    và Leverage là `target_vol / vol_t` (`target_vol` mặc định là 0.2 trong hàm `backtest`; ví dụ ở mục 5.2 dùng 0.4).

 Tóm lại, các kết quả lần lượt là:
- `signal`: kết quả trả về của `classic_TSMOM`
- profit and loss `pnl` và `leverage`: kết quả trả về của hàm `backtest` (mục 4), nhận `signal` làm input

In [None]:
def classic_TSMOM(data, k, h, tolerance = 0,ignore_na = False, adjust = True, com = 60, min_periods = 0):
    """Generate classic time-series-momentum signals for every asset.

    For each asset the cumulative return index from `Volatility_scale` is
    compared with its value `k` rows earlier. On a decision day the change
    observed on the previous row decides the position: below `tolerance`
    gives -1 (short), above gives +1 (long). The position is written for
    `h` consecutive rows, and those rows are skipped before the next decision.

    Args:
        data: DataFrame of prices, one column per asset.
        k: Look-back length in rows.
        h: Holding length in rows.
        tolerance: Threshold separating short from long signals.
        ignore_na, adjust, com, min_periods: Forwarded to `Volatility_scale`.

    Returns:
        DataFrame on ``data.index`` with one column per asset holding -1, 0 or
        1 (NaN on dates where the asset has no data).
    """

    signal = pd.DataFrame(index = data.index)

    company = data.columns

    daily_index = Volatility_scale(data,ignore_na=ignore_na,
                          adjust=adjust,
                          com=com,
                          min_periods = min_periods)

    for oo in company:
        df = pd.concat([daily_index[oo], daily_index[oo+"_Vol"]], axis=1)
        df = df.dropna(axis = 0, how = 'all')

        # Work on plain arrays: writing through df['signal'].iloc[...] is chained
        # assignment and silently does nothing under pandas Copy-on-Write.
        listed = df[oo].notna().to_numpy()
        # Change of the cumulative return versus k rows earlier (t vs t - k)
        rolling_returns = df[oo].pct_change(k).to_numpy()
        positions = np.zeros(len(df))

        flag_h = 0
        flag_k = k+1
        for x in range(len(df)):
            if flag_h != 0:
                # Still inside a holding period: already filled, skip
                flag_h = flag_h - 1
                continue
            # Skip rows where the asset is not listed yet (if any)
            if not listed[x]:
                continue
            # Skip the first k + 1 valid rows: not enough look-back yet
            if flag_k != 0:
                flag_k = flag_k - 1
                continue

            previous_change = rolling_returns[x - 1]
            # Slice assignment clips at the end of the array, which replaces the
            # IndexError that the original swallowed with a bare except.
            if previous_change < tolerance:
                positions[x:x + h] = -1
            elif previous_change > tolerance:
                positions[x:x + h] = 1

            # Start the hold counter so the next h - 1 rows are not re-decided.
            # (NaN != tolerance is True, so a NaN change also starts a hold.)
            if previous_change != tolerance: flag_h = h - 1

        asset_signal = pd.Series(positions, index=df.index, name='signal')
        signal = pd.concat([signal, asset_signal], join = 'outer', axis=1)

    signal.columns = data.columns

    return signal

# 3. Model

## 3.1. Feature Engineering

In [None]:
def MACD(data,period_fast,period_slow):
    """Return the fast EMA of `data` minus its slow EMA.

    Both averages use ``ewm(span=..., adjust=True, ignore_na=False).mean()``.
    """
    def _ema(span):
        # Exponential moving average for one span, wrapped as a Series
        return pd.Series(data.ewm(ignore_na=False, span=span, adjust=True).mean())

    fast_average = _ema(period_fast)
    slow_average = _ema(period_slow)
    return fast_average - slow_average

def MACD_normalized(data, period_fast, period_slow):
    """Return the MACD of `data` normalised twice.

    The MACD is first divided by the exponentially weighted std (span 63) of
    `data`, and that ratio is then divided by its own exponentially weighted
    std (span 252).
    """
    price_std = data.ewm(span=63).std()
    scaled_macd = MACD(data, period_fast, period_slow) / price_std
    return scaled_macd / scaled_macd.ewm(span=252).std()

def calculate_rsi(data, period  = 14):
    """Return the RSI of `data` using simple rolling means of gains and losses.

    Args:
        data: Series of prices.
        period: Rolling window length for the average gain and average loss.

    Returns:
        Series ``100 - 100 / (1 + avg_gain / avg_loss)``; NaN until a full
        window is available.
    """
    change = data.diff()

    # Split the day-to-day change into its positive part and the magnitude of
    # its negative part; everything else (including the leading NaN) becomes 0
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)

    def _window_mean(series):
        return series.rolling(window=period).mean()

    relative_strength = _window_mean(up_moves) / _window_mean(down_moves)
    return 100 - (100 / (1 + relative_strength))

In [2]:
def construct_features_single_asset(df,k,h,test = False):
    """Build the feature and target columns for one asset and sample every h-th row.

    Args:
        df: DataFrame with a single 'Close' column (prices of one asset).
        k: Look-back length used for the "Past k Day" feature.
        h: Holding length used for the forward-looking targets and as the
            sampling step; must be at least 1.
        test: Accepted for call-site compatibility; not used by this function.

    Returns:
        None when fewer than 64 non-missing prices are available. Otherwise a
        DataFrame with the feature/target columns, restricted to rows without
        any NaN, keeping every h-th such row and dropping the last two kept
        rows. The result may be empty.

    Raises:
        ValueError: If `h` is smaller than 1.
    """
    if h < 1:
        raise ValueError("h must be at least 1")

    # Copy so that adding columns never writes into a view of the caller's frame
    df = df.dropna(how='any',axis=0).copy()
    if df.shape[0] < 64:
        return None

    # Simple daily return; assigned as a whole Series because
    # df["Return Daily"].iloc[0] = 0 is chained assignment (a silent no-op
    # under pandas Copy-on-Write)
    daily_return = df["Close"].pct_change()
    daily_return.iloc[0] = 0
    df["Return Daily"] = daily_return

    df["Volatility_Scale"] = Volatility_scale(pd.DataFrame(df["Close"]))["Close_Vol"]
    df['Cummulative Return'] = (1+ df['Return Daily']).cumprod(axis = 0)

    # Forward-looking targets over the next h rows (t+1 .. t+h), aligned on row t
    df['Mean H Return'] = df["Return Daily"].rolling(h+1).apply(lambda x: x.iloc[range(1,h+1)].mean()).shift(-h)
    df['Next H Return'] = df['Cummulative Return'].pct_change(h).shift(-h)
    df['Square Sum Return'] = df["Return Daily"].rolling(h+1).apply(lambda x: x.iloc[range(1,h+1)].pow(2).sum()).shift(-h)
    df["Next H Vol"] = df["Volatility_Scale"].shift(-h)
    df['Next H PnL'] = df['Next H Return'] / df["Next H Vol"]

    # Past returns over k rows and over 1 row, scaled by volatility
    for temp in [k,1]:
        df["Past " + str(temp) + " Day" ] = df['Close'].pct_change(temp) / (df["Volatility_Scale"] * np.sqrt(252))

    # Normalised MACD: (19, 39) for the longer trend, (5, 13) for the fast trend.
    # Other span pairs previously tried here: (8, 24) and (12, 26).
    df["MACD_19_39"] = MACD_normalized(df["Close"],19,39)
    df["MACD_5_13"] = MACD_normalized(df["Close"], 5, 13)

    # RSI for overbought/oversold (periods 10 and 14 were also tried)
    df['RSI_5'] = calculate_rsi(df['Close'], period=5)

    # Price relative to its 5-row simple moving average
    df['Price_SMA_5'] = df['Close'] / df['Close'].rolling(5).mean() - 1

    # Binary label: 1 when the return over the next h rows is positive
    df['Signal'] = [1 if x > 0 else 0 for x in df['Next H Return']]

    df = df.dropna(how='any',axis=0)

    # Keep rows 0, h, 2h, ... and drop the last two of them. This replaces a
    # row-by-row concat loop that stopped on IndexError (quadratic, and it
    # turned the numeric columns into object dtype on some pandas versions).
    return df.iloc[::h].iloc[:-2]

In [1]:
def feature_engineering(data,k,h,supervised = False, binary = True):
    """Build the stacked feature matrix and target frame for all assets.

    Every column of `data` is passed through
    `construct_features_single_asset`; assets for which it returns None or
    an empty frame are skipped, the rest are stacked row-wise.

    Args:
        data: DataFrame of prices, one column per asset.
        k: Look-back length (names the "Past k Day" feature).
        h: Holding length.
        supervised: When falsy, the targets are the four columns consumed by
            the Sharpe loss ("Mean H Return", "Square Sum Return",
            "Volatility_Scale", "Next H Return"). When truthy, see `binary`.
        binary: With `supervised`, truthy selects the "Signal" label and falsy
            selects "Next H PnL".

    Returns:
        ``[X_train, y_train]`` as two DataFrames sharing the same index.
    """
    features = ["Past " + str(i) + " Day" for i in [k,1]]
    features += ["MACD_19_39", "MACD_5_13", 'RSI_5', 'Price_SMA_5']

    if not supervised:
        # Declared up front with all four columns; the original declared three
        # but concatenated four, relying on concat to add the missing one.
        targets = ["Mean H Return","Square Sum Return","Volatility_Scale","Next H Return"]
    elif binary:
        targets = ["Signal"]
    else:
        targets = ["Next H PnL"]

    X_parts = []
    y_parts = []
    for oo in data.columns:
        df = data[[oo]].copy()
        df.columns = ["Close"]

        df = construct_features_single_asset(df,k,h)

        if df is None or df.empty:
            continue

        X_parts.append(df[features])
        y_parts.append(df[targets])

    if not X_parts:
        # No asset had enough data: return empty frames with the expected columns
        return [pd.DataFrame(columns=features), pd.DataFrame(columns=targets)]

    # Single concat keeps numeric dtypes and avoids re-copying on every asset
    X_train = pd.concat(X_parts, axis = 0)
    y_train = pd.concat(y_parts, axis = 0)

    return [X_train,y_train]

## 3.2. Building Model

### Decision Tree

In [None]:
def train_decision_tree(X_train,y_train,X_val,y_val,k,h):
    """Grid-search a decision-tree classifier with hypopt and return the fitted search.

    The search object is fitted with both the training and the validation
    sets. `k` and `h` are accepted for a uniform trainer signature and are
    not used.
    """
    search_space = dict(
        max_depth=[3, 5, 7, 10, 15],
        min_samples_split=[2, 5, 10, 20],
        min_samples_leaf=[1, 2, 4, 8],
        criterion=['gini', 'entropy'],
    )

    base_tree = tree.DecisionTreeClassifier(random_state=42)
    searcher = GridSearch(model = base_tree, param_grid = search_space, parallelize=False)
    searcher.fit(X_train, y_train, X_val, y_val)

    return searcher

### XGBoost

In [None]:
def train_xgboost(X_train,y_train,X_val,y_val,k,h):
    """Grid-search an XGBoost classifier with hypopt and return the fitted search.

    The search object is fitted with both the training and the validation
    sets. `k` and `h` are accepted for a uniform trainer signature and are
    not used.
    """

    param_grid = {
        'max_depth': [3, 5, 7, 9],
        'learning_rate': [0.01, 0.05, 0.1, 0.3],
        'n_estimators': [100, 200, 300],
        'min_child_weight': [1, 3, 5],
        'subsample': [0.6, 0.8, 1.0]
    }

    model = GridSearch(model = xgb.XGBClassifier(
        # Classification objective: the original passed the regression
        # objective 'reg:squarederror' to a classifier evaluated with logloss.
        objective='binary:logistic',
        random_state=42,
        use_label_encoder=False,
        eval_metric='logloss'
    ), param_grid = param_grid,parallelize=False)

    model.fit(X_train, y_train, X_val, y_val)

    return model

### MLP (supervised)

In [42]:
class MLP_supervised(kt.HyperModel):
    """Keras Tuner hypermodel: one-hidden-layer MLP trained with supervision.

    With ``binary`` truthy the network ends in a sigmoid unit and is compiled
    with the ``'binary_crossentropy'`` loss; otherwise the output is linear
    and the loss is the root-mean-squared error. The input has 6 features.
    """

    def __init__(self, k,binary):
        super().__init__()
        self.k = k  # stored only; not read by build() or fit()
        self.binary = binary

    @staticmethod
    def _rmse_loss(y_true, y_pred):
        """Root-mean-squared error of the current batch.

        Replaces ``tf.keras.metrics.RootMeanSquaredError()``, which is a
        stateful metric object rather than a loss function.
        """
        y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), tf.shape(y_pred))
        return tf.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))

    def build(self,hp):
        # NOTE: the order of the hp.Choice calls is kept as-is because it defines
        # the order in which the hyper-parameters are registered with the tuner.
        model = Sequential([
            Dropout(0, input_shape=(6,)),
            Dense(units=hp.Choice("units", [5, 20, 40]),activation = hp.Choice('activation', ['relu'])),
            Dropout(rate=hp.Choice("dropout", [0.1, 0.3, 0.5])),
            Dense(1,activation = 'sigmoid' if self.binary else None),
        ])

        if self.binary == True:
            # Kept as a string: test_model_TSMOM compares model.loss with it
            loss = 'binary_crossentropy'
        else:
            loss = self._rmse_loss

        model.compile(
            optimizer=Adam(
                learning_rate=hp.Choice("learning_rate", [1e-3, 1e-1, 1.0]),
                clipnorm = hp.Choice("max_grad_norm", [1e-2, 0.1, 1.0, 10.0])
            ),
            loss= loss,
        )
        return model

    def fit(self, hp, model, *args, **kwargs):
        """Fit `model` with a tuned batch size; defaults to 100 epochs, verbose=1."""
        batch_size = hp.Choice("batch_size", [256,512,1024])
        # Defaults instead of hard-coded keywords, so a caller (or tuner) that
        # passes epochs/verbose no longer triggers a duplicate-keyword TypeError
        kwargs.setdefault("epochs", 100)
        kwargs.setdefault("verbose", 1)
        return model.fit(*args, batch_size=batch_size, **kwargs)


In [45]:
def train_MLP_supervised(X_train,y_train,X_val,y_val,k,h,binary = True):
    """Tune the supervised MLP by grid search, then refit it with the best hyper-parameters.

    The search uses early stopping on ``val_loss``; the final fit writes the
    best epoch to a checkpoint file whose name depends on `binary`.
    `h` is accepted for a uniform trainer signature and is not used.

    Returns:
        Tuple ``(model, history)`` of the refitted model and the History of
        that final fit.
    """
    validation_pair = (X_val, y_val)

    grid_tuner = kt.GridSearch(
        MLP_supervised(k = k,binary = binary),
        objective="val_loss",
        max_trials=50,
        overwrite=True,
        directory="tuning_dir",
        project_name= f"tune_MLP_supervised_{'binary' if binary else 'reg'}",
    )

    stop_when_stalled = EarlyStopping(monitor='val_loss', verbose=1, patience=25)

    if binary:
        best_epoch_path = 'Test/Data/checkpoint_mlp_sup_binary.model.keras'
    else:
        best_epoch_path = 'Test/Data/checkpoint_mlp_sup_reg.model.keras'
    keep_best_epoch = ModelCheckpoint(
        filepath=best_epoch_path,
        monitor = 'val_loss',
        save_best_only=True,
        verbose = 1,
    )

    grid_tuner.search(X_train, y_train, callbacks = [stop_when_stalled], validation_data=validation_pair)

    # Rebuild a fresh model from the winning configuration and train it again
    final_hypermodel = MLP_supervised(k = k,binary = binary)
    winning_hp = grid_tuner.get_best_hyperparameters()[0]
    final_model = final_hypermodel.build(winning_hp)

    fit_history = final_hypermodel.fit(
        winning_hp, final_model, X_train, y_train,
        callbacks = [keep_best_epoch],
        validation_data = validation_pair,
    )

    return final_model, fit_history

### Lasso (supervised)

In [None]:
class Lasso_supervised(kt.HyperModel):
    """Keras Tuner hypermodel: single L1-regularised dense unit trained with supervision.

    With ``binary`` truthy the unit uses a sigmoid activation and the model
    is compiled with the ``'binary_crossentropy'`` loss; otherwise the output
    is linear and the loss is the root-mean-squared error. The input has
    6 features.
    """

    def __init__(self, k,binary):
        super().__init__()
        self.k = k  # stored only; not read by build() or fit()
        self.binary = binary

    @staticmethod
    def _rmse_loss(y_true, y_pred):
        """Root-mean-squared error of the current batch.

        Replaces ``tf.keras.metrics.RootMeanSquaredError()``, which is a
        stateful metric object rather than a loss function.
        """
        y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), tf.shape(y_pred))
        return tf.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))

    def build(self,hp):
        # NOTE: the order of the hp.Choice calls is kept as-is because it defines
        # the order in which the hyper-parameters are registered with the tuner.
        model = Sequential([
            Dense(1, input_shape = (6,),kernel_regularizer = l1(hp.Choice("l1_weight", [1e-4, 1e-3, 1e-2, 0.1,])),activation= 'sigmoid' if self.binary else None)
        ])

        if self.binary == True:
            # Kept as a string: test_model_TSMOM compares model.loss with it
            loss = 'binary_crossentropy'
        else:
            loss = self._rmse_loss

        model.compile(
            optimizer=Adam(
                learning_rate=hp.Choice("learning_rate", [1e-3, 1e-1, 1.0]),
                clipnorm = hp.Choice("max_grad_norm", [1e-2, 0.1, 1.0, 10.0])
            ),
            loss= loss,
        )
        return model

    def fit(self, hp, model, *args, **kwargs):
        """Fit `model` with a tuned batch size; defaults to 100 epochs, verbose=1."""
        batch_size = hp.Choice("batch_size", [256,512,1024])
        # Defaults instead of hard-coded keywords, so a caller (or tuner) that
        # passes epochs/verbose no longer triggers a duplicate-keyword TypeError
        kwargs.setdefault("epochs", 100)
        kwargs.setdefault("verbose", 1)
        return model.fit(*args, batch_size=batch_size, **kwargs)


In [None]:
def train_Lasso_supervised(X_train,y_train,X_val,y_val,k,h,binary = True):
    """Tune the supervised Lasso model by grid search, then refit it with the best hyper-parameters.

    The validation set is converted to float64 arrays first. The search uses
    early stopping on ``val_loss``; the final fit writes the best epoch to a
    checkpoint file whose name depends on `binary`. `h` is accepted for a
    uniform trainer signature and is not used.

    Returns:
        Tuple ``(model, history)`` of the refitted model and the History of
        that final fit.
    """
    X_val = np.array(X_val, dtype=np.float64)
    y_val = np.array(y_val, dtype=np.float64)
    validation_pair = (X_val, y_val)

    grid_tuner = kt.GridSearch(
        Lasso_supervised(k = k,binary=binary),
        objective="val_loss",
        max_trials=50,
        overwrite=True,
        directory="tuning_dir",
        project_name= f"tune_Lasso_supervised_{'binary' if binary else 'reg'}",
    )

    stop_when_stalled = EarlyStopping(monitor='val_loss', verbose=1, patience=25)

    if binary:
        best_epoch_path = 'Test/Data/checkpoint_lasso_sup_binary.model.keras'
    else:
        best_epoch_path = 'Test/Data/checkpoint_lasso_sup_reg.model.keras'
    keep_best_epoch = ModelCheckpoint(
        filepath=best_epoch_path,
        monitor = 'val_loss',
        save_best_only=True,
    )

    grid_tuner.search(X_train, y_train, callbacks = [stop_when_stalled], validation_data=validation_pair)

    # Rebuild a fresh model from the winning configuration and train it again
    final_hypermodel = Lasso_supervised(k = k,binary=binary)
    winning_hp = grid_tuner.get_best_hyperparameters()[0]
    final_model = final_hypermodel.build(winning_hp)

    fit_history = final_hypermodel.fit(
        winning_hp, final_model, X_train, y_train,
        callbacks = [keep_best_epoch],
        validation_data = validation_pair,
    )

    return final_model, fit_history

### MLP (Sharpe Loss optimization)

In [4]:
def sharpe_loss(h,target_vol = 0.2):
    """Build a Keras loss: negative annualised Sharpe ratio plus an RMSE term.

    Args:
        h: Holding length in rows; used to convert h-row sums to per-row values.
        target_vol: Volatility target used to scale the predicted position.

    Returns:
        Function ``calculation(y_target_dummy, y_pred)``. ``y_target_dummy``
        must have at least four columns, read in this order: mean return over
        the next h rows, sum of squared returns over the next h rows,
        volatility scale, and return over the next h rows. ``y_pred`` is the
        model output, used as the position.
    """
    annualisation = float(np.sqrt(252))

    def calculation(y_target_dummy, y_pred):
        # Targets may arrive as float64 while the model predicts float32
        y_target_dummy = tf.cast(y_target_dummy, y_pred.dtype)

        mean_return = tf.reshape(y_target_dummy[:, 0], (-1, 1))
        square_sum = tf.reshape(y_target_dummy[:, 1], (-1, 1))
        volatility_scale = tf.reshape(y_target_dummy[:, 2], (-1, 1))
        next_h_return = tf.reshape(y_target_dummy[:, 3], (-1, 1))

        # Volatility-scaled strategy return summed over the h held rows
        sum_pofolio = mean_return * h * y_pred * target_vol / volatility_scale
        # Average per-row strategy return across the batch
        mean_pofolio = tf.reduce_mean(mean_return * h * y_pred * target_vol / volatility_scale) / h

        # Per-row standard deviation from the expanded sum of squared deviations
        std_pofolio = tf.math.sqrt(tf.reduce_mean(square_sum * y_pred **2  * (target_vol / volatility_scale)**2
                                                  - 2 * sum_pofolio * mean_pofolio
                                                  + (mean_pofolio ** 2) * h)/h)

        # The original called mean(...) here, i.e. the local tensor named `mean`,
        # which raises TypeError; a batch mean reduction is what the RMSE needs.
        tracking_rmse = tf.math.sqrt(
            tf.reduce_mean((next_h_return/volatility_scale - y_pred) ** 2))

        return  - (mean_pofolio / std_pofolio) * annualisation + tracking_rmse

    return calculation

In [8]:
class MLP_SharpeLoss(kt.HyperModel):
    """Keras Tuner hypermodel: one-hidden-layer MLP with tanh output, trained on `sharpe_loss`.

    The input has 6 features. `k` is stored but not read by build() or fit().
    """

    def __init__(self, k,h):
        self.k, self.h = k, h

    def build(self,hp):
        # Hyper-parameters are requested in the same order as before
        # (units, activation, dropout, learning_rate, max_grad_norm).
        hidden_units = hp.Choice("units", [5, 20])
        hidden_activation = hp.Choice('activation', ['tanh', 'relu'])
        dropout_rate = hp.Choice("dropout", [0.1, 0.3, 0.5])

        network = Sequential([
            Dropout(0, input_shape=(6,)),
            Dense(units=hidden_units, activation=hidden_activation),
            Dropout(rate=dropout_rate),
            Dense(1, activation='tanh'),
        ])

        step_size = hp.Choice("learning_rate", [1e-4, 1e-3, 1e-2, 1e-1, 1.0])
        grad_norm_cap = hp.Choice("max_grad_norm", [1e-2, 0.1, 1.0, 10.0])
        network.compile(
            optimizer=Adam(learning_rate=step_size, clipnorm=grad_norm_cap),
            loss=sharpe_loss(h = self.h),
        )
        return network

    def fit(self, hp, model, *args, **kwargs):
        """Fit `model` for 100 epochs (verbose=1) with a tuned batch size."""
        chosen_batch = hp.Choice("batch_size", [256,512])
        return model.fit(*args, batch_size=chosen_batch, **kwargs, epochs = 100, verbose=1)


In [16]:
def train_MLP_sharpeLoss(X_train,y_train,X_val,y_val,k,h):
    """Tune the Sharpe-loss MLP by grid search, then refit it with the best hyper-parameters.

    The search uses early stopping on ``val_loss``; the final fit writes the
    best epoch to a checkpoint file.

    Returns:
        Tuple ``(model, history)`` of the refitted model and the History of
        that final fit.
    """
    validation_pair = (X_val, y_val)

    grid_tuner = kt.GridSearch(
        MLP_SharpeLoss(k = k,h=h),
        objective="val_loss",
        max_trials=50,
        overwrite=True,
        directory="tuning_dir",
        project_name="tune_MLP_sharpeLoss",
    )

    stop_when_stalled = EarlyStopping(monitor='val_loss', verbose=1, patience=25)

    keep_best_epoch = ModelCheckpoint(
        filepath='Test/Data/checkpoint_MLP_sharpeLoss.model.keras',
        monitor = 'val_loss',
        save_best_only=True,
    )

    grid_tuner.search(X_train, y_train, callbacks = [stop_when_stalled], validation_data=validation_pair)

    # Rebuild a fresh model from the winning configuration and train it again
    final_hypermodel = MLP_SharpeLoss(k = k,h= h)
    winning_hp = grid_tuner.get_best_hyperparameters()[0]
    final_model = final_hypermodel.build(winning_hp)

    fit_history = final_hypermodel.fit(
        winning_hp, final_model, X_train, y_train,
        callbacks = [keep_best_epoch],
        validation_data = validation_pair,
    )
    return final_model, fit_history

### Lasso (Sharpe Loss optimization)

In [20]:
class Lasso_SharpeLoss(kt.HyperModel):
    """Keras Tuner hypermodel: single L1-regularised dense unit trained on `sharpe_loss`.

    The input has 6 features. `k` is stored but not read by build() or fit().
    """

    def __init__(self, k,h):
        self.k, self.h = k, h

    def build(self,hp):
        # Hyper-parameters are requested in the same order as before
        # (l1_weight, learning_rate, max_grad_norm).
        l1_strength = hp.Choice("l1_weight", [1e-3, 1e-2, 0.1,])

        # NOTE(review): the output is a sigmoid, so predictions lie in (0, 1),
        # whereas MLP_SharpeLoss uses tanh -- confirm this is intended.
        network = Sequential([
            Dense(1, input_shape = (6,), kernel_regularizer = l1(l1_strength), activation='sigmoid')
        ])

        step_size = hp.Choice("learning_rate", [1e-3, 1e-1, 1.0])
        grad_norm_cap = hp.Choice("max_grad_norm", [1e-2, 0.1, 1.0, 10.0])
        network.compile(
            optimizer=Adam(learning_rate=step_size, clipnorm=grad_norm_cap),
            loss=sharpe_loss(h = self.h),
        )
        return network

    def fit(self, hp, model, *args, **kwargs):
        """Fit `model` for 100 epochs (verbose=1) with a tuned batch size."""
        chosen_batch = hp.Choice("batch_size", [256,512])
        return model.fit(*args, batch_size=chosen_batch, **kwargs, epochs = 100, verbose=1)


In [27]:
def train_Lasso_sharpeLoss(X_train,y_train,X_val,y_val,k,h):
    """Tune the Sharpe-loss Lasso model by grid search, then refit it with the best hyper-parameters.

    The search uses early stopping on ``val_loss``; the final fit writes the
    best epoch to a checkpoint file.

    Returns:
        Tuple ``(model, history)`` of the refitted model and the History of
        that final fit.
    """
    validation_pair = (X_val, y_val)

    grid_tuner = kt.GridSearch(
        Lasso_SharpeLoss(k = k,h=h),
        objective="val_loss",
        max_trials=50,
        overwrite=True,
        directory="tuning_dir",
        project_name="tune_Lasso_sharpeLoss",
    )

    stop_when_stalled = EarlyStopping(monitor='val_loss', verbose=1, patience=25)

    keep_best_epoch = ModelCheckpoint(
        filepath='Test/Data/checkpoint_Lasso_sharpeLoss.model.keras',
        monitor = 'val_loss',
        save_best_only=True,
    )

    grid_tuner.search(X_train, y_train, callbacks = [stop_when_stalled], validation_data=validation_pair)

    # Rebuild a fresh model from the winning configuration and train it again
    final_hypermodel = Lasso_SharpeLoss(k = k,h= h)
    winning_hp = grid_tuner.get_best_hyperparameters()[0]
    final_model = final_hypermodel.build(winning_hp)

    fit_history = final_hypermodel.fit(
        winning_hp, final_model, X_train, y_train,
        callbacks = [keep_best_epoch],
        validation_data = validation_pair,
    )
    return final_model, fit_history

## 3.3. Test model

In [None]:
def test_model_TSMOM(data, model,k,h):
    """Turn a fitted model's predictions into a per-asset signal frame.

    For every asset the features are rebuilt with
    `construct_features_single_asset`, the model predicts on the sampled
    rows, and each prediction is forward-filled until the next sampled row
    (rows before the first prediction get 0).

    A model whose ``loss`` attribute equals ``'binary_crossentropy'`` is
    treated as a probability model: outputs are mapped to +1 / -1 around 0.5
    (exactly 0.5 maps to -1). For any other model the raw prediction is used
    as the signal.

    NOTE(review): `backtest` only opens a position when the signal is exactly
    1 or -1, so raw (continuous or 0/1) predictions may never trade or never
    short -- confirm this is the intended behaviour.

    Args:
        data: DataFrame of prices, one column per asset.
        model: Fitted estimator exposing ``predict``.
        k: Look-back length (names the "Past k Day" feature).
        h: Holding length.

    Returns:
        DataFrame on ``data.index`` with one column per asset that produced
        at least one feature row.
    """
    company = data.columns

    signal = pd.DataFrame(index = data.index)

    features = ["Past " + str(i) + " Day" for i in [k,1]]
    features += ["MACD_19_39", "MACD_5_13", 'RSI_5', 'Price_SMA_5']

    # Estimators without a `loss` attribute simply take the raw-output path;
    # this replaces a bare try/except around the attribute access.
    is_probability_model = getattr(model, 'loss', None) == 'binary_crossentropy'

    for oo in company:
        df = data[[oo]].copy()

        df.columns = ["Close"]
        df = construct_features_single_asset(df,k,h,test= True)

        if df is None: continue
        time_index = data[oo].dropna(how = 'any').index

        X_test = df[features]
        if X_test.shape[0] == 0: continue

        # Predict on a float64 array and flatten (n, 1) outputs to length n
        raw_output = np.asarray(
            model.predict(X_test.to_numpy(dtype=np.float64)), dtype=float
        ).reshape(-1)

        if is_probability_model:
            prediction = np.sign(raw_output - 0.5)
            prediction[prediction == 0] = -1
        else:
            prediction = raw_output

        # Float Series instead of an object-dtype frame filled cell by cell;
        # nothing is written into X_test, which is a slice of df.
        company_signal = pd.Series(np.nan, index=time_index, name=oo, dtype=float)
        company_signal.loc[X_test.index] = prediction

        # Hold each prediction until the next sampled row; 0 before the first one
        company_signal = company_signal.ffill().fillna(0)

        signal = pd.concat([signal,company_signal], axis = 1, join = 'outer')

    return signal

In [None]:
def loss_history(history, model_name):
    """Plot training and validation loss per epoch and save the figure as a PNG.

    Args:
        history: Object with a ``history`` mapping containing 'loss' and
            'val_loss' sequences (as returned by Keras ``fit``).
        model_name: Used in the plot title and as the file-name prefix; the
            figure is saved as ``<model_name>loss.png``.
    """
    fig, ax = plt.subplots(figsize=(12, 12))
    try:
        ax.plot(history.history['loss'])
        ax.plot(history.history['val_loss'])
        ax.set_title(str(model_name) + ' loss')
        ax.set_ylabel('loss')
        ax.set_xlabel('epoch')
        ax.legend(['train', 'validation'], loc='upper left')
        fig.savefig(str(model_name) + 'loss.png')
    finally:
        # `del fig` only drops the local name; pyplot keeps the figure alive
        # until it is closed explicitly.
        plt.close(fig)
    return

# 4. Backtest

In [None]:
def backtest(data,signal,k,h,  vol_flag = 1, target_vol = 0.2, ignore_na = False, adjust = True, com = 60, min_periods = 0):
    """Compute daily PnL and leverage for a signal frame.

    For every asset in `signal`, decision rows are visited in order. The
    first k + 1 valid rows are skipped. On a decision row with signal +1
    (long) or -1 (short), the next `h` rows (starting with the decision row)
    receive the signed daily return of the cumulative-return index, and the
    following h - 1 rows are not re-decided. With ``vol_flag == 1`` returns
    are scaled by ``target_vol / vol`` using the volatility of the row before
    the decision row, and that ratio is reported as leverage; otherwise
    returns are unscaled and leverage is 1. Any other signal value leaves
    PnL and leverage at 0.

    Args:
        data: DataFrame of prices, one column per asset.
        signal: DataFrame of signals; its columns must exist in `data`.
        k: Look-back length (only used for the initial skip).
        h: Holding length in rows.
        vol_flag: 1 to enable volatility targeting.
        target_vol: Volatility target.
        ignore_na, adjust, com, min_periods: Forwarded to `Volatility_scale`.

    Returns:
        ``[pnl, leverage]``: two DataFrames on ``data.index`` with the columns
        of `signal`.
    """

    pnl = pd.DataFrame(index=data.index)
    leverage = pd.DataFrame(index = data.index)

    company = signal.columns

    daily_index = Volatility_scale(data,ignore_na=ignore_na,
                          adjust=adjust,
                          com=com,
                          min_periods = min_periods)

    use_vol_target = vol_flag == 1

    for oo in company:
        df = pd.concat([daily_index[oo], daily_index[oo+"_Vol"]], axis=1)
        df = df.dropna(axis = 0, how = 'all')

        # Signal with its missing rows removed, read by position alongside df
        company_signal = signal[oo].dropna().to_numpy()

        # Work on plain arrays: df['pnl'].iloc[...] = ... is chained assignment
        # and silently does nothing under pandas Copy-on-Write.
        level = df[oo].to_numpy(dtype=float)
        vol = df[oo+"_Vol"].to_numpy(dtype=float)
        n_rows = len(df)
        asset_pnl = np.zeros(n_rows)
        asset_leverage = np.zeros(n_rows)

        flag_h = 0
        flag_k = k+1
        for x in range(n_rows):
            if flag_h != 0:
                # Still inside a holding period: already filled, skip
                flag_h = flag_h - 1
                continue
            # Skip rows where the asset is not listed yet (if any)
            if np.isnan(level[x]):
                continue
            # Skip the first k + 1 valid rows: not enough look-back yet
            if flag_k != 0:
                flag_k = flag_k - 1
                continue

            # No signal available for this position -> no trade
            if x >= len(company_signal):
                continue
            direction = company_signal[x]
            if not (direction == 1 or direction == -1):
                continue

            # Stop at the end of the data instead of relying on a swallowed IndexError
            for day in range(x, min(x + h, n_rows)):
                # +1: level_t / level_{t-1} - 1 ; -1: 1 - level_t / level_{t-1}
                signed_return = direction * (level[day] / level[day - 1] - 1)
                if use_vol_target:
                    asset_pnl[day] = signed_return * target_vol / vol[x - 1]
                    asset_leverage[day] = target_vol / vol[x - 1]
                else:
                    asset_pnl[day] = signed_return
                    asset_leverage[day] = 1

            # The hold flag now reads the same aligned signal as the trade itself
            # (the original indexed the un-dropped signal[oo] by position here).
            flag_h = h - 1

        leverage = pd.concat([leverage, pd.Series(asset_leverage, index=df.index, name='leverage')], join = 'outer',axis = 1)
        pnl = pd.concat([pnl, pd.Series(asset_pnl, index=df.index, name='pnl')], join = 'outer',axis=1)

    pnl.columns = signal.columns
    leverage.columns = signal.columns

    return [pnl,leverage]

Cuối cùng, ta lấy mean của 50 cổ phiếu để có `PnL` đại diện 

In [None]:
def strategy_daily_return(pnl):
    """Return the equally weighted strategy PnL: the row-wise mean across assets."""
    portfolio_pnl = pnl.mean(axis="columns")
    return portfolio_pnl

# 5. Example Code

## 5.1. Lấy Data

In [None]:
# # Thời gian input theo dạng yyyy-mm-dd; với ví dụ ở dưới 

# start_time = '2004-12-31'
# end_time = '2010-01-01' 

# df = EU_Stock_data(start_time,end_time)


## 5.2. Code demo Classic TSMOM

In [None]:
# Demo: classic TSMOM signal and its backtest on the downloaded prices.
start_time = '2009-12-31'
end_time = '2024-12-22'

# NOTE: despite its name, `daily_return` holds the Close prices from EU_Stock_data
daily_return = EU_Stock_data(start_time = start_time, end_time=end_time)
daily_index = Volatility_scale(daily_return)

# Print the pnl, leverage and signal for k = 3, h = 3, target volatility = 0.4
LOOKBACK = 3
HOLDING = 3
TARGET_VOL = 0.4

signal = classic_TSMOM(daily_return,LOOKBACK,HOLDING)
[pnl,leverage] = backtest(daily_return,signal,LOOKBACK,HOLDING, target_vol= TARGET_VOL)

# Bare expressions are only rendered when they are the last statement of a
# notebook cell, so every frame is printed explicitly.
print(f'pnl với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
print(pnl)

print(f'leverage với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
print(leverage)

print(f'signal với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
print(signal)

## 5.3 Code so sánh các cặp k,h khi sử dụng classic TSMOM

In [None]:
# Compare every (k, h) pair in 1..10 x 1..10 with the classic TSMOM strategy
# and store the daily strategy PnL of each pair as one row of a CSV file.
start_time = '2019-12-31'
end_time = '2024-12-31'

data = EU_Stock_data(start_time = start_time, end_time=end_time)

# Rows are collected in a fresh list, so re-running the cell can no longer
# append to a `stats` frame left over from a previous run (the original
# created `stats` inside a bare except on the first failure).
stat_rows = []
stat_columns = None

for k in range(1,11):
    for h in range(1,11):
        # print([k,h])  # uncomment to follow progress
        signal = classic_TSMOM(data,k,h)
        result = backtest(data,signal,k,h)  # [pnl, leverage]
        temp = strategy_daily_return(result[0])

        if stat_columns is None:
            stat_columns = ['k', 'h'] + temp.index.to_list()
        stat_rows.append([k, h] + temp.to_list())
        del result

stats = pd.DataFrame(stat_rows, columns = stat_columns)
stats.to_csv("k_h_Comparing.csv")

## 5.4. Code demo thử model

### Train theo tỉ lệ Data (9 năm Train; 1 năm Validation và 5 năm Backtest từ 15 năm)

In [None]:
# Demo: train every model, then backtest each one on the held-out period.
# Train: up to 31 Dec of (start year + 4); validation: the following year;
# backtest: everything after 31 Dec of (start year + 5).
start_time = '2018-12-31'
end_time = '2024-12-22'

# Alternative sample:
# start_time = '2004-12-31'
# end_time = '2019-12-31'

k = 5
h = 10

data = EU_Stock_data(start_time = start_time, end_time=end_time)

X_binary, y_binary = feature_engineering(data,k,h,supervised= True)
X_regression, y_regression = feature_engineering(data,k,h,supervised= True,binary= False)
X_sharpeloss, y_sharpeloss = feature_engineering(data,k,h,supervised= False)

# Date cut-offs computed once instead of inline in every mask
start_year = pd.to_datetime(start_time).year
train_end = datetime(start_year + 4,12,31)
val_end = datetime(start_year + 5,12,31)

test_data = data[data.index > val_end]


def _split_train_val(frame):
    """Return (train, validation) float64 arrays of `frame`, split by date."""
    train_part = frame[frame.index <= train_end]
    val_part = frame[(frame.index > train_end) & (frame.index <= val_end)]
    return np.array(train_part, dtype=np.float64), np.array(val_part, dtype=np.float64)


model_name = ['classic_TSMOM','train_decision_tree','train_xgboost','train_MLP_supervised','train_Lasso_supervised','train_MLP_supervised_reg','train_Lasso_supervised_reg','train_MLP_sharpeLoss','train_Lasso_sharpeLoss']

# Trainers that return only the fitted model (no Keras history)
trainers_without_history = ['train_decision_tree','train_xgboost']
binary_trainers = ['train_decision_tree','train_xgboost','train_MLP_supervised','train_Lasso_supervised']

# Rows are collected in a fresh list, so a `stats` frame left over from another
# cell can no longer be picked up (the original created `stats` in a bare except).
stat_rows = []
stat_columns = None

for model in model_name:

    if model == 'classic_TSMOM':
        func = globals()[model]
        signal = func(test_data,k,h)
    else:
        if model[-3:] == 'reg':
            # '<trainer>_reg' -> same trainer, fitted on the regression target.
            # binary=False must be passed explicitly: the original relied on the
            # default binary=True and trained a sigmoid/cross-entropy model.
            func = globals()[model[:-4]]
            X_all, y_all = X_regression, y_regression
            extra_kwargs = {'binary': False}
        elif model in binary_trainers:
            func = globals()[model]
            X_all, y_all = X_binary, y_binary
            extra_kwargs = {}
        else:
            func = globals()[model]
            X_all, y_all = X_sharpeloss, y_sharpeloss
            extra_kwargs = {}

        X_train, X_val = _split_train_val(X_all)
        y_train, y_val = _split_train_val(y_all)

        if model in trainers_without_history:
            temp_model = func(X_train,y_train,X_val,y_val,k,h)
        else:
            temp_model,history = func(X_train,y_train,X_val,y_val,k,h,**extra_kwargs)
            loss_history(history,str(model))
            del history

        signal = test_model_TSMOM(test_data,temp_model,k,h)
        del temp_model

    # signal.to_csv("signal_" + str(model) + ".csv")  # optional dump per model
    pnl = strategy_daily_return(backtest(test_data,signal,k,h)[0])

    if stat_columns is None:
        stat_columns = ['Model'] + pnl.index.to_list()
    stat_rows.append([model] + pnl.to_list())


stats = pd.DataFrame(stat_rows, columns = stat_columns)
stats.to_csv("result.csv")

### Train theo Rolling Window