<left>FINM 33150 - Quantitative Trading Strategies</left>
<left>Winter 2023</left>
<br>
<h1><center> Homework 6: Predictive Regression </center></h1>
<center>Due - 23:00 [CST] February 17, 2023</center>
<br>
<h3>Ki Hyun</h3>
<h3>Student ID: 12125881</h3>

<h5> Imports </h5>

In [1]:
%matplotlib inline

In [2]:
import os
from datetime import datetime, timedelta
import quandl
import pandas as pd
import numpy as np
from numpy.linalg import inv
from statsmodels.regression import linear_model as lm

<h5> Constants </h5>

In [3]:
# Quandl table code for the end-of-day equity price export.
Prices_table = 'QUOTEMEDIA/PRICES'
# Location of the 5Y CDS spread file. The course folder is machine-specific, so it
# can be redirected with the FINM_33150_ROOT environment variable; the original
# folder stays the default so an existing setup resolves to the same path.
CDS_table = os.path.join(
    os.environ.get(
        'FINM_33150_ROOT',
        r'C:\Users\kwhyu\OneDrive - The University of Chicago\2023-1 Winter\FINM 33150\FINM-33150-W23',
    ),
    'Data', 'CDS', 'Liq5YCDS.delim',
)
# Number of observations in the regression window (also used as the preview length).
K = 16

<h5> Helper Functions </h5>

In [4]:
"""
This code was given by Dr. Boonstra, B., Ph.D. for
University of Chicago FINM 33150 Quandl Options Data Fetching guidelines
"""
def grab_quandl_table(
    table_path,
    avoid_download=False,
    replace_existing=False,
    date_override=None,
    allow_old_file=False,
    **kwargs,
):
    """Download a Quandl table export as a zip file, reusing local copies when possible.

    Files live under ``<home>/Quandl Data``. Each download is written to
    ``<table_path>_<YYYYMMDD>.zip`` and, when no ``date_override`` is given, a
    ``<table_path>_latest.zip`` symlink is pointed at it.

    Parameters
    ----------
    table_path : str
        Quandl table code, e.g. ``'QUOTEMEDIA/PRICES'``.
    avoid_download : bool
        If True and the ``_latest.zip`` link exists, return it without any download.
    replace_existing : bool
        If True, delete an existing dated file and download it again.
    date_override : str or None
        Date tag (``YYYYMMDD``) to use instead of today's date.
    allow_old_file : bool
        Only consulted together with ``date_override``; see Returns.
    **kwargs
        Forwarded to ``quandl.export_table``. If ``api_key`` is not supplied here it
        is read from the ``QUANDL_API_KEY`` environment variable; when neither is
        set, no key is passed and quandl's own configuration applies.

    Returns
    -------
    str or None
        Path of the dated file or of the ``_latest`` link; ``None`` when the download
        produced no file; the literal ``"NoFileAvailable"`` after a download with
        ``date_override`` set and ``allow_old_file`` False.
    """
    # The 'OS' variable is normally only defined on Windows; .get() avoids a
    # KeyError on every other platform.
    if os.environ.get('OS', '')[0:7] == "Windows":
        root_data_dir = os.path.join(os.environ['HOMEPATH'], 'Quandl Data')
    else:
        root_data_dir = os.path.join(os.environ["HOME"], 'Quandl Data')
    data_symlink = os.path.join(root_data_dir, f"{table_path}_latest.zip")
    if avoid_download and os.path.exists(data_symlink):
        print(f"Skipping any possible download of {table_path}")
        return data_symlink

    table_dir = os.path.dirname(data_symlink)
    if not os.path.isdir(table_dir):
        print(f'Creating new data dir {table_dir}')
        # makedirs also creates the 'Quandl Data' root when it is missing.
        os.makedirs(table_dir, exist_ok=True)

    if date_override is None:
        my_date = datetime.now().strftime("%Y%m%d")
    else:
        my_date = date_override
    data_file = os.path.join(root_data_dir, f"{table_path}_{my_date}.zip")

    if os.path.exists(data_file):
        file_size = os.stat(data_file).st_size
        if replace_existing or not file_size > 0:
            print(f"Removing old file {data_file} size {file_size}")
            # Actually remove it, so a failed download cannot be mistaken for success.
            os.remove(data_file)
        else:
            print(
                f"Data file {data_file} size {file_size} exists already, no need to download"
            )
            return data_file

    # Credentials come from the caller or the environment, never from source code.
    if 'api_key' not in kwargs:
        env_key = os.environ.get('QUANDL_API_KEY')
        if env_key:
            kwargs['api_key'] = env_key
    quandl.export_table(table_path, filename=data_file, **kwargs)

    # Check existence before stat so a missing file reaches the failure branch.
    file_size = os.stat(data_file).st_size if os.path.exists(data_file) else 0
    if file_size > 0:
        print(f"Download finished: {file_size} bytes")
        if not date_override:
            try:
                if os.path.exists(data_symlink):
                    print(f"Removing old symlink")
                    os.unlink(data_symlink)
                print(f"Creating symlink: {data_file} -> {data_symlink}")
                os.symlink(
                    data_file, data_symlink,
                )
            except (OSError, NotImplementedError):
                # Best effort: fall back to the dated file when linking is not permitted.
                print(f"Symlink Creation Permission Denied")
                data_symlink = data_file
    else:
        print(f"Data file {data_file} failed download")
        return
    # NOTE(review): with date_override set and allow_old_file False this returns the
    # sentinel even though the download succeeded; kept as-is for compatibility.
    return data_symlink if (date_override is None or allow_old_file) else "NoFileAvailable"

In [5]:
"""
This code was given by Dr. Boonstra, B., Ph.D. for
University of Chicago FINM 33150 Quandl Options Data Fetching guidelines
"""
def fetch_quandl_table(table_path, avoid_download=True, **kwargs):
    """Return the Quandl table `table_path` as a DataFrame.

    The zip path is resolved by `grab_quandl_table` (extra keyword arguments are
    forwarded to it) and then parsed with `pd.read_csv`.
    """
    zip_path = grab_quandl_table(table_path, avoid_download=avoid_download, **kwargs)
    frame = pd.read_csv(zip_path, low_memory = False)
    return frame

In [None]:
# Load the full price table into memory; fetch_quandl_table defaults to
# avoid_download=True, so an existing local zip is reused.
Prices = fetch_quandl_table(Prices_table)

Data file \Users\kwhyu\Quandl Data\QUOTEMEDIA/PRICES_20230217.zip size 1378907642 exists already, no need to download


In [None]:
def clean_CDS_data(CDS_df, returns = True, window = 1):
    """Turn raw CDS quotes into a date x ticker table sampled on Wednesdays.

    Parameters
    ----------
    CDS_df : DataFrame
        Must contain the columns 'date', 'ticker' and 'parspread'; it is not modified.
    returns : bool
        If True the table holds percentage changes of the spread per ticker,
        otherwise the spread levels themselves.
    window : int
        Number of Wednesday observations spanned by each percentage change.

    Returns
    -------
    DataFrame
        Indexed by date, one column per ticker plus an 'Index' column built from the
        sum of all spreads quoted on that date.
    """
    WEDNESDAY = 2  # value of datetime.weekday() for Wednesday

    df = CDS_df.copy()
    df['date'] = pd.to_datetime(df['date'])
    # Wednesday to Wednesday only
    df = df.loc[df['date'].dt.weekday == WEDNESDAY, ['date', 'parspread', 'ticker']]

    # Index data: one grouped sum instead of a .loc lookup per date
    index_spread = df.groupby('date', as_index = False)['parspread'].sum()
    index_spread['ticker'] = 'Index'

    # adding Index data to the main frame, sorted by date and ticker
    df = (
        pd.concat([df, index_spread], ignore_index = True)
        .sort_values(['date', 'ticker'])
        .set_index(['date', 'ticker'])
    )

    if returns:
        df = (
            df.groupby('ticker')
            .pct_change(periods = window)
            .rename(columns = {'parspread': 'returns'})
        )

    # pivot tickers into columns and drop the leftover column-name index level
    return df.stack().unstack(level = 1).reset_index(level = 1, drop = True)

In [None]:
def clean_Prices_data(tickers, start_date, end_date, returns = True, window = 1):
    """Build a date x ticker table of Wednesday adjusted closes (or their returns).

    Reads the module-level `Prices` frame, which must provide the columns 'date',
    'ticker' and 'adj_close'.

    Parameters
    ----------
    tickers : iterable of str
        Tickers to keep; 'SPY' is always added. The argument itself is left untouched.
    start_date, end_date : datetime-like
        Inclusive date range. Rows from up to 7 days before `start_date` are also
        kept so that a one-period return can be formed at `start_date`.
    returns : bool
        If True the table holds percentage changes per ticker, otherwise price levels.
    window : int
        Number of Wednesday observations spanned by each percentage change.

    Returns
    -------
    DataFrame
        Indexed by date with one column per ticker.
    """
    WEDNESDAY = 2  # value of datetime.weekday() for Wednesday

    # adding S&P 500 ETF ticker on a copy, so the caller's list is not mutated
    wanted = list(tickers) + ['SPY']

    # filtering tickers and date
    first_day = (start_date - timedelta(days = 7)).strftime('%Y-%m-%d')
    last_day = end_date.strftime('%Y-%m-%d')
    in_scope = (
        (Prices['date'] >= first_day)
        & (Prices['date'] <= last_day)
        & Prices['ticker'].isin(wanted)
    )
    df = Prices.loc[in_scope, ['date', 'ticker', 'adj_close']].copy()

    # changing date to datetime, Wednesday to Wednesday only
    df['date'] = pd.to_datetime(df['date'])
    df = df[df['date'].dt.weekday == WEDNESDAY]

    # setting the index to date and ticker
    df = df.set_index(['date', 'ticker']).sort_index()

    if returns:
        df = (
            df.groupby('ticker')
            .pct_change(periods = window)
            .rename(columns = {'adj_close': 'returns'})
        )

    # pivot tickers into columns and drop the leftover column-name index level
    return df.stack().unstack(level = 1).reset_index(level = 1, drop = True)

In [None]:
class Sherman_Morrison_OLS:
    """Least-squares fit whose coefficients are revised one observation at a time.

    The inverse of X'X is kept in `P` and adjusted with rank-one
    (Sherman-Morrison) corrections when a row is added or removed, so a moving
    window never needs a full re-inversion. No intercept column is added; the
    regressors are used exactly as supplied.
    """

    def __init__(self, X, y):
        """Fit the initial regression of `y` on `X`.

        X : array-like, (n_obs, n_features).
        y : array-like, (n_obs, 1) or (n_obs,); a 1-D response is treated as a column.
        """
        # ensuring matrix operations
        X = np.atleast_2d(X)
        y = np.asarray(y)
        # atleast_2d alone would turn a 1-D response into a row and break X.T @ y
        y = y.reshape(-1, 1) if y.ndim == 1 else np.atleast_2d(y)
        # initial regression
        ## inverse matrix (X'X)^-1
        self.P = inv(np.matmul(X.T, X))
        ## coefficients
        initial_beta = np.matmul(np.matmul(self.P, X.T), y)
        self.beta = initial_beta     # current coefficients, shape (n_features, 1)
        self.initial = initial_beta  # coefficients of the first fit
        self.resid = []              # prediction errors recorded by update_beta

    def update_P(self, x, f, dir = 1):
        """Return P after a rank-one correction with column vector `x`.

        dir = 1 gives P - (P x)(x' P) / f, dir = -1 gives P + (P x)(x' P) / f.
        `self.P` itself is not modified here.
        """
        P_new = self.P - np.matmul(np.matmul(self.P, x), np.matmul(x.T, self.P))/f*dir
        return P_new

    def update_beta(self, x_new, y_new):
        """Add one observation: record its prediction error, update P, return new beta.

        x_new is a column vector (n_features, 1); y_new is the matching response.
        """
        # error dispersion
        f = 1 + np.matmul(np.matmul(x_new.T, self.P), x_new)[0,0]
        # prediction error, measured with the coefficients before this update
        h = y_new - np.matmul(x_new.T, self.beta)[0,0]
        self.resid.append(h)
        beta_new = self.beta + np.matmul(self.P, x_new) * h / f
        self.P = self.update_P(x_new, f, dir = 1)
        return beta_new

    def reduce_beta(self, x_old, y_old):
        """Remove one observation: update P and return the coefficients without it.

        x_old is a column vector (n_features, 1); y_old is the matching response.
        """
        # error dispersion
        f = 1 - np.matmul(np.matmul(x_old.T, self.P), x_old)[0,0]
        # prediction error
        h = y_old - np.matmul(x_old.T, self.beta)[0,0]
        beta_reduced = self.beta - np.matmul(self.P, x_old) * h / f
        self.P = self.update_P(x_old, f, dir = -1)
        return beta_reduced

    def window_update(self, x_new, y_new, x_old, y_old):
        """Slide the window: add (x_new, y_new), then drop (x_old, y_old).

        x_new and x_old are 1-D feature vectors (or row vectors). Returns the
        updated coefficients, which are also stored in `self.beta`.
        """
        # ensuring matrix operations (column vectors)
        x_new = np.atleast_2d(x_new).T
        x_old = np.atleast_2d(x_old).T
        # update with new data
        self.beta = self.update_beta(x_new, y_new)
        # delete old data
        self.beta = self.reduce_beta(x_old, y_old)
        return self.beta

    def exponential_decay(self):
        """Placeholder; not implemented."""
        pass

In [None]:
def CDS_boxcar(ticker, K, CDS, Equity):
    """Rolling K-observation regression of a ticker's CDS column on two regressors.

    The response is `CDS[ticker]`; the regressors are `Equity[ticker]` and
    `Equity['Index']`. The first K rows give the initial fit, after which the
    window moves forward one row at a time.

    Parameters
    ----------
    ticker : str
        Column name present in both `CDS` and `Equity`.
    K : int
        Window length in rows.
    CDS, Equity : DataFrame
        Row-aligned frames; `Equity` must also hold an 'Index' column.

    Returns
    -------
    (resid, beta_1, beta_2) : tuple of Series
        Indexed by the dates after the first K rows: the prediction error recorded
        when each new row enters the window, and the two coefficients after each
        window move. All three are empty when there are no rows beyond the first K.
    """
    # regression data
    y = CDS[[ticker]]
    X = Equity[[ticker, 'Index']]
    X_vals = X.values
    y_vals = y.values

    # boxcar regression: initial fit on the first K rows
    model = Sherman_Morrison_OLS(X_vals[:K, :], y_vals[:K, :])

    ## updating betas: row K+step enters the window, row step leaves it
    beta_rows = []
    for step in range(len(Equity.index) - K):
        beta = model.window_update(X_vals[K + step, :], y_vals[K + step, 0],
                                   X_vals[step, :], y_vals[step, 0])
        beta_rows.append(beta[:, 0])
    # reshape keeps the array two-dimensional even when no update took place
    betas = np.array(beta_rows).reshape(-1, 2)

    # tag for dataframe
    # NOTE(review): these labels open a '$' math block that is never closed; left
    # unchanged because the Series names may be referenced elsewhere.
    resid_tag = r'$\rho_{' + ticker + r'}'
    beta_1_tag = r'$\beta_{' + ticker + r', Equity}'
    beta_2_tag = r'$\beta_{' + ticker + r', Index}'
    # indexes
    grid = y.iloc[K:, :].index
    # pd Series
    resid = pd.Series(model.resid, index = grid, name = resid_tag)
    beta_1 = pd.Series(betas[:, 0], index = grid, name = beta_1_tag)
    beta_2 = pd.Series(betas[:, 1], index = grid, name = beta_2_tag)
    return resid, beta_1, beta_2

<h2> 1. Introduction </h2>

<h2> 2. Data </h2>

In [None]:
# Read the raw CDS quote file; its first column is used as the row index.
CDS_raw = pd.read_table(CDS_table, index_col=0)

In [None]:
# Wednesday-to-Wednesday CDS spread returns: one column per ticker plus 'Index'.
CDS = clean_CDS_data(CDS_raw)
# Preview the first K dates.
CDS.head(K)

In [None]:
# Wednesday-to-Wednesday equity returns for the CDS tickers (SPY is added by
# clean_Prices_data), over the date range spanned by the CDS table.
Equity = clean_Prices_data(list(CDS_raw.ticker.unique()), CDS.index[0], CDS.index[-1])
# Preview the first K dates.
Equity.head(K)

In [None]:
# Dates that appear in the CDS table but have no counterpart in the equity table.
missing_list_1 = CDS.index[~CDS.index.isin(Equity.index)]
if len(missing_list_1) != 0:
    for date in missing_list_1:
        print("Return on", date.strftime('%Y-%m-%d'), "was included in the CDS data but not in the Prices data")
else:
    print("There were no dates included in the CDS Data but not in the Prices Data")

In [None]:
# Dates that appear in the equity table but have no counterpart in the CDS table.
missing_list_2 = Equity.index[~Equity.index.isin(CDS.index)]
if len(missing_list_2) != 0:
    for date in missing_list_2:
        print("Return on", date.strftime('%Y-%m-%d'), "was included in the Prices data but not in the CDS data")
else:
    print("There were no dates included in the Prices Data but not in the CDS Data")

In [None]:
# Equity dates whose preceding observation is not exactly 7 days earlier, i.e. a
# Wednesday was skipped and the return on that date spans more than one week.
missing_list_3 = Equity.index[1:][
    (Equity.index[1:] - Equity.index[:-1]) != timedelta(days = 7)
]
if len(missing_list_3) != 0:
    for date in missing_list_3:
        print("Return on", date.strftime('%Y-%m-%d'), "was calculated as 2-week return in the Prices Data")
else:
    print("There were no dates included in the Prices Data that calculated 2-week return")

In [None]:
# CDS dates whose preceding observation is not exactly 7 days earlier, i.e. a
# Wednesday was skipped and the return on that date spans more than one week.
missing_list_4 = CDS.index[1:][
    (CDS.index[1:] - CDS.index[:-1]) != timedelta(days = 7)
]
if len(missing_list_4) != 0:
    for date in missing_list_4:
        print("Return on", date.strftime('%Y-%m-%d'), "was calculated as 2-week return in the CDS Data")
else:
    print("There were no dates included in the CDS Data that calculated 2-week return")

In [None]:
# Common date grid for the regressions: dates present in BOTH tables whose return is
# a true one-week return in both. Filtering on the equity gaps alone would let CDS
# gap dates through, and an equity date absent from the CDS table would make the
# later CDS.loc[agg_grid] lookup fail.
agg_grid = Equity.index[
    Equity.index.isin(CDS.index)
    & ~Equity.index.isin(missing_list_3)
    & ~Equity.index.isin(missing_list_4)
]

In [None]:
# Restrict both tables to the common date grid.
CDS = CDS.loc[agg_grid]
# Attach the CDS 'Index' return to the equity table as a regressor. Any 'Index'
# column left over from an earlier run of this cell is dropped first; otherwise the
# merge would produce 'Index_x' / 'Index_y' and the 'Index' lookup would break.
Equity = (
    Equity.loc[agg_grid]
    .drop(columns = 'Index', errors = 'ignore')
    .merge(CDS[['Index']], how = 'inner', left_index = True, right_index = True)
)

<h2> 3. Methodology </h2>

<h3> 3-0. Subheading </h3>

In [None]:
# Rolling K-observation regression for ticker 'BA'; element [2] of the returned
# (resid, beta_1, beta_2) tuple is the coefficient on the 'Index' regressor.
temp = CDS_boxcar('BA', K, CDS, Equity)[2]

<h2> 4. Analysis </h2>

<h3> 4-0. Subheading </h3>

<h2> 5. Evaluation </h2>

<h3> 5-0. Subheading </h3>