Skip to content

Commit

Permalink
cleaned constant correlation
Browse files Browse the repository at this point in the history
  • Loading branch information
robertmartin8 committed Jun 29, 2019
1 parent 52bf76b commit 8bdb57d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 78 deletions.
127 changes: 58 additions & 69 deletions pypfopt/risk_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,98 +214,87 @@ def shrunk_covariance(self, delta=0.2):
shrunk_cov = delta * F + (1 - delta) * self.S
return self.format_and_annualise(shrunk_cov)

def ledoit_wolf(self):
def ledoit_wolf(self, shrinkage_target="identity"):
"""
Calculate the Ledoit-Wolf shrinkage estimate.
:param shrinkage_target: target matrix to shrink towards
:type shrinkage_target: str
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = covariance.ledoit_wolf(X)
if shrinkage_target == "identity":
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = covariance.ledoit_wolf(X)
elif shrinkage_target == "constant_correlation":
shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation()
else:
raise NotImplementedError

return self.format_and_annualise(shrunk_cov)

def oracle_approximating(self):
def _ledoit_wolf_constant_correlation(self):
"""
Calculate the Oracle Approximating Shrinkage estimate
Calculate the Ledoit-Wolf shrinkage estimate using the constant
correlation matrix as the shrinkage target.
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = covariance.oas(X)
return self.format_and_annualise(shrunk_cov)
t, n = np.shape(X)

Xm = X - X.mean(axis=0)
S = self.S # sample cov matrix

class ConstantCorrelation(CovarianceShrinkage):
"""
Shrinks towards constant correlation matrix
if shrink is specified, then this constant is used for shrinkage
# Constant correlation target
var = np.diag(S).reshape(-1, 1)
std = np.sqrt(var)
_var = np.tile(var, (n,))
_std = np.tile(std, (n,))
r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1))
F = r_bar * (_std * _std.T)
F[np.eye(n) == 1] = var.reshape(-1)

# Estimate pi
y = Xm ** 2
pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S ** 2
pi_hat = np.sum(pi_mat)

# Theta matrix, expanded term by term
term1 = np.dot((X ** 3).T, X) / t
help_ = np.dot(X.T, X) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * S
term3 = help_ * _var
term4 = _var * S
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum(
np.dot((1 / std), std.T) * theta_mat
)

The notation follows Ledoit and Wolf (2003, 2004) version 04/2014
"""
# Estimate gamma
gamma_hat = np.linalg.norm(S - F, "fro") ** 2

def shrink(self):
# Compute shrinkage constant
kappa_hat = (pi_hat - rho_hat) / gamma_hat
delta = max(0.0, min(1.0, kappa_hat / t))

# Compute shrunk covariance matrix
shrunk_cov = delta * F + (1 - delta) * self.S
return shrunk_cov, delta

def oracle_approximating(self):
"""
Calculate the Constant-Correlation covariance matrix.
Calculate the Oracle Approximating Shrinkage estimate
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
x = np.nan_to_num(self.X.values)

# de-mean returns
t, n = np.shape(x)
meanx = x.mean(axis=0)
x = x - np.tile(meanx, (t, 1))

# compute sample covariance matrix
sample = (1.0 / t) * np.dot(x.T, x)

# compute prior
var = np.diag(sample).reshape(-1, 1)
sqrtvar = np.sqrt(var)
_var = np.tile(var, (n,))
_sqrtvar = np.tile(sqrtvar, (n,))
r_bar = (np.sum(sample / (_sqrtvar * _sqrtvar.T)) - n) / (n * (n - 1))
prior = r_bar * (_sqrtvar * _sqrtvar.T)
prior[np.eye(n) == 1] = var.reshape(-1)

# compute shrinkage parameters and constant
if self.delta is None:

# what we call pi-hat
y = x ** 2.0
phi_mat = np.dot(y.T, y) / t - 2 * np.dot(x.T, x) * sample / t + sample ** 2
phi = np.sum(phi_mat)

# what we call rho-hat
term1 = np.dot((x ** 3).T, x) / t
help_ = np.dot(x.T, x) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * sample
term3 = help_ * _var
term4 = _var * sample
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho = sum(np.diag(phi_mat)) + r_bar * np.sum(
np.dot((1.0 / sqrtvar), sqrtvar.T) * theta_mat
)

# what we call gamma-hat
gamma = np.linalg.norm(sample - prior, "fro") ** 2

# compute shrinkage constant
kappa = (phi - rho) / gamma
shrinkage = max(0.0, min(1.0, kappa / t))
self.delta = shrinkage
else:
# use specified constant
shrinkage = self.delta

# compute the estimator
sigma = shrinkage * prior + (1 - shrinkage) * sample
return self.format_and_annualise(sigma)
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = covariance.oas(X)
return self.format_and_annualise(shrunk_cov)


class SingleIndex(CovarianceShrinkage):
Expand Down
25 changes: 16 additions & 9 deletions tests/test_risk_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
import pandas as pd
import numpy as np
import pytest
from pypfopt import risk_models
from tests.utilities_for_tests import get_data

Expand Down Expand Up @@ -138,8 +139,7 @@ def test_shrunk_covariance_extreme_delta():
cs = risk_models.CovarianceShrinkage(df)
# if delta = 0, no shrinkage occurs
shrunk_cov = cs.shrunk_covariance(0)
np.testing.assert_array_almost_equal(
shrunk_cov.values, risk_models.sample_cov(df))
np.testing.assert_array_almost_equal(shrunk_cov.values, risk_models.sample_cov(df))
# if delta = 1, sample cov does not contribute to shrunk cov
shrunk_cov = cs.shrunk_covariance(1)
N = df.shape[1]
Expand All @@ -157,7 +157,7 @@ def test_shrunk_covariance_frequency():
np.testing.assert_array_almost_equal(shrunk_cov.values, S)


def test_ledoit_wolf():
def test_ledoit_wolf_default():
df = get_data()
cs = risk_models.CovarianceShrinkage(df)
shrunk_cov = cs.ledoit_wolf()
Expand All @@ -168,22 +168,29 @@ def test_ledoit_wolf():
assert not shrunk_cov.isnull().any().any()


def test_oracle_approximating():
def test_ledoit_wolf_constant_correlation():
df = get_data()
cs = risk_models.CovarianceShrinkage(df)
shrunk_cov = cs.oracle_approximating()
shrunk_cov = cs.ledoit_wolf(shrinkage_target="constant_correlation")
assert 0 < cs.delta < 1
assert shrunk_cov.shape == (20, 20)
assert list(shrunk_cov.index) == list(df.columns)
assert list(shrunk_cov.columns) == list(df.columns)
assert not shrunk_cov.isnull().any().any()


def test_constant_correlation():
def test_ledoit_wolf_raises_not_implemented():
df = get_data()
cc = risk_models.ConstantCorrelation(df)
shrunk_cov = cc.shrink()
assert 0 < cc.delta < 1
cs = risk_models.CovarianceShrinkage(df)
with pytest.raises(NotImplementedError):
cs.ledoit_wolf(shrinkage_target="I have not been implemented!")


def test_oracle_approximating():
df = get_data()
cs = risk_models.CovarianceShrinkage(df)
shrunk_cov = cs.oracle_approximating()
assert 0 < cs.delta < 1
assert shrunk_cov.shape == (20, 20)
assert list(shrunk_cov.index) == list(df.columns)
assert list(shrunk_cov.columns) == list(df.columns)
Expand Down

0 comments on commit 8bdb57d

Please sign in to comment.