In [None]:
import numpy as np
import pandas as pd
from dataclasses import dataclass

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from sklearn.preprocessing import StandardScaler

import plotly.graph_objects as go
from tqdm.notebook import tqdm

%matplotlib inline

In [None]:
@dataclass
class Config:
    # data
    symbol = "LTC/USDT"
    path = "/home/naer/work/buttomCash/src/data/BTС-Minute.csv"
    window_size = 6 * 30 * 2 + 1
    train_split_size = 0.9

    # model
    input_size = window_size - 1
    num_lstm_layers = 4
    hidden_size = 128
    dropout = 0.0
    num_classes = 2

    # training
    device = ("cpu",)  # "cuda" or "cpu"
    batch_size = 30
    num_epoch = 3
    learning_rate = 3e-4
    scheduler_step_size = 100

    # backtesting
    init_margin = 50
    imr = 20


config = Config()

In [None]:
def load_data(config, data_range=None):
    data = pd.read_csv(config.path)[data_range:]

    data = data.sort_values(by="date")
    data_date = data["date"].to_list()
    data_date = [data_date[i] for i in range(0, len(data_date), 60 * 4)]
    data_close_price = data["close"].to_list()
    data_close_price = [
        data_close_price[i] for i in range(0, len(data_close_price), 60 * 4)
    ]
    data_close_price = np.array(data_close_price)
    data = data.set_index("date")

    num_data_points = len(data_date)
    display_date_range = (
        "from " + data_date[0] + " to " + data_date[num_data_points - 1]
    )
    print("Number data points:", num_data_points, display_date_range)

    return data_date, data_close_price, data


data_date, data_close_price, data = load_data(config)

In [None]:
def backtest_prepare_data(x, window_size):
    scaler = StandardScaler()
    n_row = x.shape[0] - window_size + 1
    strides = np.lib.stride_tricks.as_strided(
        x, shape=(n_row, window_size), strides=(x.strides[0], x.strides[0])
    )
    X = np.empty([1, window_size - 1])
    last_new_price = np.empty([1, 2])
    for stride in tqdm(strides):
        last_new_price = np.concatenate(
            (
                last_new_price,
                np.array([stride[-2], stride[-1]]).reshape(1, -1),
            ),
            axis=0,
        )
        norm_stride = stride[:-1].reshape(
            1, -1
        )  # scaler.fit_transform(stride[:-1].reshape(1,-1).T).reshape(1,-1)
        X = np.concatenate((X, norm_stride), axis=0)

    return X[1:], last_new_price[1:]


class BacktestDataset(Dataset):
    def __init__(self, x, y, date):
        x = np.expand_dims(x, 2)
        self.x = x.astype(np.float32).reshape(-1, 1, config.window_size - 1)
        self.y = y.astype(np.float32)
        self.date = date

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return (self.x[idx], self.y[idx], self.date[idx])

In [None]:
class Backtest:
    def __init__(self, model, deposit):
        # self.model = lambda: randint(0,1)
        self.model = model
        self.config = config
        self.dataloader = self.get_dataloader()

        self.imr_stop = [
            [75, 5e3],
            [50, 1e4],
            [40, 5e4],
            [25, 25e4],
            [10, 1e6],
            [5, 5e6],
            [4, 1e7],
            [3, 2e7],
            [2, 3e7],
            [1, np.Inf],
        ]

        self.deposit = deposit
        self.imr_idx = 0
        self.imr = self.imr_stop[self.imr_idx][0]
        self.margin = self.deposit * self.imr
        self.trashhold = 0
        self.stop_loss_coef = -10

        self.history = {"order": [], "date": []}

    def get_dataloader(self):
        self.data_date, self.data_close_price, self.data = load_data(
            config, data_range=-60 * 24 * 30 * 3
        )
        strides, last_new_price = backtest_prepare_data(
            self.data_close_price, self.config.window_size
        )
        dataset_bt = BacktestDataset(strides, last_new_price, self.data_date)
        bt_dataloader = DataLoader(
            dataset_bt, batch_size=1, shuffle=False, num_workers=6
        )
        return bt_dataloader

    def pnl_roe(self, type_order, entry_price, exit_price):
        assert type_order in [-1, 1], "order type must be in [-1, 1]"
        pnl = (
            type_order
            * (exit_price - entry_price)
            * (self.margin / entry_price)
        )
        roe = pnl / self.deposit * 100
        return round(pnl, 2), round(roe, 2)

    def calc_stop_loss(self, entry):
        stop_loss = (
            self.stop_loss_coef
            / 100
            * self.deposit
            / (self.deposit * self.imr / entry)
            + entry
        )
        return round(stop_loss, 2)

    def update_margin(self, pnl, roe):
        self.deposit += pnl
        if self.deposit * self.imr > self.imr_stop[self.imr_idx][1]:
            self.imr_idx += 1
        self.imr = self.imr_stop[self.imr_idx][0]
        self.margin = self.deposit * self.imr

    def test_model_strategy(self, plot=False):
        his = []
        for idx, (window, prices, date1) in enumerate(self.dataloader):
            date1 = date1[0]
            self.history["order"].append(self.deposit)
            self.history["date"].append(date1)

            entry_price, exit_price = map(int, prices[0])
            out_model = self.model.forward(window).detach().cpu()

            # неуверенность модели
            if float(max(out_model[0])) < self.trashhold:
                continue
            else:
                forecast = int(torch.argmax(out_model))
            # forecast = self.model()

            date2 = self.data_date[self.data_date.index(date1) + 1]
            tmp_prices = self.data.loc[date1:date2]["close"].to_list()

            # buy
            if forecast == 0:
                self.margin = self.margin / 100 * (1 - 12e-5)
                stop_loss = self.calc_stop_loss(entry_price)
                triggering_sl = [x for x in tmp_prices if x <= stop_loss]
                exit_price = exit_price if triggering_sl is [] else stop_loss
                pnl, roe = self.pnl_roe(1, entry_price, exit_price)
                self.update_margin(pnl, roe)

            # sell
            if forecast == 1:
                self.margin = self.margin / 100 * (1 - 12e-5)
                stop_loss = self.calc_stop_loss(entry_price)
                triggering_sl = [x for x in tmp_prices if x >= stop_loss]
                exit_price = exit_price if triggering_sl is [] else stop_loss
                pnl, roe = self.pnl_roe(-1, entry_price, exit_price)
                self.update_margin(pnl, roe)

            # break of margin call
            if self.deposit <= 0:
                self.history["order"].append(0)
                self.history["date"].append(date1)
                break
            if self.deposit > 1000000:
                self.history["order"].append(1000000)
                self.history["date"].append(date1)
                break

            his.append(
                [self.deposit, entry_price, exit_price, stop_loss, pnl, roe]
            )

        if plot:
            fig = go.Figure(
                [
                    go.Scatter(
                        x=self.history["date"],
                        y=self.history["order"],
                        mode="lines+markers",
                    )
                ]
            )
            fig.show()

        return his

In [None]:
# backtest = Backtest(model, deposit=100)
# history = backtest.test_model_strategy(plot=True)