# Define folder methods

In [7]:
# prompt: Define a class DataframeRepo.

import pandas as pd
import h5py
import pickle
import os
from google.colab import drive
# Mount Google Drive at /content/drive; DataframeRepo reads and writes files
# under '/content/drive/My Drive/'.
drive.mount('/content/drive')

class DataframeRepo:
  """Stores and retrieves pandas DataFrames in a folder on a mounted Google Drive.

  A DataFrame is persisted as two files: an HDF5 file holding the values
  (dataset 'data') and a pickle file holding the column labels and row index.
  Both files are staged in the current working directory under the same names
  and copied to / from the Drive folder.

  Errors are reported with print() rather than raised, matching the
  notebook-oriented, best-effort style of the public methods.
  """

  def __init__(self, folder_name='stock-rl', verbose=0,
               drive_root='/content/drive/My Drive'):
    """Creates a repository bound to one Drive folder.

    Args:
      folder_name: Name of the folder (below drive_root) that holds the files.
      verbose: Truthy to print progress messages.
      drive_root: Directory under which folder_name lives. Defaults to the
        Colab mount point of "My Drive".
    """
    self._folder_name = folder_name
    self._verbose = verbose
    self._drive_root = drive_root

  def folder_name(self):
    """Returns the name of the folder."""
    return self._folder_name

  def load(self, file_name):
    """Retrieves a DataFrame from Google Drive.

    Args:
      file_name: The name of the file containing the DataFrame.

    Returns:
      The loaded pandas DataFrame or None if the file or folder doesn't exist.
    """
    try:
      folder = self.folder_name()
      index_file_name = self._index_file_name(file_name)
      # Stop as soon as one file is missing; otherwise a stale local copy
      # left over from an earlier run would be loaded instead.
      for name in (file_name, index_file_name):
        if not self._get_from_drive(name, name, drive_folder=folder):
          print(f"Error retrieving DataFrame: '{name}' not found in Google Drive folder '{folder}'.")
          return None
      return self._load_dataframe_from_h5(file_name, index_file_name)

    except Exception as e:
      print(f"Error retrieving DataFrame: {e}")
      return None

  def store(self, dataframe, file_name):
    """Stores a DataFrame to Google Drive.

    Args:
      dataframe: The pandas DataFrame to store.
      file_name: The name of the file to store the DataFrame.
    """
    try:
      folder = self.folder_name()
      index_file_name = self._index_file_name(file_name)
      self._create_folder_in_drive(folder)
      # Do not upload anything if the HDF5 file could not be written.
      if not self._save_dataframe_to_h5(dataframe, file_name):
        return
      self._store_dataframe_index(dataframe, index_file_name)
      self._save_to_drive(file_name, file_name, drive_folder=folder)
      self._save_to_drive(index_file_name, index_file_name, drive_folder=folder)

    except Exception as e:
      print(f"Error storing DataFrame: {e}")

  def list(self):
    """Lists all files in the folder.

    Returns:
        A list of file names in the folder, or an empty list if the folder
        does not exist or cannot be read.
    """
    try:
      drive_path = self._drive_path(self.folder_name())

      if not os.path.exists(drive_path):
        return []

      return os.listdir(drive_path)

    except OSError as e:
      print(f"Error listing files: {e}")
      return []

  @staticmethod
  def _index_file_name(file_name):
    """Derives the name of the index pickle that accompanies a data file.

    A trailing '.h5' extension is replaced by '_index.pkl'; any other name gets
    '_index.pkl' appended, so the index never shares the data file's name.
    """
    root, ext = os.path.splitext(file_name)
    if ext == '.h5':
      return root + '_index.pkl'
    return file_name + '_index.pkl'

  def _drive_path(self, *parts):
    """Joins path components below the configured Drive root."""
    return os.path.join(self._drive_root, *parts)

  @staticmethod
  def _copy_file(source, destination):
    """Copies a file, replacing the IPython-only `!cp` shell magic."""
    import shutil  # local import keeps this class self-contained
    shutil.copy(source, destination)

  def _create_folder_in_drive(self, folder_name):
    """Creates a folder in Google Drive if it doesn't exist.

    Args:
      folder_name: The name of the folder to create.
    """
    folder_path = self._drive_path(folder_name)

    if not os.path.exists(folder_path):
      os.makedirs(folder_path)
      if self._verbose:
        print(f"Folder '{folder_name}' created in Google Drive.")
    else:
      if self._verbose:
        print(f"Folder '{folder_name}' already exists in Google Drive.")

  def _get_from_drive(self, drive_filename, local_filename='', drive_folder='stock-rl'):
    """Gets a file from Google Drive to the local drive, overwriting if it exists.

    Args:
      drive_filename: Name of the file in Google Drive.
      local_filename: Path to save the file locally (defaults to drive_filename).
      drive_folder: Folder in Google Drive to get from (default: 'stock-rl').

    Returns:
      True if the file was copied, False if it does not exist in Google Drive.
    """
    drive_path = self._drive_path(drive_folder, drive_filename)
    if not local_filename:
      local_filename = drive_filename

    if not os.path.exists(drive_path):
      if self._verbose:
        print(f"File '{drive_filename}' does not exist in Google Drive.")
      return False

    if os.path.exists(local_filename):
      os.remove(local_filename)
      if self._verbose:
        print(f"Existing local file '{local_filename}' removed.")

    self._copy_file(drive_path, local_filename)
    if self._verbose:
      print(f"File '{drive_filename}' retrieved from Google Drive to '{local_filename}'.")
    return True

  def _load_dataframe_from_h5(self, filename, index_filename):
    """Loads a pandas DataFrame from an HDF5 file and restores its index.

    Args:
      filename: The name of the HDF5 file containing the DataFrame data.
      index_filename: The name of the file containing the DataFrame index information.

    Returns:
      The loaded pandas DataFrame with restored index, or None on any error.
    """
    try:
      # Load the DataFrame data from the HDF5 file
      with h5py.File(filename, 'r') as hf:
        data2 = pd.DataFrame(hf['data'][:])

      # Load the index information from the pickle file.
      # NOTE(security): pickle.load executes arbitrary code embedded in the
      # file; only load index files that this repository wrote itself.
      with open(index_filename, 'rb') as f:
        index_info = pickle.load(f)

      # Restore the column names and row index
      data2.columns = index_info['columns']
      data2.index = index_info['index']

      if self._verbose:
        print(f"DataFrame loaded from '{filename}' with restored index.")
      return data2

    except Exception as e:
      print(f"Error loading DataFrame: {e}")
      return None

  def _save_dataframe_to_h5(self, dataframe, filename):
    """Saves a pandas DataFrame to an HDF5 file.

    Only the values (dataframe.to_numpy()) are written, to the dataset 'data';
    labels are stored separately by _store_dataframe_index.

    Args:
      dataframe: The pandas DataFrame to save.
      filename: The name of the HDF5 file to create.

    Returns:
      True if the file was written, False if an error occurred.
    """
    try:
      # Check if the file already exists and remove it
      if os.path.exists(filename):
        os.remove(filename)
        if self._verbose:
          print(f"Existing file '{filename}' removed.")

      # Save the DataFrame to the HDF5 file
      with h5py.File(filename, 'w') as hf:
        hf.create_dataset('data', data=dataframe.to_numpy())
      if self._verbose:
        print(f"DataFrame saved to '{filename}'.")
      return True

    except Exception as e:
      print(f"Error saving DataFrame: {e}")
      return False

  def _store_dataframe_index(self, dataframe, filename):
    """Stores the columns and row index of a pandas DataFrame using pickle.

    Args:
      dataframe: The pandas DataFrame.
      filename: The name of the file to store the index information.

    Remarks:
      The stored object contains these keys: 'columns', 'index'.
    """
    index_info = {
        'columns': dataframe.columns,
        'index': dataframe.index
    }
    with open(filename, 'wb') as f:
      pickle.dump(index_info, f)
    if self._verbose:
      print(f"DataFrame index information saved to '{filename}'.")

  def _save_to_drive(self, local_filename, drive_filename='', drive_folder='stock-rl'):
    """Saves a local file to Google Drive, overwriting if it exists.

    Args:
      local_filename: Path to the local file.
      drive_filename: Name of the file in Google Drive (defaults to local_filename).
      drive_folder: Folder in Google Drive to save to (default: 'stock-rl').

    Returns:
      True if the file was copied, False if the local file does not exist.
    """
    # Resolve the default name BEFORE building the path; otherwise an omitted
    # drive_filename would make the path point at the folder itself.
    if not drive_filename:
      drive_filename = local_filename
    drive_path = self._drive_path(drive_folder, drive_filename)

    if not os.path.exists(local_filename):
      if self._verbose:
        print(f"Local file '{local_filename}' does not exist.")
      return False

    if os.path.exists(drive_path):
      os.remove(drive_path)
      if self._verbose:
        print(f"Existing file '{drive_filename}' removed from Google Drive.")

    self._copy_file(local_filename, drive_path)
    if self._verbose:
      print(f"File '{local_filename}' saved to Google Drive as '{drive_filename}'.")
    return True


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Data cleaning

In [12]:
# prompt: Define DataPipeline

class DataPipeline:
  """Applies an ordered sequence of DataFrame -> DataFrame steps."""

  def __init__(self, steps=None, verbose=0):
    """Creates a pipeline.

    Args:
      steps: Iterable of callables, each taking a DataFrame and returning a
        DataFrame. Defaults to no steps. The iterable is copied, so later
        changes to the caller's list do not affect this pipeline.
      verbose: Verbosity flag (stored; not used by the methods of this class).

    Raises:
      ValueError: If any step is not callable.
    """
    # A fresh list per instance: avoids the shared mutable default and
    # decouples the pipeline from the list owned by the caller.
    self._steps = list(steps) if steps is not None else []
    self._verbose = verbose
    for step in self._steps:
      self._validate_step(step)

  def fit(self, dataframe):
    """Does nothing; the steps are stateless. Returns None."""
    pass

  def transform(self, dataframe):
    """Runs every step in order, feeding each step's output to the next.

    Args:
      dataframe: The input pandas DataFrame.

    Returns:
      The DataFrame produced by the last step (the input itself if there are
      no steps).

    Raises:
      ValueError: If a step returns something that is not a DataFrame.
    """
    for step in self._steps:
      dataframe = step(dataframe)
      # The return type is checked here, on real data, instead of probing each
      # step with an empty frame up front: steps that rely on specific columns
      # cannot run on an empty frame.
      if not isinstance(dataframe, pd.DataFrame):
        raise ValueError("Each step must take a dataframe and return a dataframe.")
    return dataframe

  def _validate_step(self, step):
    """Raises ValueError unless `step` is callable."""
    if not callable(step):
      raise ValueError("Each step must be a callable function.")


In [41]:
# prompt: Define DataPipelineBuilder

class DataPipelineBuilder:
  """Fluent builder that collects feature-engineering steps for a DataPipeline.

  The steps expect a DataFrame whose columns are two-level labels of the form
  (field, symbol), e.g. ('Close', 'CBA.AX').
  """

  def __init__(self):
    self._steps = []

  def add_rsi(self, window=14):
    """Adds a step that appends an ('RSI', symbol) column per 'Close' column.

    Args:
      window: The period for calculating RSI (default is 14).

    Returns:
      This builder, to allow chaining.
    """
    def append_rsi(data, window=window):
      """
      Calculates the RSI for each stock in the DataFrame and appends it as a new column.

      Args:
        data: A pandas DataFrame with ('Close', symbol) columns for each stock.
        window: The period for calculating RSI.

      Returns:
        A DataFrame with added ('RSI', symbol) columns for each stock.
      """
      # Snapshot the labels: `data` is rebound inside the loop.
      for column_name in list(data.columns):
        # column_name is like ('Close', 'CBA.AX'). Flat string labels are
        # skipped; a substring test on them would mis-detect the symbol.
        if not (isinstance(column_name, tuple) and column_name[0] == 'Close'):
          continue
        stock_close_data = data[[column_name]].copy()
        stock_close_data.columns = ['Close']
        stock_rsi = self._calculate_rsi(stock_close_data, window=window)
        stock_rsi.columns = [('RSI', column_name[1])]
        data = pd.concat([data, stock_rsi], axis=1)

      return data
    self._steps.append(append_rsi)
    return self

  def add_kdj(self, n=9, m1=3, m2=3):
    """Adds a step that appends KDJ.K / KDJ.D / KDJ.J columns per symbol.

    Args:
      n: The period for calculating RSV (default is 9).
      m1: The period for calculating K (default is 3).
      m2: The period for calculating D (default is 3).

    Returns:
      This builder, to allow chaining.
    """
    def append_kdj(dataframe):
      """
      Calculates the KDJ for each stock in the DataFrame and appends it as new columns.

      Args:
        dataframe: A pandas DataFrame whose second column level is the stock
                  symbol and which has 'High', 'Low', and 'Close' fields for
                  every symbol.

      Returns:
        A DataFrame with added ('KDJ.K', symbol), ('KDJ.D', symbol) and
        ('KDJ.J', symbol) columns.
      """

      stock_symbols = dataframe.columns.get_level_values(1).unique()
      for symbol in stock_symbols:
        stock_data = dataframe.xs(symbol, level=1, axis=1)
        kdj = self._calculate_kdj(stock_data, n=n, m1=m1, m2=m2)
        kdj.columns = [('KDJ.K', symbol), ('KDJ.D', symbol), ('KDJ.J', symbol)]
        dataframe = pd.concat([dataframe, kdj], axis=1)
      return dataframe

    self._steps.append(append_kdj)
    return self

  def sort_columns(self):
    """
    Adds a step that sorts the columns of the DataFrame in ascending order.

    Returns:
      This builder, to allow chaining.
    """
    def sort_columns(dataframe):
      return dataframe.sort_index(axis=1, ascending=True)
    self._steps.append(sort_columns)
    return self

  def build(self):
    """
    Builds a DataPipeline with the configured steps.

    Returns:
      A DataPipeline object.

    Raises:
      ValueError: If a configured step is not callable.
    """
    for step in self._steps:
      self._validate_step(step)
    # Pass a copy so steps added to this builder later do not leak into a
    # pipeline that has already been built.
    return DataPipeline(steps=list(self._steps))

  def _validate_step(self, step):
    """Raises ValueError unless `step` is callable.

    Steps are not probed with an empty DataFrame: steps that need specific
    columns (such as the KDJ step) cannot run on one.
    """
    if not callable(step):
      raise ValueError("Each step must be a callable function.")

  def _calculate_rsi(self, dataframe, window=14):
    """
    Calculates the Relative Strength Index (RSI) for a given DataFrame.

    Args:
      dataframe: A pandas DataFrame with a 'Close' column representing stock closing prices.
      window: The period for calculating RSI (default is 14).

    Returns:
      A new DataFrame containing only the RSI column.
    """

    delta = dataframe['Close'].diff()
    # Split price changes into gains and (positive) losses.
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    # Simple moving averages over `window` rows.
    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()

    # A zero average loss yields an infinite ratio and therefore an RSI of 100.
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))

    return pd.DataFrame({'RSI': rsi})

  def _calculate_kdj(self, data, n=9, m1=3, m2=3):
    """
    Calculates the KDJ indicator for a given DataFrame.

    Args:
      data: A pandas DataFrame with 'High', 'Low', and 'Close' columns representing
            stock high, low, and closing prices.
      n: The period for calculating RSV (default is 9).
      m1: The period for calculating K (default is 3).
      m2: The period for calculating D (default is 3).

    Returns:
      A new DataFrame containing the calculated K, D, and J values in the
      columns 'KDJ.K', 'KDJ.D' and 'KDJ.J'.
    """

    low_n = data['Low'].rolling(window=n).min()
    high_n = data['High'].rolling(window=n).max()
    rsv = ((data['Close'] - low_n) / (high_n - low_n)) * 100
    # Undefined RSV (incomplete window, or a flat high/low range) is filled
    # with the midpoint value 50.
    rsv = rsv.fillna(50)

    # K and D are exponential smoothings with weights 1/m1 and 1/m2.
    k = rsv.ewm(alpha=1 / m1, adjust=False).mean()
    d = k.ewm(alpha=1 / m2, adjust=False).mean()
    j = 3 * k - 2 * d

    return pd.DataFrame({'KDJ.K': k, 'KDJ.D': d, 'KDJ.J': j})

In [None]:
# Note:
# The call below removes every row in which all values are NaN.
# dataframe.dropna(how='all')

In [42]:
# Fetch the raw stock data from the repository's Drive folder.
# load() returns None (after printing the error) if retrieval fails.
repo = DataframeRepo()
data = repo.load('stock_data.h5')

In [43]:
# Configure the steps fluently; every add_*/sort_* call returns the builder.
builder = (
    DataPipelineBuilder()
    .add_rsi()
    .add_kdj()
    .sort_columns()
)

# Build the pipeline, then run it over the loaded data.
pipeline = builder.build()
pipeline.fit(data)
data_transformed = pipeline.transform(data)

In [44]:
# Persist the transformed frame: store() writes the HDF5 data file plus its
# '_index.pkl' companion and copies both to the repository's Drive folder.
repo.store(data_transformed, 'stock_data_processed.h5')

In [None]:
# prompt: Define an RSI calculator. Return a new dataframe that is the RSI column but do not append it to the original dataframe

import pandas as pd

# Keep this for now for building unit test later.
def calculate_rsi(data, window=14):
  """
  Computes the Relative Strength Index (RSI) from closing prices.

  Args:
    data: A pandas DataFrame with a 'Close' column of closing prices.
    window: Number of rows in each moving-average window (default is 14).

  Returns:
    A new DataFrame with a single 'RSI' column; the input is not modified.
  """

  def windowed_mean(series):
    # Simple moving average over `window` rows.
    return series.rolling(window=window).mean()

  change = data['Close'].diff()

  # Upward moves keep their size; everything else counts as zero.
  ups = change.where(change > 0, 0)
  # Downward moves are kept and sign-flipped so losses are positive.
  downs = -change.where(change < 0, 0)

  relative_strength = windowed_mean(ups) / windowed_mean(downs)

  return pd.DataFrame({'RSI': 100 - (100 / (1 + relative_strength))})

# Example usage:
# Assuming you have a DataFrame called 'data' with a 'Close' column
# rsi_df = calculate_rsi(data)
# print(rsi_df)

In [None]:
# prompt: Define a KDJ calculator. Return a new dataframe that is the calculated column but do not append it to the original dataframe. Pass in the parameters into the method with default values.

import pandas as pd

# Keep this for now for building unit test later.

# Since this is a linear combination of values in data, this is probably not a needed feature.
# but if I calculate ema first, then the data on the last day already has all the data it had before,
# and the weight would be the param to calculate kdj
def calculate_kdj(data, n=9, m1=3, m2=3):
  """
  Computes the K, D and J lines of the KDJ indicator.

  Args:
    data: A pandas DataFrame with 'High', 'Low', and 'Close' columns holding
          high, low, and closing prices.
    n: Rolling window length used for the raw stochastic value (default is 9).
    m1: Smoothing period for K; the weight is 1 / m1 (default is 3).
    m2: Smoothing period for D; the weight is 1 / m2 (default is 3).

  Returns:
    A new DataFrame with columns 'K', 'D' and 'J'; the input is not modified.
  """

  def smooth(series, period):
    # Recursive exponential smoothing with weight 1 / period.
    return series.ewm(alpha=1 / period, adjust=False).mean()

  lowest = data['Low'].rolling(window=n).min()
  highest = data['High'].rolling(window=n).max()

  # Position of the close inside the n-row high/low range, as a percentage.
  raw_value = ((data['Close'] - lowest) / (highest - lowest)) * 100
  # Rows where the value is undefined are set to the midpoint, 50.
  raw_value = raw_value.fillna(50)

  k_line = smooth(raw_value, m1)
  d_line = smooth(k_line, m2)
  j_line = 3 * k_line - 2 * d_line

  return pd.DataFrame({'K': k_line, 'D': d_line, 'J': j_line})

# Example usage:
# Assuming you have a DataFrame called 'data' with 'High', 'Low', and 'Close' columns
# kdj_df = calculate_kdj(data)
# print(kdj_df)
