In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, SimpleRNN, GRU, LSTM, Dense, Flatten, GlobalMaxPooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam, SGD
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

In [None]:
# Read the price table from the local CSV and preview the first five rows.
df = pd.read_csv(filepath_or_buffer='data/sbux.csv')
df.head(5)

### Wrong Approach (Model 1)

In [None]:
# Closing prices as an (n_rows, 1) column vector, the 2-D layout StandardScaler expects.
series = df[['close']].to_numpy()

In [None]:
# Fit the scaler on the first half of the series only, then standardize the
# whole series with those statistics and flatten it back to 1-D.
scalar = StandardScaler().fit(series[:len(series)//2])
series = scalar.transform(series).ravel()

In [None]:
# Sanity check: the standardized series should be 1-D at this point.
np.shape(series)

In [None]:
# Build the supervised data set with a sliding window:
# each sample is T consecutive values, the target is the value that follows.
T = 10  # window length (time steps per sample)
D = 1   # number of features per time step

X = [series[t:t+T] for t in range(len(series) - T)]
Y = [series[t+T] for t in range(len(series) - T)]

# Shape X explicitly as (samples, T, D) so it matches Input(shape=(T, 1))
# instead of relying on Keras to add the trailing feature axis implicitly.
X = np.array(X).reshape(-1, T, D)
Y = np.array(Y)

N = len(X)
print("X.shape", X.shape, "Y.shape", Y.shape)

In [None]:
# Build the RNN: a single LSTM layer with 5 units followed by a linear
# output unit that regresses the next (standardized) value.
i = Input(shape=(T,1))
x = LSTM(5)(i)
x = Dense(1)(x)
model = Model(i, x)
# `learning_rate` is the supported argument name; the `lr` alias is
# deprecated and rejected by newer Keras releases.
model.compile(loss='mse', optimizer=Adam(learning_rate=0.1))

In [None]:
# Chronological split: everything before the last -N//2 samples trains the
# model, the remaining tail is held out for validation.
r = model.fit(
    x=X[:-N//2],
    y=Y[:-N//2],
    validation_data=(X[-N//2:], Y[-N//2:]),
    epochs=80,
)

In [None]:
# Training and validation loss recorded after each epoch
for curve in ('loss', 'val_loss'):
    plt.plot(r.history[curve], label=curve)
plt.legend()
plt.show()

In [None]:
# "wrong" forecast using true targets
# Every prediction is made from a window of TRUE past values (one-step-ahead),
# so prediction errors never feed back into later inputs.
validation_target = Y[-N//2:]

# Predict the whole validation half in one batched call rather than calling
# model.predict once per sample. This also avoids rebinding `i`, the name the
# model-building cell uses for the Input tensor.
validation_inputs = X[-N//2:].reshape(-1, T, 1)
validation_predictions = list(model.predict(validation_inputs)[:, 0])

In [None]:
# Overlay the predictions on the true validation targets
for values, name in (
    (validation_target, 'validation_target'),
    (validation_predictions, 'validation_predictions'),
):
    plt.plot(values, label=name)
plt.legend()
plt.show()

In [None]:
# "Correct" forecast using self predicted
# Multi-step forecast: only the first window contains true values; afterwards
# each prediction is pushed into the window and used as input for the next.
validation_target = Y[-N//2:]
validation_predictions = []

# first validation window (the only one built from true values)
last_x = X[-N//2]

for step in range(len(validation_target)):
    P = model.predict(last_x.reshape(1, -1, 1))[0, 0]
    validation_predictions.append(P)

    # np.roll returns a new array, so X itself is never modified:
    # drop the oldest value and put the new prediction in the last slot.
    last_x = np.roll(last_x, shift=-1)
    last_x[-1] = P

# Overlay the predictions on the true validation targets
for values, name in (
    (validation_target, 'validation_target'),
    (validation_predictions, 'validation_predictions'),
):
    plt.plot(values, label=name)
plt.legend()
plt.show()

### Calculating returns (Model 2)

In [None]:
# Shift the close column down by one row so every row also carries the
# previous row's close (the first row has no predecessor and becomes NaN).
df['PrevClose'] = df['close'].shift(periods=1)
df.head(5)

In [None]:
# Check the last five rows as well, to see how the shift affected the end of the table.
df.tail(5)

In [None]:
# Simple return: (close - previous close) / previous close
df['Return'] = df['close'].sub(df['PrevClose']).div(df['PrevClose'])
df.head(5)

In [None]:
# Distribution of the returns
df.loc[:, 'Return'].hist()

In [None]:
# Model 2 works on returns. Row 0 is dropped because its return is NaN
# (there is no previous close for the first row).
series = df['Return'].to_numpy()[1:].reshape(-1, 1)

# Fit the scaler on the first half only, then standardize and flatten to 1-D.
scalar = StandardScaler().fit(series[:len(series)//2])
series = scalar.transform(series).ravel()

In [None]:
# Build the supervised data set with a sliding window:
# each sample is T consecutive values, the target is the value that follows.
T = 10  # window length (time steps per sample)
D = 1   # number of features per time step

X = [series[t:t+T] for t in range(len(series) - T)]
Y = [series[t+T] for t in range(len(series) - T)]

# Shape X explicitly as (samples, T, D) so it matches Input(shape=(T, 1))
# instead of relying on Keras to add the trailing feature axis implicitly.
X = np.array(X).reshape(-1, T, D)
Y = np.array(Y)

N = len(X)
print("X.shape", X.shape, "Y.shape", Y.shape)

In [None]:
# Build the RNN: a single LSTM layer with 5 units followed by a linear
# output unit that regresses the next (standardized) value.
i = Input(shape=(T,1))
x = LSTM(5)(i)
x = Dense(1)(x)
model = Model(i, x)
# `learning_rate` is the supported argument name; the `lr` alias is
# deprecated and rejected by newer Keras releases.
model.compile(loss='mse', optimizer=Adam(learning_rate=0.1))

In [None]:
# Chronological split: everything before the last -N//2 samples trains the
# model, the remaining tail is held out for validation.
r = model.fit(
    x=X[:-N//2],
    y=Y[:-N//2],
    validation_data=(X[-N//2:], Y[-N//2:]),
    epochs=80,
)

In [None]:
# Training and validation loss recorded after each epoch
for curve in ('loss', 'val_loss'):
    plt.plot(r.history[curve], label=curve)
plt.legend()
plt.show()

In [None]:
# "wrong" forecast using true targets
# Every prediction is made from a window of TRUE past values (one-step-ahead),
# so prediction errors never feed back into later inputs.
validation_target = Y[-N//2:]

# Predict the whole validation half in one batched call rather than calling
# model.predict once per sample. This also avoids rebinding `i`, the name the
# model-building cell uses for the Input tensor.
validation_inputs = X[-N//2:].reshape(-1, T, 1)
validation_predictions = list(model.predict(validation_inputs)[:, 0])

# Plot the predictions
plt.plot(validation_target, label='validation_target')
plt.plot(validation_predictions, label='validation_predictions')
plt.legend()
plt.show()

In [None]:
# "Correct" forecast using self predicted
# Multi-step forecast: only the first window contains true values; afterwards
# each prediction is pushed into the window and used as input for the next.
validation_target = Y[-N//2:]
validation_predictions = []

# first validation window (the only one built from true values)
last_x = X[-N//2]

for step in range(len(validation_target)):
    P = model.predict(last_x.reshape(1, -1, 1))[0, 0]
    validation_predictions.append(P)

    # np.roll returns a new array, so X itself is never modified:
    # drop the oldest value and put the new prediction in the last slot.
    last_x = np.roll(last_x, shift=-1)
    last_x[-1] = P

# Overlay the predictions on the true validation targets
for values, name in (
    (validation_target, 'validation_target'),
    (validation_predictions, 'validation_predictions'),
):
    plt.plot(values, label=name)
plt.legend()
plt.show()

### Model 3 (Use all data)

Make it a classification problem: instead of predicting the price, just predict whether the price will go up or down.

In [None]:
# Features: the five raw columns of each row. Target: the return column.
feature_columns = ['open', 'high', 'low', 'close', 'volume']
input_data = df.loc[:, feature_columns]
target = df.loc[:, 'Return']

In [None]:
T = 10                         # window length (time steps per sample)
D = np.shape(input_data)[1]    # number of input features per time step
N = np.shape(input_data)[0] - T  # number of (window, next-step target) pairs

In [None]:
# Normalize the inputs
Ntrain = len(input_data) * 2 // 3

# Fit the scaler only on rows that appear in training windows. The last
# training window starts at Ntrain - 1 and covers T rows, so the final row
# used for training inputs is Ntrain + T - 2. Fitting on one more row would
# leak a row that only appears in test windows.
scalar = StandardScaler()
scalar.fit(input_data[:Ntrain + T - 1])

# Write the standardized values back into `input_data`, which is what the
# window-building cells read. Storing them under another name would leave the
# model training on unscaled features. The DataFrame wrapper keeps the
# original index and column labels.
input_data = pd.DataFrame(
    scalar.transform(input_data),
    index=input_data.index,
    columns=input_data.columns,
)

In [None]:
# Training tensors: X_train[t] holds rows t .. t+T-1 of the inputs, and
# y_train[t] is 1.0 when the return at row t+T is positive, else 0.0.
X_train = np.zeros(shape=(Ntrain, T, D))
y_train = np.zeros(shape=Ntrain)

for t in range(Ntrain):
    X_train[t] = input_data[t:t+T]
    y_train[t] = target[t+T] > 0

In [None]:
# Test tensors: the same windowing as the training set, applied to the
# remaining N - Ntrain windows that start at row Ntrain.
X_test = np.zeros(shape=(N-Ntrain, T, D))
y_test = np.zeros(shape=N-Ntrain)

for u in range(N-Ntrain):
    # u indexes the test arrays, t is the matching row offset in input_data
    t = Ntrain + u
    X_test[u] = input_data[t:t+T]
    y_test[u] = target[t+T] > 0

In [None]:
# Build the classifier:
# LSTM over the (T, D) window -> max over the time axis -> two ReLU hidden
# layers -> one sigmoid unit giving the probability of a positive return.
i = Input(shape=(T,D))
x = LSTM(50, return_sequences=True)(i)
x = GlobalMaxPooling1D()(x)
# The hidden layers must be applied to `x`. Assigning the bare layer objects
# to `X` left them out of the graph and overwrote the data array `X`.
x = Dense(100, activation='relu')(x)
x = Dense(10, activation='relu')(x)
x = Dense(1, activation='sigmoid')(x)
model = Model(i, x)
# `learning_rate` replaces the deprecated `lr` alias, and `metrics` is passed
# as a list, the form Keras documents.
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy'],
)

In [None]:
# Train the classifier. batch_size=1 means one gradient update per sample.
r = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=1,
)

In [None]:
# Training and validation loss recorded after each epoch
for curve in ('loss', 'val_loss'):
    plt.plot(r.history[curve], label=curve)
plt.legend()
plt.show()

In [None]:
# Training and validation accuracy recorded after each epoch
for curve in ('accuracy', 'val_accuracy'):
    plt.plot(r.history[curve], label=curve)
plt.legend()
plt.show()