In [1]:
# !pip install ta --quiet 

In [None]:
from datetime import datetime
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from matplotlib import pyplot
from joblib import dump, load
from pickle import load


from keras import models
import keras 
from keras.models import Sequential
from keras.layers import Dense

import yfinance as yf

from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.preprocessing import MinMaxScaler

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, explained_variance_score


from pandas_datareader.data import DataReader
from pandas_datareader import data as pdr
import pandas_ta as ta



# Load Data

In [None]:
def fetch_stock_data(ticker, start_date, end_date, interval="1d"):
    """
    Download historical price data for one ticker from Yahoo Finance.

    :param ticker: Stock ticker symbol (e.g., 'AAPL' for Apple Inc.)
    :param start_date: Start date in the format 'YYYY-MM-DD'.
    :param end_date: End date in the format 'YYYY-MM-DD'.
    :param interval: Bar size forwarded to ``yf.download`` (e.g. '1d', '1wk').
        Defaults to daily bars.

    :return: DataFrame returned by ``yf.download``, indexed by date, with flat
        column labels (e.g. 'Open', 'Close', 'Adj Close', 'Volume').
    """
    stock_data = yf.download(
        ticker,
        start=start_date,
        end=end_date,
        interval=interval,
        # Pin the unadjusted layout: the rest of the notebook expects both
        # 'Close' and 'Adj Close', and recent yfinance releases default to
        # auto-adjusted prices without an 'Adj Close' column.
        auto_adjust=False,
    )

    # Recent yfinance releases return two-level (price field, ticker) columns
    # even for a single symbol. Collapse them so plain lookups such as
    # stock_data['Close'] yield a Series again. Multi-ticker results are left
    # untouched because dropping the level would make labels ambiguous.
    columns = stock_data.columns
    if isinstance(columns, pd.MultiIndex) and columns.get_level_values(-1).nunique() <= 1:
        stock_data.columns = columns.droplevel(-1)

    return stock_data

In [None]:
# Download window: from the start of 2012 up to the day the notebook is run.
ticker, start_date = "AAPL", "2012-01-01"
end_date = datetime.now().strftime('%Y-%m-%d')

# Daily bars; reset_index() turns the date index into a regular column.
aapl_data = fetch_stock_data(ticker, start_date, end_date, interval="1d").reset_index()

## Calculating Technical Indicators

In [None]:
# Show the downloaded frame; after reset_index the former date index is an ordinary column.
aapl_data

# Univariate Analysis

### Summary Statistics

In [None]:
# Work on a copy so the summary-statistics section cannot modify aapl_data.
summary_stat_df = aapl_data.copy()

In [None]:
# Columns 1..6 by position, i.e. everything after the first column of the reset frame.
summary_stat_df1 = summary_stat_df.iloc[:,1:7]
# NOTE(review): a bare expression in the middle of a cell is not rendered; only the last one is.
summary_stat_df1

# count / mean / std / min / quartiles / max for each selected column.
summary_stat = summary_stat_df1.describe()
summary_stat

### Histogram

Visualize the distribution and identify skewness or kurtosis in the data.

In [None]:
fig, ax = plt.subplots(figsize=(10, 6))

# One semi-transparent histogram per price column, overlaid on shared axes.
price_colors = {'Open': 'blue', 'High': 'green', 'Low': 'red', 'Close': 'purple'}
for column, color in price_colors.items():
    ax.hist(aapl_data[column], bins=50, alpha=0.5, label=column, color=color)

ax.set_title('Price Distributions')
ax.set_xlabel('Price')
ax.set_ylabel('Frequency')
ax.legend()
plt.show()

In [None]:
# Look-back window (number of rows) for the simple moving average; read by get_indicator.
sma_period = 200 

The 200-day simple moving average (SMA) is considered a key indicator by traders and market analysts for determining overall long-term market trends. It is calculated by averaging the closing price over the past 200 trading days, and it is usually plotted alongside the daily price chart and other moving averages.


In [None]:
def calculate_rsi(series, period=14):
    """
    Relative Strength Index based on simple rolling means of gains and losses.

    :param series: Price series (pandas Series).
    :param period: Rolling window length used for the average gain and loss.

    :return: Series aligned with ``series``. The first ``period - 1`` entries are
        NaN (incomplete window); a window with no losses yields 100, and a window
        with neither gains nor losses yields NaN (0 / 0).
    """
    change = series.diff()

    # Split each move into its upward and downward part. The leading NaN that
    # diff() produces is counted as "no move".
    upward = change.clip(lower=0).fillna(0)
    downward = (-change).clip(lower=0).fillna(0)

    mean_up = upward.rolling(window=period).mean()
    mean_down = downward.rolling(window=period).mean()

    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))

In [None]:
def get_indicator(aapl_data):
    """
    Add technical-indicator columns derived from the 'Close' price.

    The frame is modified in place and also returned, so the caller's original
    variable and the return value refer to the same object.

    Columns added:
      - 'SMA-200': simple moving average over ``sma_period`` rows (notebook-level
        setting; the column label stays 'SMA-200' whatever its value).
      - 'EMA-12', 'EMA-26': exponential moving averages (``adjust=False``).
      - 'MACD': EMA-12 minus EMA-26.
      - 'MACD_Signal': 9-span EMA of MACD.
      - 'RSI_14': 14-period RSI from ``calculate_rsi``.

    :param aapl_data: DataFrame with a 'Close' column.
    :return: The same DataFrame with the indicator columns added.
    """
    # Select the closing price by name. A positional lookup silently picks a
    # different price column whenever the provider changes its column order,
    # and the RSI below was already keyed on 'Close'.
    close = aapl_data["Close"]

    # Simple Moving Average (SMA)
    aapl_data["SMA-200"] = close.rolling(sma_period).mean()

    # Exponential Moving Averages (EMA)
    aapl_data["EMA-12"] = close.ewm(span=12, adjust=False).mean()
    aapl_data["EMA-26"] = close.ewm(span=26, adjust=False).mean()

    # Moving Average Convergence Divergence (MACD) and its signal line
    aapl_data['MACD'] = aapl_data["EMA-12"] - aapl_data["EMA-26"]
    aapl_data['MACD_Signal'] = aapl_data['MACD'].ewm(span=9, adjust=False).mean()

    # Relative Strength Index
    aapl_data["RSI_14"] = calculate_rsi(close, 14)

    return aapl_data

In [None]:
# get_indicator adds the columns to aapl_data in place and returns that same frame, so
# `appl_data` (note the spelling) is an alias of `aapl_data`, not a separate copy.
appl_data = get_indicator(aapl_data)

In [None]:
# Closing price together with the long-term SMA and both EMAs on one interactive chart.
aapl_sma = px.line(appl_data, x="Date", y=["Close", "SMA-200", "EMA-12", "EMA-26"], title='Indicators')
display(aapl_sma)

In [None]:
# Correlate the numeric columns only. The frame still carries the datetime 'Date'
# column here, and pandas >= 2.0 no longer drops non-numeric columns from corr()
# automatically. (seaborn is already imported as `sns` in the imports cell.)
correlation_matrix = aapl_data.select_dtypes(include="number").corr()

plt.figure(figsize=(12, 8))
heatmap = sns.heatmap(correlation_matrix, annot=True, fmt=".2f", cmap="coolwarm")
plt.title('Correlation Matrix Heatmap')
plt.show()

correlation_matrix

In [None]:
# Count missing values per column; the rolling indicators leave NaNs in their
# first rows, before a full window is available.
aapl_data.isnull().sum()

In [None]:
# Remove, in place, every row in which at least one indicator is still missing.
indicator_columns = ['SMA-200', 'EMA-12', 'EMA-26', 'MACD', 'MACD_Signal', 'RSI_14']
aapl_data.dropna(subset=indicator_columns, inplace=True)

In [None]:
# Persist prices plus indicators (rows with missing indicators already removed);
# index=False keeps the row index out of the file.
aapl_data.to_csv("Stock_data.csv", index=False)

## Data Preprocessing

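With an 80/20 chronological train/test split, the training set covers `2012-10-16 to 2021-09-30` and the test set covers `2021-10-01 to 2023-11-08`. These dates correspond to a run in which the data ended on 2023-11-08; because the download end date is set to the current day, they shift whenever the notebook is re-run.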

In [None]:
# Reload the dataset saved at the end of the previous section; this rebinds
# `aapl_data` to the contents of the CSV file.
aapl_data = pd.read_csv("Stock_data.csv")

In [None]:
# Date and Adj Close are not used as model inputs; keep every other column.
aapl_data = aapl_data.drop(columns=["Date", "Adj Close"])

In [None]:
# Remaining columns: the prediction target 'Close' plus the candidate input features.
aapl_data.columns

In [None]:
# Row position at 80% of the dataset.
# NOTE(review): in the visible cells this value is not used afterwards; data_split
# computes its own local split_point from the number of windowed samples.
split_point = int(len(aapl_data)*0.8)

In [None]:
# Model inputs: every remaining column except the prediction target 'Close'.
# create_windowed_data reads this notebook-level list.
feature_columns = [col for col in aapl_data.columns if col != "Close"]

In [None]:
def create_windowed_data(data, n_steps, feature_cols=None, target_col="Close"):
    """
    Turn a time-ordered frame into supervised (window, next value) samples.

    Sample ``i`` uses rows ``i .. i + n_steps - 1`` of the feature columns as
    input and the ``target_col`` value of row ``i + n_steps`` as the label.

    :param data: DataFrame ordered in time, containing the feature columns and
        ``target_col``.
    :param n_steps: Number of consecutive rows per input window.
    :param feature_cols: Column names used as inputs. Defaults to the
        notebook-level ``feature_columns`` list.
    :param target_col: Column to predict. Defaults to 'Close'.

    :return: ``(X, y)`` where ``X`` has shape
        ``(len(data) - n_steps, n_steps, n_features)`` and ``y`` has shape
        ``(len(data) - n_steps,)``. When ``data`` is too short for even one
        window, both are empty but ``X`` keeps its three dimensions.
    """
    if feature_cols is None:
        feature_cols = feature_columns  # notebook-level default
    feature_cols = list(feature_cols)

    # Convert once up front; slicing NumPy arrays is far cheaper than slicing
    # the DataFrame on every iteration.
    features = data[feature_cols].to_numpy()
    targets = data[target_col].to_numpy()

    n_windows = len(data) - n_steps
    if n_windows <= 0:
        return np.empty((0, n_steps, len(feature_cols))), np.empty((0,))

    X = np.stack([features[start:start + n_steps] for start in range(n_windows)])
    y = targets[n_steps:].copy()
    return X, y

In [None]:
def data_split(data, n_steps, test_size):
    """
    Build windowed samples and split them chronologically into train and test.

    The last ``int(n_samples * test_size)`` windows form the test set and all
    earlier windows form the training set; no shuffling takes place.

    :param data: DataFrame handed to ``create_windowed_data``.
    :param n_steps: Number of rows per input window.
    :param test_size: Fraction of the windowed samples reserved for testing.

    :return: ``(X_train, X_test, y_train, y_test)``.
    """
    windows, targets = create_windowed_data(data, n_steps)

    # Position of the first test sample.
    n_test = int(len(windows) * test_size)
    cut = len(windows) - n_test

    return windows[:cut], windows[cut:], targets[:cut], targets[cut:]

In [None]:
# Each sample is a window of n_steps consecutive rows; the last 20% of the
# windows are held out as the test set.
n_steps = 10
test_size = 0.2
X_train, X_test, y_train, y_test = data_split(aapl_data, n_steps, test_size)

In [None]:
# (number of training windows, n_steps, number of feature columns)
X_train.shape

### Scaling the input features

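- [ ] Both `X_train` and `X_test` should be scaled. GRU, LSTM and other gradient-based models train more reliably when all inputs lie in a similar range, so the same transformation must be applied to both the training and the test data.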

- [ ] The scaler should be fitted only on the training data to avoid data leakage and then used to transform both the training data and the test data.

# Modeling 
## Baseline GRU Model 

In [None]:
from tensorflow.keras.layers import GRU, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping

### Build GRU Model

### Train the Model

In [None]:
class StockPricePredictorGRU:
    """
    Two-layer GRU regressor bundled with min-max scalers for inputs and target.

    Inputs are 3-D arrays of shape ``(samples, n_steps, n_features)``. For
    scaling, every window is flattened to one row of ``n_steps * n_features``
    values, so each (time step, feature) position is scaled independently. The
    target is scaled separately and predictions are mapped back to price units.
    """

    def __init__(self, n_steps, n_features):
        """
        :param n_steps: Number of time steps per input window.
        :param n_features: Number of features per time step.
        """
        self.n_steps = n_steps
        self.n_features = n_features
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.y_scaler = MinMaxScaler(feature_range=(0, 1))
        self.history = None  # Keras History object, set by fit()
        self.model = self.build_model()

    def build_model(self):
        """Build and compile the network: GRU(50) -> Dropout -> GRU(50) -> Dropout -> Dense(1)."""
        model = Sequential()
        model.add(GRU(units=50, return_sequences=True, input_shape=(self.n_steps, self.n_features), activation='tanh'))
        model.add(Dropout(0.2))
        model.add(GRU(units=50, activation='tanh'))
        model.add(Dropout(0.2))
        model.add(Dense(units=1))
        model.compile(optimizer='adam', loss='mean_squared_error')
        return model

    def _scale_inputs(self, X, fit=False):
        """Scale a 3-D batch by flattening each window to one row and restoring the shape afterwards."""
        nsamples, nx, ny = X.shape
        X_2D = X.reshape((nsamples, nx * ny))
        if fit:
            X_scaled_2D = self.scaler.fit_transform(X_2D)
        else:
            X_scaled_2D = self.scaler.transform(X_2D)
        return X_scaled_2D.reshape((nsamples, nx, ny))

    def fit(self, X, y, epochs=100, batch_size=32, validation_split=0.1):
        """
        Fit both scalers on the training data, then train the network.

        Training stops early once ``val_loss`` has not improved for 10 epochs,
        and the best weights are restored. The Keras history is stored in
        ``self.history``.

        :param X: Training windows, shape ``(samples, n_steps, n_features)``.
        :param y: Training targets, one value per window.
        :param epochs: Maximum number of epochs.
        :param batch_size: Mini-batch size.
        :param validation_split: Fraction of the training data Keras holds out
            for validation.
        """
        X_scaled = self._scale_inputs(X, fit=True)

        # The scaler expects a 2-D column.
        y_scaled = self.y_scaler.fit_transform(np.asarray(y).reshape(-1, 1))

        self.history = self.model.fit(
            X_scaled, y_scaled,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=[EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)]
        )

    def plot_loss(self):
        """
        Plot training and validation loss per epoch.

        :raises RuntimeError: if ``fit`` has not been called yet.
        """
        if self.history is None:
            raise RuntimeError("plot_loss() requires a trained model; call fit() first.")

        plt.figure(figsize=(10, 6))
        plt.plot(self.history.history['loss'], 'bo-', label='Training loss')
        plt.plot(self.history.history['val_loss'], 'ro-', label='Validation loss')
        plt.title('Training and Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()

    def predict(self, X):
        """
        Predict targets in original (unscaled) units.

        :param X: Windows of shape ``(samples, n_steps, n_features)``.
        :return: Array of shape ``(samples, 1)``.
        """
        X_scaled = self._scale_inputs(X)
        y_pred_scaled = self.model.predict(X_scaled)
        y_pred = self.y_scaler.inverse_transform(y_pred_scaled)
        return y_pred

    def plot_predictions(self, y_true, y_pred):
        """Plot actual against predicted prices over the sample index."""
        plt.figure(figsize=(10, 6))
        plt.plot(y_true, color='blue', label='Actual Stock Price')
        plt.plot(y_pred, color='red', linestyle='--', label='Predicted Stock Price')
        plt.title('Stock Price Prediction')
        plt.xlabel('Time')
        plt.ylabel('Stock Price')
        plt.legend()
        plt.show()

    @staticmethod
    def _percentage_errors(y_true, y_pred):
        """Return ``(MAPE, RMSPE)`` in percent for two equally long value sequences."""
        # Flatten both sides first. predict() returns shape (n, 1) while the
        # targets are (n,); subtracting those directly broadcasts to an (n, n)
        # matrix and averages over every (actual, predicted) pair.
        y_true = np.asarray(y_true, dtype=float).ravel()
        y_pred = np.asarray(y_pred, dtype=float).ravel()
        if y_true.shape != y_pred.shape:
            raise ValueError(
                f"y_true and y_pred must have the same number of values, "
                f"got {y_true.size} and {y_pred.size}."
            )
        relative_error = (y_true - y_pred) / y_true
        mape = np.mean(np.abs(relative_error)) * 100
        rmspe = np.sqrt(np.mean(np.square(relative_error))) * 100
        return mape, rmspe

    def evaluation(self, y_true, y_pred):
        """
        Compute and print regression metrics.

        :param y_true: Actual values, shape ``(n,)`` or ``(n, 1)``.
        :param y_pred: Predicted values, shape ``(n,)`` or ``(n, 1)``.
        :return: Dict with keys 'MAE', 'MAPE', 'R2', 'RMSPE' and 'RMSDE'
            ('RMSDE' holds the root mean squared error).
        :raises ValueError: if the two inputs differ in length.
        """
        y_true = np.asarray(y_true, dtype=float).ravel()
        y_pred = np.asarray(y_pred, dtype=float).ravel()

        mae = mean_absolute_error(y_true, y_pred)
        mape, rmspe = self._percentage_errors(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        rmsde = np.sqrt(mean_squared_error(y_true, y_pred))

        metrics = {'MAE': mae, 'MAPE': mape, 'R2': r2, 'RMSPE': rmspe, 'RMSDE': rmsde}
        for metric, value in metrics.items():
            print(f"{metric}: {value:.2f}")

        return metrics

    @staticmethod
    def _ensure_parent_dir(file_path):
        """Create the directory that will contain ``file_path`` if it does not exist yet."""
        import os  # local import keeps the class independent of the imports cell

        if isinstance(file_path, (str, os.PathLike)):
            parent = os.path.dirname(os.fspath(file_path))
            if parent:
                os.makedirs(parent, exist_ok=True)

    def save_scaler(self, scaler_path, y_scaler_path):
        """Write the input scaler and the target scaler to the given paths with joblib."""
        self._ensure_parent_dir(scaler_path)
        self._ensure_parent_dir(y_scaler_path)
        dump(self.scaler, scaler_path)
        dump(self.y_scaler, y_scaler_path)

    def save_model(self, file_path):
        """Save the Keras model to ``file_path``."""
        self._ensure_parent_dir(file_path)
        self.model.save(file_path)
        
# Take the window length and the feature count from the training tensor so the
# network's input shape always matches the data, rather than repeating numbers
# that have to be kept in sync by hand.
n_steps, n_features = X_train.shape[1:]

predictor = StockPricePredictorGRU(n_steps=n_steps, n_features=n_features)
predictor.fit(X_train, y_train, epochs=100, batch_size=32)
predictor.plot_loss()
y_pred = predictor.predict(X_test)

predictor.plot_predictions(y_test, y_pred)

metrics = predictor.evaluation(y_test, y_pred)

# Persist the trained network and both scalers for the forecasting cells below.
predictor.save_model('Models/gru_model.h5')
predictor.save_scaler('Scaler/X_scaler.joblib', 'Scaler/y_scaler.joblib')


In [None]:
# Import under an alias: the bare name `load` is rebound to pickle's `load` in
# the imports cell.
from joblib import load as joblib_load

GRU_model = models.load_model('Models/gru_model.h5')

# Pass the paths directly so joblib opens and closes the files itself; the
# earlier open(...) handles were never closed.
# NOTE: joblib files are pickle-based, so only load artefacts produced by this notebook.
X_scaler = joblib_load('Scaler/X_scaler.joblib')
y_scaler = joblib_load('Scaler/y_scaler.joblib')

In [None]:
def predict_next_days(model, X_scaler, y_scaler, data, n_days, n_steps=10, n_features=10,
                      feature_columns=None):
    """
    Forecast the next ``n_days`` values by rolling the input window forward.

    Only the first forecast is based purely on observed data. Future feature
    values are unknown, so after each prediction the window is advanced by
    appending a synthetic row holding the per-feature mean of the current
    window. Later forecasts therefore rest on a crude placeholder rather than
    on real inputs.

    :param model: Trained model exposing ``predict`` on input of shape
        ``(1, n_steps, n_features)``.
    :param X_scaler: Fitted input scaler working on rows of length
        ``n_steps * n_features``.
    :param y_scaler: Fitted target scaler used to map predictions back to prices.
    :param data: DataFrame containing the feature columns, ordered in time. It
        is not modified.
    :param n_days: Number of steps to forecast.
    :param n_steps: Window length the model was trained with.
    :param n_features: Number of features per time step.
    :param feature_columns: Feature column names in training order. Defaults
        to the ten price, volume and indicator columns used in this notebook.

    :return: List with ``n_days`` predicted values.
    :raises ValueError: if ``feature_columns`` does not contain ``n_features``
        names, or ``data`` has fewer than ``n_steps`` rows.
    """
    if feature_columns is None:
        feature_columns = ['Open', 'High', 'Low', 'Volume', 'SMA-200', 'EMA-12', 'EMA-26', 'MACD',
                           'MACD_Signal', 'RSI_14']
    feature_columns = list(feature_columns)

    if len(feature_columns) != n_features:
        raise ValueError(
            f"Expected {n_features} feature columns, got {len(feature_columns)}."
        )
    if len(data) < n_steps:
        raise ValueError(
            f"Need at least {n_steps} rows to build an input window, got {len(data)}."
        )

    # Keep the rolling window as a plain array. Appending the synthetic row to
    # the DataFrame as a Series stacked it as extra rows of a new column, so
    # every forecast after the first was computed from NaN-filled windows.
    window = data[feature_columns].tail(n_steps).to_numpy(dtype=float)

    predictions = []
    for _ in range(n_days):
        # Flatten to (1, n_steps * n_features), the layout the scaler was fitted on.
        flattened = window.reshape(1, n_steps * n_features)
        scaled = X_scaler.transform(flattened)

        # Back to the 3-D layout expected by the model.
        model_input = scaled.reshape(1, n_steps, n_features)
        predicted_price_scaled = model.predict(model_input)
        predicted_price = y_scaler.inverse_transform(predicted_price_scaled)[0, 0]
        predictions.append(predicted_price)

        # Slide the window: drop the oldest row, append the window's feature means.
        synthetic_row = window.mean(axis=0, keepdims=True)
        window = np.vstack([window[1:], synthetic_row])

    return predictions


n_days = 1  # Number of days to predict
# Forecast from the most recent rows of aapl_data using the reloaded model and scalers.
predicted_prices = predict_next_days(GRU_model, X_scaler, y_scaler, aapl_data, n_days)
predicted_prices
