# ТЗ

## Обучение

Обучение ведется на большом интервале данных -> запрос делаем по дате начала/конца
Шаг данных может быть меньше периода.


Параметры формирования выборки
 - Период
 - Кол-во точек
 - пара

## Торговля
Для операции нужны только актуальные данные -> запрашиваем минимальный набор для формирования датапоинта
Шаг данных может быть меньше периода.

In [1]:
import os
import sys
from pathlib import Path
# Make the sibling `rt_libs` directory (db, data_providers, app_tools, ...) importable.
# sys.path[0] may be relative or even '' in an interactive kernel, so it is made
# absolute *before* taking its parent: Path('').parent is '.', which would
# otherwise resolve to the current directory instead of its parent.
path = Path(sys.path[0])
sys.path.insert(0, os.path.join(path.absolute().parent, 'rt_libs'))


In [2]:
import numpy as np
from db import ClickHouseConnector
from data_providers import TradeDataProvider, TrainDbDataProvider


ModuleNotFoundError: No module named 'db'

In [3]:
%matplotlib notebook
# ClickHouse connection settings used by the data-provider cells.
# Credentials are read from the environment instead of being committed with the
# notebook. Host/port/user/database keep their previous values as defaults; the
# password deliberately has no default.
# NOTE(review): the password that used to be hard-coded here is exposed in the
# notebook's history and should be rotated.
db_connect_params = {
    "host": os.environ.get("RT_DB_HOST", "185.117.118.107"),
    "port": int(os.environ.get("RT_DB_PORT", "59000")),
    "user": os.environ.get("RT_DB_USER", "alex"),
    "password": os.environ.get("RT_DB_PASSWORD", ""),
    "database": os.environ.get("RT_DB_NAME", "rt"),
}

# Запрос данных для торговли

In [57]:
query_for_trade = """
WITH
    %(pair)s AS p_pair,
    toUInt32(%(period)s) AS p_period,
    toUInt32(%(start)s) AS ts_min,
    toUInt32(%(end)s+p_period) AS ts_max

SELECT ts_main as ts, lowest_ask, highest_bid
FROM (SELECT ts_r, AVG(lowest_ask) AS lowest_ask, AVG(highest_bid) AS highest_bid
    FROM (SELECT FLOOR(ts / p_period) * p_period AS ts_r, lowest_ask, highest_bid
        FROM rt.orderbook
        WHERE pair = p_pair AND ts BETWEEN ts_min AND ts_max
        ORDER BY ts DESC) AS TB
    GROUP BY ts_r) AS tb1
    RIGHT JOIN (SELECT arrayJoin (range(ts_min, ts_max, p_period)) AS ts_main) AS tb2 ON tb1.ts_r = tb2.ts_main
    ORDER BY ts ASC

"""

In [64]:
import pandas as pd
from app_tools import with_exception
import logging
import sys, os


class TradeDataProviderError(Exception):
    """Error type handed to ``with_exception`` on the TradeDataProvider methods."""


class TradeDataProvider:
    """Fetch the latest order-book slice used for live trading.

    The SQL text is read from a file (``QUERY_PATH`` by default) and executed on
    ``provider.cursor``. The rows are turned into a DataFrame indexed by ``ts``
    and then filtered to the requested pairs.
    """

    # Older query location; not referenced anywhere in this class.
    QUERY_PATH_ = "data_providers/sql/get_dataset_2.sql"
    # Evaluated once, at class-definition time, relative to sys.path[0].
    QUERY_PATH = os.path.join(sys.path[0], "data_providers/sql/get_last_data.sql")

    @with_exception(TradeDataProviderError)
    def __init__(self, provider, query=None):
        """
        Args:
            provider: object exposing a ``cursor`` with ``execute(query, parameters=...)``
                and ``fetchall()``.
            query: *path* to a SQL file (not SQL text); defaults to ``QUERY_PATH``.
        """
        self.provider = provider
        self.data = None
        self.log = logging.getLogger("{0}.{1}".format(__name__, self.__class__.__name__))
        query_path = query if query is not None else self.QUERY_PATH
        self.log.debug("Query path: %s", query_path)
        self.query = self._get_query(query_path)
        self.log.debug("Initialized")

    @with_exception(TradeDataProviderError)
    def get(self, ts, period, total_points, pairs_list=None):
        """Run the query and return the rows belonging to ``pairs_list``.

        Args:
            ts: reference timestamp. NOTE(review): it is currently NOT sent to the
                query (only ``period`` and ``n_steps`` are); kept for interface
                compatibility.
            period: bucket size in seconds, sent as the ``period`` parameter.
            total_points: number of points, sent as the ``n_steps`` parameter.
            pairs_list: pairs to keep. ``None`` (the default) behaves like an empty
                list, i.e. every row is filtered out — pass the pairs explicitly.

        Returns:
            DataFrame indexed by ``ts`` with columns pair / lowest_ask /
            highest_bid; it is also stored on ``self.data``.
        """
        # None instead of a shared mutable [] default; same filtering semantics.
        pairs_list = [] if pairs_list is None else pairs_list
        self.log.debug("get() method has been called | ts: %s, period: %s, total_points:%s",
                       ts, period, total_points)

        self.provider.cursor.execute(self.query, parameters={"period": period, "n_steps": total_points})
        raw_data = self.provider.cursor.fetchall()
        self.log.debug("raw_data received len: %s", len(raw_data))

        data = self._transform(raw_data)
        self.data = data.loc[data["pair"].isin(pairs_list)]
        self.log.debug("Filtered data shape: %s x %s", *self.data.shape)

        return self.data

    @staticmethod
    def _transform(raw_data):
        """Turn (pair, ts, lowest_ask, highest_bid) rows into a ts-indexed DataFrame."""
        data = pd.DataFrame(raw_data, columns=["pair", "ts", "lowest_ask", "highest_bid"])
        data["ts"] = data["ts"].astype(int)
        data = data.set_index("ts")
        return data

    @staticmethod
    def _get_query(filename):
        """Return the text of the SQL file ``filename``."""
        with open(filename, encoding="utf-8") as txt:
            return txt.read()




In [73]:
import time
import numpy as np

# Reference timestamp: current unix time floored to a whole minute.
ts = int(np.floor(time.time()/60)*60)
period = 300
total_points = 10


# The connector is entered as a context manager and handed to TradeDataProvider,
# which reads `provider.cursor`; only USDT_BTC rows are kept.
with ClickHouseConnector(db_connect_params) as provider:
    tdp = TradeDataProvider(provider)
    data = tdp.get(ts, period, total_points, pairs_list=['USDT_BTC'])

Cursor created, database connection established
Cursor closed


In [80]:
# Show the minute-aligned reference timestamp `ts`.
ts

1643145780

# Загрузка данных для обучения

In [75]:
from db import ClickHouseConnector
from poloniex import PublicAPI
import pytz
import datetime
import pandas as pd


class AbstractTrainDataProvider:

    def get(self, pair, start, end, period) -> pd.DataFrame:
        pass

    @staticmethod
    def date_to_unix_ts_in_utc(date) -> int:
        if isinstance(date, str):
            timezone = pytz.timezone("UTC")
            without_timezone = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
            with_timezone = timezone.localize(without_timezone)
            transformed = int(with_timezone.timestamp())
        else:
            transformed = int(date)
        return transformed


class TrainDbDataProvider(AbstractTrainDataProvider):
    """Load a training dataset for one pair from ClickHouse.

    Each ``get()`` call enters the connector as a context manager, runs the query
    with driver-side parameters and returns a DataFrame indexed by ``ts`` with
    columns lowest_ask / highest_bid.
    """

    # Evaluated at class-definition time relative to sys.path[0], the same
    # convention as the other providers, instead of a machine-specific absolute
    # path. NOTE(review): assumes sys.path[0] is the rt_libs directory at that
    # moment — confirm for non-notebook use.
    QUERY_PATH = os.path.join(sys.path[0], "data_providers/sql/get_dataset.sql")

    def __init__(self, provider_params, query=None):
        """
        Args:
            provider_params: connection settings passed to ClickHouseConnector.
            query: SQL *text* (not a path); when omitted, the SQL is read from
                ``QUERY_PATH``. The parameters supplied are pair, period, start, end.
        """
        self.provider_params = provider_params
        self.provider = ClickHouseConnector(self.provider_params)
        if query is None:
            self.query = self._get_query(self.QUERY_PATH)
        else:
            self.query = query

    def get(self, pair, start, end, period):
        """Return the dataset for ``pair`` between ``start`` and ``end`` (strings or unix ts)."""
        params = self._build_params(pair, start, end, period)

        # The same connector object is entered on every call.
        with self.provider:
            self.provider.cursor.execute(self.query, parameters=params)
            raw_data = self.provider.cursor.fetchall()

        return self._transform(raw_data)

    @staticmethod
    def _get_query(filename):
        """Return the text of the SQL file ``filename``."""
        with open(filename, encoding="utf-8") as txt:
            return txt.read()

    def _build_params(self, pair, start, end, period):
        """Build the driver parameter dict; dates are converted to UTC unix seconds."""
        return {
            "pair": pair,
            "period": period,
            "start": self.date_to_unix_ts_in_utc(start),
            "end": self.date_to_unix_ts_in_utc(end),
        }

    @staticmethod
    def _transform(raw_data):
        """Turn (ts, lowest_ask, highest_bid) rows into a ts-indexed DataFrame."""
        data = pd.DataFrame(raw_data, columns=["ts", "lowest_ask", "highest_bid"])
        data["ts"] = data["ts"].astype(int)
        data = data.set_index("ts")
        return data


In [78]:
task = {"pair": "BTC_LTC", "start": "2022-01-17 00:00:00", "end": "2022-01-19 00:00:00", "period": 900}

tdp = TrainDbDataProvider(db_connect_params)

data = tdp.get(**task)

Cursor created, database connection established
Cursor closed


In [79]:
# Inspect the training frame (indexed by ts).
data

Unnamed: 0_level_0,lowest_ask,highest_bid
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
1642377600,0.003402,0.003400
1642378500,0.003401,0.003400
1642379400,0.003395,0.003394
1642380300,0.003396,0.003395
1642381200,0.003396,0.003394
...,...,...
1642546800,0.003350,0.003346
1642547700,0.003343,0.003342
1642548600,0.003340,0.003339
1642549500,0.003340,0.003338


# Единый дата провайдер

- Обязательно указываем, по каким парам получаем данные, — это ускоряет загрузку.
- В качестве параметров запроса нужно вычислить начало и конец периода:
    - для обучения эти параметры задаются явно;
    - для торговли рассчитываются на основе параметров среза.
- В обоих случаях внешний провайдер (коннектор) передаём через контекстный менеджер `with`.


In [82]:
query_for_common = """
WITH
    toUInt32(%(period)s) AS p_period,
    toUInt32(%(start)s) AS ts_min,
    toUInt32(%(end)s+p_period) AS ts_max

SELECT pair_r, ts_main as ts, lowest_ask, highest_bid
FROM (SELECT pair_r, ts_r, AVG(lowest_ask) AS lowest_ask, AVG(highest_bid) AS highest_bid
    FROM (SELECT pair as pair_r, FLOOR(ts / p_period) * p_period AS ts_r, lowest_ask, highest_bid
        FROM rt.orderbook
        WHERE pair IN (%(pairs_list)s) AND ts BETWEEN ts_min AND ts_max
        ORDER BY ts DESC) AS TB
    GROUP BY ts_r, pair_r) AS tb1
    RIGHT JOIN (SELECT arrayJoin (range(ts_min, ts_max, p_period)) AS ts_main) AS tb2 ON tb1.ts_r = tb2.ts_main
    ORDER BY pair_r, ts ASC

"""

In [104]:
import pandas as pd
from app_tools import with_exception
import logging
import sys
import os
import pytz
import datetime


class DataProviderError(Exception):
    """Error type handed to ``with_exception`` on the DataProvider methods."""


class DataProvider:
    """Unified order-book data provider for both training and live trading.

    Runs a bucketed query for an explicit list of pairs between two timestamps
    and returns a DataFrame indexed by ``ts`` with columns pair / lowest_ask /
    highest_bid.
    """

    # Older query location; not referenced anywhere in this class.
    QUERY_PATH_ = "data_providers/sql/get_dataset_2.sql"
    # Evaluated once, at class-definition time, relative to sys.path[0].
    QUERY_PATH = os.path.join(sys.path[0], "data_providers/sql/get_last_data.sql")

    @with_exception(DataProviderError)
    def __init__(self, connector, query=None):
        """The connector is supplied from outside because of how the trader is
        implemented (for training-data export this does not matter). It is really
        a connector rather than a provider.

        Args:
            connector: object exposing a ``cursor`` with ``execute`` / ``fetchall``.
            query: SQL *text* with ``%(name)s`` placeholders (start, end, period,
                pairs_list); when omitted the SQL is read from ``QUERY_PATH``.
        """
        self.connector = connector
        self.data = None
        self.log = logging.getLogger(__name__)
        if query is not None:
            self.query = query
            self.log.debug("Initialized with external query")
        else:
            self.query = self._get_query(self.QUERY_PATH)
            self.log.debug("Initialized with integrated query")

    @with_exception(DataProviderError)
    def request(self, start, end, step_size, pairs_list):
        """Fetch bucketed prices for ``pairs_list`` between ``start`` and ``end``.

        Args:
            start, end: ``"YYYY-mm-dd HH:MM:SS"`` strings (UTC), datetimes or unix seconds.
            step_size: bucket size in seconds.
            pairs_list: iterable of pair names (a single string is one pair); must
                not be empty.

        Returns:
            DataFrame indexed by ``ts`` with columns pair / lowest_ask / highest_bid.
        """
        params = self._build_params(start, end, step_size, pairs_list)
        # Placeholders are filled textually (not by the driver), so every value in
        # `params` must already be SQL-safe — _build_params guarantees that.
        query = self.query % params

        self.connector.cursor.execute(query)
        raw_data = self.connector.cursor.fetchall()
        self.log.debug("raw_data received len: %s", len(raw_data))
        data = self._transform(raw_data)
        return data

    def _build_params(self, start, end, step_size, pairs_list):
        """Build SQL-safe substitution values for the query template.

        Raises:
            ValueError: ``pairs_list`` is empty (``IN ()`` is not valid SQL), or
                ``step_size`` is not an integer-like value.
        """
        if isinstance(pairs_list, str):
            # A bare string would otherwise be split into single characters.
            pairs_list = [pairs_list]
        pairs = list(pairs_list)
        if not pairs:
            raise ValueError("pairs_list must contain at least one pair")
        params = {
            "start": self.date_to_unix_ts_in_utc(start),
            "end": self.date_to_unix_ts_in_utc(end),
            # Coerced to int because it is pasted verbatim into the SQL text.
            "period": int(step_size),
            "pairs_list": ", ".join(self._quote_sql_string(pair) for pair in pairs),
        }
        return params

    @staticmethod
    def _quote_sql_string(value):
        """Render ``value`` as a single-quoted ClickHouse string literal.

        Backslashes and single quotes are backslash-escaped so a pair name cannot
        terminate the literal and inject SQL.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"

    @staticmethod
    def _transform(raw_data):
        """Turn (pair, ts, lowest_ask, highest_bid) rows into a ts-indexed DataFrame."""
        data = pd.DataFrame(raw_data, columns=["pair", "ts", "lowest_ask", "highest_bid"])
        data["ts"] = data["ts"].astype(int)
        data = data.set_index("ts")
        return data

    @staticmethod
    def _get_query(filename):
        """Return the text of the SQL file ``filename``."""
        with open(filename, encoding="utf-8") as txt:
            return txt.read()

    @staticmethod
    def date_to_unix_ts_in_utc(date) -> int:
        """Convert ``date`` to integer unix seconds, interpreting it as UTC.

        Accepts ``"YYYY-mm-dd HH:MM:SS"`` strings (UTC), ``datetime.datetime``
        objects (naive = UTC, aware are converted) and anything ``int()`` accepts.
        """
        if isinstance(date, str):
            date = datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
        if isinstance(date, datetime.datetime):
            if date.tzinfo is None:
                # datetime.timestamp() would read a naive value as *local* time.
                date = date.replace(tzinfo=datetime.timezone.utc)
            return int(date.timestamp())
        return int(date)





In [268]:
# Fixed 10-minute historical window for three pairs, bucketed into 120 s steps.
pairs_list=['USDT_BTC', 'BTC_ETH', 'USDT_DOGE']
step_size = 120

# The caller opens the connector and passes it into DataProvider; leaving the
# with-block closes it (see the "Cursor closed" output).
with ClickHouseConnector(db_connect_params) as conn:
    tdp = DataProvider(conn, query=query_for_common)
    data = tdp.request("2022-02-01 20:00:00", "2022-02-01 20:10:00", step_size, pairs_list)


Cursor created, database connection established
Cursor closed


In [269]:
# Inspect the multi-pair frame: indexed by ts, one block of rows per pair.
data

Unnamed: 0_level_0,pair,lowest_ask,highest_bid
ts,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1643745600,BTC_ETH,0.071783,0.071768
1643745720,BTC_ETH,0.071864,0.071841
1643745840,BTC_ETH,0.07185,0.071847
1643745960,BTC_ETH,0.071884,0.071876
1643746080,BTC_ETH,0.071959,0.071948
1643746200,BTC_ETH,0.071987,0.071975
1643745600,USDT_BTC,38417.048446,38403.546912
1643745720,USDT_BTC,38513.635043,38508.80239
1643745840,USDT_BTC,38478.004627,38464.472571
1643745960,USDT_BTC,38471.548892,38453.752889


## Определение параметров для степа торговли

offset - защитный интервал, чтобы избежать раннего запроса для текущей точки данных

In [399]:
import time
step_size = 60

n_steps = 100


def get_period(step_size=step_size, n_steps=n_steps, offset=-60, now=None):
    """Return ``(start, end)`` unix timestamps of the window for a trading step.

    ``now`` is floored to a multiple of ``step_size``. The window starts
    ``n_steps`` steps before that floored time and ends at the floored time plus
    ``offset``, a guard interval that avoids requesting the current data point
    too early.

    Args:
        step_size: bucket size in seconds (default captured at definition: 60).
        n_steps: number of steps to look back (default captured at definition: 100).
        offset: seconds added to the floored time to obtain ``end``.
        now: reference unix time; defaults to ``time.time()``. Injectable for
            reproducible runs and tests.

    Returns:
        Tuple ``(start, end)`` of ints.
    """
    if now is None:
        now = time.time()
    now_ts = int(np.floor(now / step_size) * step_size)
    end = now_ts + offset
    start = now_ts - n_steps * step_size
    return start, end

    
start = "2022-02-01 20:00:00"
end = "2022-02-01 20:10:00"
start, end = get_period()
print(start, end)

    
    
pairs_list=['USDT_BTC', 'BTC_ETH', 'USDT_DOGE']

with ClickHouseConnector(db_connect_params) as conn:
    tdp = DataProvider(conn, query=query_for_common)
    data = tdp.request(start, end , step_size, pairs_list)
    display(data)



Cursor created, database connection established


1643826600 1643832540


Unnamed: 0_level_0,pair,lowest_ask,highest_bid
ts,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1643826600,BTC_ETH,0.071554,0.071530
1643826660,BTC_ETH,0.071605,0.071595
1643826720,BTC_ETH,0.071669,0.071657
1643826780,BTC_ETH,0.071687,0.071659
1643826840,BTC_ETH,0.071686,0.071676
...,...,...,...
1643832300,USDT_DOGE,0.140470,0.140165
1643832360,USDT_DOGE,0.140514,0.140168
1643832420,USDT_DOGE,0.140472,0.140199
1643832480,USDT_DOGE,0.140499,0.140271


Cursor closed


In [605]:
# Experiment: first cursor position that has `n_points` points spaced `period`
# seconds apart at or before it (min ts + period * (n_points - 1)).
n_points = 3
period = 300


offset = min(data.index) + period*(n_points-1)

In [612]:
# Move the cursor one 60 s step forward. NOTE: every re-run shifts it again.
offset += 60

In [606]:
# Show the current cursor position (unix seconds).
offset

1643827200

In [610]:
# Number of look-ahead points after the cursor.
n_future_points = 3

In [613]:
# Look-ahead timestamps offset + k*period for k = 1..n_future_points
# (np.arange excludes `end`). NOTE: overwrites the notebook globals start/end.
start = offset + period
end = offset + (n_future_points+1) * period

idxs = np.arange(start, end, period)
idxs

array([1643827560, 1643827860, 1643828160])

In [617]:
# Clamp the look-ahead timestamps to a (hard-coded) last available ts, so
# positions past the end of the data repeat the last point.
max_val = 1643827860
idxs_ = np.array(list(map(lambda x: min(x, max_val), idxs)))
idxs_

array([1643827560, 1643827860, 1643827860])

In [602]:
# Peek at the first 20 ts index values.
data.index[:20]

Int64Index([1643826600, 1643826660, 1643826720, 1643826780, 1643826840,
            1643826900, 1643826960, 1643827020, 1643827080, 1643827140,
            1643827200, 1643827260, 1643827320, 1643827380, 1643827440,
            1643827500, 1643827560, 1643827620, 1643827680, 1643827740],
           dtype='int64', name='ts')

# DatapointFactory

- Создается для конкретной пары

In [618]:
import numpy as np
import time
from dataset_tools import DataPoint


class DataPointFactory:
    """Build DataPoints by sliding a timestamp cursor over a single-pair dataset.

    The factory is created for one specific pair. The dataset is indexed by unix
    ``ts``, and windows are fetched with ``.loc`` on computed timestamps, so every
    required ts must be present in the index. NOTE(review): this assumes a
    regular ts grid aligned with ``period`` / ``step_size`` — confirm.

    Args:
        period: spacing in seconds between points of a window.
        n_observation_points: number of points in the observation (history) window.
        n_future_points: number of points in the look-ahead window.
        step_size: seconds the cursor advances per ``get_next_step()``.
    """

    def __init__(self, period=300, n_observation_points=5, n_future_points=3, step_size=60):
        self.period = period
        self.n_observation_points = n_observation_points
        self.n_future_points = n_future_points
        self.step_size = step_size

        self.dataset = None
        self.cursor = None    # ts of the newest point of the observation window
        self.max_step = None  # largest ts in the dataset

        self.done = True

    def reset(self, dataset=None):
        """Place the cursor at the first position with a full observation window.

        Args:
            dataset: new dataset; when omitted the previously set one is reused.

        Returns:
            DataPoint for the first position.

        Raises:
            ValueError: no dataset is available, or it is too short for one
                observation window.
        """
        if dataset is not None:
            self.dataset = dataset
        if self.dataset is None:
            raise ValueError("DataPointFactory.reset() needs a dataset")

        self.max_step = max(self.dataset.index)
        self.done = False
        self.cursor = min(self.dataset.index) + self.period * (self.n_observation_points - 1)
        if self.cursor > self.max_step:
            raise ValueError(
                "dataset is too short for {0} observation points of {1} s".format(
                    self.n_observation_points, self.period))

        return self.get_current_step()

    def get_idx(self):
        """Timestamps of the observation window: ``n_observation_points`` values
        spaced ``period`` apart, ending at (and including) ``cursor``."""
        up_bound = self.cursor + self.period
        low_bound = up_bound - self.n_observation_points * self.period
        return np.arange(low_bound, up_bound, self.period)

    def get_future_idx(self):
        """Timestamps ``cursor + k*period`` for k = 1..n_future_points, clamped to
        ``max_step`` so positions past the end repeat the last available point."""
        low_bound = self.cursor + self.period
        up_bound = self.cursor + (self.n_future_points + 1) * self.period
        idxs = np.arange(low_bound, up_bound, self.period)
        # np.minimum keeps the integer dtype even for an empty window.
        return np.minimum(idxs, self.max_step)

    def get_future_step(self):
        """Return the look-ahead window (lowest_ask / highest_bid) as a DataFrame."""
        return self.dataset.loc[self.get_future_idx(), ["lowest_ask", "highest_bid"]]

    def get_current_step(self):
        """Return a DataPoint holding the observation window and its look-ahead window."""
        data_ = self.dataset.loc[self.get_idx(), ["lowest_ask", "highest_bid"]]
        data_f = self.get_future_step()
        return DataPoint(data_, dataset_future=data_f)

    def get_next_step(self):
        """Advance the cursor by ``step_size`` and return ``(DataPoint, done)``.

        The cursor never moves past ``max_step`` (a step that would overshoot lands
        exactly on it), and it stays there once ``done`` is True.
        """
        if not self.done:
            self.cursor = min(self.cursor + self.step_size, self.max_step)
        self.done = self.cursor >= self.max_step
        return self.get_current_step(), self.done


In [874]:
filtered = data[data["pair"]=="USDT_DOGE"]
filtered

Unnamed: 0_level_0,pair,lowest_ask,highest_bid
ts,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1643826600,USDT_DOGE,0.140185,0.139882
1643826660,USDT_DOGE,0.140277,0.139973
1643826720,USDT_DOGE,0.140395,0.140092
1643826780,USDT_DOGE,0.140272,0.140042
1643826840,USDT_DOGE,0.140401,0.140057
...,...,...,...
1643832300,USDT_DOGE,0.140470,0.140165
1643832360,USDT_DOGE,0.140514,0.140168
1643832420,USDT_DOGE,0.140472,0.140199
1643832480,USDT_DOGE,0.140499,0.140271


In [875]:
# Factory with default settings; reset() positions the cursor on `filtered`
# and returns the first DataPoint.
dpf = DataPointFactory()
dp = dpf.reset(dataset=filtered)

In [876]:
# Display the first DataPoint.
dp

Unnamed: 0_level_0,lowest_ask,highest_bid
ts,Unnamed: 1_level_1,Unnamed: 2_level_1
1643826600,0.140185,0.139882
1643826900,0.140335,0.14
1643827200,0.140533,0.140218
1643827500,0.140531,0.14021
1643827800,0.140331,0.140026


In [877]:
# Rebuild the DataPoint at the current cursor (the cursor does not move).
dp = dpf.get_current_step()
print(dp)

            lowest_ask  highest_bid
ts                                 
1643826600    0.140185     0.139882
1643826900    0.140335     0.140000
1643827200    0.140533     0.140218
1643827500    0.140531     0.140210
1643827800    0.140331     0.140026


In [878]:
# Look-ahead window for the current cursor.
# NOTE(review): requires a DataPointFactory that provides get_future_step().
dp = dpf.get_future_step()
print(dp)

            lowest_ask  highest_bid
ts                                 
1643828100    0.140342     0.140248
1643828400    0.140679     0.140383
1643828700    0.140659     0.140359


In [879]:
# Advance the cursor by step_size; print the new DataPoint and the done flag.
dp, done = dpf.get_next_step()
print(dp, done)

            lowest_ask  highest_bid
ts                                 
1643826660    0.140277     0.139973
1643826960    0.140263     0.139888
1643827260    0.140444     0.140169
1643827560    0.140470     0.140222
1643827860    0.140289     0.140014 False


In [904]:
import numpy as np
from app_tools import with_exception


class DataPointError(Exception):
    """Error type handed to ``with_exception`` on the DataPoint methods."""


class DataPoint:
    """Observation window of prices, plus an optional look-ahead window.

    Args:
        dataset: DataFrame indexed by ts. NOTE(review): presumably with columns
            lowest_ask / highest_bid, as built by DataPointFactory — verify.
        dataset_future: optional DataFrame with the look-ahead points.
    """

    @with_exception(DataPointError)
    def __init__(self, dataset, dataset_future=None):
        self.data = dataset
        self.data_f = dataset_future
        # The newest ts of the observation window is treated as "now".
        self.current_index = max(self.data.index)

    @with_exception(DataPointError)
    def get_current_ts(self):
        """Return the newest timestamp of the observation window."""
        return self.current_index

    @with_exception(DataPointError)
    def get_price(self, name, cursor=None):
        """Return column ``name`` at ts ``cursor`` (default: the current ts)."""
        if cursor is None:
            cursor = self.current_index
        return self.data.loc[cursor, name]

    @with_exception(DataPointError)
    def get_prices(self, name):
        """Return column ``name`` over the whole observation window."""
        return self.data.loc[:, name]

    @with_exception(DataPointError)
    def get_future_prices(self, name, cursor=None):
        """Return column ``name`` over the look-ahead window.

        ``cursor`` is accepted for signature symmetry with ``get_price`` and is
        ignored.

        Raises:
            ValueError: no look-ahead data was attached (raised inside this
                with_exception(DataPointError)-decorated method).
        """
        if self.data_f is None:
            raise ValueError("DataPoint has no future data")
        return self.data_f.loc[:, name]

    @with_exception(DataPointError)
    def get_timestamps(self):
        """Return the observation-window timestamps as an array."""
        return self.data.index.values

    @with_exception(DataPointError)
    def get_current_data(self):
        """Return the observation-window DataFrame."""
        return self.data

    @with_exception(DataPointError)
    def get_future_data(self):
        """Return the look-ahead DataFrame (may be None)."""
        return self.data_f

    @with_exception(DataPointError)
    def get_last_diffs(self, num, name="lowest_ask"):
        """Return the last ``num`` consecutive differences of column ``name``.

        The first row has no predecessor, so its diff is NaN; it is included when
        ``num`` reaches back that far. ``num <= 0`` returns an empty array. An
        unknown ``name`` raises instead of falling back to another column.
        """
        if num <= 0:
            # iloc[-0:] would select every row rather than none.
            return np.array([], dtype=float)
        diffs = self.get_current_data()[name].diff().iloc[-num:]
        return diffs.values

In [905]:
# Observation window for the factory's current cursor.
# NOTE(review): it is wrapped into a DataPoint below, which expects a DataFrame;
# a factory whose get_current_step() already returns a DataPoint does not fit —
# confirm which factory version is in use.
data_ = dpf.get_current_step() 

In [906]:
# Look-ahead window for the current cursor.
# NOTE(review): requires a DataPointFactory that provides get_future_step().
data_f = dpf.get_future_step() 

In [907]:
# Assemble a DataPoint from the observation and look-ahead windows.
dp = DataPoint(data_, dataset_future=data_f)

In [910]:
# Last 3 consecutive differences of lowest_ask in the observation window.
dp.get_last_diffs(3)

array([ 1.80425e-04,  2.58500e-05, -1.80315e-04])

In [913]:
dp.data.loc[:, "lowest_ask"]

ts
1643826660    0.140277
1643826960    0.140263
1643827260    0.140444
1643827560    0.140470
1643827860    0.140289
Name: lowest_ask, dtype: float64

In [917]:
# Manual check of one consecutive difference against get_last_diffs().
dp.data.loc[1643827260, "lowest_ask"] - dp.data.loc[1643826960, "lowest_ask"]

0.00018042500000001183

In [893]:
# lowest_ask at the DataPoint's current (newest) timestamp.
dp.data.loc[dp.current_index, "lowest_ask"]

0.140289405

In [851]:
nm = "lowest_ask"
mask = data.columns==nm
data.loc[1643826660, "lowest_ask"].values

array([7.16048500e-02, 3.74622702e+04, 1.40277320e-01])

In [None]:
class DataPointFactory:
    """Draft positional variant of DataPointFactory (cursor = row position).

    NOTE(review): running this cell replaces the timestamp-based DataPointFactory
    of the same name.

    The observation window is the ``observation_points`` rows ending just before
    row ``cursor``. The look-ahead window is the next ``future_points`` rows.
    ``period`` is stored but not used by the positional slicing.
    """

    def __init__(self, period, observation_points, future_points=0):
        self.period = period
        self.offset = observation_points   # first cursor with a full window
        self.future_points = future_points
        self.cursor = observation_points
        self.last_position = None          # len(dataset) once data is loaded
        self.dataset = None
        self.data_point = None
        self.done = True

    def reset(self):
        """Rewind to the first full window and return its DataPoint."""
        self.done = False
        self.cursor = self.offset
        self.data_point = self.get_current_step()
        return self.data_point

    def load_data(self, dataset):
        """Attach ``dataset`` and rewind the cursor."""
        self.dataset = dataset
        self.last_position = len(self.dataset)
        self.done = False
        self.cursor = self.offset

    def get_datapoint(self):
        """Return the most recently built DataPoint (None before the first step)."""
        return self.data_point

    def _require_dataset(self):
        """Raise ValueError if load_data() has not been called yet."""
        if self.dataset is None:
            raise ValueError("no dataset loaded; call load_data() first")

    def get_dataset(self):
        """Observation window: rows [cursor - observation_points, cursor)."""
        self._require_dataset()
        return self.dataset.iloc[self.cursor - self.offset:self.cursor]

    def get_future_dataset(self):
        """Look-ahead window: rows [cursor, cursor + future_points)."""
        self._require_dataset()
        return self.dataset.iloc[self.cursor:self.cursor + self.future_points]

    def get_current_step(self):
        """Build a DataPoint from the current observation and look-ahead windows."""
        data_point = DataPoint(self.get_dataset(), dataset_future=self.get_future_dataset())
        return data_point

    def get_next_step(self):
        """Advance one row and return ``(DataPoint, done)``; done once the cursor
        reaches the end of the dataset."""
        self._require_dataset()
        if not self.done:
            self.cursor += 1
        self.done = self.cursor >= self.last_position
        self.data_point = self.get_current_step()
        return self.data_point, self.done

    