### *Author: Vronsky Wikramanayake*

### Remarks

1. The market data file does not cover all the ids or exchanges needed to match the entire executions file, so we lose more than half the instruments when we start calculating the items in part 4.
2. Leaving the data in parquet and operating outside pandas would be more performant for large batches or the entire universe of data. A better way to build analytics on such data would be to keep it in parquet and run a Spark job on it. There may also be other ways to operate on parquet directly, e.g. an abstraction layer on top to serve analytics. Python (pandas) works here by leveraging the merge_asof function; whether Spark offers something similar would need to be checked.
3. No data quality checks have been done on the id-level time series from either marketdata or executions, in particular checks for spikes and missing data in the marketdata file.
4. The marketdata file only covers 3 venues whilst we have executed on 6, so when we join in marketdata, we assume we can calculate best bid/ask & slippage based on the ability to trade on any venue with available volume.
5. I have also brought in an additional column, market state, when merging to create best, -1s & +1s, as this can add colour in seeing whether our slippage crosses between pre & post states around the open / close times per exchange.
6. Note on tolerance from merge_asof: a tolerance of "10ms" has been applied to the best join on TradeTime & event_timestamp. The distinct ask is that we look at -1 and +1 seconds around the execution time for the other variations, which is why the hard columns TradeTime_min_1 & TradeTime_1 were added. A tolerance of "10ms" has also been applied when merging on these.
7. Output is available in csv; other formats can be produced depending on downstream usage and performance needs.
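
Remark 3 mentions spikes and missing data as checks that were not performed. As a rough illustration only, the sketch below flags crossed quotes, large mid-price jumps and long gaps per listing. It assumes the marketdata column names shown in this notebook; the function name `quote_quality_report` and the thresholds `max_gap` and `max_jump` are hypothetical choices, not values taken from the data.

```python
import pandas as pd

def quote_quality_report(quotes: pd.DataFrame,
                         max_gap: pd.Timedelta = pd.Timedelta("5min"),
                         max_jump: float = 0.05) -> pd.DataFrame:
    """Return the quote rows that trip at least one illustrative check."""
    q = quotes.sort_values(["listing_id", "event_timestamp"]).copy()
    mid = (q["best_bid_price"] + q["best_ask_price"]) / 2
    # Previous mid of the same listing (NaN for the first quote of each listing).
    prev_mid = mid.groupby(q["listing_id"]).shift()
    # Crossed book: best bid above best ask.
    q["crossed"] = q["best_bid_price"] > q["best_ask_price"]
    # Spike: relative mid move versus the previous quote exceeds max_jump.
    q["spike"] = ((mid - prev_mid).abs() / prev_mid) > max_jump
    # Gap: time since the previous quote of the same listing exceeds max_gap.
    q["gap"] = q.groupby("listing_id")["event_timestamp"].diff() > max_gap
    return q[q[["crossed", "spike", "gap"]].any(axis=1)]

# Hypothetical toy quotes for a single listing.
toy_quotes = pd.DataFrame({
    "listing_id": [1, 1, 1, 1],
    "event_timestamp": pd.to_datetime([
        "2022-09-02 07:00:00", "2022-09-02 07:00:01",
        "2022-09-02 07:00:02", "2022-09-02 07:30:00"]),
    "best_bid_price": [10.0, 10.0, 12.0, 12.1],
    "best_ask_price": [10.2, 9.9, 12.2, 12.3],
})
report = quote_quality_report(toy_quotes)
print(report[["event_timestamp", "crossed", "spike", "gap"]])
```

Sensible thresholds would depend on the instrument and the market state, so the defaults here are placeholders.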

##### Assumptions: Joins are possible where:
- marketdata.market_state = executions.Phase
- marketdata.listing_id = refdata.id
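
The join assumption `marketdata.listing_id = refdata.id` can be checked for coverage before any calculation. The sketch below is one possible way to do it on toy data; the helper name `marketdata_coverage` and the toy frames are hypothetical, and only the column names `id` and `listing_id` come from this notebook.

```python
import pandas as pd

def marketdata_coverage(executions_ref: pd.DataFrame, marketdata: pd.DataFrame) -> dict:
    """Count how many instruments and executions have any market data at all."""
    covered_ids = set(marketdata["listing_id"].unique())
    has_md = executions_ref["id"].isin(covered_ids)
    return {
        "instruments_total": int(executions_ref["id"].nunique()),
        "instruments_covered": int(executions_ref.loc[has_md, "id"].nunique()),
        "executions_total": int(len(executions_ref)),
        "executions_covered": int(has_md.sum()),
    }

# Hypothetical toy inputs: three traded instruments, one of which has quotes.
toy_exec = pd.DataFrame({"Trade_id": [0, 1, 2, 3], "id": [10, 10, 20, 30]})
toy_md = pd.DataFrame({"listing_id": [10, 10, 40]})
coverage = marketdata_coverage(toy_exec, toy_md)
print(coverage)
```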

In [1]:
#import required libraries; some may need to be installed beforehand.

import pandas as pd
import pyarrow
from datetime import datetime, timedelta
import time
import numpy as np
import logging
from memory_profiler import profile
import psutil


In [2]:
%load_ext memory_profiler

In [3]:
#setting max cols & rows for faster eyeballing of data.

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [4]:
#use the pandas built-in parquet reader to load data into pandas dataframes.

executions = pd.read_parquet('exectuions.parquet', engine='pyarrow')
marketdata = pd.read_parquet('marketdata.parquet', engine='pyarrow')
refdata = pd.read_parquet('refdata.parquet', engine='pyarrow')

### 0. EDA (Exploratory Data Analysis)

- Convert any date or time objects to datetime format, review samples of the data, understand data types & number of unique elements,  and also time horizons where applicable. 

#### Executions Data EDA

In [5]:
executions['TradeTime'] = pd.to_datetime(executions['TradeTime'])
executions.loc[executions['Phase'] == 'CONTINUOUS_TRADING'].sort_values(by=['ISIN','TradeTime'], ascending=True).head()

Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity
4097,BE0003470755,EUR,XBRU,2022-09-02 13:54:18.867,82.32,4097,CONTINUOUS_TRADING,-24
4098,BE0003470755,EUR,XBRU,2022-09-02 13:55:02.917,82.3,4098,CONTINUOUS_TRADING,-28
4099,BE0003470755,EUR,XBRU,2022-09-02 13:55:45.674,82.26,4099,CONTINUOUS_TRADING,-76
4100,BE0003470755,EUR,XBRU,2022-09-02 13:56:08.132,82.22,4100,CONTINUOUS_TRADING,-37
4101,BE0003470755,EUR,XBRU,2022-09-02 13:56:27.827,82.22,4101,CONTINUOUS_TRADING,-14


In [6]:
executions.dtypes

ISIN                 object
Currency             object
Venue                object
TradeTime    datetime64[ns]
Price               float64
Trade_id              int64
Phase                object
Quantity              int64
dtype: object

In [7]:
executions.nunique()

ISIN           66
Currency        3
Venue           6
TradeTime    3846
Price        1247
Trade_id     4203
Phase           2
Quantity      711
dtype: int64

In [8]:
print(executions.Phase.unique())


['OPENING_AUCTION' 'CONTINUOUS_TRADING']


In [9]:
print(executions.TradeTime.min())
print(executions.TradeTime.max())

2022-09-02 07:00:09.160000
2022-09-02 15:36:21.730000


In [10]:
executions.groupby(['Venue']).size().sort_index()


Venue
XBRU     403
XCSE     511
XETA    2904
XETB      36
XETS       6
XSWX     343
dtype: int64

#### Marketdata EDA

In [11]:
marketdata.loc[marketdata['market_state'] == 'CONTINUOUS_TRADING'].sort_values(by=['listing_id','event_timestamp'], ascending=True).head()

Unnamed: 0,event_timestamp,best_bid_size,best_bid_price,best_ask_price,best_ask_size,market_state,primary_mic,listing_id
245,2022-09-02 07:00:02.028662,165,46.73,47.5,197,CONTINUOUS_TRADING,XSWX,323436
246,2022-09-02 07:00:02.028762,450,46.4,47.5,197,CONTINUOUS_TRADING,XSWX,323436
248,2022-09-02 07:00:02.045976,450,46.4,46.94,76,CONTINUOUS_TRADING,XSWX,323436
250,2022-09-02 07:00:02.050104,170,46.41,46.94,76,CONTINUOUS_TRADING,XSWX,323436
251,2022-09-02 07:00:02.059316,170,46.41,47.5,197,CONTINUOUS_TRADING,XSWX,323436


In [12]:
marketdata.dtypes

event_timestamp    datetime64[ns]
best_bid_size               int64
best_bid_price            float64
best_ask_price            float64
best_ask_size               int64
market_state             category
primary_mic                object
listing_id                  int64
dtype: object

In [13]:
marketdata.nunique()

event_timestamp    1916868
best_bid_size         6307
best_bid_price        1874
best_ask_price        1823
best_ask_size         5129
market_state             8
primary_mic              3
listing_id              19
dtype: int64

In [14]:
marketdata.groupby(['market_state']).size()


market_state
AUCTION_ON_DEMAND                          0
CLOSED                                     9
CLOSING_AUCTION                          297
CONDITIONAL                                0
CONTINUOUS_TRADING                   1916113
CONTINUOUS_TRADING_PRIMARY_CLOSED          0
HALTED                                     0
INTRADAY_AUCTION                           6
NOT_APPLICABLE                             0
OPENING_AUCTION                          243
POST_TRADE                                48
PRE_OPEN                                   7
UNKNOWN                                    0
UNSCHEDULED_AUCTION                      179
dtype: int64

In [15]:
marketdata.groupby(['listing_id']).size()


listing_id
323436        72997
323448       288866
323472        67265
323478       147045
323496       193533
323502       204537
323508       246199
324072       129795
324078       141745
324084         7222
324144        60888
324162       220076
324168       107974
328259          842
328336        10268
286087258      8102
286087398      1621
286087896      2768
378534881      5159
dtype: int64

In [16]:
marketdata.groupby(['primary_mic']).size().sort_index()


primary_mic
XBRU     683969
XETR      12491
XSWX    1220442
dtype: int64

In [17]:
print(marketdata.event_timestamp.min())
print(marketdata.event_timestamp.max())

2022-09-02 01:00:57.205489803
2022-09-02 19:14:40.907065


#### Refdata EDA

In [18]:
refdata.head()

Unnamed: 0,ISIN,id,Currency,primary_ticker,primary_mic
0,AT0000652011,286087258,EUR,EBO,XETR
1,AT0000730007,286087398,EUR,AZ2,XETR
2,AT0000937503,286087896,EUR,VAS,XETR
3,AT0000A21KS2,286088133,EUR,IMO1,XETR
4,BE0003470755,324072,EUR,SOLB,XBRU


In [19]:
refdata.dtypes

ISIN              object
id                 int64
Currency          object
primary_ticker    object
primary_mic       object
dtype: object

In [20]:
refdata.nunique()

ISIN              260
id                262
Currency            7
primary_ticker    261
primary_mic        17
dtype: int64

In [21]:
refdata.groupby(['primary_mic']).size().sort_index()

primary_mic
XAMS    11
XASE     1
XBRU     9
XCSE    18
XDUB     3
XETR    33
XHEL    11
XLIS     2
XLON    50
XMAD     4
XMIL     6
XNAS    19
XNYS    28
XOSL     6
XPAR    34
XSTO    13
XSWX    14
dtype: int64

### 1. Start

Count the number of executions within the executions.parquet file, determine the unique
number of [‘Venue’]s and the date of executions. Log output this information.

In [22]:
#a. Count number of unique executions, based on trades made that have volume.

a = len(executions[(executions['Quantity']!=0)])
a

4203

In [23]:
#b. Count unique number of venues.

b = executions['Venue'].nunique()
b

6

In [24]:
#c. Count unique number of dates.

c = executions['TradeTime'].dt.date.nunique()
c

1

In [25]:
#create and configure an information logger.

logging.basicConfig(filename="dspp.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

logger=logging.getLogger() 

logger.setLevel(logging.INFO) 

logger.info(f"The unique number of executions with volume are {a}") 
logger.info(f"The unique number of venues are {b}")
logger.info(f"The unique number of dates are {c}")


### 2. Data Cleaning

In [26]:
#a. Filter executions.parquet for only CONTINUOUS_TRADING trades.

executions_cont = executions[executions.Phase=='CONTINUOUS_TRADING']
executions_cont.head()

Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity
100,BE0003851681,EUR,XBRU,2022-09-02 07:39:39.072,91.8,100,CONTINUOUS_TRADING,11
101,BE0003851681,EUR,XBRU,2022-09-02 07:43:05.795,91.9,101,CONTINUOUS_TRADING,16
102,BE0003851681,EUR,XBRU,2022-09-02 07:47:55.688,91.85,102,CONTINUOUS_TRADING,22
103,BE0003851681,EUR,XBRU,2022-09-02 07:50:54.472,91.95,103,CONTINUOUS_TRADING,17
104,BE0003851681,EUR,XBRU,2022-09-02 07:54:06.487,92.05,104,CONTINUOUS_TRADING,23


In [27]:
#b. appending to log file.

logger.info(f"List of all trades during the CONTINUOUS_TRADING Phase {executions_cont}")

### 3. Data Transformations

In [28]:
#a. adding column [‘side’]: side = -1 if quantity is negative, side = 1 if quantity is positive.

executions['side'] = np.where(executions['Quantity']>0, 1, -1)
executions.head()


Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side
0,DE0006305006,EUR,XETA,2022-09-02 07:00:09.160,3.606,0,OPENING_AUCTION,-150,-1
1,DE0006305006,EUR,XETA,2022-09-02 07:02:32.790,3.624,1,OPENING_AUCTION,-198,-1
2,DE0006305006,EUR,XETA,2022-09-02 07:03:01.573,3.622,2,OPENING_AUCTION,-85,-1
3,DE0006305006,EUR,XETA,2022-09-02 07:03:01.829,3.622,3,OPENING_AUCTION,-89,-1
4,DE0006305006,EUR,XETA,2022-09-02 07:03:48.935,3.626,4,OPENING_AUCTION,-119,-1


In [29]:
#b. join refdata to map in [‘primary_ticker’] & [‘primary_mic’], using ISIN and Currency as the join keys.
#Note: the driving table should be executions; to validate, we check that the row count matches the original executions table.

executions_ref = executions.merge(refdata, on=['ISIN','Currency'], how='left')

print("total number of executions_ref trades = " + str(executions_ref['Trade_id'].count()))
print("total number of executions trades = " + str(executions['Trade_id'].count()))
executions_ref.head()

total number of executions_ref trades = 4203
total number of executions trades = 4203


Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic
0,DE0006305006,EUR,XETA,2022-09-02 07:00:09.160,3.606,0,OPENING_AUCTION,-150,-1,331530,DEZ,XETR
1,DE0006305006,EUR,XETA,2022-09-02 07:02:32.790,3.624,1,OPENING_AUCTION,-198,-1,331530,DEZ,XETR
2,DE0006305006,EUR,XETA,2022-09-02 07:03:01.573,3.622,2,OPENING_AUCTION,-85,-1,331530,DEZ,XETR
3,DE0006305006,EUR,XETA,2022-09-02 07:03:01.829,3.622,3,OPENING_AUCTION,-89,-1,331530,DEZ,XETR
4,DE0006305006,EUR,XETA,2022-09-02 07:03:48.935,3.626,4,OPENING_AUCTION,-119,-1,331530,DEZ,XETR


### 4. Calculations

a. Best bid price and best ask (bbo) - bbo data in marketdata.parquet\
i. Find bbo price at execution, 1 second before execution and 1 second after execution\
from the marketdata.parquet file and add this data into the final output file –\
respective column table names [‘best_bid’, ‘best_ask’,\
‘best_bid_min_1s’, ‘best_ask_min_1s’, ‘best_bid_1s’, ‘best_ask_1s’]\
\
*assumption: it's possible to join marketdata.listing_id = refdata.id*
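
To make the as-of join behaviour concrete, the toy example below compares two `pd.merge_asof` settings: `direction='nearest'` with a 10ms tolerance, as used in this notebook, and `direction='backward'` without a tolerance, which returns the last quote at or before the lookup time. The second variant is shown only as an alternative reading of "bbo at time t", not as the method used here; the toy frames are hypothetical.

```python
import pandas as pd

# Hypothetical trades and quotes for a single instrument.
trades = pd.DataFrame({
    "id": [1, 1],
    "TradeTime": pd.to_datetime(["2022-09-02 07:00:01.000",
                                 "2022-09-02 07:00:05.000"]),
})
quotes = pd.DataFrame({
    "listing_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2022-09-02 07:00:00.995",
                                       "2022-09-02 07:00:03.000"]),
    "best_bid_price": [10.0, 10.1],
    "best_ask_price": [10.2, 10.3],
})

# Both frames are already sorted by their time keys, as merge_asof requires.
common = dict(left_on="TradeTime", right_on="event_timestamp",
              left_by="id", right_by="listing_id")

# Nearest quote, but only if it lies within 10ms of the trade time.
nearest_10ms = pd.merge_asof(trades, quotes, direction="nearest",
                             tolerance=pd.Timedelta("10ms"), **common)

# Last quote at or before the trade time, however old it is.
backward = pd.merge_asof(trades, quotes, direction="backward", **common)

print(nearest_10ms[["TradeTime", "best_bid_price", "best_ask_price"]])
print(backward[["TradeTime", "best_bid_price", "best_ask_price"]])
```

In this toy case the second trade has no quote within 10ms, so the tolerance-based join leaves its prices empty while the backward join carries the earlier quote forward.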

In [30]:
#Note, not all primary tickers & ISINs are available.

print("the total number of unique id's in the executions file = " + str(executions_ref.id.nunique()))
print("the total number of unique primary_ticker in the executions file = " + str(executions_ref.primary_ticker.nunique()))
print("the total number of unique ISIN in the executions file = " + str(executions_ref.ISIN.nunique()))
print("the total number of unique primary_mic in the executions file = " + str(executions_ref.primary_mic.nunique()))
print("")
print("the total number of unique listing_id in the marketdata file = " + str(marketdata.listing_id.nunique()))


the total number of unique id's in the executions file = 67
the total number of unique primary_ticker in the executions file = 67
the total number of unique ISIN in the executions file = 66
the total number of unique primary_mic in the executions file = 5

the total number of unique listing_id in the marketdata file = 19


In [31]:
# function add_adjusted_trade_times that creates 2 new columns: the time column minus the given seconds and plus the given seconds.

def add_adjusted_trade_times(table, time_column, seconds):
    table['{}_min_{}'.format(time_column, seconds)] = table[time_column] + pd.Timedelta(seconds=-seconds)
    table['{}_{}'.format(time_column, seconds)] = table[time_column] + pd.Timedelta(seconds=seconds)

In [32]:
# in our project we are using 1 second around the TradeTime on executions_ref.

start_time = time.time()
cpu_percent_before = psutil.cpu_percent(interval=None)

#executing function
add_adjusted_trade_times(executions_ref, 'TradeTime', 1)

end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

Execution Time: 0.0019199848175048828 seconds
CPU Usage: -29.2 %


In [33]:
# function merge_and_clean_best that takes the pre-defined executions table as the left & marketdata as the right, employing merge_asof to join.

@profile
def merge_and_clean_best(left_df, right_df):
    merged_df = pd.merge_asof(left_df.sort_values(by='TradeTime'),
                              right_df.sort_values(by='event_timestamp'),
                              left_by='id',
                              right_by='listing_id',
                              left_on='TradeTime',
                              right_on='event_timestamp',
                              direction='nearest',
                              tolerance=pd.Timedelta("10ms"))
    
    clean_df = merged_df.drop(columns=['event_timestamp', 'listing_id', 'best_bid_size', 'best_ask_size', 'primary_mic_y']) \
                        .rename(columns={'best_bid_price': 'best_bid', 'best_ask_price': 'best_ask', 'market_state': 'market_state_best'})
    
    return clean_df

In [34]:
cpu_percent_before = psutil.cpu_percent(interval=None)
start_time = time.time()

#executing function
merged_best = merge_and_clean_best(executions_ref,marketdata);

end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4242977476.py
Execution Time: 0.7313699722290039 seconds
CPU Usage: -30.0 %


In [35]:
print(len(merged_best))
merged_best.loc[merged_best['primary_ticker'] == 'AED'].sort_values(by=['Trade_id'], ascending=True).head()

4203


Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic_x,TradeTime_min_1,TradeTime_1,best_bid,best_ask,market_state_best
1,BE0003851681,EUR,XBRU,2022-09-02 07:00:25.196,92.8,82,OPENING_AUCTION,71,1,328336,AED,XBRU,2022-09-02 07:00:24.196,2022-09-02 07:00:26.196,92.8,92.9,OPENING_AUCTION
2,BE0003851681,EUR,XBRU,2022-09-02 07:02:02.577,93.05,83,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:02:01.577,2022-09-02 07:02:03.577,93.05,93.35,CONTINUOUS_TRADING
18,BE0003851681,EUR,XBRU,2022-09-02 07:08:11.684,93.1,84,OPENING_AUCTION,42,1,328336,AED,XBRU,2022-09-02 07:08:10.684,2022-09-02 07:08:12.684,93.05,93.25,CONTINUOUS_TRADING
24,BE0003851681,EUR,XBRU,2022-09-02 07:09:07.953,93.15,85,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:09:06.953,2022-09-02 07:09:08.953,93.05,93.25,CONTINUOUS_TRADING
29,BE0003851681,EUR,XBRU,2022-09-02 07:10:16.279,92.85,86,OPENING_AUCTION,15,1,328336,AED,XBRU,2022-09-02 07:10:15.279,2022-09-02 07:10:17.279,92.75,92.9,CONTINUOUS_TRADING


In [36]:
# function merge_and_clean_min_1 that adds in -1 second market data.

@profile
def merge_and_clean_min_1(left_df, right_df):
   merged_df = pd.merge_asof(left_df.sort_values(by='TradeTime_min_1'),
                             right_df.sort_values(by='event_timestamp'),
                             left_by='id',
                             right_by='listing_id',
                             left_on='TradeTime_min_1',
                             right_on='event_timestamp',
                             direction='nearest',
                             tolerance=pd.Timedelta("10ms"))
   
   clean_df = merged_df.drop(columns=['event_timestamp','listing_id','best_bid_size','best_ask_size','primary_mic']) \
                        .rename(columns={'best_bid_price': 'best_bid_min_1s', 'best_ask_price': 'best_ask_min_1s','market_state':'market_state_min_1s'})

   return clean_df
    

In [37]:
cpu_percent_before = psutil.cpu_percent(interval=None)
start_time = time.time()

#executing function
merged_min_1 = merge_and_clean_min_1(merged_best,marketdata);

end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4229837936.py
Execution Time: 0.4755871295928955 seconds
CPU Usage: 42.2 %


In [38]:
print(len(merged_min_1))
merged_min_1.loc[merged_min_1['primary_ticker'] == 'AED'].sort_values(by=['Trade_id'], ascending=True).head()

4203


Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic_x,TradeTime_min_1,TradeTime_1,best_bid,best_ask,market_state_best,best_bid_min_1s,best_ask_min_1s,market_state_min_1s
1,BE0003851681,EUR,XBRU,2022-09-02 07:00:25.196,92.8,82,OPENING_AUCTION,71,1,328336,AED,XBRU,2022-09-02 07:00:24.196,2022-09-02 07:00:26.196,92.8,92.9,OPENING_AUCTION,,,
2,BE0003851681,EUR,XBRU,2022-09-02 07:02:02.577,93.05,83,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:02:01.577,2022-09-02 07:02:03.577,93.05,93.35,CONTINUOUS_TRADING,,,
18,BE0003851681,EUR,XBRU,2022-09-02 07:08:11.684,93.1,84,OPENING_AUCTION,42,1,328336,AED,XBRU,2022-09-02 07:08:10.684,2022-09-02 07:08:12.684,93.05,93.25,CONTINUOUS_TRADING,,,
24,BE0003851681,EUR,XBRU,2022-09-02 07:09:07.953,93.15,85,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:09:06.953,2022-09-02 07:09:08.953,93.05,93.25,CONTINUOUS_TRADING,,,
29,BE0003851681,EUR,XBRU,2022-09-02 07:10:16.279,92.85,86,OPENING_AUCTION,15,1,328336,AED,XBRU,2022-09-02 07:10:15.279,2022-09-02 07:10:17.279,92.75,92.9,CONTINUOUS_TRADING,,,


In [39]:
# function merge_and_clean_1 that adds in +1 second market data.

@profile
def merge_and_clean_1(left_df, right_df):
   # use the function arguments rather than the global dataframes
   merged_df = pd.merge_asof(left_df.sort_values(by='TradeTime_1'),
                             right_df.sort_values(by='event_timestamp'),
                             left_by='id',
                             right_by='listing_id',
                             left_on='TradeTime_1',
                             right_on='event_timestamp',
                             direction='nearest',
                             tolerance=pd.Timedelta("10ms"))
   
   clean_df = merged_df.drop(columns=['event_timestamp','listing_id','best_bid_size','best_ask_size','primary_mic']) \
               .rename(columns={'best_bid_price': 'best_bid_1s', 'best_ask_price': 'best_ask_1s','market_state':'market_state_1s'})

   return clean_df


In [40]:
cpu_percent_before = psutil.cpu_percent(interval=None)
start_time = time.time()

#executing function
merged_all = merge_and_clean_1(merged_min_1,marketdata);

end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4140448716.py
Execution Time: 0.47478294372558594 seconds
CPU Usage: -10.399999999999999 %


In [41]:
print(len(merged_all))
merged_all.loc[merged_all['primary_ticker'] == 'AED'].sort_values(by=['Trade_id'], ascending=True).head()

4203


Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic_x,TradeTime_min_1,TradeTime_1,best_bid,best_ask,market_state_best,best_bid_min_1s,best_ask_min_1s,market_state_min_1s,best_bid_1s,best_ask_1s,market_state_1s
1,BE0003851681,EUR,XBRU,2022-09-02 07:00:25.196,92.8,82,OPENING_AUCTION,71,1,328336,AED,XBRU,2022-09-02 07:00:24.196,2022-09-02 07:00:26.196,92.8,92.9,OPENING_AUCTION,,,,,,
2,BE0003851681,EUR,XBRU,2022-09-02 07:02:02.577,93.05,83,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:02:01.577,2022-09-02 07:02:03.577,93.05,93.35,CONTINUOUS_TRADING,,,,,,
18,BE0003851681,EUR,XBRU,2022-09-02 07:08:11.684,93.1,84,OPENING_AUCTION,42,1,328336,AED,XBRU,2022-09-02 07:08:10.684,2022-09-02 07:08:12.684,93.05,93.25,CONTINUOUS_TRADING,,,,,,
24,BE0003851681,EUR,XBRU,2022-09-02 07:09:07.953,93.15,85,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:09:06.953,2022-09-02 07:09:08.953,93.05,93.25,CONTINUOUS_TRADING,,,,,,
29,BE0003851681,EUR,XBRU,2022-09-02 07:10:16.279,92.85,86,OPENING_AUCTION,15,1,328336,AED,XBRU,2022-09-02 07:10:15.279,2022-09-02 07:10:17.279,92.75,92.9,CONTINUOUS_TRADING,,,,,,


b. Mid-Price – bbo data in marketdata.parquet \
i. Find the Mid-Price at execution, 1s before the execution and 1s after the execution \
– respective column table names [‘mid_price’, ‘mid_price_min_1s’, ‘mid_price_1s’]
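
As a small worked illustration of the definition, the mid-price is the average of best bid and best ask at each of the three reference times. The sketch below computes the three columns on a hypothetical one-row frame; the helper name `add_mid_prices` is an assumption for illustration, while the column names follow the ones requested above. No rounding is applied here.

```python
import pandas as pd

def add_mid_prices(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with mid_price, mid_price_min_1s and mid_price_1s added."""
    out = df.copy()
    for suffix in ("", "_min_1s", "_1s"):
        out[f"mid_price{suffix}"] = (out[f"best_bid{suffix}"] + out[f"best_ask{suffix}"]) / 2
    return out

# Hypothetical toy row; the +1s bid is missing to show that NaN propagates.
toy_bbo = pd.DataFrame({
    "best_bid": [93.05], "best_ask": [93.25],
    "best_bid_min_1s": [93.00], "best_ask_min_1s": [93.30],
    "best_bid_1s": [float("nan")], "best_ask_1s": [93.20],
})
mids = add_mid_prices(toy_bbo)
print(mids[["mid_price", "mid_price_min_1s", "mid_price_1s"]])
```

A missing quote on either side leaves the corresponding mid-price empty rather than silently using one side of the book.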

In [42]:
# function that calculates and adds mid prices for best, -1s & +1s, all rounded to 2 decimal places.
# note: this cell only defines find_mid_price, so the timing below covers the definition rather than a call;
# the mid-price columns are added once find_mid_price(merged_all) is run.

cpu_percent_before = psutil.cpu_percent(interval=None)
start_time = time.time()

#defining function
def find_mid_price(table):
    table['mid_price'] = round((table['best_bid'] + table['best_ask']) / 2, 2)
    table['mid_price_min_1s'] = round((table['best_bid_min_1s'] + table['best_ask_min_1s']) / 2, 2)
    table['mid_price_1s'] = round((table['best_bid_1s'] + table['best_ask_1s']) / 2, 2)
    
end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

Execution Time: 7.700920104980469e-05 seconds
CPU Usage: -100.0 %


In [43]:
merged_all.loc[merged_all['primary_ticker'] == 'AED'].sort_values(by=['Trade_id'], ascending=True).head()

Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic_x,TradeTime_min_1,TradeTime_1,best_bid,best_ask,market_state_best,best_bid_min_1s,best_ask_min_1s,market_state_min_1s,best_bid_1s,best_ask_1s,market_state_1s
1,BE0003851681,EUR,XBRU,2022-09-02 07:00:25.196,92.8,82,OPENING_AUCTION,71,1,328336,AED,XBRU,2022-09-02 07:00:24.196,2022-09-02 07:00:26.196,92.8,92.9,OPENING_AUCTION,,,,,,
2,BE0003851681,EUR,XBRU,2022-09-02 07:02:02.577,93.05,83,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:02:01.577,2022-09-02 07:02:03.577,93.05,93.35,CONTINUOUS_TRADING,,,,,,
18,BE0003851681,EUR,XBRU,2022-09-02 07:08:11.684,93.1,84,OPENING_AUCTION,42,1,328336,AED,XBRU,2022-09-02 07:08:10.684,2022-09-02 07:08:12.684,93.05,93.25,CONTINUOUS_TRADING,,,,,,
24,BE0003851681,EUR,XBRU,2022-09-02 07:09:07.953,93.15,85,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:09:06.953,2022-09-02 07:09:08.953,93.05,93.25,CONTINUOUS_TRADING,,,,,,
29,BE0003851681,EUR,XBRU,2022-09-02 07:10:16.279,92.85,86,OPENING_AUCTION,15,1,328336,AED,XBRU,2022-09-02 07:10:15.279,2022-09-02 07:10:17.279,92.75,92.9,CONTINUOUS_TRADING,,,,,,


c. Calculate Slippage [‘slippage’] at execution price \
i. For SELL: (execution_price – best_bid) / (best_ask – best_bid) \
ii. For BUY : (best_ask – execution_price) / (best_ask – best_bid)
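
The two formulas can also be evaluated without a row-by-row loop. The sketch below is one vectorised variant on hypothetical toy data, assuming the `side` convention used in this notebook (-1 for SELL, 1 for BUY); the function name `slippage_vectorised` is an assumption, and treating a zero spread as missing is a choice made here, not something specified in the task.

```python
import numpy as np
import pandas as pd

def slippage_vectorised(df: pd.DataFrame) -> pd.Series:
    """Slippage per row: SELL uses (Price - bid) / spread, BUY uses (ask - Price) / spread."""
    spread = df["best_ask"] - df["best_bid"]
    # A zero spread would divide by zero, so treat it as missing (assumption).
    spread = spread.mask(spread == 0)
    sell = (df["Price"] - df["best_bid"]) / spread
    buy = (df["best_ask"] - df["Price"]) / spread
    values = np.select([df["side"] == -1, df["side"] == 1], [sell, buy], default=np.nan)
    return pd.Series(values, index=df.index, name="slippage")

# Hypothetical toy executions: two buys, one sell, one buy on a zero spread.
toy_fills = pd.DataFrame({
    "Price":    [93.10, 92.80, 82.30, 10.00],
    "best_bid": [93.05, 92.80, 82.28, 10.00],
    "best_ask": [93.25, 92.90, 82.32, 10.00],
    "side":     [1, 1, -1, 1],
})
toy_slippage = slippage_vectorised(toy_fills)
print(toy_slippage)
```

For the first row this gives (93.25 - 93.10) / (93.25 - 93.05) = 0.75, up to floating-point error.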

In [44]:
#function that takes a dataframe, calculates slippage & appends the column to the end of the same dataframe.
#note: for both short (SELL) and long (BUY), a higher slippage value is a better outcome: 0 means a SELL filled at the best bid or a BUY at the best ask, and 1 means a SELL filled at the best ask or a BUY at the best bid.

def calculate_slippage(df):

    slippage = []

    # Calculate slippage based on SELL and BUY scenarios, including the handling of side != 1 or -1
    for index, row in df.iterrows():
        if row['side'] == -1:
            slippage.append((row['Price'] - row['best_bid']) / (row['best_ask'] - row['best_bid']))
        elif row['side'] == 1:
            slippage.append((row['best_ask'] - row['Price']) / (row['best_ask'] - row['best_bid']))
        else:
            slippage.append(None)

    df['slippage'] = slippage

In [45]:
cpu_percent_before = psutil.cpu_percent(interval=None)
start_time = time.time()

#executing function
calculate_slippage(merged_all)
    
end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time, "seconds")

cpu_percent_after = psutil.cpu_percent(interval=None)
print("CPU Usage:", cpu_percent_after - cpu_percent_before, "%")

Execution Time: 0.1295931339263916 seconds
CPU Usage: -23.800000000000004 %


In [46]:
merged_all.loc[merged_all['primary_ticker'] == 'AED'].sort_values(by=['Trade_id'], ascending=True).head()

Unnamed: 0,ISIN,Currency,Venue,TradeTime,Price,Trade_id,Phase,Quantity,side,id,primary_ticker,primary_mic_x,TradeTime_min_1,TradeTime_1,best_bid,best_ask,market_state_best,best_bid_min_1s,best_ask_min_1s,market_state_min_1s,best_bid_1s,best_ask_1s,market_state_1s,slippage
1,BE0003851681,EUR,XBRU,2022-09-02 07:00:25.196,92.8,82,OPENING_AUCTION,71,1,328336,AED,XBRU,2022-09-02 07:00:24.196,2022-09-02 07:00:26.196,92.8,92.9,OPENING_AUCTION,,,,,,,1.0
2,BE0003851681,EUR,XBRU,2022-09-02 07:02:02.577,93.05,83,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:02:01.577,2022-09-02 07:02:03.577,93.05,93.35,CONTINUOUS_TRADING,,,,,,,1.0
18,BE0003851681,EUR,XBRU,2022-09-02 07:08:11.684,93.1,84,OPENING_AUCTION,42,1,328336,AED,XBRU,2022-09-02 07:08:10.684,2022-09-02 07:08:12.684,93.05,93.25,CONTINUOUS_TRADING,,,,,,,0.75
24,BE0003851681,EUR,XBRU,2022-09-02 07:09:07.953,93.15,85,OPENING_AUCTION,13,1,328336,AED,XBRU,2022-09-02 07:09:06.953,2022-09-02 07:09:08.953,93.05,93.25,CONTINUOUS_TRADING,,,,,,,0.5
29,BE0003851681,EUR,XBRU,2022-09-02 07:10:16.279,92.85,86,OPENING_AUCTION,15,1,328336,AED,XBRU,2022-09-02 07:10:15.279,2022-09-02 07:10:17.279,92.75,92.9,CONTINUOUS_TRADING,,,,,,,0.333333


### Saving the output to csv

In [47]:
#function that saves the file to a csv with the production timestamp leading the file name.

def df_to_csv(df, file_name):
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    file_path = f"{timestamp}_{file_name}.csv"
    df.to_csv(file_path, index=False)

In [48]:
df_to_csv(merged_all, "merged_all")

### 5. Please provide performance metrics on your program

1. Each function has been set up to show execution time.
2. Each function has been set up to show CPU usage.
3. For each memory-intensive function, i.e. the merge functions, memory usage is listed below.
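
The difference between two system-wide `psutil.cpu_percent` readings can come out negative, as in the outputs above, because each reading reflects overall machine load rather than the work done by the function. As a possible alternative, the sketch below measures wall-clock time, process CPU time and peak traced memory using only the standard library. The context manager name `measure` and the `metrics` dictionary are hypothetical, and `tracemalloc` only sees allocations it is able to trace, so the memory figure is indicative.

```python
import time
import tracemalloc
from contextlib import contextmanager

@contextmanager
def measure(label: str, results: dict):
    """Record wall time, process CPU time and peak traced memory for a code block."""
    tracemalloc.start()
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield
    finally:
        cpu = time.process_time() - cpu_start
        wall = time.perf_counter() - wall_start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        results[label] = {"wall_s": wall, "cpu_s": cpu, "peak_bytes": peak}

# Usage on a stand-in workload; in this notebook the body could be a merge call.
metrics = {}
with measure("sum_of_squares", metrics):
    total = sum(i * i for i in range(100_000))
print(metrics)
```

Wrapping each merge or calculation call in such a block would give per-step figures that are attributable to this process rather than to the whole machine.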


In [49]:
%mprun -f merge_and_clean_best merge_and_clean_best(executions_ref,marketdata)

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4242977476.py



Filename: /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/memory_profiler.py

Line #    Mem usage    Increment  Occurrences   Line Contents
  1185    759.0 MiB    759.0 MiB           1               @wraps(wrapped=func)
  1186                                                     def wrapper(*args, **kwargs):
  1187    759.0 MiB      0.0 MiB           1                   prof = get_prof()
  1188    738.6 MiB    -20.4 MiB           1                   val = prof(func)(*args, **kwargs)
  1189    738.6 MiB      0.0 MiB           1                   show_results_bound(prof)
  1190    738.6 MiB      0.0 MiB           1                   return val

In [50]:
%mprun -f merge_and_clean_min_1 merge_and_clean_min_1(merged_best,marketdata)

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4229837936.py



Filename: /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/memory_profiler.py

Line #    Mem usage    Increment  Occurrences   Line Contents
  1185    738.6 MiB    738.6 MiB           1               @wraps(wrapped=func)
  1186                                                     def wrapper(*args, **kwargs):
  1187    738.6 MiB      0.0 MiB           1                   prof = get_prof()
  1188    711.4 MiB    -27.2 MiB           1                   val = prof(func)(*args, **kwargs)
  1189    711.4 MiB      0.0 MiB           1                   show_results_bound(prof)
  1190    711.4 MiB      0.0 MiB           1                   return val

In [51]:
%mprun -f merge_and_clean_1 merge_and_clean_1(merged_min_1,marketdata)

ERROR: Could not find file /var/folders/87/mngdtl395015kkpjbht_xrfw0000gn/T/ipykernel_23750/4140448716.py



Filename: /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/memory_profiler.py

Line #    Mem usage    Increment  Occurrences   Line Contents
  1185    711.4 MiB    711.4 MiB           1               @wraps(wrapped=func)
  1186                                                     def wrapper(*args, **kwargs):
  1187    711.4 MiB      0.0 MiB           1                   prof = get_prof()
  1188    711.4 MiB      0.0 MiB           1                   val = prof(func)(*args, **kwargs)
  1189    711.4 MiB      0.0 MiB           1                   show_results_bound(prof)
  1190    711.4 MiB      0.0 MiB           1                   return val