In [1]:
from functools import reduce
from dask import delayed
import pandas as pd
import fastparquet
import distributed
import dask
import json
import gzip
import os

In [2]:
cluster = distributed.LocalCluster()
client = distributed.Client(cluster)
cluster

VBox(children=(HTML(value='<h2>LocalCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [13]:
OUTPUT_DIRECTORY = '/home/mikeokslonger/data_unseen'

In [14]:
def batch(l, n):
    return [list(l[i:i+n]) for i in range(0, len(l), n)]

def read_json_gz(path):
    return json.load(gzip.GzipFile(path))

def get_pairs(path):
    if type(path) == list:
        return reduce(lambda a, b: a.union(get_pairs(b)), list(path), set())
    return set(read_json_gz(path).keys())

def get_trades(paths):
    def __get_trade_df__(trades):
        df = pd.DataFrame(trades)
        df['id'] = df['id'].astype(int)
        df['price'] = df['price'].astype(float)
        df['quantity'] = df['quantity'].astype(float)
        df['buy'] = df['side'] == 'buy'
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        del df['side']
        return df
        
    def __get_trades__(path):
        doc = read_json_gz(path)
        date = int(path.split('/')[-2].replace('-', ''))
        name = int(''.join(path.split('/')[-2:]).replace('-', '').split('.')[0])
        pairs_no_error = [p for p in list(doc.keys()) if 'error' not in doc.get(p, {}).get('trades', {})]
        pairs_with_data = [p for p in pairs_no_error if len(doc.get(p, {}).get('trades', [])) > 0]
        dfs = [__get_trade_df__(doc[pair]['trades']).assign(pair=pair.replace('/', '-')).assign(time=name).assign(date=date)
               for pair in pairs_with_data]
        if dfs:
            return pd.concat(dfs)
    
    trade_dfs = [__get_trades__(p) for p in paths]
    if trade_dfs:
        df = pd.concat(trade_dfs).reset_index().drop('index', axis=1)
        fastparquet.write(f'{OUTPUT_DIRECTORY}/trades.parquet',
                          df, compression='snappy', file_scheme='hive',
                          partition_on=['pair', 'date'], write_index=False)

def get_orderbooks(paths):
    def __get_orderbook__(orderbooks):
        if 'ask' in orderbooks and 'bid' in orderbooks:
            asks = (orderbooks['ask'] + 10 * [{'price': 0.0, 'size': 0.0}])[:10]
            bids = (orderbooks['bid'] + 10 * [{'price': 0.0, 'size': 0.0}])[:10]
            return pd.DataFrame({'askprice': pd.to_numeric([d['price'] for d in asks]),
                                 'asksize': pd.to_numeric([d['size'] for d in asks]),
                                 'bidprice': pd.to_numeric([d['price'] for d in bids]),
                                 'bidsize': pd.to_numeric([d['size'] for d in bids])})
        return pd.DataFrame({'askprice': [], 'asksize': [], 'bidprice': [], 'bidsize': []})

    def __get_orderbooks__(path):
        doc = read_json_gz(path)
        date = int(path.split('/')[-2].replace('-', ''))
        name = int(''.join(path.split('/')[-2:]).replace('-', '').split('.')[0])
        dfs = [__get_orderbook__(doc[pair]['orderbook']).assign(pair=pair.replace('/', '-')).assign(time=name).assign(date=date)
               for pair in list(doc.keys())]
        df = pd.concat(dfs)
        return df
    
    df = pd.concat([__get_orderbooks__(p) for p in paths]).reset_index().drop('index', axis=1)
    fastparquet.write(f'{OUTPUT_DIRECTORY}/orderbooks.parquet',
                      df, compression='snappy', file_scheme='hive',
                      partition_on=['pair', 'date'], write_index=False)

In [53]:
get_trades(paths_by_date[4])

In [37]:
df = __get_trades__(paths_by_date[4][5])

In [9]:
data_paths = !find /home/mikeokslonger/hitbtc/ | grep json
paths_df = pd.DataFrame(data_paths, columns=['path'])
paths_df['directory'] = paths_df['path'].apply(lambda x: os.path.dirname(x))
paths_df['filename'] = paths_df['path'].apply(lambda x: os.path.basename(x))
paths_by_date = [list(df.path.values) for _, df in paths_df.groupby('directory')][13:]

In [10]:
### orderbooks
tasks = [delayed(get_orderbooks)(paths) for paths in paths_by_date]
futures = client.compute(tasks)
distributed.progress(futures)

VBox()

In [15]:
### trades
tasks = [delayed(get_trades)(paths) for paths in paths_by_date]
futures = client.compute(tasks)
distributed.progress(futures)

VBox()

ValueError('No objects to concatenate')