# Compare Datasets

Created by Mitas Ray on 2024-11-19.

This notebook compares two datasets. The procedure is to:
1. restrict both datasets to the same datetime window
2. perform a high-level analysis of the values in each dataset
3. train a model on each dataset and check that the accuracy results are similar

To run the notebook:
- on Linux: install the packages in `ficc_python/requirements_py310.txt`, then run `pip install jupyter`
- on Mac: install the packages in `ficc_python/requirements_py310_mac_jupyter.txt`

Change the following files to enable credentials and the correct working directory:
- `automated_training_auxiliary_functions.py::get_creds(...)`
- `ficc/app_engine/demo/server/modules/get_creds.py::get_creds(...)`
- `automated_training_auxiliary_variables.py::WORKING_DIRECTORY`

In [1]:
# loads the autoreload extension
%load_ext autoreload
# automatically reloads all imported modules when their source code changes
%autoreload 2

In [2]:
import os
from datetime import datetime

import pandas as pd


# importing from parent directory: https://stackoverflow.com/questions/714063/importing-modules-from-parent-folder
import sys
sys.path.insert(0,'../')


from ficc.utils.auxiliary_functions import get_ys_trade_history_features
from ficc.utils.gcp_storage_functions import download_data

from automated_training_auxiliary_variables import (
    CATEGORICAL_FEATURES,
    BINARY,
    NON_CAT_FEATURES,
    NUM_TRADES_IN_HISTORY_YIELD_SPREAD_MODEL,
    BUCKET_NAME,
    MODEL_TO_CUMULATIVE_DATA_PICKLE_FILENAME,
    WORKING_DIRECTORY,
)
from automated_training_auxiliary_functions import (
    STORAGE_CLIENT,
    MODEL_NAME_TO_KERAS_MODEL,
    check_that_model_is_supported,
    fit_encoders,
    create_input,
    train_and_evaluate_model,
    create_summary_of_results,
    get_optional_arguments_for_process_data,
    save_model,
)
from set_random_seed import set_seed


set_seed()

INFO: Pandarallel will run on 5 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
Initialized pandarallel with 5 cores
In PRODUCTION mode (to change to TESTING mode, set `TESTING` to `True`); all files and models will be saved and NUM_EPOCHS=100


In [None]:
print(f'Working directory: {WORKING_DIRECTORY}. If this is incorrect, change it in `automated_training_auxiliary_variables.py::WORKING_DIRECTORY`')

Working directory: /Users/mitas/ficc/ficc_python. If this is incorrect, change it in `automated_training_auxiliary_variables.py::WORKING_DIRECTORY`


In [4]:
MODEL = 'yield_spread_with_similar_trades'

Restrict the data to rows whose datetime falls between a start and an end datetime (both bounds inclusive).

In [5]:
def string_to_datetime(datetime_as_string: datetime | str) -> datetime:
    '''Convert a string in `%Y-%m-%d %H:%M:%S` format to a `datetime`. A `datetime` 
    argument is returned unchanged.'''
    if isinstance(datetime_as_string, datetime): return datetime_as_string
    string_format = '%Y-%m-%d %H:%M:%S'
    try:
        return datetime.strptime(datetime_as_string, string_format)
    except Exception:
        print(f'{datetime_as_string} must be in {string_format} format')
        raise    # re-raise the original exception with its traceback intact


def restrict_data_to_specified_time_window(data: pd.DataFrame, 
                                           datetime_column_name: str, 
                                           start_datetime: datetime | str, 
                                           end_datetime: datetime | str) -> pd.DataFrame:
    '''Return a truncated version of `data` with values of `datetime_column_name` between 
    `start_datetime` and `end_datetime`.'''
    start_datetime, end_datetime = string_to_datetime(start_datetime), string_to_datetime(end_datetime)
    after_start_datetime = data[datetime_column_name] >= start_datetime
    before_end_datetime = data[datetime_column_name] <= end_datetime
    rows_to_keep = after_start_datetime & before_end_datetime
    rows_remaining = rows_to_keep.sum()
    print(f'{len(data) - rows_remaining} rows removed from the original {len(data)} rows. {rows_remaining} rows remain.')
    return data[rows_to_keep]
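
As a quick sanity check of the inclusive window logic, the sketch below applies the same boolean-mask idea to a toy DataFrame. The frame and its values are made up for illustration, and `Series.between` is used here as an equivalent of the two comparisons above (it is not what the notebook itself calls).

```python
from datetime import datetime

import pandas as pd

# Toy data: hypothetical trades, not taken from the real datasets
toy = pd.DataFrame({
    'trade_datetime': pd.to_datetime([
        '2024-04-30 23:59:59',    # before the window
        '2024-05-01 00:00:00',    # exactly on the lower bound (kept)
        '2024-07-15 12:30:00',    # inside the window
        '2024-10-31 23:59:59',    # exactly on the upper bound (kept)
        '2024-11-01 00:00:00',    # after the window
    ]),
    'par_traded': [10, 20, 30, 40, 50],
})

start = datetime(2024, 5, 1, 0, 0, 0)
end = datetime(2024, 10, 31, 23, 59, 59)

# Equivalent to `(col >= start) & (col <= end)`; both bounds are inclusive by default
rows_to_keep = toy['trade_datetime'].between(start, end)
restricted = toy[rows_to_keep]
print(f'{len(toy) - rows_to_keep.sum()} rows removed, {rows_to_keep.sum()} rows remain')
```

Rows that sit exactly on either bound are kept, which matches the `>=` and `<=` comparisons in `restrict_data_to_specified_time_window`.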

In [6]:
may_1_start_of_day = '2024-05-01 00:00:00'
october_1_start_of_day = '2024-10-01 00:00:00'
october_31_end_of_day = '2024-10-31 23:59:59'
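
One caveat with an end-of-day bound such as `23:59:59`: a timestamp with fractional seconds in the final second of the day falls outside an inclusive `<=` comparison. The sketch below illustrates this with plain `datetime` objects and shows a half-open window (strictly before the start of the next day) as one possible alternative. Whether `trade_datetime` carries sub-second precision in these datasets is not established here, so treat `late_trade` as a hypothetical edge case.

```python
from datetime import datetime, timedelta

string_format = '%Y-%m-%d %H:%M:%S'
end_of_day = datetime.strptime('2024-10-31 23:59:59', string_format)

# Hypothetical trade timestamp with microseconds in the last second of the day
late_trade = datetime(2024, 10, 31, 23, 59, 59, 500000)

# The inclusive comparison against 23:59:59 misses the trade
inside_inclusive_window = late_trade <= end_of_day

# Half-open alternative: compare against the start of the following day
start_of_next_day = datetime(2024, 10, 31) + timedelta(days=1)
inside_half_open_window = late_trade < start_of_next_day

print(inside_inclusive_window, inside_half_open_window)
```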

In [7]:
def restrict_data_on_trade_datetime(data: pd.DataFrame, start_datetime_as_string: str, end_datetime_as_string: str) -> pd.DataFrame:
    return restrict_data_to_specified_time_window(data, 'trade_datetime', start_datetime_as_string, end_datetime_as_string)


def restrict_data_to_october_on_trade_datetime(data: pd.DataFrame) -> pd.DataFrame:
    return restrict_data_on_trade_datetime(data, october_1_start_of_day, october_31_end_of_day)


def restrict_data_from_may_to_october_on_trade_datetime(data: pd.DataFrame) -> pd.DataFrame:
    return restrict_data_on_trade_datetime(data, may_1_start_of_day, october_31_end_of_day)

Get data from Google Cloud Storage (or locally saved files) and restrict to desired dates.

In [8]:
FILES_DIRECTORY = f'{WORKING_DIRECTORY}/files'

In [9]:
old_data_file_path = f'{FILES_DIRECTORY}/old_data.pkl'
if os.path.isfile(old_data_file_path):
    old_data = pd.read_pickle(old_data_file_path)
else:
    old_data = download_data(STORAGE_CLIENT, BUCKET_NAME, MODEL_TO_CUMULATIVE_DATA_PICKLE_FILENAME[MODEL])
    old_data.to_pickle(old_data_file_path)
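
The load-or-download pattern above can be factored into a small helper. The following is a minimal sketch using only the standard library; `load_or_create` and `expensive_fetch` are hypothetical names, and `expensive_fetch` stands in for a slow call such as `download_data(...)`.

```python
import os
import pickle
import tempfile
from typing import Any, Callable


def load_or_create(file_path: str, create: Callable[[], Any]) -> Any:
    '''Return the object pickled at `file_path` if the file exists; otherwise call 
    `create()`, pickle the result to `file_path`, and return it.'''
    if os.path.isfile(file_path):
        with open(file_path, 'rb') as file:
            return pickle.load(file)
    result = create()
    with open(file_path, 'wb') as file:
        pickle.dump(result, file)
    return result


fetch_calls = []    # records each time the slow path runs


def expensive_fetch() -> dict:
    '''Hypothetical stand-in for a slow download.'''
    fetch_calls.append('fetched')
    return {'rows': 3}


with tempfile.TemporaryDirectory() as directory:
    cache_path = os.path.join(directory, 'data.pkl')
    first = load_or_create(cache_path, expensive_fetch)     # cache miss: fetches and saves
    second = load_or_create(cache_path, expensive_fetch)    # cache hit: reads the pickle

print(first, second, len(fetch_calls))
```

With this pattern a cached file is never refreshed automatically, so the local pickle needs to be deleted whenever the upstream data changes.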

In [10]:
old_data = restrict_data_from_may_to_october_on_trade_datetime(old_data)

0 rows removed from the original 6419831 rows. 6419831 rows remain.


In [11]:
def download_files_from_sp_data_for_modeling_bucket(file_names: list) -> pd.DataFrame:
    '''Download each file in `file_names` and concatenate the dataframes together.'''
    return pd.concat([download_data(STORAGE_CLIENT, 'sp_data_for_modeling', file_name) for file_name in file_names])

In [12]:
new_data_file_path = f'{FILES_DIRECTORY}/new_data.pkl'
if os.path.isfile(new_data_file_path):
    new_data = pd.read_pickle(new_data_file_path)
else:
    new_data = download_files_from_sp_data_for_modeling_bucket(['trades_2024-05-01_to_2024-05-31.pkl', 
                                                                'trades_2024-06-01_to_2024-06-30.pkl', 
                                                                'trades_2024-07-01_to_2024-07-31.pkl', 
                                                                'trades_2024-08-01_to_2024-08-31.pkl', 
                                                                'trades_2024-09-01_to_2024-09-30.pkl', 
                                                                'trades_2024-10-01_to_2024-10-31.pkl'])
    new_data.to_pickle(new_data_file_path)
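
The monthly file names follow a regular `trades_<first day>_to_<last day>.pkl` pattern, so the list could also be generated rather than typed out. A sketch using `calendar.monthrange`; the helper name `monthly_trade_file_names` is hypothetical, and the naming pattern is inferred only from the six file names above.

```python
import calendar
from datetime import date


def monthly_trade_file_names(year: int, first_month: int, last_month: int) -> list[str]:
    '''Build one file name per calendar month from `first_month` to `last_month` (inclusive).'''
    file_names = []
    for month in range(first_month, last_month + 1):
        _, days_in_month = calendar.monthrange(year, month)    # handles 30/31-day months and leap years
        first_day = date(year, month, 1)
        last_day = date(year, month, days_in_month)
        file_names.append(f'trades_{first_day.isoformat()}_to_{last_day.isoformat()}.pkl')
    return file_names


file_names = monthly_trade_file_names(2024, 5, 10)
print(file_names)
```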

In [13]:
new_data = restrict_data_from_may_to_october_on_trade_datetime(new_data)

0 rows removed from the original 6463891 rows. 6463891 rows remain.


Compare datasets.

In [None]:
def compare_shapes(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    print('\n=== Dataset Shapes ===')
    print(f'Dataset 1 Shape: {df1.shape}')
    print(f'Dataset 2 Shape: {df2.shape}')
    num_rows_df1, num_rows_df2 = df1.shape[0], df2.shape[0]
    num_rows_difference = num_rows_df1 - num_rows_df2
    if num_rows_difference == 0:
        print('Both datasets have the same number of rows')
    elif num_rows_difference > 0:
        print(f'Dataset 1 has {num_rows_difference} ({round(num_rows_difference / num_rows_df2 * 100, 3)}%) more rows than Dataset 2')
    elif num_rows_difference < 0:
        print(f'Dataset 2 has {abs(num_rows_difference)} ({round(abs(num_rows_difference) / num_rows_df1 * 100, 3)}%) more rows than Dataset 1')
    else:
        raise ValueError(f'{num_rows_difference} has a value that cannot be compared to 0')


def compare_columns(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    print('\n=== Column Comparison ===')
    print(f'Dataset 1 Columns: {df1.columns.tolist()}')
    print(f'Dataset 2 Columns: {df2.columns.tolist()}')

    print('\n=== Common and Unique Columns ===')
    common_cols = set(df1.columns).intersection(set(df2.columns))
    unique_to_df1 = set(df1.columns) - set(df2.columns)
    unique_to_df2 = set(df2.columns) - set(df1.columns)
    print(f'Common Columns: {common_cols}')
    print(f'Columns only in Dataset 1: {unique_to_df1}')
    print(f'Columns only in Dataset 2: {unique_to_df2}')


def compare_data_types(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    print('\n=== Data Types ===')
    print('Dataset 1 Data Types:')
    print(df1.dtypes)
    print('\nDataset 2 Data Types:')
    print(df2.dtypes)

    ## below code does not work if there is a column with dtype numpy array
    # print('\n=== Unique Values per Column ===')
    # print('Dataset 1 Unique Values:')
    # print(df1.nunique())
    # print('\nDataset 2 Unique Values:')
    # print(df2.nunique())


def missing_values(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    print('\n=== Missing Values ===')
    print('Dataset 1 Missing Values:')
    missing_df1 = df1.isnull().sum()
    print(missing_df1[missing_df1 > 0])
    
    print('\nDataset 2 Missing Values:')
    missing_df2 = df2.isnull().sum()
    print(missing_df2[missing_df2 > 0])


def check_last_trade_in_history(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    print('\n=== Last Trade in History ===')
    columns_to_check = ['last_rtrs_control_number']
    column_to_merge_on = 'rtrs_control_number'
    columns_to_keep = [column_to_merge_on] + columns_to_check
    assert all([((column in df1.columns) and (column in df2.columns)) for column in columns_to_keep]), f'Not all columns in {columns_to_keep} are present in both datasets'
    df1, df2 = df1[columns_to_keep], df2[columns_to_keep]
    suffix1, suffix2 = '_df1', '_df2'
    merged_df = pd.merge(df1, df2, on=column_to_merge_on, suffixes=(suffix1, suffix2))
    for column in columns_to_check:
        print(f'{column}: {(merged_df[column + suffix1] != merged_df[column + suffix2]).sum()} rows have different values for the same {column_to_merge_on}')


def statistical_summary(df1: pd.DataFrame, df2: pd.DataFrame) -> None:
    '''`.describe(...)` has issues if there is a column with dtype numpy array.'''
    print('\n=== Statistical Summary ===')
    print('Dataset 1 Summary:')
    print(df1.describe(include='all'))
    print('\nDataset 2 Summary:')
    print(df2.describe(include='all'))
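
Two details of the row-level comparison in `check_last_trade_in_history` are worth illustrating on toy data: an inner merge silently drops trades that are present in only one dataset, and `!=` treats two missing values as different. The sketch below uses `indicator=True` on an outer merge to count unmatched rows, and masks out pairs where both values are missing. The frames and values are made up; only the two column names come from the function above.

```python
import numpy as np
import pandas as pd

df1 = pd.DataFrame({'rtrs_control_number': [1, 2, 3, 4], 
                    'last_rtrs_control_number': [10.0, 20.0, np.nan, 40.0]})
df2 = pd.DataFrame({'rtrs_control_number': [2, 3, 4, 5], 
                    'last_rtrs_control_number': [20.0, np.nan, 41.0, 50.0]})

# The `_merge` column records whether each key appears in the left frame, the right frame, or both
merged = pd.merge(df1, df2, on='rtrs_control_number', how='outer', 
                  suffixes=('_df1', '_df2'), indicator=True)
only_in_df1 = (merged['_merge'] == 'left_only').sum()
only_in_df2 = (merged['_merge'] == 'right_only').sum()

both = merged[merged['_merge'] == 'both']
left, right = both['last_rtrs_control_number_df1'], both['last_rtrs_control_number_df2']
naive_differences = (left != right).sum()    # counts NaN vs NaN as a difference
both_missing = left.isna() & right.isna()
nan_safe_differences = ((left != right) & ~both_missing).sum()

print(f'only in df1: {only_in_df1}, only in df2: {only_in_df2}')
print(f'naive differences: {naive_differences}, NaN-safe differences: {nan_safe_differences}')
```

Whether a pair of missing values should count as a match depends on what a missing `last_rtrs_control_number` means in these datasets, so the NaN-safe count is offered as an option rather than a correction.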

In [None]:
compare_shapes(old_data, new_data)
compare_columns(old_data, new_data)
# compare_data_types(old_data, new_data)
missing_values(old_data, new_data)
check_last_trade_in_history(old_data, new_data)
# statistical_summary(old_data, new_data)


=== Dataset Shapes ===
Dataset 1 Shape: (6419831, 141)
Dataset 2 Shape: (6463891, 139)
Dataset 2 has 44060 (0.686%) more rows than Dataset 1

=== Column Comparison ===
Dataset 1 Columns: ['rtrs_control_number', 'cusip', 'yield', 'is_callable', 'refund_date', 'accrual_date', 'dated_date', 'next_sink_date', 'coupon', 'delivery_date', 'trade_date', 'trade_datetime', 'par_call_date', 'interest_payment_frequency', 'is_called', 'is_non_transaction_based_compensation', 'is_general_obligation', 'callable_at_cav', 'extraordinary_make_whole_call', 'make_whole_call', 'has_unexpired_lines_of_credit', 'escrow_exists', 'incorporated_state_code', 'trade_type', 'par_traded', 'maturity_date', 'settlement_date', 'next_call_date', 'issue_amount', 'maturity_amount', 'issue_price', 'orig_principal_amount', 'max_amount_outstanding', 'dollar_price', 'calc_date', 'purpose_sub_class', 'called_redemption_type', 'calc_day_cat', 'previous_coupon_payment_date', 'instrument_primary_name', 'purpose_class', 'call_ti

Train the yield spread with similar trades model on each dataset.

In [16]:
def get_num_features_for_each_trade_in_history() -> int:
    optional_arguments_for_process_data = get_optional_arguments_for_process_data(MODEL)
    use_treasury_spread = optional_arguments_for_process_data.get('use_treasury_spread', False)
    trade_history_features = get_ys_trade_history_features(use_treasury_spread)
    return len(trade_history_features)

In [17]:
def train_model(data: pd.DataFrame, 
                last_trade_date_for_training_dataset: str):
    '''Heavily inspired by `automated_training_auxiliary_functions::train_model(...)`. The main changes are: 
    (1) assume that we are using the yield spread with similar trades model,
    (2) do not have an exclusions function,
    (3) do not restrict the test set to just a single day.
    '''
    check_that_model_is_supported(MODEL)
    encoders, fmax = fit_encoders(data, CATEGORICAL_FEATURES, MODEL)
    test_data = data[data.trade_date > last_trade_date_for_training_dataset]    # `test_data` can only contain trades after `last_trade_date_for_training_dataset`
    train_data = data[data.trade_date <= last_trade_date_for_training_dataset]    # `train_data` only contains trades before and including `last_trade_date_for_training_dataset`
    training_set_info = f'Training set contains {len(train_data)} trades ranging from trade datetimes of {train_data.trade_datetime.min()} to {train_data.trade_datetime.max()}'
    test_set_info = f'Test set contains {len(test_data)} trades ranging from trade datetimes of {test_data.trade_datetime.min()} to {test_data.trade_datetime.max()}'
    print(training_set_info)
    print(test_set_info)

    x_train, y_train = create_input(train_data, encoders, MODEL)
    x_test, y_test = create_input(test_data, encoders, MODEL)

    keras_model = MODEL_NAME_TO_KERAS_MODEL[MODEL]
    untrained_model = keras_model(x_train, 
                                  NUM_TRADES_IN_HISTORY_YIELD_SPREAD_MODEL, 
                                  get_num_features_for_each_trade_in_history(), 
                                  CATEGORICAL_FEATURES, 
                                  NON_CAT_FEATURES, 
                                  BINARY, 
                                  fmax)
    trained_model, mae, history = train_and_evaluate_model(untrained_model, x_train, y_train, x_test, y_test)
    result_df = create_summary_of_results(trained_model, test_data, x_test, y_test)
    save_model(trained_model, None, MODEL, 'old_data', upload_to_google_cloud_bucket=False)    # setting `encoders=None` to not save the encoders file
    return result_df
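
The date-based split inside `train_model` can be illustrated on a toy frame. In this sketch the cutoff is converted to a `pd.Timestamp` explicitly before comparing, which is an assumption about how `trade_date` is stored; the `yield_spread` column and all values are made up.

```python
import pandas as pd

# Toy data: hypothetical trades on either side of the cutoff
toy = pd.DataFrame({
    'trade_date': pd.to_datetime(['2024-09-27', '2024-09-30', '2024-10-01', '2024-10-02']),
    'yield_spread': [12.0, 15.0, 11.0, 14.0],
})
cutoff = pd.Timestamp('2024-09-30')

train = toy[toy['trade_date'] <= cutoff]    # trades up to and including the cutoff date
test = toy[toy['trade_date'] > cutoff]      # trades strictly after the cutoff date

# Every row lands in exactly one split, and the test set is strictly later than the training set
assert len(train) + len(test) == len(toy)
assert train['trade_date'].max() < test['trade_date'].min()
print(len(train), len(test))
```

Splitting on time rather than at random keeps every test trade strictly after the training period, so the evaluation does not see future information during training.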

In [None]:
train_model(old_data, '2024-09-30')    # Tuesday 2024-10-01 - Thursday 2024-10-31 is the test set

Training set contains 5257795 trades ranging from trade datetimes of 2024-05-01 07:01:05 to 2024-09-30 18:24:32
Test set contains 1162036 trades ranging from trade datetimes of 2024-10-01 00:00:00 to 2024-10-31 18:35:10
BEGIN create_input
END create_input. Execution time: 0:00:32.438
BEGIN create_input
END create_input. Execution time: 0:00:07.218




Epoch 1/100


2024-11-19 15:13:43.898232: W tensorflow/core/grappler/costs/op_level_cost_estimator.cc:693] Error in PredictCost() for the op: op: "Softmax" attr { key: "T" value { type: DT_FLOAT } } inputs { dtype: DT_FLOAT shape { unknown_rank: true } } device { type: "CPU" model: "0" frequency: 2400 num_cores: 10 environment { key: "cpu_instruction_set" value: "ARM NEON" } environment { key: "eigen" value: "3.4.90" } l1_cache_size: 16384 l2_cache_size: 524288 l3_cache_size: 524288 memory_size: 268435456 } outputs { dtype: DT_FLOAT shape { unknown_rank: true } }




2024-11-19 15:18:21.142926: W tensorflow/core/grappler/costs/op_level_cost_estimator.cc:693] Error in PredictCost() for the op: op: "Softmax" attr { key: "T" value { type: DT_FLOAT } } inputs { dtype: DT_FLOAT shape { unknown_rank: true } } device { type: "CPU" model: "0" frequency: 2400 num_cores: 10 environment { key: "cpu_instruction_set" value: "ARM NEON" } environment { key: "eigen" value: "3.4.90" } l1_cache_size: 16384 l2_cache_size: 524288 l3_cache_size: 524288 memory_size: 268435456 } outputs { dtype: DT_FLOAT shape { unknown_rank: true } }


Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100

In [None]:
train_model(new_data, '2024-09-30')    # Tuesday 2024-10-01 - Thursday 2024-10-31 is the test set