# 6 Thomson-Reuters Tick History intraday data


In [1]:
import glob
import os
import pdb
import re

import numpy as np
import pandas as pd



### Parallelization

In [9]:
import dask
# Make the multiprocessing scheduler the default for every later
# .compute() / dask.compute() call in this notebook.
dask.config.set(scheduler="processes")

#@dask.delayed
def load_TRTH_trade(filename,
             tz_exchange="America/New_York",
             only_non_special_trades=True,
             only_regular_trading_hours=True,
             open_time="09:30:00",
             close_time="16:00:00",
             merge_sub_trades=True):
    """Load one TRTH trade file and return a cleaned, time-indexed DataFrame.

    Parameters
    ----------
    filename : str or path-like
        File to read. The reader is chosen from the suffix: ``csv`` /
        ``csv.gz``, ``arrow`` or ``parquet``.
    tz_exchange : str
        Time zone the UTC timestamps are converted to.
    only_non_special_trades : bool
        Keep only rows whose ``trade-stringflag`` equals ``"uncategorized"``.
    only_regular_trading_hours : bool
        Keep only rows between ``open_time`` and ``close_time`` (local time).
    open_time, close_time : str
        Bounds used when ``only_regular_trading_hours`` is true.
    merge_sub_trades : bool
        Collapse rows sharing the same timestamp into one row holding the
        mean price (``trade_price``) and the summed volume (``trade_volume``).

    Returns
    -------
    pandas.DataFrame or None
        ``None`` when the suffix is not recognized, when the file cannot be
        read, or when it contains no rows.
    """
    filename = str(filename)  # also accept pathlib.Path objects

    # Choose the reader from the suffix (same suffixes as the former regexes).
    if filename.endswith(("csv", "csv.gz")):
        reader = pd.read_csv
    elif filename.endswith("arrow"):
        # pandas has no `read_arrow`: the former call always raised and the
        # file was silently skipped. read_feather reads Arrow IPC files.
        # NOTE(review): confirm the .arrow files are in the IPC *file* format.
        reader = pd.read_feather
    elif filename.endswith("parquet"):
        reader = pd.read_parquet
    else:
        print("DF does not exist")
        print("unsupported file suffix: " + filename)
        return None

    try:
        DF = reader(filename)
    except Exception:
        # Best effort, as before: an unreadable or missing file yields None
        # without any message (presumably because many requested days have
        # no file at all).
        return None

    if DF.shape[0] == 0:
        return None

    if only_non_special_trades:
        DF = DF[DF["trade-stringflag"] == "uncategorized"]

    # Non in-place drop: DF may be a filtered slice of the frame that was read.
    DF = DF.drop(columns=["trade-rawflag", "trade-stringflag"])

    # xltime is an Excel-style day count (origin 1899-12-30), read as UTC.
    DF.index = pd.to_datetime(DF["xltime"], unit="d", origin="1899-12-30", utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF = DF.drop(columns="xltime")

    if only_regular_trading_hours:
        # NOTE: fixed hours ignore early-close days (e.g. around Thanksgiving).
        DF = DF.between_time(open_time, close_time)

    if merge_sub_trades:
        DF = DF.groupby(DF.index).agg(
            trade_price=pd.NamedAgg(column="trade-price", aggfunc="mean"),
            trade_volume=pd.NamedAgg(column="trade-volume", aggfunc="sum"),
        )

    return DF



#@dask.delayed
def load_TRTH_bbo(filename,
             tz_exchange="America/New_York",
             only_regular_trading_hours=True,
             merge_sub_trades=True,
             open_time="09:30:00",
             close_time="16:00:00"):
    """Load one TRTH best-bid/offer file and return a time-indexed DataFrame.

    Parameters
    ----------
    filename : str or path-like
        File to read. The reader is chosen from the suffix: ``csv`` /
        ``csv.gz``, ``arrow`` or ``parquet``.
    tz_exchange : str
        Time zone the UTC timestamps are converted to.
    only_regular_trading_hours : bool
        Keep only rows between ``open_time`` and ``close_time`` (local time).
    merge_sub_trades : bool
        Keep a single row per timestamp, built with ``groupby(...).last()``.
    open_time, close_time : str
        Bounds used when ``only_regular_trading_hours`` is true. The defaults
        are the hours that used to be hard-coded here.

    Returns
    -------
    pandas.DataFrame or None
        ``None`` when the suffix is not recognized, when the file cannot be
        read, or when it contains no rows.
    """
    filename = str(filename)  # also accept pathlib.Path objects

    # Choose the reader from the suffix (same suffixes as the former regexes).
    if filename.endswith(("csv", "csv.gz")):
        reader = pd.read_csv
    elif filename.endswith("arrow"):
        # pandas has no `read_arrow`: the former call always raised and the
        # file was silently skipped. read_feather reads Arrow IPC files.
        # NOTE(review): confirm the .arrow files are in the IPC *file* format.
        reader = pd.read_feather
    elif filename.endswith("parquet"):
        reader = pd.read_parquet
    else:
        print("DF does not exist")
        print("unsupported file suffix: " + filename)
        return None

    try:
        DF = reader(filename)
    except Exception:
        # Best effort, as before: an unreadable or missing file yields None.
        return None

    if DF.shape[0] == 0:
        return None

    # xltime is an Excel-style day count (origin 1899-12-30), read as UTC.
    DF.index = pd.to_datetime(DF["xltime"], unit="d", origin="1899-12-30", utc=True)
    DF.index = DF.index.tz_convert(tz_exchange)  # .P stands for Arca, which is based at New York
    DF = DF.drop(columns="xltime")

    if only_regular_trading_hours:
        # NOTE: fixed hours ignore early-close days (e.g. around Thanksgiving).
        DF = DF.between_time(open_time, close_time)

    if merge_sub_trades:
        # Several quote updates can carry the same timestamp: keep the last one.
        DF = DF.groupby(DF.index).last()

    return DF

In [10]:
import vaex

@dask.delayed
def load_merge_trade_bbo(ticker,date,
                         country="US",
                         dirBase="data/raw/TRTH/equities/",
                         suffix="parquet",
                         suffix_save=None,
                         dirSaveBase="data/clean/TRTH/equities/events",
                         saveOnly=False,
                         doSave=False
                        ):
    """Load the trades and quotes of one ticker for one day and outer-join them.

    Parameters
    ----------
    ticker : str
        Instrument code, used both as directory name and in the file names.
    date : object with a ``.date()`` method (e.g. ``pandas.Timestamp``)
        Day to load; ``str(date.date())`` is the file name prefix.
    country : str
        Sub-directory of ``dirBase`` / ``dirSaveBase``.
    dirBase : str
        Root of the raw ``trade`` and ``bbo`` directories.
    suffix : str
        Suffix of the raw files, and of the saved file unless ``suffix_save``
        is given.
    suffix_save : str or None
        Format of the saved file (``"arrow"`` or ``"parquet"``).
    dirSaveBase : str
        Root under which ``<country>/events/<ticker>/`` is created.
    saveOnly : bool
        When saving, return only the boolean "was saved" flag.
    doSave : bool
        Write the merged events to disk.

    Returns
    -------
    None when either input could not be loaded; the ``saved`` flag when both
    ``doSave`` and ``saveOnly`` are true; otherwise the merged pandas
    DataFrame.
    """
    day = str(date.date())

    # Build both paths explicitly. Deriving the bbo path with
    # str.replace("trade", "bbo") would also rewrite any "trade" found in
    # dirBase or in the ticker.
    file_trade = os.path.join(dirBase, country, "trade", ticker,
                              day + "-" + ticker + "-trade." + suffix)
    file_bbo = os.path.join(dirBase, country, "bbo", ticker,
                            day + "-" + ticker + "-bbo." + suffix)

    trades = load_TRTH_trade(file_trade)
    bbos = load_TRTH_bbo(file_bbo)
    if trades is None or bbos is None:
        return None

    events = trades.join(bbos, how="outer")

    if doSave:
        dirSave = os.path.join(dirSaveBase, country, "events", ticker)
        # exist_ok avoids a race when several workers create the same
        # directory at the same time.
        os.makedirs(dirSave, exist_ok=True)

        save_suffix = suffix_save if suffix_save else suffix
        file_events = os.path.join(dirSave,
                                   day + "-" + ticker + "-events" + "." + save_suffix)

        saved = False
        if save_suffix == "arrow":
            # Convert into a separate object so that `events` stays a pandas
            # DataFrame for the final return.
            vaex.from_pandas(events, copy_index=True).export_arrow(file_events)
            saved = True
        elif save_suffix == "parquet":
            events.to_parquet(file_events, use_deprecated_int96_timestamps=True)
            saved = True
        else:
            print("suffix " + save_suffix + " : format not recognized")

        if saveOnly:
            return saved
    return events

In [11]:
from datetime import datetime

ticker = "SPY.P"

# Calendar range to process, both ends included.
startDate = "2010-01-01"
endDate = "2010-12-31"
# One pandas Timestamp per calendar day of the range.
datelist = list(pd.date_range(start=startDate, end=endDate))

In [12]:
%time allpromises=[load_merge_trade_bbo("SPY.P",date,saveOnly=True,doSave=True,suffix="parquet",suffix_save="arrow") for date in datelist]

CPU times: user 37.5 ms, sys: 172 µs, total: 37.7 ms
Wall time: 35.6 ms


Thus, it takes almost no time at all to create execution promises. Let us check that we really have promises:

In [13]:
# Displaying a promise does not run it: the cell output is a Delayed object.
allpromises[0]

Delayed('load_merge_trade_bbo-f59696f4-2848-4bba-bc5e-d67409e26de9')

To actually perform a computation, simply call the `compute()` method of a promise:

In [14]:
%%time
# Run the first task only. It was created with doSave=True and saveOnly=True, so it
# returns the `saved` flag (or None when a file could not be loaded), not the DataFrame.
allpromises[0].compute()

CPU times: user 9.96 ms, sys: 300 µs, total: 10.3 ms
Wall time: 2.44 s


Now, let us load all the files in parallel:

In [15]:
%%time
# dask.compute returns a tuple with one entry per argument: here a 1-tuple
# whose only element is the list of per-day results.
alldata=dask.compute(allpromises) 

CPU times: user 1.86 s, sys: 8.74 ms, total: 1.87 s
Wall time: 52.5 s


#### Delayed, other ways


There are alternative ways to delay a function: use dask.delayed(some_function) directly. 

In [205]:
# NOTE(review): `allfiles` is not defined in the visible part of this notebook —
# presumably a list of CSV paths; define it before running this cell.
# This cell also rebinds `allpromises`.
allpromises=[dask.delayed(pd.read_csv)(fn) for fn in allfiles]

or define a delayed version of a function:

In [12]:
# Delayed counterpart of load_TRTH_trade: calling it returns a promise instead of loading the file.
load_TRTH_trade_delayed=dask.delayed(load_TRTH_trade)

In [13]:
del alldata  # cleanup




 

### Merge trades and bbo data

If one wishes to create a single dataframe, one can proceed in the following way. Note that this is not needed if one uses VAEX, which can aggregate several files into a single virtual dataframe (see week 8).

In [14]:
trade_files=sorted(glob.glob("data/raw/TRTH/equities/US/trade/SPY.P/2009*"))

# Wrap the loader with dask.delayed so that dask.compute really runs the loads
# in parallel; calling load_TRTH_trade(fn) directly reads every file eagerly,
# one after the other, and leaves nothing for dask to schedule.
allpromises=[dask.delayed(load_TRTH_trade)(fn) for fn in trade_files]
trades=dask.compute(allpromises)[0]

# The loader returns None for files it could not use: drop those explicitly.
trades=pd.concat([day for day in trades if day is not None])

In [15]:
bbo_files=sorted(glob.glob("data/raw/TRTH/equities/US/bbo/SPY.P/2009*"))

# Wrap the loader with dask.delayed so that dask.compute really runs the loads
# in parallel; calling load_TRTH_bbo(fn) directly reads every file eagerly,
# one after the other, and leaves nothing for dask to schedule.
allpromises=[dask.delayed(load_TRTH_bbo)(fn) for fn in bbo_files]
bbos=dask.compute(allpromises)[0]

# The loader returns None for files it could not use: drop those explicitly.
bbos=pd.concat([day for day in bbos if day is not None])

In [None]:
# Outer join of trades and quotes on their index; %time reports how long it takes.
%time events=trades.join(bbos,how="outer")

In [None]:
# (number of rows, number of columns) of the merged frame
events.shape

(89048311, 6)

We are entering the realm of big data. Let us save this object.

In [18]:
# Before saving a parquet object, make sure the quote columns are in numeric format.
for quote_column in ["bid-price", "bid-volume", "ask-price", "ask-volume"]:
    events[quote_column] = events[quote_column].astype("float")

# pandas DataFrames have no `to_arrow` method; the keyword arguments used here
# (int96 timestamps, brotli compression) are those of `to_parquet`, and the file
# name now carries the extension that was missing.
# So far, one still needs to add the use_deprecated_int96_timestamps option.
events.to_parquet("SPY_2009_events.parquet",use_deprecated_int96_timestamps=True,compression="brotli")

### VAEX to the rescue

In [86]:
# Free the in-memory pandas frame, if there is one, before switching to vaex.
try:
    del events
except NameError:
    pass  # `events` does not exist (never created or already deleted)




In [226]:
import vaex

# Open every file matching the glob pattern as a single vaex DataFrame.
# NOTE(review): the pattern only matches 2010* files, whereas the output shown
# below starts on 2009-01-02 — presumably it was produced with a wider pattern; confirm.
df=vaex.open("data/clean/TRTH/equities/events/US/events/SPY.P/2010*arrow")
df

#,trade_price,trade_volume,bid-price,bid-volume,ask-price,ask-volume,index
0,,,90.44,89,90.46,64,2009-01-02 14:30:00.117999872
1,,,90.44,84,90.46,72,2009-01-02 14:30:00.117999872
2,,,90.44,84,90.45,5,2009-01-02 14:30:00.127999744
3,,,90.44,76,90.45,5,2009-01-02 14:30:00.127999744
4,,,90.44,40,90.45,10,2009-01-02 14:30:00.127999744
...,...,...,...,...,...,...,...
365559066,,,125.79,680,125.8,32,2010-12-31 20:59:59.978000128
365559067,,,125.79,689,125.8,32,2010-12-31 20:59:59.978000128
365559068,,,125.79,698,125.8,32,2010-12-31 20:59:59.989999616
365559069,,,125.79,689,125.8,32,2010-12-31 20:59:59.989999616


In [228]:
# NOTE(review): the file name announces 2009-2010; make sure `df` really covers
# both years before exporting.
df.export("SPY_2009-2010_events.arrow",compression="brotli")   # 20Gb uncompressed