In [23]:
import numpy as np
from numba import njit
from functools import partial
import talib 
import pandas as pd
import vectorbt as vbt
import vectorbt as vbt
from vectorbt.utils.colors import adjust_opacity
from vectorbt.utils.enum_ import map_enum_fields
from vectorbt.base.reshape_fns import broadcast, flex_select_auto_nb, to_2d_array
from vectorbt.portfolio.enums import SizeType, Direction, NoOrder, OrderStatus, OrderSide
from vectorbt.portfolio import nb
from statsmodels.tsa.stattools import adfuller
from joblib import Parallel, delayed
import sys
import os
function_essential_path = r"C:\Users\Jordi\Desktop\Environement de developement\Trading_Dev_Stratégie_Environement\FunctionEssential"
function_s_score_path = r'C:\Users\Jordi\Desktop\Environement de developement\Trading_Dev_Stratégie_Environement\Indicateur'
sys.path.append(function_essential_path)
sys.path.append(function_s_score_path)
import utils as us
import s_score as score

In [27]:
##FONCTION##
def compute_rolling_adf_pvalues(close_prices, window_size, num_jobs=-1):

    if not isinstance(close_prices, pd.Series):
        raise ValueError("close_prices doit être un pd.Series avec un index valide.")
    original_index = close_prices.index
    close_prices = close_prices.values  
    def compute_adf_pvalue(window_data):
        if np.count_nonzero(~np.isnan(window_data)) >= window_size / 2:  
            result = adfuller(window_data)
            return result[1]  
        else:
            return np.nan  
    p_values = Parallel(n_jobs=num_jobs)(
        delayed(compute_adf_pvalue)(close_prices[i:i + window_size])
        for i in range(len(close_prices) - window_size + 1)
    )
    p_values_series = pd.Series([np.nan] * (window_size - 1) + p_values, index=original_index)
    return p_values_series

In [30]:
##FONCTION LOGIQUE SIGNALS##

def entries_trades(close,sscore,adf_m30,adf_h1,adf_h4,seuil_score):
    long = np.full_like(close, False, dtype=bool)
    short = np.full_like(close, False, dtype=bool)
    trade_out = True
    for idx in range(len(close)):

        if (adf_m30[idx] < 0.05) & (adf_h1[idx] < 0.05) & (adf_h4[idx] < 0.05):

            if trade_out:
                if sscore[idx] <= -seuil_score:
                    long[idx] = True
                    trade_out = False
                if sscore[idx] >= seuil_score:
                    short[idx] = True
                    trade_out = False
            else:
                if (sscore[idx] <= 0.5) & (sscore[idx] >= -0.5):
                    trade_out = True






def exits_trades(long,short,close,sscore,seuil_sscore_out,seuil_sl):
    long_exit = np.zeros_like(long, dtype=np.float64)
    short_exit = np.zeros_like(short, dtype=np.float64)

    for col in range(long.shape[1]):
        for idx in range(len(close)):

            if long[idx:col]:
                SL = close[idx:col] * (1-seuil_sl)
                for j in range(idx+1,len(close)):
                    if close[j:col] <= SL:
                        long_exit[j:col] = SL
                    if sscore[j:col] >= -seuil_sscore_out:
                        long_exit[j:col] = close[j:col]

            if short[idx:col]:
                SL = close[idx:col] * (1+seuil_sl)
                for j in range(idx+1,len(close)):
                    if close[j:col] >= SL:
                        short_exit[j:col] = SL
                    if sscore[j:col] <= seuil_sscore_out:
                        short_exit[j:col] = close[j:col]

    return long_exit,short_exit



In [29]:
##INDICATEUR FACTORY##

Entry = vbt.IndicatorFactory(
    class_name="entries_trades",
    input_names=["close","sscore","adf_m30","adf_h1","adf_h4"],
    param_names=["seuil_score"],
    output_names=["long","short"],
).from_apply_func(entries_trades)

Exit = vbt.IndicatorFactory(
    class_name="exits_trades",
    input_names=["long","short","close","sscore"],
    param_names=["seuil_sscore_out","seuil_sl"],
    output_names=["long","short"],
).from_apply_func(exits_trades)

In [5]:
@njit
def pre_sim_func_nb(c):
    entry_price = np.full(c.target_shape[1], np.nan, dtype=np.float64)  
    return (entry_price,)

@njit
def order_func_nb(c, entry_price, long, short, long_exit,short_exit, size):
    price_now = nb.get_elem_nb(c, c.close)
    entry_long_now = nb.get_elem_nb(c, long)
    entry_short_now = nb.get_elem_nb(c, short)


    if entry_long_now:
        if c.position_now == 0:
            return nb.order_nb(
                size[c.col], 
                price=price_now,
                direction=Direction.LongOnly,
                fees=0.001
            )
        elif c.position_now < 0:
            return nb.order_nb(
                -size[c.col],  
                price=price_now,
                direction=Direction.ShortOnly,
                fees=0.0005
            )  

    if entry_short_now:
        if c.position_now == 0:
            return nb.order_nb(
                size[c.col], 
                price=price_now,
                direction=Direction.ShortOnly,
                fees=0.001
            )
        elif c.position_now > 0:
            return nb.order_nb(
                -size[c.col],  
                price=price_now,
                direction=Direction.LongOnly,
                fees=0.0005
            )

    #Si la valeur de l'idx de la sortie est différente de 0 ca veut dire que on doit out et on récupere la valeur de notre np.array long_exit 
    if c.position_now > 0:
        long_exit_now = nb.get_elem_nb(c, long_exit)
        if long_exit_now != 0:
            return nb.order_nb(
                -size[c.col],
                price=long_exit_now,
                direction=Direction.LongOnly,
                fees=0.0005
            )

    if c.position_now < 0:
        short_exit_now = nb.get_elem_nb(c, short_exit)
        if short_exit_now != 0:
            return nb.order_nb(
                -size[c.col],
                price=short_exit_now,
                direction=Direction.ShortOnly,
                fees=0.0005
            )
       

    return NoOrder

@njit
def post_order_func_nb(c, entry_price):
    if c.order_result.status == OrderStatus.Filled:
        entry_price[c.col] = c.order_result.price

In [None]:
##CLASS STRATEGIE##

class Strategie():

    def __init__(self,data,tickers,frequence):
        self.data = data
        self.tickers = tickers
        self.frequences = frequence
        self.stop_loss = None


    def set_params_strategie(self,sl,out,sscore,mf):
        self.stop_loss = sl
        self.ssscore_out = out
        self.sscore = sscore
        self.main_frequences = mf


    def backtest(self):

            entries_long_df = pd.DataFrame()   
            entries_short_df = pd.DataFrame()
            exits_long_df = pd.DataFrame()
            exits_short_df = pd.DataFrame()
            close_df = pd.DataFrame()
            datetime_df = pd.DataFrame()

            for ticker in self.tickers:

                data = self.data[self.main_frequences][ticker]
                datetime = data.reset_index()['Datetime']

                adf_m30 = compute_rolling_adf_pvalues(self.data['M30'][ticker]['close'],250)
                adf_h1 = compute_rolling_adf_pvalues(self.data['H1'][ticker]['close'],125)
                adf_h4 = compute_rolling_adf_pvalues(self.data['H4'][ticker]['close'],30)

                adf_m30_ressample = us.resample_shape_frequence(data['close'],adf_m30)
                adf_h1_ressample = us.resample_shape_frequence(data['close'],adf_h1)
                adf_h4_ressample = us.resample_shape_frequence(data['close'],adf_h4)

                us.get_details_variable([adf_m30_ressample,adf_h1_ressample,adf_h4_ressample,data['close']])

                
                trades = Entry.run(
                    data['close']

                )

                exit = Exit.run(

                )

                us.print_trades(0,trades.long,trades.short,None,None,data)

            #     entries_long_df[ticker] = trades.long.reset_index(drop=True)
            #     entries_short_df[ticker] = trades.short.reset_index(drop=True)
            #     exits_long_df[ticker] = exit.long.reset_index(drop=True)
            #     exits_short_df[ticker] = exit.short.reset_index(drop=True)
            #     close_df[ticker] = data['close'].reset_index(drop=True)
            #     datetime_df[ticker] = datetime.reset_index(drop=True)

                

            # size = np.array([600000]*len(self.tickers))
            # exits_long_df = exits_long_df.astype(np.float64).fillna(0)
            # exits_short_df = exits_short_df.astype(np.float64).fillna(0)
            # s_score_ajuster = np.asarray(s_score_ajuster, dtype=np.float64).reshape(-1,1)
            
            # portfolio = vbt.Portfolio.from_order_func(
            #     close_df,  
            #     order_func_nb,  
            #     entries_long_df.values,  
            #     entries_short_df.values,  
            #     exits_long_df.values,
            #     exits_short_df.values,
            #     size,  
            #     freq=self.frequences,  
            #     init_cash=100000,  # Ajout du capital initial
            #     pre_sim_func_nb=pre_sim_func_nb,  
            #     post_order_func_nb=post_order_func_nb,  
            #     broadcast_named_args=dict(  
            #         long=entries_long_df.values,  
            #         short=entries_short_df.values,  
            #         exits_long_df=exits_long_df.values,  
            #         exits_short_df=exits_short_df.values,  
            #         size=size  
            #     )
            # )
            # us.rapport_backtest(portfolio,close_df,datetime_df,self.tickers)
            # return portfolio

    def optimize(self,fitness_func,choice,period1,period2,params1,params2):
         
         if choice == 1:

            combinaison=[]
            stop_loss = []
            seuil_out = []
            for i in params1:
                for j in params2:
                    combinaison.append((i,j))
                    seuil_out.append(i)
                    stop_loss.append(j)

            trades = Entry.run(

            )

            exit = Exit.run(

            )

            long = trades.long
            short = trades.short
            long_exit = exit.long
            short_exit = exit.short


            long = long.reset_index(drop=True)
            short = short.reset_index(drop=True)
            long_exit = long_exit.reset_index(drop=True)
            short_exit = short_exit.reset_index(drop=True)
            
            close = close.reset_index(drop=True)
            close = pd.DataFrame(close)  

            capital = 100000  
            size = np.array([(0.05 * capital) / (0.02 * close.iloc[0, i]) for i in range(close.shape[1])])
            size = np.array([size[0]] * len(stop_loss))
            long = np.tile(long.values, (len(stop_loss), 1)).T
            short = np.tile(short.values, (len(stop_loss), 1)).T


            portfolio = vbt.Portfolio.from_order_func(
                close,  
                order_func_nb,  
                long,  
                short,  
                long_exit.values,
                short_exit.values,
                size,  
                freq=self.frequences,  
                init_cash=100000,  
                pre_sim_func_nb=pre_sim_func_nb,  
                post_order_func_nb=post_order_func_nb,  
                broadcast_named_args=dict(  
                    long=long,  
                    short=short,  
                    exits_long_df=long_exit.values,  
                    exits_short_df=short_exit.values,  
                    size=size  
                )
            )
            

            perf = us.get_heatmap(portfolio,combinaison,fitness_func)
            # us.get_3d_surface_metrics(2,portfolio,combinaison,fitness_func,"seuil_out","sl %",30,90)
            param1,param2,metrics = us.get_best_param(perf)
            return param1,param2



    def wfa(self,period_OOS, period_IOS):
 
        close_df = pd.DataFrame()
        for ticker in self.tickers:

            data = self.data[ticker]
            cycle = us.generate_wf_cycles(data, period_IOS, period_OOS)
            close = data['close']
            close_df[ticker] = close

            for i in range(len(cycle)):

                close_IOS = data["close"].iloc[cycle[i][0]:cycle[i][1]]
                close_OOS = data["close"].iloc[cycle[i][2]:cycle[i][3]]

                

                params1 = np.arange(10, 16, 1)
                params2 = np.arange(15, 26, 1)
                params1_opti, params2_opti = self.optimize()
                
                params3 = np.round(np.arange(0.5, -0.6, -0.1), 1)
                params4 = np.arange(0.009, 0.016, 0.001)
                params3_opti, params4_opti = self.optimize()

                #premiere itération on save la période IOS pour afficher l'entierté de mon jeu de donnée 
                if i == 0:

                    trades = Entry.run(
                        )
                    
                    exit = Exit.run(
                    )

                    entries_long = trades.long.to_frame(name=ticker)
                    entries_short = trades.short.to_frame(name=ticker)
                    exits_long = exit.long.to_frame(name=ticker)
                    exits_short = exit.short.to_frame(name=ticker)


                trades = Entry.run(

                )
                
                exit = Exit.run(
                    )

                entries_long = pd.concat([entries_long, trades.long.to_frame(name=ticker)], ignore_index=True)
                entries_short = pd.concat([entries_short, trades.short.to_frame(name=ticker)], ignore_index=True)
                exits_long = pd.concat([exits_long, exit.long.to_frame(name=ticker)], ignore_index=True)
                exits_short = pd.concat([exits_short, exit.short.to_frame(name=ticker)], ignore_index=True)




            capital = 100000   
            size = np.array([(0.05 * capital) / (0.02 * close_df.iloc[0, i]) for i in range(len(self.tickers))])
            stop_loss = np.array([0.01] * len(self.tickers))  
            take_profit = np.array([0.02] * len(self.tickers))

            portfolio = vbt.Portfolio.from_order_func(
                close,  
                order_func_nb,  
                entries_long.values,  
                entries_short.values,  
                exits_long.values,
                exits_short.values,
                size,  
                freq=self.frequences,  
                init_cash=100000,  # Ajout du capital initial
                pre_sim_func_nb=pre_sim_func_nb,  
                post_order_func_nb=post_order_func_nb,  
                broadcast_named_args=dict(  
                    long=entries_long.values,  
                    short=entries_short.values,  
                    exits_long=exits_long.values,  
                    exits_short=exits_short.values,  
                    size=size  
                )
            )
            us.get_pnl(portfolio)
            return portfolio



In [14]:
tickers = ['EURUSD']
DATA = us.get_data_forex(tickers,['M15','M30','H1','H4'],"2003-01-01","2013-01-01")
s_score_value = {}
for ticker in tickers:
    data_traitement = DATA['M15'][ticker]
    nb_nan = data_traitement['close'].isna().sum()
    close = data_traitement['close'].dropna()
    close_without_index = close.reset_index()
    result = score.mean_reversion(close, close_without_index['Datetime'], 85, "VOL", 0.5, "EGARCH")
    result_df = pd.DataFrame({
        's_score_value': result[0],
        'adjusted_s_scores_vol': result[1],
        'theta_vals': result[2],
        'mu_vals': result[3],
        'sigma_vals': result[4]
    })
    nan_rows = pd.DataFrame([[float('nan')] * result_df.shape[1]] * nb_nan, columns=result_df.columns)
    result_df = pd.concat([result_df, nan_rows], ignore_index=True)

    s_score_value[ticker] = result_df

  ohlc_resampled = df.resample(freq).agg({
Estimation des paramètres OU:   0%|          | 0/254431 [00:00<?, ?it/s]

  0%|          | 0/254431 [00:00<?, ?it/s]

Estimation des paramètres OU:   0%|          | 0/254240 [1:08:10<?, ?it/s]
  sigma_eq = sigma / np.sqrt(2 * theta)
Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.

Calcul de la volatilité conditionnelle: 100%|██████████| 254516/254516 [00:00<00:00, 268999.49it/s]


254515


  result_df = pd.concat([result_df, nan_rows], ignore_index=True)


In [35]:
strat = Strategie(DATA,tickers,"15m")

strat.set_params_strategie(0.01,-0.5,'M15')
strat.backtest()

Variable : None
Type : <class 'pandas.core.series.Series'>
Shape : (254516,)


Variable : None
Type : <class 'pandas.core.series.Series'>
Shape : (254516,)


Variable : None
Type : <class 'pandas.core.series.Series'>
Shape : (254516,)


Variable : close
Type : <class 'pandas.core.series.Series'>
Shape : (254516,)


