In [1]:
from util import *
from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Bidirectional, LSTM, Reshape, RepeatVector, TimeDistributed
from keras.layers import BatchNormalization, Activation
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import tensorflow_gan as tfgan
import tensorflow as tf
tf.compat.v1.enable_eager_execution() 

import matplotlib.pyplot as plt

import sys

import numpy as np
import pandas as pd

import os

from PIL import Image




# Load Data
Loading from preprocessed numpy array

# Creating GAN

In [2]:
class LSTMGAN():
    def __init__(self, step_i, step_o, f, data):
        self.data = data
        self.step_input = step_i
        self.step_output = step_o
        self.feature_len = f
        self.in_shape = (self.step_input, self.feature_len)
        self.out_shape = (self.step_output, self.feature_len)
        self.dis_shape = (self.step_input+self.step_output, self.feature_len)

        optimizer = Adam(0.0001, 0.4)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()
        # Trains the generator to imitate input data
        self.generator.compile(loss='mean_squared_error',
            optimizer=optimizer)
        
        # The generator takes noise as input and generates song
        noise = Input(shape=self.in_shape)
        gen_output = self.generator(noise)
        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(merge_time_series(noise, gen_output))

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(noise, valid)
        
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)
        
        

    def build_generator(self):

        model = Sequential()
        model.add(Bidirectional(LSTM(128, return_sequences=True), input_shape=self.in_shape))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.2))
        model.add(Bidirectional(LSTM(128)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(self.out_shape[0] * self.out_shape[1]))
        model.add(Reshape(self.out_shape))
        # model.summary()

        noise = Input(shape=self.in_shape)
        img = model(noise)

        return Model(noise, img)

    def build_discriminator(self):

        model = Sequential()
        
        model.add(Bidirectional(LSTM(128, activation = 'relu', return_sequences=True), input_shape=self.dis_shape))
        model.add(Dropout(0.4))
        model.add(TimeDistributed(Dense(128, activation = 'relu')))
        model.add(Dense(1, activation = 'linear'))
        #model.summary()

        img = Input(shape=self.dis_shape)
        validity = model(img)

        return Model(img, validity)
    
    
    def train(self, epochs, batch_size=128, save_interval=50, log_interval=50):
    

        # Load the dataset
        (X_train_input, X_train_output) = split_time_series(self.step_input, self.step_output, self.data)

        # normalize

        # Adversarial ground truths
        valid = np.ones(batch_size)
        fake = np.zeros(batch_size)

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------
            # Select a random half of songs
            idx = np.random.randint(0, X_train_input.shape[0], batch_size)
            real_input= X_train_input[idx]
            real_output= X_train_output[idx]
            
           
            
            (real_input, scalers) = batch_standardize(real_input)
            real_output = batch_transform(real_output, scalers)
            
            # Sample noise and generate a batch of new prices
            noise = np.random.normal(0, 1, (batch_size,self.step_input,self.feature_len))
            gen_output = self.generator.predict(noise, batch_size = batch_size)
            real_series = merge_time_series(real_input,real_output)
            fake_series = merge_time_series(real_input,gen_output)

            # Train the discriminator (real classified as ones and generated as zeros)
            d_loss_real = self.discriminator.train_on_batch(real_series, valid)
            d_loss_fake = self.discriminator.train_on_batch(fake_series, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------
            # First training (wants generator to imitate real data)
            g_loss1 = self.generator.train_on_batch( np.array(real_input), np.array(real_output))
            
            # Second training (wants discriminator to mistake songs as real)
            g_loss2 = self.combined.train_on_batch(noise, valid)
            g_loss = 0.5 * np.add(g_loss1, g_loss2)



            # Plot the progress
            if log_interval and epoch % log_interval == 0:
                print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))
                print("input:\n", scalers[0].inverse_transform( real_input[0]))
                print("fake_output:\n", scalers[0].inverse_transform( gen_output[0]))
                print("real_output:\n", scalers[0].inverse_transform( real_output[0]))

            # If at save interval => save model
            if epoch % save_interval == 0:
                self.generator.save("LSTM_generator.h5")

# Model Summary
I couldn't train the model on this online notebook so I trained it locally for 1000 epochs and uploaded the h5 file.

In [None]:
data = load_data("./data_stock/Dowjones_average.csv")
step_i = 5
step_o = 5
f = 4
lstmgan = LSTMGAN(step_i, step_o, f , data)
lstmgan.train(epochs=10000, batch_size=50, save_interval=1000, log_interval=1000)

0 [D loss: 2.120816, acc.: 51.90%] [G loss: 9.437490]
input:
 [[36128.83 36236.07 36031.78 36087.45]
 [36076.18 36316.61 36076.18 36142.22]
 [36159.7  36159.7  35909.48 35931.05]
 [35901.69 35952.63 35654.39 35870.95]
 [35879.09 35879.09 35555.37 35601.98]]
fake_output:
 [[36027.35  36104.668 35856.844 35925.523]
 [36026.63  36107.508 35846.285 35924.258]
 [36028.1   36106.883 35842.61  35921.996]
 [36033.414 36105.984 35845.418 35927.77 ]
 [36028.62  36106.11  35850.098 35922.277]]
real_output:
 [[35631.41 35929.66 35615.55 35619.25]
 [35619.92 35841.52 35542.87 35813.8 ]
 [35752.31 35825.47 35591.03 35804.38]
 [35366.69 35366.69 34749.8  34899.34]
 [35017.71 35287.91 34895.89 35135.94]]
1000 [D loss: 0.660743, acc.: 56.20%] [G loss: 5.282877]
input:
 [[35055.86 35150.37 34950.19 35144.31]
 [35078.9  35078.9  34878.07 35058.52]
 [35109.95 35116.37 34876.84 34930.93]
 [34985.99 35171.52 34985.99 35084.53]
 [35013.26 35106.3  34871.13 34935.47]]
fake_output:
 [[34942.543 35026.906 34697

Loading pretrained model

Installinging Mido Library

# Generate Stock Prices
Generating random input and letting model predict output

In [None]:
# import importlib
# import lib.reload(util)
# from util import *

(real_input, real_output) = split_time_series_disjoint(step_i, step_o, data)
MA5 = moving_average(5, data)
MA5 = MA5[:round(len(MA5)/step_o) * step_o]
(stzd_input, scalers) = batch_standardize(real_input)

stzd_pred_output = lstmgan.generator(tf.convert_to_tensor(stzd_input)).numpy()

#TODO: fix MA5
pred_output = batch_inverse_transform(stzd_pred_output, scalers)
stzd_real_output = batch_transform(real_output, scalers)
stzd_MA5 = batch_transform(MA5.reshape(-1, step_o, MA5.shape[-1]), scalers)

print(f'predicted price:\n{pred_output[0]},\nreal price:\n{real_output[0]},\nMA5:\n{MA5[:5]} \
       ,\nstandardized predicted price:\n{stzd_pred_output[0]} \
       ,\nstandardized real price:\n{stzd_real_output[0]} \
       ,\nstandardized MA5:\n{stzd_MA5[:5]}')

# Plot


In [None]:
#transform everything into columns                
stzd_pred_data = columnify(stzd_pred_output)
stzd_real_data = columnify(stzd_real_output)
stzd_MA5_data = columnify(stzd_MA5)
pred_data = columnify(pred_output)
real_data = columnify(real_output) 
MA5_data = columnify(MA5)

titles = ["Open Price", "High Price", "Low Price", "Close Price"]
columns = len(titles)

mae = MAE(stzd_real_data[3],stzd_pred_data[3])
rmse = RMSE(stzd_real_data[3],stzd_pred_data[3])
mape = MAPE(stzd_real_data[3],stzd_pred_data[3])
ar = AR(stzd_real_data[3],stzd_pred_data[3])

MA5_mae = MAE(stzd_real_data[3],stzd_MA5_data[3])
MA5_rmse = RMSE(stzd_real_data[3],stzd_MA5_data[3])
MA5_mape = MAPE(stzd_real_data[3],stzd_MA5_data[3])
MA5_ar = AR(stzd_real_data[3],stzd_MA5_data[3])

print('LSTM: MAE=',mae,'RMSE=',rmse,'MAPE=',mape,'AR=',ar)
print('MA5: MAE=',MA5_mae,'RMSE=',MA5_rmse,'MAPE=',MA5_mape,'AR=',MA5_ar)


In [None]:
for i in range(columns):
    length = len(pred_data[i])
    plt.plot(range(length), pred_data[i], color ='r', 
             label ='predicted')

    plt.plot(range(length), real_data[i], color ='b', 
             label ='real')

    # naming of x-axis and y-axis
    plt.xlabel('Date')
    plt.ylabel('Price')

    # naming the title of the plot
    plt.title(titles[i])

    plt.legend()
    plt.show()