# TAQ data extract

In [1]:
# Modules

import numpy as np
import os
import pandas as pd
import multiprocessing as mp
from itertools import product

In [2]:
def taq_data_extract(ticker, type, year):
    """Extracts the data for every day in a year.

    Extracts the trades and quotes (TAQ) data for a day from a CSV file with
    the information of a whole year. The time range for each day is from 9:30
    to 16:00, that means, the open market time.

    :param ticker: string of the abbreviation of the stock to be analized
     (i.e. 'AAPL').
    :param type: string with the type of the data to be extracted
     (i.e. 'trades' or 'quotes').
    :param year: string of the year to be analyzed (i.e. '2016').
    :return: None -- The function extracts the data and does not return a
     value.
    """

    try:

        df = pd.DataFrame()
        chunksize = 10 ** 7

        init_date = f'01/01/{year}'
        last_date = f'12/31/{year}'

        # Use only the bussiness days
        dt = pd.date_range(start=init_date, end=last_date, freq='B')
        dt_df = dt.to_frame(index=False)
        date_list = dt_df[0].astype(str).tolist()

        # Load data
        csv_file = f'data/{ticker}_{year}_NASDAQ_{type}.csv'

        df_type = {'quotes': {
                        'Date': 'str',
                        'Time': 'int',
                        'Bid': 'int',
                        'Ask': 'int',
                        'Vol_Bid': 'int',
                        'Vol_Ask': 'int',
                        'Mode': 'int',
                        'Cond': 'str',
                    },
                   'trades': {
                        'Date': 'str',
                        'Time': 'int',
                        'Ask': 'int',
                        'Vol_Ask': 'int',
                        'Mode': 'int',
                        'Corr': 'int',
                        'Cond': 'str',
                    }}

        col_names = {'quotes': ['Date', 'Time', 'Bid', 'Ask', 'Vol_Bid',
                                'Vol_Ask', 'Mode', 'Cond'],
                     'trades': ['Date', 'Time', 'Ask', 'Vol_Ask', 'Mode',
                                'Corr', 'Cond']}

        # Save data
        if (not os.path.isdir(f'hdf5_dayly_data_{year}/')):

            try:
                os.mkdir(f'hdf5_dayly_data_{year}/')
                print('Folder to save data created')

            except FileExistsError:
                print('Folder exists. The folder was not created')

        for chunk in pd.read_csv(csv_file, chunksize=chunksize, sep='\s+',
                                 names=col_names[type], dtype=df_type[type],
                                 na_filter=False, low_memory=False):

            chunk['Date'] = pd.to_datetime(chunk['Date'], format='%Y-%m-%d')
            chunk.set_index('Date', inplace=True)
            if (type == 'quotes'):
                chunk.drop(['Mode', 'Cond'], axis=1, inplace=True)
            else:
                chunk.drop(['Mode', 'Corr', 'Cond'], axis=1, inplace=True)

            for date in date_list:
                day = chunk.index.isin([date])
                df = chunk.loc[day & (chunk['Time'] >= 34200)
                               & (chunk['Time'] < 57600)]

                if not df.empty:
                    df.to_hdf(f''.join(('hdf5_dayly_data_'
                              + f'{year}/taq_{ticker}_{type}_{date}.h5')
                              .split()), key=type,
                              format='table', append=True)

        print('Data Saved')
        print()

        return None

    except AssertionError:
        print('No data')
        print()
        return None

### The following blocks measure the time used to extract the data. As the function append new information every time the function is runned, I delete the files in every run of the block.

In [3]:
ticker_i = 'AAPL'
ticker_j = 'MSFT'
type_q = 'quotes'
type_t = 'trades'
year = '2008'

In [4]:
%%timeit
taq_data_extract(ticker_i, type_q, year)
os.system('rm hdf5_dayly_data_2008/*.h5')

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

12min 56s ± 1min 26s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
%%timeit
taq_data_extract(ticker_i, type_t, year)
os.system('rm hdf5_dayly_data_2008/*.h5')

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

2min 13s ± 2.18 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
%%timeit
taq_data_extract(ticker_j, type_q, year)
os.system('rm hdf5_dayly_data_2008/*.h5')

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

16min 56s ± 3min per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
%%timeit
taq_data_extract(ticker_j, type_t, year)
os.system('rm hdf5_dayly_data_2008/*.h5')

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

Data Saved

2min 1s ± 2.4 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


### The following block extract the data that will be computed by the other functions

In [8]:
tickers = ['AAPL', 'TAQ']

In [None]:
%time
with mp.Pool(processes=mp.cpu_count()) as pool:
    print('Extracting dayly data')
    pool.starmap(taq_data_extract,
                 product(tickers, ['quotes'], [year]))
    pool.starmap(taq_data_extract,
                 product(tickers, ['trades'], [year]))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 13.1 µs
Extracting dayly data


Process ForkPoolWorker-13:
Process ForkPoolWorker-14:
Process ForkPoolWorker-11:
Process ForkPoolWorker-15:
Process ForkPoolWorker-12:
Process ForkPoolWorker-10:
Process ForkPoolWorker-16:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/tp/jchenaol/.pyenv/versions/3.6.

  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tables/index.py", line 554, in _g_post_init_hook
    self.create_temp()
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/site-packages/tables/index.py", line 1013, in create_temp
    ".tmp", "pytables-", self.tmp_dir)
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/tempfile.py", line 342, in mkstemp
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
  File "/home/tp/jchenaol/.pyenv/versions/3.6.5/lib/python3.6/tempfile.py", line 260, in _mkstemp_inner
    fd = _os.open(file, flags, 0o600)
KeyboardInterrupt
