In [1]:
#import gzip
#import shutil
from pathlib import Path
#from urllib.request import urlretrieve
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from struct import unpack
from collections import namedtuple, Counter
from datetime import timedelta
from time import time

In [2]:
### Set Data paths

In [3]:
data_path = Path('D:/') # set to e.g. external harddrive
itch_store = str(data_path / 'itch.h5')
order_book_store = data_path / 'order_book.h5'

In [4]:
data_path

WindowsPath('D:/')

In [5]:
## ITCH Format Settings
### System Event Codes – Daily

In [6]:
event_codes = {'O': 'Start of Messages',
               'S': 'Start of System Hours',
               'Q': 'Start of Market Hours',
               'M': 'End of Market Hours',
               'E': 'End of System Hours',
               'C': 'End of Messages'}

In [7]:
encoding = {'primary_market_maker': {'Y': 1, 'N': 0},
            'printable'           : {'Y': 1, 'N': 0},
            'buy_sell_indicator'  : {'B': 1, 'S': -1},
            'cross_type'          : {'O': 0, 'C': 1, 'H': 2},         # “O” = Opening Cross, “C” = Closing Cross, 
                                                                      # “H” = Cross for IPO and halted / paused securities
            'imbalance_direction' : {'B': 0, 'S': 1, 'N': 0, 'O': -1}}
                                                                      # “B” = buy imbalance, “S” = sell imbalance
                                                                      # “N” = no imbalance, “O” = Insufficient orders to calculate

In [8]:
### Formats Dictionary 
#### It is used to assemble the format strings

In [9]:
formats = { ('integer', 2): 'H',   # int of length 2 => format string 'H'
            ('integer', 4): 'I',
            ('integer', 6): '6s',  # int of length 6 => parse as string, convert later
            ('integer', 8): 'Q',
            ('alpha', 1)  : 's',
            ('alpha', 2)  : '2s',
            ('alpha', 4)  : '4s',
            ('alpha', 8)  : '8s',
            ('price_4', 4): 'I',
            ('price_8', 8): 'Q', }

In [10]:
#### Basic Cleaning

In [11]:
def clean_message_types(df):
    # transfer all the columns' name into lower case and remove all the leading and trailing spaces
    df.columns = [c.lower().strip() for c in df.columns]
    # remove all the leading and trailing spaces in the values
    df.value = df.value.str.strip()
    # clean all values in the name column 
    df.name = (df.name
               .str.strip()
               .str.lower()
               .str.replace(' ', '_')
               .str.replace('-', '_')
               .str.replace('/', '_'))
    # clean all values in the notes column 
    df.notes = df.notes.str.strip()
    # add message type column
    df['message_type'] = df.loc[df.name == 'message_type', 'value']
    return df

In [12]:
#### Load Message Types

In [13]:
message_types = clean_message_types(pd.read_excel('message_types.xlsx', sheet_name='messages', encoding='latin1')
                                    .sort_values('id').drop('id', axis=1))

In [14]:
message_types

Unnamed: 0,name,offset,length,value,notes,message_type
0,message_type,0,1,S,System Event Message,S
1,stock_locate,1,2,Integer,Always 0,
2,tracking_number,3,2,Integer,Nasdaq internal tracking number,
3,timestamp,5,6,Integer,Nanoseconds since midnight,
4,event_code,11,1,Alpha,See System Event Codes below,
5,message_type,0,1,R,Stock Directory Message,R
6,stock_locate,1,2,Integer,Locate Code uniquely assigned to the security ...,
7,tracking_number,3,2,Integer,Nasdaq internal tracking number,
8,timestamp,5,6,Integer,Time at which the directory message was genera...,
9,stock,11,8,Alpha,Denotes the security symbol for the issue in t...,


In [15]:
#### Get Message Labels

In [16]:
# extract the notes and message type columns that has value
message_labels = (message_types.loc[:, ['message_type', 'notes']]
                  .dropna()
                  .rename(columns={'notes': 'name'}))
# clean the notes column
message_labels.name = (message_labels.name
                       .str.lower()
                       .str.replace('message', '')
                       .str.replace('.', '')
                       .str.strip().str.replace(' ', '_'))
# message_labels.to_csv('message_labels.csv', index=False)

In [17]:
message_types

Unnamed: 0,name,offset,length,value,notes,message_type
0,message_type,0,1,S,System Event Message,S
1,stock_locate,1,2,Integer,Always 0,
2,tracking_number,3,2,Integer,Nasdaq internal tracking number,
3,timestamp,5,6,Integer,Nanoseconds since midnight,
4,event_code,11,1,Alpha,See System Event Codes below,
5,message_type,0,1,R,Stock Directory Message,R
6,stock_locate,1,2,Integer,Locate Code uniquely assigned to the security ...,
7,tracking_number,3,2,Integer,Nasdaq internal tracking number,
8,timestamp,5,6,Integer,Time at which the directory message was genera...,
9,stock,11,8,Alpha,Denotes the security symbol for the issue in t...,


In [18]:
# fill NA in the message_type column
message_types.message_type = message_types.message_type.ffill()
# remove the message type in the rows
message_types = message_types[message_types.name != 'message_type']
# clean the value column in the data frame
message_types.value = (message_types.value
                       .str.lower()
                       .str.replace(' ', '_')
                       .str.replace('(', '')
                       .str.replace(')', ''))
message_types.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 152 entries, 1 to 172
Data columns (total 6 columns):
name            152 non-null object
offset          152 non-null int64
length          152 non-null int64
value           152 non-null object
notes           152 non-null object
message_type    152 non-null object
dtypes: int64(2), object(4)
memory usage: 8.3+ KB


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self[name] = value


In [19]:
message_types

Unnamed: 0,name,offset,length,value,notes,message_type
1,stock_locate,1,2,integer,Always 0,S
2,tracking_number,3,2,integer,Nasdaq internal tracking number,S
3,timestamp,5,6,integer,Nanoseconds since midnight,S
4,event_code,11,1,alpha,See System Event Codes below,S
6,stock_locate,1,2,integer,Locate Code uniquely assigned to the security ...,R
7,tracking_number,3,2,integer,Nasdaq internal tracking number,R
8,timestamp,5,6,integer,Time at which the directory message was genera...,R
9,stock,11,8,alpha,Denotes the security symbol for the issue in t...,R
10,market_category,19,1,alpha,Indicates Listing market or listing market tie...,R
11,financial_status_indicator,20,1,alpha,"For Nasdaq listed issues, this field indicates...",R


In [20]:
### Get message specification

In [21]:
# Get ITCH specs and create formatting (type, length) tuples
specs = message_types
specs['formats'] = specs[['value', 'length']].apply(tuple, axis=1).map(formats)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [22]:
specs

Unnamed: 0,name,offset,length,value,notes,message_type,formats
1,stock_locate,1,2,integer,Always 0,S,H
2,tracking_number,3,2,integer,Nasdaq internal tracking number,S,H
3,timestamp,5,6,integer,Nanoseconds since midnight,S,6s
4,event_code,11,1,alpha,See System Event Codes below,S,s
6,stock_locate,1,2,integer,Locate Code uniquely assigned to the security ...,R,H
7,tracking_number,3,2,integer,Nasdaq internal tracking number,R,H
8,timestamp,5,6,integer,Time at which the directory message was genera...,R,6s
9,stock,11,8,alpha,Denotes the security symbol for the issue in t...,R,8s
10,market_category,19,1,alpha,Indicates Listing market or listing market tie...,R,s
11,financial_status_indicator,20,1,alpha,"For Nasdaq listed issues, this field indicates...",R,s


In [24]:
# Extract formatting for alpha numerical fields
alpha_fields = specs[specs.value == 'alpha'].set_index('name')
alpha_msgs = alpha_fields.groupby('message_type')
alpha_formats = {k: v.to_dict() for k, v in alpha_msgs.formats}
alpha_length = {k: v.add(7).to_dict() for k, v in alpha_msgs.length}

In [25]:
alpha_fields

Unnamed: 0_level_0,offset,length,value,notes,message_type,formats
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
event_code,11,1,alpha,See System Event Codes below,S,s
stock,11,8,alpha,Denotes the security symbol for the issue in t...,R,8s
market_category,19,1,alpha,Indicates Listing market or listing market tie...,R,s
financial_status_indicator,20,1,alpha,"For Nasdaq listed issues, this field indicates...",R,s
round_lots_only,25,1,alpha,Indicates if Nasdaq system limits order entry ...,R,s
issue_classification,26,1,alpha,Identifies the security class for the issue as...,R,s
issue_sub_type,27,2,alpha,Identifies the security sub-type for the issue...,R,2s
authenticity,29,1,alpha,Denotes if an issue or quoting participant rec...,R,s
short_sale_threshold_indicator,30,1,alpha,Indicates if a security is subject to mandator...,R,s
ipo_flag,31,1,alpha,Indicates if the Nasdaq security is set up for...,R,s


In [26]:
alpha_msgs

<pandas.core.groupby.groupby.DataFrameGroupBy object at 0x000001AE23129FD0>

In [27]:
alpha_formats

{'A': {'buy_sell_indicator': 's', 'stock': '8s'},
 'C': {'printable': 's'},
 'F': {'buy_sell_indicator': 's', 'stock': '8s', 'attribution': '4s'},
 'H': {'stock': '8s', 'trading_state': 's', 'reserved': 's', 'reason': '4s'},
 'I': {'imbalance_direction': 's',
  'stock': '8s',
  'cross_type': 's',
  'price_variation_indicator': 's'},
 'J': {'stock': '8s'},
 'K': {'stock': '8s', 'ipo_quotation_release_qualifier': 's'},
 'L': {'mpid': '4s',
  'stock': '8s',
  'primary_market_maker': 's',
  'market_maker_mode': 's',
  'market_participant_state': 's'},
 'P': {'buy_sell_indicator': 's', 'stock': '8s'},
 'Q': {'stock': '8s', 'cross_type': 's'},
 'R': {'stock': '8s',
  'market_category': 's',
  'financial_status_indicator': 's',
  'round_lots_only': 's',
  'issue_classification': 's',
  'issue_sub_type': '2s',
  'authenticity': 's',
  'short_sale_threshold_indicator': 's',
  'ipo_flag': 's',
  'luld_reference_price_tier': 's',
  'etp_flag': 's',
  'inverse_indicator': 's'},
 'S': {'event_code'

In [28]:
alpha_length

{'A': {'buy_sell_indicator': 8, 'stock': 15},
 'C': {'printable': 8},
 'F': {'buy_sell_indicator': 8, 'stock': 15, 'attribution': 11},
 'H': {'stock': 15, 'trading_state': 8, 'reserved': 8, 'reason': 11},
 'I': {'imbalance_direction': 8,
  'stock': 15,
  'cross_type': 8,
  'price_variation_indicator': 8},
 'J': {'stock': 15},
 'K': {'stock': 15, 'ipo_quotation_release_qualifier': 8},
 'L': {'mpid': 11,
  'stock': 15,
  'primary_market_maker': 8,
  'market_maker_mode': 8,
  'market_participant_state': 8},
 'P': {'buy_sell_indicator': 8, 'stock': 15},
 'Q': {'stock': 15, 'cross_type': 8},
 'R': {'stock': 15,
  'market_category': 8,
  'financial_status_indicator': 8,
  'round_lots_only': 8,
  'issue_classification': 8,
  'issue_sub_type': 9,
  'authenticity': 8,
  'short_sale_threshold_indicator': 8,
  'ipo_flag': 8,
  'luld_reference_price_tier': 8,
  'etp_flag': 8,
  'inverse_indicator': 8},
 'S': {'event_code': 8},
 'W': {'breached_level': 8},
 'Y': {'stock': 15, 'reg_sho_action': 8},


In [None]:
# Generate message classes as named tuples and format strings
message_fields, fstring = {}, {}
for t, message in specs.groupby('message_type'):
    message_fields[t] = namedtuple(typename=t, field_names=message.name.tolist())
    fstring[t] = '>' + ''.join(message.formats.tolist())

In [None]:
fstring

In [None]:
def format_alpha(mtype, data):
    """Process byte strings of type alpha"""

    for col in alpha_formats.get(mtype).keys():
        if mtype != 'R' and col == 'stock':
            data = data.drop(col, axis=1)
            continue
        data.loc[:, col] = data.loc[:, col].str.decode("utf-8").str.strip()
        if encoding.get(col):
            data.loc[:, col] = data.loc[:, col].map(encoding.get(col))
    return data

In [None]:
## Process Messages


In [None]:
def store_messages(m):
    """Handle occasional storing of all messages"""
    with pd.HDFStore(itch_store) as store:
        for mtype, data in m.items():
            # convert to DataFrame
            data = pd.DataFrame(data)

            # parse timestamp info
            data.timestamp = data.timestamp.apply(int.from_bytes, byteorder='big')
            data.timestamp = pd.to_timedelta(data.timestamp)

            # apply alpha formatting
            if mtype in alpha_formats.keys():
                data = format_alpha(mtype, data)

            s = alpha_length.get(mtype)
            if s:
                s = {c: s.get(c) for c in data.columns}
            dc = ['stock_locate']
            if m == 'R':
                dc.append('stock')
            store.append(mtype,
                         data,
                         format='t',
                         min_itemsize=s,
                         data_columns=dc)

In [None]:
messages = {}
message_count = 0
message_type_counter = Counter()

In [None]:
start = time()
data  = open('D:/01302019.NASDAQ_ITCH50', mode='rb')
while True:

    # determine message size in bytes
    message_size = int.from_bytes(data.read(2), byteorder='big', signed=False)

    # get message type by reading first byte
    message_type = data.read(1).decode('ascii')

    # create data structure to capture result
    if not messages.get(message_type):
        messages[message_type] = []

    message_type_counter.update([message_type])

    # read & store message
    record = data.read(message_size - 1)
    message = message_fields[message_type]._make(unpack(fstring[message_type], record))
    messages[message_type].append(message)

    # deal with system events
    if message_type == 'S':
        timestamp = int.from_bytes(message.timestamp, byteorder='big')
        print('\n', event_codes.get(message.event_code.decode('ascii'), 'Error'))
        print('\t{0}\t{1:,.0f}'.format(timedelta(seconds=timestamp * 1e-9),
                                     message_count))
        if message.event_code.decode('ascii') == 'C':
            store_messages(messages)
            break

    message_count += 1
    if message_count % 2.5e7 == 0:
        timestamp = int.from_bytes(message.timestamp, byteorder='big')
        print('\t{0}\t{1:,.0f}\t{2}'.format(timedelta(seconds=timestamp * 1e-9),
                                            message_count,
                                            timedelta(seconds=time() - start)))
        store_messages(messages)
        messages = {}


print(timedelta(seconds=time() - start))