# Jane Street Market Prediction - TPU Training


Run some setup code.

In [None]:
import json
import os

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import precision_recall_curve, roc_curve

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing

from kaggle_datasets import KaggleDatasets
from kaggle_secrets import UserSecretsClient

# set up the TPU: resolve the TPU cluster, connect to it, initialize the
# TPU system, and create the distribution strategy used to build the model
tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
tpu_strategy = tf.distribute.experimental.TPUStrategy(tpu)
# number of replicas kept in sync by the strategy; the batch size in the
# settings is scaled by this value
tpu_replicas = tpu_strategy.num_replicas_in_sync

# get the dataset credential from the Google Cloud SDK
# and pass the credential to tensorflow
# NOTE: this needs to run after TPU initialization
user_secrets = UserSecretsClient()
user_credential = user_secrets.get_gcloud_credential()
user_secrets.set_tensorflow_credential(user_credential)

# set tensorflow's random seed
tf.random.set_seed(42)

Settings.

In [None]:
# training parameters
EPOCHS = 10  # number of epochs passed to model.fit()
BATCH_SIZE = 512 * tpu_replicas  # dataset batch size: 512 times the number of TPU replicas
LEARNING_RATE = 0.2  # learning rate of the SGD optimizer

# cross-validation parameters
FOLDS = 5  # number of folds; fold indices are range(FOLDS)
HOLDOUT = 4  # index of the fold excluded from training and used for validation

# model parameters
WINDOW_SIZE = 20  # number of consecutive records in each sliding window
NOISE = 0.1  # standard deviation of the GaussianNoise layer

`get_dataset()` returns a dataset generated from the folds in the list `folds`.

In [None]:
def get_dataset(folds, repeat=False, shuffle=False, cache=False):
    """Build a batched dataset of windowed samples from the given folds.

    Each TFRecord file found under the requested folds is cut into sliding
    windows of WINDOW_SIZE consecutive records (shift 1, incomplete windows
    dropped). Windows whose records carry more than one date are discarded.
    Every remaining window is turned into a pair (X, y): X holds columns
    feature_0..feature_129 of all records in the window and y is
    (1 + sign(resp)) / 2 of the last record, i.e. 1.0 for a positive
    response, 0.0 for a negative one and 0.5 for a response of exactly 0.

    Args:
        folds: iterable of fold indices whose "*.tfrec" files are read.
        repeat: if true, the dataset repeats indefinitely.
        shuffle: if true, elements may be produced in non-deterministic
            order and are shuffled with a buffer of 4 * BATCH_SIZE elements.
        cache: if true, the batched dataset is cached.

    Returns:
        A tf.data.Dataset yielding batches of BATCH_SIZE pairs with X
        reshaped to [WINDOW_SIZE, 130] and y reshaped to [1].
    """
    autotune = tf.data.experimental.AUTOTUNE

    # dictionary mapping column names to column indices
    columns_path = os.path.join(os.pardir, "input",
                                "jane-street-market-prediction-data",
                                "columns.json")
    with open(columns_path) as handle:
        columns = json.load(handle)

    def windows_from_file(path):
        # parse the serialized tensors of one tf record file and group
        # them into sliding windows of WINDOW_SIZE records
        records = tf.data.TFRecordDataset(path)
        rows = records.map(lambda raw: tf.io.parse_tensor(raw, tf.float32),
                           num_parallel_calls=autotune)
        windows = rows.window(WINDOW_SIZE, shift=1, drop_remainder=True)
        return windows.flat_map(lambda window: window.batch(WINDOW_SIZE))

    def is_single_date(window):
        # true when all records of the window share one date
        unique_dates, _ = tf.unique(window[:, columns["date"]])
        return tf.equal(tf.size(unique_dates), tf.constant(1))

    def to_sample(window):
        # explicit reshapes are needed for things to work properly on TPU
        features = window[:, columns["feature_0"]:(columns["feature_129"] + 1)]
        label = (1.0 + tf.sign(window[-1, columns["resp"]])) / 2.0
        return (tf.reshape(features, [WINDOW_SIZE, 130]),
                tf.reshape(label, [1]))

    # when shuffling, the order does not matter, so trade it for speed
    ordering = tf.data.Options()
    ordering.experimental_deterministic = not shuffle

    # list the tf record files of all requested folds
    gcs_root = KaggleDatasets().get_gcs_path()
    patterns = [f"{gcs_root}/fold{fold}/*.tfrec" for fold in folds]
    filenames = tf.io.gfile.glob(patterns)

    # interleave the windowed datasets of the individual files, then
    # filter and convert the windows into training samples
    ds = (tf.data.Dataset.from_tensor_slices(filenames)
          .interleave(windows_from_file, num_parallel_calls=autotune)
          .filter(is_single_date)
          .map(to_sample, num_parallel_calls=autotune)
          .with_options(ordering))

    if repeat:
        ds = ds.repeat()
    if shuffle:
        ds = ds.shuffle(4 * BATCH_SIZE)

    ds = ds.batch(BATCH_SIZE)

    if cache:
        ds = ds.cache()

    # prefetch new batches in the background
    return ds.prefetch(autotune)


def get_norm_dataset(folds):
    """Return a batched dataset of single feature rows from the given folds.

    This is a simpler, non-windowed counterpart of get_dataset() that is
    used to adapt the normalization layer. Every record is parsed into a
    float32 tensor and reduced to columns feature_0..feature_129, reshaped
    to [130]. Records are read in non-deterministic order and batched with
    BATCH_SIZE.

    Args:
        folds: iterable of fold indices whose "*.tfrec" files are read.

    Returns:
        A tf.data.Dataset yielding batches of feature rows.
    """
    autotune = tf.data.experimental.AUTOTUNE

    # dictionary mapping column names to column indices
    columns_path = os.path.join(os.pardir, "input",
                                "jane-street-market-prediction-data",
                                "columns.json")
    with open(columns_path) as handle:
        columns = json.load(handle)

    def to_features(raw):
        # keep only the feature columns of a single record
        row = tf.io.parse_tensor(raw, tf.float32)
        features = row[columns["feature_0"]:(columns["feature_129"] + 1)]
        return tf.reshape(features, [130])

    # the order of the records is irrelevant here, so trade it for speed
    unordered = tf.data.Options()
    unordered.experimental_deterministic = False

    gcs_root = KaggleDatasets().get_gcs_path()
    patterns = [f"{gcs_root}/fold{fold}/*.tfrec" for fold in folds]
    filenames = tf.io.gfile.glob(patterns)

    return (tf.data.TFRecordDataset(filenames, num_parallel_reads=autotune)
            .with_options(unordered)
            .map(to_features, num_parallel_calls=autotune)
            .batch(BATCH_SIZE)
            .prefetch(autotune))

Define the model.

In [None]:
# every fold except the holdout fold is used for training
train_folds = [index for index in range(FOLDS) if index != HOLDOUT]


def _conv_bn(tensor):
    """Apply a 64-filter, width-5, same-padded Conv1D, then batch norm."""
    tensor = layers.Conv1D(filters=64,
                           kernel_size=5,
                           activation=None,
                           padding="same",
                           data_format="channels_last")(tensor)
    return layers.BatchNormalization()(tensor)


# build and compile the model on the TPU
with tpu_strategy.scope():
    inputs = layers.Input(shape=[WINDOW_SIZE, 130])

    # normalization layer, adapted on the features of the training folds
    norm = preprocessing.Normalization()
    norm.adapt(get_norm_dataset(train_folds))
    x = norm(inputs)

    # gaussian noise
    x = layers.GaussianNoise(stddev=NOISE)(x)

    # minimalistic resnet: the skip branch is a single conv + batch norm
    shortcut = _conv_bn(x)

    # main branch: conv + batch norm + relu, then conv + batch norm
    x = _conv_bn(x)
    x = layers.Activation(activation="relu")(x)
    x = _conv_bn(x)

    # merge both branches
    x = layers.Add()([shortcut, x])
    x = layers.Activation(activation="relu")(x)

    # flatten and predict a probability with a single sigmoid unit
    x = layers.Flatten()(x)
    outputs = layers.Dense(units=1, activation="sigmoid")(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name="model")

    # Nesterov SGD as optimizer, binary cross-entropy as loss function;
    # track recall, precision, and the area under the ROC curve
    model.compile(
        optimizer=keras.optimizers.SGD(learning_rate=LEARNING_RATE,
                                       momentum=0.9,
                                       nesterov=True),
        metrics=[keras.metrics.Recall(name="recall"),
                 keras.metrics.Precision(name="precision"),
                 keras.metrics.AUC(name="auc")],
        loss=tf.keras.losses.BinaryCrossentropy(name="binary_crossentropy"),
    )

    # print a summary of the model
    model.summary()

Train the model.

In [None]:
# get the training and validation datasets
train_ds = get_dataset(train_folds, repeat=True, shuffle=True)
valid_ds = get_dataset([HOLDOUT], cache=True)

# load the stats dictionary, which is indexed by the fold number (as a
# string) and stores the number of samples of that fold under "length"
stats_file = os.path.join(os.pardir, "input",
                          "jane-street-market-prediction-data",
                          "stats.json")

with open(stats_file) as file:
    stats = json.load(file)

# the training dataset repeats forever, so the length of an epoch has to be
# given explicitly: one epoch is one pass over the samples of the TRAINING
# folds (the holdout fold is only used for validation, so its length must
# not be used here)
# NOTE(review): assumes stats has a "length" entry for every fold, not only
# for the holdout fold - confirm against stats.json; the count ignores the
# few windows lost to windowing and date filtering
train_length = sum(stats[str(fold)]["length"] for fold in train_folds)

# never request zero steps, which would happen if the training set were
# smaller than a single batch
steps_per_epoch = max(1, train_length // BATCH_SIZE)

# train the model
hist = model.fit(train_ds,
                 epochs=EPOCHS,
                 steps_per_epoch=steps_per_epoch,
                 validation_data=valid_ds)

Plot the learning curve and associated metrics.

In [None]:
# one row per epoch, one column per tracked quantity
hist_df = pd.DataFrame(hist.history)


def _plot_history(title, curves):
    """Show one 8x5 figure with a line for each (column, label) in curves."""
    plt.figure(figsize=(8, 5))
    for column, label in curves:
        sns.lineplot(data=hist_df[column], label=label)
    plt.title(title)
    plt.grid(True)
    plt.xlabel("Epoch")
    plt.ylabel("")
    plt.show()


# loss
_plot_history("Loss",
              [("loss", "Training loss"),
               ("val_loss", "Validation loss")])

# training metrics
_plot_history("Training metrics",
              [("precision", "Precision"),
               ("recall", "Recall"),
               ("auc", "Area under ROC curve")])

# validation metrics
_plot_history("Validation metrics",
              [("val_precision", "Precision"),
               ("val_recall", "Recall"),
               ("val_auc", "Area under ROC curve")])

Plot the ROC curve and precision vs. recall on the validation set.

In [None]:
def _finish_unit_plot(title, xlabel, ylabel=None):
    """Label the current figure, clip both axes to [0, 1] and show it."""
    plt.xlabel(xlabel)
    if ylabel is not None:
        plt.ylabel(ylabel)
    plt.title(title)
    plt.axis([0, 1, 0, 1])
    plt.grid(True)
    plt.show()


# holdout fold in its original order, so labels and predictions line up
test_ds = get_dataset([HOLDOUT])

# stack the label batches into a single column of true labels
label_batches = test_ds.map(lambda X, y: y).as_numpy_iterator()
labels = np.vstack([batch for batch in label_batches])

# predicted probabilities for the same samples
probs = model.predict(test_ds.map(lambda X, y: X))

# precision vs recall
precisions, recalls, thresholds = precision_recall_curve(labels, probs)

# precision and recall as functions of the decision threshold; the final
# precision/recall entry has no matching threshold and is left out
plt.figure(figsize=(8, 5))
plt.plot(thresholds, precisions[:-1], "b--", label="Precision", linewidth=2)
plt.plot(thresholds, recalls[:-1], "g-", label="Recall", linewidth=2)
plt.legend()
_finish_unit_plot("Precision/recall at threshold", "Threshold")

# precision as a function of recall
plt.figure(figsize=(8, 5))
plt.plot(recalls, precisions, "b-", linewidth=2)
_finish_unit_plot("Precision at recall", "Recall", "Precision")

# ROC curve
fpr, tpr, thresholds = roc_curve(labels, probs)
plt.figure(figsize=(8, 5))
plt.plot(fpr, tpr, linewidth=2)
_finish_unit_plot("ROC curve", "False positive rate", "True positive rate")

Save the model architecture and weights.

In [None]:
# the architecture goes to a JSON file ...
with open("model.json", mode="w") as arch_file:
    arch_json = model.to_json()
    arch_file.write(arch_json)

# ... and the weights go to a separate HDF5 file
model.save_weights("model.h5")