In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.regression.rolling import RollingOLS
from functools import partial
import sys
import math
pd.options.display.float_format = "{:,.4f}".format
from sklearn.linear_model import LinearRegression
import warnings
warnings.filterwarnings("ignore")
import os
from scipy.stats import norm

In [None]:
#import data
df = pd.read_excel('/Users/yiningqu/Desktop/dfa_analysis_data.xlsx', sheet_name = 'factors').set_index('Date')
factors = df.drop(['RF'], axis=1)


#import data
raw_data = pd.read_excel('../data/momentum_data.xlsx',sheet_name = None)
sheets = list(raw_data.keys())

ff_factors = raw_data[sheets[1]].set_index('Date')
momentum = raw_data[sheets[2]].set_index('Date')
mom_deciles = raw_data[sheets[3]].set_index('Date')
tercile_port = raw_data[sheets[4]].set_index('Date')
rf = raw_data[sheets[5]].set_index('Date')

ff_factors['UMD'] = momentum['UMD']

In [None]:
def performance_summary(return_data, period = 12):
    """ 
        Returns the Performance Stats for given set of returns
        Inputs: 
            return_data - DataFrame with Date index and Monthly Returns for different assets/strategies.
        Output:
            summary_stats - DataFrame with annualized mean return, vol, sharpe ratio. Skewness, Excess Kurtosis, Var (0.5) and
                            CVaR (0.5) and drawdown based on monthly returns. 
    """
    summary_stats = return_data.mean().to_frame('Mean').apply(lambda x: x*period)
    summary_stats['Volatility'] = return_data.std().apply(lambda x: x*np.sqrt(period))
    summary_stats['Sharpe Ratio'] = summary_stats['Mean']/summary_stats['Volatility']
    summary_stats['Skewness'] = return_data.skew()
    summary_stats['Excess Kurtosis'] = return_data.kurtosis()
    summary_stats['VaR (0.05)'] = return_data.quantile(.05, axis = 0)
    summary_stats['CVaR (0.05)'] = return_data[return_data <= return_data.quantile(.05, axis = 0)].mean()
    summary_stats['Min'] = return_data.min()
    summary_stats['Max'] = return_data.max()
    
    wealth_index = 1000*(1+return_data).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks)/previous_peaks

    summary_stats['Max Drawdown'] = drawdowns.min()
    summary_stats['Peak'] = [previous_peaks[col][:drawdowns[col].idxmin()].idxmax() for col in previous_peaks.columns]
    summary_stats['Bottom'] = drawdowns.idxmin()
    
    recovery_date = []
    for col in wealth_index.columns:
        prev_max = previous_peaks[col][:drawdowns[col].idxmin()].max()
        recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin():]]).T
        recovery_date.append(recovery_wealth[recovery_wealth[col] >= prev_max].index.min())
    summary_stats['Recovery'] = recovery_date
    
    return summary_stats

In [None]:
def regression_based_performance(factor,fund_ret,rf,constant = True):
    """ 
        Returns the Regression based performance Stats for given set of returns and factors
        Inputs:
            factor - Dataframe containing monthly returns of the regressors
            fund_ret - Dataframe containing monthly excess returns of the regressand fund
            rf - Monthly risk free rate of return
        Output:
            summary_stats - (Beta of regression, treynor ratio, information ratio, alpha). 
    """
    if constant:
        X = sm.tools.add_constant(factor)
    else:
        X = factor
    y=fund_ret
    model = sm.OLS(y,X,missing='drop').fit()
    
    if constant:
        beta = model.params[1:]
        alpha = round(float(model.params['const']),6) *12

        
    else:
        beta = model.params
    treynor_ratio = ((fund_ret - rf).mean()*12)/beta[0]
    tracking_error = (model.resid.std()*np.sqrt(12))
    if constant:        
        information_ratio = model.params[0]*12/tracking_error
    r_squared = model.rsquared
    if constant:
        return (beta,treynor_ratio,information_ratio,alpha,r_squared,tracking_error,model.resid)
    else:
        return (beta,treynor_ratio,r_squared,tracking_error,model.resid)

In [None]:
#only one factor time series regression
def time_series_regression(portfolio, factors, FF3F = False, resid = False, scale =12):
    '''
    Input portfolio (columns of funds to be regressed), and factors.
    This function returns the portfolio columns as index, and regression estimates as columns
    If factors contain Fama-French 3 Factors, (Size and Value), then includes those betas.
    If resid = True, then we return the residual of each column in portfolio indexed by time
    '''
    
    report = pd.DataFrame(index = portfolio.columns)
    residual = pd.DataFrame(columns= portfolio.columns)

    for col in portfolio.columns:
        fund_ret = portfolio[col]
        model = sm.OLS(fund_ret, sm.add_constant(factors), missing= 'drop').fit()
        report.loc[col, 'Alpha'] = model.params['const'] * scale
        report.loc[col, 'Market Beta'] = model.params[1]
        if FF3F:
            report.loc[col, 'Size Beta'] = model.params[2]
            report.loc[col, 'Value Beta'] = model.params[3]
        report.loc[col, 'Information Ratio'] = np.sqrt(scale) * model.params['const'] / model.resid.std()
        report.loc[col, 'Treynor Ratio'] = scale * portfolio[col].mean() / model.params[1]
        report.loc[col, 'R-Squared'] = model.rsquared
        report.loc[col, 'Tracking Error'] = (model.resid.std()*np.sqrt(scale))  
        if resid:
            residual[col] = model.resid
    if resid:
        return residual
    return report

In [None]:
#multiple factor time series regression
def time_series_regression(portfolio, factors, resid = False, scale =12):
    '''
    Input portfolio (columns of funds to be regressed), and factors.
    This function returns the portfolio columns as index, and regression estimates as columns
    Function includes betas for anything included in factors
    If resid = True, then we return the residual of each column in portfolio indexed by time
    '''
    
    report = pd.DataFrame(index = portfolio.columns)
    residual = pd.DataFrame(columns= portfolio.columns)

    for col in portfolio.columns:
        fund_ret = portfolio[col]
        model = sm.OLS(fund_ret, sm.add_constant(factors), missing= 'drop').fit()
        report.loc[col, 'Mean Return'] = scale * fund_ret.mean()
        report.loc[col, 'Sharpe Ratio'] = np.sqrt(scale) * fund_ret.mean() / fund_ret.std()
        report.loc[col, 'Alpha'] = model.params['const'] * scale
        for i, factor in enumerate(factors.columns):
            report.loc[col, f'{factor} Beta'] = model.params[i + 1]
        report.loc[col, 'Information Ratio'] = np.sqrt(scale) * model.params['const'] / model.resid.std()
        report.loc[col, 'Treynor Ratio'] = scale * portfolio[col].mean() / model.params[1]
        report.loc[col, 'R-Squared'] = model.rsquared
        report.loc[col, 'Tracking Error'] = (model.resid.std()*np.sqrt(scale))  
        if resid:
            residual[col] = model.resid
    if resid:
        return residual
    return report

In [None]:
#cross sectional test:
portfolio = portfolio_ret.mean().to_frame('Mean Portfolio excess returns')
cs_CAPM = time_series_regression(portfolio, ts_CAPM['Market Beta'])

In [None]:
#tangent weight
def tangency_portfolio_rfr(asset_return,cov_matrix):
    """ 
        Returns the tangency portfolio weights in a (1 x n) vector when a riskless assset is available
        Inputs: 
            asset_return - Excess return over the risk free rate for each asset (n x 1) Vector
            cov_matrix = nxn covariance matrix for the assets
    """
    asset_cov = np.array(cov_matrix)
    inverted_cov= np.linalg.inv(asset_cov)
    one_vector = np.ones(len(cov_matrix.index))
    
    den = (one_vector @ inverted_cov) @ (asset_return)
    num =  inverted_cov @ asset_return
    return (1/den) * num

In [None]:
#Tangency portfolio instruction
mean_ret = factor_summary['Mean']
tan_weights = tangency_portfolio_rfr(mean_ret, factors.cov())
tan_weights = pd.DataFrame(tan_weights, index = factors.columns, columns = ['Tangent Weights'])

#put mean and tangent weights together 
tan_mean_weights = pd.concat([tan_weights.T, factor_summary[['Mean']].T]).T
tan_mean_weights

In [None]:
#Mean Absolute Value
def calculate_mae(regression_result, scale = 12):
    return round(abs(regression_result['Alpha']).mean()*scale, 6)

In [None]:
#Mean Absolute Value Instruction
mae_df = pd.DataFrame({
    'Model': ['CAPM', 'FF3', 'FF5', 'AQR'],
    'MAE': [calculate_mae(CAPM_regression), calculate_mae(FF3_regression), calculate_mae(FF5_regression), calculate_mae(AQR_regression)]
})

In [None]:
def process_cs_regression(mean_data, beta_data, model_name):
    def perform_cs_regression(mean_data, beta_data, model_name):
        CS_result = sm.OLS(mean_data, beta_data, missing='drop').fit()
        CS_MAE = abs(CS_result.resid).mean()
        CS_premia = CS_result.params.to_frame(f'{model_name} Premia')
        return CS_result, CS_MAE, CS_premia
    CS_result, CS_MAE, premia = perform_cs_regression(mean_data, beta_data, model_name)
    premia.rename(index=lambda x: x.replace(f' {model_name} Beta', f' {model_name}'), inplace=True)
    return CS_result, CS_MAE, premia

In [None]:
#premia 就是beta的值（就是mean是左边，右边是之前算出来的几个factor的beta值，用beta的值作为regressor来算beta --> premia
#用在time series里面每个factor的mean和在cross sectional test里面的premia（beta的值）进行比较，接近说明好
AQR_CS, AQR_CS_MAE, AQR_CS_premia = process_data_regression(AQR_mean, AQR_beta, 'AQR')
FF3_CS, FF3_CS_MAE, FF3_CS_premia = process_data_regression(FF3_mean, FF3_beta, 'FF3')
FF5_CS, FF5_CS_MAE, FF5_CS_premia = process_data_regression(FF5_mean, FF5_beta, 'FF5')

cs_premia = pd.concat([mean_ex_ret, AQR_CS_premia, FF3_CS_premia, FF5_CS_premia], axis = 1).fillna('')
display(cs_premia)


cs_mae = pd.Series([AQR_CS_MAE, FF3_CS_MAE, FF5_CS_MAE], index = ['AQR', 'FF3', 'FF5'])
mae_ts = mae_df.loc[['AQR', 'FF3', 'FF5']]
mae = pd.concat([mae_ts, cs_mae], axis = 1)
mae.columns = ['Time Series MAE', 'Cross Section MAE']
display(mae)

In [None]:
#create different time horizon performance_summary
sub_1980 = factors.loc[:'1980']
sub_2001 = factors.loc['1981':'2001']
sub_2023 = factors.loc['2002':]

df_dict={'1926-1980' : sub_1980,
         '1981-2001' : sub_2001,
         '2002-2023' : sub_2023}

summary_lst = []
for key in df_dict.keys():
    summary_stats = performance_summary(df_dict[key], period = 12).loc[:,['Mean','Volatility','Sharpe Ratio','VaR (0.05)']]
    summary_stats['Period'] = key
    summary_stats= summary_stats.reset_index().rename(columns = {'index':'Factor'}).set_index(['Period','Factor'])
    summary_lst.append(summary_stats)

factor_summary = pd.concat(summary_lst)
factor_summary

In [None]:
#heatmap
factor_corr = factors.corr()

plt.figure(figsize=(16, 6))
heatmap = sns.heatmap(factor_corr, vmin=0, vmax=1, annot=True)
heatmap.set_title('Factors Correlation Heatmap', fontdict={'fontsize':12}, pad=12)

In [None]:
#three heatmap together 
fig, (ax,ax2,ax3) = plt.subplots(ncols=3)

# fig.subplots_adjust(wspace=0.01)
# fig.tight_layout(pad=1)
sns.heatmap(sub_1980.corr(), ax=ax, cbar=False, annot = True).set_title('1926 - 1980', fontdict={'fontsize':12}, pad=12)

fig.colorbar(ax.collections[0], ax=ax,location="left", use_gridspec=False, pad=0.2)
sns.heatmap(sub_2001.corr(), ax=ax2, cbar=False, annot = True).set_title('1981 - 2001', fontdict={'fontsize':12}, pad=12)

fig.colorbar(ax.collections[0],ax=ax2,location="right", use_gridspec=False, pad=0.2)

sns.heatmap(sub_2023.corr(), ax=ax3, cbar=False, annot = True).set_title('2002 - 2023', fontdict={'fontsize':12}, pad=12)
fig.colorbar(ax.collections[0],ax=ax3,location="right", use_gridspec=False, pad=0.2)

fig.set_figwidth(15)
plt.show()

In [None]:
#将时期前和后的值放在一起比较
pre_post = pre_case.join(post_case, lsuffix=' Pre', rsuffix=' Post')
display(pre_post)

In [None]:
# Plot cumulative returns of factors, add vertical line at 2015
fig, ax = plt.subplots(figsize=(8, 5))
cum_rets = (1 + factors).cumprod() - 1
cum_rets.plot(ax=ax)
ax.axvline('2015', color='k', linestyle='--')
ax.set_title('Cumulative Returns of Factors')
ax.set_ylabel('Cumulative Return')
ax.set_xlabel('Date');



plt.plot((1 + ports['1994':]).cumprod(), label = ports.columns)
plt.title('Cumulative Return since 1994')
plt.legend()
plt.show()

In [None]:
#cumulative return figures
figure = ((factors + 1).cumprod()).plot()
# plt.figure(figsize=(100, 6))
plt.title('Cumulative Returns')
plt.show()

In [None]:
#two cumulative return figures
fig, (ax1, ax2) = plt.subplots(1, 2)
fig.suptitle('Sub Period Cumulative Returns')
ax1.plot(((sub_2001 + 1).cumprod()*100)-100)
ax2.plot(((sub_2023 + 1).cumprod()*100)-100)

fig.set_figwidth(15)
ax1.legend(sub_2001.columns)
ax2.legend(sub_2023.columns)

ax1.title.set_text('1981-2001')
ax2.title.set_text('2002-2023')

In [None]:
#subtract one values from the total dataframe
portfolio_ret = portfolio.sub(risk_free.values).loc['1981':]

In [None]:
#scatterplot
plt.scatter(portfolio_summary.loc[:,['Volatility']],portfolio_summary.loc[:,['Mean']])
plt.xlabel("Volatility")
plt.ylabel("Mean Excess Returns")