In [None]:
import ta
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, confusion_matrix, precision_score, recall_score, f1_score

In [None]:
class StockPredictionAndAnomalyDetection:
    def __init__(self, time_steps=60, anomaly_model='IsolationForest'):
        self.time_steps = time_steps
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.lstm_model = None
        self.anomaly_model = anomaly_model
        self.anomaly_detector = None
        self.scaled_data = None

    def mean_absolute_percentage_error(self, y_true, y_pred):
        y_true, y_pred = np.array(y_true), np.array(y_pred)
        non_zero = (y_true != 0)
        return np.mean(np.abs((y_true[non_zero] - y_pred[non_zero]) / y_true[non_zero])) * 100

    def prepare_data(self, df):
        df['SMA_50'] = df['Close'].rolling(window=50).mean()
        df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean()
        df['RSI'] = ta.momentum.rsi(df['Close'], window=14)
        df['MACD'] = ta.trend.macd(df['Close'])
        df['Bollinger_High'] = ta.volatility.bollinger_hband(df['Close'])
        df['Bollinger_Low'] = ta.volatility.bollinger_lband(df['Close'])
        df['ATR'] = ta.volatility.average_true_range(df['High'], df['Low'], df['Close'])
        df['Stochastic'] = ta.momentum.stoch(df['High'], df['Low'], df['Close'])
        df['OBV'] = ta.volume.on_balance_volume(df['Close'], df['Volume'])
        df['Volume_Change'] = df['Volume'].pct_change()
        df['Price_Change'] = df['Close'].pct_change()
        df.fillna(method='bfill', inplace=True)
        self.scaled_data = self.scaler.fit_transform(df[['Close', 'SMA_50', 'EMA_20', 'RSI', 'MACD', 'Bollinger_High', 'Bollinger_Low', 'ATR', 'Stochastic', 'OBV', 'Volume_Change', 'Price_Change']])
        return df

    def create_lstm_data(self):
        X, y = [], []
        for i in range(self.time_steps, len(self.scaled_data)):
            X.append(self.scaled_data[i-self.time_steps:i])
            y.append(self.scaled_data[i, 0])  # Close price as the target
        return np.array(X), np.array(y)

    def build_lstm_model(self):
        model = Sequential([
            LSTM(units=100, return_sequences=True, input_shape=(self.time_steps, self.scaled_data.shape[1])),
            Dropout(0.1),
            LSTM(units=75, return_sequences=True),
            Dropout(0.1),
            LSTM(units=50, return_sequences=False),
            Dropout(0.1),
            Dense(units=25),
            Dense(units=1)
        ])
        model.compile(optimizer='adam', loss='mean_squared_error')
        return model

    def train_lstm_model(self, X_train, y_train, X_val, y_val):
        lstm_checkpoint = ModelCheckpoint('combinedModel_v3.keras', save_best_only=True, monitor='val_loss', mode='min')
        lstm_early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        history = self.lstm_model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val), callbacks=[lstm_checkpoint, lstm_early_stop])
        return history

    def evaluate_model(self, y_true, y_pred, dataset_type='Validation'):
        # Calculate different evaluation metrics
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_true, y_pred)
        mae = mean_absolute_error(y_true, y_pred)
        
        print(f'{dataset_type} MSE: {mse:.2f}')
        print(f'{dataset_type} RMSE: {rmse:.2f}')
        print(f'{dataset_type} R² Score: {r2:.2f}')
        print(f'{dataset_type} MAE: {mae:.2f}')

    def evaluate_anomaly_detection(self, y_true_anomalies, y_pred_anomalies):
        # Calculate confusion matrix
        cm = confusion_matrix(y_true_anomalies, y_pred_anomalies)
        precision = precision_score(y_true_anomalies, y_pred_anomalies, pos_label=-1)
        recall = recall_score(y_true_anomalies, y_pred_anomalies, pos_label=-1)
        f1 = f1_score(y_true_anomalies, y_pred_anomalies, pos_label=-1)
        
        print(f'Confusion Matrix:\n{cm}')
        print(f'Precision: {precision:.2f}')
        print(f'Recall: {recall:.2f}')
        print(f'F1 Score: {f1:.2f}')

    def train_combined_model(self, df):
        df = self.prepare_data(df)
        X, y = self.create_lstm_data()
        X_train, X_val, y_train, y_val = X[:-200], X[-200:], y[:-200], y[-200:]
        self.lstm_model = self.build_lstm_model()
        self.train_lstm_model(X_train, y_train, X_val, y_val)
        
        # Calculate different metrics for training and validation sets
        train_predictions = self.lstm_model.predict(X_train)
        val_predictions = self.lstm_model.predict(X_val)
        
        train_predictions = self.scaler.inverse_transform(np.concatenate([train_predictions, np.zeros((train_predictions.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        val_predictions = self.scaler.inverse_transform(np.concatenate([val_predictions, np.zeros((val_predictions.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        
        y_train_actual = self.scaler.inverse_transform(np.concatenate([y_train.reshape(-1, 1), np.zeros((y_train.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        y_val_actual = self.scaler.inverse_transform(np.concatenate([y_val.reshape(-1, 1), np.zeros((y_val.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        
        self.evaluate_model(y_train_actual, train_predictions, dataset_type='Training')
        self.evaluate_model(y_val_actual, val_predictions, dataset_type='Validation')
        
        residuals = y_val_actual - val_predictions
        
        if self.anomaly_model == 'IsolationForest':
            self.anomaly_detector = IsolationForest(contamination=0.05, random_state=42)
        elif self.anomaly_model == 'OneClassSVM':
            self.anomaly_detector = OneClassSVM(kernel='rbf', gamma='scale', nu=0.05)
        elif self.anomaly_model == 'LocalOutlierFactor':
            self.anomaly_detector = LocalOutlierFactor(novelty=True, contamination=0.05)
        else:
            raise ValueError("Unsupported anomaly detection model.")
        
        self.anomaly_detector.fit(residuals.reshape(-1, 1))

    def predict_future(self, days_to_predict):
        last_sequence = self.scaled_data[-self.time_steps:]
        future_predictions = []

        for _ in range(days_to_predict):
            next_pred = self.lstm_model.predict(last_sequence.reshape(1, self.time_steps, -1))
            future_predictions.append(next_pred[0])
            last_sequence = np.roll(last_sequence, -1, axis=0)
            last_sequence[-1] = next_pred

        future_predictions = np.array(future_predictions)
        future_predictions = self.scaler.inverse_transform(np.concatenate([future_predictions, np.zeros((future_predictions.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        return future_predictions

    def detect_anomalies(self, df, future_predictions=None):
        df = self.prepare_data(df)
        X, y = self.create_lstm_data()
        predicted_prices = self.lstm_model.predict(X)
        predicted_prices = self.scaler.inverse_transform(np.concatenate([predicted_prices, np.zeros((predicted_prices.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        actual_prices = self.scaler.inverse_transform(np.concatenate([y.reshape(-1, 1), np.zeros((y.shape[0], self.scaled_data.shape[1] - 1))], axis=1))[:, 0]
        
        # Calculate MAPE for the entire dataset
        overall_mape = self.mean_absolute_percentage_error(actual_prices, predicted_prices)
        print(f"Overall MAPE: {overall_mape:.2f}%")
        
        self.evaluate_model(actual_prices, predicted_prices, dataset_type='Overall')
        
        residuals = actual_prices - predicted_prices
        anomalies = self.anomaly_detector.predict(residuals.reshape(-1, 1))

        if future_predictions is not None:
            future_residuals = np.zeros(len(future_predictions))
            future_anomalies = self.anomaly_detector.predict(future_residuals.reshape(-1, 1))
            return actual_prices, predicted_prices, anomalies, future_predictions, future_anomalies

        return actual_prices, predicted_prices, anomalies
    
    def visualize_predictions_and_anomalies(self, df, future_days=21):
        actual_prices, predicted_prices, anomalies, future_predictions, future_anomalies = self.detect_anomalies(df, self.predict_future(future_days))
        
        overall_mape = self.mean_absolute_percentage_error(actual_prices, predicted_prices)
        
        # Evaluate anomaly detection
        self.evaluate_anomaly_detection(anomalies, anomalies)
        
        plt.figure(figsize=(16, 6))
        plt.plot(df.index[-len(predicted_prices):], actual_prices, color='blue', label='Actual Stock Price')
        plt.plot(df.index[-len(predicted_prices):], predicted_prices, color='red', label='Predicted Stock Price')
        plt.scatter(df.index[-len(predicted_prices):][anomalies == -1], actual_prices[anomalies == -1], color='green', label='Detected Manipulation')

        future_dates = pd.date_range(start=df.index[-1] + pd.Timedelta(days=1), periods=future_days)
        plt.plot(future_dates, future_predictions, color='orange', label='Future Predictions')
        plt.scatter(future_dates[future_anomalies == -1], future_predictions[future_anomalies == -1], color='purple', label='Future Anomalies')

        plt.title(f'Stock Price Prediction with Anomalies and Future Forecast (MAPE: {overall_mape:.2f}%)')
        plt.xlabel('Time')
        plt.ylabel('Stock Price')
        plt.legend()
        plt.show()

        print(f"Number of historical anomalies detected: {np.sum(anomalies == -1)}")
        print(f"Percentage of historical data points marked as anomalies: {100 * np.mean(anomalies == -1):.2f}%")
        print(f"Number of potential future anomalies detected: {np.sum(future_anomalies == -1)}")
        print(f"Percentage of future predictions marked as potential anomalies: {100 * np.mean(future_anomalies == -1):.2f}%")

    def create_combined_plot(self, df, future_days=21):
        actual_prices, predicted_prices, anomalies, future_predictions, future_anomalies = self.detect_anomalies(df, self.predict_future(future_days))
        
        plt.figure(figsize=(16, 6))
        
        # Plot actual stock price
        plt.plot(df.index, df['Close'], color='blue', label='Actual Stock Price')
        
        # Plot historical anomalies
        plt.scatter(df.index[-len(predicted_prices):][anomalies == -1], 
                    df['Close'].values[-len(predicted_prices):][anomalies == -1], 
                    color='red', label='Historical Anomalies')
        
        # Plot future predictions
        future_dates = pd.date_range(start=df.index[-1] + pd.Timedelta(days=1), periods=future_days)
        plt.plot(future_dates, future_predictions, color='orange', label='Future Predictions')
        
        plt.title('Stock Price with Historical Anomalies and Future Predictions')
        plt.xlabel('Time')
        plt.ylabel('Stock Price')
        plt.legend()
        plt.show()

        print(f"Number of historical anomalies detected: {np.sum(anomalies == -1)}")
        print(f"Percentage of historical data points marked as anomalies: {100 * np.mean(anomalies == -1):.2f}%")
        print(f"Future prediction period: {future_days} days")       

In [None]:
ticker = 'GME'
stockPrices = yf.download(ticker, '1981-01-01', '2024-07-28')
stockPrices.columns = stockPrices.columns.str.strip()

## EDA

In [None]:
stockPrices.isnull()

In [None]:
stockPrices.info()

In [None]:
stockPrices.dtypes

In [None]:
stockPrices.shape

In [None]:
stockPrices.size

In [None]:
stockPrices.describe()

In [None]:
stockPrices.isna()

In [None]:
# Train and evaluate the combined model
model = StockPredictionAndAnomalyDetection(anomaly_model='IsolationForest')
model.train_combined_model(stockPrices)

In [None]:
model.visualize_predictions_and_anomalies(stockPrices, future_days=21)

In [None]:
model.create_combined_plot(stockPrices, future_days=21)