In [1]:
# Enable IPython's autoreload extension in mode 2, so edits to imported modules
# are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import dask.dataframe as dd

import data_preprocessing as dp

from datetime import datetime, timedelta
import math
import re
import multiprocessing as mp

  import pandas.util.testing as tm


In [3]:
# Run configuration: parameters shared by the caching and normalization cells.
frequency = timedelta(seconds=60)  # bar interval passed to the dp data loaders
pair = 'USDT_BTC'
date_start = '2020-11-11'  # first day of the daily date range
date_end = '2021-03-31'  # last day of the daily date range
lob_depth = 100  # order book depth passed to dp.get_lob_data
norm_type = 'dyn_z_score'
# Rolling window of 10 days, expressed as a number of `frequency` bars.
# Derived from `frequency` (14400 bars at 60s) so that changing the bar
# interval keeps the window at 10 days instead of silently shrinking/growing it.
roll = timedelta(days=10) // frequency

## Cache Data

In [4]:
# Build the argument tuples for multiprocessing: the daily date range is cut
# into contiguous chunks and each chunk becomes one task.
processes = 5
date_daterange = pd.date_range(start=date_start, end=date_end, freq='1D').astype('str').tolist()

# chunk length, rounded up so that at most `processes` chunks are produced
multiple = math.ceil(len(date_daterange) / processes)
slice_idx = np.arange(0, len(date_daterange), multiple)
date_slices = [date_daterange[start:start + multiple] for start in slice_idx]

# one tuple per chunk: (pair, first day, last day, frequency, depth)
inputs = [
    (pair, chunk[0], chunk[-1], frequency, lob_depth)
    for chunk in date_slices
]
inputs

[('USDT_BTC', '2020-11-11', '2020-12-09', datetime.timedelta(seconds=60), 100),
 ('USDT_BTC', '2020-12-10', '2021-01-07', datetime.timedelta(seconds=60), 100),
 ('USDT_BTC', '2021-01-08', '2021-02-05', datetime.timedelta(seconds=60), 100),
 ('USDT_BTC', '2021-02-06', '2021-03-06', datetime.timedelta(seconds=60), 100),
 ('USDT_BTC', '2021-03-07', '2021-03-31', datetime.timedelta(seconds=60), 100)]

### Caching experiments (prices only - add trades if needed)
- with 5, 10 processes or single process

In [5]:
# %%time
# # time taken to process 10 files with 5 processes: 4min 20s (60/65% RAM)
# with mp.Pool(processes=5) as pool:
#     results = pool.starmap(dp.get_lob_data, inputs)

In [6]:
# %%time
# # time taken to process 10 files with 10 processes: 3min 39s (85/90% RAM)
# with mp.Pool(processes=10) as pool:
#     results = pool.starmap(dp.get_lob_data, inputs)

In [7]:
# %%time
# # time taken to process 10 files with a single process: Wall time: 17min 41s (20% RAM)
# dp.get_lob_data('USDT_BTC', date_start, date_end, timedelta(seconds=60), 100)

## Data Normalization

#### Import cache

In [8]:
# Load the cached data for the configured pair / date range / frequency.
# The loaders are driven by the configuration cell (pair, frequency, lob_depth)
# rather than by repeated literals, so the data imported here always matches
# the parameters used everywhere else in the notebook.
# NOTE(review): the dp.get_*_data helpers presumably return the cached file
# path(s) that dask reads below - confirm against data_preprocessing.
# Despite the `ddf_` prefix, both results are in-memory frames after .compute().

# import px
results_px = dp.get_lob_data(pair, date_start, date_end, frequency, lob_depth)
ddf_px = dd.read_csv(results_px, compression='gzip').compute()

# import trades
results_trade = dp.get_trade_data(pair, date_start, date_end, frequency)
ddf_trade = dd.read_csv(results_trade, compression='gzip').compute()


9.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-20.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-21.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-22.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-23.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-24.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-25.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-26.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-27.csv.gz
Found /home/federico/Python_vsc_dir/RL_Trader/Experiments/resampled/USDT_BTC/100_levels/60s/2021-02-28.csv.gz
F

In [9]:
# Parse the timestamps so that both frames share a datetime merge key.
ddf_px['Datetime'] = pd.to_datetime(ddf_px['Datetime'], format='%Y-%m-%d %H:%M:%S')
ddf_trade['Datetime'] = pd.to_datetime(ddf_trade['Datetime'], format='%Y-%m-%d %H:%M:%S')

# Merge into a unique dataset. The left join keeps every row of the price
# frame even when there is no matching trade row; the result is ordered and
# indexed by time. Built as one chain (no inplace mutation) so that
# re-running the cell always starts from the two source frames.
df_data = (
    pd.merge(ddf_px, ddf_trade, on='Datetime', how='left')
    .sort_values(by='Datetime')
    .set_index('Datetime')
)

# rows with at least one NA before imputation (minutes with no trades)
df_missings = df_data[df_data.isna().any(axis=1)]

# Impute NAs - zero for sizes / order counts and last known value for prices.
trade_px_cols = ['av_price_buy', 'av_price_sell', 'wav_price_buy', 'wav_price_sell']
trade_size_cols = ['amount_buy', 'amount_sell']
trade_orders_cols = ['unique_orders_buy', 'unique_orders_sell', 'clips_buy', 'clips_sell']
df_data.loc[:, trade_size_cols + trade_orders_cols] = df_data.loc[:, trade_size_cols + trade_orders_cols].fillna(0)
# .ffill() replaces the deprecated fillna(method='ffill').
# NOTE: a forward fill cannot fill NAs located before the first valid price,
# so NAs at the top of the frame are left as they are and still need handling.
df_data.loc[:, trade_px_cols] = df_data.loc[:, trade_px_cols].ffill()

#### Prepare for standardization

In [10]:
# Column subsets - input variables with similar distributions are grouped
# together so that each group can be standardized on its own.

# price columns (order book and trades)
std_px_cols = [
    'Ask_Price', 'Bid_Price', 'Mid_Price',
    'av_price_buy', 'av_price_sell', 'wav_price_buy', 'wav_price_sell',
]

# columns whose name contains "<any character>_Size_"
rege_size = re.compile('._Size_')
std_depth_size_cols = [name for name in df_data.columns if rege_size.search(name)]

# traded amounts
std_trade_size_cols = ['amount_buy', 'amount_sell']

# columns whose name contains "<any character>_Level_"
rege_order_book = re.compile('._Level_')
std_depth_level_cols = [name for name in df_data.columns if rege_order_book.search(name)]

# trade counts
std_number_trade_cols = ['unique_orders_buy', 'unique_orders_sell', 'clips_buy', 'clips_sell']

In [11]:
# Dynamic z score standardization, applied independently to each column group.
# The groups are processed in the same order in which they are concatenated below.
(
    px_dyn_stdz,
    depth_size_dyn_stdz,
    trd_size_dyn_stdz,
    depth_level_dyn_stdz,
    trade_number_dyn_stdz,
) = [
    dp.standardize(df_data[group_cols], stdz_depth=1, norm_type=norm_type, roll=roll)
    for group_cols in (
        std_px_cols,
        std_depth_size_cols,
        std_trade_size_cols,
        std_depth_level_cols,
        std_number_trade_cols,
    )
]

# stitch the standardized groups back together column-wise and
# discard the rows in which every column is NA
df_data_dyn_stdz = pd.concat(
    [px_dyn_stdz, depth_size_dyn_stdz, trd_size_dyn_stdz, depth_level_dyn_stdz, trade_number_dyn_stdz],
    axis=1,
).dropna(how='all')

rolling window = 100800, calculate as roll: 14400 * levels: 1 * shape[1]: 7
rolling window = 115200, calculate as roll: 14400 * levels: 1 * shape[1]: 8
rolling window = 28800, calculate as roll: 14400 * levels: 1 * shape[1]: 2
rolling window = 115200, calculate as roll: 14400 * levels: 1 * shape[1]: 8
rolling window = 57600, calculate as roll: 14400 * levels: 1 * shape[1]: 4


#### Data check

In [12]:
# summary statistics of the original (non standardized) data, one row per column
df_data.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Ask_Price,203040.0,36019.022776,14337.492444,15277.981632,19600.256724,34804.703235,49105.028926,61740.648399
Bid_Price,203040.0,36009.832764,14334.765573,15277.09061,19596.589476,34787.654107,49093.214382,61718.386654
Mid_Price,203040.0,36014.42777,14336.128252,15277.536121,19598.49781,34796.519778,49100.067433,61729.517527
Ask_Level_5bps,203040.0,3.337318,2.030443,-1.0,2.0,3.0,4.0,78.0
Ask_Size_5bps,203040.0,2.481163,1.64171,0.0,1.356365,2.164992,3.374176,50.114798
Bid_Level_5bps,203040.0,3.12733,1.807063,-1.0,2.0,3.0,4.0,52.0
Bid_Size_5bps,203040.0,2.357019,1.896499,0.0,1.249238,2.081705,3.138688,65.469819
Ask_Level_10bps,203040.0,6.76185,3.576232,-1.0,5.0,6.0,8.0,99.0
Ask_Size_10bps,203040.0,3.726547,2.066005,0.0,2.29256,3.379917,4.805411,56.542259
Bid_Level_10bps,203040.0,6.394893,2.865228,-1.0,5.0,6.0,8.0,84.0


In [13]:
# summary statistics after the dynamic z score standardization, one row per column
df_data_dyn_stdz.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Ask_Price,188639.0,0.647255,1.339293,-3.838798,-0.31682,0.873857,1.611924,5.248069
Bid_Price,188639.0,0.642601,1.339521,-3.849015,-0.32104,0.87044,1.608507,5.222277
Mid_Price,188639.0,0.644899,1.339348,-3.843443,-0.318838,0.872212,1.610003,5.234065
av_price_buy,188639.0,0.645699,1.339534,-3.834536,-0.317854,0.872502,1.61066,5.249792
av_price_sell,188639.0,0.643877,1.339317,-3.850752,-0.31946,0.871461,1.608955,5.231112
wav_price_buy,188639.0,0.644788,1.339507,-3.845164,-0.318817,0.871528,1.610073,5.226776
wav_price_sell,188639.0,0.644389,1.339179,-3.855362,-0.318226,0.871973,1.60953,5.259932
Ask_Size_5bps,188639.0,-0.808705,0.454065,-2.017977,-1.100673,-0.794499,-0.521137,20.759091
Bid_Size_5bps,188639.0,-0.846877,0.491098,-2.017952,-1.139443,-0.843154,-0.579898,17.946778
Ask_Size_10bps,188639.0,-0.374803,0.510313,-1.983688,-0.668189,-0.366589,-0.092078,23.653891
