Read raw data downloaded from the Binance historical data API, combine it, and convert it to Parquet files.

In [6]:
import sys
sys.path.append("../")

In [7]:
import pandas as pd
import numpy as np
import zipfile
from pathlib import Path
import gc
from tqdm import tqdm
import pyarrow.parquet as pq
import pyarrow as pa
import psutil
import shutil
from python.yingruiz_config import KLINE_COLUMN_NAMES

In [4]:
def memory_usage():
    """Return the resident set size (RSS) of the current process, in GiB."""
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / 1024 ** 3

def read_kline_data(data_dir, header):
    """Load every spot kline zip archive under ``data_dir`` into DataFrames.

    Expected layout: ``<data_dir>/spot/{monthly,daily}/klines/<PAIR>/<INTERVAL>/**/*.zip``.
    For each pair and interval, the monthly archives are read first and the
    daily archives after them. Within each group, files are processed in sorted
    path order, so the concatenated rows follow file-name order.

    Args:
        data_dir: Root directory of the downloaded data (str or Path).
        header: Column names assigned to every resulting DataFrame.

    Returns:
        ``{pair: {interval: DataFrame}}``. Every pair is given the interval set
        found under the first (sorted) monthly pair directory. An empty dict
        is returned when no pair directories exist.

    Raises:
        AssertionError: if the monthly and daily pair directories differ.
        ValueError: if a pair/interval combination has no zip archives.
    """
    month_dir = Path(data_dir) / "spot/monthly/klines"
    day_dir = Path(data_dir) / "spot/daily/klines"

    target_pairs_month = sorted(path.name for path in month_dir.glob("*"))
    target_pairs_day = sorted(path.name for path in day_dir.glob("*"))
    assert set(target_pairs_month) == set(target_pairs_day), (
        f"monthly pairs {target_pairs_month} differ from daily pairs {target_pairs_day}"
    )
    if not target_pairs_month:
        return {}

    # Intervals are taken from the first pair and assumed to be shared by all pairs.
    intervals = sorted(
        interval.name for interval in (month_dir / target_pairs_month[0]).glob("*")
    )

    data_dic = {
        pair: {interval: None for interval in intervals} for pair in target_pairs_month
    }

    for pair in tqdm(data_dic):
        for interval in data_dic[pair]:
            # Sort explicitly: Path.glob yields files in arbitrary order.
            zip_files = [
                *sorted((month_dir / pair / interval).glob("**/*.zip")),
                *sorted((day_dir / pair / interval).glob("**/*.zip")),
            ]
            if not zip_files:
                raise ValueError(
                    f"no zip archives found for pair {pair!r}, interval {interval!r}"
                )
            # pd.concat keeps per-column dtypes, so integer columns stay integer.
            # Stacking `.values` arrays with np.concatenate would upcast every
            # column of a mixed int/float frame to float64.
            temp = pd.concat(
                [pd.read_csv(zip_file, header=None) for zip_file in zip_files],
                ignore_index=True,
            )
            temp.columns = header
            data_dic[pair][interval] = temp

        gc.collect()
    return data_dic

def read_data(data_dir, target, header):
    """Load every spot ``aggTrades`` or ``trades`` zip archive into one DataFrame per pair.

    Expected layout: ``<data_dir>/spot/{monthly,daily}/<target>/<PAIR>/**/*.zip``.
    For each pair, the monthly archives are read first and the daily archives
    after them. Within each group, files are processed in sorted path order.

    Args:
        data_dir: Root directory of the downloaded data (str or Path).
        target: Either ``"aggTrades"`` or ``"trades"``.
        header: Column names assigned to every resulting DataFrame.

    Returns:
        ``{pair: DataFrame}`` with one concatenated DataFrame per pair.

    Raises:
        AssertionError: if ``target`` is invalid or the monthly and daily
            pair directories differ.
        ValueError: if a pair has no zip archives.
    """
    assert target in {"aggTrades", "trades"}, (
        f"valid targets are aggTrades, trades; got {target!r}"
    )
    month_dir = Path(data_dir) / f"spot/monthly/{target}"
    day_dir = Path(data_dir) / f"spot/daily/{target}"

    target_pairs_month = sorted(path.name for path in month_dir.glob("*"))
    target_pairs_day = sorted(path.name for path in day_dir.glob("*"))
    assert set(target_pairs_month) == set(target_pairs_day), (
        f"monthly pairs {target_pairs_month} differ from daily pairs {target_pairs_day}"
    )

    data_dic = dict.fromkeys(target_pairs_month)

    for pair in tqdm(data_dic):
        # Sort explicitly: Path.glob yields files in arbitrary order.
        zip_files = [
            *sorted((month_dir / pair).glob("**/*.zip")),
            *sorted((day_dir / pair).glob("**/*.zip")),
        ]
        if not zip_files:
            raise ValueError(f"no zip archives found for pair {pair!r} ({target})")
        temp = pd.concat(
            [pd.read_csv(zip_file, header=None) for zip_file in zip_files],
            ignore_index=True,
        )
        temp.columns = header
        data_dic[pair] = temp

        gc.collect()
    return data_dic

def read_trade_zip_and_store_to_arrow(
    data_dir,
    target,
    pair,
    header,
    save_dir=Path("/workspace/projects/binance-public-data/python/data/yingruiz_custom/klines"),
):
    """Read all monthly and daily ``target`` archives for one pair and write them to one Parquet file.

    The output file is ``<save_dir>/<pair>-<target>.parquet``. ``save_dir`` is
    created if it does not exist. Its default is the directory the original
    code hard-coded; pass another directory to keep trade files apart from
    kline files. After writing, the process RSS (GiB) and the disk usage of
    ``/workspace`` are printed.

    Args:
        data_dir: Root directory of the downloaded data (str or Path).
        target: Either ``"aggTrades"`` or ``"trades"``.
        pair: Trading-pair directory name, e.g. the folder under ``spot/monthly/<target>``.
        header: Column names assigned to the combined DataFrame.
        save_dir: Output directory (str or Path).

    Raises:
        AssertionError: if ``target`` is invalid.
        ValueError: if no zip archives exist for ``pair``.
    """
    assert target in {"aggTrades", "trades"}, (
        f"valid targets are aggTrades, trades; got {target!r}"
    )
    month_dir = Path(data_dir) / f"spot/monthly/{target}"
    day_dir = Path(data_dir) / f"spot/daily/{target}"

    # Monthly archives first, then daily; sorted because Path.glob order is arbitrary.
    zip_files = [
        *sorted((month_dir / pair).glob("**/*.zip")),
        *sorted((day_dir / pair).glob("**/*.zip")),
    ]
    if not zip_files:
        raise ValueError(f"no zip archives found for pair {pair!r} ({target})")
    temp = pd.concat(
        [pd.read_csv(zip_file, header=None) for zip_file in zip_files],
        ignore_index=True,
    )
    temp.columns = header

    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pandas(temp)
    path = save_dir / f"{pair}-{target}.parquet"
    pq.write_table(table, path)

    print(memory_usage(), shutil.disk_usage("/workspace"))

#### Data directory

In [5]:
# Root of the downloaded Binance public data; the readers above look for
# spot/monthly/... and spot/daily/... subtrees beneath it.
data_dir = Path("/workspace/projects/binance-public-data/python/data")

#### Read all kline data

In [None]:
# Load every pair/interval kline archive into {pair: {interval: DataFrame}}.
kline_data = read_kline_data(
    header=KLINE_COLUMN_NAMES,
    data_dir=data_dir,
)

#### Convert the loaded kline data to Parquet files

In [None]:
def save_to_paquet_wrapper(
    save_dir=Path("/workspace/projects/binance-public-data/python/data/yingruiz_custom/klines"),
    kline_data=None,
):
    """Write each ``kline_data[pair][interval]`` DataFrame to its own Parquet file.

    Files are named ``<pair>-<interval>-klines.parquet`` inside ``save_dir``.
    The directory is created if missing.

    Args:
        save_dir: Output directory (str or Path).
        kline_data: ``{pair: {interval: DataFrame}}``, as built by
            ``read_kline_data``. When omitted, the global ``kline_data`` is
            looked up at call time. A default of ``kline_data=kline_data``
            would capture the object once, when the function is defined, and
            silently save stale data after a reload.

    Raises:
        NameError: if ``kline_data`` is omitted and no global of that name exists.
        AssertionError: if the pairs do not all share the same set of intervals.
    """
    if kline_data is None:
        try:
            kline_data = globals()["kline_data"]
        except KeyError:
            raise NameError(
                "kline_data was not passed and no global 'kline_data' is defined"
            ) from None

    # Safety check: every pair must expose the same set of intervals.
    expected_intervals = None
    for pair, intervals in kline_data.items():
        if expected_intervals is None:
            expected_intervals = set(intervals)
        assert expected_intervals == set(intervals), f"detect pair {pair} has a different intervals {kline_data[pair].keys()}"

    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)
    for pair, intervals in kline_data.items():
        for interval, frame in intervals.items():
            table = pa.Table.from_pandas(frame)
            pq.write_table(table, save_dir / f"{pair}-{interval}-klines.parquet")

In [None]:
# Write every loaded kline DataFrame to <save_dir>/<pair>-<interval>-klines.parquet,
# using the wrapper's default save_dir and kline_data.
save_to_paquet_wrapper()

#### Unused functions

In [None]:
def read_trade_zip_and_store_to_arrow(
    data_dir,
    target,
    pair,
    header,
    save_dir=Path("/workspace/projects/binance-public-data/python/data/yingruiz_custom/klines"),
):
    """Read all monthly and daily ``target`` archives for one pair and write them to one Parquet file.

    The output file is ``<save_dir>/<pair>-<target>.parquet``. ``save_dir`` is
    created if it does not exist. Its default is the directory the original
    code hard-coded; pass another directory to keep trade files apart from
    kline files. After writing, the process RSS (GiB) and the disk usage of
    ``/workspace`` are printed.

    Args:
        data_dir: Root directory of the downloaded data (str or Path).
        target: Either ``"aggTrades"`` or ``"trades"``.
        pair: Trading-pair directory name, e.g. the folder under ``spot/monthly/<target>``.
        header: Column names assigned to the combined DataFrame.
        save_dir: Output directory (str or Path).

    Raises:
        AssertionError: if ``target`` is invalid.
        ValueError: if no zip archives exist for ``pair``.
    """
    assert target in {"aggTrades", "trades"}, (
        f"valid targets are aggTrades, trades; got {target!r}"
    )
    month_dir = Path(data_dir) / f"spot/monthly/{target}"
    day_dir = Path(data_dir) / f"spot/daily/{target}"

    # Monthly archives first, then daily; sorted because Path.glob order is arbitrary.
    zip_files = [
        *sorted((month_dir / pair).glob("**/*.zip")),
        *sorted((day_dir / pair).glob("**/*.zip")),
    ]
    if not zip_files:
        raise ValueError(f"no zip archives found for pair {pair!r} ({target})")
    temp = pd.concat(
        [pd.read_csv(zip_file, header=None) for zip_file in zip_files],
        ignore_index=True,
    )
    temp.columns = header

    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pandas(temp)
    path = save_dir / f"{pair}-{target}.parquet"
    pq.write_table(table, path)

    print(memory_usage(), shutil.disk_usage("/workspace"))