In [1]:
import psycopg2
import pandas as pd
import numpy as np
import re
from datetime import datetime
import pytz
import re
from operator import itemgetter

# Show (practically) every column when a wide frame is displayed.
pd.set_option("display.max_columns", 999)

# Allow-list of base feature names that get_batch_data accepts; a requested
# column is one of these names, optionally followed by "_<n>" (a row shift).
# Group labels below are read off the names only -- confirm against the
# `features` table before relying on them.
VALID_FEATURES = [
    # candle identity and timing
    "pair_id", "open_time", "close_time", "dow", "tod",
    # OHLC prices
    "open", "high", "low", "close",
    # trade count and volumes
    "number_of_trades", "volume", "quote_asset_volume",
    "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume",
    # ma* / sup* / res* at 14, 30 and 90
    "ma14", "ma30", "ma90",
    "sup14", "sup30", "sup90",
    "res14", "res30", "res90",
    # atr* and rsi* families
    "atr", "atr_diff", "atr_ma14",
    "rsi", "rsi_diff", "rsi_ma14",
    # trend flags
    "trend_up", "trend_up3", "trend_up14", "trend_up30",
    # cs_* flags
    "cs_ss", "cs_ssr", "cs_hm", "cs_hmr", "cs_brh", "cs_buh", "cs_ebu", "cs_ebr",
]

In [2]:
# local postgres connection only
# `hidden` is a local module that is not part of this notebook; it supplies
# the connection settings (presumably so credentials stay out of the notebook
# -- confirm against hidden.py).
import hidden
sql_string = hidden.psycopg2(hidden.secrets())
print('PostgreSQL connection data taken from hidden.py')

# Make the connection. No cursor is created here: `conn` is the module-level
# handle that get_batch_data passes to pd.read_sql_query.
# NOTE(review): the connection is never closed in this notebook.
conn = psycopg2.connect(sql_string, connect_timeout=3)

PostgreSQL connection data taken from hidden.py


In [3]:
# hashlib is used only to build cache keys; pickle is used for the cache files and for the dataset files written in later cells
import hashlib
import pickle

def get_batch_data(base_coin, quote_coin, start_time, end_time, columns, batch_size=30000, extra_rows=0, use_cache=True):
    """Load one batch of 15-minute candle features for a trading pair.

    Parameters
    ----------
    base_coin, quote_coin : str
        Matched against ``pairs.coin1`` / ``pairs.coin2``.
    start_time, end_time : str
        Bounds of the batch window on ``open_time`` (both ends inclusive).
        ``end_time`` must be in ``'%Y-%m-%d'`` format because it is parsed to
        decide whether the result may be cached.
    columns : list of str
        Requested output columns. Each is ``<feature>`` or ``<feature>_<n>``
        where ``<feature>`` is in ``VALID_FEATURES`` and ``<n>`` (1-3 digits)
        selects the feature's value ``n`` rows earlier in time order.
    batch_size : int
        Maximum number of candles taken from inside the window.
    extra_rows : int
        Additional candles fetched before ``start_time`` on top of the
        lookback needed by the largest ``<n>``.
    use_cache : bool
        Allow reading/writing a pickle cache under ``../data/t2/``. The cache
        is only used when ``end_time`` is already in the past (UTC).

    Returns
    -------
    tuple ``(df, ref_df, extra_df, batch_close_time)``
        ``df``: requested columns for rows inside the window, indexed by
        ``candle_open_time`` in ascending order.
        ``ref_df``: ``open``/``high``/``low``/``close`` for the same window,
        indexed by ``open_time`` in ascending order.
        ``extra_df``: ``None`` when ``extra_rows == 0``; otherwise every
        fetched row (lookback included) with a boolean ``is_extra`` column
        marking rows outside the window.
        ``batch_close_time``: the largest ``close_time`` among fetched rows.

    Raises
    ------
    ValueError
        If a column name does not follow the ``<feature>[_<n>]`` pattern.
    AssertionError
        If a parsed feature is not listed in ``VALID_FEATURES``.
    """
    # Parse every requested column up front so a typo fails with a clear
    # message instead of an AttributeError on a failed regex match.
    column_pattern = re.compile('^(?P<feature>[a-z][a-z0-9]*(?:_[a-z][a-z0-9]*)*)(?:_(?P<shift>[0-9]{1,3}))?$')
    column_info = []
    for name in columns:
        parsed = column_pattern.match(name)
        if parsed is None:
            raise ValueError(f"Invalid column name: {name!r} (expected '<feature>' or '<feature>_<n>')")
        feature, shift = parsed.group('feature', 'shift')
        column_info.append((name, feature, 0 if shift is None else int(shift)))

    # Largest shift decides how many candles before start_time are needed;
    # default=0 keeps an empty column list from crashing max().
    max_lookback = max((shift for _, _, shift in column_info), default=0)

    # Cache key. The layout of this string is kept exactly as it has always
    # been so previously written cache files are still found.
    # NOTE(review): the parts are joined without separators, so two different
    # argument sets could in principle produce the same key.
    s = base_coin+quote_coin+f"{start_time}"+f"{end_time}"+"".join(columns)+str(batch_size)+str(extra_rows)
    h = hashlib.md5(s.encode('utf-8')).hexdigest()
    cache_path = f'../data/t2/{h}.pkl'

    # Only a window that lies entirely in the past can be cached; a window
    # reaching into the future would still receive new candles.
    should_use_cache = use_cache and (datetime.strptime(end_time, '%Y-%m-%d') < datetime.now(pytz.timezone('UTC')).replace(tzinfo=None))
    if should_use_cache:
        # NOTE: pickle.load executes code embedded in the file; only cache
        # files written by this function should live in ../data/t2/.
        try:
            with open(cache_path, 'rb') as fp:
                cached = pickle.load(fp)
            print(f"Using cache file: {cache_path}")
            return cached
        except FileNotFoundError:
            print(f"No cache found")
        except Exception as exc:
            # Best effort: a corrupt or incompatible cache file is ignored and
            # rebuilt, but Ctrl-C / SystemExit are no longer swallowed.
            print(f"Ignoring unreadable cache file {cache_path}: {exc!r}")

    # Reject unknown features before paying for the query.
    for name, feature, _ in column_info:
        assert feature in VALID_FEATURES, f"Invalid feature: {feature} for {name}"

    # Values are passed as bound parameters (psycopg2 "pyformat" style) rather
    # than being formatted into the SQL text.
    sql = """
select
    f.*, open_time, open, high, low, close, volume, close_time, quote_asset_volume, number_of_trades, taker_buy_base_asset_volume, taker_buy_quote_asset_volume
from
    (
        (select * from (select id as the_pair from pairs p where p.coin1=%(base_coin)s and p.coin2=%(quote_coin)s) z inner join candlestick_15m on the_pair=pair_id where close_time notnull and open_time < %(start_time)s order by open_time desc limit %(lookback_rows)s)
            union all
        (select * from (select id as the_pair from pairs p where p.coin1=%(base_coin)s and p.coin2=%(quote_coin)s) z inner join candlestick_15m on the_pair=pair_id where close_time notnull and open_time between %(start_time)s and %(end_time)s order by open_time limit %(batch_size)s)
    ) cm
inner join 
    features f on f.pair_id = cm.pair_id and f.candle_open_time = cm.open_time
order by
    open_time desc
"""
    params = {
        'base_coin': base_coin,
        'quote_coin': quote_coin,
        'start_time': start_time,
        'end_time': end_time,
        'lookback_rows': int(max_lookback + extra_rows),
        'batch_size': int(batch_size),
    }
    base_df = pd.read_sql_query(sql, conn, params=params)

    # Rows arrive newest-first, so shifting by -n pulls the value from the row
    # n positions further down, i.e. n rows earlier in time.
    df = base_df[['candle_open_time']].copy()
    for name, feature, shift in column_info:
        df[name] = base_df[feature].shift(-shift)

    if extra_rows == 0:
        extra_df = None
    else:
        extra_df = df.copy()
        extra_df['is_extra'] = ~extra_df['candle_open_time'].between(start_time, end_time)
        extra_df = extra_df.set_index('candle_open_time').sort_index()

    # Drop the lookback rows; they were only needed to fill the shifted columns.
    df = df[df['candle_open_time'].between(start_time, end_time)]
    df = df.set_index('candle_open_time').sort_index()

    ref_df = base_df[['open_time','open', 'high', 'low', 'close']].copy()
    ref_df = ref_df[ref_df['open_time'].between(start_time, end_time)]
    ref_df = ref_df.set_index('open_time').sort_index()

    batch_close_time = base_df['close_time'].max()

    if should_use_cache:
        print(f"Saving cache to: {cache_path}")
        with open(cache_path, 'wb') as fp:
            pickle.dump((df, ref_df, extra_df, batch_close_time), fp, protocol=4)

    return df, ref_df, extra_df, batch_close_time

In [4]:
# Requested once, without any shift.
static_columns = ['open']

# Requested at shifts 0..7 (eight rows of history per feature).
repeat_columns = [
    'high', 'low', 'close', 'rsi', 'trend_up3', 'trend_up14',
    'cs_ss', 'cs_ssr', 'cs_hm', 'cs_hmr', 'cs_ebu', 'cs_ebr',
]

# Full request list: the static columns first, then "<feature>_<shift>" with
# all shifts of one feature grouped together.
columns = list(static_columns)
columns += [f"{feature}_{lag}" for feature in repeat_columns for lag in range(8)]

In [5]:
#columns

In [6]:
#len(repeat_columns)

In [7]:
# Numeric id stored in the `pair_id` index level for each "<base><quote>" pair.
# A reversed pair gets -(id + 1) of its forward pair.
mapping = {
    # forward pairs
    "ETHBTC": 0, "BTCUSDT": 1, "ETHUSDT": 2,
    # reversed pairs
    "BTCETH": -1, "USDTBTC": -2, "USDTETH": -3,
}

In [8]:
from itertools import permutations

# Download the raw data (before any pre-processing) for every ordered pair of
# the three coins; this is the same data that the simulator receives.
# One pickle per pair is written to ../data/t2_<base>_<quote>.pkl.
for base, quote in permutations(('ETH', 'BTC', 'USDT'), 2):
    batch = get_batch_data(base, quote, '2018-01-01', '2021-08-07', columns, 500000, 0, True)
    # Only the feature frame (first element) is kept; everything is cast to float.
    df = batch[0].astype(float)

    with open(f'../data/t2_{base}_{quote}.pkl', 'wb') as fp:
        pickle.dump(df, fp, protocol=4)

Using cache file: ../data/t2/8ab4181b1214fa6a2afce44ba627789e.pkl
Using cache file: ../data/t2/a84bfae4281950e643216f415fc2075c.pkl
Using cache file: ../data/t2/ab0b63fbc788e447eb424e4bbb7e44fb.pkl
Using cache file: ../data/t2/c4a9096f8ad174bd1cdc357c444c3007.pkl
Using cache file: ../data/t2/eb764641cee5900041fbe5e702d4b715.pkl
Using cache file: ../data/t2/7914487a7ea3f192f96a3f0faf0cd739.pkl


In [9]:
# Additional ETL: rescale the raw features into "tc2x_*" columns, build the
# binary target `y`, and stack all pairs into one frame indexed by
# (pair_id, candle_open_time).

dfs = []

# (new column prefix, flag column, opposite flag column): each pair of flag
# columns is collapsed into one signed column (first minus second).
flag_pairs = [('tc2x_ss','cs_ss','cs_ssr'),('tc2x_hm','cs_hm','cs_hmr'),('tc2x_eb','cs_ebu','cs_ebr')]

for a, b in permutations(['ETH','BTC','USDT'], 2):
    with open(f'../data/t2_{a}_{b}.pkl', 'rb') as fp:
        df = pickle.load(fp)
    df = df.astype(float)

    # Walk a snapshot of the raw columns; new columns are appended in this
    # order, which fixes the column order of the final dataset.
    for c in list(df.columns):
        if c.startswith("trend_up"):
            # x -> 2x - 1
            df['tc2x_' + c.replace('_up', '')] = (df[c] - 0.5) * 2
        elif c.startswith("rsi_"):
            # x -> (x - 50) / 50
            df['tc2x_' + c] = (df[c] - 50) / 50
        elif c.startswith(("high_", "low_", "close_")):
            # relative move versus the row's open, scaled by 30
            df['tc2x_' + c] = ((df[c] / df['open']) - 1) * 30

    for new_prefix, flag_col, opposite_col in flag_pairs:
        for lag in range(8):
            df[f"{new_prefix}_{lag}"] = df[f"{flag_col}_{lag}"] - df[f"{opposite_col}_{lag}"]

    # Return of each of the next 95 rows' close relative to this row's open.
    # Rows without a complete 95-row horizon are dropped here and therefore
    # end up without a label.
    future_returns = pd.concat(
        [(df['close_0'].shift(-step) / df['open']) - 1 for step in range(1, 96)],
        axis=1,
    ).dropna()

    # Blank out returns strictly inside (-1%, +3%); the first remaining value
    # along the horizon decides the label. y is True when that value is
    # positive and False otherwise, including when nothing leaves the band.
    inside_band = (future_returns > -0.01) & (future_returns < 0.03)
    future_returns[inside_band] = np.nan
    y = future_returns.bfill(axis=1).iloc[:, 0] > 0

    # Keep only the engineered columns, clipped to [-1, 1].
    engineered = [c for c in df.columns if c.startswith("tc2")]
    df = df[engineered].clip(-1, 1)
    df['y'] = y.astype(float)
    df['pair_id'] = mapping[f"{a}{b}"]
    # dropna removes rows with missing features as well as unlabeled rows.
    df = df.reset_index().set_index(['pair_id','candle_open_time']).dropna()

    dfs.append(df)

df = pd.concat(dfs)
dfs = []

In [10]:
# Label balance check: quantiles of y at 5% steps (0.00, 0.05, ..., 1.00).
quantile_levels = np.linspace(0.0, 1.0, num=21)
df['y'].quantile(quantile_levels)

0.00    0.0
0.05    0.0
0.10    0.0
0.15    0.0
0.20    0.0
0.25    0.0
0.30    0.0
0.35    0.0
0.40    0.0
0.45    0.0
0.50    0.0
0.55    0.0
0.60    0.0
0.65    0.0
0.70    0.0
0.75    0.0
0.80    1.0
0.85    1.0
0.90    1.0
0.95    1.0
1.00    1.0
Name: y, dtype: float64

In [11]:
# Persist the complete dataset, index (pair_id, candle_open_time) included.
full_path = '../data/t2/full.pkl'
with open(full_path, 'wb') as fp:
    pickle.dump(df, fp, protocol=4)

In [12]:
# Training split: rows whose candle_open_time (index level 1) is before
# 2021-01-01. reset_index(drop=True) discards both index levels, so the saved
# frame holds only the tc2x_* feature columns and y.
is_train = df.index.get_level_values(1) < '2021-01-01'
with open('../data/t2/train.pkl', 'wb') as fp:
    pickle.dump(df[is_train].reset_index(drop=True), fp, protocol=4)

In [13]:
# Test split: rows whose candle_open_time (index level 1) is on or after
# 2021-01-01. reset_index(drop=True) discards both index levels, so the saved
# frame holds only the tc2x_* feature columns and y.
is_test = df.index.get_level_values(1) >= '2021-01-01'
with open('../data/t2/test.pkl', 'wb') as fp:
    pickle.dump(df[is_test].reset_index(drop=True), fp, protocol=4)