In [10]:
# /scripts/feature_engineering.py
import pandas as pd
import numpy as np
import ta  
#import talib 
from gym_anytrading.envs import Actions, Positions
from gym_anytrading.envs import StocksEnv


In [6]:
#original feature function
def process_data_1(self):
    """Baseline features: the closing price and its one-step change.

    Reads ``self.df['Close']`` and keeps positions
    ``frame_bound[0] - window_size`` up to (not including) ``frame_bound[1]``.

    Returns:
        tuple: ``(prices, signal_features)`` as float32 arrays. ``prices`` is
        the sliced close series; ``signal_features`` has two columns, the
        price and its difference from the previous row (0 for the first row).

    Raises:
        IndexError: if the first position of the window is out of range.
    """
    close = self.df.loc[:, 'Close'].to_numpy()
    first = self.frame_bound[0] - self.window_size
    last = self.frame_bound[1]

    # Index probe only: the value is discarded, but an out-of-range start
    # position fails here instead of silently producing a short slice.
    close[first]

    prices = close[first:last]
    # np.diff drops one element; a leading 0 keeps the change series aligned.
    price_change = np.insert(np.diff(prices), 0, 0)
    stacked = np.column_stack((prices, price_change))

    return prices.astype(np.float32), stacked.astype(np.float32)
#Extended feature
def process_data_2(self):
    """Extended features: every column of the frame plus one-step differences.

    Copies rows ``frame_bound[0] - window_size`` up to (not including)
    ``frame_bound[1]`` of ``self.df`` and appends a ``<column>_diff`` column
    for each of Open, High, Low, Close, Adj Close and Volume.

    Returns:
        tuple: ``(prices, signal_features)`` as float32 arrays. ``prices`` is
        the sliced ``Close`` column; ``signal_features`` holds all columns of
        the slice followed by the six difference columns.

    Raises:
        ValueError: if the requested slice contains no rows.
    """
    row_start = self.frame_bound[0] - self.window_size
    row_stop = self.frame_bound[1]
    data = self.df.iloc[row_start:row_stop, :].copy()

    if data.empty:
        raise ValueError("Data slice resulted in an empty DataFrame. Check frame bounds and window size.")

    # Change from the previous row; the first row has no predecessor and gets 0.
    for column in ('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume'):
        step_changes = np.diff(data[column].to_numpy())
        data[f'{column}_diff'] = np.insert(step_changes, 0, 0)

    # Every column (original and derived) becomes a feature.
    all_columns = list(data.columns)
    signal_features = data[all_columns].to_numpy().astype(np.float32)
    prices = data['Close'].to_numpy().astype(np.float32)

    return prices, signal_features
#Integrate Technical Indicators
def process_data_3(self):
    """Features from raw columns, one-step differences and ``ta`` indicators.

    Copies rows ``frame_bound[0] - window_size`` up to (not including)
    ``frame_bound[1]`` of ``self.df``, appends ``<column>_diff`` columns for
    Open, High, Low, Close, Adj Close and Volume, then adds trend, momentum,
    volume and volatility indicators computed with the ``ta`` package.
    Any NaN left in the frame is replaced by 0.

    Returns:
        tuple: ``(prices, signal_features)`` as float32 arrays. ``prices`` is
        the sliced ``Close`` column; ``signal_features`` holds every column of
        the enriched frame, in insertion order.

    Raises:
        ValueError: if the requested slice contains no rows.
    """
    row_start = self.frame_bound[0] - self.window_size
    row_stop = self.frame_bound[1]
    data = self.df.iloc[row_start:row_stop, :].copy()

    if data.empty:
        raise ValueError("Data slice resulted in an empty DataFrame. Check frame bounds and window size.")

    # Change from the previous row; the first row has no predecessor and gets 0.
    for column in ('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume'):
        step_changes = np.diff(data[column].to_numpy())
        data[f'{column}_diff'] = np.insert(step_changes, 0, 0)

    close = data['Close']
    high = data['High']
    low = data['Low']
    volume = data['Volume']

    # Trend indicators. The EMA span follows window_size even though the
    # column is always named 'EMA30'.
    data['MACD'] = ta.trend.MACD(close).macd()
    data['EMA30'] = ta.trend.EMAIndicator(close, window=self.window_size).ema_indicator()

    # Momentum indicators
    data['RSI'] = ta.momentum.RSIIndicator(close).rsi()
    data['StochasticOscillator'] = ta.momentum.StochasticOscillator(high, low, close).stoch()
    data['WilliamsR'] = ta.momentum.WilliamsRIndicator(high, low, close).williams_r()

    # Volume indicators
    data['Volume_OBV'] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()
    data['MFI'] = ta.volume.MFIIndicator(high, low, close, volume).money_flow_index()

    # Volatility indicators
    data['ATR'] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    bands = ta.volatility.BollingerBands(close)
    data['Bollinger_mavg'] = bands.bollinger_mavg()
    data['Bollinger_hband'] = bands.bollinger_hband()
    data['Bollinger_lband'] = bands.bollinger_lband()

    # Replace every remaining NaN with 0 so the float32 feature matrix is NaN-free.
    data = data.fillna(0)

    all_columns = list(data.columns)
    signal_features = data[all_columns].to_numpy().astype(np.float32)
    prices = data['Close'].to_numpy().astype(np.float32)

    return prices, signal_features
#Integrate Candlestick Pattern
def process_data_4(self):
    """Features from raw columns, differences, TA-Lib indicators and candlestick patterns.

    Copies rows ``frame_bound[0] - window_size`` up to (not including)
    ``frame_bound[1]`` of ``self.df``, appends ``<column>_diff`` columns for
    Open, High, Low, Close, Adj Close and Volume, then adds TA-Lib technical
    indicators and ten candlestick-pattern columns. Any NaN left in the frame
    is replaced by 0.

    All indicator calls go through ``talib``. The previous version mixed
    TA-Lib function names on the unrelated ``ta`` package with a ``talib``
    name that was never imported, and stored the Bollinger upper band as the
    low band (and vice versa).

    Returns:
        tuple: ``(prices, signal_features)`` as float32 arrays. ``prices`` is
        the sliced ``Close`` column; ``signal_features`` holds every column of
        the enriched frame, in insertion order.

    Raises:
        ImportError: if the optional ``talib`` package is not installed.
        ValueError: if the requested slice contains no rows.
    """
    # Imported lazily: the module-level ``import talib`` is commented out, so
    # only this function requires TA-Lib to be installed.
    import talib

    data = self.df.iloc[self.frame_bound[0] - self.window_size:self.frame_bound[1], :].copy()

    if data.empty:
        raise ValueError("Data slice resulted in an empty DataFrame. Check frame bounds and window size.")

    # Change from the previous row; the first row has no predecessor and gets 0.
    for column in ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']:
        diff = np.insert(np.diff(data[column].to_numpy()), 0, 0)
        data[f'{column}_diff'] = diff

    # TA-Lib's Python wrapper expects float64 arrays; an integer Volume column
    # would otherwise be rejected. Plain arrays are assigned back by position.
    open_ = data['Open'].to_numpy(dtype=np.float64)
    high = data['High'].to_numpy(dtype=np.float64)
    low = data['Low'].to_numpy(dtype=np.float64)
    close = data['Close'].to_numpy(dtype=np.float64)
    volume = data['Volume'].to_numpy(dtype=np.float64)

    # Trend indicators (MACD returns macd, signal, histogram; keep the macd line).
    data['MACD'] = talib.MACD(close)[0]
    data['EMA30'] = talib.EMA(close, timeperiod=30)

    # Momentum indicators (STOCH returns slowk, slowd; keep slowk).
    data['RSI'] = talib.RSI(close)
    data['StochasticOscillator'] = talib.STOCH(high, low, close)[0]
    data['WilliamsR'] = talib.WILLR(high, low, close)

    # Volume indicators
    data['Volume_OBV'] = talib.OBV(close, volume)
    data['MFI'] = talib.MFI(high, low, close, volume)

    # Volatility indicators. BBANDS returns (upper, middle, lower) in that order.
    data['ATR'] = talib.ATR(high, low, close)
    upper_band, middle_band, lower_band = talib.BBANDS(close)
    data['Bollinger_mavg'] = middle_band
    data['Bollinger_hband'] = upper_band
    data['Bollinger_lband'] = lower_band

    # Candlestick patterns: column name -> TA-Lib recognizer, all taking
    # (open, high, low, close). Dict order fixes the feature column order.
    candlestick_patterns = {
        'Hammer': talib.CDLHAMMER,
        'InvertedHammer': talib.CDLINVERTEDHAMMER,
        'Engulfing': talib.CDLENGULFING,
        'Doji': talib.CDLDOJI,
        'MorningStar': talib.CDLMORNINGSTAR,
        'EveningStar': talib.CDLEVENINGSTAR,
        'PiercingLine': talib.CDLPIERCING,
        'HangingMan': talib.CDLHANGINGMAN,
        'ShootingStar': talib.CDLSHOOTINGSTAR,
        'Harami': talib.CDLHARAMI,
    }
    for pattern_name, recognizer in candlestick_patterns.items():
        data[pattern_name] = recognizer(open_, high, low, close)

    # Replace every remaining NaN with 0 so the float32 feature matrix is NaN-free.
    data = data.fillna(0)

    # Every column (original and derived) becomes a feature.
    signal_features = data[list(data.columns)].to_numpy().astype(np.float32)

    prices = data['Close'].to_numpy().astype(np.float32)
    return prices, signal_features

In [12]:
#Proportional Reward & Penalty
def reward_function_1(self, action):
    """Reward equal to the price change realised when a long position is closed.

    Args:
        action: value compared against ``Actions.Buy.value`` and
            ``Actions.Sell.value``.

    Returns:
        0 unless the action flips the position (Buy while Short, or Sell while
        Long). On a flip out of a Long position, the price at the current tick
        minus the price at the last trade tick; on a flip out of Short, 0.
    """
    covers_short = action == Actions.Buy.value and self._position == Positions.Short
    exits_long = action == Actions.Sell.value and self._position == Positions.Long
    if not (covers_short or exits_long):
        return 0

    exit_price = self.prices[self._current_tick]
    entry_price = self.prices[self._last_trade_tick]
    price_move = exit_price - entry_price

    # Only leaving a Long position pays out; covering a Short earns nothing.
    if self._position != Positions.Long:
        return 0
    return 0 + price_move
#Loss Aversion with static weight
def reward_function_2_static(self, action):
    """Price-change reward with a fixed 1.2x weight on non-positive outcomes.

    Args:
        action: value compared against ``Actions.Buy.value`` and
            ``Actions.Sell.value``.

    Returns:
        0 unless the action flips the position. On a flip out of a Long
        position, the price change since the last trade tick: unchanged when
        positive, multiplied by 1.2 otherwise. On a flip out of Short, 0.
    """
    covers_short = action == Actions.Buy.value and self._position == Positions.Short
    exits_long = action == Actions.Sell.value and self._position == Positions.Long
    if not (covers_short or exits_long):
        return 0

    exit_price = self.prices[self._current_tick]
    entry_price = self.prices[self._last_trade_tick]
    price_move = exit_price - entry_price

    if self._position != Positions.Long:
        return 0

    reward = 0 + price_move
    # Loss aversion: gains count once, everything else counts 1.2 times.
    reward *= 1 if reward > 0 else 1.2
    return reward
#Loss Aversion with dynamic weight
def reward_function_2_dynamic(self, action):
    """Price-change reward whose loss weight grows with normalized volatility.

    Args:
        action: value compared against ``Actions.Buy.value`` and
            ``Actions.Sell.value``.

    Returns:
        0 unless the action flips the position. On a flip out of a Long
        position, the price change since the last trade tick: unchanged when
        positive, otherwise multiplied by
        ``1.0 + self.calculate_normalized_volatility()``. On a flip out of
        Short, 0.
    """
    covers_short = action == Actions.Buy.value and self._position == Positions.Short
    exits_long = action == Actions.Sell.value and self._position == Positions.Long
    if not (covers_short or exits_long):
        return 0

    exit_price = self.prices[self._current_tick]
    entry_price = self.prices[self._last_trade_tick]
    price_move = exit_price - entry_price

    if self._position != Positions.Long:
        return 0

    reward = 0 + price_move
    # The volatility helper is consulted only for non-positive outcomes.
    weight = 1 if reward > 0 else 1.0 + self.calculate_normalized_volatility()
    reward *= weight
    return reward
#Risk Handling with thresholding
def reward_function_3(self, action):
    """Volatility-weighted trade reward plus a per-step profit-threshold term.

    Args:
        action: value compared against ``Actions.Buy.value`` and
            ``Actions.Sell.value``.

    Returns:
        The trade component (price change on a flip out of a Long position,
        scaled by ``1.0 + self.calculate_normalized_volatility()`` when not
        positive; 0 otherwise) plus 1 when ``self._total_profit >= 1`` or
        minus 0.5 when it is below 1. The threshold term is applied on every
        call, whether or not a trade happened.
    """
    trade_reward = 0

    covers_short = action == Actions.Buy.value and self._position == Positions.Short
    exits_long = action == Actions.Sell.value and self._position == Positions.Long

    if covers_short or exits_long:
        exit_price = self.prices[self._current_tick]
        entry_price = self.prices[self._last_trade_tick]
        price_move = exit_price - entry_price

        if self._position == Positions.Long:
            trade_reward += price_move
            # The volatility helper is consulted only for non-positive outcomes.
            weight = 1 if trade_reward > 0 else 1.0 + self.calculate_normalized_volatility()
            trade_reward *= weight

    # Threshold term: bonus while total profit is at or above 1, penalty below.
    threshold_term = 1 if self._total_profit >= 1 else -0.5
    return trade_reward + threshold_term
#Risk Handling with avoid overconfidence
def reward_function_4(self, action, overtrade_window=10, max_recent_trades=3, overtrade_penalty=0.5):
    """Volatility-weighted trade reward with a penalty for overtrading.

    The overtrading check counts distinct trade ticks inside the last
    ``overtrade_window`` ticks (including a trade made by this action).
    The previous version recorded each trade here and again in the
    environment's ``step``, and sliced the last 10 *recorded trades* rather
    than the last 10 ticks, so the penalty became permanent after a couple of
    trades. This function no longer mutates ``self.recent_trades``; it relies
    on the environment's ``step`` to record trades.

    Args:
        action: value compared against ``Actions.Buy.value`` and
            ``Actions.Sell.value``.
        overtrade_window: number of most recent ticks inspected for trades.
        max_recent_trades: trades allowed inside the window before the
            penalty applies.
        overtrade_penalty: amount subtracted from the reward when the limit
            is exceeded.

    Returns:
        The trade component (price change on a flip out of a Long position,
        scaled by ``1.0 + self.calculate_normalized_volatility()`` when not
        positive; 0 otherwise) minus ``overtrade_penalty`` when more than
        ``max_recent_trades`` trades fall inside the window.
    """
    step_reward = 0

    trade = bool(
        (action == Actions.Buy.value and self._position == Positions.Short) or
        (action == Actions.Sell.value and self._position == Positions.Long)
    )

    if trade:
        current_price = self.prices[self._current_tick]
        last_trade_price = self.prices[self._last_trade_tick]
        price_diff = current_price - last_trade_price

        if self._position == Positions.Long:
            step_reward += price_diff
            if not step_reward > 0:
                # Non-positive outcomes are amplified by the volatility weight.
                step_reward *= 1.0 + self.calculate_normalized_volatility()

    # Overtrading penalty: distinct trade ticks in (current - window, current].
    # A set makes the count immune to the same tick being recorded twice.
    window_start = self._current_tick - overtrade_window
    recent_ticks = {
        tick for tick in getattr(self, 'recent_trades', ())
        if tick > window_start
    }
    if trade:
        # step() records the trade only after the reward is computed.
        recent_ticks.add(self._current_tick)
    if len(recent_ticks) > max_recent_trades:
        step_reward -= overtrade_penalty

    return step_reward

In [13]:
def setup_environment_with_function(reward_func, process_func):
    """Create a ``StocksEnv`` subclass wired to custom reward and feature functions.

    Args:
        reward_func: plain function ``f(self, action)`` used as the
            environment's ``_calculate_reward``.
        process_func: plain function ``f(self)`` returning
            ``(prices, signal_features)``, used as ``_process_data``.

    Returns:
        type: the ``CustomEnv`` class (not an instance).
    """
    class CustomEnv(StocksEnv):
        # Installed on the class so both hooks are in place while the parent
        # constructor runs. gym-anytrading's TradingEnv.__init__ calls
        # self._process_data() to build prices/signal_features (per the
        # library source; verify against the installed version). Binding on
        # the instance after super().__init__() was too late, so the custom
        # feature function was never used.
        _calculate_reward = reward_func
        _process_data = process_func

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Ticks at which a trade happened in the current episode.
            self.recent_trades = []

        def reset(self, seed=None, options=None):
            """Reset the episode state; the episode starts in a Long position.

            Returns:
                tuple: ``(observation, info)``.
            """
            super().reset(seed=seed, options=options)
            self.action_space.seed(int((self.np_random.uniform(0, seed if seed is not None else 1))))

            self._truncated = False
            self._current_tick = self._start_tick
            self._last_trade_tick = self._current_tick - 1
            self._position = Positions.Long
            self._position_history = (self.window_size * [None]) + [self._position]
            self._total_reward = 0.
            self._total_profit = 1.  # unit
            self._first_rendering = True
            self.history = {}
            self.recent_trades = []
            self._last_active_tick = self._current_tick

            observation = self._get_observation()
            info = self._get_info()

            if self.render_mode == 'human':
                self._render_frame()

            return observation, info

        def step(self, action):
            """Advance one tick, score the action and apply any position flip.

            Returns:
                tuple: ``(observation, step_reward, False, truncated, info)``;
                the terminated flag is always False.
            """
            self._truncated = False
            self._current_tick += 1

            if self._current_tick == self._end_tick:
                self._truncated = True

            # Reward and profit are computed against the position held
            # *before* this action takes effect.
            step_reward = self._calculate_reward(action)
            self._total_reward += step_reward

            self._update_profit(action)

            trade = (
                (action == Actions.Buy.value and self._position == Positions.Short) or
                (action == Actions.Sell.value and self._position == Positions.Long)
            )

            if trade:
                self._position = self._position.opposite()
                self._last_trade_tick = self._current_tick
                self.recent_trades.append(self._current_tick)  # Track the trade

            self._position_history.append(self._position)
            observation = self._get_observation()
            info = self._get_info()
            self._update_history(info)

            if self.render_mode == 'human':
                self._render_frame()

            return observation, step_reward, False, self._truncated, info

        def calculate_normalized_volatility(self, window=20):
            """Standard deviation over mean of the trailing ``window`` prices.

            The window ends at the current tick, so the value changes as the
            episode advances and never reads prices after the current tick.
            The previous version always read the last ``window`` prices of
            the whole series.

            Args:
                window: number of trailing prices to use.

            Returns:
                The ratio ``std / mean`` of the trailing prices, or 0.0 (no
                adjustment, since callers use ``1.0 + value``) when fewer
                than ``window`` prices are available or the mean is zero.
            """
            tick = getattr(self, '_current_tick', None)
            if tick is None:
                # No episode in progress: fall back to the end of the series.
                tick = len(self.prices) - 1
            stop = tick + 1
            recent_prices = self.prices[max(0, stop - window):stop]
            if len(recent_prices) < window:
                return 0.0  # not enough history: leave the penalty unscaled
            avg_price = np.mean(recent_prices)
            if avg_price == 0:
                return 0.0  # avoid dividing by a zero mean
            return np.std(recent_prices) / avg_price

    return CustomEnv

In [14]:
def call_env(train_df, test_df, reward_func, process_func, window_size=30):
    """Build matching train and test environments from two DataFrames.

    Both environments come from the class produced by
    ``setup_environment_with_function`` and span rows ``window_size`` to the
    end of their frame.

    Args:
        train_df: DataFrame backing the training environment.
        test_df: DataFrame backing the test environment.
        reward_func: reward function passed to
            ``setup_environment_with_function``.
        process_func: feature function passed to
            ``setup_environment_with_function``.
        window_size: observation window length, also used as the lower frame
            bound. Defaults to 30, the value previously hard-coded.

    Returns:
        tuple: ``(train_env, test_env)``.
    """
    custom_env_cls = setup_environment_with_function(reward_func, process_func)

    def build(df):
        # The lower bound equals window_size so the first observation has a
        # full window of history available.
        return custom_env_cls(
            df=df,
            window_size=window_size,
            frame_bound=(window_size, df.shape[0]),
        )

    return build(train_df), build(test_df)