In [1]:
import numpy as np
import pandas as pd
import os
from tqdm.notebook import tqdm
import talib

In [None]:
import os
import requests


def download_binance_data(config, timeout=30):
    """Download monthly kline archives from Binance's public data site.

    One ``<ticker>-<interval>-<year>-<month>.zip`` file is fetched for every
    month-end date produced by ``pd.date_range(start_date, end_date)`` and
    stored under ``<data_dir>/<interval>/``. Archives that already exist on
    disk are skipped, so the function can be re-run safely. Failures (HTTP
    errors or network problems) are reported with ``print`` and the loop
    moves on to the next month.

    Args:
        config: Mapping with the keys ``data_dir``, ``interval``, ``ticker``,
            ``start_date`` and ``end_date``.
        timeout: Seconds to wait for each HTTP request before giving up.
    """
    base_url = "https://data.binance.vision/data/spot/monthly/klines"
    target_dir = os.path.join(config["data_dir"], config["interval"])
    os.makedirs(target_dir, exist_ok=True)

    # An offset object is used instead of the "M" string alias, which newer
    # pandas releases deprecate; the generated dates are the same month ends.
    month_ends = pd.date_range(
        config["start_date"], config["end_date"], freq=pd.offsets.MonthEnd()
    )

    for d in month_ends:
        filename = f"{config['ticker']}-{config['interval']}-{d.year}-{d.month:02d}.zip"
        save_path = os.path.join(target_dir, filename)

        if os.path.exists(save_path):
            continue

        url = f"{base_url}/{config['ticker']}/{config['interval']}/{filename}"
        print(f"Trying to download: {url}")
        try:
            # Without a timeout a stalled connection would block the cell forever.
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            # Keep the best-effort behaviour: report and try the next month.
            print(f"Failed to download {filename} ({exc})")
            continue

        if response.status_code == 200:
            with open(save_path, "wb") as f:
                f.write(response.content)
            print(f"Downloaded {filename}")
        else:
            print(
                f"Failed to download {filename} (Status code: {response.status_code})"
            )


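# Run this once to download the data. `data_config` is defined in a later cell,
# so execute that cell first and then uncomment the call below.
# download_binance_data(data_config)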

In [2]:
# Relative path to the hourly BTCUSDT archive directory.
# NOTE(review): nothing else in the visible notebook reads this variable;
# DataLoader builds its paths from data_config instead — confirm it is still needed.
data_dir = os.path.join('..', 'data',  'BTCUSDT', '1h')

In [3]:
# Settings consumed by download_binance_data and DataLoader.
data_config = {
  # Root directory of the archives; the interval is appended as a sub-directory.
  'data_dir': os.path.join('..', 'data',  'BTCUSDT'),
  # Column names handed to pd.read_csv (the CSV files are read without a header row).
  'names': ["open time", "open", "high", "low", "close", "volume",
            "close time", "quote asset volume", "number of trades",
            "taker buy base asset volume", "taker buy quote asset volume", "date"],
  # Subset of 'names' that DataLoader keeps after reading.
  'columns': ["open time", "open", "high", "low", "close", "volume"], 
  # Bounds passed to pd.date_range with a month-end frequency.
  # NOTE(review): '2022-06' parses as 2022-06-01, so the last month-end generated
  # is 2022-05-31 and June 2022 is not loaded — confirm this is intended.
  'start_date': '2017-08', 
  'end_date': '2022-06', 
  # Rows before this date are dropped at the end of DataLoader.generate_features.
  'start_date_training': '2018-01-01',
  # Used to build the archive file names and the download URL.
  'ticker': 'BTCUSDT',
  'interval': '1h', 
}

class DataLoader():
    """
    Read monthly kline archives into one hourly price frame and derive
    technical-indicator features plus a next-hour direction target.

    The frame is loaded once in the constructor and stored in ``self.data``;
    ``generate_features`` then enriches and filters ``self.data``.
    """

    def __init__(self, config):
        """
        Store the configuration and eagerly load the price data.

        config: mapping with the keys ``data_dir``, ``interval``, ``ticker``,
        ``names``, ``columns``, ``start_date``, ``end_date`` and
        ``start_date_training``.
        """
        self.config = config
        self.data = self.load_data()

    def load_data(self):
        """
        Load data and return prepared data frame

        Reads one zip archive per month-end date between ``start_date`` and
        ``end_date``, keeps the configured columns, indexes the rows by the
        timestamp decoded from ``open time`` (milliseconds), fills missing
        hours by carrying the previous row forward and adds a ``return``
        column holding the percentage change of ``close``.

        Raises ValueError when the configured date range yields no month.
        """
        # Offset objects are used instead of the 'M' / 'H' string aliases,
        # which newer pandas releases deprecate.
        month_ends = pd.date_range(self.config['start_date'],
                                   self.config['end_date'],
                                   freq=pd.offsets.MonthEnd())

        # Collect the monthly frames and concatenate once: DataFrame.append
        # no longer exists in pandas 2.x and was quadratic inside a loop.
        frames = []
        for d in tqdm(month_ends):
            archive = os.path.join(self.config['data_dir'], self.config['interval'],
                                   '{}-{}-{}-{:02}.zip'.format(self.config['ticker'],
                                                               self.config['interval'],
                                                               d.year,
                                                               d.month))
            frames.append(pd.read_csv(archive, names=self.config['names']))

        if not frames:
            raise ValueError(
                "No monthly archives between {} and {}".format(
                    self.config['start_date'], self.config['end_date']))

        # .copy() so that adding the 'date' column does not write into a slice.
        df = pd.concat(frames)[self.config['columns']].copy()
        df['date'] = pd.to_datetime(df['open time'], unit='ms')
        df = df.sort_values(by='date').set_index('date')

        # Rebuild a gap-free hourly index and carry the last known row forward.
        # The hourly step is fixed here regardless of config['interval'].
        hourly_index = pd.date_range(df.index[0], df.index[-1], freq=pd.offsets.Hour())
        df = df.reindex(hourly_index).ffill()

        df["return"] = df['close'].pct_change()
        return df.drop('open time', axis=1)

    def generate_features(self):
        """
        Add feature and target columns to ``self.data`` and trim the frame.

        Features: moving averages divided by the close (periods 5/10/15/20)
        and RSI / MFI values (periods 7/14/21) computed with talib.
        Targets: ``target_return`` is the next row's ``return`` and ``target``
        is 1 when that value is positive, otherwise 0.
        Rows containing NaN and rows before ``start_date_training`` are dropped.
        """
        for i in [5, 10, 15, 20]:
            moving_average = talib.MA(self.data['close'], timeperiod=i)
            # Express the average relative to the current close.
            self.data[f'MA_{i}'] = moving_average / self.data['close']

        for i in [7, 14, 21]:
            self.data[f'RSI_{i}'] = talib.RSI(self.data['close'], timeperiod=i)
            self.data[f'MFI_{i}'] = talib.MFI(self.data['high'],
                                              self.data['low'],
                                              self.data['close'],
                                              self.data['volume'],
                                              timeperiod=i)

        self.data['target_return'] = self.data['return'].shift(-1)
        # NaN compares as not-greater, so the final row gets 0 before dropna removes it.
        self.data['target'] = (self.data['target_return'] > 0).astype(int)
        self.data = self.data.dropna()
        self.data = self.data[self.data.index >= pd.to_datetime(self.config['start_date_training'])]


In [None]:
dt = DataLoader(data_config)
# DataLoader.__init__ has already run load_data(); take a copy of that frame
# instead of re-reading every monthly archive a second time. The copy is made
# before generate_features() so `df` keeps the raw, feature-free columns.
df = dt.data.copy()
dt.generate_features()
dt.data.head(10)

In [7]:
from xgboost import XGBClassifier

# Settings consumed by HourlyBacktester.
trader_config = {
    # columns of the data frame that are fed to the classifier
    'features': ['MA_5', 'MA_10', 'MA_15', 'MA_20',
                 'RSI_7', 'MFI_7', 'RSI_14', 'MFI_14',
                 'RSI_21', 'MFI_21'],
    # out of sample start date
    'oos_start_date': '2022-01-01', 
    # look back (training size) in days; apply_strategy overwrites this
    # with the look_back argument it receives
    'look_back': 90,
    # length in days of each test window, which is also how far the
    # training window rolls forward between re-fits
    'step': 30,
}

class HourlyBacktester():
    """
        Walk-forward backtest of a binary up/down classifier on hourly rows.

        The data is split at ``oos_start_date`` into in-sample and
        out-of-sample parts. ``apply_strategy`` repeatedly fits a model on
        ``look_back`` days and predicts the following ``step`` days, storing
        the predictions in ``self.df_res``; ``get_score`` turns them into a
        cumulative return.
    """

    # look_back and step are expressed in days while the rows are hourly.
    HOURS_PER_DAY = 24

    def __init__(self, data, config):
        """
            data: frame with a datetime index holding the feature columns,
            ``target`` and ``target_return``.
            config: mapping with ``features``, ``oos_start_date``,
            ``look_back`` and ``step``.
        """
        # Shallow copy: apply_strategy overwrites 'look_back', and that must
        # not leak into the dict the caller shares between backtesters.
        self.config = dict(config)
        self.full_data = data
        self.df_res = None

    def get_oos_data(self):
        """
            Returns out-of-sample data
        """
        return self.full_data[self.full_data.index >= pd.to_datetime(self.config['oos_start_date'])]

    def get_is_data(self):
        """
            Returns in-sample data
        """
        return self.full_data[self.full_data.index < pd.to_datetime(self.config['oos_start_date'])]

    @staticmethod
    def get_model(xgb_param):
        """
            Returns model

            Builds an XGBClassifier from a copy of ``xgb_param`` with the
            ``logloss`` eval metric and the ``binary:logistic`` objective.
        """
        params = xgb_param.copy()
        params["eval_metric"] = "logloss"  # eval_metric is set on the model, not in fit()
        return XGBClassifier(**params, objective="binary:logistic")

    def apply_strategy(self, params, look_back, oos=False):
        """
            The main logic of the strategy

            params: keyword arguments for the model (see get_model).
            look_back: training window length in days (must be positive).
            oos: when True, run on the out-of-sample data, preceded by the
            last ``look_back`` days of in-sample data as the first training
            window; otherwise run on the in-sample data only.

            Stores one row per predicted hour in ``self.df_res`` with the
            columns ``date``, ``target_return``, ``target`` and ``prediction``.
        """
        if look_back <= 0:
            raise ValueError("look_back must be a positive number of days")

        self.config["look_back"] = look_back
        look_back_rows = look_back * self.HOURS_PER_DAY
        step_rows = self.config['step'] * self.HOURS_PER_DAY

        if oos:
            # Prepend a full training window (days converted to hourly rows)
            # so the first prediction falls on the first out-of-sample row.
            warm_up = self.get_is_data().iloc[-look_back_rows:]
            df = pd.concat([warm_up, self.get_oos_data()])
        else:
            df = self.get_is_data()

        self.df_res = pd.DataFrame()
        windows = []
        i = 0
        with tqdm(total=len(df)) as pbar:
            # The first training window produces no predictions.
            pbar.update(look_back_rows)
            while True:
                train_start = i * step_rows
                train_end = train_start + look_back_rows
                test_end = train_end + step_rows
                if train_end >= len(df):
                    break

                df_train = df.iloc[train_start:train_end]
                df_test = df.iloc[train_end:test_end]

                x_tr = df_train[self.config['features']]
                x_test = df_test[self.config['features']]
                y_tr = df_train['target']

                clf = self.get_model(params)
                clf.fit(x_tr, y_tr)

                pred = clf.predict(x_test)
                windows.append(pd.DataFrame({
                    'date': df_test.index,
                    'target_return': df_test['target_return'],
                    'target': df_test['target'],
                    'prediction': pred
                }))
                pbar.update(step_rows)
                i += 1

        # One concat at the end instead of growing the frame per window.
        if windows:
            self.df_res = pd.concat(windows)

    def get_score(self):
        """
            Adds ``hourly_return`` and ``cum_ret`` to ``self.df_res`` and
            returns the final cumulative return.

            A correct prediction earns the absolute next-hour return, a wrong
            one loses it; ``cum_ret`` is the running sum of those values.
        """
        res = self.df_res
        magnitude = res['target_return'].abs()
        correct = res['target'] == res['prediction']
        res['hourly_return'] = np.where(correct, magnitude, -magnitude)

        res['cum_ret'] = res['hourly_return'].cumsum()

        return float(res['cum_ret'].iloc[-1])

    def objective(self, trial):
        """
            Optuna objective: sample model parameters and a look-back length,
            run the in-sample backtest and return its score.
        """
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 350, 1000),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            # suggest_float replaces the deprecated suggest_uniform.
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.10),
            'subsample': trial.suggest_float('subsample', 0.50, 0.90),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.50, 0.90),
            'gamma': trial.suggest_int('gamma', 0, 20),
        }

        look_back = trial.suggest_int('look_back', 30, 180)

        self.apply_strategy(params, look_back)

        return self.get_score()

In [None]:
import optuna
import warnings
import neptune
import neptune.integrations.optuna as optuna_utils

# Silence library warnings for the duration of the tuning run.
warnings.filterwarnings("ignore")

# Neptune credentials come from the environment, never from the notebook.
run = neptune.init_run(
    project=os.getenv("NEPTUNE_PROJECT_NAME"),
    api_token=os.getenv("NEPTUNE_API_TOKEN"),
)

neptune_callback = optuna_utils.NeptuneCallback(run)

hb = HourlyBacktester(dt.data, trader_config)
n_trials = 20

# Seed the sampler so that Restart & Run All proposes the same trials again.
# TPESampler is Optuna's default single-objective sampler, so only the
# randomness is pinned, not the search algorithm.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(hb.objective, n_trials=n_trials, callbacks=[neptune_callback])

In [None]:
# Stop the Neptune run that was opened with neptune.init_run in the tuning cell.
run.stop()

In [None]:
# Separate the tuned window length from the model hyper-parameters.
best_params = {key: value for key, value in study.best_params.items() if key != 'look_back'}
look_back = study.best_params['look_back']

# Re-run the in-sample backtest with the best trial's settings.
hb = HourlyBacktester(dt.data, trader_config)
hb.apply_strategy(best_params, look_back)
# Called for its side effect: it adds the return columns to hb.df_res.
hb.get_score()

hb.df_res.head()

In [None]:
import matplotlib.pyplot as plt

# Compare the strategy's cumulative return with simply holding the asset.
plt.rcParams["figure.figsize"] = (11, 7)
hb.df_res['buy_and_hold_ret'] = hb.df_res['target_return'].add(1).cumprod().sub(1)
ax = hb.df_res[['cum_ret', 'buy_and_hold_ret']].plot()
ax.legend(['Cumulative return XGB', 'Cumulative return buy and hold'])
ax.grid()
ax.set_ylabel('Cumulative return')
ax.set_title('Cumulative return without transaction costs')
plt.show()

In [None]:
# Repeat the backtest on the out-of-sample period with the tuned settings.
hb = HourlyBacktester(dt.data, trader_config)
hb.apply_strategy(best_params, look_back, oos=True)
# Called for its side effect: it adds the return columns to hb.df_res.
hb.get_score()

# Compare the strategy's cumulative return with simply holding the asset.
plt.rcParams["figure.figsize"] = (11, 7)
hb.df_res['buy_and_hold_ret'] = hb.df_res['target_return'].add(1).cumprod().sub(1)
ax = hb.df_res[['cum_ret', 'buy_and_hold_ret']].plot()
ax.legend(['Cumulative return XGB', 'Cumulative return buy and hold'])
ax.grid()
ax.set_ylabel('Cumulative return')
ax.set_title('Out-of-sample cumulative return without transaction costs')
plt.show()

In [None]:
from unittest.mock import Mock

# Stand-in object whose `order` attribute is a two-argument callable that ignores
# its arguments and returns a dict with an executed quantity of 1 and a random
# quote quantity in [1, 2).
# NOTE(review): presumably this fakes an exchange client's order response for a
# later cell — confirm. The name `json` would shadow the standard-library module
# if that were imported in this notebook.
json = Mock()
json.order = lambda x, z: {"executedQty":1,
                             "cummulativeQuoteQty":np.random.rand()+1}