In [1]:
import os
from pathlib import Path
def relative_path(abs_path: str) -> str:
    """
    Summary
        Rewrites a path so that it is expressed relative to the current working directory.

    Arguments:
        abs_path {str} -- Absolute path of the file or dir.

    Returns:
        str -- The same location, written relative to the process's
            current working directory (which need not be the directory
            containing this notebook), so it can be handed directly to
            Python file I/O calls.
    """
    working_dir = os.getcwd()
    rel = os.path.relpath(abs_path, start=working_dir)
    return rel


In [2]:
# Absolute location of the BTCUSDT CSV under kline_data/1m_data on this machine.
abs_path_to_data = os.path.join(
    '/', 'home', 'mholloway', 'Desktop', 'old_sys', 'Desktop',
    'kline_data', '1m_data', 'BTCUSDT', 'BTCUSDT.csv',
)
# The same file, expressed relative to the kernel's working directory.
rel_path_to_data = relative_path(abs_path_to_data)
rel_path_to_data

'../old_sys/Desktop/kline_data/1m_data/BTCUSDT/BTCUSDT.csv'

In [3]:
import pandas as pd
# Load the CSV using its 'Opened' column as the index. The datetime conversion
# below is disabled, so the index values stay as plain strings (dtype object).
btc_data = pd.read_csv(rel_path_to_data, index_col='Opened')
# btc_data.index = pd.to_datetime(btc_data.index)

In [4]:
# Display the index exactly as it was loaded from the CSV.
btc_data.index

Index(['2019-06-01 00:00:00', '2019-06-01 00:01:00', '2019-06-01 00:02:00',
       '2019-06-01 00:03:00', '2019-06-01 00:04:00', '2019-06-01 00:05:00',
       '2019-06-01 00:06:00', '2019-06-01 00:07:00', '2019-06-01 00:08:00',
       '2019-06-01 00:09:00',
       ...
       '2020-05-20 04:16:00', '2020-05-20 04:17:00', '2020-05-20 04:18:00',
       '2020-05-20 04:19:00', '2020-05-20 04:20:00', '2020-05-20 04:21:00',
       '2020-05-20 04:22:00', '2020-05-20 04:23:00', '2020-05-20 04:24:00',
       '2020-05-20 04:25:00'],
      dtype='object', name='Opened', length=508530)

In [5]:
from typing import Dict, Optional

class StdCols:
    """Standard column names that loaded price data is renamed to."""

    # Price columns.
    OPEN = 'open'
    HIGH = 'high'
    LOW = 'low'
    CLOSE = 'close'

    # Traded volume column.
    VOLUME = 'volume'

    # Column holding each row's timestamp.
    DTIME = 'datetime'

class ReadDataOptions:
    """
    Summary
        Options describing how a raw CSV maps onto the standard column names.

    Attributes:
        data_to_std_names {Dict} -- Maps column names found in the source
            file to the standard names defined on StdCols.
        dtime_format {str} -- Format string used to parse the datetime column.
    """

    def __init__(self,
                 data_to_std_names: Optional[Dict] = None,
                 dtime_format: str = '%Y-%m-%d %H:%M:%S'
                ):
        """
        Arguments:
            data_to_std_names {Dict} -- Source-name -> standard-name mapping.
                When omitted, the source file is assumed to use the keys
                'open', 'high', 'low', 'close', 'volume' and 'dtime'.
            dtime_format {str} -- Format string for the datetime column.
        """
        if data_to_std_names is None:
            # Build the default per instance. A dict literal in the signature
            # is created once and shared, so mutating one instance's mapping
            # would silently change every other default-constructed instance.
            data_to_std_names = {
                'open': StdCols.OPEN, 'high': StdCols.HIGH, 'low': StdCols.LOW,
                'close': StdCols.CLOSE, 'volume': StdCols.VOLUME, 'dtime': StdCols.DTIME,
            }
        # Copy so later edits to the caller's dict do not leak into these options.
        self.data_to_std_names = dict(data_to_std_names)
        self.dtime_format = dtime_format

def read_asset_data(data_abs_path: str, options: ReadDataOptions):
    """
    Summary
        Reads one asset's CSV and returns its standard price/volume columns
        indexed by parsed datetimes.

    Arguments:
        data_abs_path {str} -- Path to the CSV file; it is converted to a
            path relative to the current working directory before reading.
        options {ReadDataOptions} -- Column-name mapping and datetime format
            to apply to the file.

    Returns:
        DataFrame -- Columns open, high, low, close and volume (standard
            names), indexed by the parsed datetime column.
    """
    std_columns = [StdCols.OPEN, StdCols.HIGH, StdCols.LOW, StdCols.CLOSE, StdCols.VOLUME]

    # Read the file and move its columns onto the standard names in one pass.
    frame = pd.read_csv(relative_path(data_abs_path)).rename(columns=options.data_to_std_names)

    # Use the datetime column as the index, then parse it with the given format.
    frame.index = frame[StdCols.DTIME]
    frame.index = pd.to_datetime(frame.index, format=options.dtime_format)

    return frame[std_columns]

In [6]:
# Read the BTC file, mapping its capitalised column names onto the standard ones.
ex_ = read_asset_data(
    abs_path_to_data,
    ReadDataOptions({
        'Open': StdCols.OPEN,
        'High': StdCols.HIGH,
        'Low': StdCols.LOW,
        'Close': StdCols.CLOSE,
        'Volume': StdCols.VOLUME,
        'Opened': StdCols.DTIME,
    }),
)
ex_

Unnamed: 0_level_0,open,high,low,close,volume
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2019-06-01 00:00:00,8555.00,8556.99,8547.57,8551.53,53.757876
2019-06-01 00:01:00,8550.11,8569.99,8550.02,8555.28,102.196624
2019-06-01 00:02:00,8554.08,8563.46,8553.57,8563.31,82.104908
2019-06-01 00:03:00,8563.31,8563.31,8550.15,8557.93,27.254395
2019-06-01 00:04:00,8557.93,8563.35,8551.63,8551.65,8.547643
...,...,...,...,...,...
2020-05-20 04:21:00,9724.72,9727.93,9724.03,9726.46,6.664670
2020-05-20 04:22:00,9725.92,9735.00,9725.85,9734.95,44.081942
2020-05-20 04:23:00,9734.95,9739.47,9734.95,9738.06,81.980793
2020-05-20 04:24:00,9738.31,9739.78,9735.32,9739.10,20.533022


In [7]:
def abs_path_from_name(crypto_name, data_root=None):
    """
    Summary
        Builds the path to the CSV for the given asset's USDT pair.

    Arguments:
        crypto_name {str} -- Asset ticker, e.g. 'BTC'.
        data_root {str} -- Directory that holds one '<TICKER>USDT'
            sub-directory per asset. Defaults to the original hard-coded
            location, so existing calls behave as before. The result is
            only absolute if this root is.

    Returns:
        str -- '<data_root>/<TICKER>USDT/<TICKER>USDT.csv'.
    """
    if data_root is None:
        # Original machine-specific location, kept as the default for backward compatibility.
        data_root = os.path.join('/', 'home', 'mholloway', 'Desktop', 'old_sys', 'Desktop',
                                 'kline_data', '1m_data')
    pair = '{}USDT'.format(crypto_name)
    return os.path.join(data_root, pair, '{}.csv'.format(pair))
# Tickers to load, and the working-directory-relative path to each one's CSV.
asset_names_ = ['BTC', 'LTC', 'ETH']
asset_paths_ = list(map(relative_path, map(abs_path_from_name, asset_names_)))

In [9]:
from typing import List, Union
import time
# synchronously read all asset data
def read_asset_datas(asset_names: List[str], data_abs_paths: List[str], all_options: Union[ReadDataOptions, List[ReadDataOptions]]):
    """
    Summary
        Reads several assets' CSV files one after another.

    Arguments:
        asset_names {List[str]} -- Name to store each asset's data under.
        data_abs_paths {List[str]} -- Path of each asset's CSV, in the same
            order as asset_names.
        all_options {ReadDataOptions or List[ReadDataOptions]} -- Either one
            options object applied to every file, or one per file.

    Returns:
        dict -- Maps each asset name to the DataFrame returned by
            read_asset_data for its file.

    Raises:
        ValueError -- If all_options has an unsupported type, or if the
            names, paths and options do not all have the same length.
    """
    # Materialise the inputs so they can be measured and iterated more than once.
    asset_names = list(asset_names)
    data_abs_paths = list(data_abs_paths)

    # type check and sanitize options input
    if isinstance(all_options, ReadDataOptions):
        # if only one options is specified, infer same options for all data
        all_options = [all_options] * len(data_abs_paths)
    elif not isinstance(all_options, list):
        raise ValueError('all_options has type {}, but it must either be of type {} or List[{}]'.format(
            type(all_options), ReadDataOptions, ReadDataOptions
        ))

    # zip() would silently drop trailing assets on a mismatch, so fail loudly instead.
    if not (len(asset_names) == len(data_abs_paths) == len(all_options)):
        raise ValueError(
            'asset_names, data_abs_paths and all_options must have the same length, '
            'but got lengths {}, {} and {}'.format(
                len(asset_names), len(data_abs_paths), len(all_options)
            ))

    return {
        asset_name: read_asset_data(data_abs_path, options)
        for asset_name, data_abs_path, options in zip(asset_names, data_abs_paths, all_options)
    }
# Time one sequential load of every asset.
start_ = time.time()
all_data_ = read_asset_datas(
    asset_names_,
    asset_paths_,
    ReadDataOptions({
        'Open': StdCols.OPEN,
        'High': StdCols.HIGH,
        'Low': StdCols.LOW,
        'Close': StdCols.CLOSE,
        'Volume': StdCols.VOLUME,
        'Opened': StdCols.DTIME,
    }),
)
end_ = time.time()
print(f'Time to completion: {end_-start_}')

Time to completion: 3.4199752807617188


In [11]:
import multiprocessing as mp
# read all asset data in parallel using a pool of worker processes (the call still blocks until every file is loaded)
def read_asset_datas_mp(asset_names: List[str], data_abs_paths: List[str], all_options: Union[ReadDataOptions, List[ReadDataOptions]]):
    """
    Summary
        Reads several assets' CSV files in parallel using a multiprocessing pool.

    Arguments:
        asset_names {List[str]} -- Name to store each asset's data under.
        data_abs_paths {List[str]} -- Path of each asset's CSV, in the same
            order as asset_names.
        all_options {ReadDataOptions or List[ReadDataOptions]} -- Either one
            options object applied to every file, or one per file.

    Returns:
        dict -- Maps each asset name to the DataFrame returned by
            read_asset_data for its file.

    Raises:
        ValueError -- If all_options has an unsupported type, or if the
            names, paths and options do not all have the same length.
    """
    # Materialise the inputs so they can be measured and iterated more than once.
    asset_names = list(asset_names)
    data_abs_paths = list(data_abs_paths)

    # type check and sanitize options input
    if isinstance(all_options, ReadDataOptions):
        # if only one options is specified, infer same options for all data
        all_options = [all_options] * len(data_abs_paths)
    elif not isinstance(all_options, list):
        raise ValueError('all_options has type {}, but it must either be of type {} or List[{}]'.format(
            type(all_options), ReadDataOptions, ReadDataOptions
        ))

    # A mismatch would otherwise drop assets silently or raise IndexError when pairing names with results.
    if not (len(asset_names) == len(data_abs_paths) == len(all_options)):
        raise ValueError(
            'asset_names, data_abs_paths and all_options must have the same length, '
            'but got lengths {}, {} and {}'.format(
                len(asset_names), len(data_abs_paths), len(all_options)
            ))

    # Nothing to read: skip starting worker processes altogether.
    if not data_abs_paths:
        return {}

    with mp.Pool() as pool:
        # starmap blocks until every file is read and returns results in input order.
        results = pool.starmap(read_asset_data, zip(data_abs_paths, all_options))
    return dict(zip(asset_names, results))
    
# Time one load of every asset through the multiprocessing reader.
start_ = time.time()
all_data_ = read_asset_datas_mp(
    asset_names_,
    asset_paths_,
    ReadDataOptions({
        'Open': StdCols.OPEN,
        'High': StdCols.HIGH,
        'Low': StdCols.LOW,
        'Close': StdCols.CLOSE,
        'Volume': StdCols.VOLUME,
        'Opened': StdCols.DTIME,
    }),
)
end_ = time.time()
print(f'Time to completion: {end_-start_}')

Time to completion: 2.0563271045684814


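In this single run the multiprocessing version finished in about 2.06 s versus 3.42 s for the sequential loop (roughly 1.7x faster). This is one measurement, so repeat the timing before drawing a firm conclusion.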

In [12]:
# Load every asset through the multiprocessing reader and list the resulting keys.
crypto_data = read_asset_datas_mp(
    asset_names_,
    asset_paths_,
    ReadDataOptions({
        'Open': StdCols.OPEN,
        'High': StdCols.HIGH,
        'Low': StdCols.LOW,
        'Close': StdCols.CLOSE,
        'Volume': StdCols.VOLUME,
        'Opened': StdCols.DTIME,
    }),
)
crypto_data.keys()

dict_keys(['BTC', 'LTC', 'ETH'])

In [13]:
# mi_ = pd.MultiIndex.from_product([
#     list(crypto_data['BTC'].index), list(crypto_data.keys())
# ], names= ['datetime', 'asset_ids']); mi_

In [19]:
# Collect the per-asset frames as a list, ordered the same way as asset_names_.
crypto_datas = [crypto_data[name] for name in asset_names_]


dict_keys(['BTC', 'LTC', 'ETH'])

In [115]:
# NOTE(review): `res` is not defined in any cell visible here, and this cell's
# execution count (115) is out of sequence — confirm where `res` comes from,
# otherwise this fails on Restart & Run All.
sorted(res.index)

[Timestamp('2017-08-17 04:00:00'),
 Timestamp('2017-08-17 04:01:00'),
 Timestamp('2017-08-17 04:02:00'),
 Timestamp('2017-08-17 04:03:00'),
 Timestamp('2017-08-17 04:04:00'),
 Timestamp('2017-08-17 04:05:00'),
 Timestamp('2017-08-17 04:06:00'),
 Timestamp('2017-08-17 04:07:00'),
 Timestamp('2017-08-17 04:08:00'),
 Timestamp('2017-08-17 04:09:00'),
 Timestamp('2017-08-17 04:10:00'),
 Timestamp('2017-08-17 04:11:00'),
 Timestamp('2017-08-17 04:12:00'),
 Timestamp('2017-08-17 04:13:00'),
 Timestamp('2017-08-17 04:14:00'),
 Timestamp('2017-08-17 04:15:00'),
 Timestamp('2017-08-17 04:16:00'),
 Timestamp('2017-08-17 04:17:00'),
 Timestamp('2017-08-17 04:18:00'),
 Timestamp('2017-08-17 04:19:00'),
 Timestamp('2017-08-17 04:20:00'),
 Timestamp('2017-08-17 04:21:00'),
 Timestamp('2017-08-17 04:22:00'),
 Timestamp('2017-08-17 04:23:00'),
 Timestamp('2017-08-17 04:24:00'),
 Timestamp('2017-08-17 04:25:00'),
 Timestamp('2017-08-17 04:26:00'),
 Timestamp('2017-08-17 04:27:00'),
 Timestamp('2017-08-

In [104]:
# NOTE(review): `crypto_merged` is not defined in any cell visible here — confirm its origin.
# Copies the index into a regular 'indx' column (this modifies crypto_merged in place).
crypto_merged['indx']=crypto_merged.index
# The re-indexed frame is not assigned to anything; the recorded output of this
# cell is a NotImplementedError rather than a frame.
crypto_merged.set_index(['indx', 'name'])

NotImplementedError: > 1 ndim Categorical are not supported at this time

In [88]:
# First index label of crypto_merged (not used again in the cells visible here).
first_time = crypto_merged.index[0]
# Last entry in the first level of the column MultiIndex; shown as the cell output.
last_name = crypto_merged.columns.levels[0][-1]; last_name

'ETH'

In [89]:
# Display the columns stored under the `last_name` top-level column key.
crypto_merged[last_name]

Unnamed: 0_level_0,open,high,low,close,volume,name
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017-08-17 04:00:00,301.13,301.13,301.13,301.13,0.42643,ETH
2017-08-17 04:01:00,301.13,301.13,301.13,301.13,2.75787,ETH
2017-08-17 04:02:00,300.00,300.00,300.00,300.00,0.09930,ETH
2017-08-17 04:03:00,300.00,300.00,300.00,300.00,0.31389,ETH
2017-08-17 04:04:00,301.13,301.13,301.13,301.13,0.23202,ETH
...,...,...,...,...,...,...
2020-05-20 22:15:00,,,,,,
2020-05-20 22:16:00,,,,,,
2020-05-20 22:17:00,,,,,,
2020-05-20 22:18:00,,,,,,


In [21]:
ex_ = pd.Series([1, 'apple'])