# part 1. load data

In [2]:
import numpy as np
import pandas as pd
import csv
import datetime
from dateutil.relativedelta import relativedelta
from scipy.stats import norm
from sklearn import base
from sklearn.linear_model import LinearRegression 
from sklearn.pipeline import Pipeline
import seaborn

In [3]:
def processIndex(df):
    try:
        df.index=pd.to_datetime(df.index)
    except TypeError:
        raise TypeError('Incorrect index')
    if df.index[0]<datetime.date(2000,1,1):
        raise ValueError('Index not in correct range')
    return df

In [4]:
# load all files:
path1='/Users/yingzhang/Documents/project/Poly_models/ETF_daily.csv'
path2='/Users/yingzhang/Documents/project/Poly_models/ETF_weekly.csv'
path3='/Users/yingzhang/Documents/project/Poly_models/ETF_biweekly.csv'
path4='/Users/yingzhang/Documents/project/Poly_models/ETF_monthly.csv'
ETF_daily=processIndex(pd.read_csv(path1, index_col=0))
ETF_weekly=processIndex(pd.read_csv(path2, index_col=0))
ETF_biweekly=processIndex(pd.read_csv(path3, index_col=0))
ETF_monthly=processIndex(pd.read_csv(path4, index_col=0))

In [5]:
# load all files:
path_1='/Users/yingzhang/Documents/project/Poly_models/stock_daily.csv'
path_2='/Users/yingzhang/Documents/project/Poly_models/stock_weekly.csv'
path_3='/Users/yingzhang/Documents/project/Poly_models/stock_biweekly.csv'
path_4='/Users/yingzhang/Documents/project/Poly_models/stock_monthly.csv'
stock_daily=processIndex(pd.read_csv(path_1, index_col=0))
stock_weekly=processIndex(pd.read_csv(path_2, index_col=0))
stock_biweekly=processIndex(pd.read_csv(path_3, index_col=0))
stock_monthly=processIndex(pd.read_csv(path_4, index_col=0))

In [6]:
# load stock prices for sharpe ratio:
file_path='/Users/yingzhang/Documents/project/Poly_models/stocks_adjclose_855_fixed.csv'
stock_price=processIndex(pd.read_csv(file_path,index_col=0))

### Set up parameters

In [7]:
ETF_data_list=[ETF_daily, ETF_weekly, ETF_biweekly, ETF_monthly]
stock_data_list=[stock_daily, stock_weekly, stock_biweekly, stock_monthly]

In [8]:
month_list_for_rolling_2years=[datetime.date(i,j,1) for i in range(2019,2021) for j in range(1,13)]+[datetime.date(2021,1,1)]

In [9]:
firstday=stock_daily.index[0]

In [10]:
dateformat='%Y-%m-%d'

In [11]:
# month_list_for_rolling=[datetime.date(i,j,1).strftime(dateformat) for i in range(2011,2021) for j in range(1,13)]

# part 2. get return list for specific time period

In [12]:
def submonth(end_date, n_of_month=12):
    return end_date-relativedelta(months=n_of_month)

In [13]:
def addmonth(start_date, n_of_month=12):
    return start_date+relativedelta(months=n_of_month)

In [14]:
def selectMonth(start_date, end_date, dataframe):
# # dataframe should have index as "2021-01-01"
#     dateformat='%Y-%m-%d'
    start=start_date.strftime(dateformat)
    end=end_date.strftime(dateformat)
    return dataframe.loc[(dataframe.index>=start) & (dataframe.index<end)]

In [15]:
def returnList(start_date,end_date,list_of_dataframe):
    '''
    getting the required month from a list of dateframe and concatenate them in one dataFrame
    Eg: put together all selected months data of daily, weekly, biweekly, monthly return
    
    '''
    alist=[]
    for df in list_of_dataframe:
        alist.append(selectMonth(start_date, end_date, df))
    return pd.concat(alist)

In [16]:
def cleanUp(dataframe):
    null_cols=dataframe.columns[dataframe.isnull().any()]
    return dataframe.drop(null_cols,axis=1)

# part 3. percentiles, basefunctions, etc.

In [17]:
def pctlList(dataframe,pctl=['50','25','75']):
    '''gives the percentiles of the dateframe of those pctl'''
    df=pd.DataFrame(columns=dataframe.columns) 
    for k in pctl:
        if k=='1':
            df.loc[k+'_percentile']=[pareto_CVaR(dataframe[i],0.025) for i in dataframe.columns]
        elif k=='99':
            df.loc[k+'_percentile']=[pareto_CVaR(dataframe[i],0.975) for i in dataframe.columns]
        else:
            df.loc[k+'_percentile']=[np.percentile(dataframe[i].dropna(),int(k)) for i in dataframe.columns]
    return df

In [18]:
def basefunc(df,dataframe=None):
    '''getting the basefunc function of two dataframe.
       df is used to get the pctl, dataframe is the X.
       function pctlList required!
       drop columns with null value
       output of this function is a dataframe with each col containing tuple (1, phi1, phi2, phi3, phi4)
    '''
    if dataframe is None:
        dataframe=df
    
    df_pctl=pctlList(df) # calculate percentiles
    
    # calculate base functions
    phi1=dataframe
    phi2=abs(dataframe-df_pctl.loc['50_percentile'])
    phi3=1/2*(abs(dataframe-df_pctl.loc['75_percentile'])
         -abs(dataframe-df_pctl.loc['25_percentile'])
         -df_pctl.loc['75_percentile']-df_pctl.loc['25_percentile'])+dataframe
    phi4=1/2*(abs(dataframe-df_pctl.loc['75_percentile'])
         +abs(dataframe-df_pctl.loc['25_percentile'])
         -df_pctl.loc['75_percentile']+df_pctl.loc['25_percentile'])
    
    # put base funcs in list and form a new df
    base_df=pd.DataFrame(columns=dataframe.columns,index=dataframe.index)
    for col in dataframe.columns:
        base_df[col]=list(zip(list(phi1[col]),list(phi2[col]),list(phi3[col]),list(phi4[col])))
    return base_df

In [19]:
class basefuncTransformer(base.BaseEstimator, base.TransformerMixin): 
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return basefunc(X) 

In [20]:
class MLRestimator(base.BaseEstimator, base.RegressorMixin):
    
    def __init__(self, estimator_factory):
        # estimator_factory can be called to produce estimators
        self.estimator_factory=estimator_factory       
    
    def fit(self, X, y):
        # Create an estimator and fit it with the portion in each group
        self.estimators_={}
        for column in X.columns:
            if np.ndim(list(X[column]))>1:
                temp=list(X[column])
            else:
                temp=np.transpose([list(X[column])])
            self.estimators_[column]=self.estimator_factory().fit(temp,list(y))
        return self

    def predict(self, X):
        # Call the appropriate predict method for each row of X
        result=pd.DataFrame(columns=X.columns,index=X.index)
        for column in X.columns:
            temp=self.estimators_[column].predict(list(X[column]))
            result[column]=temp
        return result    

    def residuals(self, X, y):
        result=self.predict(X)
        return result.sub(y,axis='index')

In [21]:
def pareto_CVaR(dataserie, qtl, prop=0.1, NumSigma=0, skip=2, origin=0):
    dataserie=dataserie.dropna()
    ds_sorted=list(dataserie.sort_values())

    if qtl<0.5:
        side=-1
    elif qtl>0.5:
        side=1
        qtl=1-qtl
    else:
        return np.median(df_sorted)    

#     if prop==None:
#         prop=qtl
    
    n=len(dataserie)
    nprop=int(round(n*prop))
    
#     if origin==None:
#         origin=0
#     else:
#         origin=np.median(df_sorted)
         
    if side==-1:
        tail=np.array(ds_sorted[skip:nprop+skip])
    elif side==1:
        tail=np.array(ds_sorted[n-nprop-skip:n-skip][::-1]) # need to be decreasing for same y_label
       
    log_tail=np.log(abs(tail-origin))
    y_label=[-np.log(i+1+skip)-(n-(i+1+skip))/2/n for i in range(nprop)]

    x_data=np.transpose([log_tail])
    lr=LinearRegression()
    lr.fit(x_data, y_label)
        
    hill11=lr.coef_[0]
    hill21=np.exp(-(lr.intercept_+np.log(n))/hill11)
    hill31=origin
    
    ParetoCVaR= hill31 + side * hill21 * hill11 / (hill11 - 1) * qtl ** (-1 / hill11)
    return ParetoCVaR

# part 4.  cross validation

In [22]:
def firstdateHandle(date):
    if date.day<5:
        d=datetime.date(date.year,date.month,1)
    elif (date.month!=2 and date.day>27) or (date.month==2 and date.day>25):
        d=date
    else:
        raise ValueError('ValueError: dataframe start date incorrect')    
    return d

def lastdateHandle(date):
    if date.day<5:
        d=date
    elif (date.month!=2 and date.day>27) or (date.month==2 and date.day>25):
        tempdate=addmonth(date,1)
        d=datetime.date(tempdate.year,tempdate.month,1)   
    else:
        raise ValueError('ValueError: dataframe last date incorrect')   
    return d

In [23]:
lastdateHandle(datetime.date(2021,6,28))

datetime.date(2021, 7, 1)

In [24]:
end=month_list_for_rolling_2years[2]

In [25]:
start=submonth(end)

In [26]:
ETF_data=cleanUp(returnList(start,end,ETF_data_list))

In [27]:
ETF_data.index.sort_values()

DatetimeIndex(['2018-03-01', '2018-03-02', '2018-03-02', '2018-03-02',
               '2018-03-05', '2018-03-06', '2018-03-07', '2018-03-08',
               '2018-03-09', '2018-03-09',
               ...
               '2019-02-20', '2019-02-21', '2019-02-22', '2019-02-22',
               '2019-02-22', '2019-02-25', '2019-02-26', '2019-02-27',
               '2019-02-28', '2019-02-28'],
              dtype='datetime64[ns]', length=367, freq=None)

In [28]:
def dltMonthSplit(n, dataframe):
    '''
    remove rows of the nth month in the dataframe
    n should be within the range of n_of_month as in func selectMonth
    Output should be two dataframes:
    1. removed month sub_df and 2. leftover_df
    '''
    
    first_index=dataframe.index.sort_values()[0] # '2011-02-01'
    last_index=dataframe.index.sort_values()[-1] # '2012-01-31'
    first_date=firstdateHandle(first_index) # datetime.datetime(2011, 2, 1, 0, 0)
    last_date=lastdateHandle(last_index)
    
    # here n_of_month is 12 between ('2011-02-01', '2012-01-31')
    n_of_month=(last_date.year-first_date.year)*12+(last_date.month-first_date.month)
    first=datetime.date(first_date.year,first_date.month,1)
    month_list=[addmonth(first,i) for i in range(n_of_month)]
    
    if n>(n_of_month) or n<1:
        raise ValueError('nth month selected is out of range of the total number of months in dataframe') 
    elif isinstance(n,(float,str,bool)):
        raise TypeError('data type wrong for n')
    else:
        start=month_list[n-1]
        end=addmonth(start,1).strftime(dateformat)
        start=start.strftime(dateformat)
        dltMonth_df=dataframe.iloc[(dataframe.index>=start) & (dataframe.index<end)]
        lftMonth_df=dataframe.iloc[(dataframe.index<start) | (dataframe.index>=end)]
    return [dltMonth_df,lftMonth_df]

In [29]:
def CrossValidation(basefunc_df, stock_list, number_of_month=12, estimator=LinearRegression): 
    '''
    number_of_month=12 for 1 year data
    factor_df should be the base_func.
    '''
    lre=MLRestimator(estimator)
    residuals=pd.DataFrame(columns=basefunc_df.columns)
    
    for i in range(1,number_of_month+1):
        [X_dlt_data,X_lft_data]=dltMonthSplit(i,basefunc_df)
        [y_dlt_data,y_lft_data]=dltMonthSplit(i,stock_list)
        lre.fit(X_lft_data,y_lft_data)
        pred=lre.predict(X_dlt_data)
        resSqr=pred.sub(y_dlt_data,axis='index')**2
        residuals.loc[str(i)+'_month_removed']=resSqr.sum()
    
    stock_sumsqr=sum(stock_list**2)
    temp=stock_sumsqr/residuals.sum()-1
    temp[temp<0]=0
    scores=np.sqrt(temp)
    return scores

In [30]:
def factorSelect(scores, basefunc):
    max_col=scores.idxmax()
    max_score=max(scores)
    scores_selected=scores[scores>max(scores)/2]
    return scores_selected.index

In [31]:
class CVSelectTransformer(base.BaseEstimator, base.TransformerMixin):
    def __init__(self, y, estimator=LinearRegression):
        self.estimator=estimator
        self.y=y
    
    def fit(self, X, y=None):
        return self

    def _all_scores_(self, X):
        return CrossValidation(X, self.y, 12, self.estimator)

    def _selected_columns_(self, X):
        scores=self._all_scores_(X)
        return factorSelect(scores, X)

    def transform(self, X):
        return X[self._selected_columns_(X)]
     
    def scores_(self, X):
        scores=self._all_scores_(X)
        columns=self._selected_columns_(X)
        return scores[columns]

In [32]:
class PolyModelEstimator(base.BaseEstimator, base.RegressorMixin):
    
    def __init__(self, y, bsf=basefuncTransformer, fs=CVSelectTransformer, es=MLRestimator):
        self.y=y
        self.bsf=bsf()
        self.fs=fs(self.y)
        self.es=es(LinearRegression)
        self.base=pd.DataFrame()
        self.X_data=pd.DataFrame()
        
    def fit(self, X):
        self.base=self.bsf.fit_transform(X)
        self.X_data=self.fs.fit_transform(self.base)
        self.es=self.es.fit(self.X_data, self.y)
        return self
    
    def scores_(self, X):
        return self.fs.scores_(self.base)
    
    def predict(self, X):
        base=self.bsf.fit_transform(X)
        X_data=self.fs.fit_transform(base)
        return self.es.predict(X_data)
    
    def residuals(self, X):
        y_pred=self.predict(X)
        return y_pred.sub(self.y, axis='index')      

<span style="color:RED">PAY ATTENTION: parameter of PolyModelEstimator has to be the same as the y put into fit!</span>.

# part 5. stress VaR and long-term expectations

In [33]:
def indicators(df,df_long,y):
    pme=PolyModelEstimator(y)
    pme.fit(df)
    scores=pme.scores_(df)
    residuals=pme.residuals(df)
    estimators=pme.es.estimators_
    selected_col=list(estimators.keys())
    
    if selected_col==[]:
        return y.name
    
    df=df[selected_col]
    df_long=df_long[selected_col]
    
    pctl_long=['1']+[str(i) for i in range(5,100,5)]+['99']
    df_long_pctl=pctlList(df_long, pctl_long)
    basefunc_longterm=basefunc(df, df_long_pctl)
    
    temp=pd.DataFrame(index=df_long_pctl.index)
    for i in selected_col:
        temp[i]=np.matmul(np.array(list(basefunc_longterm[i])),estimators[i].coef_)+estimators[i].intercept_
    
    weight=[0.025]+[0.05 for _ in range(5,100,5)]+[0.025]
    miu=norm.ppf(.99)
    ltExpect=pd.Series(np.matmul(weight, temp.values),index=selected_col)
    temp=pd.DataFrame.min(temp)
    stressVaR=np.sqrt(temp**2+miu**2*np.var(residuals))
    max_sVaR=max(stressVaR)
    
    # longterm expectation
    L=sum(scores*ltExpect)/sum(scores)
    kappa=0.01
    # utility
    U=L-kappa*max_sVaR
    
    return [L, max_sVaR, U]

# part 6. for all stock_data

1. run for two years: 12-31-2018 to 12-31-2020
2. change the selectdate
3. long_term_data start from 1-3-2011 ends the same date as ETF_data
4. thinking about the pyspark distribution
5. thinking about keeping basefunc for all stocks
6. performance

In [34]:
end=month_list_for_rolling_2years[0]

In [35]:
start=submonth(end)

In [36]:
start

datetime.date(2018, 1, 1)

In [37]:
start_for_sp=stock_price[stock_price.index<=start.strftime(dateformat)].index[-1]
end_for_sp=stock_price[stock_price.index<=end.strftime(dateformat)].index[-1]

In [38]:
end_for_sp

Timestamp('2018-12-31 00:00:00')

In [39]:
stock_before=stock_price.loc[start_for_sp]
stock_now=stock_price.loc[end_for_sp]

In [43]:
def mainfunc(ETF_data_list, stock_data_list, stock_price, month_list_for_rolling, firstday=firstday):
    for i in month_list_for_rolling:
        end_date=i
        start_date=submonth(end_date)
        
        fields=['Stock','L', 'Max_sVaR', 'U','Sharpe','Stock_price_1year_before',
                'Stock_price_now','Stock_r','Std_of_returns']
        filename='indicators'+end_date.strftime(dateformat)+'.csv'
        filename_zeroscores='zero_scores_stocks.csv'
        
        ETF_data=cleanUp(returnList(start_date,end_date,ETF_data_list))
        ETF_long_term_data=returnList(firstday,end_date,ETF_data_list)
        stock_data=cleanUp(returnList(start_date,end_date,stock_data_list))
        stock_daily=cleanUp(selectMonth(start_date,end_date,stock_data_list[0]))
        
        #choose the last date for the split of start point to ensure we are using the previous price for S(t-1year)
        start_for_sp=stock_price[stock_price.index<=start_date.strftime(dateformat)].index[-1]
        end_for_sp=stock_price[stock_price.index<=end_date.strftime(dateformat)].index[-1]
        stock_before=stock_price.loc[start_for_sp]
        stock_now=stock_price.loc[end_for_sp]
        stock_r=stock_price.loc[end_for_sp]/stock_price.loc[start_for_sp]-1
        
        mydict=[]
        for stock in stock_data.columns:
            sp_before=stock_before[stock]
            sp_now=stock_now[stock]
            r=stock_r[stock]
            sigma=stock_daily[stock].std()
            sharpe=r/sigma
            temp=indicators(ETF_data, ETF_long_term_data, stock_data[stock])
            
            if isinstance(temp,str):
                with open(filename_zeroscores,'a') as f:
                    writer = csv.DictWriter(f,fieldnames=['end_date', 'stock'])
                    writer.writerows([{'end_date':end_date.strftime(dateformat), 'stock':temp}])
            else:
                [L,max_sVaR,U]=temp
                row={'Stock':stock,'L':L,'Max_sVaR':max_sVaR,'U':U,'Sharpe':sharpe,
                     'Stock_price_1year_before':sp_before,'Stock_price_now':sp_now,'Stock_r':r,
                     'Std_of_returns':sigma}
                mydict.append(row)
#                 print(row)
        
        with open(filename, 'w') as csvfile: 
            writer = csv.DictWriter(csvfile, fieldnames = fields) 
            writer.writeheader() 
            writer.writerows(mydict)

In [44]:
month_list_for_rolling_2years[7:]

[datetime.date(2019, 8, 1),
 datetime.date(2019, 9, 1),
 datetime.date(2019, 10, 1),
 datetime.date(2019, 11, 1),
 datetime.date(2019, 12, 1),
 datetime.date(2020, 1, 1),
 datetime.date(2020, 2, 1),
 datetime.date(2020, 3, 1),
 datetime.date(2020, 4, 1),
 datetime.date(2020, 5, 1),
 datetime.date(2020, 6, 1),
 datetime.date(2020, 7, 1),
 datetime.date(2020, 8, 1),
 datetime.date(2020, 9, 1),
 datetime.date(2020, 10, 1),
 datetime.date(2020, 11, 1),
 datetime.date(2020, 12, 1),
 datetime.date(2021, 1, 1)]

In [46]:
import time

# starting time
start_time = time.time()

# program body starts
mainfunc(ETF_data_list, stock_data_list, stock_price, month_list_for_rolling_2years[7:])

# sleeping for 1 sec to get 10 sec runtime
# time.sleep(1)

# program body ends

# end time
end_time = time.time()

# total time taken
print(f"Runtime of the program is {end_time - start_time}")

Runtime of the program is 106557.55431509018


In [97]:
5433/60/60

1.5091666666666665