# Candle Dataset Cleaning

After running the first training job on the full dataset, we can see that despite the training loss decreasing, the validation loss is remaining fairly constant. As a potential remedy, we are going to try removing all of the rows from a closed market from the dataset. This will have a few advantages:
1. It will drasitcally reduce the size of the data by approximately 55 - 60%, by keeping only the examples that are relevant to the task we need to perform. 
2. It will ensure that the loss metrics are mearusing the models performnace on market data where trading is occuring. Currently, the loss function is likely dilluted, as so much of the data is just a flat time-series, that the model is likely performing perfectly on these examples, causing the loss metric to be diluted, and likely not allowing a high enough gradient to be built up for the backward pass.

## Plan for Cleaning Data 

1. Remove any rows where the market is closed. We will never be using this in the actual model.
2. This should give us a a couple hundred time-series per ticker. We want to create a time-series ID for each continuous time series in the dataset. This will be used as a column ID.
3. Then iterate over each ticker. For each ticker:
 - Use our date indicices, to create a train, validation, and test set for that ticker. Be sure that the date the sets do not slice any time-series (e.g. each of the 3 sets should have a unique set of time-series IDs)
 - Append eacah of the three sets to a master train, validation, and test set respectively. 
 - These steps will replace the current `select_by_index` usage
4. Train the `TimeSeriesPreprocessor` on the train set
5. Create the train, validation, and test datasets, using the trained preprocessor



In [1]:
# Standard
import os
import random

# Third Party
from transformers import (
    EarlyStoppingCallback,
    PatchTSMixerConfig,
    PatchTSMixerForPrediction,
    Trainer,
    TrainingArguments,
)
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

# First Party
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor


In [2]:
# Set seed for reproducibility
SEED = 42
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

## Dataset Preprocessing

We want to meticulously craft our examples for each trading day. Our primary focus here is to ensure that the model only is ever asked to forecast into live market data. We need it to understand this type of forecasting, and not be thrown off at all by examples from extended hours trading.

There are a few general steps we must take:
1. Ensure that all the data from the closed market is removed from the dataset, as this data will never be used in context or in forecasting.
2. Ensure that the timestamps are localized to America/New_York, so that we can get accurate date_strings to use for identifying each trading day.

In [3]:
# Load the Dataset from the CSV file
DATA_DIR = "/home/ubuntu/verb-workspace/data" # set this accordingly to the location of the data

MORNING_TRAIN_DATASET = f"{DATA_DIR}/1min-candles-train-MORNING.csv"
MORNING_VALID_DATASET = f"{DATA_DIR}/1min-candles-valid-MORNING.csv"
MORNING_TEST_DATASET = f"{DATA_DIR}/1min-candles-test-MORNING.csv"

DAY_TRAIN_DATASET = f"{DATA_DIR}/1min-candles-train-DAY.csv"
DAY_VALID_DATASET = f"{DATA_DIR}/1min-candles-valid-DAY.csv"
DAY_TEST_DATASET = f"{DATA_DIR}/1min-candles-test-DAY.csv"

AFTERNOON_TRAIN_DATASET = f"{DATA_DIR}/1min-candles-train-AFTERNOON.csv"
AFTERNOON_VALID_DATASET = f"{DATA_DIR}/1min-candles-valid-AFTERNOON.csv"
AFTERNOON_TEST_DATASET = f"{DATA_DIR}/1min-candles-test-AFTERNOON.csv"


timestamp_col = 't'

morning_train_data = pd.read_csv(
    MORNING_TRAIN_DATASET,
    parse_dates=[timestamp_col]
)
print("Morning Train Loaded")

morning_valid_data = pd.read_csv(
    MORNING_VALID_DATASET,
    parse_dates=[timestamp_col]
)
print("Morning Valid Loaded")


morning_test_data = pd.read_csv(
    MORNING_TEST_DATASET,
    parse_dates=[timestamp_col]
)
print("Morning Test Loaded")



day_train_data = pd.read_csv(
    DAY_TRAIN_DATASET,
    parse_dates=[timestamp_col]
)
print("Day Train Loaded")

day_valid_data = pd.read_csv(
    DAY_VALID_DATASET,
    parse_dates=[timestamp_col]
)
print("Day Valid Loaded")

day_test_data = pd.read_csv(
    DAY_TEST_DATASET,
    parse_dates=[timestamp_col]
)
print("Day Test Loaded")


afternoon_train_data = pd.read_csv(
    AFTERNOON_TRAIN_DATASET,
    parse_dates=[timestamp_col]
)
print("Afternoon Train Loaded")

afternoon_valid_data = pd.read_csv(
    AFTERNOON_VALID_DATASET,
    parse_dates=[timestamp_col]
)
print("Afternoon Valid Loaded")


afternoon_test_data = pd.read_csv(
    AFTERNOON_TEST_DATASET,
    parse_dates=[timestamp_col]
)
print("Afternoon Test Loaded")


Morning Train Loaded
Morning Valid Loaded
Morning Test Loaded
Day Train Loaded
Day Valid Loaded
Day Test Loaded
Afternoon Train Loaded
Afternoon Valid Loaded
Afternoon Test Loaded


In [4]:
from tqdm import tqdm
from pandarallel import pandarallel

pandarallel.initialize(nb_workers=3)

desired_order = ['ticker', 'date_string', 't', 'targ_o', 'targ_h', 'targ_l', 'targ_c', 'targ_v', 'targ_vwap', 'targ_red', 'targ_green', 'cont_market_open', 'cont_market_extended']

# Define custom aggregation functions
def vwap_agg(x):
    if isinstance(x, pd.Series):
        return 0
    return (x.targ_v * x.targ_c).sum() / x.targ_v.sum() if x.targ_v.sum() > 0 else 0

agg_dict = {
    'targ_o': 'first',
    'targ_h': 'max',
    'targ_l': 'min',
    'targ_c': 'last',
    'targ_v': 'sum',
    'targ_vwap': vwap_agg,
    'cont_market_open': 'max',
    'cont_market_extended': 'max'
}

def custom_agg(x):
    agg_values = {
        'targ_o': x.targ_o.iloc[0],
        'targ_h': x.targ_h.max(),
        'targ_l': x.targ_l.min(),
        'targ_c': x.targ_c.iloc[-1],
        'targ_v': x.targ_v.sum(),
        'targ_vwap': vwap_agg(x),
        'cont_market_open': x.cont_market_open.max(),
        'cont_market_extended': x.cont_market_extended.max()
    }
    return pd.Series(agg_values)

def aggregate_min_candles(df: pd.DataFrame) -> pd.DataFrame:
    # Set 'date' as the index of the dataframe
    # Be sure to make a copy here, so as to not mutate the original
    _df = df.set_index(timestamp_col)
    
    # Group by 'ticker' and 'date_string', and resample the data to 1-minute frequency
    groups = _df.groupby(['ticker', 'date_string'])
    
    def process_group(group):
        ticker = group.iloc[0].ticker
        date_str = group.iloc[0].date_string
        group.index = pd.to_datetime(group.index)
        resampled_group = group.resample('5min').apply(custom_agg)

        # Add the targ_red and targ_green columns back in
        resampled_group['targ_red'] = (resampled_group['targ_c'] < resampled_group['targ_o']).astype(int)
        resampled_group['targ_green'] = (resampled_group['targ_c'] > resampled_group['targ_o']).astype(int)
        resampled_group = resampled_group.reset_index()
        resampled_group['ticker'] = ticker
        resampled_group['date_string'] = date_str
        resampled_group = resampled_group.reindex(columns=desired_order)

        return resampled_group
    
    resampled_groups = groups.parallel_apply(process_group)
    resampled_df = resampled_groups.reset_index(drop=True)
    
    
    return resampled_df
    
datasets = {
    "morning_train": morning_train_data,
    "morning_valid": morning_valid_data,
    "morning_test": morning_test_data,
    "day_train": day_train_data,
    "day_valid": day_valid_data,
    "day_test": day_test_data,
    "afternoon_train": afternoon_train_data,
    "afternoon_valid": afternoon_valid_data,
    "afternoon_test": afternoon_test_data,
}

resampled_datasets = {}
for key, dataset in tqdm(datasets.items(), total=len(datasets)):
    resampled_dataset = aggregate_min_candles(dataset)
    resampled_datasets[key] = resampled_dataset



INFO: Pandarallel will run on 3 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


100%|██████████| 9/9 [04:03<00:00, 27.02s/it]


In [5]:
morning_train_5min = resampled_datasets['morning_train']
morning_valid_5min = resampled_datasets['morning_valid']
morning_test_5min = resampled_datasets['morning_test']

day_train_5min = resampled_datasets['day_train']
day_valid_5min = resampled_datasets['day_valid']
day_test_5min = resampled_datasets['day_test']

afternoon_train_5min = resampled_datasets['afternoon_train']
afternoon_valid_5min = resampled_datasets['afternoon_valid']
afternoon_test_5min = resampled_datasets['afternoon_test']

In [6]:
# # Let's look at the first examples from the morning_train set
example_day = morning_train_5min.loc[(morning_train_5min.ticker == 'AAPL') & (morning_train_5min.date_string == '2023-01-03')]
example_day


Unnamed: 0,ticker,date_string,t,targ_o,targ_h,targ_l,targ_c,targ_v,targ_vwap,targ_red,targ_green,cont_market_open,cont_market_extended
0,AAPL,2023-01-03,2023-01-03 05:30:00-05:00,130.8000,130.8000,130.800,130.8000,235.0,130.800000,0,0,0.0,1.0
1,AAPL,2023-01-03,2023-01-03 05:35:00-05:00,130.8000,130.8000,130.800,130.8000,0.0,0.000000,0,0,0.0,1.0
2,AAPL,2023-01-03,2023-01-03 05:40:00-05:00,130.8000,131.0200,130.800,131.0000,4873.0,130.972664,0,1,0.0,1.0
3,AAPL,2023-01-03,2023-01-03 05:45:00-05:00,130.9900,131.0500,130.990,131.0400,1535.0,131.041166,0,1,0.0,1.0
4,AAPL,2023-01-03,2023-01-03 05:50:00-05:00,131.0200,131.0400,131.000,131.0000,200.0,131.000000,1,0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
62,AAPL,2023-01-03,2023-01-03 10:40:00-05:00,125.4000,125.7800,125.310,125.3500,1520944.0,125.623206,1,0,1.0,0.0
63,AAPL,2023-01-03,2023-01-03 10:45:00-05:00,125.3692,125.7250,125.270,125.6728,1243797.0,125.530814,0,1,1.0,0.0
64,AAPL,2023-01-03,2023-01-03 10:50:00-05:00,125.6712,125.7200,125.230,125.3200,1573416.0,125.370505,1,0,1.0,0.0
65,AAPL,2023-01-03,2023-01-03 10:55:00-05:00,125.3150,125.6558,125.315,125.5950,934120.0,125.498774,0,1,1.0,0.0


We can check the 111 is the right number of rows:

11:00 - 05:30 = 5 hours and 35 minutes = 335 minutes / 5 = 67

In [7]:
# Now let's assert that every morning group has this correct row count
train_groups = morning_train_5min.groupby(['ticker', 'date_string'])
valid_groups = morning_valid_5min.groupby(['ticker', 'date_string'])
test_groups = morning_test_5min.groupby(['ticker', 'date_string'])

NUM_ROWS = 67

# Create a list of group objects
all_groups = [train_groups, valid_groups, test_groups]

for groups in all_groups:
    for (ticker, date_string), group in groups:
        assert len(group) == NUM_ROWS

In [8]:
# Let's look at the first examples from the day_train set
example_day = day_train_5min.loc[(day_train_5min.ticker == 'AAPL') & (day_train_5min.date_string == '2023-01-03')]
example_day

Unnamed: 0,ticker,date_string,t,targ_o,targ_h,targ_l,targ_c,targ_v,targ_vwap,targ_red,targ_green,cont_market_open,cont_market_extended
0,AAPL,2023-01-03,2023-01-03 07:00:00-05:00,130.380,130.450,130.0300,130.1500,31669.0,130.140812,1,0,0.0,1.0
1,AAPL,2023-01-03,2023-01-03 07:05:00-05:00,130.150,130.350,130.1500,130.2600,17818.0,130.259433,0,1,0.0,1.0
2,AAPL,2023-01-03,2023-01-03 07:10:00-05:00,130.210,130.380,130.2000,130.3800,13314.0,130.327047,0,1,0.0,1.0
3,AAPL,2023-01-03,2023-01-03 07:15:00-05:00,130.450,130.740,130.3800,130.6200,15273.0,130.588837,0,1,0.0,1.0
4,AAPL,2023-01-03,2023-01-03 07:20:00-05:00,130.590,130.660,130.5700,130.6200,6774.0,130.642504,0,1,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
86,AAPL,2023-01-03,2023-01-03 14:10:00-05:00,124.890,124.930,124.6408,124.6697,657046.0,124.773914,1,0,1.0,0.0
87,AAPL,2023-01-03,2023-01-03 14:15:00-05:00,124.670,124.820,124.4800,124.5200,552017.0,124.634393,1,0,1.0,0.0
88,AAPL,2023-01-03,2023-01-03 14:20:00-05:00,124.520,124.580,124.4200,124.5384,587381.0,124.510407,0,1,1.0,0.0
89,AAPL,2023-01-03,2023-01-03 14:25:00-05:00,124.530,124.609,124.4600,124.5700,570037.0,124.524486,0,1,1.0,0.0


We can check the 111 is the right number of rows:

14:33 - 07:00 = 7 hours and 35 minutes = 455 minutes / 5 = 91

In [10]:
# Now let's assert that every day group has this correct row count
train_groups = day_train_5min.groupby(['ticker', 'date_string'])
valid_groups = day_valid_5min.groupby(['ticker', 'date_string'])
test_groups = day_test_5min.groupby(['ticker', 'date_string'])

NUM_ROWS = 91

# Create a list of group objects
all_groups = [train_groups, valid_groups, test_groups]

for groups in all_groups:
    for (ticker, date_string), group in groups:
        assert len(group) == NUM_ROWS

In [11]:
# Let's look at the first examples from the afternoon_train set
example_day = afternoon_train_5min.loc[(afternoon_train_5min.ticker == 'AAPL') & (afternoon_train_5min.date_string == '2023-01-03')]
example_day

Unnamed: 0,ticker,date_string,t,targ_o,targ_h,targ_l,targ_c,targ_v,targ_vwap,targ_red,targ_green,cont_market_open,cont_market_extended
0,AAPL,2023-01-03,2023-01-03 10:30:00-05:00,125.4600,125.690,125.10,125.2093,2046178.0,125.274898,1,0,1.0,0.0
1,AAPL,2023-01-03,2023-01-03 10:35:00-05:00,125.2100,125.420,125.05,125.3900,1888789.0,125.224760,0,1,1.0,0.0
2,AAPL,2023-01-03,2023-01-03 10:40:00-05:00,125.4000,125.780,125.31,125.3500,1520944.0,125.623206,1,0,1.0,0.0
3,AAPL,2023-01-03,2023-01-03 10:45:00-05:00,125.3692,125.725,125.27,125.6728,1243797.0,125.530814,0,1,1.0,0.0
4,AAPL,2023-01-03,2023-01-03 10:50:00-05:00,125.6712,125.720,125.23,125.3200,1573416.0,125.370505,1,0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
61,AAPL,2023-01-03,2023-01-03 15:35:00-05:00,124.8200,125.000,124.74,124.8950,1249771.0,124.881194,0,1,1.0,0.0
62,AAPL,2023-01-03,2023-01-03 15:40:00-05:00,124.8950,125.000,124.77,124.9800,1193160.0,124.921284,0,1,1.0,0.0
63,AAPL,2023-01-03,2023-01-03 15:45:00-05:00,124.9900,125.170,124.81,124.9600,1481244.0,125.002379,1,0,1.0,0.0
64,AAPL,2023-01-03,2023-01-03 15:50:00-05:00,124.9600,125.420,124.93,125.3350,2626763.0,125.178504,0,1,1.0,0.0


### We can check the 111 is the right number of rows:

15:57 - 10:30 = 5 hours and 30 minutes = 330 minutes / 5 = 66

In [12]:
# Now let's assert that every afternoon group has this correct row count
train_groups = afternoon_train_5min.groupby(['ticker', 'date_string'])
valid_groups = afternoon_valid_5min.groupby(['ticker', 'date_string'])
test_groups = afternoon_test_5min.groupby(['ticker', 'date_string'])

NUM_ROWS = 66

# Create a list of group objects
all_groups = [train_groups, valid_groups, test_groups]

for groups in all_groups:
    for (ticker, date_string), group in groups:
        assert len(group) == NUM_ROWS

In [14]:
MORNING_TRAIN_DATASET_5MIN = f"{DATA_DIR}/5min-candles-train-MORNING.csv"
MORNING_VALID_DATASET_5MIN = f"{DATA_DIR}/5min-candles-valid-MORNING.csv"
MORNING_TEST_DATASET_5MIN = f"{DATA_DIR}/5min-candles-test-MORNING.csv"

DAY_TRAIN_DATASET_5MIN = f"{DATA_DIR}/5min-candles-train-DAY.csv"
DAY_VALID_DATASET_5MIN = f"{DATA_DIR}/5min-candles-valid-DAY.csv"
DAY_TEST_DATASET_5MIN = f"{DATA_DIR}/5min-candles-test-DAY.csv"

AFTERNOON_TRAIN_DATASET_5MIN = f"{DATA_DIR}/5min-candles-train-AFTERNOON.csv"
AFTERNOON_VALID_DATASET_5MIN = f"{DATA_DIR}/5min-candles-valid-AFTERNOON.csv"
AFTERNOON_TEST_DATASET_5MIN = f"{DATA_DIR}/5min-candles-test-AFTERNOON.csv"

morning_train_5min.to_csv(MORNING_TRAIN_DATASET_5MIN, index=False)
morning_valid_5min.to_csv(MORNING_VALID_DATASET_5MIN, index=False)
morning_test_5min.to_csv(MORNING_TEST_DATASET_5MIN, index=False)

day_train_5min.to_csv(DAY_TRAIN_DATASET_5MIN, index=False)
day_valid_5min.to_csv(DAY_VALID_DATASET_5MIN, index=False)
day_test_5min.to_csv(DAY_TEST_DATASET_5MIN, index=False)

afternoon_train_5min.to_csv(AFTERNOON_TRAIN_DATASET_5MIN, index=False)
afternoon_valid_5min.to_csv(AFTERNOON_VALID_DATASET_5MIN, index=False)
afternoon_test_5min.to_csv(AFTERNOON_TEST_DATASET_5MIN, index=False)