In [1]:
from pathlib import Path
from collections import Counter
from datetime import timedelta
from datetime import datetime
from time import time

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns

sns.set_style('whitegrid')

In [2]:
def format_time(t):
    """return a formatted time string 'HH:MM:SS' 
        based on a numeric time() value"""
    m, s  =divmod(t, 60)
    h, m = divmod(m, 60)
    return f'{h:0>2.0f}:{m:0>2.0f}:{s:0>2.0f}'

Set Data paths
We will store the download in a data subdirectory and convert the result to hdf format (discussed in the last section of chapter 2).

In [16]:
data_path = Path('data') # set to e.g. external harddrive
itch_store = str(data_path / 'itch1.h5')
order_book_store = data_path / 'order_book.h5'
date = '10302019'

In [12]:
stock = 'AAPL'
order_dict = {-1:'sell', 1:'buy'}

The parsed messages allow us to rebuild the order flow for the given day. The 'R' message type contains a listing of all stocks traded during a given day, including information about initial public offerings (IPOs) and trading restrictions.

Throughout the day, new orders are added, and orders that are executed and canceled are removed from the order book. The proper accounting for messages that reference orders placed on a prior date would require tracking the order book over multiple days, but we are ignoring this aspect here.

Get all messages for given stock
The get_messages() function illustrates how to collect the orders for a single stock that affects trading (refer to the ITCH specification for details about each message):

In [21]:
def get_messages(date, stock=stock):
    """Collect trading messages for given stock"""
    with pd.HDFStore(itch_store) as store:
        # Fix: Use f-string to interpolate the stock variable
        stock_locate = store.select('R', where=f'stock = "{stock}"').stock_locate.iloc[0]
        
        # Fix: Use the actual stock_locate value, not the string
        query = f'stock_locate == {stock_locate}'
        
        data = {}
        # trading message types
        messages = ['A', 'F', 'E', 'C', 'X', 'D', 'U', 'P', 'Q']
        
        for m in messages:
            data[m] = store.select(m, where=query).drop('stock_locate', axis=1).assign(type=m)

    order_cols = ['order_reference_number', 'buy_sell_indicator', 'shares', 'price']
    orders = pd.concat([data['A'], data['F']], sort=False, ignore_index=True).loc[:, order_cols]

    for m in messages[2: -3]:
        data[m] = data[m].merge(orders, how='left')

    data['U'] = data['U'].merge(orders, how='left',
                                right_on='order_reference_number',
                                left_on='original_order_reference_number',
                                suffixes=['', '_replaced'])

    data['Q'].rename(columns={'cross_price': 'price'}, inplace=True)
    data['X']['shares'] = data['X']['cancelled_shares']
    data['X'] = data['X'].dropna(subset=['price'])

    data = pd.concat([data[m] for m in messages], ignore_index=True, sort=False)
    data['date'] = pd.to_datetime(date, format='%m%d%Y')
    data.timestamp = data['date'].add(data.timestamp)
    data = data[data.printable != 0]

    drop_cols = ['tracking_number', 'order_reference_number', 'original_order_reference_number',
                 'cross_type', 'new_order_reference_number', 'attribution', 'match_number',
                 'printable', 'date', 'cancelled_shares']
    return data.drop(drop_cols, axis=1).sort_values('timestamp').reset_index(drop=True)

In [24]:
messages = get_messages(date=date)
messages.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 813997 entries, 0 to 813996
Data columns (total 9 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   timestamp           813997 non-null  datetime64[ns]
 1   buy_sell_indicator  755911 non-null  object        
 2   shares              808721 non-null  float64       
 3   price               808721 non-null  float64       
 4   type                813997 non-null  object        
 5   executed_shares     54584 non-null   float64       
 6   execution_price     373 non-null     float64       
 7   shares_replaced     4886 non-null    float64       
 8   price_replaced      4886 non-null    float64       
dtypes: datetime64[ns](1), float64(6), object(2)
memory usage: 55.9+ MB


In [44]:
# Only convert 'type' to categorical to reduce the warning
messages['type'] = messages['type'].astype('category')

# Store with format='table'
with pd.HDFStore(order_book_store) as store:
    key = f'{stock}/messages'
    store.put(key, messages, format='table')
    print(store.info())

<class 'pandas.io.pytables.HDFStore'>
File path: data\order_book.h5
/AAPL/messages                                     frame_table  (typ->appendable,nrows->813997,ncols->9,indexers->[index],dc->[])  
/AAPL/messages/meta/values_block_1/meta            series_table (typ->appendable,nrows->9,ncols->1,indexers->[index],dc->[values]) 
/AAPL/messages/meta/values_block_3/meta            series_table (typ->appendable,nrows->2,ncols->1,indexers->[index],dc->[values]) 
/AAPL/sell                                         frame_table  (typ->appendable,nrows->81071608,ncols->2,indexers->[index],dc->[])
/AAPL/trades                                       frame        (shape->[72017,3])                                                 


In [30]:
def get_trades(m):
    """Combine C, E, P and Q messages into trading records"""
    trade_dict = {'executed_shares': 'shares', 'execution_price': 'price'}
    cols = ['timestamp', 'executed_shares']
    trades = pd.concat([m.loc[m.type == 'E', cols + ['price']].rename(columns=trade_dict),
                        m.loc[m.type == 'C', cols + ['execution_price']].rename(columns=trade_dict),
                        m.loc[m.type == 'P', ['timestamp', 'price', 'shares']],
                        m.loc[m.type == 'Q', ['timestamp', 'price', 'shares']].assign(cross=1),
                        ], sort=False).dropna(subset=['price']).fillna(0)
    return trades.set_index('timestamp').sort_index().astype(int)

In [50]:
# First, verify what buy_sell_indicator values exist in messages
print("Checking buy_sell_indicator values:")
print(messages['buy_sell_indicator'].value_counts(dropna=False))
print()

# Define required variables
order_dict = {-1: 'sell', 1: 'buy'}
stock = 'AAPL'

Checking buy_sell_indicator values:
buy_sell_indicator
1      425087
-1     330824
NaN     58086
Name: count, dtype: int64



In [31]:
trades = get_trades(messages)
print(trades.info())

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 72017 entries, 2019-10-30 04:00:02.486519868 to 2019-10-30 19:59:59.248635671
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   shares  72017 non-null  int64
 1   price   72017 non-null  int64
 2   cross   72017 non-null  int64
dtypes: int64(3)
memory usage: 2.2 MB
None


In [32]:
with pd.HDFStore(order_book_store) as store:
    store.put(f'{stock}/trades', trades)

In [51]:
def add_orders(orders, buysell, nlevels):
    """Add orders up to desired depth given by nlevels;
        sell in ascending, buy in descending order
    """
    new_order = []
    items = sorted(orders.copy().items())
    if buysell == 1:
        items = reversed(items)  
    for i, (p, s) in enumerate(items, 1):
        new_order.append((p, s))
        if i == nlevels:
            break
    return orders, new_order

In [38]:
def save_orders(orders, append=False):
    """Save order book data to HDF5 store"""
    cols = ['price', 'shares']
    for buysell, book in orders.items():
        # Check if book is empty before processing
        if not book:
            continue
            
        df = (pd.concat([pd.DataFrame(data=data,
                                     columns=cols)
                         .assign(timestamp=t)       
                         for t, data in book.items()]))
        key = f'{stock}/{order_dict[buysell]}'
        df.loc[:, ['price', 'shares']] = df.loc[:, ['price', 'shares']].astype(int)
        
        with pd.HDFStore(order_book_store) as store:
            if append:
                store.append(key, df.set_index('timestamp'), format='t')
            else:
                store.put(key, df.set_index('timestamp'))

In [52]:
order_book = {-1: {}, 1: {}}
current_orders = {-1: Counter(), 1: Counter()}
message_counter = Counter()
nlevels = 100

print("Starting order book processing...")
print("="*60)
start = time()

for message in messages.itertuples():
    i = message[0]
    
    # Save and reset every 100k messages
    if i % 1e5 == 0 and i > 0:
        elapsed = format_time(time() - start)
        print(f'{i:>10,} messages processed | Time: {elapsed}')
        
        # Save current batch
        if order_book[-1] or order_book[1]:
            save_orders(order_book, append=True)
        
        order_book = {-1: {}, 1: {}}
        start = time()
    
    # Skip if no buy/sell indicator
    if pd.isna(message.buy_sell_indicator):
        continue
    
    message_counter.update(message.type)
    if isinstance(message.buy_sell_indicator, str) or hasattr(message.buy_sell_indicator, 'categories'):
        # Categorical: 'B' = 1 (buy), 'S' = -1 (sell)
        buysell = 1 if str(message.buy_sell_indicator) == 'B' else -1
    else:
        # Numeric: already -1 or 1
        buysell = int(message.buy_sell_indicator)
    price, shares = None, None
    
    # Add new orders (A, F, U message types)
    if message.type in ['A', 'F', 'U']:
        price = int(message.price)
        shares = int(message.shares)
        
        current_orders[buysell].update({price: shares})
        current_orders[buysell], new_order = add_orders(current_orders[buysell], buysell, nlevels)
        order_book[buysell][message.timestamp] = new_order
    
    # Remove/modify orders (E, C, X, D, U message types)
    if message.type in ['E', 'C', 'X', 'D', 'U']:
        if message.type == 'U':
            if not pd.isna(message.shares_replaced):
                price = int(message.price_replaced)
                shares = -int(message.shares_replaced)
        else:
            if not pd.isna(message.price):
                price = int(message.price)
                shares = -int(message.shares)
        
        if price is not None:
            current_orders[buysell].update({price: shares})
            if current_orders[buysell][price] <= 0:
                current_orders[buysell].pop(price)
            current_orders[buysell], new_order = add_orders(current_orders[buysell], buysell, nlevels)
            order_book[buysell][message.timestamp] = new_order

Starting order book processing...
   100,000 messages processed | Time: 00:01:24
   200,000 messages processed | Time: 00:01:49
   300,000 messages processed | Time: 00:01:54
   400,000 messages processed | Time: 00:02:06
   500,000 messages processed | Time: 00:01:57
   600,000 messages processed | Time: 00:02:11
   700,000 messages processed | Time: 00:02:29
   800,000 messages processed | Time: 00:01:49


In [53]:
message_counter = pd.Series(message_counter)
print(message_counter)

A    357225
E     52781
D    318295
P     18861
X       576
F      2928
U      4886
C       359
dtype: int64


In [55]:
with pd.HDFStore(order_book_store) as store:
    print(store.info())

<class 'pandas.io.pytables.HDFStore'>
File path: data\order_book.h5
/AAPL/messages                                     frame_table  (typ->appendable,nrows->813997,ncols->9,indexers->[index],dc->[])   
/AAPL/messages/meta/values_block_1/meta            series_table (typ->appendable,nrows->9,ncols->1,indexers->[index],dc->[values])  
/AAPL/messages/meta/values_block_3/meta            series_table (typ->appendable,nrows->2,ncols->1,indexers->[index],dc->[values])  
/AAPL/sell                                         frame_table  (typ->appendable,nrows->149398744,ncols->2,indexers->[index],dc->[])
/AAPL/trades                                       frame        (shape->[72017,3])                                                  


In [56]:
with pd.HDFStore(order_book_store) as store:
    buy = store[f'{stock}/buy'].reset_index().drop_duplicates()
    sell = store[f'{stock}/sell'].reset_index().drop_duplicates()

KeyError: 'No object named AAPL/buy in the file'