In [5]:
import pandas as pd

import numpy as np

import matplotlib.pyplot as plt

np.random.seed(42)


df = pd.read_csv('data/binance/BTCUSDT_1m_Futures.csv', index_col=0, parse_dates=True)
data = df[['Open', 'High', 'Low', 'Close', 'Volume']][-100000:]


data



Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2024-04-24 04:02:00,66432.5,66446.6,66415.3,66423.2,166.851
2024-04-24 04:03:00,66423.2,66423.3,66318.0,66403.0,864.803
2024-04-24 04:04:00,66402.9,66427.9,66380.0,66415.8,122.492
2024-04-24 04:05:00,66415.8,66416.3,66385.0,66416.3,89.957
2024-04-24 04:06:00,66416.2,66416.2,66300.0,66355.9,469.749
...,...,...,...,...,...
2024-07-02 14:37:00,62093.7,62093.7,62080.0,62093.0,39.111
2024-07-02 14:38:00,62092.9,62122.9,62086.6,62117.6,90.510
2024-07-02 14:39:00,62117.6,62142.0,62111.0,62142.0,63.159
2024-07-02 14:40:00,62142.0,62150.4,62091.4,62095.1,123.362


In [44]:
from numba import jit, njit, prange, vectorize, float64, int64, cuda, float32, guvectorize, void
from numba import float32, int32, int64, void, float64
from numba.typed import List
from numba.types import UniTuple

import numba as nb


nb_float = float64
nb_int = int64



class Backtest:
    def __init__(self, data, ma_lenth=100, num_grid=100, fee=-0.00015, pyrimading=1, test_size=0.2, Rf= 8/365/24/60):
        
        self.Rf = Rf
        self.pyramiding = pyrimading
        self.fee = fee
        self.num_grid = num_grid

        
        stop_idx = data[(data['SMA'] < data['High']) & (data['SMA'] > data['Low'])].index[-1]
        data = data[:stop_idx].dropna()
        self.data = data
        
        self.split_idx = int(len(data) * (1 - test_size))
        
        self.decimal = len(str(data.iloc[0]['Close']).split('.')[1]) if '.' in str(data.iloc[0]['Close']) else 0
        self.data.loc[:, 'SMA'] = self.data['Close'].rolling(window=ma_lenth).mean().round(self.decimal)
        

        self.orders = self.get_orders(self.data.iloc[:self.split_idx], num_grid=num_grid)
        
        self.amount = np.ones((self.num_grid*2, self.pyramiding), dtype=np.float64)/(self.pyramiding*2*self.num_grid)*100
        
    @staticmethod
    def get_orders(data, num_grid=100):
        
        max_diff = ((data['SMA']-data['Close'])/data['SMA']).abs().max()

        id = np.concatenate([np.arange(-num_grid, 0, 1, dtype=int), np.arange(1, num_grid+1, dtype=int)])
        orders = {
            'id': id,
            'open_price': -id * max_diff/num_grid,
            'close_price': np.zeros(num_grid*2),
            'direction': np.sign(id),
            }

        orders = pd.DataFrame(orders)
        
        return orders

    @staticmethod
    # @njit([UniTuple(nb_float[:, :], 2)(nb_float[:, :], nb_float[:, :], nb_float, nb_float[:])], parallel=True, fastmath=True)
    @njit(parallel=True)
    def run_jit(data, orders, amount, fee):
        
        balance = np.zeros(len(data), dtype=np.float64)
        
        # pos = np.zeros((len(data), len(orders)), dtype=np.float64)
        
        # usdt_b = np.zeros(len(data), dtype=np.float64)
        
        normal_qty = 1 - fee
        
        for j in prange(len(orders)):
            
            id, open_on, close_on, direction = orders[j]
            
            pyramiding = amount[j]
            
            num_positions = 0
            
            usdt = 0
            
            crypto = 0

            for i in range(len(data)):
                open, high, low, close, volume, sma = data[i]
                open_price = (open_on +1 ) * sma
                close_price = (close_on +1 ) * sma
                
                if i == 0:
                    if direction > 0:
                        if open < open_price:
                            num_positions += 1
                            crypto += amount[j, 0]/open_price
                            usdt -= normal_qty * pyramiding[0]
                            # pos[i, j] = open_price
                            
                    else:
                        if open > open_price:
                            num_positions += 1
                            crypto -= amount[j, 0]/open_price
                            usdt += normal_qty * pyramiding[0]
                            # pos[i, j] = open_price

                for k in range(num_positions + 1):
                    if k == num_positions:

                        if direction > 0:

                            if low < open_price and open > open_price and k < len(pyramiding) - 1:

                                num_positions += 1
                                crypto += pyramiding[k]/open_price
                                usdt -= normal_qty * pyramiding[k]
                                # pos[i, j] = open_price

                        else:
                            if high > open_price and open < open_price and k < len(pyramiding) - 1:
                                num_positions += 1
                                crypto -= pyramiding[k]/open_price
                                usdt += normal_qty * pyramiding[k]
                                # pos[i, j] = open_price
                    else:
                        if direction > 0:
                            if high > close_price and open < close_price:
                                num_positions = 0
                                usdt += normal_qty * crypto * close_price
                                crypto = 0

                        else:    
                            if low < close_price and open > close_price:
                                num_positions = 0
                                usdt += normal_qty * crypto * close_price
                                crypto = 0
                
                # usdt_b[i] += usdt
                balance[i] += crypto * close + usdt
                              
        return balance, None, None
    
    
    def run(self):
        
        # positions = self.run_jit(self.data.to_numpy(), self.orders.to_numpy())

        # self.positions = positions
        self.balance, self.pos, self.usdt_b = self.run_jit(self.data.to_numpy(), self.orders.to_numpy(), self.amount, self.fee)
        self.drawdown = self.cummax_diff(self.balance)
        self.returns = np.diff(self.balance)
        
    def save(self, train_stats, val_stats, amount):
        import json
        with open('stats.json', 'w') as f:
            json.dump({
                'amount': amount.tolist(),
                'train_stats': train_stats,
                'val_stats': val_stats,

            }, f)
            
    
    @staticmethod
    def get_stats(returns, drawdown, Rf):
            
            mean = np.mean(returns)
            dev = np.abs(returns - mean).mean()
            std = np.std(returns)
            max_drawdown = -np.min(drawdown)
            
            sharpe = (mean - Rf) / std
            sharped = (mean - Rf) / dev
            calmar = (mean-Rf) / max_drawdown
            sortino = (mean - Rf) / np.std(returns[returns < 0])
            
            return mean, std, max_drawdown, sharpe, sharped, calmar, sortino
        
    def stats(self, print_stats=False):
        train_stats = self.get_stats(self.returns[:self.split_idx], self.drawdown[:self.split_idx], self.Rf)
        val_stats = self.get_stats(self.returns[self.split_idx:], self.drawdown[self.split_idx:]-self.drawdown[self.split_idx], self.Rf)
        if print_stats:
            print('Train', *train_stats)
            print('Test', *val_stats)
        # else:
        #     self.save(train_stats, val_stats, self.amount)
        return train_stats, val_stats
    
    def evaluate(self, amount):
        self.amount = amount
        self.run()
        return self.stats()[0][3]
        

    @staticmethod
    @njit(parallel=True)
    def cummax_diff(A):
        B = np.zeros_like(A)
        max_val = A[0]
        for i in prange(len(A)):
            if A[i] < max_val:
                B[i] = A[i] - max_val
            else:
                max_val = A[i]
                B[i] = 0
        return B

    def test_jit(self):
    
        self.drawdown = self.portfolio.reshape(len(self.portfolio), -1).sum(axis=1)
        
    def plot(self):
        
        # plt.figure(figsize=(20, 10))
        # # plt.plot(self.data['Close'], label='Close')
        # plt.plot(self.data['SMA'], label='SMA')
        # # plt.plot(self.data['High'], label='High')   
        # # plt.plot(self.data['Low'], label='Low')
        
        # idx = np.where(self.pos != 0)
        # plt.scatter(self.data.iloc[idx[0]].index, self.pos[idx], c='m', s=4)
        # plt.legend()
        
        # plt.show()
        
        plt.figure(figsize=(20, 10))
        plt.plot(self.balance)
        plt.ylabel('gain in % of initial balance')
        plt.xlabel('Time')
        plt.title('Balance')
        plt.show()
        plt.figure(figsize=(20, 10))
        plt.plot(self.drawdown, label='Drawdown')
        plt.plot(np.zeros(self.drawdown.shape[0]), c='r')
        plt.show()
        # plt.figure(figsize=(20, 10))
        # plt.plot(self.usdt_b, label='USDT balance')
        # plt.show()

bt = Backtest(data, pyrimading=20)

bt.run()
bt.stats()

((np.float64(1.0187045794043372e-06),
  np.float64(0.0023090566774794253),
  np.float64(0.48658185156637934),
  np.float64(-0.006150561703970651),
  np.float64(-0.017505527272521536),
  np.float64(-2.918726937119485e-05),
  np.float64(-0.006204279591444781)),
 (np.float64(3.5945649093595485e-06),
  np.float64(0.0021029547448830417),
  np.float64(0.13546577757213496),
  np.float64(-0.005528476193382874),
  np.float64(-0.013376875221090294),
  np.float64(-8.58234120175229e-05),
  np.float64(-0.006001001184796519)))

In [49]:
from scipy.optimize import basinhopping

def func(x):
    return -bt.evaluate(x.reshape(x0.shape))


cons = {'type':'ineq','fun': lambda x: 100 - x.sum()}

x0 = np.ones((bt.num_grid*2, bt.pyramiding), dtype=np.float64)/(bt.pyramiding*2*bt.num_grid)*100

res = basinhopping(func, x0.flatten(), niter=100, T=1, stepsize=0.1, minimizer_kwargs={'method': 'COBYLA', 'bounds': [(0, 100)], 'constraints': cons})

  return self.minimizer(self.func, x0, **self.kwargs)
  calmar = (mean-Rf) / max_drawdown


In [50]:
res

                    message: ['requested number of basinhopping iterations completed successfully']
                    success: False
                        fun: 0.006273104644988018
                          x: [ 2.500e-02  2.500e-02 ...  2.500e-02
                               2.500e-02]
                        nit: 100
      minimization_failures: 101
                       nfev: 8462115
                       njev: 2115
 lowest_optimization_result:  message: ABNORMAL_TERMINATION_IN_LNSRCH
                              success: False
                               status: 1
                                  fun: 0.006273104644988018
                                    x: [ 2.500e-02  2.500e-02 ...
                                         2.500e-02  2.500e-02]
                                  nit: 0
                                  jac: [-2.071e+04 -9.118e+03 ...
                                        -3.716e+04 -8.992e+04]
                                 nfev: 84021
         

In [51]:
x = res.x.reshape(x0.shape)

In [52]:
x

array([[0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       ...,
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025],
       [0.025, 0.025, 0.025, ..., 0.025, 0.025, 0.025]])

In [47]:
%timeit bt.evaluate(np.ones((bt.num_grid*2, bt.pyramiding), dtype=np.float64)/(bt.pyramiding*2*bt.num_grid)*100)

19.5 ms ± 238 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [45]:
%timeit -n 10 -r 7 bt.run()

22.7 ms ± 5.38 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [43]:
%timeit -n 10 -r 7 bt.stats()

1.17 ms ± 345 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
