# Import

In [108]:
import yfinance as yf
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Tensorflow
from tensorflow.keras.layers import Input, Dense, LSTM, Dropout
from tensorflow.keras.models import Model

# Display
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px

# Parameters

In [109]:
# Ticker symbol of the stock to download
STOCK_SYMBOL = 'GE'
# Length of history to download (one of the period strings listed in download_stock_data)
period = '1y'
# Number of consecutive samples that make up one input window of the timeseries dataset
time_step = 3
# Fraction of the samples assigned to the training set; the remainder is the test set
split_ratio = 0.8

# Data

## Download raw stock

In [110]:
def download_stock_data(stock_symbol, indicators=['max'], period='max'):
    """
    Download the price history of one stock and return the requested columns
    :param stock_symbol: Name of wanted stock, string
    :param indicators: Indicators to extract, list of strings taken from
        'Open', 'High', 'Low', 'Close', 'Volume'. If the list contains 'max',
        all five are returned, in that order
    :param period: Data period to download ('1d', '5d', '1mo', '3mo', '6mo', '1y', '2y', '5y', '10y', 'ytd', 'max')
    :return:
        - stocks, ndarray (nb samples, nb indicators); columns follow the order of the indicators
    """
    all_indicators = ['Open', 'High', 'Low', 'Close', 'Volume']

    # Fetch first, select afterwards
    price_history = yf.Ticker(stock_symbol).history(period=period)

    # 'max' is a shortcut for "every indicator"
    columns = all_indicators if 'max' in indicators else indicators

    return price_history[columns].to_numpy()

In [111]:
# 'max' selects every indicator, so the columns of `stock` are 'Open', 'High', 'Low', 'Close', 'Volume'
stock = download_stock_data(STOCK_SYMBOL, indicators=['max'], period=period)

print('Number of samples:', stock.shape[0])

Number of samples: 253


## Split data

In [112]:
def split_train_test(x, split_ratio):
    """
    Cut the data in two along its first axis, without shuffling
    :param x: data, ndarray
    :param split_ratio: share of the samples that goes to the train set, float [0-1]
    :return:
        - x_train: the first int(nb samples * split_ratio) samples
        - x_test: all remaining samples
    """
    # The product is truncated towards zero, so a fractional sample ends up in the test set
    cut = int(x.shape[0] * split_ratio)

    head, tail = x[:cut], x[cut:]
    return head, tail

In [113]:
# Positional split: the first `split_ratio` share of the rows trains the model, the remaining rows test it
x_train, x_test = split_train_test(stock, split_ratio)

print('TRAIN: Shape of input:', x_train.shape)
print('TEST: Shape of input:', x_test.shape)

TRAIN: Shape of input: (202, 5)
TEST: Shape of input: (51, 5)


## Normalization

In [114]:
# The scaler is fitted on the training rows only and then reused unchanged on the test rows
scalerInput = MinMaxScaler()

# NOTE: x_train / x_test are rebound to their scaled versions, so re-running this cell
# without re-running the split cell first would scale data that is already scaled
x_train = scalerInput.fit_transform(x_train)
x_test = scalerInput.transform(x_test)

## Build timeseries

In [115]:
def build_dataset(data, time_step):
    """
    Cut the data into overlapping windows of `time_step` consecutive samples.
    Window i holds samples i .. i + time_step - 1; the last sample of the data
    is never part of a window
    :param data: stock data, ndarray (nb_sample, features)
    :param time_step: number of consecutive samples per window
    :return:
        - timeseries dataset input, float ndarray (nb_sample - time_step, time_step, features)
    """
    n_rows, n_cols = data.shape[0], data.shape[1]
    n_windows = n_rows - time_step

    windows = np.zeros((n_windows, time_step, n_cols))

    # Fill one position of every window at a time: position `offset` of window i is sample i + offset
    for offset in range(time_step):
        windows[:, offset, :] = data[offset:offset + n_windows, :]

    return windows

In [116]:
# Unit test: 5 samples of 2 features, cut into windows of 2 consecutive samples
vec = np.array([[1, 6],
                [2, 7],
                [3, 8],
                [4, 9],
                [5, 10]])

x = build_dataset(vec, 2)

# 5 - 2 = 3 windows, each holding 2 time steps of 2 features
assert x.shape == (3, 2, 2)

# First sample of the first window, last sample of the last window
np.testing.assert_array_equal(x[0][0], [1., 6.])
np.testing.assert_array_equal(x[2][1], [4., 9.])

# Keep the notebook namespace clean
del x, vec

In [117]:
# Windows are built separately for each split, so no window mixes train and test rows
x_train_timeseries = build_dataset(x_train, time_step)
x_test_timeseries = build_dataset(x_test, time_step)

# Number of windows per split (each split loses `time_step` rows)
nb_samples_timeseries_train = x_train_timeseries.shape[0]
nb_samples_timeseries_test = x_test_timeseries.shape[0]

print('TRAIN: Shape of input:', x_train_timeseries.shape)
print('TEST: Shape of input:', x_test_timeseries.shape)

TRAIN: Shape of input: (199, 3, 5)
TEST: Shape of input: (48, 3, 5)


In [118]:
# Position of the 'Close' price in the columns ['Open', 'High', 'Low', 'Close', 'Volume']
CLOSE_COLUMN = 3

# Window i holds rows i .. i + time_step - 1, so its target is the (scaled) close of row i + time_step.
# Slicing from `time_step` instead of with a negative count stays correct when a split yields
# zero windows (`x[-0:]` would return every row). The list index keeps the column axis: (nb windows, 1)
y_train = x_train[time_step:, [CLOSE_COLUMN]]
y_test = x_test[time_step:, [CLOSE_COLUMN]]

# Exactly one target per window
assert y_train.shape == (nb_samples_timeseries_train, 1)
assert y_test.shape == (nb_samples_timeseries_test, 1)

print('TRAIN: Shape of output:', y_train.shape)
print('TEST: Shape of output:', y_test.shape)

TRAIN: Shape of output: (199, 1)
TEST: Shape of output: (48, 1)


# Display

In [119]:
def display_raw_stock(data):
    """
    Display a candlestick chart of the prices with a bar chart of the volume underneath
    :param data: stock data, ndarray whose columns 0-4 are 'Open', 'High', 'Low', 'Close', 'Volume'
    """
    # Price trace: columns 0-3 are 'Open', 'High', 'Low', 'Close'
    candles = go.Candlestick(
        open=data[:, 0],
        high=data[:, 1],
        low=data[:, 2],
        close=data[:, 3],
        name=STOCK_SYMBOL,
        showlegend=False,
    )

    # Volume trace: column 4
    volumes = go.Bar(y=data[:, 4], showlegend=False)

    # Two stacked panels sharing the x axis; the title is the module-level STOCK_SYMBOL
    fig = make_subplots(
        rows=2,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.03,
        subplot_titles=(STOCK_SYMBOL, ),
        row_width=[0.2, 0.7],
    )
    fig.add_trace(candles, row=1, col=1)
    fig.add_trace(volumes, row=2, col=1)

    # Layout: hide the range slider of the candlestick, label both y axes
    fig.update_layout(xaxis_rangeslider_visible=False)
    fig.update_yaxes(title_text='Stock price', row=1, col=1)
    fig.update_yaxes(title_text='Volume', row=2, col=1)

    fig.show()

In [120]:
# Prices and volume of the downloaded data
display_raw_stock(stock)

In [135]:
def display_stock_prediction(reality, prediction):
    """
    Display the real prices as a candlestick chart with the predicted values drawn on top as a line
    :param reality: stock data, ndarray whose columns 0-3 are 'Open', 'High', 'Low', 'Close'
    :param prediction: predicted values, ndarray holding one value per sample (shape (n,) or (n, 1))
    """
    # Real prices: columns 0-3 are 'Open', 'High', 'Low', 'Close'
    candles = go.Candlestick(
        open=reality[:, 0],
        high=reality[:, 1],
        low=reality[:, 2],
        close=reality[:, 3],
        name=STOCK_SYMBOL,
    )

    # Predictions, flattened to one value per sample
    predicted_values = prediction.reshape(prediction.shape[0])
    predicted_line = go.Scatter(
        y=predicted_values,
        line_color='blue',
        name='Prediction',
        mode='lines+markers',
        marker=dict(size=4),
        line=dict(width=1),
    )

    fig = go.Figure()
    fig.add_trace(candles)
    fig.add_trace(predicted_line)

    # Layout: title, no range slider, y axis label
    fig.update_layout(title=('Prediction vs Reality'), xaxis_rangeslider_visible=False)
    fig.update_yaxes(title_text='Stock')

    fig.show()

In [136]:
# No model involved yet: the 'High' column (index 1) is passed as the "prediction",
# presumably as a visual check of the plotting function
display_stock_prediction(stock, stock[:, 1])

In [123]:
def display_history(history):
    """
    Plot the training and validation loss recorded at each epoch
    :param history: history from fit function; its `history` dict must hold both 'loss' and 'val_loss'
    """
    fig = go.Figure()

    # One curve per recorded loss: training first, then validation
    for loss_key, curve_name in (('loss', 'Train'), ('val_loss', 'Validation')):
        fig.add_trace(go.Scatter(y=history.history[loss_key], name=curve_name, mode='lines+markers'))

    # Layout: title and axis labels
    fig.update_layout(title='Loss during training')
    fig.update_xaxes(title='Epochs')
    fig.update_yaxes(title='Loss')

    fig.show()

# Models

## LSTM

In [124]:
def lstm(num_timesteps, num_features, num_outputs=1, nb_layers=4, units=50, dropout=0.2, activation=None):
    """
    Build a stack of LSTM layers, each followed by dropout, topped by a Dense output layer.
    The model is returned uncompiled
    :param num_timesteps: number of timesteps in the input
    :param num_features: number of features in the input
    :param num_outputs: number of outputs
    :param nb_layers: number of LSTM layers; at least one LSTM layer is always built
    :param units: numbers of lstm neurons (same for every LSTM layer)
    :param dropout: fraction of the input units to drop
    :param activation: activation function of the last (Dense) layer
    :return:
        - model: lstm model
    """
    model_input = Input(shape=(num_timesteps, num_features))

    # Every LSTM layer except the last one returns the full sequence so that the next
    # LSTM layer can consume it; the last one returns only its final output
    sequence_flags = [True for _ in range(nb_layers - 1)] + [False]

    hidden = model_input
    for keep_sequence in sequence_flags:
        hidden = LSTM(units=units, return_sequences=keep_sequence)(hidden)
        hidden = Dropout(dropout)(hidden)

    # Output layer
    model_output = Dense(units=num_outputs, activation=activation)(hidden)

    return Model(inputs=model_input, outputs=model_output)

In [154]:
# Single LSTM layer of 40 units; one input feature per column of `stock`
my_lstm = lstm(
    time_step,
    stock.shape[1],
    num_outputs=1,
    nb_layers=1,
    units=40,
    dropout=0.2,
    activation=None,
)

my_lstm.compile(loss='mean_squared_error', optimizer='adam')

# The test split doubles as validation data; batches are taken in order (no shuffling).
# NOTE: no random seed is set in the visible cells, so the losses differ from run to run
history = my_lstm.fit(
    x_train_timeseries,
    y_train,
    validation_data=(x_test_timeseries, y_test),
    epochs=20,
    batch_size=8,
    shuffle=False,
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [155]:
# Training loss vs. validation loss for each epoch of the fit above
display_history(history)

In [156]:
# One predicted value (scaled close) per test window
prediction = my_lstm.predict(x_test_timeseries)

In [157]:
# The first `time_step` rows of x_test only feed the first window and have no prediction,
# so they are dropped to line the candles up with the predictions.
# Both the candles and the predictions are in scaled units (x_test was normalized above)
display_stock_prediction(x_test[time_step:, ], prediction)

# Delay