## Create Unit tests for this project

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from ts_boilerplate.params import ROOT_DIR, DATA, TRAIN, CROSS_VAL
from ts_boilerplate.data import get_X_y
from ts_boilerplate.generate_dummy_data import generate_data_monotonic_increase, generate_data_zeros_and_ones, generate_X_y_zeros_and_ones

# Reload imported modules automatically before each cell runs, so edits to the
# `ts_boilerplate` package are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

## Let's create a dummy time series dataset whose values increment by 1 every day

In [2]:
# Build the dummy series with the project helper (called with its defaults) and display it
data = generate_data_monotonic_increase()
data

In [3]:
# Wrap the raw values in a DataFrame so they render as a table and can be written to CSV
data_df = pd.DataFrame(data)
data_df

In [4]:
# Persist the dummy dataset as <ROOT_DIR>/data/raw/data_dummy.csv.
# The target folder is created first: `to_csv` does not create missing parent
# directories, so the write would otherwise fail on a checkout without `data/raw`.
dummy_csv_path = os.path.join(ROOT_DIR, "data", "raw", "data_dummy.csv")
os.makedirs(os.path.dirname(dummy_csv_path), exist_ok=True)
data_df.to_csv(dummy_csv_path, index=False)

In [5]:
# Read the file back from disk to check that the CSV was written as expected
pd.read_csv(os.path.join(ROOT_DIR, "data", "raw", "data_dummy.csv"))

## get_X_y

In [6]:
# Cut the full series into (X, y) pairs, passing the TRAIN settings as keyword arguments,
# then show both shapes (one per line)
X, y = get_X_y(data, **TRAIN)
print(X.shape, y.shape, sep="\n")

In [7]:
# Let's compute the expected number of sequences arithmetically (for unittests)

# Number of start positions available when the window moves one step at a time
n_start_positions = (len(data)
                     - (TRAIN['input_length'] - 1)
                     - (TRAIN['output_length'] - 1)
                     - TRAIN['horizon'])

# Taking one start every `stride` steps (0, stride, 2*stride, ...) keeps
# ceil(n_start_positions / stride) of them, so the quotient must be rounded UP
# to an integer rather than left as a float.
# NOTE(review): assumes get_X_y starts at position 0 and advances by `stride` — confirm.
int(np.ceil(n_start_positions / TRAIN["stride"]))

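☝️ When `stride > 1`, the expected number of sequences is the **ceiling** of this ratio (e.g. `math.ceil(...)` or `np.ceil(...)`), because an incomplete last stride still yields one sequence.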

## Create train_test_split

In [8]:
# Pull the split-related settings out of TRAIN in one go
train_test_ratio, input_length, output_length = (
    TRAIN[key] for key in ("train_test_ratio", "input_length", "output_length")
)
data.shape

In [9]:
# Row index (exclusive) at which the training slice stops
last_train_idx = round(len(data) * train_test_ratio)
data_train = data[:last_train_idx, :]

# The test slice reaches back `input_length` rows into the training slice,
# so it begins before `last_train_idx` and runs to the end of the data
first_test_idx = last_train_idx - input_length
data_test = data[first_test_idx:, :]

In [10]:
# Training slice: rows [0, last_train_idx) of `data`
data_train

In [11]:
# Test slice: starts `input_length` rows before the end of the training slice
data_test

In [12]:
# Build the (X, y) pairs of each split with the same TRAIN settings
X_train, y_train = get_X_y(data_train, **TRAIN)
X_test, y_test = get_X_y(data_test, **TRAIN)

# Show the pairs sitting on either side of the train/test boundary
print("####### Last train pair", X_train[-1], y_train[-1], sep="\n")
print("####### First test pair", X_test[0], y_test[0], sep="\n")

In [13]:
# Smallest target value in the test set minus the largest target value in the train set.
# NOTE(review): this reads as a *time* gap only because the dummy series is meant to
# increase by 1 per step — confirm against generate_data_monotonic_increase.
gap = np.min(y_test) - np.max(y_train)
gap

In [14]:
# Fail loudly when the train/test target gap is smaller than the configured horizon
assert gap >= TRAIN["horizon"], "❗️❗️ Data leak detected between (X_train, y_train) and (X_test, y_test)❗️❗️ "

## Create folds

In [15]:
def get_folds(data: np.ndarray, fold_length: int, fold_stride: int) -> list:
    """Cut `data` into equally sized, possibly overlapping folds along its first axis.

    Folds start at rows 0, `fold_stride`, 2 * `fold_stride`, ... and each one
    spans `fold_length` consecutive rows. A trailing chunk shorter than
    `fold_length` is dropped, so every returned fold has the same length.

    Args:
        data: Array to split; slicing is done on the first axis only, so any
            number of trailing dimensions is accepted.
        fold_length: Number of rows in each fold. Must be strictly positive.
        fold_stride: Number of rows between two consecutive fold starts.
            Must be strictly positive.

    Returns:
        List of slices of `data`, in order of increasing start row. The list is
        empty when `data` has fewer than `fold_length` rows.

    Raises:
        ValueError: If `fold_length` or `fold_stride` is not strictly positive.
    """
    if fold_length <= 0:
        raise ValueError(f"fold_length must be a positive integer, got {fold_length}")
    if fold_stride <= 0:
        raise ValueError(f"fold_stride must be a positive integer, got {fold_stride}")

    # Last row at which a complete fold can still start (negative -> no fold fits)
    last_start = len(data) - fold_length
    return [
        data[start:start + fold_length]
        for start in range(0, last_start + 1, fold_stride)
    ]

# CROSS_VAL is unpacked as keyword arguments, so it has to supply `fold_length` and `fold_stride`
folds = get_folds(data, **CROSS_VAL)
print('n_folds= ', len(folds))
# Show the last fold as a sanity check (raises IndexError if no fold was produced)
print(folds[-1])

## Model

In [16]:
import tensorflow as tf
from keras.layers import Dense, SimpleRNN, Reshape

In [17]:
# Minimal recurrent baseline: a single-unit SimpleRNN followed by a Dense layer that
# emits output_length * n_targets values, reshaped to the shape of one target sample.
model = tf.keras.Sequential()
model.add(SimpleRNN(1, activation='tanh', input_shape=X_train.shape[1:]))
model.add(Dense(TRAIN['output_length'] * DATA["n_targets"], activation='linear'))
model.add(Reshape(y_train.shape[1:]))
model.compile(loss='mse',
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),
              # `metrics` is documented as a list; newer Keras rejects a bare callable
              metrics=[tf.keras.metrics.MAPE])
# summary() prints the table itself and returns None, so it must not be wrapped in print()
model.summary()

In [18]:
verbose = 0

# Stop training once the validation loss has not improved for 2 epochs in a row,
# and roll the model back to the best weights seen so far
es = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=2,
    restore_best_weights=True,
    verbose=verbose,
)

# Fit on the training pairs, holding out 30% of them for validation
history = model.fit(
    X_train,
    y_train,
    validation_split=0.3,
    batch_size=32,
    epochs=50,
    callbacks=[es],
    verbose=verbose,
)

In [19]:
# Ground-truth targets of the test set, shown for comparison with the model predictions
y_test

In [20]:
# Run the fitted model on the test inputs and display its predictions
y_pred = model.predict(X_test)
y_pred 

In [23]:
# NOTE(review): this import sits mid-notebook; consider moving it to the first import cell
from ts_boilerplate.metrics import mape
# Score the predictions against the test targets with the project's `mape` helper
mape(y_test, y_pred)