In [None]:
# 2020-10-28 created by Akson

In [None]:
# Code15.1
# import

import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt

In [None]:
# Code15.2
# generate time series dataset

def generate_time_series(batch_size, n_steps):
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, batch_size, 1)
    time = np.linspace(0, 1, n_steps)
    series = 0.5 * np.sin((time - offsets1) * (freq1 * 10 + 10))
    series += 0.2 * np.sin((time - offsets2) * (freq2 * 20 + 20))
    series += 0.1 * (np.random.rand(batch_size, n_steps) - 0.5)

    return series[..., np.newaxis].astype(np.float32)

n_steps = 50
series = generate_time_series(10000, n_steps + 1)
X_train = series[:7000, :n_steps]
y_train = series[:7000, -1]
X_valid = series[7000: 9000, :n_steps]
y_valid = series[7000: 9000, -1]
X_test = series[9000:, :n_steps]
y_test = series[9000:, -1]

In [None]:
# Code15.3
# some model?

# ?
y_pred = X_test[:, -1]
print(np.mean(keras.losses.mean_squared_error(y_test, y_pred)))

# use dnn
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape = [50, 1]),
    keras.layers.Dense(1)
])

model.compile(loss = 'mse', optimizer = 'adam')
model.fit(X_train, y_train, epochs = 20, validation_data = (X_valid, y_valid))
model.evaluate(X_test, y_test)

In [None]:
# Code15.4
# simple RNN

model = keras.models.Sequential([
    keras.layers.SimpleRNN(1, input_shape = [None, 1])
])

model.compile(loss = 'mse', optimizer = 'adam')
model.fit(X_train, y_train, epochs = 20, validation_data = (X_valid, y_valid))
model.evaluate(X_test, y_test)

In [None]:
# Code15.5
# normal RNN

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.SimpleRNN(20, return_sequences = True),
    keras.layers.SimpleRNN(1)
])

model.compile(loss = 'mse', optimizer = 'adam')
model.fit(X_train, y_train, epochs = 10, validation_data = (X_valid, y_valid))
model.evaluate(X_test, y_test)

In [None]:
# Code15.6
# change last layer

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(1)
])

model.compile(loss = 'mse', optimizer = 'adam')
model.fit(X_train, y_train, epochs = 10, validation_data = (X_valid, y_valid))
model.evaluate(X_test, y_test)

In [None]:
# Code15.7
# more far-1

series = generate_time_series(1, n_steps + 10)
X_new = series[:, :n_steps]
y_new = series[:, n_steps:]
X = X_new

for step_ahead in range(10):
    y_pred_one = model.predict(X[:, step_ahead:])[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis = 1)

y_pred = X[:, n_steps:]

def plot_series(series, y = None, y_pred = None, x_label = '$t$', y_label = '$x(t)$'):
    plt.plot(series, '.-')
    if y is not None:
        plt.plot(n_steps, y, 'bx', markersize = 10)
    if y_pred is not None:
        plt.plot(n_steps, y_pred, 'ro')
    plt.grid(True)

    if x_label:
        plt.xlabel(x_label, fontsize = 16)
    if y_label:
        plt.ylabel(y_label, fontsize = 16, rotation = 0)
    plt.hlines(0, 0, 100, linewidth = 1)
    plt.axis([0, n_steps + 1, -1, 1])
    

def plot_multiple_forecasts(X, y, y_pred):
    n_steps = X.shape[1]
    ahead = y.shape[1]
    plot_series(X[0, :, 0])
    plt.plot(np.arange(n_steps, n_steps + ahead), y[0, :, 0], 'ro-', label = 'Actual')
    plt.plot(np.arange(n_steps, n_steps + ahead), y_pred[0, :, 0], 'bx-', label = 'Forecast', markersize = 10)
    plt.axis([0, n_steps + ahead, -1, 1])
    plt.legend(fontsize = 14)

plot_multiple_forecasts(X_new, y_new, y_pred)
plt.show()

In [None]:
# Code15.8
#

series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
y_train = series[:7000, n_steps:]
X_valid = series[7000::, :n_steps]
y_valid = series[7000:, n_steps:]

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(10)
])

model.compile(loss = 'mse', optimizer = 'adam')
model.fit(X_train, y_train, batch_size = 10, validation_data = (X_valid, y_valid))

series = generate_time_series(1, 50 + 10)
X_new = series[:, :50, :]
y_new = series[:, -10:, :]
y_pred = model.predict(X_new)[..., np.newaxis]

plot_multiple_forecasts(X_new, y_new, y_pred)
plt.show()

In [None]:
# Code15.9

n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
X_valid = series[7000:9000, :n_steps]
X_test = series[9000:, :n_steps]

y = np.empty((10000, n_steps, 10))
for step_ahead in range(1, 10 + 1):
    y[..., step_ahead - 1] = series[..., step_ahead:step_ahead + n_steps, 0]
y_train = y[:7000]
y_valid = y[7000:9000]
y_test = y[9000:]

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.SimpleRNN(20, return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

def last_time_step_mse(y_true, y_pred):
    return keras.metrics.mean_squared_error(y_true[:, -1], y_pred[:, -1])

model.compile(loss = 'mse', optimizer = keras.optimizers.Adam(lr = 0.01), metrics = [last_time_step_mse])
history = model.fit(X_train, y_train, epochs =20, validation_data = (X_valid, y_valid))

series = generate_time_series(1, 50 + 10)
X_new = series[:, :50, :]
y_new = series[:, 50:, :]

y_pred = model.predict(X_new)[:, -1][..., np.newaxis]

plot_multiple_forecasts(X_new, y_new, y_pred)
plt.show()

In [None]:
# Code15.10

class LNSimpleRNNCell(keras.layers.Layer):
    def __init__(self, units, activation = 'tanh', **kwargs):
        super().__init__(**kwargs)
        self.state_size = units
        self.output_size = units
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units, activation = None)
        self.layer_norm = keras.layers.LayerNormalization()
        self.activation = keras.activations.get(activation)

    def call(self, inputs, states):
        outputs, new_states = self.simple_rnn_cell(inputs, states)
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [norm_outputs]

model = keras.models.Sequential([
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences = True, input_shape = [None, 1]),
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss = 'mse', optimizer = 'adam', metrics = [last_time_step_mse])
history = model.fit(X_train, y_train, epochs = 10, validation_data = (X_valid, y_valid))

In [None]:
# Code15.11
# LSTM

model = keras.models.Sequential([
    keras.layers.LSTM(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.LSTM(20, return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss = 'mse', optimizer = 'adam', metrics = [last_time_step_mse])
history = model.fit(X_train, y_train, epochs = 10, validation_data = (X_valid, y_valid))

In [None]:
# Code15.12
# GRU

model = keras.models.Sequential([
    keras.layers.GRU(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.GRU(20, return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss = 'mse', optimizer = 'adam', metrics = [last_time_step_mse])
history = model.fit(X_train, y_train, epochs = 10, validation_data = (X_valid, y_valid))

In [None]:
# Code15.13
# 1-D Conv

model = keras.models.Sequential([
    keras.layers.Conv1D(filters = 20, kernel_size = 4, strides = 2, padding = 'valid', input_shape = [None, 1]),
    keras.layers.GRU(20, return_sequences = True),
    keras.layers.GRU(20, return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss = 'mse', optimizer = 'adam', metrics = [last_time_step_mse])
history = model.fit(X_train, y_train[:, 3::2], epochs = 10, validation_data = (X_valid, y_valid[:, 3::2]))

In [None]:
# Code15.14
# Simple WaveNet

model = keras.models.Sequential()
model.add(keras.layers.InputLayer(input_shape = [None, 1]))
for rate in (1, 2, 4, 8) * 2:
    model.add(keras.layers.Conv1D(filters = 20, kernel_size = 2, padding = 'causal', activation = 'relu', dilation_rate = rate))
model.add(keras.layers.Conv1D(filters = 10, kernel_size = 1))

model.compile(loss = 'mse', optimizer = 'adam', metrics = [last_time_step_mse])
history = model.fit(X_train, y_train, epochs = 20, validation_data = (X_valid, y_valid), verbose = 1)