# Let's stop being gullible. You can't predict stocks with a simple LSTM

In [None]:
import math
import json
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler 
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.utils.vis_utils import plot_model

In [None]:
# Alpaca API credentials. Replace the `...` placeholders with the key ID and
# secret key issued by Alpaca; the stocks downloader below sends them as the
# APCA-API-KEY-ID / APCA-API-SECRET-KEY request headers.
api_key = ...
api_secret_key = ...

#### The function below downloads historical prices for stocks (it uses the Alpaca API keys above)

In [None]:
def get_historical_prices(symbols, interval = '1Day', start_date = None):
    """Download closing prices for stock symbols from the Alpaca market-data API.

    Queries the ``/v2/stocks/bars`` endpoint (IEX feed, raw adjustment, up to
    10000 bars per page) and follows ``next_page_token`` until the server
    reports no further page. Bars from every page are accumulated per symbol.

    Note: the crypto variant defined later in this notebook uses the same
    function name, so whichever of the two cells was executed last is the one
    that gets called.

    Args:
        symbols: List of ticker strings, e.g. ``['MSFT']``.
        interval: Bar timeframe passed through as ``timeframe``.
        start_date: Optional start date/time string passed through as
            ``start``; omitted from the query when falsy.

    Returns:
        A ``(stocks, active_symbols)`` tuple. ``stocks`` maps each symbol that
        returned data to a DataFrame with ``date`` and ``close`` columns sorted
        by ascending date; ``active_symbols`` is the list of those symbols.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    # `requests` is used here but is not imported at the top of the notebook.
    import requests

    base_url = "https://data.alpaca.markets/v2/stocks/bars"
    headers = {
        "accept": "application/json",
        "APCA-API-KEY-ID": api_key,
        "APCA-API-SECRET-KEY": api_secret_key
    }
    query = {
        "symbols": ",".join(symbols),
        "timeframe": interval,
        "limit": 10000,
        "adjustment": "raw",
        "feed": "iex",
    }
    if start_date:
        query["start"] = start_date

    def fetch(page_token = None):
        """Fetch one page; return (bars keyed by symbol, next page token or None)."""
        page_query = dict(query)
        if page_token:
            page_query["page_token"] = page_token

        # TLS verification stays at its default (enabled): the request carries
        # API credentials and must not be sent to an unverified host.
        response = requests.get(base_url, headers=headers, params=page_query, timeout=30)
        response.raise_for_status()
        payload = response.json()
        return payload.get('bars') or {}, payload.get('next_page_token')

    historical_prices = {}
    page_token = None
    while True:
        page_bars, page_token = fetch(page_token)
        # Append instead of dict.update(): a symbol's bars can span several
        # pages, and update() would keep only the last page for that symbol.
        for symbol, bars in page_bars.items():
            historical_prices.setdefault(symbol, []).extend(bars)
        if not page_token:
            break

    print('Data Downloaded')

    stocks = {}
    for symbol in symbols:
        if symbol not in historical_prices:
            # The API returned nothing for this symbol: report it and move on.
            print('bug with: ', symbol)
            continue
        rows = [[bar['t'], bar['c']] for bar in historical_prices[symbol]]
        stocks[symbol] = pd.DataFrame(rows, columns=['date', 'close']).sort_values(by='date', ascending=True)

    return stocks, list(stocks)

#### This one is for crypto pairs such as BTC/USD (no API keys needed)

In [None]:
def get_historical_prices(symbols, interval = '1Day', start_date = None):
    """Download closing prices for crypto pairs from the Alpaca market-data API.

    Queries the ``/v1beta3/crypto/us/bars`` endpoint (up to 10000 bars per
    page, no credentials sent) and follows ``next_page_token`` until the server
    reports no further page. Bars from every page are accumulated per symbol.

    Note: this definition reuses the name of the stocks downloader defined
    earlier in the notebook, so whichever of the two cells was executed last
    is the one that gets called.

    Args:
        symbols: List of pair strings, e.g. ``['BTC/USD']``.
        interval: Bar timeframe passed through as ``timeframe``.
        start_date: Optional start date/time string passed through as
            ``start``; omitted from the query when falsy.

    Returns:
        A ``(stocks, active_symbols)`` tuple. ``stocks`` maps each symbol that
        returned data to a DataFrame with ``date`` and ``close`` columns sorted
        by ascending date; ``active_symbols`` is the list of those symbols.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    # `requests` is used here but is not imported at the top of the notebook.
    import requests

    base_url = "https://data.alpaca.markets/v1beta3/crypto/us/bars"
    query = {
        "symbols": ",".join(symbols),
        "timeframe": interval,
        "limit": 10000,
    }
    if start_date:
        query["start"] = start_date

    def fetch(page_token = None):
        """Fetch one page; return (bars keyed by symbol, next page token or None)."""
        page_query = dict(query)
        if page_token:
            page_query["page_token"] = page_token

        # TLS certificate verification stays at its default (enabled).
        response = requests.get(base_url, params=page_query, timeout=30)
        response.raise_for_status()
        payload = response.json()
        return payload.get('bars') or {}, payload.get('next_page_token')

    historical_prices = {}
    page_token = None
    while True:
        page_bars, page_token = fetch(page_token)
        # Append instead of dict.update(): a symbol's bars can span several
        # pages, and update() would keep only the last page for that symbol.
        for symbol, bars in page_bars.items():
            historical_prices.setdefault(symbol, []).extend(bars)
        if not page_token:
            break

    print('Data Downloaded')

    stocks = {}
    for symbol in symbols:
        if symbol not in historical_prices:
            # The API returned nothing for this symbol: report it and move on.
            print('bug with: ', symbol)
            continue
        rows = [[bar['t'], bar['c']] for bar in historical_prices[symbol]]
        stocks[symbol] = pd.DataFrame(rows, columns=['date', 'close']).sort_values(by='date', ascending=True)

    return stocks, list(stocks)

In [None]:
# Fetch daily closes from 2020-09-01 onwards for the ticker(s) of interest
# (MSFT in this example). `symbols` ends up holding only the tickers that
# actually returned data.
stocks, symbols = get_historical_prices(
    symbols=['MSFT'],
    start_date='2020-09-01',
    interval='1Day',
)
stocks

In [None]:
# Working DataFrame: the date and closing-price columns of the MSFT download.
df = stocks['MSFT'][['date', 'close']]
# Quick visual check of the series. No x column is given, so pandas plots
# against the row index (presumably only `close` is numeric — TODO confirm).
df.plot()

In [None]:
# Rescale the closing prices to the [0, 1] interval.
# NOTE(review): the scaler is fit on the whole series, before the train/test
# split performed in the following cell, so the test period's min/max
# influence the scaling.
scaler = MinMaxScaler(feature_range=(0,1))
scaled_values = scaler.fit_transform(
    df.sort_values(by='date', ascending=True)['close'].to_numpy().reshape(-1, 1)
)

# Same frame as `df`, with `close` replaced by its scaled counterpart.
scaled_df = df.copy()
scaled_df['close'] = scaled_values
scaled_df

In [None]:
# Split train and test data without shuffle: rows keep their order, so the
# first 80% of the series is used for training and the last 20% for testing.
train_df, test_df = train_test_split(scaled_df, test_size=0.2, shuffle=False)
train_df

In [None]:
# Build the training set as rolling windows: each sample is 30 consecutive
# scaled closes, and its target is the close that immediately follows.
train_close = train_df['close'].to_numpy()

x_train = np.array(
    [train_close[start:start + 30] for start in range(len(train_close) - 30)]
)
y_train = np.array(train_close[30:])

# The LSTM expects (samples, timesteps, features); there is one feature.
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)

In [None]:
# Build the test set with the same 30-step rolling window. The first 30 test
# rows only serve as history, so predictions start at test row 30.
test_close = test_df['close'].to_numpy()

x_test = np.array(
    [test_close[start:start + 30] for start in range(len(test_close) - 30)]
)
y_test = [test_df.iloc[pos]['close'] for pos in range(30, len(test_df))]

# The LSTM expects (samples, timesteps, features); there is one feature.
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)

In [None]:
# Two stacked LSTM layers followed by two dense layers; the final single unit
# outputs the predicted (scaled) next close.
model = keras.Sequential([
    # First LSTM returns the full sequence so the second LSTM can consume it.
    layers.LSTM(128, return_sequences=True, input_shape=(x_train.shape[1], 1)),
    layers.LSTM(64, return_sequences=False),
    layers.Dense(32),
    layers.Dense(1),
])
model.summary()

In [None]:
# Train with Adam on mean squared error: 3 passes over the training windows,
# one window per batch.
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(x_train, y_train, epochs=3, batch_size=1)

In [None]:
# Predict on the test windows and map the outputs back to price units.
predictions = scaler.inverse_transform(model.predict(x_test))

# Test rows from position 30 onwards line up one-to-one with the predictions.
validation = test_df.iloc[30:].copy()
validation['predictions'] = predictions

# Overlay the predictions on the full close series (x-axis is the row index).
plt.figure(figsize=(16, 8))
plt.plot(df['close'])
plt.plot(validation[['predictions']])
plt.title('Model')
plt.xlabel('Date')
plt.ylabel('Close Price USD ($)')
plt.legend(['Base', 'Predictions'], loc='lower right')
plt.show()

## Back to reality

#### The first algorithm is more accurate but much slower

In [None]:
import numpy as np
import pandas as pd
from tqdm import trange
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense

# Hold out the last 100 rows: the model only ever sees the closes before them.
df_train = df.iloc[:-100].copy()
time_series = df_train['close'].to_numpy()

# Scale the training closes to [0, 1].
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_series = scaler.fit_transform(time_series.reshape(-1, 1))

# Training samples: `lookback` consecutive values -> the value that follows.
lookback = 30
n_windows = len(normalized_series) - lookback
X_train = np.array([normalized_series[k:k + lookback, 0] for k in range(n_windows)])
y_train = np.array([normalized_series[k + lookback, 0] for k in range(n_windows)])
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)

# Seed window for forecasting: the last `lookback` training values, shaped
# (1, lookback, 1).
last_instances = normalized_series[-lookback:]
X_test = np.array([last_instances])

# Recursive forecast of the 100 held-out steps. A fresh model is built and
# trained for one epoch at every step, then its prediction is fed back in.
predicted_values = []

for i in trange(100):
    # NOTE: the sigmoid output keeps predictions inside the scaler's [0, 1]
    # range, i.e. between the min and max close seen in training.
    model = Sequential([
        LSTM(128, input_shape=(lookback, 1)),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid'),
    ])
    model.compile(optimizer='adam', loss='mean_squared_error')
    model.fit(X_train, y_train, batch_size=1, epochs=1, verbose=0)

    predicted_value = model.predict(X_test)

    # Slide the window: drop the oldest value, append the new prediction.
    X_test = np.concatenate(
        (X_test[:, 1:, :], predicted_value.reshape(1, 1, 1)), axis=1
    )
    predicted_values.append(scaler.inverse_transform(predicted_value)[0][0])

print("Predicted values:", predicted_values)

#### Less accurate but much faster!

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense

# Hold out the last 100 rows: the model only ever sees the closes before them.
df_train = df.iloc[:-100].copy()

# Time series of training closing prices
time_series = df_train['close'].to_numpy()

# Normalize the time series to [0, 1]
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_series = scaler.fit_transform(time_series.reshape(-1, 1))

# Training data: `lookback` previous time steps are used to predict the next one
lookback = 30
n_windows = len(normalized_series) - lookback
X_train = np.array([normalized_series[k:k + lookback, 0] for k in range(n_windows)])
y_train = np.array([normalized_series[k + lookback, 0] for k in range(n_windows)])

# Reshape for the LSTM input: (batch_size, timesteps, features)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)

# Build the LSTM model.
# NOTE: the sigmoid output keeps predictions inside the scaler's [0, 1]
# range, i.e. between the min and max close seen in training.
model = Sequential([
    LSTM(128, input_shape=(lookback, 1)),
    Dense(32, activation='relu'),
    Dense(1, activation='sigmoid'),
])
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model once
model.fit(X_train, y_train, batch_size=1, epochs=5, verbose=1)

# Seed window for forecasting: the last `lookback` values of the series,
# shaped (1, lookback, 1)
last_instances = normalized_series[-lookback:]
X_test = np.array([last_instances])

# Recursive forecast of the next 100 values with the single trained model
predicted_values = []

for i in range(100):
    predicted_value = model.predict(X_test)
    print(predicted_value)

    # Slide the window: drop the oldest value, append the new prediction.
    X_test = np.concatenate(
        (X_test[:, 1:, :], predicted_value.reshape(1, 1, 1)), axis=1
    )
    predicted_values.append(scaler.inverse_transform(predicted_value)[0][0])

print("Valeurs prédites:", predicted_values)

#### Plot results

In [None]:
# Index the forecasts so they continue right after the last training row.
first_forecast_pos = len(df_train['close'])
validation = pd.DataFrame(
    index=list(range(first_forecast_pos, first_forecast_pos + len(predicted_values)))
)
validation['predictions'] = predicted_values

# Overlay the forecast on the full close series (x-axis is the row index).
plt.figure(figsize=(16, 8))
plt.plot(df['close'])
plt.plot(validation[['predictions']])
plt.title('Model')
plt.xlabel('Date')
plt.ylabel('Close Price USD ($)')
plt.legend(['Base', 'Predictions'], loc='lower right')
plt.show()