In [None]:
!pip install yfinance



In [None]:
try:
    import yfinance as yf
except ImportError:
    raise ImportError("Cannot start without 'yfinance' package.\nInstall it before running the code again.")

import os
import requests
from datetime import datetime
import bs4 as bs
import numpy as np
import pandas as pd
from pandas.core.frame import DataFrame
from scipy.sparse.linalg import eigsh
from collections import OrderedDict
# from google.colab import drive
# drive.mount('/content/gdrive')


class SP500DataLoader:
    def __init__(self):
        if not os.path.exists('Data'):
            os.makedirs('Data')

        self.start_date, self.end_date = None, None
        self.cleaned_prices = None
        self.cleaned_returns = None
        self.raw_prices = None
        self.raw_returns = None

        # Download stocks names from S&P500 page on wikipedia
        resp = requests.get('http://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
        soup = bs.BeautifulSoup(resp.text, 'lxml')
        table = soup.find('table', {'class': 'wikitable sortable'})
        self.tickers = []
        for row in table.findAll('tr')[1:]:
            ticker = row.findAll('td')[0].text
            self.tickers.append(ticker)
        self.tickers = [s.replace('\n', '') for s in self.tickers]
        self.tickers = self.tickers + ["SPY"]

    # Check whether 'start_date' and 'end_date' make a valid date range or not
    def check_date_range(self, start_date: tuple, end_date: tuple):
        start = datetime(*start_date)
        end = datetime(*end_date)

        if end <= start:
            raise Exception("The start date must be before the end date!")

        return start, end

    # Doanload prices using yfinance
    def download_prices(self, start_date, end_date, interval='1d', column='Adj Close'):
        self.start_date, self.end_date = self.check_date_range(start_date, end_date)
        self.raw_prices = yf.download(self.tickers, start=self.start_date, end=self.end_date, interval=interval)[column]

    # Helper function to write dataframes on files with specified names
    def write_on_disk(self, data: DataFrame, filename: str):
        if "csv" in filename:
            data.to_csv('Data/' + filename)
        elif "h5" in filename:
            data.to_hdf('Data/' + filename, 'fixed', mode='w', complib='blosc', complevel=9)

        print(f"Saved: Data/{filename}")

    # Return a list of stock names in S&P 500 index
    def get_stocks_list(self):
        return self.tickers.copy()

    # Return raw prices data (which is not cleaned)
    def get_raw_prices(self, start_date: tuple, end_date: tuple, interval='1d', column='Adj Close', save_as_h5=False, save_as_csv=False):
        self.download_prices(start_date, end_date)

        if save_as_csv:
            self.write_on_disk(self.raw_prices, "S&P500-raw_prices.csv")
        if save_as_h5:
            self.write_on_disk(self.raw_prices, "S&P500-raw_prices.h5")

        return self.raw_prices

    # Calculate and return raw returns data (which is not cleaned)
    def get_raw_returns(self, start_date: tuple, end_date: tuple, interval='1d', column='Adj Close', save_as_h5=False, save_as_csv=False):
        self.get_raw_prices(start_date, end_date)

        self.raw_returns = self.raw_prices.copy()
        self.raw_returns = np.log(self.raw_returns).diff()
        self.raw_returns = self.raw_returns.iloc[1:]  # removes first row which is NaN after diff()

        if save_as_csv:
            self.write_on_disk(self.raw_returns, "S&P500-raw_returns.csv")
        if save_as_h5:
            self.write_on_disk(self.raw_returns, "S&P500-raw_returns.h5")

        return self.raw_returns

    # Return cleaned prices data (stocks with at least on NAN value are excluded)
    def get_cleaned_prices(self, start_date: tuple, end_date: tuple, interval='1d', column='Adj Close', save_as_h5=False, save_as_csv=False):
        self.get_raw_prices(start_date, end_date)

        self.cleaned_prices = self.raw_prices.copy()
        # Remove companies (columns) with all missing values for whole time range
        self.cleaned_prices.dropna(axis='columns', how='all', inplace=True)
        # Remove days (rows) with missing values for all of companies
        self.cleaned_prices.dropna(axis='index', how='all', inplace=True)
        # Finally, remove the columns with at least one Nan (missing value)
        self.cleaned_prices.dropna(axis='columns', how='any', inplace=True)

        if save_as_csv:
            self.write_on_disk(self.self.cleaned_prices, "S&P500-cleaned_prices.csv")
        if save_as_h5:
            self.write_on_disk(self.self.cleaned_prices, "S&P500-cleaned_prices.h5")

        return self.cleaned_prices

    # Calculate return values using cleaned data, and return the dataframe
    def get_cleaned_returns(self, start_date: tuple, end_date: tuple, interval='1d', column='Adj Close', save_as_h5=False, save_as_csv=False):
        self.get_cleaned_prices(start_date, end_date)

        self.cleaned_returns = self.cleaned_prices.copy()
        self.cleaned_returns = np.log(self.cleaned_returns).diff()
        self.cleaned_returns = self.cleaned_returns.iloc[1:]  # removes first row which is NaN after diff()

        if save_as_csv:
            self.write_on_disk(self.cleaned_returns, "S&P500-cleaned_returns.csv")
        if save_as_h5:
            self.write_on_disk(self.cleaned_returns, "S&P500-cleaned_returns.h5")

        return self.cleaned_returns

    # Return the last values for raw prices without redownloading them
    def get_last_raw_prices(self, save_as_h5=False, save_as_csv=False):
        if self.raw_prices is None:
            return None

        if save_as_csv:
            self.write_on_disk(self.raw_prices, "S&P500-raw_prices.csv")
        if save_as_h5:
            self.write_on_disk(self.raw_prices, "S&P500-raw_prices.h5")

        return self.raw_prices

    # Return the last values for raw returns without redownloading them
    def get_last_raw_returns(self, save_as_h5=False, save_as_csv=False):
        if self.raw_returns is None:
            return None

        if save_as_csv:
            self.write_on_disk(self.raw_returns, "S&P500-raw_returns.csv")
        if save_as_h5:
            self.write_on_disk(self.raw_returns, "S&P500-raw_returns.h5")

        return self.raw_returns

    # Return the last values for cleaned prices without redownloading them
    def get_last_cleaned_prices(self, save_as_h5=False, save_as_csv=False):
        if self.cleaned_prices is None:
            return None

        if save_as_csv:
            self.write_on_disk(self.cleaned_prices, "S&P500-cleaned_prices.csv")
        if save_as_h5:
            self.write_on_disk(self.cleaned_prices, "S&P500-cleaned_prices.h5")

        return self.cleaned_prices

    # Return the last values for cleaned returns without redownloading them
    def get_last_cleaned_returns(self, save_as_h5=False, save_as_csv=False):
        if self.cleaned_returns is None:
            return None

        if save_as_csv:
            self.write_on_disk(self.cleaned_returns, "S&P500-cleaned_returns.csv")
        if save_as_h5:
            self.write_on_disk(self.cleaned_returns, "S&P500-cleaned_returns.h5")

        return self.cleaned_returns


In [None]:
# Alex B. estimating specific variances
def params_sample_covar(Y, c:int=1):
    p, n = Y.shape

    # Dual Sample Covariance Eigenvales and Vectors
    L = Y.T @ Y/p
    vals, vecs = eigsh (np.array(L), 3, which='LA')

    vals = vals[::-1]
    vecs = np.fliplr(vecs)

    # Convert PC's from Dual Covariance
    H = Y @ vecs / np.sqrt(p * vals)
    sH = H
    eigs = vals * p / n

    # Select top eigenvector and eigenvalue
    h = H.iloc[:,0]
    if (sum(h) < 0):
        H = -H

    # Apply penalty correction to 1 if desired
    beta_hat_raw  = c*h + (1-c)

    # Residuals of PCA as regression
    phi = Y.T @ beta_hat_raw / (beta_hat_raw.T @ beta_hat_raw)
    E = Y - np.outer(beta_hat_raw,phi)

    # Diagonals of squared residuals for svar
    svar_fixed = np.diag(E @ E.T)/n
    if(svar_fixed<=0).any(): # Sign for sanity check
        print('Problem!')

    # Compute factor
    fvar_resid = phi.T @ phi / n

    # Normalize factor so that beta is mean 1
    fvar_fixed = fvar_resid * (beta_hat_raw.mean()**2)
    beta_hat_fixed = beta_hat_raw/beta_hat_raw.mean()
    return beta_hat_fixed, fvar_fixed, svar_fixed


def base_model_sample_covar(Y, n_factors):
    '''
    constructs "base" model with 6 factors where N is number of samples
    '''
    p,n = Y.shape
    L = Y.T @ Y/p
    # n_factors = 6

    vals, vecs = eigsh (np.array(L), n_factors, which='LA')
    vals = vals[::-1]
    vecs = np.fliplr(vecs)

    # Convert PC's from Dual Covariance
    B = Y @ vecs / np.sqrt(p * vals)
    eigs = vals * p / n

    phi = np.linalg.solve(B.T @ B, B.T @ Y)
    E = Y.values - B @ phi
    svar = np.diag(E @ E.T)/n

    # Set first factor to be mean 1
    m1 = B[:,0].mean()
    eigs[0] *= m1**2
    B[:,0] = B[:,0]/m1

    return B, eigs, svar

**Use PCA and LW constant correlation matrix and try different numbers of factors**

In [None]:
# Upper tri masking
def upper_tri_masking(A):
    m = A.shape[0]
    r = np.arange(m)
    mask = r[:,None] < r
    return A[mask]


def diff_per(pc_fraction, avg_corr):
    return (pc_fraction - avg_corr) / ((pc_fraction + avg_corr) / 2)


def diff(pc_fraction, avg_corr):
    return pc_fraction - avg_corr


def generate_sequence(number):
    # Start with 0 to 4
    sequence = list(range(10))
    # Add multiples of 100 up to the number
    sequence.extend(range(99, number, 100))
    return [i + 1 for i in sequence]


def generate_arithmetic_array(start=1, step=25, limit=500):
    # Initialize the array with the first number
    arith_array = [start]
    # Continue the arithmetic trend until we reach the limit
    num = start + step - 1  # Adjust the first increment to make the second number 25
    while num <= limit:
        arith_array.append(num)
        num += step

    return arith_array


def generate_exponential_array(limit=500):
    # Start with the first number
    num = 1
        # Initialize the array with the first number
    exp_array = [num]
        # Continue the exponential trend until we reach around the limit
    while num * 2 <= limit:
        num *= 2
        exp_array.append(num)
    return exp_array


def shrinkage(returns, cov):
    """Shrinks sample covariance matrix towards constant correlation unequal variance matrix.

    Ledoit & Wolf ("Honey, I shrunk the sample covariance matrix", Portfolio Management, 30(2004),
    110-119) optimal asymptotic shrinkage between 0 (sample covariance matrix) and 1 (constant
    sample average correlation unequal sample variance matrix).

    Paper:
    http://www.ledoit.net/honey.pdf

    Matlab code:
    https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip

    Special thanks to Evgeny Pogrebnyak https://github.com/epogrebnyak

    :param returns:
        n, p - returns of n observations of p stocks.
    :return:
        Covariance matrix, sample average correlation, shrinkage.
    """
    n, p = returns.shape

    # The sample covariance calculated below is very similar to the one I calculated in main function, so use the main function one directly
    # mean_returns = np.mean(returns, axis=0, keepdims=True)
    # returns -= mean_returns
    # sample_cov = returns.transpose() @ returns / n

    sample_cov = cov

    # sample average correlation
    var = np.diag(sample_cov).reshape(-1, 1)
    sqrt_var = var ** 0.5
    unit_cor_var = sqrt_var * sqrt_var.transpose()
    average_cor = ((sample_cov / unit_cor_var).sum() - p) / p / (p - 1)
    prior = average_cor * unit_cor_var
    np.fill_diagonal(prior, var)

    # pi-hat
    y = returns ** 2
    phi_mat = (y.transpose() @ y) / n - sample_cov ** 2
    phi = phi_mat.sum()

    # rho-hat
    theta_mat = ((returns ** 3).transpose() @ returns) / n - var * sample_cov
    np.fill_diagonal(theta_mat, 0)
    rho = (
        np.diag(phi_mat).sum()
        + average_cor * (1 / sqrt_var @ sqrt_var.transpose() * theta_mat).sum()
    )

    # gamma-hat
    gamma = np.linalg.norm(sample_cov - prior, "fro") ** 2

    # shrinkage constant
    kappa = (phi - rho) / gamma
    shrink = max(0, min(1, kappa / n))
    # print("Shrinkage Constant:", shrink)

    # estimator
    sigma = shrink * prior + (1 - shrink) * sample_cov
    print("Shrinkage Constant:", shrink)

    return sigma, prior


def ndarray_to_diagonal_matrix(arr):
    if arr.ndim != 1:
        raise ValueError("Input array must be one-dimensional")
    size = arr.shape[0]
    # Create a zero matrix of size (n, n)
    matrix = np.zeros((size, size))
    # Place the original values on the diagonal
    np.fill_diagonal(matrix, arr)
    return matrix


def base_model_sample_covar(Y, vecs, n_factors):
    '''
    constructs "base" model with 6 factors where N is number of samples
    '''
    # p, n = Y.shape
    # L = Y.T @ Y/p
    # # n_factors = 6

    # vals, vecs = eigsh (np.array(L), n_factors, which='LA')
    # vals = vals[::-1]
    # vecs = np.fliplr(vecs)

    # # Convert PC's from Dual Covariance
    # B = Y @ vecs / np.sqrt(p * vals)
    # eigs = vals * p / n

    Y = Y.T
    p, n = Y.shape

    phi = np.linalg.solve(vecs.T @ vecs, vecs.T @ Y)
    # E = Y.values - vecs @ phi
    E = Y - vecs @ phi
    svar = np.diag(E @ E.T)/n
    svar_matrix = ndarray_to_diagonal_matrix(svar)

    # Diagonals of squared residuals for svar
    svar = np.diag(E @ E.T)/n
    if(svar<=0).any(): # Sign for sanity check
        print('Problem!')

    # Set first factor to be mean 1
    # m1 = B[:,0].mean()
    # eigs[0] *= m1**2
    # B[:,0] = B[:,0]/m1

    return svar_matrix


def covariance_to_array(cov_matrix, mean_vector, num_samples):
    # Perform Cholesky decomposition
    L = np.linalg.cholesky(cov_matrix)

    # Generate random samples
    num_variables = len(mean_vector)
    random_samples = np.random.standard_normal((num_samples, num_variables))

    # Transform the samples
    transformed_samples = np.dot(random_samples, L.T) + mean_vector

    return transformed_samples


def estimate_correlation_matrix(data, cov, num_components):
    """
    Estimate correlation matrix from the covariance matrix
    """
    # Standardize the data
    # data_standardized = (data - np.mean(data, axis=0)) / np.std(data, axis=0)

    n, p = data.shape

    eigenvalues, eigenvectors = eigsh(cov, p, which='LM')

    # Sort eigenvalues and eigenvectors in descending order
    idx = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]

    # Select top n_components
    selected_eigenvectors = eigenvectors[:, :num_components]

    # Select top eigenvalues
    selected_eigenvalues = ndarray_to_diagonal_matrix(eigenvalues[:num_components])
    rest_eigenvalues = eigenvalues[num_components:]
    # print("Rest Eigenvalues:", rest_eigenvalues)

    # Estimate the covariance matrix using the principal components
    estimated_covariance_matrix = np.dot(np.dot(selected_eigenvectors, selected_eigenvalues), selected_eigenvectors.T)

    # Subtraction method estimate specfic covariance matrix
    subtra_matrix = ndarray_to_diagonal_matrix(np.diag(cov - estimated_covariance_matrix))
    hetero_matrix = estimated_covariance_matrix + subtra_matrix

    # Estimate the homogeneous specific variances on diagonal
    if len(rest_eigenvalues) == 0:
        avg_rest_eigenvalues = 0
    else:
        avg_rest_eigenvalues = np.mean(rest_eigenvalues)
    rest_eigenvalues_matrix = (avg_rest_eigenvalues * n / p) * np.identity(cov.shape[1])
    homo_matrix = estimated_covariance_matrix + rest_eigenvalues_matrix

    hetero_std_devs = np.sqrt(np.diag(hetero_matrix))
    hetero_corr_matrix = hetero_matrix / np.outer(hetero_std_devs, hetero_std_devs)
    # print("Hetero Corr Matrix:\n", hetero_corr_matrix)

    homo_std_devs = np.sqrt(np.diag(homo_matrix))
    homo_corr_matrix = homo_matrix / np.outer(homo_std_devs, homo_std_devs)

    return hetero_corr_matrix, homo_corr_matrix


# Sequence for testing
sequence_original = generate_sequence(400)
sequence_target_range = range(200, 300, 2)
sequence_linear = generate_arithmetic_array(1, 25, 475)
total_linear = list(i + 1 for i in range(500))
every_two_linear = generate_arithmetic_array(1, 2, 500)
sequence_exp = generate_exponential_array(500) + [497]

sequence = sequence_exp

# Initiate average correlation list and PC1 fraction list
corr_res_dict = {i: {key: [] for key in sequence} for i in range(6)}
diff_res_dict = {i: {key: [] for key in sequence} for i in range(6)}
pc1_fraction_list = []
diffPer_actual = []
corr_avg_actual = []

# Get cleaned return values from 2000/05/19 to 2023/12/30
range_sp500 = range(2023, 2024)

for year in range_sp500:
    data_downloader_object = SP500DataLoader()
    cleaned_returns = data_downloader_object.get_cleaned_returns(
        start_date=(year, 1, 1), end_date=(year, 12, 31),
        interval='1d', column='Adj Close'
        )
    print("\nYear, Days and Securities: ", year, cleaned_returns.shape, '\n')

    # Center the data
    data_centered = cleaned_returns - np.mean(cleaned_returns, axis=0)

    cov = cleaned_returns.cov()
    centered_cov = data_centered.cov()
    corr = cleaned_returns.corr()
    centered_corr = data_centered.corr()
    p = data_centered.shape[1]
    n = data_centered.shape[0]
    upper_tril_array = upper_tri_masking(corr.to_numpy())
    tril_avg = np.average(upper_tril_array)

    print("N:", n)
    print("P:", p)
    vals, vecs = eigsh(np.array(cov), p, which='LM')
    vals_sorted = vals[::-1]
    pc1 = vals_sorted[0]
    l_squared = (np.sum(vals_sorted) - pc1)/(n - 1)
    delta_squared = l_squared*(n/p)
    pc1_fraction = (pc1 - delta_squared)/np.trace(cov)

    avg_dict = {0:{}, 1:{}, 2:{}, 3:{}, 4:{}, 5:{}}
    for m in sequence:
        if m <= p:
            print("\n", "Factor Number:", m)
            # Three kinds of matrices: Sample Covariance Matrix, LW Estimator Matrix and LW Constant Correlation Matrix
            lw_estimator_matrix, lw_constant_corr_matrix = shrinkage(data_centered.to_numpy(), centered_cov.to_numpy())

            # Three respective correlation matrices
            # print("Original:")
            # sample_hetero_corr_matrix, sample_homo_corr_matrix = estimate_correlation_matrix(cleaned_returns.to_numpy(), cov.to_numpy(), m)
            # print("Original Average:", np.average(upper_tri_masking(sample_hetero_corr_matrix)))
            sample_hetero_corr_matrix, sample_homo_corr_matrix = estimate_correlation_matrix(data_centered.to_numpy(), centered_cov.to_numpy(), m)
            lw_estimator_hetero_corr_matrix, lw_estimator_homo_corr_matrix = estimate_correlation_matrix(data_centered.to_numpy(), lw_estimator_matrix, m)
            lw_constant_hetero_corr_matrix, lw_constant_homo_corr_matrix = estimate_correlation_matrix(data_centered.to_numpy(), lw_constant_corr_matrix, m)
            list_ = [sample_hetero_corr_matrix, sample_homo_corr_matrix, lw_constant_hetero_corr_matrix, lw_constant_homo_corr_matrix, lw_estimator_hetero_corr_matrix, lw_estimator_homo_corr_matrix]

            for i in range(6):
                avg_dict[i][m] = np.average(upper_tri_masking(list_[i]))

        else:
            print("There is a None")
            for i in range(6):
                avg_dict[i][m] = None

    # diffPer_array = {}
    diff_dict = {0:{}, 1:{}, 2:{}, 3:{}, 4:{}, 5:{}}
    for m in sequence:
        if m <= p:
            # Calculate Diff Percentage
            for i in range(6):
                diff_dict[i][m] = diff(pc1_fraction, avg_dict[i][m])
        else:
            for i in range(6):
                diff_dict[i][m] = None


    pc1_fraction_list.append(pc1_fraction)
    corr_avg_actual.append(tril_avg)
    diffPer_actual.append(diff(pc1_fraction, tril_avg))

    for i in range(6):
        for m in sequence:
            corr_res_dict[i][m].append(avg_dict[i][m])
            diff_res_dict[i][m].append(diff_dict[i][m])

# Initiate result data
data = OrderedDict()

data["PC1: Fraction of Variance Explained"] = pc1_fraction_list

for i in range(6):
    for m in sequence:
        column_name = f"{i}_{m} Factor AvgCorr"
        data[column_name] = corr_res_dict[i][m]

data['Average Correlation'] = corr_avg_actual

for i in range(6):
    for m in sequence:
        column_name = f"{i}_Diff {m}"
        data[column_name] = diff_res_dict[i][m]

data['Diff'] = diffPer_actual

linear_df = pd.DataFrame(data, index=list(range_sp500))

# Write to excel
linear_df.to_excel('all 500.xlsx')

[*********************100%***********************]  504 of 504 completed
ERROR:yfinance:
5 Failed downloads:
ERROR:yfinance:['GEV', 'SOLV', 'SW']: YFPricesMissingError('$%ticker%: possibly delisted; no price data found  (1d 2023-01-01 00:00:00 -> 2023-12-31 00:00:00) (Yahoo error = "Data doesn\'t exist for startDate = 1672549200, endDate = 1703998800")')
ERROR:yfinance:['BF.B']: YFPricesMissingError('$%ticker%: possibly delisted; no price data found  (1d 2023-01-01 00:00:00 -> 2023-12-31 00:00:00)')
ERROR:yfinance:['BRK.B']: YFTzMissingError('$%ticker%: possibly delisted; no timezone found')



Year, Days and Securities:  2023 (249, 497) 

N: 249
P: 497


  vals, vecs = eigsh(np.array(cov), p, which='LM')



 Factor Number: 1
Shrinkage Constant: 0.21584786181039786
Original:


  eigenvalues, eigenvectors = eigsh(cov, p, which='LM')


Original Average: 0.24961906883337878
Centered:
Strand Average: 0.24961906883337887

 Factor Number: 2
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.2524392160688749
Centered:
Strand Average: 0.252439216068875

 Factor Number: 4
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.26342226142170855
Centered:
Strand Average: 0.26342226142170855

 Factor Number: 8
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.26345034825936425
Centered:
Strand Average: 0.26345034825936425

 Factor Number: 16
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.26348608606977136
Centered:
Strand Average: 0.2634860860697714

 Factor Number: 32
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.2634136728607577
Centered:
Strand Average: 0.26341367286075773

 Factor Number: 64
Shrinkage Constant: 0.21584786181039786
Original:
Original Average: 0.26323704296180384
Centered:
Strand Average: 0.2632370429618039

