In [None]:
!nvidia-smi

In [5]:
!pip install --upgrade pip
!pip install -q numpy
!pip install -q matplotlib
!pip install -q pandas
!pip install -q tensorflow
!pip install -q scikit-learn
!pip install -q pandas-datareader
!pip install -q yfinance
!pip install --upgrade mplfinance

# Code Source Note: https://github.com/twopirllc/pandas-ta
!pip install -q pandas_ta



In [None]:
import os
import sys
from google.colab import drive
# Mount Google Drive at /content/drive/ (Colab only).
drive.mount('/content/drive/')

# Set the working directory for the tasks
# NOTE(review): hard-coded, user-specific Drive path — adjust when running from another account.
# Presumably this directory also holds the local `train_datasets` module imported later — confirm.
SKELETON_DIR = '/content/drive/MyDrive/stock-prediction/MachineLearning2'
os.chdir(SKELETON_DIR)

In [4]:
from datetime import datetime, timedelta
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pandas_ta as ta
import pandas_datareader as web
import datetime as dt
import tensorflow as tf
import yfinance as yf
import mplfinance as mpf

from sklearn.preprocessing import MinMaxScaler
from sklearn import metrics
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Dropout, LSTM, Bidirectional, SimpleRNN, GRU, InputLayer, Input, Activation
from tensorflow.keras.utils import plot_model

import joblib

ModuleNotFoundError: No module named 'numpy'

In [3]:
from train_datasets import create_datasets

ModuleNotFoundError: No module named 'joblib'

In [2]:
# NOTE(review): `start`, `end` and `ticker` are not assigned in any cell visible before this one;
# they must already exist in the kernel namespace when this cell runs — confirm where they are set.
data, df, train_data, test_data, train_feature_scaler, train_target_scaler, x_train, x_test, y_train, y_test = create_datasets(start, end, ticker)

NameError: name 'create_datasets' is not defined

In [None]:
print("Data shapes/types:")
print("data:", type(data))
print("df:", type(df))
print("train_data:", train_data.shape)
print("test_data:", test_data.shape)
print("train_feature_scaler:", type(train_feature_scaler))
print("train_target_scaler:", type(train_target_scaler))
print("x_train:", x_train.shape)
print("x_test:", x_test.shape)
print("y_train:", y_train.shape)
print("y_test:", y_test.shape)

In [None]:
def plot_candlestick(input_df, n=1):
    """Draw a candlestick chart with volume and 50/100/200-row moving averages.

    Parameters:
    - input_df: DataFrame holding 'Open', 'High', 'Low', 'Close' and 'Volume'
      columns. The function works on a copy, so the caller's frame is untouched.
    - n: When greater than 1, rows are first aggregated with
      `resample(f'{n}D')` (first open, max high, min low, last close, summed
      volume) and bins containing missing values are dropped.

    The chart title reads the module-level `ticker` variable.
    """
    frame = input_df.copy()

    # Optional aggregation into n-day bars
    if n > 1:
        ohlcv_rules = {
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Volume': 'sum'
        }
        frame = frame.resample(f'{n}D').agg(ohlcv_rules).dropna()

    # (column name, rolling window, line colour) for each moving average
    ma_styles = (
        ('MA50', 50, 'orange'),
        ('MA100', 100, 'green'),
        ('MA200', 200, 'magenta'),
    )

    for column, window, _ in ma_styles:
        frame[column] = frame['Close'].rolling(window=window).mean()

    # Only overlay a moving average that has at least one computed value
    overlays = []
    for column, _, colour in ma_styles:
        computed = frame[column].dropna()
        if computed.shape[0] > 0:
            aligned = computed.reindex(frame.index, fill_value=None)
            overlays.append(mpf.make_addplot(aligned, color=colour))

    mpf.plot(
        frame,
        type='candle',
        style='charles',
        title=f"{ticker} Candlestick Chart",
        ylabel='Price',
        volume=True,
        ylabel_lower='Volume',
        addplot=overlays,
        show_nontrading=True,
    )

In [None]:
def plot_boxplot(input_df, n=1, k=10):
    """Draw one box per row, built from that row's Low, Open, Close and High.

    Parameters:
    - input_df: DataFrame with 'Open', 'High', 'Low', 'Close' and 'Volume'
      columns; its index entries must support `strftime`. A copy is used.
    - n: When greater than 1, rows are first aggregated with
      `resample(f'{n}D')` and bins containing missing values are dropped.
    - k: Spacing between labelled x-axis ticks (every k-th row gets a date label).

    The chart title reads the module-level `ticker` variable.
    """
    frame = input_df.copy()

    # Optional aggregation into n-day bars
    if n > 1:
        ohlcv_rules = {
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Volume': 'sum'
        }
        frame = frame.resample(f'{n}D').agg(ohlcv_rules).dropna()

    # One four-value sample per row, plus the matching date label
    box_data = [
        [row['Low'], row['Open'], row['Close'], row['High']]
        for _, row in frame.iterrows()
    ]
    labels = [stamp.strftime('%Y-%m-%d') for stamp in frame.index]

    fig, ax = plt.subplots()
    ax.boxplot(box_data, vert=True, patch_artist=True)
    ax.set(title=f'{ticker} Boxplot Chart', xlabel='Date', ylabel='Price')

    # Boxes sit at positions 1..len(labels); label every k-th one
    tick_positions = range(1, len(labels) + 1, k)
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(labels[::k], rotation=90)

    plt.show()

In [None]:
plot_candlestick(data)

In [None]:
plot_candlestick(df, n=10)

In [None]:
plot_candlestick(train_data,n=10)

In [None]:
plot_candlestick(test_data,n=10)

In [None]:
# Recurrent cell names accepted in a layer config's 'type' entry, either bare
# ('LSTM') or wrapped as 'Bidirectional(LSTM)'.
_RECURRENT_CELL_NAMES = ('LSTM', 'GRU', 'RNN')


def _parse_layer_type(layer_type):
    """Split a config 'type' string into (cell_name, is_bidirectional), rejecting unknown cells."""
    is_bidirectional = 'Bidirectional' in layer_type
    cell_name = layer_type
    if is_bidirectional:
        cell_name = layer_type.replace('Bidirectional(', '').replace(')', '')
    if cell_name not in _RECURRENT_CELL_NAMES:
        raise ValueError(
            f"Unsupported layer type {layer_type!r}; expected one of "
            f"{_RECURRENT_CELL_NAMES} or 'Bidirectional(<cell>)'."
        )
    return cell_name, is_bidirectional


def create_dynamic_model(input_shape, layer_configs, output_units=1):
    """Build a Sequential recurrent model from a list of layer configurations.

    Parameters:
    - input_shape: Shape of one input sample, passed to the model's Input.
    - layer_configs: Non-empty list of dicts, one per recurrent layer. Each dict
      must provide 'type' ('LSTM', 'GRU', 'RNN' or 'Bidirectional(<cell>)'),
      'units', 'return_sequences' and 'dropout'; 'activation' is optional and,
      when present, adds a separate Activation layer after the recurrent layer.
    - output_units: Number of units in the final Dense layer.

    Returns:
    - The (uncompiled) Sequential model.

    Raises:
    - ValueError: if `layer_configs` is empty or a 'type' is not recognised.
      Previously an unknown type was skipped silently, producing a model that
      lacked the requested recurrent layer.
    """
    if not layer_configs:
        raise ValueError("layer_configs must contain at least one layer configuration.")

    # Validate every config up front so a typo cannot leave a half-built model.
    parsed_types = [_parse_layer_type(config['type']) for config in layer_configs]

    cell_classes = {'LSTM': LSTM, 'GRU': GRU, 'RNN': SimpleRNN}

    model = Sequential()
    # Declare the input once; no layer carries an input_shape argument. In
    # particular the later Bidirectional layers no longer receive the model's
    # input shape, which does not describe their actual input.
    model.add(Input(shape=input_shape))

    for config, (cell_name, is_bidirectional) in zip(layer_configs, parsed_types):
        recurrent = cell_classes[cell_name](
            units=config['units'],
            return_sequences=config['return_sequences'],
        )
        model.add(Bidirectional(recurrent) if is_bidirectional else recurrent)

        if 'activation' in config:
            model.add(Activation(config['activation']))

        model.add(Dropout(config['dropout']))

    # Output layer
    model.add(Dense(units=output_units))

    return model

In [None]:
def plot_metric(metric_name_1, metric_name_2, plot_name, history=None):
    """Plot two recorded training metrics against the epoch index.

    Parameters:
    - metric_name_1: Key of the first metric in `history.history` (drawn in blue).
    - metric_name_2: Key of the second metric in `history.history` (drawn in red).
    - plot_name: Title of the plot.
    - history: Object exposing a `.history` dict of per-epoch metric lists.
      Defaults to the module-level `model_training_history`, which keeps
      existing three-argument calls working while letting callers plot any
      training run explicitly.
    """
    if history is None:
        # Backward-compatible default: the result stored by the training cell.
        history = model_training_history

    # Get metric values using metric names as identifiers
    metric_value_1 = history.history[metric_name_1]
    metric_value_2 = history.history[metric_name_2]

    # Epoch indices used as the x-axis
    epochs = range(len(metric_value_1))

    # Plotting the graph
    plt.plot(epochs, metric_value_1, 'blue', label=metric_name_1)
    plt.plot(epochs, metric_value_2, 'red', label=metric_name_2)

    # Adding title and legend to the plot
    plt.title(str(plot_name))
    plt.legend()

In [None]:
n_steps = x_train.shape[1]
print(n_steps)
n_features = x_train.shape[2]
print(n_features)
input_shape = (n_steps, n_features)

In [None]:
# Test 1 - Mix LSTM, GRU, RNN, Differs Activation
layer_configs = [
    { 'type': 'LSTM', 'units': 120, 'return_sequences': True, 'dropout': 0.25, 'activation': 'tanh' },
    { 'type': 'LSTM', 'units': 100, 'return_sequences': True, 'dropout': 0.25 },
    { 'type': 'GRU', 'units': 80, 'return_sequences': True, 'dropout': 0.25 },
    { 'type': 'RNN', 'units': 60, 'return_sequences': True, 'dropout': 0.25, 'activation': 'relu' },
    { 'type': 'LSTM', 'units': 40, 'return_sequences': False, 'dropout': 0.25 }
]

In [None]:
# Test 2 - Simple LSTM Model
layer_configs = [
    { 'type': 'LSTM', 'units': 50, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'LSTM', 'units': 50, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 3 - Simple GRU Model (Most Accurate)
layer_configs = [
    { 'type': 'GRU', 'units': 100, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'GRU', 'units': 100, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 4 - More complex GRU Model
layer_configs = [
    { 'type': 'GRU', 'units': 128, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'GRU', 'units': 128, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'GRU', 'units': 128, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 5 - Mixed LSTM and GRU Model
layer_configs = [
    { 'type': 'LSTM', 'units': 100, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'GRU', 'units': 100, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 6 - Deep LSTM with more Units Model
layer_configs = [
    { 'type': 'LSTM', 'units': 200, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'LSTM', 'units': 150, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'LSTM', 'units': 100, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 7 - LSTM Reduced Dropout Model
layer_configs = [
    { 'type': 'LSTM', 'units': 100, 'return_sequences': True, 'dropout': 0.1 },
    { 'type': 'LSTM', 'units': 100, 'return_sequences': False, 'dropout': 0.1 }
]

In [None]:
# Test 8 -  Complex Bidirectional LSTM Configuration Model
layer_configs = [
    { 'type': 'Bidirectional(LSTM)', 'units': 128, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'Bidirectional(LSTM)', 'units': 128, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'Bidirectional(LSTM)', 'units': 128, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'Bidirectional(LSTM)', 'units': 128, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 9 -  Complex Bidirectional GRU Configuration Model
layer_configs = [
    { 'type': 'Bidirectional(GRU)', 'units': 128, 'return_sequences': True, 'dropout': 0.3 },
    { 'type': 'Bidirectional(GRU)', 'units': 128, 'return_sequences': True, 'dropout': 0.3 },
    { 'type': 'Bidirectional(GRU)', 'units': 64, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'Bidirectional(GRU)', 'units': 64, 'return_sequences': False, 'dropout': 0.2 }
]

In [None]:
# Test 10 - Complex Bidirectional GRU Configuration - More Layers Model
layer_configs = [
    { 'type': 'Bidirectional(GRU)', 'units': 256, 'return_sequences': True, 'dropout': 0.4 },
    { 'type': 'Bidirectional(GRU)', 'units': 128, 'return_sequences': True, 'dropout': 0.3 },
    { 'type': 'Bidirectional(GRU)', 'units': 128, 'return_sequences': True, 'dropout': 0.3 },
    { 'type': 'Bidirectional(GRU)', 'units': 64, 'return_sequences': True, 'dropout': 0.2 },
    { 'type': 'Bidirectional(GRU)', 'units': 32, 'return_sequences': False, 'dropout': 0.1 }
]

In [None]:
# Discard a previously built model before rebuilding it. The guard keeps this
# cell from raising NameError on a fresh kernel (e.g. Restart & Run All), where
# no model has been created yet.
if 'lstm_model' in globals():
    del lstm_model

In [None]:
lstm_model = create_dynamic_model(input_shape, layer_configs)
lstm_model.summary()

In [None]:
# Training
# Avoid overfitting by using early stopping, dropout, etc.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=15, mode='min', restore_best_weights=True)
# The model is fitted with an MSE loss (regression), so classification
# "accuracy" carries no information here; track mean absolute error instead.
lstm_model.compile(loss='mse', optimizer='adam', metrics=['mae'])
model_training_history = lstm_model.fit(x_train, y_train, batch_size=32, epochs=100, shuffle=True, validation_split=0.2, callbacks=[early_stopping_callback])

In [None]:
# Plot Total Loss vs Total Validation Loss
plot_metric('loss', 'val_loss', 'Total Loss vs Total Validation Loss')

In [None]:
# Model code was added earlier for testing; it won't be available in this submission.
predicted_prices = lstm_model.predict(x_test)
predicted_prices = train_target_scaler.inverse_transform(predicted_prices)

In [None]:
len(predicted_prices)

In [None]:
print(type(train_data))
print(type(test_data))
print(type(predicted_prices))

In [None]:
def plot_candlestick_predicted(input_df, predicted_prices, n=1):
    """Plot the most recent candles together with the predicted prices.

    Parameters:
    - input_df: DataFrame with 'Open', 'High', 'Low', 'Close' and 'Volume'
      columns and an index convertible by `pd.to_datetime`. A copy is used.
    - predicted_prices: NumPy array of predictions (1-D, or 2-D which is
      flattened). The last `len(predicted_prices)` rows of the frame are plotted.
    - n: When greater than 1, rows are first aggregated with
      `resample(f'{n}D')` and bins containing missing values are dropped.

    Raises:
    - ValueError: if `predicted_prices` is empty.

    Reads the module-level `price_value` (column used for the moving averages)
    and `ticker` (chart title).

    NOTE(review): when n > 1 the frame is resampled but `predicted_prices` is
    not, so predictions are paired with the resampled candles by position only
    — confirm this alignment is intended.
    """
    if len(predicted_prices) == 0:
        # A zero-length slice bound (`[-0:]`) would otherwise select the whole frame.
        raise ValueError("predicted_prices is empty; nothing to plot.")

    # Work with a copy to avoid modifying the original dataframe
    input_df = input_df.copy()

    # Resampling the data into n-day bars
    if n > 1:
        input_df = input_df.resample(f'{n}D').agg({
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Volume': 'sum'
        }).dropna()

    # (column name, rolling window, line colour) for each moving average.
    # The averages are computed on the full frame so the plotted tail still
    # benefits from the earlier history.
    ma_styles = (
        ('MA50', 50, 'orange'),
        ('MA100', 100, 'green'),
        ('MA200', 200, 'magenta'),
    )
    for column, window, _ in ma_styles:
        input_df[column] = input_df[price_value].rolling(window=window).mean()

    # Convert the index to a DatetimeIndex
    input_df.index = pd.to_datetime(input_df.index)

    # Keep only the last candles, one per prediction
    df_plot = input_df[-len(predicted_prices):].copy()

    # Flatten 2-D predictions
    if predicted_prices.ndim == 2:
        predicted_prices = predicted_prices.reshape(-1)

    # Make the predictions and the plotted rows the same length
    if len(predicted_prices) > len(df_plot):
        predicted_prices = predicted_prices[-len(df_plot):]  # Take only the last predictions
    elif len(predicted_prices) < len(df_plot):
        print(f"Length mismatch: predicted_prices has length {len(predicted_prices)} but df_plot has length {len(df_plot)}")
        # Align the predictions to the end of df_plot
        start_idx = len(df_plot) - len(predicted_prices)
        df_plot = df_plot[start_idx:].copy()

    df_plot['Predicted'] = predicted_prices

    # Overlays must have exactly as many rows as the plotted frame, so take the
    # moving averages from df_plot. Taking them from the full input_df (as
    # before) breaks whenever fewer rows are plotted than the frame contains.
    ap = []
    for column, _, colour in ma_styles:
        if df_plot[column].notna().any():
            ap.append(mpf.make_addplot(df_plot[column], color=colour))

    ap.append(mpf.make_addplot(df_plot['Predicted'], color='red', linestyle='dashed'))

    # Plot the candlestick chart
    mpf.plot(df_plot, type='candle', style='charles',
             title=f"{ticker} Candlestick Chart",
             ylabel='Price',
             volume=False,
             addplot=ap,
             show_nontrading=False)

In [None]:
plot_candlestick_predicted(test_data, predicted_prices, n=4)

In [None]:
def plot_candlestick_full(train_df, test_df, predicted_prices, n=1):
    """Plot training and test candles on one chart, with predictions over the test span.

    Parameters:
    - train_df, test_df: DataFrames with 'Open', 'High', 'Low', 'Close' and
      'Volume' columns. Both are copied before any change is made.
    - predicted_prices: NumPy array of predictions (1-D, or 2-D which is
      flattened); after optional truncation it must be as long as `test_df`.
    - n: When greater than 1, both frames are aggregated with
      `resample(f'{n}D')`, bins with missing values are dropped, and only the
      last `len(test_df)` predictions are kept.

    Raises:
    - ValueError: if resampling empties either frame, or if the number of
      predictions differs from the number of test rows.

    Reads the module-level `price_value` (column used for the moving averages)
    and `ticker` (chart title). Moving averages are computed separately for the
    training and the test frame.
    """
    def _to_n_day_bars(frame):
        # Aggregate rows into n-day OHLCV bars, discarding incomplete bins.
        return frame.resample(f'{n}D').agg({
            'Open': 'first',
            'High': 'max',
            'Low': 'min',
            'Close': 'last',
            'Volume': 'sum'
        }).dropna()

    train_df = train_df.copy()
    test_df = test_df.copy()

    if n > 1:
        train_df = _to_n_day_bars(train_df)
        test_df = _to_n_day_bars(test_df)

        if train_df.empty or test_df.empty:
            raise ValueError("Resampling resulted in an empty DataFrame. Try a smaller value of n.")

        # Keep as many trailing predictions as there are resampled test rows
        predicted_prices = predicted_prices[-len(test_df):]

    # (column name, rolling window, line colour) for each moving average
    ma_styles = (
        ('MA50', 50, 'orange'),
        ('MA100', 100, 'green'),
        ('MA200', 200, 'magenta'),
    )
    for frame in (train_df, test_df):
        for column, window, _ in ma_styles:
            frame[column] = frame[price_value].rolling(window=window).mean()

    # Flatten 2-D predictions
    if predicted_prices.ndim == 2:
        predicted_prices = predicted_prices.reshape(-1)

    if len(predicted_prices) != len(test_df):
        raise ValueError(f"Length mismatch: predicted_prices has length {len(predicted_prices)} but test_df has length {len(test_df)}")

    test_df['Predicted'] = predicted_prices

    # Stack train and test into the single frame that gets plotted
    df_plot = pd.concat([train_df, test_df])
    df_plot.index = pd.to_datetime(df_plot.index)

    # Only overlay a moving average that has at least one computed value
    overlays = []
    for column, _, colour in ma_styles:
        computed = df_plot[column].dropna()
        if computed.shape[0] > 0:
            aligned = computed.reindex(df_plot.index, fill_value=None)
            overlays.append(mpf.make_addplot(aligned, color=colour))

    overlays.append(mpf.make_addplot(df_plot['Predicted'], color='red', linestyle='dashed'))

    mpf.plot(
        df_plot,
        type='candle',
        style='charles',
        title=f"{ticker} Candlestick Chart",
        ylabel='Price',
        volume=False,
        addplot=overlays,
        show_nontrading=False,
    )


In [None]:
# Truncate or slice test_df to match the length of predicted_prices
truncated_test_data = test_data.iloc[-len(predicted_prices):]
plot_candlestick_full(train_data,truncated_test_data, predicted_prices, n=4)

**ARIMA MODEL**

In [None]:
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import matplotlib.pyplot as plt

In [None]:
# Plot ACF and PACF for 'Close' prices
fig, ax = plt.subplots(1, 2, figsize=(12, 4))

# ACF plot
plot_acf(train_data['Close'], lags=40, ax=ax[0])

# PACF plot
plot_pacf(train_data['Close'], lags=40, ax=ax[1])

plt.tight_layout()
plt.show()

In [None]:
from statsmodels.tsa.stattools import adfuller

def visualize_and_check_stationarity(time_series):
    """Plot a series with its rolling statistics and print an ADF test summary.

    The series is drawn together with its rolling mean and rolling standard
    deviation (window of 30 observations). The Augmented Dickey-Fuller test is
    then run with `autolag='AIC'`, and the test statistic, p-value, lag count,
    observation count and critical values are printed.

    Parameters:
    - time_series: The time series data (pandas Series).
    """
    window = 30
    curves = (
        (time_series, 'blue', 'Original Close Prices'),
        (time_series.rolling(window=window).mean(), 'red', 'Rolling Mean'),
        (time_series.rolling(window=window).std(), 'black', 'Rolling Std'),
    )

    plt.figure(figsize=(14, 7))
    for series, colour, label in curves:
        plt.plot(series, color=colour, label=label)
    plt.legend(loc='best')
    plt.title('Close Prices with Rolling Mean & Standard Deviation')
    plt.show()

    # Augmented Dickey-Fuller test: the first four entries are scalars,
    # the fifth is the mapping of critical values
    adf_result = adfuller(time_series, autolag='AIC')
    summary = pd.Series(
        adf_result[0:4],
        index=['Test Statistics', 'p-value', 'No. of lags used', 'Number of observations used'],
    )
    for level, threshold in adf_result[4].items():
        summary['critical value (%s)' % level] = threshold
    print(summary)

# Calling the function to visualize the 'Close' prices and check for stationarity
visualize_and_check_stationarity(train_data['Close'])

In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose

def decompose_time_series(time_series, model='additive', freq=30):
    """Decompose a series with `seasonal_decompose` and plot the four components.

    Parameters:
    - time_series: The time series data (pandas Series).
    - model: Decomposition type forwarded to `seasonal_decompose`
      ('additive' or 'multiplicative').
    - freq: Value forwarded as the `period` argument of `seasonal_decompose`.

    Returns:
    - The decomposition result object.
    """
    result = seasonal_decompose(time_series, model=model, period=freq)

    plt.figure(figsize=(16, 8))

    # Stacked panels, top to bottom: original, trend, seasonal, residual
    panels = (
        (time_series, 'Original'),
        (result.trend, 'Trend'),
        (result.seasonal, 'Seasonal'),
        (result.resid, 'Residual'),
    )
    for position, (series, label) in enumerate(panels, start=1):
        plt.subplot(4, 1, position)
        plt.plot(series, label=label)
        plt.legend(loc='upper left')
        if position == 1:
            # The figure title sits on the top panel only
            plt.title('Time Series Decomposition')

    plt.tight_layout()
    plt.show()

    return result

# Decompose the 'Close' prices time series
decomposed = decompose_time_series(train_data['Close'])


In [None]:
!pip install pmdarima

In [None]:
from pmdarima import auto_arima

# Finding the best ARIMA model using auto_arima
model_autoARIMA = auto_arima(train_data['Close'], start_p=0, start_q=0,
                             test='adf',       # using adftest to find optimal 'd'
                             max_p=3, max_q=3, # maximum p and q
                             m=1,              # frequency of series
                             d=None,           # let model determine 'd'
                             seasonal=False,   # No Seasonality
                             start_P=0,
                             D=0,
                             trace=True,
                             error_action='ignore',
                             suppress_warnings=True,
                             stepwise=True)

print(model_autoARIMA.summary())
model_autoARIMA.plot_diagnostics(figsize=(15,8))
plt.show()

In [None]:
# Building the ARIMA model using your parameters
arima_model = ARIMA(train_data['Close'], order=(0, 1, 0))
fitted = arima_model.fit()

def arima_forecast(train_series, test_series, fitted_model):
    """Forecast as many steps as there are test observations.

    Parameters:
    - train_series: Training data (pandas Series). Not read by this function.
    - test_series: Testing data; only its length is used, as the forecast horizon.
    - fitted_model: Fitted model exposing `get_forecast(steps=...)`.

    Returns:
    - (predicted mean, confidence-interval frame, its first column, its second
      column), with the interval requested at alpha=0.05.
    """
    horizon = len(test_series)
    forecast = fitted_model.get_forecast(steps=horizon)

    mean_forecast = forecast.predicted_mean
    bounds = forecast.conf_int(alpha=0.05)

    return mean_forecast, bounds, bounds.iloc[:, 0], bounds.iloc[:, 1]


# Call the function to forecast and plot
fc, conf_int, lower_series, upper_series = arima_forecast(train_data['Close'], test_data['Close'], fitted)




In [None]:
# Plotting the data
plt.figure(figsize=(12,6), dpi=100)
plt.plot(train_data['Close'].index, train_data['Close'], label='Training Data')
plt.plot(test_data['Close'].index, test_data['Close'], color='blue', label='Actual Stock Price')
plt.plot(test_data['Close'].index, fc, color='orange', label='Predicted Stock Price')
plt.fill_between(test_data['Close'].index, lower_series, upper_series, color='k', alpha=.10)
plt.title('Stock Price Prediction')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend(loc='upper left', fontsize=8)
plt.show()

In [None]:
del data, df, train_data, test_data, train_feature_scaler, train_target_scaler, x_train, x_test, y_train, y_test

## `create_predict_datasets`

**The function `create_predict_datasets` takes the following parameters:**

1. `start_predict`: A string representing the starting date for fetching the stock data in the format 'YYYY-MM-DD'.
2. `end_predict`: A string representing the ending date for fetching the stock data in the format 'YYYY-MM-DD'.
3. `tick`: A string representing the stock ticker symbol for which the data is to be fetched.
4. `k`: An integer parameter that is currently not used inside the function body.

**This function performs the following operations:**

1. **Data Fetching**: Retrieves stock data for the specified `tick` between `start_predict` and `end_predict` dates using the `load_data` function.
2. **Data Validation**: Applies data validation and preprocessing using the `data_validation` function. This includes feature engineering and data transformation tasks.
3. **Feature and Target Definition**: Specifies the features and target columns for prediction. The features include stock data attributes like 'Open', 'High', 'Low', etc., and the target is the 'TargetNextClose' column.
4. **Scaling Data**: Scales the feature data to bring it within a normalized range, typically [0,1], using the `scaler_features` function. This ensures better performance during the prediction phase.
5. **Prepare Test Data**: Constructs the test datasets by segmenting the scaled data based on the `step_size` and the length of the scaled data.
6. **Datetime Index Check**: Ensures the dataframes `data` and `df` have a datetime index. If not, it converts the 'Date' column to a datetime index.
7. **Return**: Returns the processed dataframe `df`, the scaled feature data `scaled_data`, the scaler object `scaler` fitted on the target column (used to inverse-transform predictions), and the test datasets `x_test` and `y_test`.

In [None]:
def create_predict_datasets(start_predict, end_predict, tick, k):
    """Load, scale and window the data needed to run predictions for one ticker.

    Parameters:
    - start_predict, end_predict: Date bounds forwarded to `load_data` and
      `data_validation`.
    - tick: Ticker symbol forwarded to the same helpers.
    - k: Not used by this function.

    Returns:
    - (df, scaled_data, scaler, x_test, y_test), where `scaler` is the scaler
      returned for the 'TargetNextClose' column and `x_test`/`y_test` are
      windows of `step_size` consecutive rows with the scaled target at the
      row that follows each window.

    Relies on the module-level `step_size` and on the helpers `load_data`,
    `data_validation` and `scaler_features`, none of which are defined here.
    NOTE(review): `scaler_features` is assumed to return (scaled array, scaler)
    because its result is unpacked into two names — confirm.
    """
    def _promote_date_column(frame):
        # In place: make 'Date' the index when the frame lacks a DatetimeIndex.
        if isinstance(frame.index, pd.DatetimeIndex):
            return
        if "Date" in frame.columns:
            frame['Date'] = pd.to_datetime(frame['Date'])
            frame.set_index('Date', inplace=True)

    # Raw data
    print(f"Fetching data from {start_predict} to {end_predict}")
    data = load_data(start_predict, end_predict, tick)
    print(f"Raw data shape: {data.shape}")
    print(data.head())

    # Validated / processed data
    df = data_validation(start_predict, end_predict, tick)
    print(f"Processed data shape: {df.shape}")
    print(df.head())

    feature_columns = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'RSI', 'EMAF', 'EMAM', 'EMAS']
    target_column = 'TargetNextClose'

    # Features and target are scaled independently
    scaled_data, _feature_scaler = scaler_features(df[feature_columns])
    print("Scaled data shape:", scaled_data.shape)
    scaled_target_train, scaler = scaler_features(df[target_column].values.reshape(-1, 1))

    # One window of `step_size` rows per position, paired with the target at that position
    window_ends = range(step_size, len(scaled_data))
    x_test = np.array([scaled_data[end - step_size:end] for end in window_ends])
    y_test = np.array([scaled_target_train[end] for end in window_ends])
    print("x_test shape:", x_test.shape)

    _promote_date_column(data)
    _promote_date_column(df)

    return df, scaled_data, scaler, x_test, y_test


In [None]:
def plot_predictions(df, actual_prices, past_predictions, future_predictions):
    """Plot actual prices, past predictions and future predictions on one time axis.

    Parameters:
    - df: DataFrame whose index supplies the dates of the actual prices.
    - actual_prices: Values plotted against every date in `df.index`.
    - past_predictions: Values plotted against the last `len(past_predictions)` dates.
    - future_predictions: Array of forecasts; it is flattened and plotted on
      consecutive dates generated by `pd.date_range` after the last date in `df`.
    """
    future_values = future_predictions.flatten()

    history_dates = df.index
    # Generate one extra date and drop the first so the forecast starts after the last known date
    extended = pd.date_range(start=history_dates[-1], periods=len(future_values) + 1)
    forecast_dates = extended[1:]
    past_dates = history_dates[-len(past_predictions):]

    fig, ax = plt.subplots(figsize=(15, 7))

    ax.plot(history_dates, actual_prices, color='blue', label='Actual Prices')
    ax.plot(past_dates, past_predictions, color='green', label='Past Predictions')
    ax.plot(forecast_dates, future_values, color='red', linestyle='dashed', label='Future Predictions')

    ax.set_title('Stock Price Predictions')
    ax.set_xlabel('Date')
    ax.set_ylabel('Price')
    ax.legend()
    ax.grid(True)
    plt.tight_layout()
    plt.show()

## `single_day_multivariate_prediction`

**The function `single_day_multivariate_prediction` takes the following parameters:**

1. `model`: A pre-trained machine learning or deep learning model that will be used for prediction.
2. `tick`: A string representing the stock ticker symbol for which the prediction is to be made.
3. `predict_date`: A string representing the date for which the prediction is to be made in the format 'YYYY-MM-DD'.

**This function performs the following operations:**

1. **Date Conversion**: Converts the `predict_date` string into a datetime object for further operations.
2. **Lookback Calculation**: Determines the starting date (2 years prior to `predict_date`) for fetching historical stock data.
3. **Data Preparation**: Calls the `create_predict_datasets` function to fetch and prepare the necessary datasets for prediction. The data fetched starts from the calculated lookback date, and `predict_date` itself is passed as the end date (whether that day is included depends on how `load_data` treats its end bound).
4. **Data Availability Check**: Verifies if there's sufficient data available to make a prediction. If not, a `ValueError` is raised.
5. **Prediction**: Uses the provided `model` to make a prediction using the last day's multivariate data fetched.
6. **Inverse Scaling**: The raw prediction output from the model is scaled back to its original range using the inverse transformation process to get the actual predicted price.
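7. **Output**: The function returns a tuple: the model's predicted closing price for the specified `predict_date` (as the inverse-scaled prediction array) and the ARIMA forecast series `fc` fitted on the historical 'Close' prices.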

In [None]:
def single_day_multivariate_prediction(model, tick, predict_date):
    """Predict the closing price for one date and fit an ARIMA forecast alongside it.

    Parameters
    ----------
    model : object
        Fitted model exposing ``predict``. It is called on the last window of
        ``x_test`` reshaped to ``(1, -1, x_test.shape[-1])``.
    tick : str
        Ticker symbol, forwarded to ``create_predict_datasets``.
    predict_date : str
        Target date in ``'YYYY-MM-DD'`` format.

    Returns
    -------
    tuple
        ``(predicted_price, fc)``: the inversely scaled model output and the
        ``predicted_mean`` of the ARIMA forecast fitted on ``df['Close']``.

    Raises
    ------
    ValueError
        If ``predict_date`` does not match ``'%Y-%m-%d'`` or if the prepared
        dataset contains no usable input window.
    """
    # Convert the string date to a datetime object and look back 730 days (~2 years)
    predict_date = datetime.strptime(predict_date, '%Y-%m-%d')
    start_date = predict_date - timedelta(days=730)
    start_str = start_date.strftime('%Y-%m-%d')
    predict_str = predict_date.strftime('%Y-%m-%d')

    print(f"Date: {start_str} to {predict_str}")

    # Build the dataset for [start_date, predict_date]; whether the end date
    # itself is included depends on create_predict_datasets.
    df, _, scaler, x_test, _ = create_predict_datasets(start_str, predict_str, tick, k=1)

    # Fail fast: check for usable data before the comparatively expensive ARIMA fit
    if len(x_test) == 0 or x_test[-1].shape[0] == 0:
        raise ValueError("Insufficient data for the specified prediction date.")

    print('Define ARIMA')
    model_arima = ARIMA(df['Close'], order=(1, 1, 2))
    fitted = model_arima.fit()

    print('Start forecast')
    # NOTE(review): the horizon equals the number of observations even though
    # this function targets a single day -- confirm this is intended.
    forecast_object = fitted.get_forecast(steps=len(df['Close']))

    print('Extract forecast')
    # Extract forecast and the 95% confidence interval (alpha=0.05)
    fc = forecast_object.predicted_mean
    conf_int = forecast_object.conf_int(alpha=0.05)
    lower_series = conf_int.iloc[:, 0]
    upper_series = conf_int.iloc[:, 1]

    # Use the model to make a prediction using the last window of multivariate data
    last_window = x_test[-1].reshape(1, -1, x_test.shape[-1])
    prediction = model.predict(last_window)

    # Inversely scale the prediction to get the actual predicted price
    predicted_price = scaler.inverse_transform(prediction)

    # Label each output for what it is (model prediction vs. ARIMA forecast/bounds)
    print(f"Date: {predict_str}, Predicted Price: {predicted_price[0][0]}")
    print(f"Date: {predict_str}, ARIMA Forecast: {fc}")
    print(f"Date: {predict_str}, ARIMA Lower Bound (95%): {lower_series}")
    print(f"Date: {predict_str}, ARIMA Upper Bound (95%): {upper_series}")

    return predicted_price, fc


In [None]:
# Single-day TSLA prediction for a fixed target date.
# `lstm_model` is expected to have been defined by an earlier cell.
end_date = '2023-09-22'
predicted_closing_price, forecast = single_day_multivariate_prediction(
    lstm_model,
    tick='TSLA',
    predict_date=end_date,
)



## `predictions`

**The function `predictions` takes the following parameters:**

1. `model`: A pre-trained machine learning model used for making predictions.
2. `tick`: A string representing the ticker symbol of the company's stock.
3. `start_predict`: A string representing the starting date for the dataset.
4. `end_predict`: A string representing the ending date for the dataset.
5. `k`: An integer representing the number of days into the future for which predictions need to be made (default value is 10).

**This function performs the following operations:**

1. **Prepare Datasets**: Calls `create_predict_datasets` to prepare the required datasets for prediction.
2. **Past Predictions**: Uses the model to predict past closing prices based on the test dataset.
3. **Future Predictions**: Predicts the closing prices for the next $k$ days using the model, feeding each prediction back into the input window for the following step.
4. **Date Handling**: Generates future dates based on the last known date in the dataset.
5. **Prediction Printing**: Prints each future prediction along with its corresponding date.

In [None]:
def predictions(model, tick, start_predict, end_predict, k=10):
    """Predict past closing prices and roll the model forward ``k`` days.

    Parameters
    ----------
    model : object
        Fitted model exposing ``predict``.
    tick : str
        Ticker symbol, forwarded to ``create_predict_datasets``.
    start_predict : str
        Start date of the dataset, forwarded to ``create_predict_datasets``.
    end_predict : str
        End date of the dataset in ``'YYYY-MM-DD'`` format; future dates are
        generated from it.
    k : int, optional
        Number of future days to predict (default 10).

    Returns
    -------
    tuple
        ``(predict_df, actual_prices, past_predictions, future_predictions)``.
        ``future_predictions`` stacks one inversely scaled model output per
        future day.

    Raises
    ------
    ValueError
        If ``end_predict`` does not match ``'%Y-%m-%d'`` or if the prepared
        dataset contains no input windows.
    """
    print(f'Date: {start_predict} to {end_predict}')

    # Parse up front so a malformed date fails before any data/model work.
    last_known_date = datetime.strptime(end_predict, '%Y-%m-%d')

    # Preparing Datasets
    predict_df, _, scaler, x_test, _ = create_predict_datasets(start_predict, end_predict, tick, k)

    # Same guard as single_day_multivariate_prediction: without it an empty
    # x_test only surfaces later as an IndexError on x_test[-1].
    if len(x_test) == 0:
        raise ValueError("Insufficient data for the specified prediction range.")

    # Actual Prices
    actual_prices = predict_df['Close'].values

    # Past predictions, scaled back to price units
    past_predictions = np.asarray(scaler.inverse_transform(model.predict(x_test)))

    # Future dates are consecutive calendar days after end_predict.
    # NOTE(review): this includes weekends/holidays -- confirm that is intended.
    future_dates = [last_known_date + timedelta(days=offset) for offset in range(1, k + 1)]

    n_features = x_test.shape[-1]
    input_data = x_test[-1]
    future_predictions = []

    for current_date in future_dates:
        pred = model.predict(input_data.reshape(1, -1, n_features))

        # Inversely scaling
        predicted_price = scaler.inverse_transform(pred)
        future_predictions.append(predicted_price)

        # Print predictions
        print(f"Date: {current_date.strftime('%Y-%m-%d')}, Predicted Price: {predicted_price}")

        # Slide the window one step; np.roll returns a new array, so x_test
        # itself is never modified by the assignment below.
        input_data = np.roll(input_data, -1, axis=0)
        # NOTE(review): presumably pred holds a single scaled value; it is
        # written to every feature of the newest step -- confirm for
        # multivariate inputs.
        input_data[-1] = pred

    # Convert to numpy
    future_predictions = np.array(future_predictions)

    return predict_df, actual_prices, past_predictions, future_predictions


In [None]:
# Multi-day TSLA forecast over the trailing 730 days ending now, 10 days ahead.
# NOTE(review): this cell passes `model`, while the single-day cell uses
# `lstm_model` -- confirm `model` is defined by an earlier cell.
end_date = datetime.now()
start_date = end_date - timedelta(days=730)
predict_df, actual_prices, past_predictions, future_predictions = predictions(
    model,
    tick='TSLA',
    start_predict=start_date.strftime('%Y-%m-%d'),
    end_predict=end_date.strftime('%Y-%m-%d'),
    k=10,
)