In [3]:
import talib
from talib.abstract import Function
import numpy as np
import pandas as pd

from gym import Space
from copy import copy
from abc import abstractmethod
from typing import Union, List, Callable, Dict

from tensortrade.features import FeatureTransformer
from loguru import logger

class TAlibIndicator(FeatureTransformer):
    """Adds one or more TA-Lib indicators to a data frame, based on existing open, high, low, close, and volume column values.

    Every requested indicator is called with the default parameters reported by TA-Lib.
    `BBANDS` produces the columns `bb_upper`, `bb_middle` and `bb_lower`; every other
    indicator produces a single column named after the indicator (for indicators with
    several outputs only the first output is kept).
    """

    # Frequent misspellings (upper-cased) and the TA-Lib function name they stand for.
    _NAME_CORRECTIONS = {"BBAND": "BBANDS", "BB": "BBANDS", "RIS": "RSI"}

    # Column spellings accepted for each TA-Lib input name, in order of preference.
    _COLUMN_CANDIDATES = {
        "open": ("open", "Open"),
        "high": ("high", "High"),
        "low": ("low", "Low"),
        "close": ("close", "Close"),
        "volume": ("volume", "VolumeTo"),
    }

    def __init__(self, indicators: List[str], lows: Union[List[float], List[int]] = None, highs: Union[List[float], List[int]] = None, **kwargs):
        """
        Arguments:
            indicators: Names of the TA-Lib functions to apply (case-insensitive). Empty strings and
                `None` entries are dropped and a few common misspellings are corrected.
                Anything after a '-' in a name is kept in the output column name but is not part
                of the TA-Lib function name.
            lows (optional): Currently unused.
            highs (optional): Currently unused.
            **kwargs: `columns` and `inplace`, when given, are forwarded to `FeatureTransformer`.
        """
        # Initialise the base class the same way the other transformers do, so that
        # `columns` / `_inplace` exist on the instance.
        super().__init__(columns=kwargs.get('columns'), inplace=kwargs.get('inplace', True))

        indicators = self._error_check(indicators)
        self._indicator_names = [indicator.upper() for indicator in indicators]
        self._indicators = [getattr(talib, self._base_name(name)) for name in self._indicator_names]
        # Here we get the stats for each indicator for TA-Lib. The lookup uses the same base name
        # as the function lookup above; otherwise a suffixed name would get no parameters/inputs.
        self._stats = {name: self._get_info(self._base_name(name)) for name in self._indicator_names}

    @staticmethod
    def _base_name(indicator_name: str) -> str:
        """ Return the TA-Lib function name: the part of `indicator_name` before the first '-' """
        return indicator_name.split('-')[0]

    def _error_check(self, a: List[str]) -> List[str]:
        """ Return a cleaned copy of `a`: drop `None` and empty names, correct common misspellings.

        The list passed in is left untouched. Misspellings are recognised regardless of case;
        all other names are returned exactly as given.
        """
        cleaned = []
        for name in a:
            if name is None or name == "":
                continue
            cleaned.append(self._NAME_CORRECTIONS.get(name.upper(), name))
        return cleaned

    def _get_info(self, indicator_name: str) -> Dict:
        """ Get the relevant indicator parameters and inputs.

        Returns a dict with the keys "parameters" (TA-Lib default parameters) and "inputs"
        (the values of the abstract function's `input_names`). Both are empty when the name is
        `None` or is not a TA-Lib function.
        """
        if indicator_name is None:
            logger.warning("Usage: help_indicator(symbol), symbol is indicator name")
            return {"parameters": {}, "inputs": []}

        upper_code = indicator_name.upper()
        if upper_code not in talib.get_functions():
            logger.warning(f"ERROR: indicator {upper_code} not in list")
            return {"parameters": {}, "inputs": []}

        func = Function(upper_code)
        return {
            "parameters": dict(func.parameters),
            "inputs": list(func.input_names.values())
        }

    def _match_inputs(self, x_column: list, inputs: list):
        """ Map TA-Lib input names to the column names actually present in `x_column`.

        Only the common inputs (open, high, low, close, volume) are handled. At most one column
        is returned per input, preferring the lower-case spelling. An entry of `inputs` may itself
        be a list of names (multi-series indicators); such entries are flattened in order.
        """
        real_inputs = []
        for entry in inputs:
            names = entry if isinstance(entry, (list, tuple)) else [entry]
            for inp in names:
                for candidate in self._COLUMN_CANDIDATES.get(inp, ()):
                    if candidate in x_column:
                        real_inputs.append(candidate)
                        break
                else:
                    # Nothing matched: the indicator call will be short of an argument.
                    logger.warning(f"No column found for TA-Lib input '{inp}'")
        return real_inputs

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """ Compute every configured indicator and store it as new column(s) of `X`.

        `X` is modified in place and also returned. Errors raised by TA-Lib propagate to the caller.
        """
        for indicator_name, indicator in zip(self._indicator_names, self._indicators):
            logger.debug(indicator_name)
            stats = self._stats[indicator_name]
            # Convert inputs into something that we'd commonly run to
            matched_inputs = self._match_inputs(list(X.columns), stats["inputs"])
            # TA-Lib only accepts float64 arrays, so integer columns are cast first.
            indicator_args = [X[arg].values.astype(float) for arg in matched_inputs]
            result = indicator(*indicator_args, **stats["parameters"])

            if self._base_name(indicator_name) == 'BBANDS':
                upper, middle, lower = result
                X["bb_upper"] = upper
                X["bb_middle"] = middle
                X["bb_lower"] = lower
            elif isinstance(result, (tuple, list)):
                # Multi-output indicator: keep the whole first output series
                # (not just its first element).
                X[indicator_name] = result[0]
            else:
                X[indicator_name] = result

        return X

In [4]:
import pandas as pd

from tensortrade.features.scalers import MinMaxNormalizer, ComparisonNormalizer, PercentChangeNormalizer
from tensortrade.features.stationarity import FractionalDifference

# Load the hourly Coinbase BTC/USD CSV, skipping the first line of the file.
ohlcv_data = pd.read_csv('./data/Coinbase_BTCUSD_1h.csv', skiprows=1)
# Keep only the OHLCV columns. `.copy()` makes the selection an independent frame, so the
# transformers below can assign columns into it without a SettingWithCopyWarning.
ohlcv_data = ohlcv_data[['open','high','low','close','volume']].copy()

In [5]:
# The list contains a misspelling ("BBAND") and two invalid entries ("" and None);
# TAlibIndicator._error_check corrects the former and drops the latter.
taindicator = TAlibIndicator(indicators=["BBAND", "RSI", "EMA", "SMA", "", None])

In [6]:
# Load the test input CSV (a different file from the one behind `ohlcv_data`).
df = pd.read_csv('../tests/data/input/coinbase-1h-btc-usd.csv')

In [7]:
# NOTE: `transform` writes the indicator columns into `df` itself; `dropna()` then returns a
# new frame without the rows that contain NaN values.
transformed = taindicator.transform(df).dropna()

2019-12-13 22:30:43.110 | DEBUG    | __main__:transform:103 - BBANDS
2019-12-13 22:30:43.114 | DEBUG    | __main__:transform:103 - RSI
2019-12-13 22:30:43.116 | DEBUG    | __main__:transform:103 - EMA
2019-12-13 22:30:43.118 | DEBUG    | __main__:transform:103 - SMA


In [8]:
# Save the TA-Lib result under the tests' output directory (presumably as an expected-output fixture).
transformed.to_csv('../tests/data/outputs/ta_transformed.csv')

In [9]:


from math import log
import pandas as pd
import numpy as np

from gym import Space
from copy import copy
from typing import Union, List, Tuple
from loguru import logger
from tensortrade.features.feature_transformer import FeatureTransformer


class StandardNormalizer(FeatureTransformer):
    """A transformer for normalizing values within a feature pipeline by removing the mean and scaling to unit variance.

    Each selected column becomes `(value - column mean) / column std`, using pandas' default
    `Series.std`. A column whose standard deviation is exactly zero is only centred.
    """

    def __init__(self, columns: Union[List[str], str, None] = None, feature_min=0, feature_max=1, inplace=True):
        """
        Arguments:
            columns (optional): A list of column names to normalize. If `None`, all numeric
                columns of the first frame passed to `transform` are used.
            feature_min (optional): The minimum value in the range to scale to.
                Validated and stored, but not used by `transform`.
            feature_max (optional): The maximum value in the range to scale to.
                Validated and stored, but not used by `transform`.
            inplace (optional): If `False`, a new column will be added to the output for each input column.

        Raises:
            ValueError: If `feature_min` is not strictly less than `feature_max`.
        """
        super().__init__(columns=columns, inplace=inplace)

        self._feature_min = feature_min
        self._feature_max = feature_max

        if feature_min >= feature_max:
            raise ValueError("feature_min must be less than feature_max")

        # Cleared by `reset`; not read by `transform`.
        self._history = {}

    def reset(self):
        """Clear the stored history."""
        self._history = {}

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Standardize the configured columns of `X`.

        `X` itself receives the new values (overwriting the source columns when `inplace` is
        set, otherwise in additional `<column>_scaled` columns). The returned frame is
        `X.dropna()`, i.e. a new frame without any row that contains a NaN.
        """
        if self.columns is None:
            # Remembered on the instance: later calls reuse the same column list.
            self.columns = list(X.select_dtypes('number').columns)

        for column in self.columns:
            series = X[column]
            spread = series.std()
            # A constant column has zero spread. Dividing by it would turn the whole column
            # into NaN and the `dropna()` below would then discard every row of the frame.
            if spread == 0:
                spread = 1.0

            target = column if self._inplace else f"{column}_scaled"
            X[target] = (series - series.mean()) / spread

        return X.dropna()


In [10]:
standard = StandardNormalizer()
# With the default `inplace=True`, `transform` overwrites the columns of the frame it is given.
# Pass a copy so `ohlcv_data` stays raw for the cells below that reuse it.
# Note: this rebinds `transformed`, which earlier held the TA-Lib result.
transformed = standard.transform(ohlcv_data.copy())

In [12]:
# Save the standardized frame under the tests' output directory.
transformed.to_csv('../tests/data/outputs/standard_transformed.csv')

In [13]:
# Min-max normalizer constructed with its default arguments.
min_max = MinMaxNormalizer()

In [14]:
# Work on a copy so this cell never alters `ohlcv_data`, which other cells reuse.
# NOTE(review): assumes MinMaxNormalizer may write into its input, as StandardNormalizer in
# this notebook does; passing a copy is harmless either way.
min_max_transformed = min_max.transform(ohlcv_data.copy())

In [16]:
# Save the min-max result under the tests' output directory.
min_max_transformed.to_csv('../tests/data/outputs/min_max_transformed.csv')

In [17]:
com_normalizer = ComparisonNormalizer()

# Transform a copy so `ohlcv_data` is not altered for the cells that reuse it.
# NOTE(review): assumes ComparisonNormalizer may write into its input; a copy is harmless either way.
com_normalizer.transform(ohlcv_data.copy()).to_csv('../tests/data/outputs/com_norm_transformed.csv')

In [18]:
pct_normalizer = PercentChangeNormalizer()
# Transform a copy so `ohlcv_data` is not altered for the cells that reuse it.
# NOTE(review): assumes PercentChangeNormalizer may write into its input; a copy is harmless either way.
pct_normalizer.transform(ohlcv_data.copy()).to_csv('../tests/data/outputs/pct_transformed.csv')

In [19]:
from tensortrade.features import FeaturePipeline
from tensortrade.features.scalers import MinMaxNormalizer

In [20]:
# Pipeline with two steps: the TA-Lib indicators defined in this notebook, then min-max
# normalization (presumably applied in list order; confirm against FeaturePipeline).
# The indicator list repeats the misspelled / invalid entries used earlier.
feature_pipeline = FeaturePipeline(
    steps=[
        TAlibIndicator(indicators=["BBAND", "RSI", "EMA", "SMA", "", None]),
        MinMaxNormalizer(),
    ]
)

In [23]:
# TAlibIndicator.transform adds its columns to the frame it receives, so run the pipeline on a
# copy: `ohlcv_data` keeps only the OHLCV columns and the cell gives the same result when re-run.
feature_pipeline.transform(ohlcv_data.copy()).dropna().to_csv('../tests/data/outputs/feature_pipeline_output.csv')

2019-12-13 22:57:48.573 | DEBUG    | __main__:transform:103 - BBANDS
2019-12-13 22:57:48.577 | DEBUG    | __main__:transform:103 - RSI
2019-12-13 22:57:48.579 | DEBUG    | __main__:transform:103 - EMA
2019-12-13 22:57:48.581 | DEBUG    | __main__:transform:103 - SMA
