In [3]:
# additional packages I had to install to get this notebook working 
! pip install dask-ml
! pip install xlrd

In [35]:
# DASK imports
from dask.distributed import Client
from dask_saturn import SaturnCluster

# Scikit imports
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor

# misc imports
import time
from datetime import datetime
import joblib
import numpy as np
import pandas as pd
pd.options.display.max_rows = 999

# imports for loading AWS data 
import glob, os
import shutil

# Get data from AWS at monthly intervals

In [None]:
# You might need to run the bash below (depending on your permissioning) to get the data folder 
# created from the notebook itself 

# ! sudo chown -R jovyan:users ~/.local/share/jupyter 
! mkdir -p ~/data/deutsche-boerse-xetra-pds/{date}

In [7]:
# Edit the start/end date and the output folders
from_date = '2017-07-01'
until_date = '2018-07-01'
dates = list(pd.date_range(from_date, until_date, freq='M').strftime('%Y-%m-%d'))

local_data_folder = '~/data/deutsche-boerse-xetra-pds' # do not end in /
output_folder = '~/data/processed' # do not end in /

! mkdir -p {local_data_folder}

for date in dates:
    success_file =  os.path.join(local_data_folder, date, 'success')
    ! mkdir -p {local_data_folder}/{date}
    ! aws s3 sync s3://deutsche-boerse-xetra-pds/{date} {local_data_folder}/{date} --no-sign-request
    ! touch {success_file}

In [None]:
# If the cell above pulls too much data for you, uncomment the bash below
# to fetch a single day of data instead

# date = '2019-05-01'
# ! aws s3 ls s3://deutsche-boerse-xetra-pds/{date}/ --no-sign-request 
# ! mkdir -p ~/data/deutsche-boerse-xetra-pds/{date}
# ! aws s3 sync s3://deutsche-boerse-xetra-pds/{date} ~/data/deutsche-boerse-xetra-pds/{date} --no-sign-request
# ! ls ~/data/deutsche-boerse-xetra-pds/{date}

First, we will load everything into a single pandas DataFrame, as we usually would, and run some calculations.

# EDA

In [8]:
# Load every CSV that the download cell synced for `dates` into one DataFrame.
# The files are read from the same folder the download cell wrote to;
# glob does not expand '~', so it is expanded explicitly here.
data_root = os.path.expanduser(local_data_folder)
path = os.path.join(data_root, '{}')

# one directory per requested date
li_1 = [path.format(i) for i in dates]

# Glob each directory exactly once and sort the result: the row order of
# big_df feeds pct_change()/shift() further down, so it must be deterministic.
files = sorted(
    csv_file
    for data_dir in li_1
    for csv_file in glob.glob(os.path.join(data_dir, '*.csv'))
)
if not files:
    raise FileNotFoundError(
        'No CSV files found under {} for dates {} .. {}'.format(data_root, dates[:1], dates[-1:])
    )

# Read and concatenate once, after all files are known (not once per date).
big_df = pd.concat(map(pd.read_csv, files))

In [78]:
%%time

# Summary statistics of the numeric columns, timed to show that pandas copes with this volume
big_df.describe()

CPU times: user 156 ms, sys: 32.3 ms, total: 188 ms
Wall time: 191 ms


Unnamed: 0,StartPrice,MaxPrice,MinPrice,EndPrice
count,456390.0,456390.0,456390.0,456390.0
mean,67.843189,67.858359,67.827654,67.84284
std,96.646164,96.65954,96.631026,96.643878
min,0.003,0.003,0.003,0.003
25%,20.0,20.0,19.99,20.0
50%,44.22,44.235,44.2,44.215
75%,86.48,86.5,86.45,86.48
max,6481.5,6483.0,6481.5,6483.0


Below we are computing the correlation between an indicator and rate of return.

In [10]:
%%time 

# calculate rate of return i.e. how much do the price changes compared to the previous price.
# NOTE(review): pct_change() compares each row with the row directly above it in big_df;
# big_df is a concatenation of many files and carries an ISIN column, so adjacent rows
# presumably belong to different instruments - confirm whether a per-ISIN groupby is intended.
big_df['PctChange'] = big_df['EndPrice'].pct_change()

def line_distance(df, a, b):
    """Return |df[a] - df[b]| for the current row plus the same gap for the previous row.

    The first row has no predecessor, so its result is NaN.
    """
    gap = (df[a] - df[b]).abs()
    return gap + gap.shift(1)

# Two-row distance of the closing price from the high line and from the low line.
distance_to_max_line = line_distance(big_df, 'MaxPrice', 'EndPrice')
distance_to_min_line = line_distance(big_df, 'MinPrice', 'EndPrice')

# divide by 100 because the prices are around 100
indicator = (distance_to_min_line - distance_to_max_line) / 100.0
big_df['Indicator'] = indicator
# values of the previous row, used as lagged features
big_df['Indicator[t - 1]'] = indicator.shift(1)
big_df['PctChange[t - 1]'] = big_df['PctChange'].shift(1)

# correlation of every candidate column with the rate of return
corr_columns = ['PctChange', 'PctChange[t - 1]', 'Indicator', 'Indicator[t - 1]']
big_df[corr_columns].corr()[['PctChange']]

CPU times: user 106 ms, sys: 87.8 ms, total: 193 ms
Wall time: 209 ms


Unnamed: 0,PctChange
PctChange,1.0
PctChange[t - 1],-0.000384
Indicator,0.001115
Indicator[t - 1],0.000211


So far, pandas is doing fine: at this data volume we can calculate the rate of return and similar statistics and still get good performance.

Changing gears, let's see how performance holds up beyond summary statistics. Let's try fitting a regressor on this data using plain scikit-learn, and then using Dask to distribute joblib across a Saturn Dask cluster.

# Setting up regressor 

In [13]:
# Print dtypes and non-null counts, then display the column names,
# to see which columns still need converting before modelling
big_df.info()
big_df.columns

<class 'pandas.core.frame.DataFrame'>
Int64Index: 456390 entries, 0 to 7077
Data columns (total 18 columns):
ISIN                456390 non-null object
Mnemonic            456390 non-null object
SecurityDesc        456390 non-null object
SecurityType        456390 non-null object
Currency            456390 non-null object
SecurityID          456390 non-null object
Date                456390 non-null object
Time                456390 non-null object
StartPrice          456390 non-null float64
MaxPrice            456390 non-null float64
MinPrice            456390 non-null float64
EndPrice            456390 non-null float64
TradedVolume        456390 non-null object
NumberOfTrades      456390 non-null object
PctChange           456389 non-null float64
Indicator           456389 non-null float64
Indicator[t - 1]    456388 non-null float64
PctChange[t - 1]    456388 non-null float64
dtypes: float64(8), object(10)
memory usage: 66.2+ MB


Index(['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
       'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume', 'NumberOfTrades', 'PctChange', 'Indicator',
       'Indicator[t - 1]', 'PctChange[t - 1]'],
      dtype='object')

In [14]:
# one hot encode SecurityType
# get_dummies returns one indicator column per category, so it cannot be
# assigned back into the single 'SecurityType' column; instead the column is
# replaced by its 'SecurityType_<category>' indicator columns.
# The guard keeps the cell re-runnable after the column has been replaced.
if 'SecurityType' in big_df.columns:
    big_df = pd.get_dummies(big_df, columns=['SecurityType'])

# change objects to floats 
# errors='coerce' turns values that cannot be parsed as numbers into NaN
big_df['TradedVolume'] = pd.to_numeric(big_df['TradedVolume'], errors='coerce')
big_df['NumberOfTrades'] = pd.to_numeric(big_df['NumberOfTrades'], errors='coerce')

In [23]:
# The target must be finite for every training row: pct_change() leaves NaN in
# the first row, and the imputer in the pipeline only fills features, never y.
# A plain boolean array is used as the mask because big_df's index has repeated labels.
has_target = np.isfinite(big_df['PctChange']).values
model_df = big_df[has_target]

# separate target from rest of dataset 
X = model_df.drop(['Time', 'Date','PctChange[t - 1]','PctChange','Mnemonic','EndPrice',
               'SecurityDesc','Currency', 'ISIN'], axis=1)

y = model_df['PctChange']

# split the data to have a true hold out dataset 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=4444)

In [24]:
# normalize our features
standard_scaler = StandardScaler()
# we will impute the median (configured up front rather than patched in afterwards)
simple_imputer = SimpleImputer(missing_values=np.nan, strategy="median")
# initiate random forest model; the seed makes repeated fits reproducible.
# NOTE: the variable name `log_reg` and the step name 'classifier' are kept so
# existing references keep working, but the estimator is a random forest regressor.
log_reg = RandomForestRegressor(random_state=4444)

# setup pipeline to fit model on: impute -> scale -> random forest
pl = Pipeline(steps=[
    ('imputing', simple_imputer),
    ('scaling', standard_scaler),
    ('classifier', log_reg)
])
# display the configured pipeline
pl

Pipeline(memory=None,
         steps=[('imputing',
                 SimpleImputer(add_indicator=False, copy=True, fill_value=None,
                               missing_values=nan, strategy='median',
                               verbose=0)),
                ('scaling',
                 StandardScaler(copy=True, with_mean=True, with_std=True)),
                ('classifier',
                 RandomForestRegressor(bootstrap=True, ccp_alpha=0.0,
                                       criterion='mse', max_depth=None,
                                       max_features='auto', max_leaf_nodes=None,
                                       max_samples=None,
                                       min_impurity_decrease=0.0,
                                       min_impurity_split=None,
                                       min_samples_leaf=1, min_samples_split=2,
                                       min_weight_fraction_leaf=0.0,
                                       n_estimators=100, n_jo

In [26]:
%%time

# Baseline: fit the pipeline with regular scikit-learn, i.e. outside any
# joblib dask backend context, so nothing is sent to the cluster
pl.fit(X_train,y_train)

CPU times: user 4min 25s, sys: 1.29 s, total: 4min 26s
Wall time: 4min 26s


Pipeline(memory=None,
         steps=[('imputing',
                 SimpleImputer(add_indicator=False, copy=True, fill_value=None,
                               missing_values=nan, strategy='median',
                               verbose=0)),
                ('scaling',
                 StandardScaler(copy=True, with_mean=True, with_std=True)),
                ('classifier',
                 RandomForestRegressor(bootstrap=True, ccp_alpha=0.0,
                                       criterion='mse', max_depth=None,
                                       max_features='auto', max_leaf_nodes=None,
                                       max_samples=None,
                                       min_impurity_decrease=0.0,
                                       min_impurity_split=None,
                                       min_samples_leaf=1, min_samples_split=2,
                                       min_weight_fraction_leaf=0.0,
                                       n_estimators=100, n_jo

### ```4min 26s``` is not all that long, but we are only pulling a small slice of the dataset.
### Let's see if we can do better by switching over to Dask.

# Compare performance using Dask Cluster

In [27]:
# setup Dask Saturn cluster (it is scaled to 7 workers in the next cell)
cluster = SaturnCluster()
# Connect the client to this cluster's own scheduler rather than to a
# hard-coded scheduler address that only exists in one particular project.
client = Client(cluster)
# display the cluster widget
cluster

VBox(children=(HTML(value='<h2>SaturnCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n   …

**Note:** if you're trying to create a local cluster instead, just use ```client = Client(processes=False)```


In [28]:
# ask the Saturn cluster for 7 workers
cluster.scale(7)

In [29]:
%%time 

# this is using dask to scale out joblib to a cluster 
with joblib.parallel_backend('dask'):
    (pl.fit(X_train, y_train))

CPU times: user 6.99 s, sys: 5.56 s, total: 12.6 s
Wall time: 43.1 s


### ```43.1 s``` is not a bad performance jump! 
### That is about 84% less wall-clock time (roughly a 6x speed-up)! 
### Let's see how Dask stacks up on predictions.

In [32]:
%%time

#this is not using Dask: predict on the hold-out features and keep the result in y_predict
y_predict = pl.predict(X_test)

CPU times: user 8.28 s, sys: 31.9 ms, total: 8.31 s
Wall time: 8.33 s


In [33]:
%%time 

#this is using Dask 
with joblib.parallel_backend('dask'):
    (pl.predict(X_test))

CPU times: user 11.7 s, sys: 73.7 ms, total: 11.8 s
Wall time: 6.84 s


Prediction shows a much smaller gain (8.33 s vs. 6.84 s), but combined with the roughly 6x speed-up during fitting (4 min 26 s down to 43.1 s), saving a few additional seconds adds up to much faster training and prediction with this Dask + joblib distributed approach to financial modeling.