In [None]:
import copy
import numpy as np


#Function to build the technical indicator MA (simple moving average) from the daily 'Close' price
#Author Yansheng Zhu
#Time last edited: 14:40 June 29, 2020
#Last edited by: Yansheng Zhu
#input: portfolio dictionary, interval ('5' '10' '30' '60' for MA5 MA10...), path of daily data
#       new = True for returning a new portfolio deepcopy
#output: new portfolio dictionary, with updated feature MA

def build_MA(portfolio, interval='30', path='data/1d/', new=True):
    """Add a simple-moving-average column ('MA' + interval) to every ticker.

    For each ticker the daily history is read from ``path + ticker + '.csv'``.
    For every portfolio date on or after the warm-up cut-off, the mean of the
    ``int(interval)`` most recent daily 'Close' values dated strictly before
    that date is written to the new column.  Earlier dates keep 0.

    Args:
        portfolio: dict mapping ticker -> table with a 'Date' column of
            'YYYY-MM-DD' strings (presumably a pandas DataFrame with a
            default integer index -- TODO confirm against the caller).
        interval: window length in daily rows, given as a string
            ('5', '10', '30', '60', ...).
        path: prefix of the per-ticker daily CSV files; each file must have
            'Date' and 'Close' columns.
        new: if true, work on a deep copy and leave ``portfolio`` untouched;
            otherwise update ``portfolio`` in place.

    Returns:
        The updated portfolio dict: a copy when ``new`` is true, otherwise
        the very object that was passed in.

    Raises:
        IndexError: if fewer than ``int(interval)`` daily rows precede one of
            the dates being filled.
    """
    # The original deep-copied unconditionally right after this choice, so
    # new=False never updated the caller's portfolio; copy only on request.
    new_port = copy.deepcopy(portfolio) if new else portfolio

    window = int(interval)
    col_name = 'MA' + interval
    # Warm-up: skip (window // 20 + 2) calendar months after the first
    # portfolio date so that enough daily rows exist before the first MA.
    warmup_months = window // 20 + 2

    for ticker in new_port:
        table = new_port[ticker]
        first_date = datetime.strptime(table['Date'][0], '%Y-%m-%d')
        starting = (first_date + relativedelta(months=warmup_months)).strftime('%Y-%m-%d')

        # Float initial value: the column receives float means below, and
        # writing floats into an int column is a deprecated silent upcast.
        table[col_name] = 0.0
        daily_hist = pd.read_csv(path + ticker + '.csv')
        closes = daily_hist['Close']
        # Dates are compared as 'YYYY-MM-DD' strings (lexicographic order).
        months = table['Date'][table['Date'] >= starting]

        for month in months:
            # Daily rows strictly before the portfolio date (filtered once).
            prior = daily_hist[daily_hist['Date'] < month]
            ma_end = prior.index[-1]
            ma_start = prior.index[-window]
            # Index labels are used as positions, as in the original slice;
            # this relies on the default 0..n-1 index produced by read_csv.
            ma = closes.iloc[ma_start:ma_end + 1].mean()

            row_index = table[table['Date'] == month].index[0]
            table.loc[row_index, col_name] = ma
    return new_port


In [None]:
#Functions to build the technical indicators EMA and MACD from the daily 'Close' price
#Author Yansheng Zhu
#Time last edited: 14:40 June 29, 2020
#Last edited by: Yansheng Zhu


#function to construct technical: MACD
#input: portfolio dictionary, path for storing daily data
#       new = True for returning a new portfolio deepcopy
#       accuracy means how much weight to be included, default 0.001 means 99.90% weight to be included 
#output: portfolio dictionary, with updated feature MACD


def build_MACD(portfolio, path = 'data/1d/', new = True, accuracy = 0.001):
    """Add a 'MACD' column to every ticker of the portfolio.

    For each ticker the daily history is read from ``path + ticker + '.csv'``.
    For every portfolio date on or after the warm-up cut-off, truncated EMAs
    of the daily 'Close' price with periods 9, 12 and 26 are computed from
    the rows dated strictly before that date and combined as
    ``224/51 * EMA9 - 16/3 * EMA12 + 16/17 * EMA26``.  Earlier dates keep 0.

    NOTE(review): the weights above are reproduced from the original code;
    they are not the textbook ``EMA12 - EMA26`` line -- confirm the
    derivation with the author.

    Args:
        portfolio: dict mapping ticker -> table with a 'Date' column of
            'YYYY-MM-DD' strings (presumably a pandas DataFrame with a
            default integer index -- TODO confirm against the caller).
        path: prefix of the per-ticker daily CSV files; each file must have
            'Date' and 'Close' columns.
        new: if true, work on a deep copy and leave ``portfolio`` untouched;
            otherwise update ``portfolio`` in place.
        accuracy: tail weight that may be dropped when truncating an EMA;
            each EMA uses as many lagged prices as needed for the remaining
            weight to fall below this value.

    Returns:
        The updated portfolio dict (a copy when ``new`` is true).

    Raises:
        IndexError: if fewer daily rows than the EMA26 lag precede one of
            the dates being filled.
    """
    new_port = copy.deepcopy(portfolio) if new else portfolio

    col_name = 'MACD'

    # All lag lengths depend only on `accuracy`, so compute them once.
    # With accuracy = 0.001: log(0.001) / log(25/27) = 89.76, i.e. 90 lagged
    # closes are needed for EMA26, the longest period used in the MACD.
    log_accuracy = np.log(accuracy)
    required_lag = np.ceil(log_accuracy / np.log(25/27))
    lag_26 = int(log_accuracy / np.log(25/27) + 1)
    lag_12 = int(log_accuracy / np.log(11/13) + 1)
    lag_9 = int(log_accuracy / np.log(8/10) + 1)
    # Warm-up in calendar months before the first value is calculated, so
    # that enough daily rows exist for EMA26.
    warmup_months = int(required_lag // 20 + 2)

    for ticker in new_port:
        print(ticker)  # progress output, kept from the original
        table = new_port[ticker]
        first_date = datetime.strptime(table['Date'][0], '%Y-%m-%d')
        starting = (first_date + relativedelta(months=warmup_months)).strftime('%Y-%m-%d')

        # Float initial value: the column receives float results below, and
        # writing floats into an int column is a deprecated silent upcast.
        table[col_name] = 0.0
        daily_hist = pd.read_csv(path + ticker + '.csv')
        closes = daily_hist['Close']
        # Dates are compared as 'YYYY-MM-DD' strings (lexicographic order).
        months = table['Date'][table['Date'] >= starting]

        for month in months:
            # Daily rows strictly before the portfolio date (filtered once).
            prior = daily_hist[daily_hist['Date'] < month]
            end = prior.index[-1]

            start_26 = prior.index[-lag_26]
            start_12 = prior.index[-lag_12]
            start_9 = prior.index[-lag_9]

            # Index labels are used as positions, as in the original slices;
            # this relies on the default 0..n-1 index produced by read_csv.
            EMA26 = EMA_calculation(26, closes.iloc[start_26:end + 1], lag_26)
            EMA12 = EMA_calculation(12, closes.iloc[start_12:end + 1], lag_12)
            EMA9 = EMA_calculation(9, closes.iloc[start_9:end + 1], lag_9)

            row_index = table[table['Date'] == month].index[0]
            table.loc[row_index, col_name] = 224/51 * EMA9 - 16/3 * EMA12 + 16/17 * EMA26
    return new_port




#function to calculate EMA, used in MACD and EMA technicals construction
#input: interval, the EMA period length (not the exact window length: interval=30 doesn't mean that only 30 prices are used)
#       prices, the prices used in calculation. Can be list or array
#       lag, the exact length needed in calculation. Determined by 'interval' and the accuracy       
#output: EMA
    
def EMA_calculation(interval, prices, lag):
    """Return the truncated exponential moving average of ``prices``.

    The k-th weight (k = 0 .. lag-1) is
    ``2/(interval+1) * ((interval-1)/(interval+1))**k``.  The k = 0 weight is
    applied to the last element of ``prices`` and later weights to
    successively earlier elements.

    Args:
        interval: EMA period length; it sets the decay, not the window size.
        prices: sequence (list or array) of exactly ``lag`` prices, oldest
            first.
        lag: number of weights to generate.

    Returns:
        The dot product of the weights with ``prices``.
    """
    smoothing = 2 / (interval + 1)
    decay = (interval - 1) / (interval + 1)
    powers = np.array([decay ** k for k in range(lag)])
    weights = smoothing * powers
    # Reverse so that the largest weight lines up with the newest price.
    return np.dot(weights[::-1], prices)





#function to construct technical: EMA
#input: portfolio dictionary, path for storing daily data
#       interval, a str to specify the EMA period length (not the exact window length: interval=30 doesn't mean that only 30 prices are used)
#       new = True for returning a new portfolio deepcopy
#       accuracy means how much weight to be included, default 0.001 means 99.90% weight to be included 
#output: portfolio dictionary, with updated feature EMA 

def build_EMA(portfolio, interval='30', path='data/1d/', new = True, accuracy = 0.001):
    """Add an exponential-moving-average column ('EMA' + interval) per ticker.

    For each ticker the daily history is read from ``path + ticker + '.csv'``.
    For every portfolio date on or after the warm-up cut-off, a truncated EMA
    of the daily 'Close' values dated strictly before that date is written to
    the new column.  Earlier dates keep 0.

    Args:
        portfolio: dict mapping ticker -> table with a 'Date' column of
            'YYYY-MM-DD' strings (presumably a pandas DataFrame with a
            default integer index -- TODO confirm against the caller).
        interval: EMA period length as a string; it sets the decay, not the
            number of prices used.
        path: prefix of the per-ticker daily CSV files; each file must have
            'Date' and 'Close' columns.
        new: if true, work on a deep copy and leave ``portfolio`` untouched;
            otherwise update ``portfolio`` in place.
        accuracy: tail weight that may be dropped when truncating the EMA;
            the default 0.001 keeps 99.9% of the total weight.

    Returns:
        The updated portfolio dict (a copy when ``new`` is true).

    Raises:
        IndexError: if fewer daily rows than the required lag precede one of
            the dates being filled.
    """
    new_port = copy.deepcopy(portfolio) if new else portfolio

    period = int(interval)
    col_name = 'EMA' + interval

    # Number of lagged closes needed so the dropped weight is below
    # `accuracy`, e.g. interval = 30, accuracy = 0.001:
    # log(0.001) / log(29/31) = 103.58 -> 104 prices.
    lag = int(np.ceil(np.log(accuracy) / np.log((period - 1) / (period + 1))))
    # Warm-up in calendar months before the first value is calculated, so
    # that enough daily rows exist for the EMA.
    warmup_months = lag // 20 + 2

    for ticker in new_port:
        print(ticker)  # progress output, kept from the original
        table = new_port[ticker]
        first_date = datetime.strptime(table['Date'][0], '%Y-%m-%d')
        starting = (first_date + relativedelta(months=warmup_months)).strftime('%Y-%m-%d')

        # Float initial value: the column receives float EMAs below, and
        # writing floats into an int column is a deprecated silent upcast.
        table[col_name] = 0.0
        daily_hist = pd.read_csv(path + ticker + '.csv')
        closes = daily_hist['Close']
        # Dates are compared as 'YYYY-MM-DD' strings (lexicographic order).
        months = table['Date'][table['Date'] >= starting]

        for month in months:
            # Daily rows strictly before the portfolio date (filtered once).
            prior = daily_hist[daily_hist['Date'] < month]
            end = prior.index[-1]
            start = prior.index[-lag]

            # Index labels are used as positions, as in the original slice;
            # this relies on the default 0..n-1 index produced by read_csv.
            EMA = EMA_calculation(period, closes.iloc[start:end + 1], lag)

            row_index = table[table['Date'] == month].index[0]
            table.loc[row_index, col_name] = EMA
    return new_port

In [None]:

#function to construct technical: RSI
#input: portfolio dictionary, path for storing daily data
#       interval, a str to specify the RSI period length (not the exact window length: interval=30 doesn't mean that only 30 prices are used)
#       new = True for returning a new portfolio deepcopy
#       accuracy means how much weight to be included, default 0.001 means 99.90% weight to be included 
#output: portfolio dictionary, with updated feature RSI


def build_RSI(portfolio, interval='14', path='data/1d/', new = True, accuracy=0.001):
    """Add a relative-strength-index column ('RSI' + interval) per ticker.

    For each ticker the daily history is read from ``path + ticker + '.csv'``.
    For every portfolio date on or after the warm-up cut-off, the daily
    'Close' changes preceding that date are split into gains (U) and losses
    (D), each rounded to 2 decimals.  Both are smoothed with the weights
    ``alpha * (1 - alpha)**i`` (``alpha = 1/int(interval)``, ``i = 0`` for
    the most recent change) and ``100 * U / (U + D)`` is written to the new
    column.  Earlier dates keep 0.

    Args:
        portfolio: dict mapping ticker -> table with a 'Date' column of
            'YYYY-MM-DD' strings (presumably a pandas DataFrame with a
            default integer index -- TODO confirm against the caller).
        interval: RSI period length as a string; it sets the decay, not the
            number of prices used.
        path: prefix of the per-ticker daily CSV files; each file must have
            'Date' and 'Close' columns.
        new: if true, work on a deep copy and leave ``portfolio`` untouched;
            otherwise update ``portfolio`` in place.
        accuracy: tail weight that may be dropped when truncating the
            smoothing; the default 0.001 keeps 99.9% of the total weight.

    Returns:
        The updated portfolio dict (a copy when ``new`` is true).

    Raises:
        IndexError: if fewer than (required lag + 1) daily rows precede one
            of the dates being filled.
    """
    new_port = copy.deepcopy(portfolio) if new else portfolio

    period = int(interval)
    col_name = 'RSI' + interval

    # Number of price changes needed so the dropped weight is below `accuracy`.
    lag = int(np.ceil(np.log(accuracy) / np.log(1 - 1 / period)))
    # Warm-up in calendar months before the first value is calculated.
    warmup_months = lag // 20 + 2

    # Smoothing weights are identical for every ticker and date: build once.
    alpha_1 = 1 / period
    alpha_2 = 1 - alpha_1
    coefs = [alpha_1 * (alpha_2 ** i) for i in range(lag)]

    for ticker in new_port:
        print(ticker)  # progress output, kept from the original
        table = new_port[ticker]
        first_date = datetime.strptime(table['Date'][0], '%Y-%m-%d')
        starting = (first_date + relativedelta(months=warmup_months)).strftime('%Y-%m-%d')

        # Float initial value: the column receives float RSIs below, and
        # writing floats into an int column is a deprecated silent upcast.
        table[col_name] = 0.0
        daily_hist = pd.read_csv(path + ticker + '.csv')
        closes = daily_hist['Close']
        # Dates are compared as 'YYYY-MM-DD' strings (lexicographic order).
        months = table['Date'][table['Date'] >= starting]

        for month in months:
            # Daily rows strictly before the portfolio date (filtered once).
            prior = daily_hist[daily_hist['Date'] < month]
            end = prior.index[-1]
            # lag changes need lag + 1 prices.  Taking the (lag + 1)-th row
            # from the end directly, instead of "lag-th row minus 1", avoids
            # a start of -1 (which would wrap to the end of the series) when
            # exactly `lag` rows are available; that case now raises
            # IndexError like any other shortage of history.
            start = prior.index[-(lag + 1)]

            # Index labels are used as positions, as in the original slice;
            # this relies on the default 0..n-1 index produced by read_csv.
            price_list = closes.iloc[start:end + 1].reset_index(drop=True)
            price_changes = price_list.diff().iloc[1:]

            # Most recent change first, to line up with coefs[0].
            U = [round(i, ndigits=2) if i > 0 else 0 for i in price_changes][::-1]
            D = [round(-i, ndigits=2) if i < 0 else 0 for i in price_changes][::-1]

            SMMA_U = np.dot(U, coefs)
            SMMA_D = np.dot(D, coefs)

            # If the window has no price movement both averages are 0 and
            # this division yields NaN.
            RSI = 100 * SMMA_U / (SMMA_U + SMMA_D)

            row_index = table[table['Date'] == month].index[0]
            table.loc[row_index, col_name] = RSI
    return new_port
