Last updated by Developer on 2024-12-30. This notebook rebuilds the similar trade history redis from scratch, using the trades stored in the same-CUSIP trade history redis as its input.

In [1]:
import redis
from functools import wraps
import time
from datetime import timedelta

import multiprocess as mp    # using `multiprocess` instead of `multiprocessing` because function to be called in `map` is in the same file as the function which is calling it: https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror

import numpy as np
import pandas as pd
import pickle

In [2]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
# Cap on the number of trades kept per key in the similar trade history redis (passed as `max_num_trades` when the histories are built).
MAX_NUM_TRADES_IN_SIMILAR_TRADE_HISTORY = 64    # wanted a value larger than 32 in case the last many trades were from the same CUSIP and so chose the next power of 2
# Divisor used to turn a number of days into whole years when bucketing trades by years to maturity.
# NOTE(review): 360 (rather than 365) presumably follows a 30/360 day-count convention -- confirm against the source file.
NUM_OF_DAYS_IN_YEAR = 360
# Ordered mapping of column name -> type label for every trade stored in a history. In this notebook only the keys (and their order) 
# are used, as the column names of the arrays read from and written to redis. NOTE(review): the type labels look like warehouse 
# column types and are not read anywhere in this notebook -- confirm where they are consumed.
FEATURES_FOR_EACH_TRADE_IN_HISTORY = {'msrb_valid_from_date': 'DATETIME', 
                                      'msrb_valid_to_date': 'DATETIME', 
                                      'rtrs_control_number': 'INTEGER', 
                                      'trade_datetime': 'DATETIME', 
                                      'publish_datetime': 'DATETIME', 
                                      'yield': 'FLOAT', 
                                      'dollar_price': 'FLOAT', 
                                      'par_traded': 'NUMERIC', 
                                      'trade_type': 'STRING', 
                                      'is_non_transaction_based_compensation': 'BOOLEAN', 
                                      'is_lop_or_takedown': 'BOOLEAN', 
                                      'brokers_broker': 'STRING', 
                                      'is_alternative_trading_system': 'BOOLEAN', 
                                      'is_weighted_average_price': 'BOOLEAN', 
                                      'settlement_date': 'DATE', 
                                      'calc_date': 'DATE', 
                                      'calc_day_cat': 'INTEGER', 
                                      'maturity_date': 'DATE', 
                                      'next_call_date': 'DATE', 
                                      'par_call_date': 'DATE', 
                                      'refund_date': 'DATE', 
                                      'transaction_type': 'STRING', 
                                      'sequence_number': 'INTEGER'}

In [3]:
def function_timer(function_to_time):
    '''Decorator that prints a BEGIN line before `function_to_time` runs and an END line with the elapsed 
    time after it returns. The return value of `function_to_time` is passed through unchanged. If 
    `function_to_time` raises, the exception propagates and no END line is printed.'''
    @wraps(function_to_time)    # used to ensure that the function name is still the same after applying the decorator when running tests: https://stackoverflow.com/questions/6312167/python-unittest-cant-call-decorated-test
    def wrapper(*args, **kwargs):    # using the same formatting from https://docs.python.org/3/library/functools.html
        print(f'BEGIN {function_to_time.__name__}')
        start_time = time.perf_counter()    # monotonic clock: unlike `time.time()`, it cannot jump when the system clock is adjusted during a multi-hour run
        result = function_to_time(*args, **kwargs)
        elapsed_seconds = time.perf_counter() - start_time
        print(f'END {function_to_time.__name__}. Execution time: {timedelta(seconds=elapsed_seconds)}')
        return result
    return wrapper

In [None]:
# Clients for the three redis instances used by this notebook. They are module-level globals that the functions below read directly 
# (`process_trade_history_data(...)` reads the first two, `update_similar_trade_history_redis(...)` uses the third).
# NOTE(review): the hosts are hard-coded IP addresses -- presumably internal endpoints; consider reading them from configuration.
reference_data_redis_client = redis.Redis(host='10.108.4.36', port=6379, db=0)    # use read endpoint since use case is read-only allowing for lower latency and to not accidentally corrupt the redis by attempting to write to it
trade_history_redis_client = redis.Redis(host='10.75.46.229', port=6379, db=0)    # use read endpoint since use case is read-only allowing for lower latency and to not accidentally corrupt the redis by attempting to write to it
similar_trade_history_redis_client = redis.Redis(host='10.117.191.180', port=6379, db=0)    # this is the redis that the notebook wipes and then writes to

In [5]:
@function_timer
def delete_all_keys_in_redis(redis_client, batch_size=1000):
    '''Delete every key in the database that `redis_client` is connected to.

    Keys are discovered with `scan_iter()` and removed with one `delete(...)` call per `batch_size` keys 
    instead of one call per key, which keeps the number of network round trips manageable for large databases.
    Raises `ValueError` if `batch_size` is smaller than 1 and `AssertionError` if any keys remain afterwards 
    (e.g., because another client wrote to the redis while the deletion was running).'''
    if batch_size < 1: raise ValueError(f'batch_size must be at least 1, but was: {batch_size}')
    num_keys_before_deletion = redis_client.dbsize()
    print(f'Attempting to delete {num_keys_before_deletion} keys')
    keys_to_delete = []
    for key in redis_client.scan_iter():
        keys_to_delete.append(key)
        if len(keys_to_delete) >= batch_size:
            redis_client.delete(*keys_to_delete)
            keys_to_delete = []
    if keys_to_delete: redis_client.delete(*keys_to_delete)    # delete the final, partially filled batch
    num_keys_after_deletion = redis_client.dbsize()
    assert num_keys_after_deletion == 0, f'Number of keys after deleting all of them is: {num_keys_after_deletion}, but should be 0'

In [6]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
def create_trade_history_numpy_array(trade_history_df, max_num_trades):
    '''Turn `trade_history_df` into the numpy array that is stored in redis: one row per RTRS control number, 
    no cancelled trades, most recent trade first, and at most `max_num_trades` rows.
    NOTE: for each `rtrs_control_number` the first row encountered is the one that is kept, so the caller is 
    responsible for passing the rows in an order where the message to keep for each trade comes first.'''
    ordering_columns = ['trade_datetime', 'publish_datetime', 'sequence_number']
    one_message_per_trade = trade_history_df.drop_duplicates(subset='rtrs_control_number', keep='first')
    is_not_cancelled = one_message_per_trade['transaction_type'] != 'C'    # 'C' marks a cancellation message
    most_recent_first = one_message_per_trade[is_not_cancelled].sort_values(by=ordering_columns, ascending=False)
    return most_recent_first.iloc[:max_num_trades].to_numpy()

In [7]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
def get_key_trade_history_pair(key, trade_history, redis_client, max_num_trades, key_transform_func=None, verbose=False, keep_cusip_in_trade_history=False):
    '''Return `(key, trade_history_array)` where `trade_history_array` merges the rows of `trade_history` with 
    the history already stored in `redis_client` under `key` (if any), as produced by 
    `create_trade_history_numpy_array(...)` with at most `max_num_trades` rows. Nothing is written to redis here.

    `key_transform_func` is helpful in turning a tuple into a primitive type (e.g. string) that can be 
    used as a key for Redis. Redis does not allow tuples to be used as keys.
    `keep_cusip_in_trade_history` appends the `cusip` column to the columns listed in 
    `FEATURES_FOR_EACH_TRADE_IN_HISTORY`; the stored history is assumed to have the same column layout.
    If the stored history cannot be unpickled or converted to a dataframe, the key (and the raw stored history 
    when it could be unpickled) is printed and the original exception is re-raised.'''
    if key_transform_func is not None: key = key_transform_func(key)
    if verbose: print(f'Calling get_key_trade_history_pair(...) with key={key} and trade_history:\n{trade_history.to_markdown()}')
    features_for_each_trade_in_history = list(FEATURES_FOR_EACH_TRADE_IN_HISTORY.keys())
    if keep_cusip_in_trade_history: features_for_each_trade_in_history.append('cusip')
    trade_history = trade_history[features_for_each_trade_in_history]    # this procedure cannot be done outside of this function since it removes the `cusip` field

    serialized_old_trade_history = redis_client.get(key)    # a single GET (returns `None` for a missing key) instead of EXISTS followed by GET: one round trip instead of two and no window in which the key can vanish between the two calls
    if serialized_old_trade_history is not None:
        old_trade_history_array = None
        try:
            old_trade_history_array = pickle.loads(serialized_old_trade_history)    # NOTE: unpickling is only safe because the redis is populated exclusively by our own code; never point this at an untrusted redis
            old_trade_history = pd.DataFrame(old_trade_history_array, columns=features_for_each_trade_in_history)
        except Exception:
            print('key:', key)
            if old_trade_history_array is not None: print('old_trade_history:\n', pd.DataFrame(old_trade_history_array))    # reuse the already unpickled value so that the diagnostics cannot fail on a second unpickling attempt
            raise    # bare `raise` keeps the original traceback
        trade_history = pd.concat([trade_history, old_trade_history], ignore_index=True)    # new trades go first so that they win over stored messages with the same RTRS control number
    return key, create_trade_history_numpy_array(trade_history, max_num_trades)

In [8]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
def remove_negative_and_missing_yields(trades_df: pd.DataFrame) -> pd.DataFrame:
    '''Return the rows of `trades_df` whose `yield` is present and nonnegative. The input dataframe is not 
    modified. If any rows were dropped, the number of dropped and remaining rows is printed.'''
    original_count = len(trades_df)
    # the two filters are applied one after the other so that the comparison only ever sees non-missing yields
    has_yield = trades_df['yield'].notna()
    trades_with_yield = trades_df.loc[has_yield]
    kept_trades = trades_with_yield.loc[trades_with_yield['yield'] >= 0]
    kept_count = len(kept_trades)
    if kept_count != original_count:
        print(f'Removed {original_count - kept_count} trades for having negative or missing yields, leaving {kept_count} trades')
    return kept_trades

In [9]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
def upload_trade_history_to_redis(key, trade_history, redis_client):
    '''Pickle `trade_history` (highest pickle protocol) and store it in `redis_client` under `key`, 
    replacing whatever value was stored under that key before. 
    If we are in testing mode, then we should wipe the redis before using it for production.
    TODO: wipe redis before using for production.
    NOTE: an empty `trade_history` is uploaded like any other. This is desirable: if, for example, the only 
    trade in a history has subsequently been cancelled, then uploading the empty history overwrites the stale 
    entry for that key/CUSIP.'''
    serialized_trade_history = pickle.dumps(trade_history, protocol=pickle.HIGHEST_PROTOCOL)
    redis_client.set(key, serialized_trade_history)

In [10]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
@function_timer
def upload_to_redis_from_upload_function(pairs, upload_function):
    '''Call `upload_function(key, trade_history)` once for every `(key, trade_history)` pair in `pairs`, 
    in order, and return `pairs` itself.'''
    for key, trade_history in pairs:
        upload_function(key, trade_history)
    return pairs    # unused return value

In [11]:
# taken directly from `cloud_functions/fast_trade_history_redis_update/main.py`
@function_timer
def update_similar_trade_history_redis(new_trades, verbose=False):
    '''Update the redis corresponding to the similar trade history with the rows from `new_trades`. If the feature set 
    defining the related trade does not exist in the redis, then create the similar trade history starting from this 
    trade(s). If the feature set does exist, then check if there are new messages for old RTRS control numbers 
    and substitute those new messages for the old ones. If the `transaction_type` is 'C', remove the trade, 
    otherwise, replace the old message with the newest message. Add new trades to the dataframe in descending 
    order of `trade_datetime`. 

    The definition of similar is one that matches on `issue_key`, `years_to_maturity_date_by_5`, and `coupon_by_1`:
    - `years_to_maturity_date_by_5`: number of days from `trade_date` to `maturity_date`, floor divided by 
      `NUM_OF_DAYS_IN_YEAR` and then floor divided by 5.
    - `coupon_by_1`: `coupon` floor divided by 1, except that a zero coupon gets its own bucket of -1.
    The redis key is these three values joined by underscores.

    `new_trades` must contain the columns `issue_key`, `maturity_date`, `trade_date`, `coupon`, `cusip`, and every 
    column in `FEATURES_FOR_EACH_TRADE_IN_HISTORY`. The caller's dataframe is not modified. Returns the list of 
    (key, trade history array) pairs that were uploaded, or `None` if no trades remain after removing rows with nulls.

    NOTE: 'I' is an instruction or the first trade message. 'C' is to cancel the trade. We see here the trade messages 
    have the same information. 'M' and 'R' both indicate modification. 'R' is an MSRB modification (e.g., to fill in 
    par_traded when that value is initially null because of the `par_traded` over $5M rule).
    NOTE: for a particular RTRS control number, there is a specific `trade_datetime`. A more recent message for that  
    RTRS control number, such as a modify or a cancellation, would correspond to a more recent `publish_datetime`.
    NOTE: Setting `verbose` to `True` provides detailed print output and is helpful for testing.'''
    # NOTE: negative and missing yields are not removed here (i.e., `remove_negative_and_missing_yields(...)` is not called) because that already happened when creating the all trades dataframe
    new_trades = new_trades.dropna(subset=['issue_key', 'maturity_date', 'trade_date', 'coupon'])    # remove trades that have null values for features that we need to determine similarity; this also creates a new dataframe, so the columns added below do not leak into the caller's dataframe
    if len(new_trades) == 0:
        print('No trades to add to the similar trade history redis after removing trades with negative yields, and trades with null values for yield, issue_key, maturity_date, trade_date, or coupon.')
        return None
    # add features for definition of similar
    new_trades['years_to_maturity_date_by_5'] = ((new_trades['maturity_date'] - new_trades['trade_date']).dt.days // NUM_OF_DAYS_IN_YEAR) // 5
    new_trades['coupon_by_1'] = np.nan    # initialize the column
    is_zero_coupon = new_trades['coupon'] == 0
    new_trades.loc[is_zero_coupon, 'coupon_by_1'] = -1    # zero coupon has its own bucket
    new_trades.loc[~is_zero_coupon, 'coupon_by_1'] = new_trades.loc[~is_zero_coupon, 'coupon'] // 1
    new_trades = new_trades.astype({'issue_key': int, 'years_to_maturity_date_by_5': int, 'coupon_by_1': int})
    if verbose: print(f'new_trades:\n{new_trades.drop(columns=["recent"], errors="ignore").to_markdown()}')    # drop `recent` column because it has a lot of data that makes it difficult to read the output; `errors="ignore"` because not every caller provides a `recent` column

    def features_to_string(features):
        '''Join the feature values into the redis key. `features` should be a tuple or list. 
        NOTE: this function is identical to `similar_group_to_similar_key(...)` in `app_engine/demo/server/modules/finance.py`.'''
        return '_'.join([str(feature) for feature in features])

    def get_features_similar_trade_history_pair(features, df_for_features):
        '''Merge the trades of one feature group with the history stored in the similar trade history redis.'''
        return get_key_trade_history_pair(features, df_for_features, similar_trade_history_redis_client, MAX_NUM_TRADES_IN_SIMILAR_TRADE_HISTORY, features_to_string, verbose=verbose, keep_cusip_in_trade_history=True)

    def upload_similar_trade_history(features, trade_history):
        '''Store one merged history in the similar trade history redis.'''
        upload_trade_history_to_redis(features, trade_history, similar_trade_history_redis_client)

    features_trade_history_pairs = [get_features_similar_trade_history_pair(features, df_for_features) for features, df_for_features in new_trades.groupby(['issue_key', 'years_to_maturity_date_by_5', 'coupon_by_1'])]
    upload_to_redis_from_upload_function(features_trade_history_pairs, upload_similar_trade_history)
    return features_trade_history_pairs    # return value is unused, but perhaps can be used later to store these values into bigquery for testing

In [12]:
def process_trade_history_data(cusip):
    '''Load the trade history for `cusip` from the trade history redis and return it as a dataframe that is 
    ready to be aggregated into the all trades dataframe: `coupon` and `issue_key` are added from the most 
    recent reference data snapshot, `cusip` is added as a string column, rows with a null `maturity_date`, 
    `coupon`, or `issue_key` are removed, and rows with negative or missing yields are removed.

    `cusip` is a redis key as returned by `scan_iter()`, i.e., a byte-string. Returns `None` if the CUSIP has 
    no entry in the reference data redis or in the trade history redis.'''
    # fetch with a single GET (which returns `None` for a missing key) instead of EXISTS followed by GET: one round trip instead of two per CUSIP
    serialized_reference_data = reference_data_redis_client.get(cusip)
    if serialized_reference_data is None: return None
    serialized_trade_history = trade_history_redis_client.get(cusip)
    if serialized_trade_history is None: return None    # key disappeared after it was scanned; skip this CUSIP instead of failing the entire run while unpickling `None`
    # NOTE: unpickling is only safe because both redis instances are populated exclusively by our own code
    trade_history_data = pd.DataFrame(pickle.loads(serialized_trade_history), columns=list(FEATURES_FOR_EACH_TRADE_IN_HISTORY.keys()))
    reference_data = pickle.loads(serialized_reference_data)[0]    # index 0 indicates the most recent snapshot of the reference data
    trade_history_data[['coupon', 'issue_key']] = reference_data[['coupon', 'issue_key']]
    trade_history_data['cusip'] = cusip.decode('utf-8')    # put cusip in the dataframe to use further downstream to filter the similar trades by removing the target CUSIP; decode with 'utf-8' is necessary since the keys are byte-strings; https://stackoverflow.com/questions/606191/convert-bytes-to-a-string-in-python-3
    trade_history_data = trade_history_data.dropna(subset=['maturity_date', 'coupon', 'issue_key'])    # remove trades that have null values for features that we need to determine similarity
    trade_history_data['issue_key'] = trade_history_data['issue_key'].astype(int)
    return remove_negative_and_missing_yields(trade_history_data)

Aggregate all trades from the trade history redis.

In [None]:
num_cusips = 0    # set to a strictly positive value to cap the number of CUSIPs investigated to `num_cusips`; used primarily for testing

if num_cusips > 0:
    # sequential path for testing: stop as soon as `num_cusips` CUSIPs have been processed successfully
    # `num_cusips` itself is left untouched so that re-running this cell does not silently switch to the full parallel run
    trade_history_dataframes = []
    for cusip in trade_history_redis_client.scan_iter():
        print(cusip)
        trade_history_data = process_trade_history_data(cusip)
        if trade_history_data is not None:
            trade_history_dataframes.append(trade_history_data)
            if len(trade_history_dataframes) == num_cusips: break
else:    # apply parallelization
    print('Using parallelization')
    with mp.Pool() as pool_object:    # using template from https://docs.python.org/3/library/multiprocessing.html
        trade_history_dataframes = pool_object.map(process_trade_history_data, trade_history_redis_client.scan_iter())
    trade_history_dataframes = [trades for trades in trade_history_dataframes if trades is not None]    # `None` marks a CUSIP that could not be processed

if len(trade_history_dataframes) == 0: raise ValueError('No trade history could be loaded from the trade history redis, so there is nothing to aggregate')
all_trades = pd.concat(trade_history_dataframes)    # concatenate once at the end instead of growing the dataframe inside the loop, which is quadratic in the number of CUSIPs; the per-CUSIP row index is kept, so index values repeat across CUSIPs
all_trades.to_pickle('all_trades.pkl')    # checkpoint in the current working directory so that the aggregation does not need to be repeated

In [14]:
# Sanity check of the aggregated trades: total number of rows, followed by the first and last 10 rows.
print(f'Total number of trades: {len(all_trades)}')
display(all_trades.head(10))    # `display` is not imported in this notebook; it is available as a built-in in IPython / Jupyter sessions
display(all_trades.tail(10))

Total number of trades: 15621665


Unnamed: 0,msrb_valid_from_date,msrb_valid_to_date,rtrs_control_number,trade_datetime,publish_datetime,yield,dollar_price,par_traded,trade_type,is_non_transaction_based_compensation,...,calc_day_cat,maturity_date,next_call_date,par_call_date,refund_date,transaction_type,sequence_number,coupon,issue_key,cusip
0,2021-05-05 10:54:07,2100-01-01,2021050501478900,2021-05-05 10:53:56,2021-05-05 10:54:07,0.564,101.235,10000.0,D,False,...,2,2021-12-01,,,,I,6343.0,2.75,1012784,57587AHR2
1,2021-05-05 10:54:02,2100-01-01,2021050501478500,2021-05-05 10:53:56,2021-05-05 10:54:02,0.389,101.335,10000.0,S,False,...,2,2021-12-01,,,,I,6341.0,2.75,1012784,57587AHR2
2,2021-04-27 11:26:03,2100-01-01,2021042702071200,2021-04-27 11:25:31,2021-04-27 11:26:03,0.537,101.299,10000.0,D,False,...,2,2021-12-01,,,,I,9128.0,2.75,1012784,57587AHR2
3,2021-04-27 11:26:03,2100-01-01,2021042702065800,2021-04-27 11:25:31,2021-04-27 11:26:03,0.537,101.299,10000.0,D,False,...,2,2021-12-01,,,,I,9126.0,2.75,1012784,57587AHR2
4,2021-04-27 11:26:03,2100-01-01,2021042702072800,2021-04-27 11:25:31,2021-04-27 11:26:03,0.369,101.399,10000.0,S,False,...,2,2021-12-01,,,,I,9127.0,2.75,1012784,57587AHR2
5,2021-04-23 11:40:37,2100-01-01,2021042302125500,2021-04-23 11:40:30,2021-04-23 11:40:37,0.411,101.387,25000.0,S,False,...,2,2021-12-01,,,,I,9391.0,2.75,1012784,57587AHR2
6,2021-04-23 11:40:43,2100-01-01,2021042302126700,2021-04-23 11:40:30,2021-04-23 11:40:43,0.578,101.287,25000.0,D,False,...,2,2021-12-01,,,,I,9399.0,2.75,1012784,57587AHR2
7,2021-04-21 12:39:52,2100-01-01,2021042103293900,2021-04-21 12:39:46,2021-04-21 12:39:52,0.23,101.524,15000.0,S,False,...,2,2021-12-01,,,,I,15534.0,2.75,1012784,57587AHR2
8,2021-04-21 12:39:59,2100-01-01,2021042103296500,2021-04-21 12:39:46,2021-04-21 12:39:59,0.393,101.424,15000.0,D,False,...,2,2021-12-01,,,,I,15544.0,2.75,1012784,57587AHR2
9,2021-04-16 08:47:02,2100-01-01,2021041600270100,2021-04-16 08:46:48,2021-04-16 08:47:02,0.395,101.442,15000.0,D,False,...,2,2021-12-01,,,,I,1051.0,2.75,1012784,57587AHR2


Unnamed: 0,msrb_valid_from_date,msrb_valid_to_date,rtrs_control_number,trade_datetime,publish_datetime,yield,dollar_price,par_traded,trade_type,is_non_transaction_based_compensation,...,calc_day_cat,maturity_date,next_call_date,par_call_date,refund_date,transaction_type,sequence_number,coupon,issue_key,cusip
0,2022-05-18 11:45:24,2100-01-01,2022051805574900,2022-05-18 11:45:00,2022-05-18 11:45:24,2.45,103.726,230000.0,S,False,...,2,2023-12-01,,,,I,23765.0,5.0,1223291,830745LN3
1,2022-05-18 11:45:24,2100-01-01,2022051805574700,2022-05-18 11:45:00,2022-05-18 11:45:24,2.45,103.726,150000.0,S,False,...,2,2023-12-01,,,,I,23763.0,5.0,1223291,830745LN3
0,2020-03-27 06:00:52,2100-01-01,2020032017621900,2020-03-20 17:02:00,2020-03-27 06:00:52,2.398,104.161,9000000.0,S,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
1,2020-03-27 06:00:52,2100-01-01,2020032017477600,2020-03-20 16:51:09,2020-03-27 06:00:52,2.55,103.911,9000000.0,D,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
2,2020-03-27 06:00:52,2100-01-01,2020032017480100,2020-03-20 16:51:09,2020-03-27 06:00:52,2.6,103.829,9000000.0,P,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
3,2020-03-20 16:40:41,2100-01-01,2020032017265100,2020-03-20 16:37:25,2020-03-20 16:40:41,2.55,103.911,1000000.0,S,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
4,2020-03-20 16:40:32,2100-01-01,2020032017260400,2020-03-20 16:36:40,2020-03-20 16:40:32,2.6,103.829,1000000.0,P,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
5,2020-03-17 10:02:27,2100-01-01,2020031701306600,2020-03-17 10:00:47,2020-03-17 10:02:27,1.63,105.48,1200000.0,S,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
6,2020-03-17 10:01:35,2100-01-01,2020031701291200,2020-03-17 09:57:25,2020-03-17 10:01:35,1.65,105.446,1425000.0,S,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8
7,2020-03-17 08:50:26,2100-01-01,2020031700408700,2020-03-17 08:46:09,2020-03-17 08:50:26,1.75,105.278,2625000.0,P,False,...,3,2030-11-15,2021-11-15,2021-11-15,2021-11-15,,,5.0,901018,59259YHK8


Update similar trades redis.

In [15]:
# Rebuild the similar trade history redis from the aggregated trades.
all_trades = all_trades.sort_values(by=['trade_datetime'], ascending=False)    # most recent trades first
all_trades['trade_date'] = all_trades['trade_datetime'].dt.date    # temporary column: `update_similar_trade_history_redis(...)` needs `trade_date` for its null check and for computing the years to maturity

# NOTE(review): the similar trade history redis is wiped before the rebuild starts, so it is empty or only partially populated 
# until the update below has finished (the recorded run took about 10.5 hours) -- confirm that no consumer reads from it during that time.
delete_all_keys_in_redis(similar_trade_history_redis_client)
feature_groups_similar_trade_history_pairs = update_similar_trade_history_redis(all_trades)

all_trades = all_trades.drop(columns='trade_date')    # remove the temporary column again

BEGIN delete_all_keys_in_redis
Attempting to delete 10 keys
END delete_all_keys_in_redis. Execution time: 0:00:00.583588
BEGIN update_similar_trade_history_redis
BEGIN upload_to_redis_from_upload_function
END upload_to_redis_from_upload_function. Execution time: 4:57:08.793376
END update_similar_trade_history_redis. Execution time: 10:30:24.276463


In [None]:
# Print every key now stored in the similar trade history redis as a visual check of the rebuild. 
# Keys are printed as byte-strings, one per line, so the output can be very long after a full run.
for similar_trades_feature_group in similar_trade_history_redis_client.scan_iter():
    print(similar_trades_feature_group)