<a href="https://colab.research.google.com/github/suryogumilar/Tensorflow_timeseries/blob/main/C4_W3_Lab_1_RNN_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Recurrent Neural Network (RNN) for time series forecasting

In this variant of the notebook, the final `Lambda` layer (which scaled the model output by 100) is removed from the network.

In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta
# for timezone()
import pytz
import math
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import plotly.graph_objects as go
from IPython.display import clear_output
import random
import os
# Import library for t-test
import scipy.stats as stats

## functions

Define utility functions that are used repeatedly throughout the notebook.

### Plot functions

In [2]:
def plot_series(time, series, format="-", start=0, end=None):
    """
    Draw one or several time series on a single matplotlib figure.

    Args:
      time (array-like) - x-axis values (time steps or dates)
      series (array-like or tuple of array-like) - measurements for each
        time step; a tuple is drawn as several lines sharing the same x-axis
      format - matplotlib format string controlling the line style
      start - first position to plot
      end - position at which plotting stops (None plots to the end)
    """
    # Only an actual tuple is treated as "several series"; any other
    # object is plotted as one series.
    lines_to_draw = series if type(series) is tuple else (series,)

    plt.figure(figsize=(10, 6))

    for values in lines_to_draw:
        plt.plot(time[start:end], values[start:end], format)

    # Axis labels and grid
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.grid(True)

    # Render the figure
    plt.show()

In [3]:
def plot_series_plotly(time, series, series_name=None, 
                       figure_title='', showlegend=False, 
                       start=0, end=None,
                       xaxis_title="Time",
                       yaxis_title="Value"):
    """
    Visualizes time series data using plotly for an interactive graph

    Args:
      time (array-like) - x-axis values (time steps or dates)
      series (array-like or tuple of array-like) - measurements for each
        time step; a tuple is drawn as one trace per element
      series_name (sequence of string, optional) - trace names matched by
        position to the elements of a tuple `series`. May be None or shorter
        than `series`; traces without a name get plotly's default name.
        Ignored when `series` is not a tuple.
      figure_title (string) - title shown above the graph
      showlegend (bool) - whether the legend is displayed
      start - first position to plot
      end - position at which plotting stops (None plots to the end)
      xaxis_title (string) - label of the x-axis
      yaxis_title (string) - label of the y-axis
    """
    fig = go.Figure()

    if type(series) is tuple:
      # series_name is optional: previously a None default (or a list that
      # was too short) raised when indexed, so missing names now fall back
      # to None, which lets plotly pick its default trace name.
      names = list(series_name) if series_name is not None else []
      for position, series_num in enumerate(series):
        trace_name = names[position] if position < len(names) else None
        fig.add_trace(go.Scatter(x=time[start:end],
                                 y=series_num[start:end], mode='lines',
                                 name=trace_name))
    else:
      # Single series: one unnamed trace
      fig.add_trace(go.Scatter(x=time[start:end],
                               y=series[start:end], mode='lines'))

    # Fixed-size 600x600 figure with explicit margins
    fig.update_layout(title=figure_title, xaxis_title=xaxis_title, 
                      yaxis_title=yaxis_title,
                      autosize=False,
                      width=600,
                      height=600,
                      margin=dict(
                        l=50,
                        r=50,
                        b=100,
                        t=100,
                        pad=4
                        ), paper_bgcolor="LightSteelBlue"
                        , showlegend=showlegend
                      )
    fig.show()

In [4]:
def plot_candlesticks(df, figure_title='', showlegend=False):
  """
  Show an interactive plotly candlestick chart of OHLC price data.

  Args:
    df - table providing 'Date', 'Open', 'High', 'Low' and 'Close' columns
    figure_title (string) - title shown above the chart
    showlegend (bool) - whether the legend is displayed
  """
  candles = go.Candlestick(
      x=df['Date'],
      open=df['Open'],
      high=df['High'],
      low=df['Low'],
      close=df['Close'],
  )
  fig = go.Figure(data=[candles])

  # Fixed-size 600x600 figure with explicit margins
  figure_margin = dict(l=50, r=50, b=100, t=100, pad=4)
  fig.update_layout(
      title=figure_title,
      xaxis_title="Time",
      yaxis_title="Value",
      autosize=False,
      width=600,
      height=600,
      margin=figure_margin,
      paper_bgcolor="LightSteelBlue",
      showlegend=showlegend,
  )
  fig.show()

In [5]:
def plot_loss_inlog(history, epoch_value, lrs_value=1e-8, 
                    x_boundary1=1e-8, x_boundary2=1e-3,
                    y_boundary1=0, y_boundary2=50):
  """
  Plot the training loss against the learning rate on a logarithmic x-axis.

  The learning rate of epoch `e` is reconstructed as
  `lrs_value * 10 ** (e / 20)`, i.e. it grows tenfold every 20 epochs.

  Parameters:
    history: object whose `history["loss"]` holds one loss value per epoch
    epoch_value: number of epochs; one learning rate is generated per epoch
    lrs_value: learning rate of the first epoch (assumed to match the
      schedule passed to LearningRateScheduler during training)
    x_boundary1, x_boundary2: lower and upper limit of the x-axis
    y_boundary1, y_boundary2: lower and upper limit of the y-axis
  """
  # Rebuild the learning rate used at every epoch
  epochs = np.arange(epoch_value)
  learning_rates = lrs_value * (10 ** (epochs / 20))
  losses = history.history["loss"]

  plt.figure(figsize=(10, 6))
  plt.grid(True)

  # Loss curve with a log-scaled x-axis
  plt.semilogx(learning_rates, losses)

  # Larger tick marks on both axes, major and minor
  plt.tick_params('both', length=10, width=1, which='both')

  # Restrict the visible region of the plot
  axis_limits = [x_boundary1, x_boundary2, y_boundary1, y_boundary2]
  plt.axis(axis_limits)

In [6]:
def plot_loss_inlog_plotly(history, epoch_value, lrs_value=1e-8,
                           figure_title='Loss value', showlegend=False):
  """
  Plot the training loss against the learning rate on a logarithmic x-axis,
  using plotly for an interactive graph.

  The learning rate of epoch `e` is reconstructed as
  `lrs_value * 10 ** (e / 20)`, i.e. it grows tenfold every 20 epochs.

  Parameters:
    history: object whose `history['loss']` holds one loss value per epoch
    epoch_value: number of epochs; one learning rate is generated per epoch
    lrs_value: learning rate of the first epoch (assumed to match the
      schedule passed to LearningRateScheduler during training)
    figure_title: title shown above the graph
    showlegend: whether the legend is displayed
  """
  # Define the learning rate array
  lrs = lrs_value * (10 ** (np.arange(epoch_value) / 20))
  fig = go.Figure()
  fig.add_trace(go.Scatter(x=lrs,
                           y=history.history['loss'], mode='lines'))
  # Learning rates span several orders of magnitude, hence the log axis
  fig.update_xaxes(type="log")

  # The axis titles describe what is actually plotted. The previous
  # "Time"/"Value" layout titles overwrote the learning-rate axis label.
  fig.update_layout(title=figure_title,
                      xaxis_title="Learning rate", yaxis_title="Loss",
                      autosize=False,
                      width=600,
                      height=600,
                      margin=dict(
                        l=50,
                        r=50,
                        b=100,
                        t=100,
                        pad=4
                        ), paper_bgcolor="LightSteelBlue"
                        , showlegend=showlegend
                      )
  fig.show()

In [7]:
def plot_prediction_graph(model, df, training_ds_rows, 
                          window_size, normalizer_univar, denormalizer_univar):
  """
  Forecast the held-out part of `df` one step at a time and plot the
  forecast against the actual closing prices with matplotlib.

  Args:
    model - object exposing `predict(batch, verbose=0)`; it receives one
      normalized window with a leading batch axis per call
    df - table providing 'Date' and 'Close' columns
    training_ds_rows (int) - number of leading rows used for training; rows
      from this position onward form the test set
    window_size (int) - number of past values fed to the model per forecast
    normalizer_univar - callable mapping raw closing prices to model scale
    denormalizer_univar - callable mapping model outputs back to price scale

  Returns:
    tuple (test_set, results) - actual closing prices of the test rows and
    the denormalized forecasts

  Raises:
    ValueError - if `training_ds_rows` is smaller than `window_size`, since
      the first forecast needs `window_size` rows of history
  """
  if training_ds_rows < window_size:
    # A negative iloc start would silently wrap around to the end of df
    raise ValueError(
        "training_ds_rows must be at least window_size so that the first "
        "forecast has a full window of history")

  # Start window_size rows before the test set so that the first forecast
  # lines up with the first test row
  series_to_forecast = df['Close'].iloc[training_ds_rows - window_size:]
  normalized_series = normalizer_univar(series_to_forecast)

  forecast = []
  for time in range(normalized_series.shape[0] - window_size):
    window = np.expand_dims(normalized_series[time:time + window_size],
                            axis=0)
    the_prediction = model.predict(window, verbose=0)
    forecast.append(denormalizer_univar(the_prediction))

  # Convert to a numpy array and drop single dimensional axes
  results = np.array(forecast).squeeze()

  # Take dates and actual values from the `df` argument rather than from a
  # notebook-level global, so the function works for any dataframe passed in
  test_rows = df.iloc[training_ds_rows:]
  test_set = tf.convert_to_tensor(test_rows['Close'])
  plot_series(test_rows['Date'], (test_set, results) )
  return (test_set, results)

In [8]:
def plot_prediction_graph_plotly(model, df, training_ds_rows, 
                          window_size, normalizer_univar, denormalizer_univar):
  """
  Forecast the held-out part of `df` one step at a time and plot the
  forecast against the actual closing prices with plotly.

  Args:
    model - object exposing `predict(batch, verbose=0)`; it receives one
      normalized window with a leading batch axis per call
    df - table providing 'Date' and 'Close' columns
    training_ds_rows (int) - number of leading rows used for training; rows
      from this position onward form the test set
    window_size (int) - number of past values fed to the model per forecast
    normalizer_univar - callable mapping raw closing prices to model scale
    denormalizer_univar - callable mapping model outputs back to price scale

  Returns:
    tuple (test_set, results) - actual closing prices of the test rows and
    the denormalized forecasts

  Raises:
    ValueError - if `training_ds_rows` is smaller than `window_size`, since
      the first forecast needs `window_size` rows of history
  """
  if training_ds_rows < window_size:
    # A negative iloc start would silently wrap around to the end of df
    raise ValueError(
        "training_ds_rows must be at least window_size so that the first "
        "forecast has a full window of history")

  # Start window_size rows before the test set so that the first forecast
  # lines up with the first test row
  series_to_forecast = df['Close'].iloc[training_ds_rows - window_size:]
  normalized_series = normalizer_univar(series_to_forecast)

  forecast = []
  for time in range(normalized_series.shape[0] - window_size):
    window = np.expand_dims(normalized_series[time:time + window_size],
                            axis=0)
    the_prediction = model.predict(window, verbose=0)
    forecast.append(denormalizer_univar(the_prediction))

  # Convert to a numpy array and drop single dimensional axes
  results = np.array(forecast).squeeze()

  # Take dates and actual values from the `df` argument rather than from a
  # notebook-level global, so the function works for any dataframe passed in
  test_rows = df.iloc[training_ds_rows:]
  test_set = tf.convert_to_tensor(test_rows['Close'])
  plot_series_plotly(test_rows['Date'], (test_set, results), series_name=[
      'test dataset', 'predicted value'] )
  return (test_set, results)

### function data retrieval

In [9]:
def getStockData(history_span:int, the_ticker:str):
  """
  Download daily price history for one ticker from the Yahoo Finance
  CSV download endpoint.

  The requested period ends now (Asia/Jakarta clock) and starts
  `history_span` years earlier.

  Args:
    history_span (int) number of years of history to collect
    the_ticker (string) ticker name as used by Yahoo Finance
  Returns:
    Pandas dataframe (pd.DataFrame) with the downloaded rows; its 'Date'
    column is parsed into datetimes
  """
  url_template = ('https://query1.finance.yahoo.com/v7/finance/'+
           'download/{ticker}?period1={period1}&period2={period2}&interval=1d&events=history&includeAdjustedClose=true')

  # Period boundaries as whole Unix timestamps, widened outwards
  now_jakarta = datetime.datetime.now(tz=pytz.timezone('Asia/Jakarta'))
  period_start_moment = now_jakarta - relativedelta(years=history_span)
  period_end = math.ceil(now_jakarta.timestamp())
  period_start = math.floor(period_start_moment.timestamp())

  request_url = url_template.format(ticker=the_ticker,
                                    period1=period_start,
                                    period2=period_end)

  prices = pd.read_csv(request_url)
  prices['Date'] = pd.to_datetime(prices['Date'], format='%Y-%m-%d')
  return prices

### Make windowed data for time series forecasting

In [10]:
def windowed_dataset(series, window_size, batch_size, shuffle_buffer):
    """Turn a series into shuffled, batched (features, label) windows

    Every window holds `window_size + 1` consecutive values: the first
    `window_size` values are the features and the last value is the label.

    Args:
      series (array of float) - contains the values of the time series
      window_size (int) - the number of time steps to include in the feature
      batch_size (int) - the batch size
      shuffle_buffer(int) - buffer size to use for the shuffle method

    Returns:
      dataset (TF Dataset) - TF Dataset containing time windows
    """
    # Features plus the single label that follows them
    window_length = window_size + 1

    return (
        tf.data.Dataset.from_tensor_slices(series)
        # Slide one step at a time and keep only full-length windows
        .window(window_length, shift=1, drop_remainder=True)
        # Each window is a nested dataset; flatten it into one tensor
        .flat_map(lambda nested: nested.batch(window_length))
        # Split into (all but last value, last value)
        .map(lambda values: (values[:-1], values[-1]))
        .shuffle(shuffle_buffer)
        .batch(batch_size)
        .prefetch(1)
    )

### Tensorflow function

In [11]:
def set_seed(seed: int = 42) -> None:
  """
  Seed the Python, NumPy and TensorFlow random number generators and set
  the determinism-related environment variables.

  Args:
    seed (int) value passed to every seeding function
  """
  # Seed every generator with the same value
  random.seed(seed)
  np.random.seed(seed)
  tf.random.set_seed(seed)
  tf.experimental.numpy.random.seed(seed)
  #tf.keras.utils.set_random_seed(seed)
  #tf.config.experimental.enable_op_determinism()

  # Legacy seeding call; when the installed TensorFlow does not expose it,
  # the failure is only reported, not raised
  try:
    tf.set_random_seed(seed)
  except AttributeError as ae:
    print('INFO: tf.set_random_seed is deprecated in tf version ', tf.__version__, ' ',ae )

  # Determinism switches (the first two target the CuDNN backend) and a
  # fixed hash seed.
  # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up,
  # so setting it here presumably only affects child processes - confirm.
  os.environ.update({
      'TF_CUDNN_DETERMINISTIC': '1',
      'TF_DETERMINISTIC_OPS': '1',
      "PYTHONHASHSEED": str(seed),
  })
  print(f"Random seed set as {seed}")

## get data

In [12]:
# Ticker and history length are fixed so the notebook runs unattended.
# For interactive use, read them from the user instead:
#   stock_name = str(input("Stock tick:"))
#   hist_data = int(input("historical data (year):"))

hist_data = 3             # years of price history to download
stock_name = 'TLKM.JK'    # ticker symbol passed to getStockData

In [13]:
# Download `hist_data` years of daily prices for `stock_name`
df = getStockData(history_span=hist_data, the_ticker=stock_name)

## Split the Dataset

In [14]:
# Fraction of the rows (in chronological order) used for training
split_ratio = 0.8
rows_of_dataframe = df.shape[0]

# Position of the first test row
training_ds_rows = round(rows_of_dataframe * split_ratio)
# Derived from the actual split point so it always equals len(df_test).
# Rounding the two parts independently can make them sum to more or less
# than the total row count for some ratios.
test_ds_rows = rows_of_dataframe - training_ds_rows

# Explicit positional slicing: no shuffling, the test set is the most
# recent part of the series
df_training = df.iloc[:training_ds_rows]
df_test = df.iloc[training_ds_rows:]

## Normalize

In [15]:
# Closing prices of the training rows as a tensor
training_set = tf.convert_to_tensor(df_training['Close'])

# Forward scaling layer, fitted on the training prices only
normalizer_univar = tf.keras.layers.Normalization(axis=None)
normalizer_univar.adapt(training_set)

# Inverse scaling layer, fitted on the same data so it undoes the
# normalizer and maps model outputs back to the price scale
denormalizer_univar = tf.keras.layers.Normalization(axis=None, invert=True)
denormalizer_univar.adapt(training_set)

# Normalized training prices, used to build the training windows
training_set_normalized = normalizer_univar(training_set)

## Prepare features and labels


In [16]:
# Windowing and batching settings
window_size = 20              # past time steps fed to the model per sample
batch_size = 32               # windows per training batch
shuffle_buffer_size = 1000    # buffer size used when shuffling windows

# Small numeric constants
MU = 1e-6
NANO = 1e-9

In [17]:
# Build the shuffled, batched training windows from the normalized prices
windowed_training_ds = windowed_dataset(
    series=training_set_normalized,
    window_size=window_size,
    batch_size=batch_size,
    shuffle_buffer=shuffle_buffer_size,
)

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


## Build and compile the model

model definition

In [18]:
class TS_RNN_Model(tf.keras.Model):
  """
  Two-layer SimpleRNN forecaster wrapped in a subclassed Keras model.

  The network takes windows of `window_size` values and produces one value
  per window. The optional normalizer/denormalizer layers are stored on
  the model and exposed through `normalize` / `denormalize`; `call` itself
  does not apply them.
  """

  def __init__(self, window_size,
               normalizer_layer=None,
               denormalizer_layer=None,
               **kwargs):
    """
    Args:
      window_size (int) - number of time steps in every input window
      normalizer_layer - callable used by `normalize`
      denormalizer_layer - callable used by `denormalize`
      **kwargs - forwarded to tf.keras.Model
    """
    super().__init__(**kwargs)

    self.normalizer_1 = normalizer_layer
    self.denormalizer_1 = denormalizer_layer

    # Adds a trailing feature axis so the RNN layers receive 3-D input
    add_feature_axis = tf.keras.layers.Lambda(
        lambda x: tf.expand_dims(x, axis=-1),
        input_shape=[window_size])

    # No output-scaling Lambda layer at the end: the Dense output is used
    # as is
    self.seq_1 = tf.keras.models.Sequential([
      add_feature_axis,
      tf.keras.layers.SimpleRNN(40, return_sequences=True),
      tf.keras.layers.SimpleRNN(40),
      tf.keras.layers.Dense(1),
    ])

  def normalize(self, x):
    """Apply the stored normalizer layer to `x`."""
    return self.normalizer_1(x)
  
  def denormalize(self, x):
    """Apply the stored denormalizer layer to `x`."""
    return self.denormalizer_1(x)

  @tf.function   
  def call(self, x):
    """Run the sequential RNN stack on a batch of windows."""
    return self.seq_1(x)

## create base model
The initial weights of this base model are saved to a checkpoint and loaded into every model trained below, so that all training runs start from identical weights and biases.

In [19]:
# Fix all random seeds before the weights are initialized
set_seed(seed=1)

# Base model whose initial weights are reused by the models trained below
model_base00 = TS_RNN_Model(
    window_size=window_size,
    normalizer_layer=normalizer_univar,
    denormalizer_layer=denormalizer_univar,
)
     

INFO: tf.set_random_seed is deprecated in tf version  2.11.0   module 'tensorflow' has no attribute 'set_random_seed'
Random seed set as 1


In [20]:
# Store the initial weights of the base model in a checkpoint
model_base00.save_weights(filepath='./checkpoints/my_checkpoint')

## train model

In [21]:
# Training settings shared by all models compared below
epoch_value = 100                 # number of training epochs
learning_rate_value_all = 1e-4    # SGD learning rate

#### no build

In [22]:
# Variant 1: the model is compiled and trained without calling build()
model_tune1 = TS_RNN_Model(
    window_size=window_size,
    normalizer_layer=normalizer_univar,
    denormalizer_layer=denormalizer_univar,
)
# Start from the weights stored in the checkpoint
model_tune1.load_weights('./checkpoints/my_checkpoint')

# Huber loss with momentum SGD; MAE is tracked as an extra metric
model_tune1.compile(
    loss=tf.keras.losses.Huber(),
    optimizer=tf.keras.optimizers.SGD(
        learning_rate=learning_rate_value_all,
        momentum=0.9,
    ),
    metrics=['mae'],
)

# Train silently and keep the loss history for the comparison
history_nobuild = model_tune1.fit(
    x=windowed_training_ds,
    epochs=epoch_value,
    verbose=0,
)

#### build shape [1, window_size]


In [23]:
# Variant 2: build() is called with a batch dimension of 1
model_tune2 = TS_RNN_Model(
    window_size=window_size,
    normalizer_layer=normalizer_univar,
    denormalizer_layer=denormalizer_univar,
)
# Start from the weights stored in the checkpoint
model_tune2.load_weights('./checkpoints/my_checkpoint')

# Declare the input shape explicitly before compiling
model_tune2.build(input_shape=(1, window_size))
#model_tune2.summary()

# Huber loss with momentum SGD
model_tune2.compile(
    loss=tf.keras.losses.Huber(),
    optimizer=tf.keras.optimizers.SGD(
        learning_rate=learning_rate_value_all,
        momentum=0.9,
    ),
)

# Train silently and keep the loss history for the comparison
history_build1 = model_tune2.fit(
    x=windowed_training_ds,
    epochs=epoch_value,
    verbose=0,
)

#### build shape = [batch_size, window_size]

In [24]:
# Variant 3: build() is called with the training batch size
model_tune3 = TS_RNN_Model(
    window_size=window_size,
    normalizer_layer=normalizer_univar,
    denormalizer_layer=denormalizer_univar,
)
# Start from the weights stored in the checkpoint
model_tune3.load_weights('./checkpoints/my_checkpoint')

# Declare the input shape explicitly before compiling
model_tune3.build(input_shape=(batch_size, window_size))
#model_tune3.summary()

# Huber loss with momentum SGD
model_tune3.compile(
    loss=tf.keras.losses.Huber(),
    optimizer=tf.keras.optimizers.SGD(
        learning_rate=learning_rate_value_all,
        momentum=0.9,
    ),
)

# Train silently and keep the loss history for the comparison
history_build_bs = model_tune3.fit(
    x=windowed_training_ds,
    epochs=epoch_value,
    verbose=0,
)

## plot each model prediction

no build

In [25]:
# Forecast the test rows with the model trained without build()
test_set_g1, results1 = plot_prediction_graph_plotly(
    model=model_tune1,
    df=df,
    training_ds_rows=training_ds_rows,
    window_size=window_size,
    normalizer_univar=normalizer_univar,
    denormalizer_univar=denormalizer_univar,
)

[1, ws]

In [26]:
# Forecast the test rows with the model built for shape (1, window_size)
test_set_g2, results2 = plot_prediction_graph_plotly(
    model=model_tune2,
    df=df,
    training_ds_rows=training_ds_rows,
    window_size=window_size,
    normalizer_univar=normalizer_univar,
    denormalizer_univar=denormalizer_univar,
)
     

[bs, ws]

In [27]:
# Forecast the test rows with the model built for shape
# (batch_size, window_size)
test_set_g3, results3 = plot_prediction_graph_plotly(
    model=model_tune3,
    df=df,
    training_ds_rows=training_ds_rows,
    window_size=window_size,
    normalizer_univar=normalizer_univar,
    denormalizer_univar=denormalizer_univar,
)

## Compare

#### no build

In [28]:
# One value per line: final training loss, then test MSE and test MAE
# of the denormalized forecast
print(
    history_nobuild.history['loss'][-1],
    tf.keras.metrics.mean_squared_error(test_set_g1, results1).numpy(),
    tf.keras.metrics.mean_absolute_error(test_set_g1, results1).numpy(),
    sep="\n",
)


0.010280991904437542
5680.365
56.60501


[1, ws]

In [29]:
# One value per line: final training loss, then test MSE and test MAE
# of the denormalized forecast
print(
    history_build1.history['loss'][-1],
    tf.keras.metrics.mean_squared_error(test_set_g2, results2).numpy(),
    tf.keras.metrics.mean_absolute_error(test_set_g2, results2).numpy(),
    sep="\n",
)

0.010281714610755444
5670.4473
56.496418


[bs, ws]

In [30]:
# One value per line: final training loss, then test MSE and test MAE
# of the denormalized forecast
print(
    history_build_bs.history['loss'][-1],
    tf.keras.metrics.mean_squared_error(test_set_g3, results3).numpy(),
    tf.keras.metrics.mean_absolute_error(test_set_g3, results3).numpy(),
    sep="\n",
)

0.01028437539935112
5671.159
56.514122
