# Parallel computations: dask

In [4]:
import pandas as pd
import numpy as np

import glob

allfiles=glob.glob("C:\\Projects\\FIN-525\\data\\intraday\\equities\\US\\bbo\\SPY.P\*")

### Dask in practice

In [5]:
import dask
dask.config.set(scheduler="processes")

@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True):
    
    DF = pd.read_csv(filename)

    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"]=="uncategorized"]

    DF.drop(columns=["trade-rawflag","trade-stringflag"],axis=1,inplace=True)
    
    DF.index = pd.to_datetime(DF["xltime"],unit="d",origin="1899-12-30",utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF.drop(columns="xltime",inplace=True)
    
    if only_regular_trading_hours:
        DF=DF.between_time(open_time,close_time)    # warning: ever heard e.g. about Thanksgivings?
    
    if merge_sub_trades:
           DF=DF.groupby(DF.index).agg(trade_price=pd.NamedAgg(column='trade-price', aggfunc='mean'),
                                       trade_volume=pd.NamedAgg(column='trade-volume', aggfunc='sum'))
    
    return DF


In [6]:

allpromises=[load_TRTH_trade(fn) for fn in allfiles]    # this takes not time at all


In [7]:
allpromises                                             # as indeed nothing much happens

[Delayed('load_TRTH_trade-ce513b02-4fe9-48d0-8e70-fa8d9d399944'),
 Delayed('load_TRTH_trade-1525a48c-f0af-46a2-8bc5-f034b57dcca5'),
 Delayed('load_TRTH_trade-57e47422-7088-4028-99b8-f13153590b12'),
 Delayed('load_TRTH_trade-3c0defec-6ffb-4909-9413-4fc2059f7952'),
 Delayed('load_TRTH_trade-378275ff-1b0c-4a31-a5c5-f34756bdc4fd'),
 Delayed('load_TRTH_trade-3aa3cf77-4c04-4cd0-8219-d0f937cc79f4'),
 Delayed('load_TRTH_trade-a9b5877f-bf70-4a48-94cd-54ad5d7cdb79'),
 Delayed('load_TRTH_trade-f279e858-f849-44ba-a2f3-2da7e62bf143'),
 Delayed('load_TRTH_trade-17d833d3-862b-4408-8dd6-ba7c948a1648')]

In [9]:
alldata=dask.compute(allpromises)                       # now the computations take place. Monitor your CPU !

KeyError: 'trade-stringflag'