# Split by date and Deduplicate

In [1]:
import datetime
import glob
import gzip
import json
import os
import shutil
import subprocess
import zipfile

import dask.bag as db
from dask.diagnostics import ProgressBar
from typing import List

In [2]:
# Month being processed, formatted as YYYY-MM. It is interpolated into every
# directory name below and compared against each record's date in split().
MONTH = '2020-09'
# Directory scanned recursively for the input files; must already exist.
INPUT_DIR = f'/data/crawler/trade-{MONTH}'
# Receives the per-date files written by the split step.
OUTPUT_DIR = f'/data/split/trade-{MONTH}'
# Receives the deduplicated and sorted files, one sub-directory per date.
SORTED_DIR = f'/data/json/trade-{MONTH}'

In [3]:
# The input directory has to be present before anything else happens.
assert os.path.exists(INPUT_DIR)
# Start from a clean slate: remove whatever a previous run left behind.
for stale_dir in (OUTPUT_DIR, SORTED_DIR):
    if os.path.exists(stale_dir):
        shutil.rmtree(stale_dir)

## Split by date

In [4]:
def _read_lines(input_file:str):
    """Return the lines of *input_file*, or None if it is a corrupt zip archive.

    Supported suffixes are ``.json.gz``, ``.zip`` (must contain exactly one
    member, decoded as UTF-8), ``.json`` and ``file.log``.

    Raises:
        ValueError: if the file suffix is not one of the supported ones.
        AssertionError: if a zip archive does not contain exactly one member.
    """
    if input_file.endswith('.json.gz'):
        with gzip.open(input_file, 'rt') as f:
            return f.readlines()
    if input_file.endswith('.zip'):
        try:
            zf = zipfile.ZipFile(input_file, 'r')
        except zipfile.BadZipFile:
            # Corrupt archives are skipped on purpose instead of failing the run.
            return None
        with zf:
            names = zf.namelist()
            assert len(names) == 1
            return zf.read(names[0]).decode('UTF-8').split('\n')
    if input_file.endswith('.json') or input_file.endswith('file.log'):
        with open(input_file, 'rt') as f:
            return f.readlines()
    raise ValueError('Unknown file suffix ' + input_file)


def split(input_file:str, output_dir:str, month:str='')->None:
    """Append the records of one input file to per-date files in *output_dir*.

    Each non-empty line is parsed as JSON; lines that fail to parse are
    skipped. Records whose date does not fall in the target month are
    discarded. The remaining records are appended to
    ``{exchange}.{marketType}.{pair}.{date}.json`` (with ``rawPair`` inserted
    before the date for the ``Futures`` market type).

    Args:
        input_file: Path whose basename starts with
            ``{exchange}.{marketType}.{pair}.``.
        output_dir: Existing directory the per-date files are appended to.
        month: Month to keep, as ``YYYY-MM``. When empty, the module-level
            ``MONTH`` is used.

    Raises:
        ValueError: if the file suffix is not supported.
        AssertionError: if a kept record disagrees with the exchange, market
            type or pair encoded in the file name, or a zip archive does not
            contain exactly one member.
    """
    target_month = month or MONTH

    # NOTE(review): a basename of exactly 'file.log' has only two
    # dot-separated parts, so name_parts[2] raises IndexError for it.
    name_parts = os.path.basename(input_file).split('.')
    exchange_global = name_parts[0]
    market_type_global = name_parts[1]
    pair_global = name_parts[2]

    lines = _read_lines(input_file)
    if lines is None:
        return

    # Output handles are pooled so each per-date file is opened only once;
    # the finally clause closes them even when a record fails validation.
    output_file_pool = {}
    try:
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue  # TODO: why !!!

            if obj['exchange'] == 'Bitfinex' and obj['marketType'] == 'Futures':
                obj['marketType'] = 'Swap'  # bugfix for Bitfinex
                line = json.dumps(obj)
            elif obj['exchange'] == 'WhaleEx' and obj['trade_id'] != obj['raw']['tradeId']:
                obj['trade_id'] = str(obj['raw']['tradeId'])  # bugfix for WhaleEx
                line = json.dumps(obj)

            # NOTE(review): fromtimestamp() converts to the machine's local
            # time zone, so the date boundary depends on where this runs.
            # The timestamp is treated as milliseconds since the epoch.
            date_str = datetime.datetime.fromtimestamp(obj['timestamp']/1000.0).isoformat()[0:10]
            if date_str[:-3] != target_month:
                continue  # discard timeout records

            exchange = obj['exchange']
            market_type = obj['marketType']
            pair = obj['pair']
            assert exchange == exchange_global
            assert market_type == market_type_global
            assert pair == pair_global
            rawPair = obj['rawPair']
            if market_type == 'Futures':
                filename = f'{exchange}.{market_type}.{pair}.{rawPair}'
            else:
                filename = f'{exchange}.{market_type}.{pair}'

            output_file = os.path.join(output_dir, f'{filename}.{date_str}.json')
            file_object = output_file_pool.get(output_file)
            if file_object is None:
                file_object = open(output_file, 'at')
                output_file_pool[output_file] = file_object
            file_object.write(line + '\n')
    finally:
        for file_object in output_file_pool.values():
            file_object.close()

In [5]:
# split(f'{INPUT_DIR}/Huobi.Spot.EOS_USDT.2020-06-21.zip', OUTPUT_DIR)

In [6]:
def split_one_group(csv_files: List[str], output_dir: str)->None:
    '''Split all files of one exchange/market_type/pair group sequentially.

    The files are handed to split() one at a time, in list order, so that a
    whole group is handled by a single process.
    '''
    for path in csv_files:
        split(path, output_dir)

def split_multi(input_dir:str, output_dir:str)->None:
    """Split every input file found under *input_dir* into *output_dir*.

    Files are collected recursively, grouped by the
    ``{exchange}.{market_type}.{pair}`` prefix of their basename, and each
    group is processed by split_one_group() through a dask bag. *output_dir*
    is deleted if it exists and then recreated empty.
    """
    # Same search order as the suffixes handled by split().
    patterns = ("**/*.json", "**/*.zip", "**/*.json.gz", "**/file.log")
    files = []
    for pattern in patterns:
        files.extend(glob.glob(os.path.join(input_dir, pattern), recursive=True))

    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.mkdir(output_dir)

    # Map "exchange.market_type.pair" -> list of files belonging to it.
    # NOTE(review): a basename of exactly 'file.log' has only two
    # dot-separated parts, so parts[2] raises IndexError for it.
    groups = {}
    for path in files:
        parts = os.path.basename(path).split('.')
        group_key = f'{parts[0]}.{parts[1]}.{parts[2]}'
        groups.setdefault(group_key, []).append(path)

    with ProgressBar():
        bag = db.from_sequence(list(groups))
        bag.map(lambda key: split_one_group(groups[key], output_dir)).compute()

In [7]:
# Run the split step: distribute the records under INPUT_DIR into per-date files in OUTPUT_DIR.
split_multi(INPUT_DIR, OUTPUT_DIR)

[########################################] | 100% Completed | 19min 19.6s


## Deduplicate and sort

In [8]:
def dedup_and_sort(input_file:str, ouput_file:str)->None:
    """Remove duplicate trades from a JSON-lines file and write them sorted.

    Two records are duplicates when they share exchange, marketType, pair,
    rawPair and trade_id; the last occurrence in the file wins. Records are
    sorted by ``int(trade_id)``, except for BitMEX where the record's
    ``timestamp`` is used instead.

    Kraken and MXC records with an empty trade_id get the timestamp (as a
    string) as their trade_id, so such records sharing a timestamp collapse
    into one.

    Args:
        input_file: Path of the JSON-lines file to read.
        ouput_file: Path the sorted, deduplicated lines are written to
            (the parameter name's spelling is kept for compatibility).

    Raises:
        ValueError: if a record of any other exchange has an empty trade_id
            (the message is the offending line), or a non-BitMEX trade_id
            cannot be converted to int.
    """
    # dedup key -> (sort_key, line)
    trade_map = {}
    with open(input_file, 'rt') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)
            exchange = obj['exchange']
            market_type = obj['marketType']
            pair = obj['pair']
            raw_pair = obj['rawPair']
            trade_id = obj['trade_id']
            if not trade_id and exchange in ('Kraken', 'MXC'):
                # Fix trade_id for Kraken, MXC
                obj['trade_id'] = str(obj['timestamp'])
                trade_id = obj['trade_id']
                line = json.dumps(obj)
            if not trade_id:
                raise ValueError(line)
            key = f'{exchange}-{market_type}-{pair}-{raw_pair}-{trade_id}'
            sort_key = int(trade_id) if exchange != 'BitMEX' else obj['timestamp']
            trade_map[key] = (sort_key, line)

    trade_array = list(trade_map.values())
    del trade_map  # release the dict before sorting to keep peak memory down
    trade_array.sort(key=lambda item: item[0])

    with open(ouput_file, 'wt') as f:
        f.writelines(line + '\n' for _, line in trade_array)

In [9]:
def dedup_and_sort_wrapper(input_file:str, output_dir:str)->None:
    """Deduplicate and sort *input_file* into a per-date folder of *output_dir*.

    The second-to-last dot-separated component of the path is taken as the
    date; the result is written to ``output_dir/<date>/<basename>``, creating
    the date folder when needed.
    """
    date_str = input_file.rsplit('.', 2)[-2]
    target_dir = os.path.join(output_dir, date_str)
    os.makedirs(target_dir, exist_ok=True)
    target_file = os.path.join(target_dir, os.path.basename(input_file))
    dedup_and_sort(input_file, target_file)

In [10]:
def dedup_sort_multi(input_dir:str, output_dir:str)->None:
    """Deduplicate and sort every ``.json`` file found under *input_dir*.

    *output_dir* is deleted if it exists and then recreated empty. Each file
    is processed independently by dedup_and_sort_wrapper() through a dask bag.
    """
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.mkdir(output_dir)

    pattern = os.path.join(input_dir, "**/*.json")
    files = list(glob.glob(pattern, recursive=True))

    with ProgressBar():
        bag = db.from_sequence(files)
        bag.map(lambda file: dedup_and_sort_wrapper(file, output_dir)).compute()

In [11]:
# Run the dedup step: read the per-date files in OUTPUT_DIR and write sorted, deduplicated copies under SORTED_DIR.
dedup_sort_multi(OUTPUT_DIR, SORTED_DIR)

[########################################] | 100% Completed | 11min 49.4s
