In [1]:
import talib as ta
import pandas as pd
import numpy as np
from yahoo_fin import stock_info as finance

In [7]:
class QQE():
    '''
    QQE indicator built from a smoothed, zero-centred RSI and a trailing level.

    Attributes:
        qqe_period: look-back passed to ta.RSI; also sets the smoothing
            period of the volatility term (qqe_period * 2 - 1).
        qqe_sf: period of the EMA applied to the RSI.
        qqe_factor: multiplier applied to the smoothed RSI volatility.
    '''

    def __init__(self, qqe_period = 15, qqe_sf = 15, qqe_factor = 2):

        self.qqe_period = qqe_period
        self.qqe_sf = qqe_sf
        self.qqe_factor = qqe_factor

    @staticmethod
    def _trailing_levels(rsi_values, dar_values):
        '''
        Walk the RSI series once and build the trailing level for every bar.

        The level follows the RSI at a distance of `dar`, and is held at its
        previous value when the RSI stayed on the same side of the level but
        the new candidate would move the level further away from it.
        '''
        n_rows = len(rsi_values)
        tr_values = [0] * n_rows

        for i in range(1, n_rows):
            prev_tr = tr_values[i - 1]
            tr = prev_tr
            rsi0 = rsi_values[i]
            rsi1 = rsi_values[i - 1]
            dar = dar_values[i]

            if rsi0 < prev_tr:
                tr = rsi0 + dar
                if rsi1 < prev_tr and tr > prev_tr:
                    tr = prev_tr
            elif rsi0 > prev_tr:
                tr = rsi0 - dar
                if rsi1 > prev_tr and tr < prev_tr:
                    tr = prev_tr

            tr_values[i] = tr

        return tr_values

    def qqe(self, dataframe, index_as_date = True, intraday_data = False):
        '''
        Compute the QQE lines for a price frame that has a 'close' column.

        Example:
            qqe_output = QQE().qqe(finance.get_data('SPY', '20180101', '20200420'))

        Args:
            dataframe: price data with a 'close' column. The input is copied
                and never modified.
            index_as_date: True when the timestamps live in the index; the
                result is then indexed by them as well. False when they are
                an ordinary column ('date', or 'datetime'/'date_time' for
                intraday data).
            intraday_data: True for intraday data. Forced to True when the
                index is named 'date_time' or 'datetime'.

        Returns:
            DataFrame with 'value1' (EMA-smoothed RSI minus 50) and 'value2'
            (trailing level). Rows without enough history hold 0.
            With index_as_date the frame is indexed by 'date' (daily) or
            'datetime' (intraday); otherwise the timestamps are returned as a
            'date' (daily) or 'date_time' (intraday) column.

        Raises:
            KeyError: if the expected timestamp or 'close' column is missing.
        '''
        df = dataframe.copy()

        # Auto detect if data is intraday format
        if df.index.name in ('date_time', 'datetime'):
            intraday_data = True

        if index_as_date:
            df = df.reset_index()
            if not intraday_data:
                # An unnamed index comes back from reset_index() as 'index'.
                df = df.rename(columns = {'index': 'date'})

        # Resolve which column actually carries the timestamps. Intraday
        # frames may use either spelling, so accept both instead of assuming
        # 'datetime'.
        if intraday_data:
            time_col = next((col for col in ('datetime', 'date_time') if col in df.columns), None)
            if time_col is None:
                raise KeyError("intraday data needs a 'datetime' or 'date_time' column/index")
        else:
            time_col = 'date'

        # Create Wilder's Period
        wilders_period = self.qqe_period * 2 - 1

        # Smoothed RSI, centred on zero
        df['rsi'] = ta.EMA(ta.RSI(df['close'], self.qqe_period), self.qqe_sf) - 50

        # Bar-to-bar RSI change. shift(1) is the PREVIOUS bar; shift(-1) would
        # pull in the next bar and leak future data into the indicator.
        df['rsi1'] = df['rsi'].shift(1)
        df['AtrRsi'] = (df['rsi1'] - df['rsi']).abs()
        df = df.fillna(0)
        df['MaAtrRsi'] = ta.EMA(df['AtrRsi'], wilders_period)
        df = df.fillna(0)
        df['dar'] = ta.EMA(df['MaAtrRsi'], wilders_period) * self.qqe_factor
        df = df.fillna(0)

        # Sequential pass: each trailing value depends on the previous one
        df['tr'] = self._trailing_levels(df['rsi'].values, df['dar'].values)

        lines = {"value1": df['rsi'], "value2": df['tr']}

        if index_as_date:
            out_name = 'datetime' if intraday_data else 'date'
            return pd.DataFrame({out_name: df[time_col], **lines}).set_index(out_name)

        out_name = 'date_time' if intraday_data else 'date'
        return pd.DataFrame({out_name: df[time_col], **lines})

In [10]:
class CSI(QQE):
    '''
    CSI indicator: the average QQE reading of cyclical and defensive ETFs,
    each measured relative to a benchmark ticker.

    Cyclical tickers contribute with weight +1 and defensive tickers with
    weight -1.

    Attributes:
        start_date, end_date: range passed to finance.get_data when the price
            data has to be downloaded.
        qqe_period, qqe_sf, qqe_factor: QQE parameters (see QQE).
        cyclicals_list, defensives_list: ticker symbols of the two groups.
        customindex_ticker: benchmark every ticker is divided by.
        data_dict: optional {ticker: price DataFrame} cache. When it is not a
            dict, calc_csi downloads the data and stores it here.
    '''

    def __init__(self, start_date, end_date, qqe_period = 32, qqe_sf = 10, qqe_factor = 2, customindex_ticker = 'SPY',
                data_dict = None):

        super().__init__(qqe_period = qqe_period, qqe_sf = qqe_sf, qqe_factor = qqe_factor)
        self.start_date = start_date
        self.end_date = end_date
        self.cyclicals_list = ['XLI', 'XLY', 'XLK']
        self.defensives_list = ['XLP', 'XLU', 'TLT']
        self.customindex_ticker = customindex_ticker
        self.data_dict = data_dict


    def CustomIndex(self, dataframe, dataframe2, inverse=False):
        '''
        Build a ratio ("custom index") OHLCV frame from two price frames.

        Args:
            dataframe: first price frame (open/high/low/close/volume columns).
            dataframe2: second price frame with the same columns.
            inverse: False -> dataframe / dataframe2,
                     True  -> dataframe2 / dataframe.

        Returns:
            New DataFrame on dataframe's index with columns open, high, low,
            close and volume, forward-filled. High/low are the largest and
            smallest of the four per-column price ratios of each row.
            Neither input frame is modified.
        '''
        if inverse:
            numerator, denominator = dataframe2, dataframe
        else:
            numerator, denominator = dataframe, dataframe2

        cols = ["open", "close", "high", "low"]
        ratios = numerator[cols] / denominator[cols]

        volume = (
            ((numerator['close'] * numerator['volume']) / (denominator['close'] * denominator['volume']))
            * (dataframe['volume'] + dataframe2['volume'])
        ).astype(float)

        # Build a fresh frame instead of writing helper columns into the
        # caller's data (which is usually the shared data_dict cache).
        # index=dataframe.index keeps the rows of the first frame only.
        custom_index = pd.DataFrame(
            {
                "open": ratios['open'],
                "high": ratios.max(axis=1),
                "low": ratios.min(axis=1),
                "close": ratios['close'],
                "volume": volume,
            },
            index=dataframe.index,
        )
        # .ffill() replaces the deprecated fillna(method='ffill')
        return custom_index.ffill()


    def return_quadrant_id(self, csi_value1, csi_value2):
        '''
        Map a pair of CSI values to a quadrant label.

        Intended for row-wise use:
            df.apply(lambda x: self.return_quadrant_id(x['csi1'], x['csi2']), axis = 1)

        Returns:
            'Q1' if csi_value1 > 0 and csi_value1 > csi_value2
            'Q2' if csi_value1 > 0 and csi_value1 < csi_value2
            'Q3' if csi_value1 < 0 and csi_value1 < csi_value2
            'Q4' if csi_value1 < 0 and csi_value1 > csi_value2
            0    otherwise (zeros, NaNs, or equal values)
        '''
        if csi_value1 > 0:
            if csi_value1 > csi_value2:
                return 'Q1'
            if csi_value1 < csi_value2:
                return 'Q2'
        elif csi_value1 < 0:
            if csi_value1 < csi_value2:
                return 'Q3'
            if csi_value1 > csi_value2:
                return 'Q4'

        # Zeros, NaN (all comparisons are False) and ties end up here.
        return 0

    # Function to calculate CSI
    def calc_csi(self, include_quadrant_id = False):
        '''
        Calculate the two CSI lines.

        Every ticker in cyclicals_list and defensives_list is divided by the
        benchmark (CustomIndex), run through qqe(), weighted (+1 cyclical,
        -1 defensive) and averaged.

        Args:
            include_quadrant_id: when True, add a 'quadrant' column produced
                by return_quadrant_id.

        Returns:
            DataFrame with 'csi1' (average of the QQE 'value1' lines) and
            'csi2' (average of the 'value2' lines), plus 'quadrant' if
            requested.

        Raises:
            ValueError: if both ticker lists are empty.
            KeyError: if a supplied data_dict lacks one of the tickers.
        '''
        # +1 for cyclicals, -1 for defensives
        weighted_tickers = [(tic, 1) for tic in self.cyclicals_list] + [(tic, -1) for tic in self.defensives_list]
        if not weighted_tickers:
            raise ValueError("cyclicals_list and defensives_list are both empty")

        # Create the data_dict repo of timeseries data (this helps speed up the calculations)
        if not isinstance(self.data_dict, dict):
            tickers = [tic for tic, _ in weighted_tickers]
            tickers.append(self.customindex_ticker)
            self.data_dict = {}
            for tic in tickers:
                self.data_dict[tic] = finance.get_data(tic, self.start_date, self.end_date)

        benchmark = self.data_dict[self.customindex_ticker]

        csi_line1 = None
        csi_line2 = None
        for tic, multiplier in weighted_tickers:
            custom_index = self.CustomIndex(self.data_dict[tic], benchmark, False)
            qqe_lines = self.qqe(custom_index)

            line1 = qqe_lines['value1'] * multiplier
            line2 = qqe_lines['value2'] * multiplier

            # Series addition aligns on the date index
            csi_line1 = line1 if csi_line1 is None else csi_line1 + line1
            csi_line2 = line2 if csi_line2 is None else csi_line2 + line2

        n_components = len(weighted_tickers)
        df_csi = pd.DataFrame({"csi1" : csi_line1 / n_components, "csi2" : csi_line2 / n_components})

        # Optionally, include quadrant ID at each value
        if include_quadrant_id:
            df_csi['quadrant'] = df_csi.apply(lambda x: self.return_quadrant_id(x['csi1'], x['csi2']), axis = 1)

        return df_csi

In [7]:
### This is meant to mimic the QC500 Universe 
import requests
from bs4 import BeautifulSoup

def return_etf_holdings(input = 'SPY'):
    '''
    Scrape the holdings of an ETF from its zacks.com holdings page.

    Args:
        input: ETF ticker symbol (case-insensitive).

    Returns:
        List of holding identifiers taken from the first 'rel' value of the
        anchor in each holdings row, in page order.

    Raises:
        ValueError: if input is None.
        RuntimeError: if the page contains no holdings script (unknown
            ticker, blocked request, or changed page layout).
        requests.RequestException: on network errors or timeout.
    '''
    if input is None:
        # There is no instance to fall back on in a module-level function.
        raise ValueError("return_etf_holdings() needs a ticker symbol, got None")
    ticker = input.upper()

    headers = requests.utils.default_headers()
    headers['User-Agent'] = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
    # Without a timeout a stalled connection would block the notebook forever.
    res = requests.get("https://www.zacks.com/funds/etf/%s/holding"%ticker,  headers=headers, timeout=30)
    soup = BeautifulSoup(res.text, 'html.parser')

    # The holdings table is embedded as a JavaScript assignment; keep the
    # last <script> that contains it.
    js = None
    for script in soup.find_all("script"):
        script_text = str(script)
        if "etf_holdings.formatted_data = " in script_text:
            js = script_text

    if js is None:
        raise RuntimeError("No holdings data found on the Zacks page for %s" % ticker)

    holdings = []
    # Each "[" starts one row of the embedded array; rows with a holding
    # carry an <a rel="..."> link.
    for line in js.split("["):
        if "rel=" not in line:
            continue
        anchor = BeautifulSoup(line, 'html.parser').find('a')
        if anchor is None or not anchor.get('rel'):
            # "rel=" appeared outside a usable anchor; nothing to extract.
            continue
        holdings.append(anchor['rel'][0].strip('\\"'))

    return holdings