In [3]:
import tensorflow.keras
import tensorflow as tf
import pandas as pd
import numpy as np
import os
from datetime import datetime
from datetime import timedelta
from keras.utils import np_utils



# Report whether TensorFlow lists at least one physical GPU device.
check_gpu = bool(tf.config.list_physical_devices('GPU'))
if check_gpu:
    print('GPU is', 'available')
else:
    print('GPU is', 'NOT available')

GPU is available


In [5]:
def _decimal_places(value):
    """Return how many digits follow the decimal point in ``str(value)``.

    Scientific notation (e.g. ``1e-05``) is understood, so very small prices
    report their real precision instead of 0.
    """
    from decimal import Decimal  # function-scope import keeps this cell self-contained

    exponent = Decimal(str(value)).as_tuple().exponent
    # NaN / infinity carry a non-integer exponent marker: treat them as having no decimals.
    if not isinstance(exponent, int):
        return 0
    return max(0, -exponent)


def identify_10_lvls(df):
    """Collapse one side of an order-book snapshot into exactly 10 price levels.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows of a single side with columns ``type`` ('a' or 'b'), ``price`` and
        ``amount``. Only the first value of ``type`` is inspected.

    Returns
    -------
    pandas.DataFrame
        Columns ``price`` and ``amount``, one row per level. For type 'a' the
        levels are the 10 lowest distinct prices (ascending); for type 'b' the
        10 highest distinct prices (descending). Amounts of rows sharing a price
        are summed. If fewer than 10 distinct prices exist, the book is padded
        with zero-amount levels, stepping away from the last real price by one
        unit of that price's last printed decimal. An empty frame or an unknown
        ``type`` yields an empty result.
    """
    n_levels = 10
    prices = []
    quantities = []

    # An empty group has no side to inspect; return the same empty shape as an unknown side.
    if len(df) == 0:
        return pd.DataFrame({'price': prices, 'amount': quantities})

    side = df['type'].unique()[0]
    if side == 'a':
        ascending, step_sign = True, 1
    elif side == 'b':
        ascending, step_sign = False, -1
    else:
        return pd.DataFrame({'price': prices, 'amount': quantities})

    for i in df['price'].sort_values(ascending=ascending).index:
        price = df['price'][i]
        # Rows are visited in price order, so equal prices are adjacent. The duplicate
        # check comes BEFORE the level-count check so that the 10th level also
        # accumulates every order at its price instead of only the first one.
        if prices and price == prices[-1]:
            quantities[-1] += df['amount'][i]
        elif len(prices) < n_levels:
            prices.append(price)
            quantities.append(df['amount'][i])
        else:
            break

    if len(prices) < n_levels:
        # Pad with empty levels one tick (last printed decimal) further from the best price.
        decimals = _decimal_places(prices[-1])
        increment = 10 ** -decimals
        while len(prices) < n_levels:
            prices.append(round(prices[-1] + step_sign * increment, decimals))
            quantities.append(0)

    return pd.DataFrame({'price': prices, 'amount': quantities})

In [6]:
def _smallest_int_dtype(mn, mx):
    """Return the narrowest integer dtype holding the range [mn, mx], or None if none fits."""
    if mn >= 0:
        for candidate in (np.uint8, np.uint16, np.uint32):
            if mx < np.iinfo(candidate).max:
                return candidate
        return np.uint64
    for candidate in (np.int8, np.int16, np.int32, np.int64):
        limits = np.iinfo(candidate)
        if mn > limits.min and mx < limits.max:
            return candidate
    return None


def reduce_mem_usage(props):
    """Downcast the numeric columns of ``props`` in place to smaller dtypes.

    For every numeric column:
      * missing values are replaced by ``min - 1`` (integers cannot hold NaN);
      * if the column then holds only whole numbers it is cast to the narrowest
        (un)signed integer dtype that contains its range;
      * otherwise it is cast to ``float32``.
    Columns that still contain non-finite values after filling (inf, or all-NaN)
    are cast to ``float32`` without being filled. Non-numeric columns (object,
    datetime, category, ...) are left untouched.

    Memory usage before and after is printed.

    Parameters
    ----------
    props : pandas.DataFrame
        Frame to shrink. It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``props`` object.
    """
    start_mem_usg = props.memory_usage().sum() / 1024**2
    print("Memory usage of properties dataframe is :",start_mem_usg," MB")

    for col in props.columns:
        if not pd.api.types.is_numeric_dtype(props[col].dtype):
            continue  # strings, datetimes, categoricals, ...

        values = props[col]
        mn = values.min()
        mx = values.max()

        # Integer dtypes cannot hold NaN, so missing values are filled with a sentinel
        # just below the observed minimum.
        if not np.isfinite(values).all():
            values = values.fillna(mn - 1)
            if not np.isfinite(values).all():
                # inf values or an all-NaN column: no integer representation exists.
                props[col] = props[col].astype(np.float32)
                continue
            # The sentinel is the new minimum. Using the stale value would pick an
            # unsigned dtype for a column that now contains a negative number.
            mn = values.min()

        # A column counts as integer when truncation changes (almost) nothing. Absolute
        # residuals are summed so that e.g. +0.5 and -0.5 cannot cancel each other out.
        asint = values.astype(np.int64)
        is_int = (values - asint).abs().sum() < 0.01

        target = _smallest_int_dtype(mn, mx) if is_int else np.float32
        # Assign the whole column: ``props.loc[:, col] = ...`` writes into the existing
        # array on pandas >= 2.0 and would silently keep the old, wider dtype.
        props[col] = values if target is None else values.astype(target)

    # Print final result
    print("___MEMORY USAGE AFTER COMPLETION:___")
    mem_usg = props.memory_usage().sum() / 1024**2
    print("Memory usage is: ",mem_usg," MB")
    print("This is ",100*mem_usg/start_mem_usg,"% of the initial size")

    return props

In [7]:
# Trading pairs; a data file is attributed to the first pair whose name occurs in its file name.
pair_list = [
    'BTCUSDT', 'BTCUSDC', 'DOGEUSDT', 'ETHBTC', 'ETHUSDC', 'ETHUSDT', 'LINKETH',
    'LINKUSDT', 'LTCUSDC', 'LTCUSDT', 'USDCUSDT', 'XRPETH', 'XRPUSDC', 'XRPUSDT',
]
# Horizons k (in rows) for which a `label_k_<k>` column is created.
label_lookback = [2, 5, 10, 15, 20, 30, 50, 60, 100]
# Relative mid-price change at or beyond which a row is labelled as a move (1 / 3) rather than 2.
label_alpha = 0.0002

# All data folders live under <parent of the working directory>/Data.
dirname = os.path.dirname(os.getcwd())
raw_data_dir, cleaned_data_dir, norm_data_dir = (
    os.path.join(dirname, "Data", subfolder)
    for subfolder in ("Raw_Data", "Clean_Data", "Norm_Data")
)

def clean_file(raw_data_folder_path):
    """Convert every gzipped raw order-book CSV in a folder into a cleaned 10-level CSV.

    For each ``*.gz`` file whose name contains one of the pairs in ``pair_list``, and
    for which no cleaned file exists yet, the raw rows are reduced to 10 price levels
    per (date, type) via ``identify_10_lvls``, the millisecond epoch in ``date`` is
    converted to a naive UTC timestamp, and the result is written to
    ``<cleaned_data_dir>/<pair>/<file name without .gz>`` with the columns
    ``type, price, amount, level, date`` (``level`` counts from 1).

    Parameters
    ----------
    raw_data_folder_path : str
        Folder containing the raw ``.gz`` files.

    Returns
    -------
    None
        Files are written as a side effect; already-cleaned files are skipped.
    """
    for raw_file in os.listdir(raw_data_folder_path):
        if not raw_file.endswith(".gz"):
            continue

        matching_pairs = [pair for pair in pair_list if pair in raw_file]
        if not matching_pairs:
            # Previously an IndexError aborted the whole run on the first unknown file.
            print(f"Skipping {raw_file}: no known pair in file name")
            continue
        pair_name = matching_pairs[0]

        pair_dir = os.path.join(cleaned_data_dir, pair_name)
        new_file = os.path.join(pair_dir, raw_file.replace(".gz",""))
        if os.path.exists(new_file):
            continue

        print(raw_file)
        df = pd.read_csv(os.path.join(raw_data_folder_path,raw_file), compression="gzip", on_bad_lines='skip')

        # One 10-row frame per (date, type); reset_index exposes the row position inside
        # each of those frames as column 'level_2'.
        # NOTE(review): relies on groupby.apply passing the grouping columns to the
        # function (identify_10_lvls reads 'type') - confirm on newer pandas versions.
        df = df.groupby(['date','type']).apply(lambda x: identify_10_lvls(x)).reset_index()
        # Per date: type ascending, price descending. A single multi-key sort gives the
        # same order as sorting inside a per-date groupby.apply, without the per-group cost.
        df = df.sort_values(['date', 'type', 'price'], ascending=[True, True, False]).reset_index(drop=True)
        df['level_2'] = df['level_2']+1
        # 'date' holds milliseconds since the epoch; convert to naive UTC timestamps in one
        # vectorised call (datetime.utcfromtimestamp is deprecated since Python 3.12).
        df['date'] = pd.to_datetime(df['date'], unit='ms')
        df = df.rename(columns={"level_2": "level"})
        df = df[['type', 'price', 'amount', 'level', 'date']]

        # The mid_price is only calculated after normalisation
        os.makedirs(pair_dir, exist_ok=True)
        df.to_csv(new_file, index=False)

def z_score(df, lookback_days=5):
    """Z-score prices and amounts with statistics of the preceding calendar days.

    For every day in ``df['ymd']`` the mean and standard deviation of ``price`` and
    ``amount`` are computed over the rows of the ``lookback_days`` calendar days
    immediately before it. A day only gets statistics when ALL of those previous
    days are present in the data; otherwise its normalised values are NaN.

    The mid price is the mean of the un-normalised level-1 prices per ``date``
    (normalised prices can be negative, which would break the return-based labels).

    Parameters
    ----------
    df : pandas.DataFrame
        Columns ``date``, ``ymd`` (values supporting ``- timedelta``), ``type``,
        ``level``, ``price`` and ``amount``.
    lookback_days : int, default 5
        Number of preceding calendar days used for the statistics.

    Returns
    -------
    pandas.DataFrame
        Columns ``date, day, type, level, price_norm, amount_norm, mid_price``;
        the normalised columns are float64.
    """
    unique_days = sorted(df['ymd'].unique())
    known_days = set(unique_days)

    # dtype=float keeps the statistics (and everything derived from them) numeric
    # instead of ending up as object columns.
    means_stds = pd.DataFrame(index=unique_days, columns=['mean_p', 'std_p', 'mean_a', 'std_a'], dtype=float)
    for day in unique_days:
        window = [day - timedelta(days=d) for d in range(1, lookback_days + 1)]
        if all(previous in known_days for previous in window):
            subset = df[df['ymd'].isin(window)]
            means_stds.loc[day, 'mean_p'] = subset['price'].mean()
            means_stds.loc[day, 'std_p'] = subset['price'].std()
            means_stds.loc[day, 'mean_a'] = subset['amount'].mean()
            means_stds.loc[day, 'std_a'] = subset['amount'].std()

    df = pd.merge(df, means_stds, right_index=True, left_on='ymd')
    df['price_norm'] = (df['price'] - df['mean_p']) / df['std_p']
    df['amount_norm'] = (df['amount'] - df['mean_a']) / df['std_a']

    # Applied to the un-normalised prices, as otherwise labels would be wrong due to
    # negative signs for some mid prices.
    mid_price = df[df['level']==1].groupby('date')['price'].mean().reset_index(name='mid_price')
    df = df.merge(mid_price, on='date')

    return pd.DataFrame({'date': df.date, 'day': df.ymd, 'type': df.type, 'level': df.level, 'price_norm': df.price_norm, 'amount_norm': df.amount_norm, 'mid_price': df['mid_price']})
             
def dummy(dat):
    """Return the constant 1 for any input.

    When used as a ``groupby`` key function, every row therefore lands in one group.
    """
    constant_key = 1
    return constant_key
           
def normalize(clean_data_folder_path, replace_files = 0):
    """Normalise every cleaned ``*.par`` file in a folder and write one feather file per pair.

    For each parquet file whose name contains a pair from ``pair_list``:
      1. ask/bid prices are melted into long format (type 'a' / 'b');
      2. prices and amounts are z-scored with ``z_score``;
      3. the result is pivoted to one row per timestamp with columns named
         ``<type>_<level>_<price_norm|amount_norm>``;
      4. for every ``k`` in ``label_lookback`` a categorical ``label_k_<k>`` is added:
         the mean mid price of the next ``k`` rows relative to the current mid price,
         encoded 1 (>= ``label_alpha``), 3 (<= -``label_alpha``) or 2 (otherwise);
      5. rows without normalised values are dropped, dtypes are downcast with
         ``reduce_mem_usage`` and the frame is saved to ``<norm_data_dir>/<pair>.feather``.

    Parameters
    ----------
    clean_data_folder_path : str
        Folder containing the cleaned ``.par`` files.
    replace_files : int or bool, default 0
        When falsy, pairs whose feather file already exists are skipped.

    Returns
    -------
    None
    """
    non_feature_cols = ['date', 'day', 'mid_price']

    for file in sorted(os.listdir(clean_data_folder_path)):
        if not file.endswith(".par"):
            continue

        matching_pairs = [pair for pair in pair_list if pair in file]
        if not matching_pairs:
            # Previously an IndexError aborted the whole run on the first unknown file.
            print(f"Skipping {file}: no known pair in file name")
            continue
        pair_name = matching_pairs[0]

        new_file = os.path.join(norm_data_dir, pair_name+'.feather')
        if os.path.exists(new_file) and not replace_files:
            continue

        print(file)
        final_df = pd.read_parquet(os.path.join(clean_data_folder_path, file))
        final_df['date'] = final_df['date'].astype('datetime64[ns]')
        final_df['ymd'] = final_df['date'].dt.date
        final_df = pd.melt(final_df, id_vars=['date', 'level', 'ymd', 'ask_amount', 'bid_amount'], value_name='price', var_name='type')
        final_df = final_df.sort_values(['date', 'price'], ascending=[True, False])
        final_df['type'] = np.where(final_df['type']=='ask_price', 'a', 'b')
        final_df['amount'] = np.where(final_df['type']=='a', final_df['ask_amount'] , final_df['bid_amount'])
        final_df = final_df.drop(columns=['ask_amount', 'bid_amount'])

        # The whole frame is one group, so z_score is called directly rather than
        # through a constant-key groupby.
        norm_df = z_score(final_df).reset_index(drop=True)

        output = norm_df.set_index(['date', 'day', 'level', 'type', 'mid_price'], drop = True).unstack(['level', 'type'])
        output = output.sort_index(axis=1,level=[1,2,0],ascending=[True,True, False])
        output.columns = output.columns.get_level_values(2) + '_' + [str(i) for i in output.columns.get_level_values(1)]  + '_' + output.columns.get_level_values(0)
        output = output.reset_index()

        # Replace the columns outright: ``output.loc[:, mask] = ...`` writes into the
        # existing arrays on pandas >= 2.0 and would leave object columns as object.
        feature_cols = [col for col in output.columns if col not in non_feature_cols]
        output[feature_cols] = output[feature_cols].astype(float)

        # Create labels
        for k in label_lookback:
            # closed='left' excludes the current row from the window; shifting by -(k+1)
            # then aligns the mean of rows t+1 .. t+k with row t.
            future_mean = output['mid_price'].rolling(k, closed='left').mean().shift(-(k+1))
            relative_move = (future_mean - output['mid_price']) / output['mid_price']
            # NOTE(review): rows without a full future window have a NaN move and fall
            # through to label 2 - confirm this is intended for the last rows of a file.
            output[f"label_k_{k}"] = np.where(relative_move >= label_alpha, 1, np.where(relative_move <= -label_alpha, 3, 2))
            output[f"label_k_{k}"] = output[f"label_k_{k}"].astype('category')

        # Labels are computed on the full series first; only then are the rows without
        # normalisation statistics removed.
        output = output[output['a_1_price_norm'].notna()].reset_index(drop = True)
        output = reduce_mem_usage(output)

        os.makedirs(norm_data_dir, exist_ok=True)
        output.to_feather(new_file)
                        

In [1]:
# Clean the raw data in a given folder.
# If reading a raw file fails with an EOF error ("unexpected end of file"), extract what can still be read:
#     gzcat <file_name> | less > file.csv
# Then gzip the CSV file again (gzip file.csv) and rename the result to the original file name.

# Alternative extraction command: gzip -dc test.csv.gz > file.csv


In [8]:
# Normalize all cleaned files found in the cleaned-data folder.
# replace_files=0 keeps feather files that already exist.
clean_data_folder_path = cleaned_data_dir
normalize(clean_data_folder_path=clean_data_folder_path, replace_files=0)

Binance-BTCUSDC_full.par
Memory usage of properties dataframe is : 680.7682132720947  MB
___MEMORY USAGE AFTER COMPLETION:___
Memory usage is:  364.49121284484863  MB
This is  53.541162136952764 % of the initial size
Binance-BTCUSDT_full.par
Memory usage of properties dataframe is : 684.4319429397583  MB
___MEMORY USAGE AFTER COMPLETION:___
Memory usage is:  366.45281314849854  MB
This is  53.54116167847424 % of the initial size
Binance-DOGEUSDT_full.par
Memory usage of properties dataframe is : 544.0568628311157  MB
___MEMORY USAGE AFTER COMPLETION:___
Memory usage is:  291.29448413848877  MB
This is  53.54118365912635 % of the initial size
Binance-ETHBTC_full.par
Memory usage of properties dataframe is : 680.7338752746582  MB
___MEMORY USAGE AFTER COMPLETION:___
Memory usage is:  364.47282791137695  MB
This is  53.54116214127316 % of the initial size
Binance-ETHUSDC_full.par
Memory usage of properties dataframe is : 687.040620803833  MB
___MEMORY USAGE AFTER COMPLETION:___
Memory usa