In [1]:
import warnings

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import set_random_seed

warnings.filterwarnings('ignore')

StockPriceLSTM Model

In [31]:
class StockPriceLSTM:
    """End-to-end helper for training an LSTM on daily OHLCV price data.

    The intended call order is:
    ``load_and_preprocess_data`` -> ``create_features`` -> ``prepare_sequences``
    -> ``split_data`` -> ``train_model`` -> ``evaluate_model``.

    Attributes:
        sequence_length: Number of consecutive rows in each input window.
        test_size: Fraction of all sequences held out (from the end) for testing.
        validation_size: Fraction of the remaining (non-test) sequences used
            for validation.
        scaler: ``MinMaxScaler`` fitted by ``prepare_sequences``.
        model: Compiled Keras model, set by ``train_model``.
        history: Keras ``History`` returned by ``fit``, set by ``train_model``.
        feature_names: Feature columns used by the last ``prepare_sequences`` call.
        target_idx: Position of the target column inside ``feature_names``.
    """

    def __init__(self, sequence_length=60, test_size=0.2, validation_size=0.2):
        self.sequence_length = sequence_length
        self.test_size = test_size
        self.validation_size = validation_size
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.model = None
        self.history = None
        # Populated by prepare_sequences; evaluate_model uses them to map the
        # scaled target back to its original units.
        self.feature_names = None
        self.target_idx = None

    def load_and_preprocess_data(self, file_path):
        """Read a price CSV and return a clean, date-indexed numeric frame.

        Args:
            file_path: Path (or anything ``pd.read_csv`` accepts) of the CSV.

        Returns:
            DataFrame indexed by ``Date`` (ascending) with numeric columns
            ``Close, High, Low, Open, Volume``; rows with any missing or
            non-numeric value are dropped.
        """
        df = pd.read_csv(file_path)

        # The first two data rows are discarded -- presumably extra header
        # rows written by the data export; TODO confirm against the CSV.
        df = df.iloc[2:].reset_index(drop=True)

        # Requires exactly six columns in this order; pandas raises a
        # length-mismatch ValueError otherwise.
        df.columns = ['Date', 'Close', 'High', 'Low', 'Open', 'Volume']

        df['Date'] = pd.to_datetime(df['Date'])
        df = df.set_index('Date')

        numeric_columns = ['Close', 'High', 'Low', 'Open', 'Volume']
        for col in numeric_columns:
            # Unparseable cells become NaN and are removed by dropna below.
            df[col] = pd.to_numeric(df[col], errors='coerce')

        df = df.dropna().sort_index()

        print("Data loaded successfully!")
        print(f"Shape: {df.shape}")
        print(f"Date range: {df.index.min()} to {df.index.max()}")
        print("Sample data:")
        print(df.head())

        return df

    def create_features(self, df):
        """Add technical-indicator columns to a copy of ``df``.

        Adds ``MA_5``, ``MA_10``, ``MA_20``, ``RSI`` (14-row simple-average
        variant), ``BB_upper``/``BB_lower`` (MA_20 +/- 2 rolling std),
        ``Price_Change`` (percentage change of Close), ``Volume_MA`` (10-row
        mean) and ``HL_Spread`` (High - Low).

        Args:
            df: Frame with at least ``Close``, ``High``, ``Low``, ``Volume``.

        Returns:
            New DataFrame with the extra columns; rows containing any NaN
            (the rolling warm-up period) are dropped. ``df`` is not modified.
        """
        data = df.copy()
        close = data['Close']

        data['MA_5'] = close.rolling(window=5).mean()
        data['MA_10'] = close.rolling(window=10).mean()
        data['MA_20'] = close.rolling(window=20).mean()

        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        data['RSI'] = 100 - (100 / (1 + rs))

        # Both bands share the same rolling std, so compute it once.
        band_width = close.rolling(window=20).std() * 2
        data['BB_upper'] = data['MA_20'] + band_width
        data['BB_lower'] = data['MA_20'] - band_width

        data['Price_Change'] = close.pct_change()

        data['Volume_MA'] = data['Volume'].rolling(window=10).mean()

        data['HL_Spread'] = data['High'] - data['Low']

        data = data.dropna()

        return data

    def _split_sizes(self, n_samples):
        """Return ``(train, val, test)`` sample counts for ``n_samples`` sequences."""
        test_size = int(n_samples * self.test_size)
        train_val_size = n_samples - test_size
        val_size = int(train_val_size * self.validation_size)
        train_size = train_val_size - val_size
        return train_size, val_size, test_size

    def prepare_sequences(self, data, target_column='Close'):
        """Scale the features and cut them into sliding windows.

        The scaler is fitted only on the rows that feed the training split
        (as computed by the same size arithmetic ``split_data`` uses) and
        then applied to every row. This keeps validation/test statistics out
        of the training inputs; as a consequence, scaled values outside the
        training rows may fall outside ``[0, 1]``.

        Args:
            data: Frame produced by ``create_features``.
            target_column: Column whose next-row scaled value is the label.

        Returns:
            ``(X, y, available_features)`` where ``X`` has shape
            ``(n_sequences, sequence_length, n_features)``, ``y`` has shape
            ``(n_sequences,)`` and ``available_features`` lists the columns
            used, in order.

        Raises:
            ValueError: If ``target_column`` is not one of the used features.
        """
        feature_columns = ['Close', 'High', 'Low', 'Open', 'Volume',
                          'MA_5', 'MA_10', 'MA_20', 'RSI', 'Price_Change',
                          'Volume_MA', 'HL_Spread']

        available_features = [col for col in feature_columns if col in data.columns]
        if target_column not in available_features:
            raise ValueError(
                f"target_column {target_column!r} is not among the available "
                f"features: {available_features}"
            )
        target_idx = available_features.index(target_column)

        feature_frame = data[available_features]
        n_rows = len(feature_frame)
        n_sequences = max(n_rows - self.sequence_length, 0)

        # Training windows plus their labels span the first
        # sequence_length + train_size rows; fit the scaler on those only.
        train_size, _, _ = self._split_sizes(n_sequences)
        n_fit_rows = min(n_rows, self.sequence_length + train_size)
        self.scaler.fit(feature_frame.iloc[:n_fit_rows])
        scaled_data = self.scaler.transform(feature_frame)

        windows = [
            scaled_data[end - self.sequence_length:end]
            for end in range(self.sequence_length, n_rows)
        ]
        X = np.array(windows)
        y = np.array(scaled_data[self.sequence_length:, target_idx])

        self.feature_names = available_features
        self.target_idx = target_idx

        return X, y, available_features

    def split_data(self, X, y):
        """Split sequences chronologically into train / validation / test.

        No shuffling is done: the earliest samples form the training set and
        the latest form the test set.

        Returns:
            ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
        """
        train_size, val_size, _ = self._split_sizes(len(X))
        val_end = train_size + val_size

        X_train = X[:train_size]
        X_val = X[train_size:val_end]
        X_test = X[val_end:]

        y_train = y[:train_size]
        y_val = y[train_size:val_end]
        y_test = y[val_end:]

        print(f"Training set size: {len(X_train)}")
        print(f"Validation set size: {len(X_val)}")
        print(f"Test set size: {len(X_test)}")

        return X_train, X_val, X_test, y_train, y_val, y_test

    def build_model(self, input_shape):
        """Build and compile a three-layer stacked LSTM regressor.

        Args:
            input_shape: ``(sequence_length, n_features)`` of one sample.

        Returns:
            Compiled Keras ``Sequential`` model (Adam, MSE loss, MAE metric)
            with a single linear output unit.
        """
        model = Sequential([
            LSTM(units=50, return_sequences=True, input_shape=input_shape),
            Dropout(0.2),

            LSTM(units=50, return_sequences=True),
            Dropout(0.2),

            # Last recurrent layer returns only its final state.
            LSTM(units=50, return_sequences=False),
            Dropout(0.2),

            Dense(units=25, activation='relu'),
            Dropout(0.1),
            Dense(units=1)
        ])
        model.compile(
            optimizer=Adam(learning_rate=0.01),
            loss='mse',
            metrics=['mae']
        )

        return model

    def train_model(self, X_train, y_train, X_val, y_val, epochs=100, batch_size=32):
        """Build a fresh model and fit it, storing it on ``self``.

        Uses early stopping (patience 15, best weights restored) and halves
        the learning rate after 10 epochs without ``val_loss`` improvement.
        Sets ``self.model`` and ``self.history``; returns nothing.
        """
        self.model = self.build_model((X_train.shape[1], X_train.shape[2]))

        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=15,
            restore_best_weights=True
        )

        reduce_lr = ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=10,
            min_lr=1e-7
        )

        print("Training the model...")
        self.history = self.model.fit(
            X_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(X_val, y_val),
            callbacks=[early_stopping, reduce_lr],
            verbose=1
        )

    def _to_price_scale(self, values):
        """Map scaled target values back to original units when possible.

        Falls back to returning ``values`` unchanged if the scaler has not
        been fitted or the target column index is unknown.
        """
        scale = getattr(self.scaler, 'scale_', None)
        offset = getattr(self.scaler, 'min_', None)
        if self.target_idx is None or scale is None or offset is None:
            return values
        # MinMaxScaler computes X_scaled = X * scale_ + min_; invert that
        # for the single target column.
        return (values - offset[self.target_idx]) / scale[self.target_idx]

    def evaluate_model(self, X_test, y_test):
        """Predict on the test set and print/return regression metrics.

        MSE, MAE, RMSE and R-squared are computed on the scaled values as
        passed in. MAPE (and the derived "accuracy") is computed after
        mapping the target back to its original units, because a percentage
        error on min-max-scaled values divides by numbers at or near zero.
        Samples whose actual value is exactly zero are excluded from MAPE;
        if none remain, MAPE is NaN.

        Returns:
            ``(metrics, predictions)`` where ``metrics`` is a dict keyed by
            metric name and ``predictions`` is the raw (scaled) model output.
        """
        predictions = self.model.predict(X_test)

        y_true = np.asarray(y_test, dtype=float).ravel()
        y_pred = np.asarray(predictions, dtype=float).ravel()

        mse = mean_squared_error(y_true, y_pred)
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_true, y_pred)

        price_true = self._to_price_scale(y_true)
        price_pred = self._to_price_scale(y_pred)
        nonzero = price_true != 0
        if nonzero.any():
            relative_error = (price_true[nonzero] - price_pred[nonzero]) / price_true[nonzero]
            mape = np.mean(np.abs(relative_error)) * 100
        else:
            mape = float('nan')

        accuracy_from_mape = 100 - mape

        metrics = {
            'MSE': mse,
            'MAE': mae,
            'RMSE': rmse,
            'MAPE (%)': mape,
            'R-squared': r2,
            'Accuracy (100-MAPE) (%)': accuracy_from_mape
        }

        print("\n" + "="*30)
        print("Model Evaluation & Accuracy")
        print("="*30)
        for metric, value in metrics.items():
            if metric == 'R-squared':
                print(f"{metric}: {value:.4f} ({(value * 100):.2f}%)")
            elif '%' in metric:
                print(f"{metric}: {value:.2f}")
            else:
                print(f"{metric}: {value:.4f}")
        print("="*30 + "\n")

        return metrics, predictions

In [40]:
def main(file_path='/content/AAPL_stock_data.csv', sequence_length=60,
         epochs=50, batch_size=32, seed=42):
    """Run the full pipeline: load, featurize, window, split, train, evaluate.

    Args:
        file_path: CSV to load; defaults to the path originally hard-coded here.
        sequence_length: Window length passed to ``StockPriceLSTM``.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size used for training.
        seed: Seed applied to the Python, NumPy and backend random generators
            before training so runs are repeatable; pass ``None`` to skip.

    Returns:
        ``(lstm_model, X, y, feature_names)`` -- the trained wrapper, the full
        (unsplit) sequence arrays, and the feature column names used.
    """
    if seed is not None:
        set_random_seed(seed)

    lstm_model = StockPriceLSTM(sequence_length=sequence_length)

    df = lstm_model.load_and_preprocess_data(file_path)

    df_with_features = lstm_model.create_features(df)

    X, y, feature_names = lstm_model.prepare_sequences(df_with_features)
    print(f"Features used: {feature_names}")

    X_train, X_val, X_test, y_train, y_val, y_test = lstm_model.split_data(X, y)

    lstm_model.train_model(X_train, y_train, X_val, y_val,
                           epochs=epochs, batch_size=batch_size)

    # Called for its printed report; the returned metrics are not used here.
    lstm_model.evaluate_model(X_test, y_test)
    return lstm_model, X, y, feature_names

In [41]:
# Run the pipeline when executed directly and bind main()'s return values
# (trained wrapper, sequence arrays, feature names) to module-level names.
if __name__ == "__main__":
    model, X, y, feature_names = main()

Data loaded successfully!
Shape: (2012, 5)
Date range: 2016-01-04 00:00:00 to 2023-12-29 00:00:00
Sample data:
                Close       High        Low       Open     Volume
Date                                                             
2016-01-04  23.803167  23.807687  23.046256  23.184082  270597600
2016-01-05  23.206671  23.916134  23.138889  23.893540  223164000
2016-01-06  22.752523  23.129851  22.564992  22.720891  273829600
2016-01-07  21.792263  22.623736  21.787745  22.296118  324377600
2016-01-08  21.907492  22.393271  21.862304  22.266743  283192000
Features used: ['Close', 'High', 'Low', 'Open', 'Volume', 'MA_5', 'MA_10', 'MA_20', 'RSI', 'Price_Change', 'Volume_MA', 'HL_Spread']
Training set size: 1238
Validation set size: 309
Test set size: 386
Training the model...
Epoch 1/50
[1m39/39[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 126ms/step - loss: 0.0619 - mae: 0.1528 - val_loss: 0.0371 - val_mae: 0.1830 - learning_rate: 0.0100
Epoch 2/50
[1m39/39[0m [