In [1]:
import sys
import os
import matplotlib.pyplot as plt
import scipy.io as sio
import pandas as pd
import numpy as np
import seaborn as sns

from time import sleep
from datetime import datetime as dt
from natsort import natsorted

%matplotlib inline
sns.set_style("darkgrid")
plt.rcParams.update({'legend.fontsize': 'large',
                     'axes.labelsize': 'large',
                     'axes.titlesize': 'large',
                     'xtick.labelsize': 'large',
                     'ytick.labelsize': 'large'})


In [2]:
# CORRUPTED FILE: 'SE0000115446_SEK/8_SE0000115446_SEK_208.mat'
corrupted = ['8_SE0000115446_SEK_208.mat']
#sio.loadmat(corrupted[0])

Firstly start with file management. Filter out non-data directories, sort files in them to create big data files with needed data.

In [3]:
# Filter only needed directories 
dirs = [d for d in os.listdir('.') if os.path.isdir(os.path.join('.', d))]
del dirs[1]
del dirs[1]
dirs

['SE0000101032_SEK',
 'SE0000115446_SEK',
 'FI0009005318_EUR',
 'FI0009007835_EUR',
 'DK0010268606_DKK']

Collect necessary data.

In [4]:
import pytz # to handle timezones


# method to extract data from the order book
def extract_order_book(path, deep=10):
    # load data
    mat = sio.loadmat(path)
    order_book = mat['ob']

    # time strip and convert to correct timezone
    ts = pd.Series(order_book['ts'][0][0][:,0], name='ts')
    ts = pd.to_datetime(ts, unit='ms') 
    ts = ts.dt.tz_localize('UTC').dt.tz_convert('Europe/Helsinki')
    
    # filter out pre-session and after-session data
    f_ts = (ts.dt.hour > 8) & (ts.dt.hour < 18)
    
    # extract data to new data frame
    df = pd.DataFrame(ts[f_ts]).reset_index(drop=True)
    bidq = pd.DataFrame(order_book['bid_q'][0, 0][f_ts, :deep], columns=['bid_q' + str(i) for i in xrange(deep)])
    bidp = pd.DataFrame(order_book['bid_p'][0, 0][f_ts, :deep], columns=['bid_p' + str(i) for i in xrange(deep)])
    askq = pd.DataFrame(order_book['ask_q'][0, 0][f_ts, :deep], columns=['ask_q' + str(i) for i in xrange(deep)])
    askp = pd.DataFrame(order_book['ask_p'][0, 0][f_ts, :deep], columns=['ask_p' + str(i) for i in xrange(deep)])
    df = pd.concat([df, bidq, bidp, askq, askp], axis=1)
    return df

In [5]:
# sort and save filenames
f_names = []
for d in dirs:
    files = os.listdir(d)
    f_names.append(natsorted(files))

In [10]:
## HIDAS ÄLÄ AJA
# filename: SE0000101032_SEK.csv

'''
for i in xrange(len(dirs)):
    d = dirs[i]
    files = f_names[i]
    n = len(files)
    
    for j, f in enumerate(files):
        
        # update progress
        sys.stdout.write("%s (%i/%i)\t\r" % (d + '/' + f, j+1, n))
        sys.stdout.flush()
            
            
        # checks for bad files
        if not f.endswith('.mat'):
            continue
        if f in corrupted:
            continue
            
        df = extract_order_book(d+'/'+f)
        with open(d+'.csv', 'a') as w:
            df.to_csv(w, header=False, index=False)    
                
    break 
'''

'\nfor i in xrange(len(dirs)):\n    d = dirs[i]\n    files = f_names[i]\n    n = len(files)\n    \n    for j, f in enumerate(files):\n        \n        # update progress\n        sys.stdout.write("%s (%i/%i)\t\r" % (d + \'/\' + f, j+1, n))\n        sys.stdout.flush()\n            \n            \n        # checks for bad files\n        if not f.endswith(\'.mat\'):\n            continue\n        if f in corrupted:\n            continue\n            \n        df = extract_order_book(d+\'/\'+f)\n        with open(d+\'.csv\', \'a\') as w:\n            df.to_csv(w, header=False, index=False)    \n                \n    break \n'

### Spark

In [6]:
from pyspark.sql.functions import col

header = ['date'] +\
         ['bid_q' + str(i) for i in xrange(1, 11)] +\
         ['bid_p' + str(i) for i in xrange(1, 11)] +\
         ['ask_q' + str(i) for i in xrange(1, 11)] +\
         ['ask_p' + str(i) for i in xrange(1, 11)]

#import pandas as pd
#pd_df = pd.read_csv('SE0000101032_SEK_test.csv')
#del pd_df['Unnamed: 0']
#pd_df.to_csv('SE0000101032_SEK_test.csv', index=False, header=header)
#pd_df.head()


In [7]:
df = spark.read.format("csv").options(header="true", 
                                      inferSchema="true",
                                      dateFormat="yyyy-MM-dd HH:mm:ss")\
            .load("file:///home/janne/Thesis/Data/SE0000101032_SEK_test.csv")
df.select(df.columns[:12]).show()

+--------------------+------+------+------+------+------+------+------+------+------+-------+---------+
|                date|bid_q1|bid_q2|bid_q3|bid_q4|bid_q5|bid_q6|bid_q7|bid_q8|bid_q9|bid_q10|   bid_p1|
+--------------------+------+------+------+------+------+------+------+------+------+-------+---------+
|2010-06-01 09:00:...| 500.0|  10.0| 663.0|3000.0| 600.0| 810.0|1000.0| 300.0| 450.0|10800.0|1110000.0|
|2010-06-01 09:00:...| 500.0|  10.0| 663.0|3000.0| 600.0| 810.0|1000.0| 300.0| 450.0|10800.0|1110000.0|
|2010-06-01 09:00:...| 500.0| 110.0| 663.0|3000.0| 600.0| 810.0|1000.0| 300.0| 450.0|10800.0|1110000.0|
|2010-06-01 09:00:...| 500.0| 110.0| 200.0| 663.0|3000.0| 600.0| 810.0|1000.0| 300.0|  450.0|1110000.0|
|2010-06-01 09:02:...| 500.0| 110.0| 200.0|1000.0| 663.0|3000.0| 600.0| 810.0|1000.0|  300.0|1110000.0|
|2010-06-01 09:03:...| 500.0| 110.0| 200.0|1000.0| 663.0|3000.0| 600.0| 810.0|1000.0|  300.0|1110000.0|
|2010-06-01 09:03:...| 500.0| 110.0| 200.0|1000.0| 663.0|3000.0|

In [8]:
# calculate Mid-Prices per price level
for i in xrange(1, 11):
    marksColumns = [col('bid_p' + str(i)), col('ask_p' + str(i))]
    averageFunc = sum(x for x in marksColumns)/len(marksColumns)
    
    df = df.withColumn('mean_p' + str(i), averageFunc)
        
df.select(df.columns[40:]).show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|  ask_p10|  mean_p1|  mean_p2|  mean_p3|  mean_p4|  mean_p5|  mean_p6|  mean_p7|  mean_p8|  mean_p9| mean_p10|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|1165000.0|1119000.0|1115000.0|1102500.0|1101000.0|1097500.0|1090500.0|1092500.0|1091500.0|1092500.0|1092500.0|
|1165000.0|1119000.0|1115000.0|1102500.0|1101000.0|1097500.0|1090500.0|1092500.0|1091500.0|1092500.0|1092500.0|
|1165000.0|1119000.0|1115000.0|1102500.0|1101000.0|1097500.0|1090500.0|1092500.0|1091500.0|1092500.0|1092500.0|
|1165000.0|1119000.0|1115000.0|1115000.0|1103500.0|1101500.0|1098500.0|1093000.0|1094000.0|1095000.0|1095000.0|
|1165000.0|1119000.0|1115000.0|1115000.0|1111000.0|1104000.0|1102500.0|1101000.0|1094500.0|1097500.0|1097500.0|
|1160000.0|1114000.0|1114000.0|1110000.0|1110000.0|1103500.0|1101500.0|1098500.0|1093000.0|1094000.0|109

In [9]:
# calculate spreads per price level
for i in xrange(1, 11):
    df = df.withColumn('spread_p' + str(i), col('ask_p' + str(i)) - col('bid_p' + str(i)))
    
df.select(df.columns[50:]).show()

+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+
| mean_p10|spread_p1|spread_p2|spread_p3|spread_p4|spread_p5|spread_p6|spread_p7|spread_p8|spread_p9|spread_p10|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+
|1092500.0|  18000.0|  30000.0|  75000.0|  82000.0|  91000.0| 109000.0| 115000.0| 123000.0| 135000.0|  145000.0|
|1092500.0|  18000.0|  30000.0|  75000.0|  82000.0|  91000.0| 109000.0| 115000.0| 123000.0| 135000.0|  145000.0|
|1092500.0|  18000.0|  30000.0|  75000.0|  82000.0|  91000.0| 109000.0| 115000.0| 123000.0| 135000.0|  145000.0|
|1095000.0|  18000.0|  30000.0|  50000.0|  77000.0|  83000.0|  93000.0| 114000.0| 118000.0| 130000.0|  140000.0|
|1097500.0|  18000.0|  30000.0|  50000.0|  62000.0|  78000.0|  85000.0|  98000.0| 117000.0| 125000.0|  135000.0|
|1095000.0|   8000.0|  28000.0|  40000.0|  60000.0|  77000.0|  83000.0|  93000.0| 114000.0| 1180

In [17]:
import pyspark.sql.functions as F

# calculate price differences for subsequent asking and bidding prices
# ask
for i in xrange(1, 10):
    df = df.withColumn('ask_diff' + str(i), F.abs(col('ask_p' + str(i+1)) - col('ask_p' + str(i))))

# bid
for i in xrange(1, 10):
    df = df.withColumn('bid_diff' + str(i), F.abs(col('bid_p' + str(i+1)) - col('bid_p' + str(i))))


df.select(df.columns[65:75]).show()


+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|ask_diff5|ask_diff6|ask_diff7|ask_diff8|ask_diff9|bid_diff1|bid_diff2|bid_diff3|bid_diff4|bid_diff5|
+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|   2000.0|   5000.0|   3000.0|   7000.0|   5000.0|  10000.0|  35000.0|   5000.0|   8000.0|  16000.0|
|   2000.0|   5000.0|   3000.0|   7000.0|   5000.0|  10000.0|  35000.0|   5000.0|   8000.0|  16000.0|
|   2000.0|   5000.0|   3000.0|   7000.0|   5000.0|  10000.0|  35000.0|   5000.0|   8000.0|  16000.0|
|   2000.0|   5000.0|   3000.0|   7000.0|   5000.0|  10000.0|  10000.0|  25000.0|   5000.0|   8000.0|
|   2000.0|   5000.0|   3000.0|   7000.0|   5000.0|  10000.0|  10000.0|  10000.0|  15000.0|   5000.0|
|   1000.0|   2000.0|   5000.0|   3000.0|   7000.0|  10000.0|  10000.0|  10000.0|  15000.0|   5000.0|
|   1000.0|   2000.0|   5000.0|   3000.0|   7000.0|  10000.0|  10000.0|  10000.0| 

In [27]:
marksColumns = [col('bid_p' + str(i)) for i in xrange(1, 11)]
df = df.withColumn('mean_bidp', averageFunc)

marksColumns = [col('ask_p' + str(i)) for i in xrange(1, 11)]
df = df.withColumn('mean_askp', averageFunc)

marksColumns = [col('bid_v' + str(i)) for i in xrange(1, 11)]
df = df.withColumn('mean_bidv', averageFunc)

marksColumns = [col('ask_v' + str(i)) for i in xrange(1, 11)]
df = df.withColumn('mean_askv', averageFunc)

df.select(df.columns[75:]).show()

+---------+---------+---------+---------+----------+---------+---------+---------+---------+
|bid_diff6|bid_diff7|bid_diff8|bid_diff9|mean_price|mean_bidp|mean_askp|mean_bidv|mean_askv|
+---------+---------+---------+---------+----------+---------+---------+---------+---------+
|   1000.0|   5000.0|   5000.0|   5000.0| 1053300.0|1053300.0|1053300.0|1053300.0|1053300.0|
|   1000.0|   5000.0|   5000.0|   5000.0| 1053300.0|1053300.0|1053300.0|1053300.0|1053300.0|
|   1000.0|   5000.0|   5000.0|   5000.0| 1053300.0|1053300.0|1053300.0|1053300.0|1053300.0|
|  16000.0|   1000.0|   5000.0|   5000.0| 1060300.0|1060300.0|1060300.0|1060300.0|1060300.0|
|   8000.0|  16000.0|   1000.0|   5000.0| 1065800.0|1065800.0|1065800.0|1065800.0|1065800.0|
|   8000.0|  16000.0|   1000.0|   5000.0| 1065800.0|1065800.0|1065800.0|1065800.0|1065800.0|
|   8000.0|  16000.0|   1000.0|   5000.0| 1065800.0|1065800.0|1065800.0|1065800.0|1065800.0|
|   8000.0|  16000.0|   1000.0|   5000.0| 1065800.0|1065800.0|1065800.