In [136]:

import pyarrow.parquet as pq
import pandas as pd
import timeit

# Read the three parquet inputs as Arrow tables
executions, market_data, refdata = map(
    pq.read_table,
    ('data/executions.parquet', 'data/marketdata.parquet', 'data/refdata.parquet'),
)

# Convert each Arrow table to a pandas DataFrame
df_ex, df_md, df_rf = (table.to_pandas() for table in (executions, market_data, refdata))


def get_unique_column_values(df, column_name):
    """Count the distinct entries of ``df[column_name]`` via ``Series.unique()``.

    Parameters:
    - df (pd.DataFrame): The DataFrame to inspect.
    - column_name (str): The column whose distinct values are counted.

    Returns:
    - int: Length of the array returned by ``unique()``.
    """
    distinct_values = df[column_name].unique()
    return len(distinct_values)

def get_unique_column_values_2(df, column_name):
    """Count the distinct entries of ``df[column_name]`` via ``Series.nunique()``.

    Parameters:
    - df (pd.DataFrame): The DataFrame to inspect.
    - column_name (str): The column whose distinct values are counted.

    Returns:
    - int: The value returned by ``nunique()`` called with its defaults.
    """
    column = df[column_name]
    return column.nunique()


# Time each way of counting distinct Venue values (100 calls per statement).
# The trailing notes are timings from an earlier run.
for stmt in (
    'get_unique_column_values(df_ex, "Venue")',    # winner 0.0078
    'df_ex.Venue.nunique()',                       # 0.00819
    'get_unique_column_values_2(df_ex, "Venue")',  # 0.00853
):
    print(timeit.timeit(stmt, globals=globals(), number=100))

# Report the distinct counts for Venue and TradeTime
print(f"Unique Venues in execution data: {get_unique_column_values(df_ex, 'Venue')}")
print(f"Unique TradeTimes in execution data: {get_unique_column_values(df_ex, 'TradeTime')}")



0.008905708000384038
0.00827308299994911
0.008282875001896173
Unique Venues in execution data: 6
Unique TradeTimes in execution data: 3846


In [137]:
# Data cleaning - performance checks

# Identify which column holds the CONTINUOUS_TRADING value.
# NOTE(review): head() is not the last expression of this cell, so its result is
# not rendered; move it to its own cell (or wrap it in display()) to inspect it.
df_ex.head()

# Four methods to filter the dataframe on a column value or list of values

def filter_df_on_column_value(df, column_name, column_value):
    """Keep the rows of ``df`` whose ``column_name`` entry equals ``column_value``.

    Parameters:
    - df (pd.DataFrame): The DataFrame to be filtered.
    - column_name (str): The name of the column to compare.
    - column_value: The value a row must hold in that column to be kept.

    Returns:
    - pd.DataFrame: The matching rows, selected with a boolean Series mask.
    """
    matches = df[column_name].eq(column_value)
    return df[matches]

def filter_df_on_column_value2(df, column_name, column_value):
    """Keep the rows of ``df`` whose ``column_name`` entry equals ``column_value``.

    Same contract as ``filter_df_on_column_value``, but the comparison is done on
    the column's underlying ``.values`` array rather than on the Series.

    Parameters:
    - df (pd.DataFrame): The DataFrame to be filtered.
    - column_name (str): The name of the column to compare.
    - column_value: The value a row must hold in that column to be kept.

    Returns:
    - pd.DataFrame: The matching rows.
    """
    raw_values = df[column_name].values
    return df[raw_values == column_value]

def filter_df_with_query(df, column_name, column_value):
    """Keep the rows of ``df`` whose ``column_name`` entry equals ``column_value``,
    using ``DataFrame.query``.

    The value is passed to the query as a variable reference (``@column_value``)
    instead of being pasted into the expression text, so values that contain a
    quote character and non-string values are compared correctly. The column
    name is backtick-quoted so names that are not valid Python identifiers
    (for example names containing spaces) can be used.

    Parameters:
    - df (pd.DataFrame): The DataFrame to be filtered.
    - column_name (str): The name of the column to compare.
    - column_value: The value a row must hold in that column to be kept.

    Returns:
    - pd.DataFrame: The matching rows.
    """
    # `@column_value` is resolved by pandas from this function's local scope.
    return df.query(f"`{column_name}` == @column_value")

def filter_df_by_values(dataframe, column_name, values):
    """
    Keep the rows of a DataFrame whose entry in one column is among ``values``.

    Parameters:
    - dataframe (pd.DataFrame): The DataFrame to be filtered.
    - column_name (str): The name of the column to test.
    - values (list): The accepted values; a row is kept when its entry is in this list.

    Returns:
    - pd.DataFrame: The rows that passed the membership test.
    """
    is_wanted = dataframe[column_name].isin(values)
    return dataframe[is_wanted]

# Time each filtering approach on the Phase column (1000 calls per statement).
# The trailing notes are timings from an earlier run.
for stmt in (
    'filter_df_on_column_value(df_ex, "Phase", "CONTINUOUS_TRADING")',   # 0.368282958999770
    'filter_df_on_column_value2(df_ex, "Phase", "CONTINUOUS_TRADING")',  # 0.24843849999888334
    'filter_df_with_query(df_ex, "Phase", "CONTINUOUS_TRADING")',        # 0.5727733750009065
    'filter_df_by_values(df_ex, "Phase", ["CONTINUOUS_TRADING"])',       # 0.2840273330002674
):
    print(timeit.timeit(stmt, globals=globals(), number=1000))

# As we will want to be able to filter future data frames on a list of values, we will stick with filter_df_by_values.

0.3558741250017192
0.2392136250018666
0.589178209000238
0.2922007079978357


In [None]:
# Data transformations of execution data
#
# NOTE: this cell rebinds df_ex from its own previous value. Re-running it without
# first re-running the load cell merges the reference data a second time.

df_ex = (
    # Keep only executions from the continuous trading phase
    filter_df_by_values(df_ex, 'Phase', ["CONTINUOUS_TRADING"])
    # Add column 'side': 2 if Quantity is negative, otherwise 1.
    # assign() returns a new frame, so nothing is written into the filtered slice
    # (avoids SettingWithCopyWarning), and the comparison is vectorised.
    .assign(side=lambda ex: ex['Quantity'].lt(0).map({True: 2, False: 1}))
    # Complement the executions with primary_mic, primary_ticker and id from the
    # reference data, matched on ISIN
    .merge(df_rf[['ISIN', 'primary_mic', 'primary_ticker', 'id']], on='ISIN', how='left')
    # Expose the reference-data id as listing_id to facilitate joining on market
    # data, and set the datatype of TradeTime
    .assign(
        listing_id=lambda ex: ex['id'],
        TradeTime=lambda ex: pd.to_datetime(ex['TradeTime']),
    )
    .drop(columns='id')
)

# Filter execution data on the listing_ids present in the market data.
# copy() makes the result an independent frame so later cells can add columns to it.
listing_ids = df_md['listing_id'].unique()
df_ex = filter_df_by_values(df_ex, "listing_id", listing_ids).copy()





In [130]:
# Data transformations of market data
#
# NOTE: the result no longer contains the market_state column, so re-running this
# cell without first re-running the load cell raises a KeyError.

df_md = (
    # Keep only the rows where market_state == CONTINUOUS_TRADING
    filter_df_by_values(df_md, "market_state", ["CONTINUOUS_TRADING"])
    # Only keep the columns event_timestamp, listing_id, best_bid_price, best_ask_price
    [['event_timestamp', 'listing_id', 'best_bid_price', 'best_ask_price']]
    # assign() returns a new frame, so the conversion is not written into the
    # filtered slice (avoids SettingWithCopyWarning)
    .assign(event_timestamp=lambda md: pd.to_datetime(md['event_timestamp']))
)

In [131]:
# Calculations
#
# Get the BBO (best bid / best ask) at the time of the trade, 1s before the trade
# and 1s after the trade, then combine the three snapshots into final_df.

fields_to_rename = ['best_bid_price', 'best_ask_price']

# Columns that identify one execution; used as the join key when the snapshots are combined
common_columns = ['listing_id', 'TradeTime', 'ISIN', 'Currency', 'Venue', 'Price', 'Trade_id', 'Phase', 'Quantity', 'side', 'primary_mic', 'primary_ticker']


def _bbo_snapshot(trades, quotes, time_column, price_suffix):
    """As-of join each trade to the quotes of the same listing_id.

    Parameters:
    - trades (pd.DataFrame): Executions; must contain ``time_column`` and 'listing_id'.
    - quotes (pd.DataFrame): Market data, already sorted by 'event_timestamp'.
    - time_column (str): Column of ``trades`` holding the lookup time.
    - price_suffix (str): Replacement for the '_price' suffix of the quote columns.

    Returns:
    - pd.DataFrame: ``trades`` sorted by ``time_column`` with the matched
      'event_timestamp' and the renamed quote columns appended.
    """
    snapshot = pd.merge_asof(
        trades.sort_values(time_column),
        quotes,
        left_on=time_column,
        right_on='event_timestamp',
        by='listing_id',
    )
    return snapshot.rename(columns={field: field.replace("_price", price_suffix) for field in fields_to_rename})


# Sort the quotes once; the same sorted frame serves all three look-ups
quotes_sorted = df_md.sort_values('event_timestamp')

# Work on a frame without lookup-time columns (in case df_ex already carries them),
# and never write into df_ex, so this cell gives the same result when re-run
trades = df_ex.drop(columns=['TradeTime_min_1', 'TradeTime_plus_1'], errors='ignore')

final_df_0 = _bbo_snapshot(trades, quotes_sorted, 'TradeTime', "")
final_df_0.to_csv('data/final_df_0.csv', index=False)

final_df_less_1 = _bbo_snapshot(
    trades.assign(TradeTime_min_1=trades['TradeTime'] - pd.DateOffset(seconds=1)),
    quotes_sorted,
    'TradeTime_min_1',
    "_min_1s",
)
final_df_less_1.to_csv('data/final_df_less_1.csv', index=False)

final_df_plus_1 = _bbo_snapshot(
    trades.assign(TradeTime_plus_1=trades['TradeTime'] + pd.DateOffset(seconds=1)),
    quotes_sorted,
    'TradeTime_plus_1',
    "_1s",
)
final_df_plus_1.to_csv('data/final_df_plus_1.csv', index=False)

# Combine the three snapshots on the execution key. Only the BBO columns are taken
# from the -1s / +1s snapshots, so no suffixed helper columns have to be dropped afterwards.
final_df = final_df_0.drop(columns='event_timestamp')
for snapshot, bbo_columns in (
    (final_df_less_1, ['best_bid_min_1s', 'best_ask_min_1s']),
    (final_df_plus_1, ['best_bid_1s', 'best_ask_1s']),
):
    final_df = final_df.merge(snapshot[common_columns + bbo_columns], on=common_columns, how='left')

final_df.to_csv('data/final_df.csv', index=False)


In [135]:
# Calculate the mid prices

# Mid-price = (best bid + best ask) / 2 at execution, 1s before the execution and
# 1s after the execution – columns ['mid_price', 'mid_price_min_1s', 'mid_price_1s']
for mid_col, bid_col, ask_col in (
    ('mid_price', 'best_bid', 'best_ask'),
    ('mid_price_min_1s', 'best_bid_min_1s', 'best_ask_min_1s'),
    ('mid_price_1s', 'best_bid_1s', 'best_ask_1s'),
):
    final_df[mid_col] = final_df[bid_col].add(final_df[ask_col]).div(2)


# Calculate Slippage [‘slippage’] at execution price
#  For SELL: (execution_price – best_bid) / (best_ask – best_bid)
#  For BUY : (best_ask – execution_price) / (best_ask – best_bid)

def calculate_slippage(row):
    """Slippage of one execution relative to the quoted spread.

    For SELL: (execution_price - best_bid) / (best_ask - best_bid)
    For BUY : (best_ask - execution_price) / (best_ask - best_bid)

    side == 1 is treated as BUY (positive Quantity); any other side as SELL.

    Parameters:
    - row: Mapping-like record (e.g. a DataFrame row) with the keys
      'side', 'Price', 'best_bid' and 'best_ask'.

    Returns:
    - float: The slippage, or NaN when best_ask equals best_bid (the ratio is
      undefined for a zero spread).
    """
    spread = row['best_ask'] - row['best_bid']
    if spread == 0:
        # Zero spread: avoid dividing by zero
        return float('nan')
    if row['side'] == 1:
        # BUY
        return (row['best_ask'] - row['Price']) / spread
    # SELL
    return (row['Price'] - row['best_bid']) / spread
    
# Evaluate the slippage row by row, store it as a new column and persist the result
slippage_per_trade = final_df.apply(calculate_slippage, axis=1)
final_df['slippage'] = slippage_per_trade
final_df.to_csv('data/final_df.csv', index=False)


