In [2]:
!pip install tensorflow-addons keras-beats pandas scikit-learn sktime joblib
!pip install -U "ray[data,train,tune,serve]"

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [3]:
!pip install pyarrow

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [3]:
import numpy as np
import pandas as pd

from kerasbeats import prep_time_series, NBeatsModel
# from tensorflow import keras

In [4]:
df = pd.read_feather('../data/df_btc_with_features_5m_spot.feather')

df['target'] = df['close'].copy()

start_time = df['open_time'].min()
end_time = df['open_time'].max()
dates = df['open_time'].unique()
n = len(dates)
train_idx = int(0.7 * n)
test_idx = int(0.8 * n)

train_df = df.iloc[:train_idx]
valid_df = df.iloc[train_idx:test_idx]
test_df = df.iloc[test_idx:]

In [5]:
import tensorflow as tf
tf.random.set_seed(42)
import tensorflow.python.keras.backend as K
import tensorflow.python.keras.layers as layers
from tensorflow.python.keras.callbacks import Callback, ReduceLROnPlateau, ModelCheckpoint, EarlyStopping

def create_nbeat_mlp(num_columns, num_labels, lookback, horizon, hidden_units, dropout_rates, ls=1e-2, lr=1e-3):
    nbeats = NBeatsModel(model_type = 'generic', lookback = lookback, horizon = horizon,
                         learning_rate = lr, batch_size = 4096,
                         num_generic_neurons = hidden_units[0]) # set as default
    nbeats.build_layer()
    time_input = keras.layers.Input(shape = (lookback * horizon, ))
    x_nb = nbeats.model_layer(time_input)

    xcons = keras.layers.Input(shape = (num_columns, ))
    x = keras.layers.Concatenate()([xcons, x_nb])
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Dropout(dropout_rates[0])(x)

    for i in range(1, len(hidden_units)):
        x = tf.keras.layers.Dense(hidden_units[i])(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Activation('swish')(x)
        x = tf.keras.layers.Dropout(dropout_rates[i])(x)

    out = tf.keras.layers.Dense(num_labels, name = 'action')(x)
    model = tf.keras.models.Model(inputs = [time_input, xcons], outputs = out)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                  loss = {'action' : tf.keras.losses.MeanSquaredError()},
                  metrics = {'action' : tf.metrics.MeanSquaredError(name = 'mse')})
    return model

In [6]:
import tensorflow as tf
tf.random.set_seed(42)
import tensorflow.python.keras.backend as K
import tensorflow.python.keras.layers as layers
from tensorflow.python.keras.callbacks import Callback, ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from ray import train
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.data import read_numpy
from ray.train.tensorflow.keras import ReportCheckpointCallback


def train_func(config):
    batch_size = config.get('batch_size', 1024)
    num_generic_neurons = config['num_generic_neurons']
    num_generic_stacks = config['num_generic_stacks']
    num_generic_layers = config['num_generic_layers']
    num_trend_neurons = config['num_trend_neurons']
    num_trend_stacks = config['num_trend_stacks']
    num_trend_layers = config['num_trend_layers']
    num_seasonal_neurons = config['num_seasonal_neurons']
    polynomial_term = config['polynomial_term']
    lr = config['learning_rate']

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = NBeatsModel(**config)
        model.build_layer()
        model.build_model()
        ckp = ModelCheckpoint(ckp_path, monitor='r2score', verbose=0,
                            save_best_only=True, save_weights_only=True, mode='max')
        es = EarlyStopping(monitor='r2score', min_delta=1e-4, patience=10, mode='max',
                            baseline=None, restore_best_weights=True, verbose=0)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                      loss = [tf.keras.losses.MeanSquaredError(name = 'mse')],
                      metrics = [tf.keras.metrics.MeanSquaredError(name = 'mse')])

    x_tr, y_tr = read_numpy(f'../data/x_tr_{lookback}.npy'), read_numpy(f'../data/y_tr_{lookback}.npy')
    x_val, y_val = read_numpy(f'../data/x_val_{lookback}.npy'),read_numpy(f'../data/y_val_{lookback}.npy')
    
    results = []
    for _ in range(epochs):
        history = model.fit(x_tr, y_tr, validation_data = (x_val, y_val),
                            batch_size = batch_size,
                            callbacks = [ReportCheckpointCallback(metrics = ['val_mse']), ckp, es])
        results.append(history.history)
    return results


In [7]:
from sklearn.preprocessing import MinMaxScaler


scaler = MinMaxScaler((0, 1))
train_df['target'] = scaler.fit_transform(train_df[['close']])
valid_df['target'] = scaler.transform(valid_df[['close']])

train_df['target_ret'] = train_df['close'].pct_change(1)
valid_df['target_ret'] = valid_df['close'].pct_change(1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['target'] = scaler.fit_transform(train_df[['close']])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  valid_df['target'] = scaler.transform(valid_df[['close']])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['target_ret'] = train_df['close'].pct_change(1)
A value is trying to be se

In [None]:
horizon = 100
for lookback in range(100, 600, 100):
    for type in ['minmax', 'ret']:
        x_tr, y_tr = prep_time_series(train_df['target'], lookback = lookback, horizon = horizon)
        x_val, y_val = prep_time_series(valid_df['target'], lookback = lookback, horizon = horizon)
        x_tr, y_tr = prep_time_series(train_df['target_ret'], lookback = lookback, horizon = horizon)
        x_val, y_val = prep_time_series(valid_df['target'], lookback = lookback, horizon = horizon)

        np.save(f'../data/ray/x_tr_{lookback}_{type}.npy', x_tr)
        np.save(f'../data/ray/y_tr_{lookback}_{type}.npy', y_tr)
        np.save(f'../data/ray/x_val_{lookback}_{type}.npy', x_val)
        np.save(f'../data/ray/y_val_{lookback}_{type}.npy', y_val)
    

In [9]:
from ray.tune import TuneConfig
from ray.tune.search.bayesopt import BayesOptSearch
import ray


config = {
    "num_generic_neurons" : [200, 400, 500, 600, 700],
    'num_generic_stacks' : [30, 40, 50, 60],
    'num_generic_layers' : [3, 4, 5, 6],
    'num_trend_neurons' : [128, 256, 512],
    'num_trend_stacks' : [3, 4, 5, 6, 7],
    'num_trend_layers' : [4, 6, 8],
    'polynomial_term' : [2, 3, 4],
    'loss' : 'mse',
    'lr' : 1e-3,
    'lookback' : 200,
    'horizon' : 100
}

scaling_config = ScalingConfig(num_workers = 10, use_gpu = True)
trainer = TensorflowTrainer(
    train_loop_per_worker = train_func,
    train_loop_config = {"num_epochs"},
    scale_config = scaling_config,
    datasets={"train": train,
              "valid" : valid}
)



2023-12-05 08:15:28,102	ERROR services.py:1329 -- Failed to start the dashboard , return code 1
2023-12-05 08:15:28,105	ERROR services.py:1354 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure' to find where the log file is.
2023-12-05 08:15:28,107	ERROR services.py:1398 -- 
The last 20 lines of /tmp/ray/session_2023-12-05_08-15-25_702261_4078/logs/dashboard.log (it contains the error message from the dashboard): 
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/ray/dashboard/utils.py", line 134, in get_all_modules
    raise e
  File "/usr/local/lib/python3.11/dist-packages/ray/dashboard/utils.py", line 121, in get_all_modules
    importlib.import_module(name)
  File "/usr/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._

KeyboardInterrupt: 

In [None]:
lookback = 200
horizon = 100
x_tr, y_tr = prep_time_series(df['close'], lookback = lookback, horizon = horizon)
trainer = TensorflowTrainer(
    train_loop_per_worker = 
)

In [None]:
def train_func(config):
    lookback = config['lookback']
    horizon = 100
    train_dataset = 


In [11]:
scores = []
batch_size = 4096
for fold, (train_idx, val_idx) in enumerate(cv.split(train_df, train_df[f'target_5m'], groups)):
    x_train, x_valid = train_df['target_5m'].iloc[train_idx], train_df['target_5m'].iloc[val_idx]

    min_train, max_train = min(train_df['open_time'].iloc[train_idx]).to_pydatetime(), max(
                train_df['open_time'].iloc[train_idx]).to_pydatetime()
    min_valid, max_valid = min(train_df['open_time'].iloc[val_idx]).to_pydatetime(), max(
                train_df['open_time'].iloc[val_idx]).to_pydatetime()

    x_tr, y_tr = prep_time_series(x_train, lookback = lookback, horizon = horizon)
    x_val, y_val = prep_time_series(x_valid, lookback = lookback, horizon = horizon)

    cutoff_tr, cutoff_val = x_train.shape[0] - x_tr.shape[0], x_valid.shape[0] - x_val.shape[0]
    x_tr_const, x_val_const = train_df[train_features_test].iloc[train_idx], train_df[train_features_test].iloc[val_idx]
    x_tr_const, x_val_const = x_tr_const.iloc[cutoff_tr:, :], x_val_const.iloc[cutoff_val:, :]

    print(f'Shape of X_const is {x_tr_const.shape}, x_tr is {x_tr.shape}, y_tr is {y_tr.shape}')

    ckp_path = f'../output/{directory}/NBEATS_MSE_{fold}_returns{horizon}m_{lookback}m_{date}.hdf5'
    model = create_nbeat_mlp(**params)
    ckp = ModelCheckpoint(ckp_path, monitor='val_action_mse', verbose=0,
                                  save_best_only=True, save_weights_only=True, mode='min')
    es = EarlyStopping(monitor='val_action_mse', min_delta=1e-4, patience=10, mode='min',
                        baseline=None, restore_best_weights=True, verbose=0)


    history = model.fit([x_tr, x_tr_const.values], y_tr,
                        validation_data = ([x_val, x_val_const.values], y_val),
                        epochs = 100, batch_size = batch_size, callbacks = [ckp, es])

    hist = pd.DataFrame(history.history)
    score = hist['val_action_mse'].min()
    print(f'Fold {fold} MSE:\t', score)
    scores.append(score)
    K.clear_session()

Shape of X_const is (68187, 50), x_tr is (68187, 60), y_tr is (68187, 6)


2023-11-25 22:26:13.865844: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Epoch 1/100
Epoch 2/100
 1/17 [>.............................] - ETA: 1:17 - loss: nan - mse: nan

KeyboardInterrupt: 