<a href="https://colab.research.google.com/github/wewerthonc/stock_prediction_gans/blob/main/stock_market_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [135]:
from google.colab import drive
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
import tensorflow as tf
import tensorflow.keras.layers as layers

In [136]:
drive.mount('/content/MyDrive')
PATH_BASE = '/content/MyDrive/MyDrive/Datasets/'

#Import and Format Data

In [137]:
# Read CSV file
stock_df = pd.read_csv(PATH_BASE + 'dados_petroleo_2021.csv', usecols = ['Close', 'Date'], index_col = ['Date'])

In [138]:
#remove pandemic values

# Select the dates that are not within the specified interval
stock_df = stock_df.loc[~stock_df.index.isin(pd.date_range('2020-02-14', '2020-06-01'))]

In [139]:
stock_df.info()

In [140]:
stock_df.describe()

Unnamed: 0,Close
count,1738.0
mean,18.925161
std,7.324068
min,4.2
25%,13.06
50%,18.965
75%,26.1
max,31.120001


In [141]:
from typing import Tuple

def train_validation_split(dataset: pd.DataFrame, test_size: float = 0.2) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split the dataset into training and validation sets.

    Args:
        dataset (pd.DataFrame): The dataset to be split.
        test_size (float, optional): The proportion of the dataset to be used for validation. Defaults to 0.2.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: The training and validation sets.
    """
    # Calculate the size of the dataset
    df_size = len(dataset)

    # Determine the number of rows for the training set based on the test_size proportion
    slices = int(df_size * (1 - test_size))

    # Split the dataset into training and validation sets
    train = dataset.iloc[:slices]
    validation = dataset.iloc[slices:]

    # Return the training and validation sets
    return train, validation

In [142]:
from sklearn.preprocessing import MinMaxScaler

class DataNormalizer:
    def __init__(self):
        self.scaler = MinMaxScaler()

    def fit_scaler(self, data: pd.DataFrame):
        """
        Fit the scaler on the data.

        Parameters:
            data (numpy.ndarray or pandas.DataFrame): The input data for fitting the scaler.
        """
        self.scaler.fit(data)

    def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize the data using the fitted scaler.

        Parameters:
            data (numpy.ndarray or pandas.DataFrame): The input data to be normalized.

        Returns:
            pandas.DataFrame: The normalized data.
        """
        # Store the original index and column names
        index = data.index
        columns = data.columns

        # Transform the data using the scaler
        normalized_data = self.scaler.transform(data)

        # Create a DataFrame with the normalized data, using the original index and column names
        normalized_df = pd.DataFrame(normalized_data, index=index, columns=columns)

        return normalized_df

    def inverse_normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Inverse normalize the data using the fitted scaler.

        Parameters:
            data (numpy.ndarray): The normalized data to be inverse normalized.

        Returns:
            pandas.DataFrame: The inverse normalized data with the same index and column names as the input data.
        """
        # Store the original index and column names
        index = data.index
        columns = data.columns

        # Inverse transform the data using the scaler
        inverse_normalized_data = self.scaler.inverse_transform(data)

        # Create a DataFrame with the inverse normalized data, using the original index and column names
        inverse_normalized_df = pd.DataFrame(inverse_normalized_data, index=index, columns=columns)

        return inverse_normalized_df

In [143]:
def get_formatted_dataset(stand_base: pd.DataFrame, lag: int) -> pd.DataFrame:
    """
    Formats the dataset by creating lagged samples.

    Args:
        stand_base (pd.DataFrame): Input standardized base dataset.
        lag (int): Number of lagged samples.

    Returns:
        pd.DataFrame: Formatted dataset with lagged samples.
    """
    len_samples = len(stand_base.index)

    # Create resulting dataset
    formatted_dataset = pd.DataFrame(np.zeros((len_samples - lag * 2 + 1, lag * 2)))

    # Set column labels
    formatted_dataset.columns = np.arange(0, lag * 2)

    for _ in range(0, len_samples - lag * 2 + 1):
      # Select the subset of data for the current lagged sample
      subset = stand_base.iloc[_: _ + lag * 2, 0]
      
      # Assign the subset values to the corresponding row in the formatted dataset
      formatted_dataset.iloc[_, 0 : lag * 2] = subset

    return formatted_dataset

# Define GAN

In [144]:
from datetime import timedelta

class GAN:
    def __init__(self, latent_dim: int, lag: int):
        """
        Initializes the GAN model.

        Args:
            latent_dim (int): Dimensionality of the latent space.
            lag (int): Lag value used to calculate the output dimension.
        """
        self.latent_dim = latent_dim
        self.output_dim = 2 * lag - latent_dim
        self.generator_optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0001)
        self.discriminator_optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.0001)
        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()
        self.model = self.build_model()

    def build_generator(self) -> tf.keras.Model:
        """
        Builds the generator model.

        Returns:
            tf.keras.Model: Generator model.
        """
        generator = tf.keras.Sequential([
            layers.LSTM(128, activation='tanh', input_shape=(self.latent_dim, 1), return_sequences=True),
            layers.LSTM(64, activation='tanh'),
            layers.Dense(32, activation='relu'),
            layers.Dense(self.output_dim, activation='linear')
        ])
        return generator

    def build_discriminator(self) -> tf.keras.Model:
        """
        Builds the discriminator model.

        Returns:
            tf.keras.Model: Discriminator model.
        """
        discriminator = tf.keras.Sequential([
            layers.LSTM(128, activation='tanh', input_shape=(self.output_dim, 1), return_sequences=True),
            layers.Dense(64, activation='relu'),
            layers.Dense(1, activation='sigmoid')
        ])

        discriminator.compile(loss="binary_crossentropy", optimizer=self.discriminator_optimizer)
        discriminator.trainable = False

        return discriminator

    def build_model(self) -> tf.keras.Model:
        """
        Builds the combined model of generator and discriminator.

        Returns:
            tf.keras.Model: Combined model.
        """
        gan = tf.keras.Sequential([
            self.generator, self.discriminator
        ])
        gan.compile(loss="binary_crossentropy", optimizer=self.generator_optimizer)
        return gan

    def train(self, df: pd.DataFrame, n_epochs: int, batch_size: int) -> None:
        """
        Trains the GAN model.

        Args:
            df (pd.DataFrame): Input data.
            n_epochs (int): Number of training epochs.
            batch_size (int): Batch size.
        """
        dataset = tf.data.Dataset.from_tensor_slices(df)
        dataset_size = len(dataset)

        for epoch in range(n_epochs):

            #dataset = dataset.shuffle(dataset_size, reshuffle_each_iteration=True)

            for batch in dataset.batch(batch_size, drop_remainder=True):

                # phase 1 - Training discriminator
                generated_samples = self.generator(batch.numpy()[:, 0: self.latent_dim])

                # Concatenate generated and real data
                fake_and_real = tf.concat([generated_samples, batch.numpy()[:, self.latent_dim:]], axis=0)
                y1 = tf.constant([[0.]] * batch_size + [[1.]] * batch_size)
                d_loss = self.discriminator.train_on_batch(fake_and_real, y1)

                # phase 2 - training the generator
                generated_samples = batch.numpy()[:, 0:self.latent_dim]
                y2 = tf.constant([[1.]] * batch_size)
                g_loss = self.model.train_on_batch(generated_samples, y2)

                print(f"Epoch: {epoch+1}, Discriminator Loss: {d_loss}, Generator Loss: {g_loss}")

    def predict(self, df: pd.DataFrame, column: str, days: int) -> pd.DataFrame:
      """
      Predict future values based on the provided DataFrame.

      Args:
          df (pd.DataFrame): The input DataFrame containing historical data.
          column (str): The column name for which predictions are to be made.
          days (int): The number of days for which predictions are to be made.

      Returns:
          pd.DataFrame: The DataFrame with the predicted values appended.

      """

      # Create new values
      for _ in range(0, days):
          # Select the last rows of the 'Close' column
          close_subset = df[column].iloc[-self.latent_dim:]

          # Reshape the subset into a 1-row array
          reshaped_array = close_subset.values.reshape(1, self.latent_dim)

          # Generate the prediction using the reshaped array
          prediction = gan.generator.predict(reshaped_array)

          # Get the last date from the 'Date' column
          last_date = df['Date'].iloc[-1]

          # Calculate the next date by adding one day
          next_date = last_date + timedelta(days=1)

          # Print the next date
          print("Next date:", next_date.date())

          # Create a new row with the next date and the predicted value
          new_row = pd.DataFrame({'Date': [next_date], column: [prediction[0][0]]})

          # Append the new row to the DataFrame
          df = pd.concat([df, new_row], ignore_index = True)

      return df

#Tuning

In [145]:
import random

def set_seeds(seed: int) -> None:
    """
    Sets the seed values for numpy, random, and TensorFlow.

    Args:
        seed (int): Seed value to set.
    """
    # Set seed for numpy
    np.random.seed(seed)

    # Set seed for random module
    random.seed(seed)

    # Set seed for TensorFlow
    tf.random.set_seed(seed)

In [146]:
import sys

def save_print_to_file(text, filename):
    """
    Saves the text to a file.

    Args:
        text (str): Text to be saved.
        filename (str): Name of the file to save.
    """
    with open(filename, 'a') as file:
        sys.stdout = file
        print(text)
        sys.stdout = sys.__stdout__

In [147]:
from typing import Dict
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error

def tune_gan_model(train_df: pd.DataFrame, val_df: pd.DataFrame, hyperparameters: Dict) -> None:
  """
    Tune a GAN model using the provided hyperparameters.

    Args:
        df (pandas.DataFrame): The input dataset.
        hyperparameters (dict or iterable): Hyperparameters to be tuned.

  """
  best_mse = np.inf
  best_params = None

  # Iterate over different combinations of hyperparameters
  for params in ParameterGrid(hyperparameters):
  
      # Set random seed
      set_seeds(42)
    
      # Extract hyperparameters from the current combination
      latent_dim = params['latent_dim']
      epochs = params['epochs']
      batch_size = params['batch_size']
      lag = params['lag']

      # Create Data Normalizer
      normalizer = DataNormalizer()
      normalizer.fit_scaler(train_df)
      
      # Create a GAN model instance
      gan = GAN(latent_dim, lag)

      # Scale the training dataset
      normalized_train = normalizer.normalize_data(train_df)

      # Prepare the training dataset
      formatted_train_dataset = get_formatted_dataset(normalized_train, lag)

      # Train the GAN model
      gan.train(formatted_train_dataset, epochs, batch_size)

      # Scale the validation dataset
      normalized_val = normalizer.normalize_data(val_df)
    
      # Calculate mean squared error for validation dataset
      mse_list = []

      # Prepare the validation dataset
      formatted_val_dataset = get_formatted_dataset(normalized_val, lag)
      dataset = tf.data.Dataset.from_tensor_slices(formatted_val_dataset)
      
      # Iterate over batches of the validation dataset
      for batch in dataset.batch(batch_size, drop_remainder=True):
          generated_samples = gan.generator(batch.numpy()[:, 0:latent_dim])
          mse = mean_squared_error(generated_samples, batch.numpy()[:, latent_dim:])
          mse_list.append(mse)
    
      # Calculate average mean squared error
      avg_mse = np.mean(mse_list)
    
      # Prepare parameters and save/print to file
      parameters = f"\nParameters: {params} " + f"Average MSE: {avg_mse}"
      filename = "/content/MyDrive/MyDrive/Datasets/output.txt"
      save_print_to_file(parameters, filename)

      # Update best mean squared error and corresponding hyperparameters if necessary
      if avg_mse < best_mse:
          best_mse = avg_mse
          best_params = params

  # Prepare best parameters and save/print to file
  parameters = f"\nBest Parameters: {best_params} " + f"Best Average MSE: {best_mse}"
  filename = "/content/MyDrive/MyDrive/Datasets/output.txt"
  save_print_to_file(parameters, filename)

In [148]:
hyperparameters = {
    'latent_dim': [90],
    'epochs': [200],
    'batch_size': [16],
    'lag': [90]
}

In [149]:
train_df, val_df = train_validation_split(stock_df)
len(train_df), len(val_df)

(1390, 348)

In [150]:
#tune_gan_model(train_df, val_df, hyperparameters)

# Prediction

In [151]:
def save_dataframe_to_csv(dataframe: pd.DataFrame, filepath: str) -> None:
    """
    Save a DataFrame to a CSV file.

    Parameters:
        dataframe (pandas.DataFrame): The DataFrame to be saved.
        filepath (str): The file path where the CSV file will be saved.
    """
    dataframe.to_csv(filepath, index=False)

In [152]:
# Set random seed
set_seeds(42)

In [None]:
# Define the dimensionality of the latent space
latent_dim = 90

# Define the lag for the time series data
lag = 90

# Set the batch size for training
batch_size = 16

# Set the number of epochs for training
epochs = 200

# Create an instance of the DataNormalizer class
normalizer = DataNormalizer()

# Fit the scaler on the stock data to compute normalization parameters
normalizer.fit_scaler(stock_df)

# Normalize the stock data using the computed parameters
df_norm = normalizer.normalize_data(stock_df)

# Create an instance of the GAN class
gan = GAN(latent_dim, lag)

# Get the formatted dataset for training the GAN
formatted_df = get_formatted_dataset(df_norm, lag)

# Train the GAN model on the formatted dataset
gan.train(formatted_df, epochs, batch_size)

# Reset the index of the normalized dataset and convert the 'Date' column to datetime format
dataset = df_norm.reset_index()
dataset['Date'] = pd.to_datetime(dataset['Date'])

# Generate predictions using the trained GAN model
predictions = gan.predict(dataset, 'Close', 90).set_index('Date')

# Inverse normalize the predictions to obtain the original scale
predictions_corrected = normalizer.inverse_normalize_data(predictions)

In [None]:
import datetime

# Set seaborn style and colors
sns.set_style('whitegrid')
colors = ["#DE8F8F", "#8FB8DE"]

# Select the last 120 values from the 'Close' column
subset = predictions_corrected['Close'].tail(120)

# Set up the plot figure and axes
fig, ax1 = plt.subplots(figsize=(10, 6))
ax2 = ax1.twinx()

# Plot the first half of the time series in the first color
sns.lineplot(data=subset.iloc[:30], palette=[colors[0]], ax=ax1)

# Plot the second half of the time series in the second color
sns.lineplot(data=subset.iloc[30:], palette=[colors[1]], ax=ax2)

# Set the prediction start date
prediction_start_date = datetime.datetime(2022, 1, 1)

# Find the index of the prediction start date in the subset DataFrame
start_date_index = subset.index.get_loc(prediction_start_date)

# Add a vertical line at the prediction start date
ax1.axvline(x=subset.index[start_date_index], color='red', linestyle='--', label='Prediction Start')

# Set the plot title
plt.title('Stock Price Trend')

# Set the labels for each axis
ax1.set_xlabel('Date')
ax1.set_ylabel('Price (First Half)')
ax2.set_ylabel('Price Prediction')

# Set the x-axis range to cover the first and last date of the dataset
ax1.set_xlim(subset.index[0], subset.index[-1])

# Set the y-axis range between 0 and 30
ax1.set_ylim(15, 30)

# Set the y-axis range between 0 and 30
ax2.set_ylim(15, 30)

# Rotate x-axis labels by 45 degrees
ax1.set_xticklabels(ax1.get_xticklabels(), rotation=45)

# Show the legends
ax1.legend(loc='upper left')
ax2.legend(loc='upper right')

# Show the plot
plt.show()

In [None]:
save_dataframe_to_csv(predictions_corrected, '/content/MyDrive/MyDrive/Datasets/predictions_corrected.csv')