In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import warnings
warnings.filterwarnings('ignore')
import vectorbtpro as vbt
import os, sys, io, time, math, pickle, copy, json, asyncio, random, zipfile, functools, itertools
import datetime as dt
import talib.abstract as ta
import pandas as pd
import numpy as np

In [3]:
from dataloader import Dataloader
from backtest import Backtest, CommonParams, Direction

dataloader = Dataloader()
backtest = Backtest()

### Указываем пару и период с которыми будем работать

In [4]:
symbol = 'BTCUSDT'
start_date = '01.2023' # строго в формате MM.YYYY
end_date = '12.2023' # строго в формате MM.YYYY

### Скачивание исторических данных binance.futures
сохраняет csv файлы в klines/{symbol}. 
⚠️Пропустить если интересуемый период загружен

In [5]:
dataloader.download(symbol, start_date, end_date)

### Чтение данных
Это минутные свечи в дальнейшем будут использоваться:
1. Расчет точек входа на основе индикаторов
2. Имитация торгов на исторических данных

In [6]:
df = dataloader.read(symbol, start_date, end_date)

### Сигнал

In [7]:
# Подготовка данных для получения точек входа на основе сигналов
timeframe = '1h'

signal_df = dataloader.convert_tf(df, timeframe)
ohlcv = {c: signal_df[c].values for c in signal_df.columns}

In [8]:
def ma(source, length, ma_type):
    if ma_type == "SMA":
        return source.rolling(window=length).mean()
    elif ma_type == "Bollinger Bands":
        return source.rolling(window=length).mean()
    elif ma_type == "EMA":
        return source.ewm(span=length, adjust=False).mean()
    elif ma_type == "SMMA (RMA)":
        smma = [source[0]]
        for i in range(1, len(source)):
            smma.append((smma[i-1] * (length - 1) + source[i]) / length)
        return pd.Series(smma, index=source.index)
    elif ma_type == "WMA":
        weights = np.arange(1, length + 1)
        return source.rolling(window=length).apply(lambda prices: np.dot(prices, weights)/weights.sum(), raw=True)
    return None

def calculate_rsi_and_ma(prices):
    rsiLengthInput = 14
    rsiSourceInput = prices
    maTypeInput = "SMA"
    maLengthInput = 14
    bbMultInput = 2.0

    change = rsiSourceInput.diff()
    up = change.where(change > 0, 0)
    down = -change.where(change < 0, 0)
    up = ma(up, rsiLengthInput, "SMMA (RMA)")
    down = ma(down, rsiLengthInput, "SMMA (RMA)")

    rsi = 100 - (100 / (1 + up / down))

    rsiMA = ma(rsi, maLengthInput, maTypeInput)
    rsiMA = np.where(np.isnan(rsiMA), -1, rsiMA)
    rsi = np.where(np.isnan(rsi), -1, rsi)

    return {"rsi": rsi, "rsi_ma": rsiMA}

data = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'])
data['close'] = data['close'].astype(float)
prices = data['close'] # open, high, close, low
rsi_data = calculate_rsi_and_ma(prices)
rsi = rsi_data['rsi']
rsi_ma = rsi_data['rsi_ma']
correction = rsi - rsi_ma
result = np.where((correction >= 25) | (correction <= -25), 1, 0) # Тут коррекцию менять
signal_df['signal'] = result
print(print('signal count: ', signal_df['signal'].sum()))


signal count:  58
None


### Подготовка данных для бектеста

In [9]:
bdf = df.merge(signal_df[['datetime', 'signal']], on='datetime', how='left').fillna(0) # signal mapping with main df

In [10]:
step = .01 # шаг перебора параметров выхода
take_profits=np.arange(0.01, .10, step)                                   # take profit %
stop_losses=np.arange(0.01, .05, step)                                    # stop loss %
trailing_stop_activation=np.append(np.nan, np.arange(0.01, .05, step))    # trailing stop activation % - на каком процентном росте активируется трейлинг стоп
trailing_stops=np.append(np.nan, np.arange(0.01, .02, step/2))            # traling stop % - если np.nan то не используется
# формируем параметры
common_params = CommonParams(
    open_date_time=bdf['datetime'].values,
    open=bdf['open'].values,
    high=bdf['high'].values,
    low=bdf['low'].values,
    close=bdf['close'].values,
    entries=bdf['signal'].values,                                             # точки входа
    take_profits=take_profits,                                                # take profit %
    stop_losses=stop_losses,                                                  # stop loss %
    trailing_stop_activation=trailing_stop_activation,                        # trailing stop activation % - на каком процентном росте активируется трейлинг стоп
    trailing_stops=trailing_stops,                                            # traling stop % - если np.nan то не используется
    init_cach=10000,                                                          # начальный баланс $
    size=100,                                                                 # сумма входа в сделку $
    fees=.0004,                                                               # комиссия $
    slippage=.0001,                                                           # проскальзывание %
    direction=Direction.LONG                                                  # направление
)

In [11]:
# запуск бектеста
cpu_usage_weight = .08 # какой процент процессоров от общего количества использовать (для ускорения)
group_size = 20 # если ОЗУ позволяет на скачках то можно увеличить (для ускорения)
result = backtest.run(common_params, cpu_usage_weight, group_size)

  0%|          | 0/27 [00:00<?, ?it/s]

### Save backtest results

In [13]:
filename = f'{symbol} {timeframe} {start_date}-{end_date}.xlsx'
backtest.save(filename, result)