In [2]:
!pip install scikit-learn



In [None]:
!freqtrade download-data --config user_data\config\main_config.json --timeframes 15m --timerange 20230101-20240512

In [None]:
import pandas as pd
import talib
import numpy as np

class TechnicalDataPreparation():

    @staticmethod
    def apply_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
        """Applies the technical indicators functions from talib library."""

        upper_band, _, lower_band = talib.BBANDS(df["close"], nbdevup=2, nbdevdn=2, matype=0)
        df["BBANDS"] = (df["close"] - lower_band) / (upper_band - lower_band)
        log_return = np.log(df["close"]) - np.log(df['close'].shift(1))
        df["Z_score"] = ((log_return - log_return.rolling(20).mean()) / log_return.rolling(20).std())
        df["RSI"] = talib.RSI(df["close"]) / 100
        df["ULTOSC"] = (talib.ULTOSC(df["high"], df["low"], df["close"])) / 100
        df["pct_change"] = df["close"].pct_change()
        df["zs_Vol"] = (df["volume"] - df["volume"].mean()) / df["volume"].std()
        df["SMA_Ratio_21"] = (df["close"] - talib.SMA(df["close"], 21)) / talib.SMA(df["close"], 21)
        df["SMA_Ratio_21_50"] = (df["close"] - talib.SMA(df["close"], 21)) / talib.SMA(df["close"], 50)
        df["SMA_Ratio_50"] = (df["close"] - talib.SMA(df["close"], 50)) / talib.SMA(df["close"], 50)
        df["SMA_Ratio_50_100"] = (df["close"] - talib.SMA(df["close"], 50)) / talib.SMA(df["close"], 100)
        return df

    @staticmethod
    def apply_cdl_pattern(df: pd.DataFrame) -> pd.DataFrame:
        """Applies the technical candlechart pattern functions from talib library."""

        df["CDL_2CROWS"] = talib.CDL2CROWS(df["open"], df["high"], df["low"], df["close"]) / 100
        df["CDL_3BLACKCROWS"] = talib.CDL3BLACKCROWS(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_3WHITESOLDIERS"] = talib.CDL3WHITESOLDIERS(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_ABANDONEDBABY"] = talib.CDLABANDONEDBABY(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_BELTHOLD"] = talib.CDLBELTHOLD(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_COUNTERATTACK"] = talib.CDLCOUNTERATTACK(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_DARKCLOUDCOVER"] = talib.CDLDARKCLOUDCOVER(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_DRAGONFLYDOJI"] = talib.CDLDRAGONFLYDOJI(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_ENGULFING"] = talib.CDLENGULFING(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_EVENINGDOJISTAR"] = talib.CDLEVENINGDOJISTAR(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_EVENINGSTAR"] = talib.CDLEVENINGSTAR(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_GRAVESTONEDOJI"] = talib.CDLGRAVESTONEDOJI(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_HANGINGMAN"] = talib.CDLHANGINGMAN(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_HARAMICROSS"] = talib.CDLHARAMICROSS(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_INVERTEDHAMMER"] = talib.CDLINVERTEDHAMMER(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_MARUBOZU"] = talib.CDLMARUBOZU(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_MORNINGDOJISTAR"] = talib.CDLMORNINGDOJISTAR(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_MORNINGSTAR"] = talib.CDLMORNINGSTAR(
            df["open"], df["high"], df["low"], df["close"], penetration=0
        ) / 100
        df["CDL_PIERCING"] = talib.CDLPIERCING(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_RISEFALL3METHODS"] = talib.CDLRISEFALL3METHODS(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_SHOOTINGSTAR"] = talib.CDLSHOOTINGSTAR(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_SPINNINGTOP"] = talib.CDLSPINNINGTOP(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        df["CDL_UPSIDEGAP2CROWS"] = talib.CDLUPSIDEGAP2CROWS(
            df["open"], df["high"], df["low"], df["close"]
        ) / 100
        return df

    @staticmethod
    def check_label(x):
        if (abs((x['s-1'] - x['close_MA']) / x['close_MA']) > x['alpha']) and \
                (abs((x['s-1'] - x['close_MA']) / x['close_MA']) < (x['beta'])):
            if x['s-1'] > x['close_MA']:
                return -1
            if x['s-1'] < x['close_MA']:
                return 1
            else:
                return 0
        else:
            return 0

    @staticmethod
    def assign_labels(df: pd.DataFrame,
                      backw: int,
                      forwardw: int,
                      alpha: int,
                      beta: int) -> pd.DataFrame:
        df_copy = df.copy()
        df_copy["close_MA"] = df_copy["close"].ewm(span=backw).mean()
        df_copy["s-1"] = df_copy["close"].shift(-1 * forwardw)
        df_copy["alpha"] = alpha
        df_copy["beta"] = beta * (1 + (forwardw * 0.1))
        df_copy["label"] = df_copy.apply(TechnicalDataPreparation.check_label, axis=1)
        return df_copy["label"]


In [2]:
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import os

data_path = r"E:\Programming\Freqtrade\freqtrade\user_data\data\binance"
five_min_data = [file for file in os.listdir(path=data_path) if "15m" in file]

dfs = []

for file_name in five_min_data:
    df_temp = pd.read_feather(os.path.join(data_path, file_name))
    dfs.append(df_temp)

df = pd.concat(dfs, ignore_index=True)
df["date_small"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
df.reset_index(drop=True, inplace=True)
df_rf = df.loc[(df["date_small"] >= "2023-01-01") & 
               (df["date_small"] <= "2024-01-01")]
#### backtest on data from 2024-01-02
df_rf = df_rf.drop(["date_small", "date"], axis=1)
df_rf.head()

Unnamed: 0,open,high,low,close,volume
0,0.383,0.385,0.383,0.383,22319.8
1,0.383,0.384,0.382,0.384,26600.9
2,0.384,0.384,0.381,0.382,26848.3
3,0.381,0.383,0.381,0.382,24329.6
4,0.382,0.383,0.382,0.383,12654.6


In [None]:
df_rf = TechnicalDataPreparation().apply_technical_indicators(df=df_rf)
df_rf = TechnicalDataPreparation().apply_cdl_pattern(df=df_rf)
y = TechnicalDataPreparation().assign_labels(df=df_rf, backw=5,
                                                        forwardw=1,
                                                        alpha=0.038,
                                                        beta=0.24)

In [None]:
import talib.abstract as ta
from technical import qtpylib
df_rf["rsi"] = ta.RSI(df_rf)
df_rf["slowk"] = ta.STOCH(df_rf)["slowk"]
df_rf["fastd"] = ta.STOCH(df_rf)["slowd"]
df_rf["williams_r"] = ta.WILLR(df_rf)
macd = ta.MACD(df_rf)
df_rf["macd"] = macd["macd"]
df_rf["macdsignal"] = macd["macdsignal"]
df_rf["OBV"] = ta.OBV(df_rf)
df_rf.dropna(inplace=True)
df_rf.head()

Unnamed: 0,open,high,low,close,volume,rsi,slowk,fastd,williams_r,macd,macdsignal,OBV
33,0.384,0.384,0.384,0.384,50.0,49.642574,66.666667,88.888889,-50.0,0.000522,0.000442,129808.2
34,0.385,0.385,0.385,0.385,26.5,55.902678,66.666667,77.777778,-0.0,0.000541,0.000462,129834.7
35,0.384,0.384,0.384,0.384,35.0,49.302285,33.333333,55.555556,-50.0,0.00047,0.000464,129799.7
36,0.384,0.385,0.384,0.384,10497.4,49.302285,33.333333,44.444444,-50.0,0.000409,0.000453,129799.7
37,0.384,0.384,0.383,0.383,1789.0,43.364302,0.0,22.222222,-100.0,0.000277,0.000418,128010.7


In [None]:
X = df_rf
y = [1 if signal else 0 for signal in (df_rf["close"] / df_rf["open"] > 1.02)]

In [14]:
X_train, X_test, y_train, y_test = train_test_split(df_rf, y, test_size=0.2, random_state=42, stratify=y)

In [136]:
rf = RandomForestClassifier(random_state=42)

param_grid = {
    'n_estimators': [50, 100, 200],  # Number of trees in the forest
    'max_depth': [None, 10, 20, 30],  # Maximum depth of the tree
    'min_samples_split': [2, 5, 10],  # Minimum number of samples required to split an internal node
    'min_samples_leaf': [1, 2, 4],  # Minimum number of samples required at each leaf node
    'max_features': ['auto', 'sqrt', 'log2'],  # Number of features to consider when looking for the best split
    'bootstrap': [True, False]  # Whether bootstrap samples are used when building trees
}

grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=5, n_jobs=-3, verbose=2)

grid_search.fit(X_train, y_train)

print("Best Parameters:", grid_search.best_params_)
print("Best Score:", grid_search.best_score_)

Fitting 5 folds for each of 648 candidates, totalling 3240 fits


KeyboardInterrupt: 