<a href="https://colab.research.google.com/github/zzh2027/zzh2027.github.io/blob/master/Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##1. Import libraries

In [0]:
!pip install ta

In [0]:
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
import os
import re
from sklearn.linear_model import LinearRegression
from scipy.signal import argrelextrema
from collections import defaultdict
import pandas_datareader.data as web
import ta
from scipy import interpolate
from scipy.interpolate import UnivariateSpline

import sys
sys.setrecursionlimit(10**6) 

from sklearn import model_selection
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier 
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV

from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

##2. The Pipeline

In [0]:
class PipeLine(object):
    def __init__(self, df):
        """ 
        features --> Open, High, Low, Close, Volume, news, P/E ratio,
        """
        if df:
            self.df = df
            self.original_df = df.copy()
            print("Begin to process {} stocks...".format(str(len(self.df))))
            print(self.df)
        else:
            self.df, self.original_df = None, None
        self.model_df = None

    def GetQuandlData(self, symbols, begin_date = None, end_date = None, api_key = "fsRzwWgnx-zG8H2L9ya2"):
        """ test data"""
        os.environ['QUANDL_API_KEY']  = api_key
        out = pd.DataFrame()
        data_source = 'quandl'
        for symbol in symbols:
            df = web.DataReader(symbol, data_source, begin_date, end_date)[['AdjOpen', 'AdjHigh', 'AdjLow', 'AdjClose', 'AdjVolume']].reset_index()
            df.columns = ['date','open','high','low','close','volume']
            df['symbol'] = symbol
            df = df.set_index(['date', 'symbol'])
            out = pd.concat([out,df], axis = 0)
        self.original_df = out.sort_index()
        self.df = self.original_df.copy()
        print(self.df.tail())
        return self.original_df

    def GetTiingoData(self, symbol, begin_date = None, end_date = None, api_key = "ab583c49e5bed14e7c1b32145f44c60961199310"):
        try:
            df = pdr.get_data_tiingo(symbol, api_key = api_key)[['adjOpen', 'adjHigh', 'adjLow', 'adjClose', 'volume']].reset_index()
            df.columns = ['symbol','date','open','high','low','close','volume']
            df = df.set_index(['date', 'symbol'])
        except TypeError:
            print('Stock {} is not found!'.format(symbol))
        self.original_df = df.sort_index()
        self.df = self.original_df.copy()
        print(self.df.tail())
        return self.original_df
        


    def Reboot(self):
        self.df = self.original_df.copy()

    def DigestNews(self):
        """ Generate a NLP object and convert NEWS into categorical or numeric variables """
        tmp = NLP(self.df)
        """ This class should be able to return a dataframe using a method named 'helper' for now """
        return tmp.helper(self.df)
    
    def CustomMetrics(self):
        """ create some simple metrics """
        self._pctChange()
        self._MACD()
        self._featureEngineering()

        
        ## MA
        #for n in [14, 30, 50, 200]:
            #self.df.loc['ma' + str(n)] = talib.SMA
        ## RSI
        print(self.df)
    def _featureEngineering(self, n_bins = 10):
        df = self.df.copy()
        self.df['f01'] = df.close/df.open - 1
        self.df['f02'] = df.open / df.close.shift(1) - 1
        self.df['f03'] = df.volume.apply(np.log)
        self.df['f04'] = df.volume.diff()
        self.df['f05'] = df.volume.pct_change()
        self.df['f06'] = df.groupby(level = 'symbol').volume.apply(lambda x:x.rolling(5).mean()).apply(np.log)
        if len(self.df) > 2000:
            self.df['f07'] = df.volume.diff(50)
            self.df['f08'] = df.volume/df.groupby(level = 'symbol').volume.apply(lambda x:x.rolling(200).mean()) - 1
            self.df['f09'] = df.close/df.groupby(level = 'symbol').close.apply(lambda x:x.ewm(50).mean()) - 1
            zscore_func = lambda x:(x-x.rolling(window = 200, min_periods = 20).mean()) /x.rolling(window = 200, min_periods = 20).std()
            self.df['f10'] = df.groupby(level = 'symbol').close.apply(zscore_func)##z-score
            percentile_func = lambda x:x.rolling(200, min_periods = 20).apply(lambda x:pd.Series(x).rank(pct = True)[0])
        else:
            self.df['f07'] = df.volume.diff(20)
            self.df['f08'] = df.volume/df.groupby(level = 'symbol').volume.apply(lambda x:x.rolling(28).mean()) - 1
            self.df['f09'] = df.close/df.groupby(level = 'symbol').close.apply(lambda x:x.ewm(28).mean()) - 1
            zscore_func = lambda x:(x-x.rolling(window = 200, min_periods = 14).mean()) /x.rolling(window = 200, min_periods = 14).std()
            self.df['f10'] = df.groupby(level = 'symbol').close.apply(zscore_func)##z-score
            percentile_func = lambda x:x.rolling(200, min_periods = 14).apply(lambda x:pd.Series(x).rank(pct = True)[0])
        self.df['f11'] = df.groupby(level = 'symbol').volume.apply(percentile_func)
        self.df['f12'] = self.df['f08'].dropna().rank(pct= True)##rank each stock cross-sectionally 

        from pyti.money_flow_index import money_flow_index as mfi
        self.df['f13'] = mfi(df.high, df.low, df.close, df.volume, period = 14)##Money flow index(14 days)
        self.df['f14'] = self.df['f13'] - self.df['f13'].rolling(200, min_periods = 10).mean()##mean-centered money flow index
        
        binning_func =  lambda y:pd.qcut(y, q = n_bins, labels = range(1, n_bins+1))
        self.df['f15'] = df.volume.groupby(level = 'symbol').apply(binning_func)
        self.df['f16'] = self.df['f06'].apply(np.sign)
        plus_minus_fxn = lambda x:x.rolling(20).sum()
        self.df['f17'] = self.df['f16'].groupby(level = 'symbol').apply(plus_minus_fxn)

    def _pctChange(self):
        """Change of the open price of tomorrow compared to the close price today."""
        self.df['close_1'] = self.df.close.pct_change(-1)
        self.df['close_5'] = self.df.close.pct_change(-5)
        self.df['close_10'] = self.df.close.pct_change(-10)
        self.df['close_20'] = self.df.close.pct_change(-20)

    def _MACD_helper(self, close):
        exp1 = close.ewm(span = 12, adjust = False).mean()
        exp2 = close.ewm(span = 26, adjust = False).mean()
        DIF = exp1 - exp2
        DEA = DIF.ewm(span = 9, adjust = False).mean()
        return DIF, DEA
        
    def _MACD_trend(self, row):
        # method 1 --> walk-forward for 5 days each and interpolate()
        # method 2 --> 
        day_diff = lambda x: (x-self.df.date[0]).days
        self.df['day'] = self.df.date.apply(day_diff)
        X0, y0 = self.df.iloc[row-4:row+1]['day'].values.reshape(-1,1), self.df.iloc[row-4:row+1]['MACD_bar'].values.reshape(-1,1)
        model = LinearRegression().fit(X0, y0)
        res = model.coef_[0][0]
        #print("最后一行的值是{}".format(res))
        self.df.slope_MACD_bar.iloc[row] = res
        row-=1
        while row >= 0:
            for i in range(4,-1,-1):
                if row - i >=0:
                    y = self.df.iloc[row-i:row+1]['MACD_bar'].values.reshape(-1,1)## row ->9  -->第八行  row-->1   -->第零行
                    X = self.df.iloc[row-i:row+1]['day'].values.reshape(-1,1)
                    model = LinearRegression().fit(X,y)
                    self.df.loc[row,"slope_MACD_bar"] = (4/5 * model.coef_[0][0]) + (1/5 * round(self.df.loc[row+1,"slope_MACD_bar"],4))
                    break
            row-=1

            
    def _MACD(self):
        """
        Bullish crossover, just like in Moving Averages, a buy signal occurs when MACD crosses above the signal line. 
        A bearish signal occurs when MACD crosses below the signal line. 
        """
        DIF, DEA= self._MACD_helper(self.df.close)
        self.df['MACD'] = DIF
        self.df['Signal'] = DEA
        self.df['MACD_bar'] = (DIF - DEA*2)
        """
        Since the MACD's trend is of matters, we need to calculate its trend through LR first
        The time period is set as 5 days for now
        """
        self.df['slope_MACD_bar'] = np.nan
        self.df = self.df.reset_index()
        #print(self.df.columns)
        self._MACD_trend(len(self.df) - 1)
        self.df = self.df.set_index(['date', 'symbol'])

    def _get_max_min(self, df, MA = 1, window_range = 4, greatest = False):
        """ return max and min values and index """
        df_ma = df.close.rolling(window = MA).mean().dropna()## take the Moving Average value to analyze
        local_max = argrelextrema(df_ma.values, np.greater)[0]## get the index with greater values
        local_min = argrelextrema(df_ma.values, np.less)[0]   ## get the index with smaller values
        if greatest:
            return df.iloc[local_max], df.iloc[local_min]## For triangle trend analyze, we only need greater values

        ## Select the spike and the valley    
        price_local_max_dt = []
        for i in local_max:
            if (i > window_range) and (i < len(df_ma) - window_range):
                price_local_max_dt.append(df.iloc[i-window_range: i+window_range]['close'].idxmax())## 得到一定范围内值最大的位置
        price_local_min_dt = []
        for i in local_min:
            if (i > window_range) and (i < len(df_ma) - window_range):
                price_local_min_dt.append(df.iloc[i-window_range: i+window_range]['close'].idxmin())## 得到一定范围内值最小的位置
        maxima = pd.DataFrame(df.loc[price_local_max_dt])
        minima = pd.DataFrame(df.loc[price_local_min_dt])
        return maxima, minima

    def TriangleTrend(self,  MA=1, window_range =4, greatest = True):
        """ a Converging price range """ 
        #1 slice dataframe into multiple parts
        sliced_date = self.df.resample('M', level = 'date').mean().index.values
        out = pd.DataFrame()
        for d in sliced_date:
            X = self.df.xs(slice(d - pd.Timedelta('30 days'), d), level = 'date', drop_level = False )
            higher, lower = self._get_max_min(X, MA, window_range, greatest)
            spikes, downs = higher[['close', 'day']], lower[['close', 'day']]
            #spikes.date = spikes.date.apply(lambda x: (x-spikes.date[0]).days)
        ### Meethod without self.get_max_min() to get spikes and downs
        #spikes = [[0,b[0]],[len(b)-1,b[-1]]]
        #downs  = [[0,b[0]],[len(b)-1,b[-1]]]
        #for i in range(1,len(b)-1):
        #    if b[i] > b[i-1] and b[i]>b[i+1]:
        #        spikes.append((i,b[i]))
        #    elif b[i] < b[i-1] and b[i] < b[i+1]:
        #        downs.append((i,b[i]))"""
        #return spikes, downs
        
            res = []
            for i in [spikes, downs]:
                X,y = pd.DataFrame(i.day), pd.DataFrame(i.close)
                model = LinearRegression().fit(X,y)
                res.append(model.coef_[0][0])

            ratio_t = abs((res[0] - res[1])/np.mean(res))
            if ratio_t < 0.05:
                X['TriangleTrend'] = ratio_t# symmetrical
            else:
                val = np.sqrt(abs(res[1]**2 - res[0]**2))
                mid = np.mean(res)
                res = val#np.log(val)*np.sign(mid)
                X['TriangleTrend'] = res
                """
                if res <0:
                    print('It might keep descending after {}'.format(str(d)))
                else:
                    print("It is going up after {}".format(str(d)))     
                """   
            out = pd.concat([out, X])
        self.df  = pd.merge(self.df, out, how = 'left', on = ['date', 'symbol']).drop(columns = 'day_y')
        small = self.df.loc[~self.df.TriangleTrend.isnull()]
        fill = UnivariateSpline(small['day_x'].values, small['TriangleTrend'].values, s=1)
        xint = np.linspace(0, self.df['day_x'].values[-1] , len(self.df))
        yint = fill(xint)
        self.df.TriangleTrend = yint  
        self.df = self.df.rename(columns = {'day_x':'day'})
        return self.df

    def HeadAndShoulders(self):
        """ Predicts a bullish-to-bearish trend reversal """
        sliced_date = self.df.resample('3M', level = 'date').mean().index.values
        out = pd.DataFrame()
        for d in sliced_date:
            X = self.df.xs(slice(d - pd.Timedelta('93 days'), d), level = 'date', drop_level = False )
            patterns = self._findPatterns(df = X).fillna(method = 'ffill')
            if patterns.__len__() == 0:
                print('No patterns detected.')
                continue
            try:
                bull, bear = patterns.bull.iloc[-1], patterns.bear.iloc[-1]## pick the latest pattern as the feature value
                if bull[0] > bear[0]:
                    X['HS'] = 1
                else:
                    X['HS'] = -1
                
            except AttributeError:
                X['HS'] = 1 if patterns.columns[0] == 'bull' else -1# In case, there is only one pattern
            out = pd.concat([out,X])
        
        self.df = out.copy()
        return self.df
        

    
    def _findPatterns(self, df):
        a,b = self._get_max_min(df = df, MA = 3, window_range = 4)
        #
        max_min = pd.concat([a,b]).close.reset_index().set_index('date').sort_index()
        patterns = defaultdict(list)
        
        for i in range(5, len(max_min)):
            window = max_min.iloc[i-5: i]

            if (window.index[-1] - window.index[0]).days >100:
                print('buxing')
                continue## If the days difference is greater than 100 days, it might be too much, then we pass this one
            a,b,c,d,e= window.iloc[0].close,window.iloc[1].close,window.iloc[2].close,window.iloc[3].close,window.iloc[4].close

            if a<b and c<a and c<e and c<d and e<d and abs(b-d)<=np.mean([b,d])*0.02:
                patterns['IHS'].append((window.index[0], window.index[-1]))## this one is basically abandoned
            if (c<a or c<b) and (c<e or c<d) and (abs(a-e)/np.mean([a,e]) <= 0.02 or abs(a-d)/np.mean([a,d]) <= 0.02 or abs(b-e)/np.mean([b,e]) <= 0.02):
                #print('bull-->{}'.format([a,b,c,d,e]))
                patterns['bull'].append((window.index[0], window.index[-1]))
            if (c>a or c>b) and (c>e or c>d) and (abs(a-e)/np.mean([a,e]) <= 0.02 or abs(a-d)/np.mean([a,d]) <= 0.02 or abs(b-e)/np.mean([b,e]) <= 0.02):
                #print('bear-->{}'.format([a,b,c,d,e]))
                patterns['bear'].append((window.index[0], window.index[-1]))
        res = pd.DataFrame(dict([(k, pd.Series(v)) for k, v in patterns.items()]))
        return res

    def GetTargetVar(self, MA_window = 5):
        self.df = self.df.set_index('day')
        tarVarName = 'MA_' + str(MA_window) + '_close'
        f_1 = pd.Series(self.df['close'].values[::-1])##pick the close price and reverse for MA calculation
        f_2 = f_1.rolling(window = MA_window).mean()## calculate the future MA price
        f_3 = pd.DataFrame(f_2[::-1]).reset_index(drop = False)
        f_3.index = self.df.index
        self.df = pd.concat([self.df, f_3], axis = 1).drop(columns = ['index']).rename(columns = {0:tarVarName})## insert the target variables back to self.df
        self.df['LS_signal'] = self.df[tarVarName] - self.df.close
        self.model_df  = self.df.loc[~self.df.LS_signal.isnull()]
        self.model_df = self.model_df.dropna()
        return self.model_df

    def ModelSelection(self):
        y = self.model_df[['LS_signal']].apply(np.sign)
        features = self.model_df.columns[:-2]
        X = self.model_df[features]
        X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=0.2)
        print('training data has %d observation with %d features'% X_train.shape)
        print('test data has %d observation with %d features'% X_test.shape)
        print("=================Default Model Performance===========================================")
        classifier_logistic = LogisticRegression()
        classifier_KNN = KNeighborsClassifier()
        classifier_RF = RandomForestClassifier()
        model_names = ['Logistic Regression','KNN','Random Forest']
        model_list = [classifier_logistic, classifier_KNN, classifier_RF]
        res = []
        for classifier in model_list:
            cv_score = model_selection.cross_val_score(classifier, X_train, y_train, cv=5)
            res.append(cv_score.mean())
        for i in range(len(model_names)):
            print('Model accuracy of %s is: %.3f'%(model_names[i],res[i]))
        print("====================Grid Search for Better Models==========================================")
        def print_grid_search_metrics(gs,parameters):
            print ("Best score: %0.3f" % gs.best_score_)
            print ("Best parameters set:")
            best_parameters = gs.best_params_
            for param_name in sorted(parameters.keys()):
                print("\t%s: %r" % (param_name, best_parameters[param_name]))   
            
        parameters_lr  = {'penalty':('l1', 'l2', "elasticnet", "None"), 
                         'C':(1, 5, 10)}
        parameters_knn = {'n_neighbors':[3,5,7,10]}
        parameters_rf  = {'n_estimators' : [40,60,80]}
        parameters_list = [parameters_lr, parameters_knn, parameters_rf]
        bestModels = []

        for i in range(parameters_list.__len__()):
            print('Best {} Model-->'.format(model_names[i]))
            Grid = GridSearchCV(model_list[i], parameters_list[i], cv = 5).fit(X_train, y_train)
            print_grid_search_metrics(Grid, parameters_list[i])
            bestModels.append(Grid.best_estimator_)
            print('-----------------------------')
        def cal_evaluation(classifier, cm):
            tn = cm[0][0]
            fp = cm[0][1]
            fn = cm[1][0]
            tp = cm[1][1]
            accuracy  = (tp + tn) / (tp + fp + fn + tn + 0.0)
            precision = tp / (tp + fp + 0.0)
            recall = tp / (tp + fn + 0.0)
            print (classifier)
            print ("\tAccuracy is: %0.3f" % accuracy)
            print ("\tprecision is: %0.3f" % precision)
            print ("\trecall is: %0.3f\n" % recall)

        # print out confusion matrices
        def draw_confusion_matrices(confusion_matricies):
            class_names = ['Bull','Bear']
            classifier, cm = confusion_matrices    
            print(len(cm), len(cm[0]))     
            cal_evaluation(classifier, cm)
            fig = plt.figure()
            ax = fig.add_subplot(111)
            cax = ax.matshow(cm, interpolation='nearest',cmap=plt.get_cmap('Reds'))
            plt.title('Confusion matrix for %s' % classifier)
            fig.colorbar(cax)
            ax.set_xticklabels([''] + class_names)
            ax.set_yticklabels([''] + class_names)
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.show()
        %matplotlib inline
        print("=================Confusion Matrix===========================================")
        for i in range(bestModels.__len__()):
            confusion_matrices = (model_names[i], confusion_matrix(y_test,bestModels[i].predict(X_test)))
            #print(confusion_matrices[0], confusion_matrices[1])
            draw_confusion_matrices(confusion_matrices)

"""
Model --> LR Regression, KNN, RF
"""

##3. 对真实数据检验Pipeline

In [0]:
symbols = ['AAPL','FB','MSFT','NFLX','GILD','MRNA','REGN']
for s in symbols:
    s = [s]
    test = PipeLine(None)
    origin = test.GetTiingoData(s)
    test.CustomMetrics()
    test.TriangleTrend()
    test.HeadAndShoulders()
    test.GetTargetVar(5)