### Parse files from AWS S3
#### Define a function that returns the list of gzipped S3 files under a given prefix (and prints how many were found)

In [2]:
import boto3

def list_s3_files(bucket_name, path, key, secret):
    """Return the keys of all objects under an S3 prefix whose key ends in 'gz'.

    Parameters
    ----------
    bucket_name : name of the S3 bucket to list.
    path : key prefix passed to the bucket's object filter.
    key, secret : AWS access key id and secret access key for the S3 resource.

    Returns
    -------
    list of str
        The matching object keys, in the order the bucket listing yields them.
        The number of matches is also printed.
    """
    resource = boto3.resource('s3', aws_access_key_id=key, aws_secret_access_key=secret)
    listing = resource.Bucket(bucket_name).objects.filter(Prefix=path)

    # Keep only the keys that end in 'gz'.
    gz_keys = [entry.key for entry in listing if entry.key.endswith('gz')]

    print('Number of files: {}'.format(len(gz_keys)))
    return gz_keys

# `bucket`, `path`, `key` and `secret` are not defined in this cell; they must
# already exist in the kernel.
# NOTE(review): presumably set in an earlier (not shown) cell — confirm, and
# avoid hard-coding credentials in the notebook.
cl_files = list_s3_files(bucket_name=bucket, path=path, 
             key=key, secret=secret)
cl_files

Number of files: 898


['data/tickdata/CL/cl_zipped/BBO_CLF6_20151118.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151119.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151120.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151123.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151124.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151125.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151126.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151127.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151130.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151201.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151202.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151203.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151204.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151207.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151208.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151209.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151210.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLF6_20151211.t

#### Sort files by date
Use regular expressions to sort the file names.

In [3]:
from os import listdir
from os.path import isfile, join
import pandas as pd
import re
import itertools

def order_files_by_date_s3(s3_files):
    """Return the S3 keys sorted chronologically by the date in their file name.

    Every key is expected to end in ``<YYYYMMDD>.txt.gz`` (for example
    ``.../BBO_CLF6_20151118.txt.gz``). The 8-digit date is extracted with a
    regular expression anchored at the end of the key and used as the sort
    key; because the format is year-month-day, sorting the date strings
    orders the keys chronologically.

    Each input key appears exactly once in the result. Keys that share a date
    keep their relative input order (the sort is stable).

    Parameters
    ----------
    s3_files : iterable of str
        S3 object keys.

    Returns
    -------
    list of str
        The same keys, ordered by date.

    Raises
    ------
    ValueError
        If a key does not end in an 8-digit date followed by ``.txt.gz``.
    """
    date_suffix = re.compile(r'(\d{8})\.txt\.gz$')

    def date_key(name):
        # Anchor on the end of the key so digits elsewhere in the path can
        # never be mistaken for the file's date.
        found = date_suffix.search(name)
        if found is None:
            raise ValueError('No YYYYMMDD date found in file name: {!r}'.format(name))
        return found.group(1)

    return sorted(s3_files, key=date_key)
        
cl_files_sorted = order_files_by_date_s3(cl_files)
CL = cl_files_sorted  # short alias used by the later cells
# Note: this prints the length of the unsorted input list, not of the sorted result.
print(len(cl_files))
cl_files_sorted

898


['data/tickdata/CL/cl_zipped/BBO_CLG5_20150102.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150105.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150106.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150107.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150108.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150109.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150112.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150113.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG5_20150114.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150115.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150116.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150119.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150120.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150121.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150122.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150123.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150126.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH5_20150127.t

#### Group the list into files by year
This allows me to write a separate parquet file for each year at the end of the script, starting from 2015.

In [4]:
def select_yearly_data(ticker, year):
    """Return the file names in ``ticker`` whose date stamp starts with ``year``.

    The date stamp is taken from the last 15 characters of each name after
    stripping any leading/trailing characters from the set ``.txgz`` (note
    that ``str.strip`` removes a character set, not the literal ``.txt.gz``
    suffix).

    Parameters
    ----------
    ticker : iterable of str
        File names to filter.
    year : str
        Prefix the date stamp must start with, e.g. ``'2016'``.

    Returns
    -------
    list of str
        The matching names, in input order.
    """
    return [name for name in ticker
            if name[-15:].strip('.txt.gz').startswith(year)]

# Count, then display, the 2016 files (the selection is computed twice).
print(len(select_yearly_data(ticker=CL, year='2016')))
select_yearly_data(ticker=CL, year='2016')

255


['data/tickdata/CL/cl_zipped/BBO_CLG6_20160104.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160105.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160106.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160107.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160108.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160111.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160112.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160113.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLG6_20160114.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160115.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160118.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160119.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160120.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160122.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160125.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160126.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160127.txt.gz',
 'data/tickdata/CL/cl_zipped/BBO_CLH6_20160128.t

#### Define a function that concatenates a list of CL files from AWS S3 into one large dataframe (it is run once per year below)

In [5]:
import io

def concatenate_s3_files(s3_files):
    """Download gzipped CSV files from S3 and stack them into one DataFrame.

    Relies on the notebook-level names ``bucket``, ``key`` and ``secret`` for
    the bucket name and the AWS credentials.

    Parameters
    ----------
    s3_files : sequence of str
        S3 object keys to read, in the order they should be stacked.

    Returns
    -------
    pandas.DataFrame
        The rows of every file, in input order, with the column names listed
        in ``cols``. An empty DataFrame is returned when ``s3_files`` is empty.
    """
    cols = ['Datetime', 'BidPx', 'BidSz', 'AskPx', 'AskSz', 'Flag', 'BidOrders',
            'AskOrders', 'RptSeqId', 'MatchEventIndicator', 'Nanos']

    conn = boto3.client('s3', aws_access_key_id=key,
                        aws_secret_access_key=secret)

    # Collect the per-file frames and concatenate once at the end: calling
    # pd.concat inside the loop re-copies all previously loaded rows on every
    # iteration, which makes the load quadratic in the amount of data.
    frames = []
    total = len(s3_files)
    for counter, f in enumerate(s3_files, start=1):
        print('S3 file {} of {}'.format(counter, total))

        obj = conn.get_object(Bucket=bucket, Key=f)
        # The object body is read fully into memory and decompressed by pandas.
        frames.append(pd.read_csv(io.BytesIO(obj['Body'].read()),
                                  compression='gzip', names=cols))

    if not frames:
        # pd.concat raises on an empty list; keep returning an empty frame.
        return pd.DataFrame()

    return pd.concat(frames, axis=0)

#### Run the concatenating function on CL
This will take several hours using a memory-optimized server on EC2. <br> I am using x1.16xlarge -> this has 64 cores and 976 GB of RAM. I need a good amount of memory to run this code.

In [None]:
import dask.dataframe as dd

if __name__=='__main__':

    date_range=['2015', '2016', '2017', '2018']
    for year in date_range:
        print('Year: {}'.format(year+'...'))
        # Select the year's files once and reuse the list for the partition count.
        year_files = select_yearly_data(ticker=CL, year=year)
        df = concatenate_s3_files(s3_files=year_files)
        df.set_index('Datetime', inplace=True) # set datetime to be the index
        df.index = pd.to_datetime(df.index, unit='ms') # values are parsed as epoch milliseconds
        df.sort_index(inplace=True) # sort index by datetime
        ddf = dd.from_pandas(df, npartitions=len(year_files)) # one partition per source file
        # Write each year to its own dataset. Passing the bare `path` sent every
        # year to the same location, whereas the cells that load the data read
        # path+'CL_<year>_raw.parquet'.
        dd.to_parquet(ddf,
                      path=path+'CL_'+year+'_raw.parquet',
                      engine='fastparquet',
                      storage_options={'key':key, 'secret':secret})

Year: 2015:
S3 file 1 of 258
S3 file 2 of 258
S3 file 3 of 258
S3 file 4 of 258
S3 file 5 of 258
S3 file 6 of 258
S3 file 7 of 258
S3 file 8 of 258
S3 file 9 of 258
S3 file 10 of 258
S3 file 11 of 258
S3 file 12 of 258
S3 file 13 of 258
S3 file 14 of 258
S3 file 15 of 258
S3 file 16 of 258
S3 file 17 of 258
S3 file 18 of 258
S3 file 19 of 258
S3 file 20 of 258
S3 file 21 of 258
S3 file 22 of 258
S3 file 23 of 258
S3 file 24 of 258
S3 file 25 of 258
S3 file 26 of 258
S3 file 27 of 258
S3 file 28 of 258
S3 file 29 of 258
S3 file 30 of 258
S3 file 31 of 258
S3 file 32 of 258
S3 file 33 of 258
S3 file 34 of 258
S3 file 35 of 258
S3 file 36 of 258
S3 file 37 of 258
S3 file 38 of 258
S3 file 39 of 258
S3 file 40 of 258
S3 file 41 of 258
S3 file 42 of 258
S3 file 43 of 258
S3 file 44 of 258
S3 file 45 of 258
S3 file 46 of 258
S3 file 47 of 258
S3 file 48 of 258
S3 file 49 of 258
S3 file 50 of 258
S3 file 51 of 258
S3 file 52 of 258
S3 file 53 of 258
S3 file 54 of 258
S3 file 55 of 258
S3 file

S3 file 189 of 255
S3 file 190 of 255
S3 file 191 of 255
S3 file 192 of 255
S3 file 193 of 255
S3 file 194 of 255
S3 file 195 of 255
S3 file 196 of 255
S3 file 197 of 255
S3 file 198 of 255
S3 file 199 of 255
S3 file 200 of 255
S3 file 201 of 255
S3 file 202 of 255
S3 file 203 of 255
S3 file 204 of 255
S3 file 205 of 255
S3 file 206 of 255
S3 file 207 of 255
S3 file 208 of 255
S3 file 209 of 255
S3 file 210 of 255
S3 file 211 of 255
S3 file 212 of 255
S3 file 213 of 255
S3 file 214 of 255
S3 file 215 of 255
S3 file 216 of 255
S3 file 217 of 255
S3 file 218 of 255
S3 file 219 of 255
S3 file 220 of 255
S3 file 221 of 255
S3 file 222 of 255
S3 file 223 of 255
S3 file 224 of 255
S3 file 225 of 255
S3 file 226 of 255
S3 file 227 of 255
S3 file 228 of 255
S3 file 229 of 255
S3 file 230 of 255
S3 file 231 of 255
S3 file 232 of 255
S3 file 233 of 255
S3 file 234 of 255
S3 file 235 of 255
S3 file 236 of 255
S3 file 237 of 255
S3 file 238 of 255
S3 file 239 of 255
S3 file 240 of 255
S3 file 241 

#### I lost internet connection to kernel while parsing year 2017, so continue running the above code from year 2017

In [None]:
import dask.dataframe as dd

if __name__=='__main__':

    date_range=['2017', '2018']
    for year in date_range:
        print('Year: {}'.format(year+'...'))
        # Select the year's files once and reuse the list for the partition count.
        year_files = select_yearly_data(ticker=CL, year=year)
        df = concatenate_s3_files(s3_files=year_files)
        df.set_index('Datetime', inplace=True) # set datetime to be the index
        df.index = pd.to_datetime(df.index, unit='ms') # values are parsed as epoch milliseconds
        df.sort_index(inplace=True) # sort index by datetime
        ddf = dd.from_pandas(df, npartitions=len(year_files)) # one partition per source file
        # Write each year to its own dataset. Passing the bare `path` sent every
        # year to the same location, whereas the cells that load the data read
        # path+'CL_<year>_raw.parquet'.
        dd.to_parquet(ddf,
                      path=path+'CL_'+year+'_raw.parquet',
                      engine='fastparquet',
                      storage_options={'key':key, 'secret':secret})

Year: 2017...
S3 file 1 of 257
S3 file 2 of 257
S3 file 3 of 257
S3 file 4 of 257
S3 file 5 of 257
S3 file 6 of 257
S3 file 7 of 257
S3 file 8 of 257
S3 file 9 of 257
S3 file 10 of 257
S3 file 11 of 257
S3 file 12 of 257
S3 file 13 of 257
S3 file 14 of 257
S3 file 15 of 257
S3 file 16 of 257
S3 file 17 of 257
S3 file 18 of 257
S3 file 19 of 257
S3 file 20 of 257
S3 file 21 of 257
S3 file 22 of 257
S3 file 23 of 257
S3 file 24 of 257
S3 file 25 of 257
S3 file 26 of 257
S3 file 27 of 257
S3 file 28 of 257
S3 file 29 of 257
S3 file 30 of 257
S3 file 31 of 257
S3 file 32 of 257
S3 file 33 of 257
S3 file 34 of 257
S3 file 35 of 257
S3 file 36 of 257
S3 file 37 of 257
S3 file 38 of 257
S3 file 39 of 257
S3 file 40 of 257
S3 file 41 of 257
S3 file 42 of 257
S3 file 43 of 257
S3 file 44 of 257
S3 file 45 of 257
S3 file 46 of 257
S3 file 47 of 257
S3 file 48 of 257
S3 file 49 of 257
S3 file 50 of 257
S3 file 51 of 257
S3 file 52 of 257
S3 file 53 of 257
S3 file 54 of 257
S3 file 55 of 257
S3 fi

#### And... I lost internet connection again... restart from year 2018

In [6]:
import dask.dataframe as dd

if __name__=='__main__':

    date_range=['2018']
    for year in date_range:
        print('Year: {}'.format(year+'...'))
        # Select the year's files once and reuse the list for the partition count.
        year_files = select_yearly_data(ticker=CL, year=year)
        df = concatenate_s3_files(s3_files=year_files)
        df.set_index('Datetime', inplace=True) # set datetime to be the index
        df.index = pd.to_datetime(df.index, unit='ms') # values are parsed as epoch milliseconds
        df.sort_index(inplace=True) # sort index by datetime
        ddf = dd.from_pandas(df, npartitions=len(year_files)) # one partition per source file
        # Write each year to its own dataset. Passing the bare `path` sent every
        # year to the same location, whereas the cells that load the data read
        # path+'CL_<year>_raw.parquet'.
        dd.to_parquet(ddf,
                      path=path+'CL_'+year+'_raw.parquet',
                      engine='fastparquet',
                      storage_options={'key':key, 'secret':secret})

Year: 2018...
S3 file 1 of 128
S3 file 2 of 128
S3 file 3 of 128
S3 file 4 of 128
S3 file 5 of 128
S3 file 6 of 128
S3 file 7 of 128
S3 file 8 of 128
S3 file 9 of 128
S3 file 10 of 128
S3 file 11 of 128
S3 file 12 of 128
S3 file 13 of 128
S3 file 14 of 128
S3 file 15 of 128
S3 file 16 of 128
S3 file 17 of 128
S3 file 18 of 128
S3 file 19 of 128
S3 file 20 of 128
S3 file 21 of 128
S3 file 22 of 128
S3 file 23 of 128
S3 file 24 of 128
S3 file 25 of 128
S3 file 26 of 128
S3 file 27 of 128
S3 file 28 of 128
S3 file 29 of 128
S3 file 30 of 128
S3 file 31 of 128
S3 file 32 of 128
S3 file 33 of 128
S3 file 34 of 128
S3 file 35 of 128
S3 file 36 of 128
S3 file 37 of 128
S3 file 38 of 128
S3 file 39 of 128
S3 file 40 of 128
S3 file 41 of 128
S3 file 42 of 128
S3 file 43 of 128
S3 file 44 of 128
S3 file 45 of 128
S3 file 46 of 128
S3 file 47 of 128
S3 file 48 of 128
S3 file 49 of 128
S3 file 50 of 128
S3 file 51 of 128
S3 file 52 of 128
S3 file 53 of 128
S3 file 54 of 128
S3 file 55 of 128
S3 fi

#### Read in parquet files and save them as pandas dataframes in memory
The functionality I need later is much more straightforward in pandas, and since I have enough RAM to hold the data in pandas I don't need to use dask for this.

In [7]:
import dask.dataframe as dd
# Read the 2015 raw parquet dataset with dask, then materialize it as an
# in-memory pandas DataFrame via .compute() and preview the first rows.
ddf_2015 = dd.read_parquet(path+'CL_2015_raw.parquet',
                storage_options={'key':key, 'secret':secret})
df_2015 = pd.DataFrame(ddf_2015.compute())
df_2015.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2015-01-01 22:58:04.873,53.68,8,53.68,13,0,4,6,6984066,0,
2015-01-01 22:58:21.292,53.74,7,53.68,13,0,3,6,6984074,0,
2015-01-01 22:58:21.292,53.74,7,53.69,2,0,3,2,6984076,0,
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984077,0,
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984078,0,


In [8]:
# Read the 2016 raw parquet dataset with dask, then materialize it as an
# in-memory pandas DataFrame via .compute() and preview the first rows.
ddf_2016 = dd.read_parquet(path+'CL_2016_raw.parquet',
                storage_options={'key':key, 'secret':secret})
df_2016 = pd.DataFrame(ddf_2016.compute())
df_2016.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2016-01-03 22:58:05.026,37.5,58,37.5,117,0,16,33,681,4,201193
2016-01-03 22:58:06.531,37.5,58,37.5,116,0,16,32,685,4,313176
2016-01-03 22:58:09.678,37.5,58,37.5,115,0,16,31,687,4,342596
2016-01-03 22:58:11.288,37.5,58,37.5,116,0,16,32,689,4,484797
2016-01-03 22:58:12.568,37.5,58,37.5,117,0,16,33,690,4,916149


In [9]:
# Read the 2017 raw parquet dataset with dask, then materialize it as an
# in-memory pandas DataFrame via .compute() and preview the first rows.
ddf_2017 = dd.read_parquet(path+'CL_2017_raw.parquet',
                storage_options={'key':key, 'secret':secret})
df_2017 = pd.DataFrame(ddf_2017.compute())
df_2017.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2017-01-02 22:46:00.025,53.96,9,53.96,13,0,3,10,67,4,719300
2017-01-02 22:46:10.623,53.96,4,53.96,13,0,3,10,70,4,474881
2017-01-02 22:47:01.748,53.94,9,53.96,13,0,3,10,75,4,683617
2017-01-02 22:47:01.748,53.94,9,53.94,10,0,3,3,76,4,683617
2017-01-02 22:47:07.123,53.96,4,53.94,10,0,3,3,80,4,656489


In [10]:
# Read the 2018 raw parquet dataset with dask, then materialize it as an
# in-memory pandas DataFrame via .compute() and preview the first rows.
ddf_2018 = dd.read_parquet(path+'CL_2018_raw.parquet',
                storage_options={'key':key, 'secret':secret})
df_2018 = pd.DataFrame(ddf_2018.compute())
df_2018.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2018-01-01 22:45:03.885,60.1,2,60.2,2,0,2,1,68,4,516288
2018-01-01 22:45:04.511,60.1,2,60.2,3,0,2,2,70,4,117824
2018-01-01 22:45:09.321,60.1,2,60.2,2,0,2,1,71,4,100288
2018-01-01 22:46:00.052,60.1,2,60.11,2,0,2,1,72,4,694528
2018-01-01 22:47:32.338,60.1,5,60.11,2,0,3,1,75,4,120960


#### Read in the stored parquet files and create a parsed parquet file for each date
Parse the Flag column to extract the aggressor side

In [11]:
import numpy as np

def extract_aggressor(df):
    """Add an ``Aggressor`` column derived from two bits of the ``Flag`` column.

    ``Flag`` is masked with ``0b110``. A masked value of ``0b010`` maps to
    ``1``, ``0b100`` maps to ``-1`` and every other value maps to ``0``.
    NOTE(review): presumably 1 / -1 denote the buy / sell aggressor — confirm
    against the data vendor's flag specification.

    The column is written into ``df`` itself and the same object is returned.
    """
    side_bits = np.bitwise_and(df['Flag'], 0b110)

    # Start from "no aggressor" and overwrite the rows matching each pattern.
    df['Aggressor'] = 0
    for pattern, side in ((0b010, 1), (0b100, -1)):
        df.loc[side_bits == pattern, 'Aggressor'] = side

    return df

if __name__=='__main__':
    # extract_aggressor adds the column to each frame in place and returns the
    # same frame, so these assignments rebind the names to the same objects.
    df_2015 = extract_aggressor(df_2015)
    df_2016 = extract_aggressor(df_2016)
    df_2017 = extract_aggressor(df_2017)
    df_2018 = extract_aggressor(df_2018)

df_2015.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos,Aggressor
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2015-01-01 22:58:04.873,53.68,8,53.68,13,0,4,6,6984066,0,,0
2015-01-01 22:58:21.292,53.74,7,53.68,13,0,3,6,6984074,0,,0
2015-01-01 22:58:21.292,53.74,7,53.69,2,0,3,2,6984076,0,,0
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984077,0,,0
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984078,0,,0


In [12]:
# Preview the 2016 frame now that the Aggressor column has been added.
df_2016.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos,Aggressor
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2016-01-03 22:58:05.026,37.5,58,37.5,117,0,16,33,681,4,201193,0
2016-01-03 22:58:06.531,37.5,58,37.5,116,0,16,32,685,4,313176,0
2016-01-03 22:58:09.678,37.5,58,37.5,115,0,16,31,687,4,342596,0
2016-01-03 22:58:11.288,37.5,58,37.5,116,0,16,32,689,4,484797,0
2016-01-03 22:58:12.568,37.5,58,37.5,117,0,16,33,690,4,916149,0


In [13]:
# Preview the 2017 frame now that the Aggressor column has been added.
df_2017.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos,Aggressor
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2017-01-02 22:46:00.025,53.96,9,53.96,13,0,3,10,67,4,719300,0
2017-01-02 22:46:10.623,53.96,4,53.96,13,0,3,10,70,4,474881,0
2017-01-02 22:47:01.748,53.94,9,53.96,13,0,3,10,75,4,683617,0
2017-01-02 22:47:01.748,53.94,9,53.94,10,0,3,3,76,4,683617,0
2017-01-02 22:47:07.123,53.96,4,53.94,10,0,3,3,80,4,656489,0


In [14]:
# Preview the 2018 frame now that the Aggressor column has been added.
df_2018.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos,Aggressor
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2018-01-01 22:45:03.885,60.1,2,60.2,2,0,2,1,68,4,516288,0
2018-01-01 22:45:04.511,60.1,2,60.2,3,0,2,2,70,4,117824,0
2018-01-01 22:45:09.321,60.1,2,60.2,2,0,2,1,71,4,100288,0
2018-01-01 22:46:00.052,60.1,2,60.11,2,0,2,1,72,4,694528,0
2018-01-01 22:47:32.338,60.1,5,60.11,2,0,3,1,75,4,120960,0


### Extract trades from Flag and AskSz column
#### Create a separate dataframe of only trades

In [15]:
def get_trades(df):
    """Return the trade rows of ``df`` as a new frame with trade-style columns.

    A row is treated as a trade when its ``AskSz`` equals ``-1``. For those
    rows ``BidPx`` is renamed to ``Trade Price`` and ``BidSz`` to ``Volume``;
    ``AskPx``, ``AskSz`` and ``Flag`` are dropped when present.

    NOTE(review): the drop list also names ``BidCount`` / ``AskCount``, which
    are silently ignored when absent (``errors='ignore'``). Frames that carry
    ``BidOrders`` / ``AskOrders`` instead therefore keep those columns —
    confirm whether that is intended.

    The input frame is not modified.
    """
    is_trade = df['AskSz'].eq(-1)
    trade_rows = df.loc[is_trade]
    trimmed = trade_rows.drop(
        columns=['AskPx', 'AskSz', 'Flag', 'BidCount', 'AskCount'],
        errors='ignore')
    return trimmed.rename(columns={'BidPx': 'Trade Price', 'BidSz': 'Volume'})

# Build one trades-only frame per year, then preview 2015.
trades_2015 = get_trades(df_2015)
trades_2016 = get_trades(df_2016)
trades_2017 = get_trades(df_2017)
trades_2018 = get_trades(df_2018)
trades_2015.head()

Unnamed: 0_level_0,Trade Price,Volume,BidOrders,AskOrders,RptSeqId,MatchEventIndicator,Nanos,Aggressor
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2015-01-01 23:00:00.256,53.76,1,-1,-1,6984325,0,,0
2015-01-01 23:00:00.256,53.76,1,-1,-1,6984324,0,,0
2015-01-01 23:00:00.256,53.76,1,-1,-1,6984323,0,,0
2015-01-01 23:00:00.256,53.76,2,-1,-1,6984322,0,,0
2015-01-01 23:00:00.256,53.76,3,-1,-1,6984321,0,,0


#### Create separate dataframe with no trades

In [16]:
def get_bbo_updates(df):
    """Return the non-trade rows of ``df`` as a new frame.

    A row is treated as a trade when its ``AskSz`` equals ``-1``; every other
    row is kept. The ``Aggressor`` and ``MatchEventIndicator`` columns are
    dropped when present (missing columns are ignored).

    The input frame is not modified.
    """
    keep = ~df['AskSz'].eq(-1)
    return df.loc[keep].drop(columns=['Aggressor', 'MatchEventIndicator'],
                             errors='ignore')

# Build one frame of non-trade rows per year, then preview 2015.
non_trades_2015 = get_bbo_updates(df_2015)
non_trades_2016 = get_bbo_updates(df_2016)
non_trades_2017 = get_bbo_updates(df_2017)
non_trades_2018 = get_bbo_updates(df_2018)

non_trades_2015.head()

Unnamed: 0_level_0,BidPx,BidSz,AskPx,AskSz,Flag,BidOrders,AskOrders,RptSeqId,Nanos
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2015-01-01 22:58:04.873,53.68,8,53.68,13,0,4,6,6984066,
2015-01-01 22:58:21.292,53.74,7,53.68,13,0,3,6,6984074,
2015-01-01 22:58:21.292,53.74,7,53.69,2,0,3,2,6984076,
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984077,
2015-01-01 22:58:21.292,53.74,7,53.85,2,0,3,1,6984078,


#### Verify that the trades and non_trades dataframes sum up to the total size of the original df. All of these outputs must be True to continue!

In [17]:
# Row-count check per year: trades + non-trades must equal the original frame.
# These lines only print the result; a False does not stop execution of the
# following cells.
print(len(trades_2015) + len(non_trades_2015) == len(df_2015)) # Should be True!
print(len(trades_2016) + len(non_trades_2016) == len(df_2016)) # Should be True!
print(len(trades_2017) + len(non_trades_2017) == len(df_2017)) # Should be True!
print(len(trades_2018) + len(non_trades_2018) == len(df_2018)) # Should be True!

True
True
True
True


#### Concatenate the trades and non_trades dataframes, then sort by index
This will take up a lot of memory...

In [18]:
def concat_trades_and_updates(trades, non_trades):
    """Stack the trade rows on top of the non-trade rows and sort by index.

    Columns that exist in only one of the two frames are filled with missing
    values for the rows coming from the other frame.

    Returns a new DataFrame; neither input is modified.
    """
    stacked = pd.concat([trades, non_trades], axis=0)
    return stacked.sort_index()

# Recombine trades and non-trade rows per year. This rebinds df_2015..df_2018
# to the combined frames, so the earlier versions of these names are replaced
# and re-running the preceding cells afterwards would start from the new frames.
df_2015 = concat_trades_and_updates(trades_2015, non_trades_2015)
df_2016 = concat_trades_and_updates(trades_2016, non_trades_2016)
df_2017 = concat_trades_and_updates(trades_2017, non_trades_2017)
df_2018 = concat_trades_and_updates(trades_2018, non_trades_2018)
df_2018.head()

Unnamed: 0_level_0,Aggressor,AskOrders,AskPx,AskSz,BidOrders,BidPx,BidSz,Flag,MatchEventIndicator,Nanos,RptSeqId,Trade Price,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2018-01-01 22:45:03.885,,1,60.2,2.0,2,60.1,2.0,0.0,,516288,68,,
2018-01-01 22:45:04.511,,2,60.2,3.0,2,60.1,2.0,0.0,,117824,70,,
2018-01-01 22:45:09.321,,1,60.2,2.0,2,60.1,2.0,0.0,,100288,71,,
2018-01-01 22:46:00.052,,1,60.11,2.0,2,60.1,2.0,0.0,,694528,72,,
2018-01-01 22:47:32.338,,1,60.11,2.0,3,60.1,5.0,0.0,,120960,75,,


#### Convert this to a dask dataframe so it's memory efficient to write to S3. 
I set npartitions to the number of trading days in each year. <br>
Converting to a dask dataframe is an easy way to do a multi-part upload to S3.

In [19]:
def write_to_parquet(df, year, key, secret):
    """Write ``df`` as a dask parquet dataset at ``path + 'CL_<year>_parsed.parquet'``.

    Relies on the notebook-level names ``CL`` (file list), ``path`` (output
    prefix), ``dd`` (dask.dataframe) and ``select_yearly_data``.

    Parameters
    ----------
    df : pandas.DataFrame to write.
    year : str, used both to count that year's files in ``CL`` (the number of
        partitions) and to build the output name.
    key, secret : credentials forwarded through ``storage_options``.

    Returns
    -------
    None
    """
    n_parts = len(select_yearly_data(ticker=CL, year=year))
    ddf = dd.from_pandas(df, npartitions=n_parts)

    target = path+'CL_'+year+'_parsed.parquet'
    credentials = {'key':key, 'secret':secret}
    # No engine is passed here, so dask's default parquet engine is used.
    dd.to_parquet(ddf, target, storage_options=credentials)

# Write one parsed parquet dataset per year, printing the year before each write.
print('2015...')
write_to_parquet(df_2015, '2015', key, secret)
print('2016...')
write_to_parquet(df_2016, '2016', key, secret)
print('2017...')
write_to_parquet(df_2017, '2017', key, secret)
print('2018...')
write_to_parquet(df_2018, '2018', key, secret)

2015...
2016...
2017...
2018...


#### Check that the null counts are consistent
The number of null values in Volume, Trade Price, and Aggressor should all be the same, because these three columns are only populated on trade rows and are null on every BBO update row.

In [20]:
# Number of 2015 rows with a missing Volume.
df_2015['Volume'].isnull().sum()

155122955

In [21]:
# Number of 2015 rows with a missing Trade Price.
df_2015['Trade Price'].isnull().sum()

155122955

In [22]:
# Number of 2015 rows with a missing Aggressor.
df_2015['Aggressor'].isnull().sum()

155122955

#### Concatenate all years as dask dataframe and write result to S3 as parquet
Saving to parquet and uploading the parquet to S3 is essentially a multi-part upload of 898 separate parquet files

In [29]:
# Stack the four yearly frames into one frame, one year at a time, then sort
# the combined frame by its index.
# NOTE(review): each pd.concat call builds a new frame from the rows stacked
# so far; a single pd.concat over all four frames would likely avoid the
# intermediate copies — confirm before changing.
print('2015, 2016...')
df = pd.concat([df_2015, df_2016], axis=0)
print('2017...')
df = pd.concat([df, df_2017], axis=0)
print('2018...')
df = pd.concat([df, df_2018], axis=0)
print('sorting...')
df.sort_index(inplace=True)

df.head()

2015, 2016...
2017...
2018...
sorting...


Unnamed: 0_level_0,Aggressor,AskOrders,AskPx,AskSz,BidOrders,BidPx,BidSz,Flag,MatchEventIndicator,Nanos,RptSeqId,Trade Price,Volume
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2015-01-01 22:58:04.873,,6,53.68,13.0,4,53.68,8.0,0.0,,,6984066,,
2015-01-01 22:58:21.292,,1,53.85,2.0,3,53.74,7.0,0.0,,,6984077,,
2015-01-01 22:58:21.292,,6,53.68,13.0,3,53.74,7.0,0.0,,,6984074,,
2015-01-01 22:58:21.292,,2,53.69,2.0,3,53.74,7.0,0.0,,,6984076,,
2015-01-01 22:58:21.292,,10,53.74,13.0,3,53.74,7.0,0.0,,,6984080,,


In [32]:
# Use one partition per source file: the partition count is the sum of the
# 2015-2018 file counts in CL.
ddf_full_parsed = dd.from_pandas(
    df,
    npartitions=sum(len(select_yearly_data(ticker=CL, year=yr))
                    for yr in ('2015', '2016', '2017', '2018')))

ddf_full_parsed

Unnamed: 0_level_0,Aggressor,AskOrders,AskPx,AskSz,BidOrders,BidPx,BidSz,Flag,MatchEventIndicator,Nanos,RptSeqId,Trade Price,Volume
npartitions=898,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2015-01-01 22:58:04.873,float64,int64,float64,float64,int64,float64,float64,float64,float64,float64,int64,float64,float64
2015-01-06 04:07:24.402,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2018-07-03 11:40:10.057,...,...,...,...,...,...,...,...,...,...,...,...,...
2018-07-03 21:00:00.115,...,...,...,...,...,...,...,...,...,...,...,...,...


In [33]:
# Write the combined dask dataframe to path+'CL_full_parsed.parquet' with the
# fastparquet engine, passing the credentials through storage_options.
dd.to_parquet(ddf_full_parsed, path+'CL_full_parsed.parquet', 
              engine='fastparquet', storage_options={'key':key, 'secret':secret})

In [35]:
# Read the dataset back as a dask dataframe to confirm it was written; the
# displayed summary shows its partitions and column dtypes.
dd.read_parquet(path+'CL_full_parsed.parquet', 
              engine='fastparquet', storage_options={'key':key, 'secret':secret})

Unnamed: 0_level_0,Aggressor,AskOrders,AskPx,AskSz,BidOrders,BidPx,BidSz,Flag,MatchEventIndicator,Nanos,RptSeqId,Trade Price,Volume
npartitions=898,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2015-01-01 22:58:04.873,float64,int64,float64,float64,int64,float64,float64,float64,float64,float64,int64,float64,float64
2015-01-06 04:07:24.402,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2018-07-03 11:40:10.057,...,...,...,...,...,...,...,...,...,...,...,...,...
2018-07-03 21:00:00.115,...,...,...,...,...,...,...,...,...,...,...,...,...
