In [39]:
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Jupyter")
AddReference("QuantConnect.Indicators")
from System import *
from QuantConnect import *
from QuantConnect.Data.Custom import *
from QuantConnect.Data.Market import TradeBar, QuoteBar
from QuantConnect.Data.Consolidators import QuoteBarConsolidator
from QuantConnect.Jupyter import *
from QuantConnect.Indicators import *
from QuantConnect.Indicators.CandlestickPatterns import *
from datetime import datetime, timedelta
import re
import os.path
import pickle
import pandas as pd

In [40]:
class QuantDataLoader:

    r''' The QuantDataLoader is used to ingest price and indicator data from Quant Connect.
    Currently supported price data: ask and bid price.
    Currently supported indicator data: RSI, MACD, ADX, STDDEV

    Use the following scheme in your indicator list on addForexData:

        - RSI(int bars) // e.g. RSI(14)
        - MACD_LINE(int barsFast, int barsSlow, 0) // e.g. MACD_LINE(12, 26, 0)
        - MACD_SIGNAL(int barsFast, int barsSlow, int barsMovingAverageSignalLine) // e.g. MACD_SIGNAL(12, 26, 9)
        - MACD_HISTOGRAM(int barsFast, int barsSlow, int barsMovingAverageSignalLine) // e.g. MACD_HISTOGRAM(12, 26, 9)
        - ADX(ADX, int barsMovingAverage) // e.g. ADX(ADX, 18)
        - STDDEV(int bars) // e.g. STDDEV(26)

    Important:
    This class will work only if you compile the Lean engine on your system.
    For instructions use the official repo on github: https://github.com/QuantConnect/Lean/

    After the installation of Lean, start Jupyter Notebook from Lean\Launcher\bin\Debug and use this notebook.

    The ingested data is kept in self.data, a dict that maps the upper-case forex symbol
    to one pandas DataFrame holding the renamed price and indicator columns.

    INPUTS:
        resolution      resolution must be given as enum, where resolution = 2 means minute data,
                        3 means hour data and 4 means daily data
    '''

    # number of bars requested from QuantBook for every history / indicator call
    MAX_HISTORY_BARS = 10000000

    def __init__(self, resolution = 3):
        ''' creates the QuantBook session and the lookup tables for all supported subjects

        IN  resolution  (int)   2 = minute data, 3 = hour data, 4 = daily data

        RAISES  ValueError  if the resolution is smaller than 2 or greater than 4 '''

        if resolution < 2 or resolution > 4:
            raise ValueError("Only minute (2), hour (3) and daily (4) data")

        self.qb = QuantBook()
        self.qb.SetStartDate(2018, 12, 31)
        self.resolution = resolution

        # subject name -> indicator class that is instantiated with the parsed arguments
        self.INDICATOR_FCN_NAMES = {
            "rsi": RelativeStrengthIndex,
            "macd_line": MovingAverageConvergenceDivergence,
            "macd_signal": MovingAverageConvergenceDivergence,
            "macd_histogram": MovingAverageConvergenceDivergence,
            "adx": AverageDirectionalIndex,
            "stddev": StandardDeviation,
        }

        # subject name -> columns that are taken over from the dataframe returned by QuantBook
        self.REQUIRED_DF_COLUMNS = {
            "price": ['askopen', 'askhigh', 'asklow', 'askclose', 'bidopen', 'bidhigh', 'bidlow', 'bidclose'],
            "rsi": ['relativestrengthindex'],
            "macd_line": ['movingaverageconvergencedivergence'],
            "macd_signal": ['signal'],
            "macd_histogram": ['histogram'],
            "adx": ['averagedirectionalindex'],
            "stddev": ['standarddeviation'],
        }

        # subject name -> {QuantBook column name: standardized column name used in self.data}
        self.RENAME_DICT = {
            "price": {
                "askopen": "price_ask_open",
                "askhigh": "price_ask_high",
                "asklow": "price_ask_low",
                "askclose": "price_ask_close",
                "bidopen": "price_bid_open",
                "bidhigh": "price_bid_high",
                "bidlow": "price_bid_low",
                "bidclose": "price_bid_close",
            },
            "rsi": {"relativestrengthindex": "rsi"},
            "macd_line": {"movingaverageconvergencedivergence": "macd_line"},
            "macd_signal": {"signal": "macd_signal"},
            "macd_histogram": {"histogram": "macd_histogram"},
            "adx": {"averagedirectionalindex": "adx"},
            "stddev": {"standarddeviation": "stddev"},
        }

        self.data = {}

    def save(self, path):
        ''' saves the ingested data and the resolution as a pickled dict under the given path

        IN  path    (string)    absolute or relative path of the save file. Datatype must be serialized data type *.p '''

        exportData = {"data": self.data, "resolution": self.resolution}

        # the context manager closes the file even if pickling fails
        with open(path, 'wb') as fileHandler:
            pickle.dump(exportData, fileHandler)

        if os.path.isfile(path):
            print("data successfully saved!")
        else:
            print("error saving data")

    def addForexData(self, forexList, indicatorList = None):
        ''' ingesting forex price data and requested indicator dataset

        IN  forexList       (list)      list of  trading pairs, e.g. ["EURUSD", "GBPJPY", "EURNZD"]
        IN  indicatorList   (list)      lists of indicator name with required arguments, e.g. ["RSI(14)", "MACD_LINE(12, 26, 0)"];
                                        None (default) ingests the price data only

        RAISES  ValueError  if an indicator string does not follow the scheme NAME(arguments) '''

        if indicatorList is None:
            indicatorList = []

        # parse once up front: the result is the same for every symbol and malformed
        # strings are rejected before any data is requested
        parsedIndicators = [self._parseIndicatorString(indicatorString) for indicatorString in indicatorList]

        for forexSymbol in forexList:
            forexSymbol = forexSymbol.upper() # all methods store the data under the upper-case symbol
            self._initDataframe(forexSymbol)
            self._addForexPrice(forexSymbol)
            for indicatorName, indicatorArgs in parsedIndicators:
                self._addForexIndicator(forexSymbol, indicatorName, *indicatorArgs)
            self._interpolateNans(forexSymbol) # interpolate after each column was added due to some indicators do not contain all datetime indices

    def _addForexPrice(self, forexSymbol):
        ''' adds the price of a forex pair to the data.

        IN  forexSymbol     (string)    symbol of the forex pair, e.g. "EURUSD" '''

        print("ingesting " + forexSymbol + " price data ...")

        forexSymbol = forexSymbol.upper() # ignore case sensitivity
        self._initDataframe(forexSymbol) # no-op if the dataframe already exists

        self.qb.AddForex(forexSymbol)
        rawDataframe = self.qb.History(self.qb.Symbol(forexSymbol), self.MAX_HISTORY_BARS, Nullable[Resolution](self.resolution))
        for newColumnName, dataSeries in self._filterDataFrame(rawDataframe, "price"):
            self.data[forexSymbol][newColumnName] = dataSeries

        print("done!")

    def _addForexIndicator(self, forexSymbol, indicatorName, *indicatorArgs):
        ''' adds indicator data for a forex pair to the data.

        IN  forexSymbol     (string)    symbol of the forex pair, e.g. "EURUSD"
        IN  indicatorName   (string)    standardized indicator name, e.g. "RSI"
        IN  *indicatorArgs  (various)   additional required arguments for indicators e.g. for MACD data: 12, 26, 9

        RAISES  KeyError    if the indicator name is not a key of INDICATOR_FCN_NAMES '''

        print("ingesting " + forexSymbol + " " + indicatorName + " data ...")

        indicatorName = indicatorName.lower() # ignore case sensitivity
        forexSymbol = forexSymbol.upper() # ignore case sensitivity
        self._initDataframe(forexSymbol) # no-op if the dataframe already exists

        indicator = self.INDICATOR_FCN_NAMES[indicatorName](*indicatorArgs)
        dataframe = self.qb.Indicator(indicator, self.qb.Symbol(forexSymbol), self.MAX_HISTORY_BARS, Nullable[Resolution](self.resolution))
        for newColumnName, dataSeries in self._filterDataFrame(dataframe, indicatorName):
            self.data[forexSymbol][newColumnName] = dataSeries

        print("done!")

    def _initDataframe(self, forexSymbol):
        ''' initializes the dataframe for the given forex pair if not yet existing

        IN  forexSymbol     (string)    symbol of the forex pair, e.g. "EURUSD" '''

        forexSymbol = forexSymbol.upper() # same key as used by the ingesting methods
        if forexSymbol not in self.data:
            self.data[forexSymbol] = pd.DataFrame()

    def _filterDataFrame(self, df, subject):
        ''' filters the dataframe for the given columns and norms the index to a datetime index

        This is a generator: the index conversion happens when the first item is requested.

        IN      df          (pandas Dataframe)              the dataframe returned by QuantBook
        IN      subject     (string)                        name of the price/indicator subject to filter, e.g. "price" or "macd_line"
        OUT                 (string),(pandas Series)        standardized new name of the column and the filtered series '''

        datetimeIndex = self._makeDatetimeIndex(df.index.values)
        renameMap = self.RENAME_DICT[subject]

        for columnName in self.REQUIRED_DF_COLUMNS[subject]:
            yield renameMap[columnName], pd.Series(df[columnName].values, index = datetimeIndex)

    def _interpolateNans(self, forexSymbol):
        ''' interpolates inner NaN entries of each column in the dataframe (at most 2 consecutive NaNs are filled)

        IN  forexSymbol     (string)    symbol of the forex pair, e.g. "EURUSD" '''

        forexSymbol = forexSymbol.upper() # same key as used by the ingesting methods
        self.data[forexSymbol] = self.data[forexSymbol].interpolate(limit=2, limit_area = 'inside')

    def _makeDatetimeIndex(self, dfIndex):
        ''' converts the given dataframe index into a standardized datetime index,
        due to QuantBook historical data has several index types.

        The type of the first entry decides how all entries are converted. Supported entries:
        a C# System.DateTime, an object offering to_pydatetime() (e.g. a pandas Timestamp),
        or a list/tuple that carries one of those at position 1 (position 0 for one-element tuples).

        IN      dfIndex     (list)      the index of the dataframe you want to convert
        OUT                 (list)      the new datetime converted index (empty for an empty index)

        RAISES  TypeError   if the entries are of none of the supported types
        '''

        if len(dfIndex) == 0:
            return []

        firstEntry = dfIndex[0]
        if isinstance(firstEntry, (list, tuple)):
            # multi-level index entry: the time sits behind the first level
            position = 1 if len(firstEntry) > 1 else 0
            sample = firstEntry[position]
            timeValues = [idx[position] for idx in dfIndex]
        else:
            sample = firstEntry
            timeValues = dfIndex

        # duck typing covers pandas Timestamps and any other type offering the same conversion
        if hasattr(sample, "to_pydatetime"):
            return [timeValue.to_pydatetime() for timeValue in timeValues]
        if isinstance(sample, DateTime):
            return [self._sysDateTimeToDatetime(timeValue) for timeValue in timeValues]

        raise TypeError("Unknown index of pandas data frame")

    @staticmethod
    def _parseIndicatorString(indicatorString):
        ''' separates the indicator name and the required arguments

        Word arguments are returned before the number arguments, independent of their
        order in the string. Only unsigned integer numbers are recognized.

        IN      indicatorString     (string)    string that contains indicator and the parameters, e.g. "MACD_LINE(12,26,0)"
        OUT     indicatorName       (string)    name of the indicator, e.g. "MACD_LINE"
        OUT     indicatorParameters (tuple)     parameters of the indicator as a tuple, e.g. (12,26,0)

        RAISES  ValueError  if the string does not contain exactly one pair of parentheses '''

        indicatorString = indicatorString.replace(" ", "") # replace white spaces first
        parts = re.split(r"[()]", indicatorString) # separate name and parameters
        if len(parts) != 3:
            raise ValueError("Indicator must be given as NAME(arguments), got: " + indicatorString)
        indicatorName, indicatorArgString, _ = parts

        indicatorNumberArgs = [int(argument) for argument in re.findall(r"\d+", indicatorArgString)] # extract number arguments as integers
        indicatorStringArgs = re.findall(r"[a-zA-Z]+", indicatorArgString) # extract string arguments

        return indicatorName, tuple(indicatorStringArgs + indicatorNumberArgs)

    @staticmethod
    def _sysDateTimeToDatetime(sysDateTime):
        ''' converts an imported C# system datetime object to a python datetime.
        Only year, month, day, hour and minute are taken over; seconds and smaller units are dropped.

        IN      sysDateTime     (system.DateTime)   imported C# System DateTime object
        OUT                     (datetime)          converted python datetime'''

        return datetime(
            sysDateTime.Year,
            sysDateTime.Month,
            sysDateTime.Day,
            sysDateTime.Hour,
            sysDateTime.Minute,
        )