In [None]:
import warnings
import math

import numpy as np
import pandas as pd
import datetime as dt
from datetime import date

import plotly.express as px

from tqdm import tqdm

warnings.filterwarnings('ignore')

from concurrent.futures import ThreadPoolExecutor

# КНОПКА БАБЛО version 0.5

## Класс стратегии

In [None]:
class MyPosition:
    """A single open position: its size, the last price seen and the fees accrued so far."""

    def __init__(self, amount: float, price_current: float, short: bool) -> None:
        """
        Create a position.

        :param amount: Amount of the asset held.
        :param price_current: Current price of the asset.
        :param short: True for a short position, False for a long one.
        """
        self._short: bool = short
        self._acc_fees: float = 0
        self._amount: float = amount
        self._price_current: float = price_current

    def update_state(self, price: float) -> None:
        """
        Record a new price and, for short positions, accrue the carry fee.

        :param price: New price of the asset.
        """
        self._price_current = price
        if not self._short:
            # Long positions accrue no fee here.
            return
        # Fee for carrying the short position over, charged on its absolute value.
        self._acc_fees += abs(self._amount) * self._price_current * 0.00065

    def balance(self) -> float:
        """
        Value of the position at the last recorded price.

        :return: ``amount * current price`` (the sign follows the sign of ``amount``).
        """
        units, last_price = self._amount, self._price_current
        return units * last_price

In [None]:
class Strategy:
    """
    Backtest of a band-crossing strategy driven by a moving average of ``pos``.

    ``states`` must provide the columns ``datetime``, ``date``, ``pos`` and
    ``price`` and be labelled 0..n-1, because rows are addressed with ``.loc[i]``.
    At most one position (long or short) is open at any time.
    """

    # Fee rate: charged on equity when opening and on position value when closing.
    params = {'FEE': 0.0004}

    def __init__(self, states, start_equity):
        """
        :param states: DataFrame with the input series (see the class docstring).
        :param start_equity: Equity available before the first trade.
        """
        self._position = None
        self._states = states
        self._equity = start_equity
        # Cash booked when a position is opened; settled into equity on close.
        self._margin_equity = 0

    def calc_upper_and_lower(self, state, states_ma, STD_COUNT_UP, STD_COUNT_DOWN):
        """
        Compute the upper and lower bands from the moving average history.

        Only rows dated strictly before ``state['date']`` are used.

        :param state: Current row; only ``state['date']`` is read.
        :param states_ma: Moving-average Series aligned with ``self._states``.
        :param STD_COUNT_UP: Standard deviations added to the mean for the upper band.
        :param STD_COUNT_DOWN: Standard deviations subtracted from the mean for the lower band.
        :return: ``(pos_ma_upper, pos_ma_lower)``.
        """
        # ``states_ma`` is a plain Series and has no 'date' key, so the date
        # filter has to be built from the frame it was derived from.
        history = states_ma[self._states['date'] < state['date']]
        mean_pos_ma = history.mean()
        std_pos_ma = history.std()
        pos_ma_upper = mean_pos_ma + STD_COUNT_UP * std_pos_ma
        pos_ma_lower = mean_pos_ma - STD_COUNT_DOWN * std_pos_ma
        return pos_ma_upper, pos_ma_lower

    def run(self, RISK, STD_COUNT_UP, STD_COUNT_DOWN, MA_COUNT):
        """
        Run the backtest over all rows after a warm-up of ``5 * MA_COUNT`` rows.

        Signals (``ma`` is the rolling mean of ``pos`` over ``MA_COUNT`` rows):

        * flat and ``ma`` crosses the lower band upwards  -> open long;
        * flat and ``ma`` crosses the upper band downwards -> open short;
        * short and ``ma`` crosses the lower band downwards -> close short;
        * long and ``ma`` crosses the upper band upwards   -> close long.

        The bands are computed once at the end of the warm-up and recomputed
        each time a position is closed.

        :param RISK: Fraction of equity used to size a new position.
        :param STD_COUNT_UP: Band width above the mean, in standard deviations.
        :param STD_COUNT_DOWN: Band width below the mean, in standard deviations.
        :param MA_COUNT: Rolling window length (integer number of rows).
        :return: DataFrame with one row per processed bar and the columns
            ``datetime, price, equity, short, pos_ma_upper, pos_ma_lower, pos_ma``.
            ``short`` is -1 on rows recorded while flat, otherwise the position's flag.
        """
        states_ma = self._states['pos'].rolling(window=MA_COUNT).mean()
        prices = self._states['price']
        warmup = 5 * MA_COUNT
        data = []

        for i in tqdm(range(len(self._states)), disable=True):
            if i < warmup:
                continue

            state = self._states.loc[i]
            pos_ma = states_ma.loc[i]
            prev_pos_ma = states_ma.loc[i - 1]

            if i == warmup:
                # First processed bar: initialise the bands.
                pos_ma_upper, pos_ma_lower = self.calc_upper_and_lower(
                    state, states_ma, STD_COUNT_UP, STD_COUNT_DOWN)

            if self._position is not None:
                # The position is only refreshed (and the short carry fee
                # only charged) when the price actually changed.
                if state['price'] != prices.loc[i - 1]:
                    self._position.update_state(state['price'])

                # Recorded before any close, so the row shows the state during the bar.
                data.append([
                    state['datetime'],
                    state['price'],
                    self._equity - self._position._acc_fees,
                    self._position._short,
                    pos_ma_upper,
                    pos_ma_lower,
                    pos_ma
                ])

                crossed_below_lower = prev_pos_ma > pos_ma_lower and pos_ma < pos_ma_lower
                crossed_above_upper = prev_pos_ma < pos_ma_upper and pos_ma > pos_ma_upper

                if crossed_below_lower and self._position._short:
                    pos_ma_upper, pos_ma_lower = self.calc_upper_and_lower(
                        state, states_ma, STD_COUNT_UP, STD_COUNT_DOWN)
                    self.close_short()
                elif crossed_above_upper and not self._position._short:
                    pos_ma_upper, pos_ma_lower = self.calc_upper_and_lower(
                        state, states_ma, STD_COUNT_UP, STD_COUNT_DOWN)
                    self.close_long()
            else:
                if prev_pos_ma < pos_ma_lower and pos_ma > pos_ma_lower:
                    self.open_long(state, RISK)
                elif prev_pos_ma > pos_ma_upper and pos_ma < pos_ma_upper:
                    self.open_short(state, RISK)

                data.append([
                    state['datetime'],
                    state['price'],
                    self._equity,
                    -1,
                    pos_ma_upper,
                    pos_ma_lower,
                    pos_ma
                ])

        return pd.DataFrame(data, columns=['datetime', 'price', 'equity', 'short', 'pos_ma_upper', 'pos_ma_lower', 'pos_ma'])

    def _position_size(self, state, RISK):
        """Whole number of units affordable with ``RISK`` of equity, net of the fee rate."""
        return np.floor((1 - self.params['FEE']) * RISK * self._equity / state['price'])

    def open_short(self, state, RISK):
        """
        Open a short position at ``state['price']``.

        :raises RuntimeError: if a position is already open.
        """
        if self._position is not None:
            raise RuntimeError(f'Cannot open position, already have one {self._position}')

        units = self._position_size(state, RISK)
        # Sale proceeds minus the opening fee.
        self._margin_equity += units * state['price'] - self.params['FEE'] * self._equity
        self._position = MyPosition((-1) * units, state['price'], True)

    def open_long(self, state, RISK):
        """
        Open a long position at ``state['price']``.

        :raises RuntimeError: if a position is already open.
        """
        if self._position is not None:
            raise RuntimeError(f'Cannot open position, already have one {self._position}')

        units = self._position_size(state, RISK)
        # Purchase cost plus the opening fee.
        self._margin_equity -= units * state['price'] + self.params['FEE'] * self._equity
        self._position = MyPosition(units, state['price'], False)

    def _close_position(self):
        """Settle the open position into equity and clear it."""
        balance = self._position.balance()
        # The closing fee always reduces equity. ``(1 - FEE) * balance`` would
        # credit the fee when the balance is negative (short positions).
        closing_fee = self.params['FEE'] * abs(balance)
        self._equity += self._margin_equity + balance - closing_fee - self._position._acc_fees
        self._margin_equity = 0
        self._position = None

    def close_short(self):
        """
        Close the open short position.

        :raises RuntimeError: if no position is open or the open position is long.
        """
        if self._position is None:
            raise RuntimeError('Cannot close short position, no position is open')
        if self._position._short == 0:
            raise RuntimeError('Cannot close short position, it is long')
        self._close_position()

    def close_long(self):
        """
        Close the open long position.

        :raises RuntimeError: if no position is open or the open position is short.
        """
        if self._position is None:
            raise RuntimeError('Cannot close long position, no position is open')
        if self._position._short == 1:
            raise RuntimeError('Cannot close long position, it is short')
        self._close_position()

In [None]:
def run_strategy(all_data_copy, std_count_up, std_count_down, ma_count):
    """
    Backtest one parameter set and return the final equity relative to the first recorded equity.

    :param all_data_copy: Input frame handed to ``Strategy``.
    :param std_count_up: Upper band width in standard deviations.
    :param std_count_down: Lower band width in standard deviations.
    :param ma_count: Rolling window length.
    :return: ``equity[-1] / equity[0]`` of the backtest, or 1.0 when the
        backtest produced no rows.
    """
    strategy = Strategy(all_data_copy, 10000000)
    history = strategy.run(1, std_count_up, std_count_down, ma_count)
    equity = history['equity']
    if equity.empty:
        # The data is shorter than the warm-up, so nothing was traded and
        # equity is unchanged. Returning the neutral ratio keeps a grid
        # search alive instead of failing with IndexError on ``iloc[0]``.
        return 1.0
    return equity.iloc[-1] / equity.iloc[0]

# Тренировка

def _grid_search(data, std_counts_up, std_counts_down, ma_counts):
    """Score every parameter combination with ``run_strategy``; return ``(best_params, best_score)``."""
    # Nesting order up -> down -> ma; on ties the first combination wins.
    candidates = [
        (std_count_up, std_count_down, ma_count)
        for std_count_up in std_counts_up
        for std_count_down in std_counts_down
        for ma_count in ma_counts
    ]
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(run_strategy, data, *candidate) for candidate in candidates]
        scores = [future.result() for future in futures]

    best_score = max(scores)
    return candidates[scores.index(best_score)], best_score


def train(all_data_copy1: pd.DataFrame) -> "tuple[float, float, int]":
    """
    Find the best strategy parameters with a coarse grid search followed by a refinement.

    :param all_data_copy1: Training data handed unchanged to ``run_strategy``.
    :return: ``(std_count_up, std_count_down, ma_count)`` with the highest final balance.
    """
    # Coarse search over the parameter ranges.
    coarse_best, _ = _grid_search(
        all_data_copy1,
        np.arange(0.5, 2, 0.25),
        np.arange(0.5, 2, 0.25),
        np.arange(500, 3000, 500),
    )
    best_up, best_down, best_ma = coarse_best

    # Refinement: one step either side of the coarse optimum. The grids are
    # spelled out because ``np.arange`` with a 0.1 float step does not
    # guarantee the number of points, nor that the centre is hit exactly.
    corresponding_parameters, max_total_balance = _grid_search(
        all_data_copy1,
        [best_up - 0.1, best_up, best_up + 0.1],
        [best_down - 0.1, best_down, best_down + 0.1],
        [best_ma - 250, best_ma, best_ma + 250],
    )

    print(f"Maximum Total Balance: {max_total_balance}")
    print(f"Corresponding Parameters (std_count_up, std_count_down, ma_count): {corresponding_parameters}")
    return corresponding_parameters

# Тест

def test(all_data_copy2: pd.DataFrame, std_count_up: float, std_count_down: float, ma_count: float) -> float:
    strategy = Strategy(all_data_copy2, 10000000)
    df = pd.DataFrame(strategy.run(1, std_count_up, std_count_down, ma_count))
    df['equity'] = df['equity'] / df['equity'].iloc[0]
    final_total_balance = df['equity'].iloc[-1]

    print(f'Test profit = {final_total_balance}')
    return final_total_balance

## Применение

In [None]:
# Full ticker list: ['sr', 'gz', 'yn', 'fv', 'lk']
ticker_names = ['gz']

for ticker in ticker_names:

    # Positions: de-duplicate, build a timestamp, keep only the 'YUR' client group.
    df = (
        pd.read_csv(f'{ticker}_full_date.csv', sep=",")
        .sort_values(by=['ticker', 'tradedate', 'tradetime'])
        .drop_duplicates()
        .reset_index(drop=True)
    )
    df['datetime'] = pd.to_datetime(df['tradedate'] + ' ' + df['tradetime'])
    df = df.drop(columns=['tradedate', 'tradetime'])
    df = df[df['clgroup'] == 'YUR'].reset_index(drop=True)
    df = df[['datetime', 'pos']]
    data_pos = df

    # Prices: keep the 'TQBR' board and its WAPRICE column, renamed to 'price'.
    df = pd.read_csv(f'{ticker}_full_date_price.csv', sep=",").reset_index()
    df = df.rename(columns={'TRADEDATE': 'datetime'})
    df['datetime'] = pd.to_datetime(df['datetime'])
    df = df.loc[df['BOARDID'] == 'TQBR', ['datetime', 'WAPRICE']]
    df = df.rename(columns={'WAPRICE': 'price'})
    data_price = df

    # Combined data: attach the price of the matching calendar date to every position row.
    price_dict = dict(zip(data_price['datetime'].dt.date, data_price['price']))
    data_pos['price'] = data_pos['datetime'].dt.date.map(price_dict)

    all_data = (
        data_pos
        .sort_values('datetime')
        .drop_duplicates()
        .dropna()
        .reset_index(drop=True)
    )

    all_data_copy = all_data.copy()
    all_data_copy['date'] = all_data_copy['datetime'].dt.date

    # Train on everything before 2022-02-20, test on everything after 2022-12-31.
    is_train = all_data_copy['date'] < pd.to_datetime('2022-02-20').date()
    is_test = all_data_copy['date'] > pd.to_datetime('2022-12-31').date()
    train_data = all_data_copy[is_train].reset_index(drop=True)
    test_data = all_data_copy[is_test].reset_index(drop=True)

    corresponding_parameters = []

    corresponding_parameters = train(train_data)
    result = test(test_data, *corresponding_parameters[:3])

## Отладочная печать

In [None]:
# Positions: de-duplicate, build a timestamp, keep only the 'YUR' client group.
df = (
    pd.read_csv('gz_full_date.csv', sep=",")
    .sort_values(by=['ticker', 'tradedate', 'tradetime'])
    .drop_duplicates()
    .reset_index(drop=True)
)
df['datetime'] = pd.to_datetime(df['tradedate'] + ' ' + df['tradetime'])
df = df.drop(columns=['tradedate', 'tradetime'])
df = df[df['clgroup'] == 'YUR'].reset_index(drop=True)
df = df[['datetime', 'pos']]
data_pos = df

# Prices: keep the 'TQBR' board and its WAPRICE column, renamed to 'price'.
df = pd.read_csv('gz_full_date_price.csv', sep=",").reset_index()
df = df.rename(columns={'TRADEDATE': 'datetime'})
df['datetime'] = pd.to_datetime(df['datetime'])
df = df.loc[df['BOARDID'] == 'TQBR', ['datetime', 'WAPRICE']]
df = df.rename(columns={'WAPRICE': 'price'})
data_price = df

# Combined data: attach the price of the matching calendar date to every position row.
price_dict = dict(zip(data_price['datetime'].dt.date, data_price['price']))
data_pos['price'] = data_pos['datetime'].dt.date.map(price_dict)

all_data = (
    data_pos
    .sort_values('datetime')
    .drop_duplicates()
    .dropna()
    .reset_index(drop=True)
)

all_data_copy = all_data.copy()
all_data_copy['date'] = all_data_copy['datetime'].dt.date

# Train on everything before 2022-02-20, test on everything after 2022-12-31.
is_train = all_data_copy['date'] < pd.to_datetime('2022-02-20').date()
is_test = all_data_copy['date'] > pd.to_datetime('2022-12-31').date()
train_data = all_data_copy[is_train].reset_index(drop=True)
test_data = all_data_copy[is_test].reset_index(drop=True)

# Backtest on the training data with fixed parameters and plot the moving average with its bands.
strategy = Strategy(train_data, 10000000)
df = pd.DataFrame(strategy.run(1, 1.25, 0.4, 250))
px.line(df, x='datetime', y=['pos_ma', 'pos_ma_upper', 'pos_ma_lower'])

In [None]:
# Positions: de-duplicate, build a timestamp, keep only the 'YUR' client group.
df = pd.read_csv('gz_full_date.csv', sep=",")
df = df.sort_values(by=['ticker', 'tradedate', 'tradetime']).drop_duplicates().reset_index().drop('index', axis=1)
df['datetime'] = pd.to_datetime(df['tradedate'] + ' ' + df['tradetime'])
df = df.drop(['tradedate', 'tradetime'], axis=1)
df = df[df['clgroup'] == 'YUR'].reset_index().drop('index', axis=1)
df = df[['datetime', 'pos']]
data_pos = df

# Prices: keep the 'TQBR' board and its WAPRICE column, renamed to 'price'.
df = pd.read_csv('gz_full_date_price.csv', sep=",")
df.reset_index(inplace=True)
df.rename(columns={'TRADEDATE': 'datetime'}, inplace=True)
df['datetime'] = pd.to_datetime(df['datetime'])
df = df[df['BOARDID']=='TQBR']
df = df[['datetime', 'WAPRICE']]
df.rename(columns={'WAPRICE': 'price'}, inplace=True)
data_price = df

# Combined data: attach the price of the matching calendar date to every position row.
price_dict = dict(zip(data_price['datetime'].dt.date, data_price['price']))

data_pos['price'] = data_pos['datetime'].dt.date.map(price_dict)

all_data = data_pos.sort_values('datetime').drop_duplicates().dropna().reset_index().drop('index', axis=1)

all_data_copy = all_data.copy()

all_data_copy['date'] = all_data_copy['datetime'].dt.date

# Training set: everything before 2022-02-20.
train_data = all_data_copy[all_data_copy['date'] < pd.to_datetime('2022-02-20').date()].reset_index().drop('index', axis=1)

# Test set: everything after 2022-12-31.
test_data = all_data_copy[all_data_copy['date'] > pd.to_datetime('2022-12-31').date()].reset_index().drop('index', axis=1)

# Backtest on the training data with fixed parameters and plot the equity curve.
strategy = Strategy(train_data, 10000000)
df = pd.DataFrame(strategy.run(1, 0.25, 0.4, 250))
# ``show()`` returns None, so keep the figure in ``fig`` and display it separately.
fig = px.line(df, x='datetime', y='equity').update_xaxes(type='category')
fig.show()

In [None]:
# Example window size; the trained value would be corresponding_parameters[2].
window_size = 1000

# Moving average of 'pos' on a copy of the training data.
df = train_data.copy()
df['pos_ma'] = df['pos'].rolling(window=window_size).mean()

# Fixed bands around the mean of the moving average over the whole series.
mean_pos_ma, std_pos_ma = df['pos_ma'].mean(), df['pos_ma'].std()
pos_ma_upper = mean_pos_ma + 0.25 * std_pos_ma
pos_ma_lower = mean_pos_ma - 0.75 * std_pos_ma
print(mean_pos_ma, std_pos_ma)

In [None]:
# Moving average over time, with the upper band in green and the lower band in red.
fig = px.line(
    df,
    x='datetime',
    y='pos_ma',
    title=f'График pos MA{window_size} по времени',
    labels={'datetime': 'Дата и время', 'pos_ma': f'Pos MA{window_size}'},
)

# Both bands are horizontal lines spanning the full time range.
band_x_start, band_x_end = df['datetime'].min(), df['datetime'].max()
for band_level, band_color in ((pos_ma_upper, "green"), (pos_ma_lower, "red")):
    fig.add_shape(
        dict(
            type="line",
            x0=band_x_start,
            x1=band_x_end,
            y0=band_level,
            y1=band_level,
            line=dict(color=band_color)
        )
    )

fig.update_xaxes(type='category')
fig.show()

In [None]:
# Mark the bars on which the moving average crosses one of the bands.
# The comparison is vectorised (current value against the previous row)
# instead of appending to a DataFrame row by row inside a Python loop.
pos_ma_now = df['pos_ma']
pos_ma_before = pos_ma_now.shift(1)  # NaN on the first row, so it never matches

up_through_upper = (pos_ma_now > pos_ma_upper) & (pos_ma_before < pos_ma_upper)
down_through_upper = (pos_ma_now < pos_ma_upper) & (pos_ma_before > pos_ma_upper)
down_through_lower = (pos_ma_now < pos_ma_lower) & (pos_ma_before > pos_ma_lower)
up_through_lower = (pos_ma_now > pos_ma_lower) & (pos_ma_before < pos_ma_lower)

# Same precedence as an if/elif chain in the order listed above:
# each bar lands in at most one of the two groups.
is_outlier_up = up_through_upper | (~down_through_upper & ~down_through_lower & up_through_lower)
is_outlier_down = ~up_through_upper & (down_through_upper | down_through_lower)

outlier_columns = ['datetime', 'pos', 'price', 'date']
outliers_up = df.loc[is_outlier_up, outlier_columns].reset_index(drop=True)
outliers_down = df.loc[is_outlier_down, outlier_columns].reset_index(drop=True)

# Price over time with upward crossings in green and downward crossings in red.
fig = px.line(df, x='datetime', y='price', title='График цен по времени', labels={'datetime': 'Дата и время', 'price': 'Цена'})
fig.add_trace(px.scatter(outliers_up, x='datetime', y='price').update_traces(marker=dict(color='green')).data[0])
fig.add_trace(px.scatter(outliers_down, x='datetime', y='price').update_traces(marker=dict(color='red')).data[0])
fig.update_xaxes(type='category')
fig.show()