# Stock Price Direction Prediction using LSTM

In [None]:
# -*- coding: utf-8 -*-
"""Stock Price Direction Prediction using LSTM"""

## Install required packages if not already installed

In [None]:
!pip install yfinance pandas_ta keras-self-attention imbalanced-learn shap seaborn


## Import necessary libraries

In [None]:
# Import necessary libraries
import yfinance as yf
import pandas as pd
import pandas_ta as ta
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from keras_self_attention import SeqSelfAttention
import tensorflow as tf
from sklearn.decomposition import PCA
import csv
import warnings

# Ignore every warning category so the cell output below stays readable
warnings.filterwarnings(action='ignore')

# Reaching this line means none of the imports above raised
print("All necessary libraries imported successfully.")


## Step 1: Data Collection

In [None]:
# Define the stock ticker symbol and period.
# NOTE: the CSV file name below is fixed, so it keeps its "AAPL" prefix even
# when a different ticker is downloaded.
ticker = 'AAPL'  # You can change this to any ticker symbol
data = yf.download(ticker, start='2020-01-01', end='2023-01-01')

# Stop here if the download produced no rows, instead of writing an empty CSV
# that every later step would then read.
if data.empty:
    raise ValueError(f"No price data was downloaded for ticker {ticker!r}.")

# Save data to a CSV file
data.to_csv('AAPL_historical_data.csv')

## Step 2: Data Preparation

In [None]:
# Input CSV file path
input_file = "AAPL_historical_data.csv"

# Output CSV file path
output_file = "AAPL_historical_data_output.csv"

# Read every row up front so the header rows can be inspected and rewritten.
# newline='' lets the csv module handle line endings itself.
with open(input_file, 'r', newline='') as file:
    reader = list(csv.reader(file))

if not reader or not reader[0]:
    raise ValueError(f"{input_file} is empty; there is nothing to clean.")

if "Price" in reader[0][0]:
    # Multi-row header layout: rename the first header cell to "Date" and drop
    # the 2nd and 3rd rows (index 1 and 2), which are presumably extra header
    # rows rather than price data -- confirm against the downloaded file.
    reader[0][0] = reader[0][0].replace("Price", "Date")
    modified_data = [row for i, row in enumerate(reader) if i not in (1, 2)]
else:
    # The header does not carry the "Price" marker, so rows 2 and 3 are not
    # assumed to be header rows; keep every row rather than delete real data.
    modified_data = reader

# Write the resulting rows to a new file
with open(output_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(modified_data)

print("CSV file has been modified and saved as:", output_file)


## Step 3: Feature Engineering

In [None]:
# Load the cleaned CSV, parsing the 'Date' column and using it as the index
data = pd.read_csv('AAPL_historical_data_output.csv', parse_dates=['Date'], index_col='Date')

# Calculate technical indicators with pandas_ta. Indicators assigned with
# data[...] = ... become single columns; the others are joined onto `data`
# only when the calculation returned something.

# Moving averages and RSI of the closing price
data['SMA'] = ta.sma(data['Close'], length=20)
data['EMA'] = ta.ema(data['Close'], length=20)
data['RSI'] = ta.rsi(data['Close'], length=14)

# MACD
macd = ta.macd(data['Close'])
if macd is not None:
    data = data.join(macd)
else:
    print("MACD calculation returned None. Please check your data.")

# Bollinger Bands
bbands = ta.bbands(data['Close'], length=20)
if bbands is not None:
    data = data.join(bbands)
else:
    print("Bollinger Bands calculation returned None. Please check your data.")

# Stochastic Oscillator
stoch = ta.stoch(data['High'], data['Low'], data['Close'])
if stoch is not None:
    data = data.join(stoch)
else:
    print("Stochastic Oscillator calculation returned None. Please check your data.")

# Williams %R
data['WILLIAMS_R'] = ta.willr(data['High'], data['Low'], data['Close'], length=14)

# Chaikin Money Flow (CMF)
data['CMF'] = ta.cmf(data['High'], data['Low'], data['Close'], data['Volume'], length=20)

# Average Directional Index (ADX): only the 'ADX_14' column is kept
adx = ta.adx(data['High'], data['Low'], data['Close'], length=14)
if adx is not None:
    data = data.join(adx[['ADX_14']])
else:
    print("ADX calculation returned None. Please check your data.")

# Commodity Channel Index (CCI)
data['CCI'] = ta.cci(data['High'], data['Low'], data['Close'], length=20)

# On-Balance Volume (OBV)
data['OBV'] = ta.obv(data['Close'], data['Volume'])

# Money Flow Index (MFI)
mfi = ta.mfi(data['High'], data['Low'], data['Close'], data['Volume'], length=14)
if mfi is not None:
    data['MFI'] = mfi.astype(float)  # Ensure MFI is float to avoid dtype warnings
else:
    print("MFI calculation returned None. Please check your data.")

# Time-based features taken from the date index
data['DayOfWeek'] = data.index.dayofweek
data['Month'] = data.index.month
data['Quarter'] = data.index.quarter

# Encode cyclical features as sine/cosine pairs.
# dayofweek runs from 0 (Monday) to 6 (Sunday), so one full cycle is 7 steps;
# a divisor of 6 would put day 6 on the same angle as day 0.
data['DayOfWeek_Sin'] = np.sin(2 * np.pi * data['DayOfWeek'] / 7)
data['DayOfWeek_Cos'] = np.cos(2 * np.pi * data['DayOfWeek'] / 7)
data['Month_Sin'] = np.sin(2 * np.pi * data['Month'] / 12)
data['Month_Cos'] = np.cos(2 * np.pi * data['Month'] / 12)

# Drop every row that still contains a NaN (e.g. rows before an indicator
# has produced its first value)
data.dropna(inplace=True)

## Step 4: Analyze and Remove Outliers

In [None]:
# Absolute Z-score of each closing price
z_scores = np.abs(stats.zscore(data['Close']))

# Rows more than 3 standard deviations from the mean are reported as outliers
outlier_mask = z_scores > 3
inlier_mask = z_scores <= 3
outliers = data[outlier_mask]
print("Outliers detected:\n", outliers)

# Keep only the rows whose Z-score is at most 3
data = data[inlier_mask]

## Step 5: Data Preprocessing

In [None]:
# Handle missing values (if any remain after prior steps): carry the last
# known value forward, then drop rows that still have a NaN (for example
# leading rows with nothing before them to copy).
# DataFrame.ffill() replaces the deprecated fillna(method='ffill') spelling.
data = data.ffill()
data = data.dropna()


## Step 6: Modify the Target Variable (Classification)

In [None]:
# Look 7 rows ahead for the future close. The shift counts rows, not calendar
# days (rows are presumably trading days -- confirm against the data).
current_close = data['Close']
data['Future_Close'] = current_close.shift(-7)

# Target is 1 when the future close is higher than the current close, else 0
closes_higher_later = data['Future_Close'] > current_close
data['Target'] = closes_higher_later.astype(int)

# The last rows have no future close to compare against; drop rows with NaN
data.dropna(inplace=True)


## Step 7: Prepare Features and Scale Data

In [None]:
# Separate the model inputs from the label columns
features = data.drop(['Future_Close', 'Target'], axis=1)
target = data['Target']

# Scale features.
# The scaler learns its min/max from the earliest 80% of rows only, so the
# later rows (the test period of the 80/20 chronological split) do not
# influence the scaling. Later rows may therefore fall outside [0, 1].
scaler_fit_rows = int(len(features) * 0.8)
scaler = MinMaxScaler()
scaler.fit(features.iloc[:scaler_fit_rows])
scaled_features = scaler.transform(features)
scaled_features = pd.DataFrame(scaled_features, index=features.index, columns=features.columns)


## Step 8: Feature Selection


In [None]:
# Plot the pairwise correlation of the scaled features as a heatmap
plt.figure(figsize=(16, 12))
corr_matrix = scaled_features.corr()
sns.heatmap(data=corr_matrix, cmap='coolwarm', annot=False)
plt.title('Feature Correlation Matrix')
plt.show()

# The heatmap is informational only: every feature is kept
print("No features are being dropped due to high correlation.")


## Step 9: Dimensionality Reduction with PCA

In [None]:
from sklearn.decomposition import PCA

# Apply PCA to reduce dimensionality while retaining 95% variance.
# The components are learned from the earliest 80% of rows only, so the later
# rows (the test period of the 80/20 chronological split) do not shape the
# projection; every row is then projected with those components.
pca = PCA(n_components=0.95, random_state=42)
pca_fit_rows = int(len(scaled_features) * 0.8)
pca.fit(scaled_features.iloc[:pca_fit_rows])
scaled_features_pca = pca.transform(scaled_features)
print(f"Original number of features: {scaled_features.shape[1]}")
print(f"Reduced number of features after PCA: {scaled_features_pca.shape[1]}")


## Step 10: Create Sequences for LSTM Input

In [None]:
def create_sequences(features, target, seq_length):
    """Build sliding windows of features with one label per window.

    Window ``k`` holds rows ``k`` through ``k + seq_length - 1`` of
    ``features``. Its label is ``target.iloc[k + seq_length]``, i.e. the
    target of the row immediately after the window.

    Args:
        features: Sliceable sequence of feature rows.
        target: Object exposing positional ``.iloc`` access to the labels.
        seq_length: Number of consecutive rows in each window.

    Returns:
        A ``(windows, labels)`` pair of NumPy arrays with
        ``len(features) - seq_length`` entries each.
    """
    window_starts = range(len(features) - seq_length)
    windows = [features[start:start + seq_length] for start in window_starts]
    labels = [target.iloc[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(labels)

sequence_length = 50  # Adjust as needed

# Turn the PCA-transformed feature rows into fixed-length windows with labels
X, y = create_sequences(
    features=scaled_features_pca,
    target=target,
    seq_length=sequence_length,
)


## Step 11: Split the Data into Training and Testing Sets

In [None]:
# Split by position: the first 80% of sequences train the model and the
# remaining 20% are held back for testing
train_size = int(0.8 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]


## Step 12: Handle Class Imbalance

In [None]:
# SMOTE is given 2-D input here, so each (sequence_length, nx) window is
# flattened into a single row first
nsamples = X_train.shape[0]
nx = X_train.shape[2]
flat_shape = (nsamples, sequence_length * nx)
X_train_reshaped = X_train.reshape(flat_shape)

# Oversample the minority class
sm = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = sm.fit_resample(X_train_reshaped, y_train)

# Restore the (samples, sequence_length, nx) layout
resampled_count = X_train_resampled.shape[0]
X_train_resampled = X_train_resampled.reshape((resampled_count, sequence_length, nx))


## Step 13: Build and Compile the LSTM Model with Bidirectional LSTM and Attention

In [None]:
# Stack: bidirectional LSTM -> self-attention -> bidirectional LSTM -> dense
# head with a single sigmoid output
model = Sequential([
    Bidirectional(LSTM(128, return_sequences=True), input_shape=(sequence_length, nx)),
    SeqSelfAttention(attention_activation='sigmoid'),
    Dropout(0.2),
    Bidirectional(LSTM(64)),
    Dropout(0.2),
    Dense(16, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)),
    Dense(1, activation='sigmoid'),
])

# Compile the Model with Focal Loss
def focal_loss(gamma=2., alpha=.25):
    """Return a focal-loss function for binary labels.

    Args:
        gamma: Exponent applied to ``1 - pt``; larger values shrink the loss
            of samples whose true class already has a high probability.
        alpha: Scale factor multiplied into every sample's loss. It is the
            same for both classes.

    Returns:
        A function ``(y_true, y_pred) -> scalar`` giving the mean focal loss.
    """
    def focal_loss_fixed(y_true, y_pred):
        # Keep probabilities away from 0 and 1 so the log stays finite
        epsilon = 1e-7
        probs = tf.keras.backend.clip(y_pred, epsilon, 1. - epsilon)

        # pt is the probability assigned to the true class
        is_positive = tf.equal(y_true, 1)
        pt = tf.where(is_positive, probs, 1 - probs)

        modulating_factor = tf.math.pow(1. - pt, gamma)
        loss = -alpha * modulating_factor * tf.math.log(pt)
        return tf.reduce_mean(loss)
    return focal_loss_fixed

# Compile with Adam and the focal loss defined above, tracking accuracy
optimizer = Adam(learning_rate=0.0005)
loss_fn = focal_loss(gamma=2., alpha=.25)
model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])


## Step 14: Train the Model

In [None]:
# Stop when validation loss has not improved for 10 epochs and roll back to
# the best weights seen
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Keras carves the validation_split samples off the END of the arrays, before
# any shuffling. The tail of the resampled arrays is presumably made up of
# the oversampler's added rows (NOTE(review): confirm the output ordering),
# which would leave validation unrepresentative. Shuffling the rows once,
# with a fixed seed, gives the validation slice a mix of the whole set.
shuffle_order = np.random.default_rng(42).permutation(len(X_train_resampled))

history = model.fit(
    X_train_resampled[shuffle_order], y_train_resampled[shuffle_order],
    epochs=100,
    batch_size=32,
    validation_split=0.1,
    callbacks=[early_stopping],
    verbose=1
)


## Step 15: Evaluate the Model

In [None]:
# Predicted probabilities for the test sequences
y_pred_prob = model.predict(X_test)

# ROC curve and its area, computed against the test labels
fpr, tpr, thresholds = roc_curve(y_test, y_pred_prob)
roc_auc = auc(fpr, tpr)

# Youden's J statistic (TPR - FPR): take the threshold where it is largest.
# Note that the threshold is picked with the same test labels that the report
# below is scored on.
youden_j = tpr - fpr
optimal_idx = np.argmax(youden_j)
optimal_threshold = thresholds[optimal_idx]
print(f"Optimal Threshold: {optimal_threshold}")

# Turn probabilities into 0/1 class predictions with that threshold
meets_threshold = y_pred_prob >= optimal_threshold
y_pred = meets_threshold.astype(int).flatten()

report = classification_report(y_test, y_pred)
print(report)
cm = confusion_matrix(y_test, y_pred)
print('Confusion Matrix:\n', cm)

# ROC curve with the chance diagonal and the selected operating point
optimal_fpr = fpr[optimal_idx]
optimal_tpr = tpr[optimal_idx]
roc_label = f'ROC curve (area = {roc_auc:.2f})'

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=roc_label)
plt.plot([0, 1], [0, 1], 'k--')
plt.scatter(optimal_fpr, optimal_tpr, marker='o', color='red', label='Optimal Threshold')
plt.title('Receiver Operating Characteristic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.show()

# Visualize training history: one figure per metric, each comparing the
# training curve with the validation curve across epochs.
# Tuple layout: (train key, validation key, train label, validation label,
#                title, y-axis label)
history_plots = (
    ('accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy',
     'Model Accuracy During Training', 'Accuracy'),
    ('loss', 'val_loss', 'Train Loss', 'Validation Loss',
     'Model Loss During Training', 'Loss'),
)

for train_key, val_key, train_label, val_label, plot_title, y_label in history_plots:
    plt.figure(figsize=(14, 5))
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(plot_title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()

