### Problem 1 – the rates problem

You have three data files, rates_ccy_data.csv, rates_price_data.parq.gzip and rates_spot_rate_data.parq.gzip

rates_ccy_data is a currency pair reference file. It tells us the currency pairs (ccy_pair) in scope, whether we need to convert the price, and the conversion factor. Price in this example relates to the FX rate for the ccy_pair.

rates_price_data is a set of timestamped prices for ccy_pairs.

rates_spot_rate_data is a timestamped set of fx rates for ccy_pairs. The FX rates are in a column called ‘spot_mid_rate’.

Goal: Generate a new price for each row in rates_price_data. The new price depends on whether that ccy_pair is supported, needs to be converted and has sufficient data to convert:

If conversion is not required, then the new price is simply the ‘existing price’
If conversion is required, then the new price is: (‘existing price’/ ‘conversion factor’) + ‘spot_mid_rate’
If there is insufficient data to create a new price then capture this fact in some way
 
For each row in rates_price_data, if conversion is required then we need to find an appropriate ‘spot_mid_rate’. This is defined as the most recently timestamped rate for that specific ccy_pair within the hour that precedes the timestamped price.

Please generate the final price for each row in price_data. You can capture this as an output data file – csv is preferable.

For reference, the benchmark calculation time is < 1 second using python 3 with 16GB ram


#### Personal notes

New price has 2 failure conditions:
1. ccy_pair not supported, i.e. not found in rates_ccy
2. insufficient data to convert, i.e. no data in rates_price_data within the preceding one hour window

Flow should go like this:
1. is ccy_pair supported? if no, np.nan, otherwise proceed to next step
2. is conversion required? if no, existing price is used. if yes, proceed to next step.
3. is there sufficient data to convert? if no, np.nan. otherwise price/conversion_factor + spot_mid_rate

This results in 4 potential cases, 2 cases of np.nan, 1 case of unchanged price as the new price, 1 case of new price as computed with the formula above.

Ideally we should capture the cause of the NaN, therefore I will be adding a comments column.

Rough code outline:

1. Join rates_price_data with rates_ccy on ccy_pair, unsupported pairs will have NaN for rates_ccy data
2. Sort timestamp data for new rates_price and rates_spot, and then use pd.merge_asof, which is a merge by key distance, with a 1 hour tolerance. If tolerance not met, rates_spot data will be NaN
3. Compute new price, existing NaN in rates_ccy and rates_spot data will result in NaN in new price
4. If conversion_factor == False, set new_price column to original price, this will get us all 4 cases done in new_price column
5. Use conditional masks to identify fail cases for both unsupported ccy_pair and insufficient spot rate data, create a comments column

Validation checks:

1. Unique ccy_pair in rates_ccy_data? The merge will result in multiplicatively more rows for duplicated ccy_pair
2. If conversion_factor == False, new_price == price?
3. If conversion_factor == NaN, new_price == NaN & comment == 'unsupported ccy_pair'?
4. If spot_mid_rate == NaN, new_price == NaN & comment == 'no preceding data for spot rates'?

In [1]:
import pandas as pd
import numpy as np
import time

rates_ccy = pd.read_csv("rates_ccy_data.csv")
rates_spot = pd.read_parquet("rates_spot_rate_data.parq.gzip")
rates_price = pd.read_parquet("rates_price_data.parq.gzip")
std_price = pd.read_parquet("stdev_price_data.parq.gzip")

In [2]:
# Test to see if ccy_pair is a unique identifier for each row
rates_ccy['ccy_pair'].nunique() == rates_ccy.shape[0]

True

In [3]:
# Left join onto rates_price, results in np.nan if unsupported ccy_pair
rates = pd.merge(rates_price,rates_ccy,on='ccy_pair',how='left')

In [4]:
# Sort timestamps and convert to datetime64 datatype
rates["timestamp"] = pd.to_datetime(rates["timestamp"])
rates = rates.sort_values("timestamp")
rates_spot["timestamp"] = pd.to_datetime(rates_spot["timestamp"])
rates_spot = rates_spot.sort_values("timestamp")
# Duplicate timestamp in reference dataframe for later use
rates_spot["timestamp_preceding"] = rates_spot["timestamp"]

In [5]:
# Left join with 1 hour tolerance as per requirements
# Takes first timestamp before due to sorting
rates_new = pd.merge_asof(rates,rates_spot,on="timestamp",by='ccy_pair',direction="backward",tolerance=pd.Timedelta("1h"))

In [6]:
# Generate new prices with the given formula
rates_new['new_price'] = rates_new['price']/rates_new['conversion_factor'] + rates_new["spot_mid_rate"]

In [7]:
# Override np.nan new_price where conversion is not required, due to np.nan in conversion_factor
rates_new.loc[rates_new["convert_price"]==False, "new_price"] = rates_new.loc[rates_new["convert_price"]==False, "price"]

In [8]:
# Generate comments
cond_1 = rates_new["convert_price"].isna()
rates_new.loc[cond_1, "comments"] = "ccy_pair not supported"
cond_2 = rates_new["spot_mid_rate"].isna() & ~cond_1
rates_new.loc[cond_2, "comments"] = "no preceding data for spot rates"
rates_new["comments"] = rates_new["comments"].fillna("")

In [9]:
rates_new.to_csv("output.csv",index=False)

### Problem 2 – the standard deviation problem

You have a single gzip compressed parquet file called stdev_price_data.parq.gzip

It consists of timestamped ‘bid’, ‘mid’ and ‘ask’ prices for security IDs. These prices are generated in hourly snaps. For each ‘security_id’ at all possible hourly snap times in the interval defined below, we need to know the rolling standard deviation for bids, for mids and for asks. In other words, for each snap hour, generate the rolling standard deviation for each of bids, mids and asks for each ID.

To generate a standard deviation for a security id at a given hourly snap time, you need the most recent set of 20 contiguous hourly snap values for the security id. By contiguous we mean there are no gaps in the set of hourly snaps.

Please generate the standard deviation of price for each price type for each security for every possible hourly snap from 2021-11-20 00:00:00 to 2021-11-23 09:00:00. You can capture this as an output data file – csv is preferable

Some thoughts – in this problem, consider yourself at snap point in time. Look back at previous data and work out the stdev. An hour later, you are at the next snap time, and you calculate the new stdev. You can think of this as a) updating based on previous data or b) as a completely new calculation.

In a), you could consider storing stdev calculation state data at each snap time and using that at the next snap time.

In b), you ignore calculation data from previous steps and do the calculation afresh each time.

Although you only need to calculate from 20th Nov for a few days, in reality we run this calculation every hour and so your solution should be able to handle a request to show the result on any given hour. You are not required to support this but it should help guide your solution.

Although you are working with files, in reality we would be pulling this data from a database and caching any intermediate or state data to local (file) or remote (database) storage so the idea of having state data that can be accessed in the future is not unusual. For example, if someone queries our result we need to be able to easily regenerate it.

For reference, the benchmark calculation time is again < 1 second using python 3 with 16GB ram

#### Personal notes

At each point in time, if the data for the preceding 20 hours exist, the standard deviation is computed using "the square root of the average of the squared deviations of the values subtracted from their average value". This can then be cached alongside a few other variables for future piecemeal computations***, which can then be performed more efficiently similar to memoisation used in dynamic programming (for the purpose of solution a mentioned). This requires distinctively different code from the initial bulk computation (solution b). The code required are as follows:

Initial bulk compute (solution b):

1. Partition std_price into security id, and sort by snap_time
2. Identify contiguous sequences of 1 hour and breaks via feature engineering
3. Leverage pandas functions for vectorised computation

Incremental piecemeal compute (solution a)***:

1. From previous computations, store the first value of the N window, along with the rolling mean and standard deviation
2. Compute new rolling mean using previous rolling mean + (new value - previous first value)/N
3. Compute new rolling standard deviation using previous standard deviation + (new value - previous first value) * (new value - new rolling mean + previous first value - old rolling mean)

*** I have referred to this blog (https://jonisalonen.com/2014/efficient-and-accurate-rolling-standard-deviation/) for the algorithm, but this has not been implemented.

Validation checks:

1. No duplicate snap_time?
2. snap_time should be sorted correctly
3. Check for contiguity, should have 20 contiguous values
4. Stdev should be > 0

In [10]:
std_price = std_price.sort_values(["security_id", "snap_time"]).reset_index(drop=True)

In [11]:
# Generate row-to-row time deltas
time_diffs = std_price.groupby("security_id")["snap_time"].diff()

# Feature engineer a column to show at which points the contiguity is broken
std_price["contiguity_broken"] = (time_diffs != pd.Timedelta("1h")) & time_diffs.notna()

In [12]:
# Using the previous feature engineered column, identify new sequence within each security_id
std_price["sequence_id"] = std_price.groupby("security_id")["contiguity_broken"].cumsum()

In [13]:
# Vectorised operations with pandas to generate stdev
for col in ["bid", "mid", "ask"]:
    std_price[f"{col}_stdev"] = std_price.groupby(["security_id", "sequence_id"])[col].rolling(window=20, min_periods=20).std().reset_index(level=[0,1], drop=True)