---

## LSTM Model v1

---

Basic usage of an LSTM - single variable.

In [None]:
import math
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from keras.layers import Dense,RepeatVector, LSTM, Dropout
from keras.layers import Flatten, Conv1D, MaxPooling1D
from keras.layers import Bidirectional, Dropout
from keras.models import Sequential
from keras.utils import plot_model

In [None]:
class ModelLSTMv1:
  """
  Constructs a model instance. Input data should be a dataframe containing
  only the data to be modeled.
  """
  debug = False

  def __init__(self, input, window_size=30, scale_range=(-1,1), test_ratio=0.2, num_epochs=300, debug=False):
    self.debug = debug

    # GUARDs
    if input is None:
      raise ValueError ('ModelLSTMv1 requires input')

    if self.debug:
      print('### Building ModelLSTMv1::')

    self.input = input
    self.WINDOW_SIZE = window_size
    self.SCALE_RANGE = scale_range
    self.TEST_RATIO = test_ratio
    self.NUM_EPOCHS = num_epochs


  def prep(self):
    """
    Performs scaling of the data
    """
    # Create transformed input data by scaling
    self.scaler = MinMaxScaler(feature_range=self.SCALE_RANGE)
    input_tx = self.scaler.fit_transform(self.input)

    if self.debug:
      print(f'### After scaling data type is: {type(input_tx)}')

    # Now create X and y modeling vars
    X = []
    y = []

    # Move window through training data - each block of input X has a single supervised target y
    for i in range(len(input_tx) - self.WINDOW_SIZE):
        X.append(input_tx[i:i+self.WINDOW_SIZE])
        y.append(input_tx[i+self.WINDOW_SIZE])

    # convert to np arrays
    X = np.asanyarray(X)
    y = np.asanyarray(y)

    # Split into train/test
    NUM_TEST = math.floor(len(input_tx) * self.TEST_RATIO)
    NUM_TRAIN = len(input_tx) - NUM_TEST

    self.X_train = X[:NUM_TRAIN,:,:]
    self.X_test = X[NUM_TRAIN:,:,:]
    self.y_train = y[:NUM_TRAIN]
    self.y_test= y[NUM_TRAIN:]

    if self.debug:
      print(f'X_train, y_train: {self.X_train.shape}, {self.y_train.shape}')

  def train(self):
    """
    Declare model and train.
    Based on https://keras.io/examples/timeseries/timeseries_classification_from_scratch/.
    """
    from keras.callbacks import ModelCheckpoint, TensorBoard, Callback, EarlyStopping
    early_stop = EarlyStopping(monitor = "loss", mode = "min", patience = 7)
    model = Sequential()
    model.add(Conv1D(filters=256, kernel_size=2, activation='relu', input_shape=(30,1)))
    model.add(Conv1D(filters=128, kernel_size=2, activation='relu'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Flatten())
    model.add(RepeatVector(30))
    model.add(LSTM(units=100, return_sequences=True, activation='relu'))
    model.add(Dropout(0.2))
    model.add(LSTM(units=100, return_sequences=True, activation='relu'))
    model.add(Dropout(0.2))
    model.add(LSTM(units=100, return_sequences=True, activation='relu'))
    model.add(LSTM(units=100, return_sequences=True, activation='relu'))
    model.add(Bidirectional(LSTM(128, activation='relu')))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(1))
    model.compile(loss='mse', optimizer='adam')

    self.model_hist = model.fit(self.X_train, self.y_train, epochs=self.NUM_EPOCHS, verbose=1, callbacks = [early_stop] )
    self.model = model

  def get_model_name(self):
    from datetime import datetime
    return f"{datetime.today().strftime('%Y%m%d-%H%M')}-LSTMv1.hdf5"

  def save_model(self, path):
    """
    Save the current model under the given drive path.
    Timestamp the model name.
    """
    fname = f'{path}{self.get_model_name()}'
    if self.debug:
      print(f'Saving model to: {fname}')
    return self.model.save(fname)

  def predict(self):
    pred = self.model.predict(self.X_test)

    y_test = self.scaler.inverse_transform(self.y_test)

    plt.figure(figsize=(20,9))
    plt.plot(y_test , 'blue', linewidth=5)
    plt.plot(pred,'r' , linewidth=4)
    plt.legend(('Test','Predicted'))
    plt.show()

    print(f'MSE: {mean_squared_error(y_test, pred)}')