Skip to content

Commit

Permalink
complete Gaussian process (GP) smoothing for sim-based correction
Browse files Browse the repository at this point in the history
  • Loading branch information
zatkins2 committed May 1, 2024
1 parent 1923057 commit a46b113
Showing 1 changed file with 185 additions and 39 deletions.
224 changes: 185 additions & 39 deletions pspipe_utils/covariance.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,73 +451,219 @@ def skew(cov, dir=1):
return corrected_cov


def correct_analytical_cov_keep_res_diag(an_full_cov, mc_full_cov, return_diag=False):
def correct_analytical_cov_eigenspectrum_ratio(an_full_cov, mc_full_cov, return_all=False):
"""Correct an analytical covariance matrix with a monte carlo covariance
matrix. Assumes the following rotated monte carlo matrix is diagonal:
mc_rot = (ana**-.5) @ mc @ (ana**-.5).T
and then simply replaces the diagonal in that basis with the observed
values:
ana_corrected = (ana**.5) @ np.diag(np.diag(mc_rot)) @ (ana**.5).T
Parameters
----------
an_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
Analytic covariance matrix to be corrected.
mc_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
Noisy monte carlo matrix to use to correct the analytic matrix.
return_all : bool, optional
If True, in addition to returning ana_corrected, also return the
diagonal of the mc_rot matrix, by default False.
Returns
-------
(nblock*nbin, nblock*nbin) np.ndarray, {(nblock*nbin,) np.ndarray}
ana_corrected as above, and if return_all, np.diag(mc_rot)
"""
sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
inv_sqrt_an_full_cov = np.linalg.inv(sqrt_an_full_cov)
res = inv_sqrt_an_full_cov @ mc_full_cov @ inv_sqrt_an_full_cov.T # res should be close to the identity if an_full_cov is good
res_diag = np.diag(res)

corrected_cov = sqrt_an_full_cov @ np.diag(res_diag) @ sqrt_an_full_cov.T

if return_diag:
if return_all:
return corrected_cov, res_diag
else:
return corrected_cov


def correct_analytical_cov_keep_res_diag_gp(an_full_cov, mc_full_cov, lb, ell_cuts=None,
return_diag=False):
sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
def correct_analytical_cov_eigenspectrum_ratio_gp(lb, an_full_cov, mc_full_cov,
var_eigenspectrum_ratios_by_block=None,
idx_arrs_by_block=None, return_all=False):
"""Correct an analytical covariance matrix with a monte carlo covariance
matrix. Assumes the following rotated monte carlo matrix is diagonal:
mc_rot = (ana**-.5) @ mc @ (ana**-.5).T
and then fits the diagonal in that basis with a Gaussian process (GP) on
the observed values:
ana_corrected = (ana**.5) @ np.diag(GP(np.diag(mc_rot))) @ (ana**.5).T
The GP is applied to each block diagonal separately.
Parameters
----------
lb : (nbin,) np.ndarray
The locations in ell of the bins.
an_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
Analytic covariance matrix to be corrected.
mc_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
Noisy monte carlo matrix to use to correct the analytic matrix.
var_eigenspectrum_ratios_by_block : (nblock, nbin) np.ndarray, optional
The variance of the diagonal elements of mc_rot, by default None. These
would need to be precomputed in the rotated basis. If None, the
noise level in the diagonal elements of mc_rot is estimated from the
elements themselves by the Gaussian process.
idx_arrs_by_block : (nblock,) list, optional
A list of np.ndarrays, each of which may have between 0 and nbin elements,
that gives for each block the bin indices that should actually be used
in the Gaussian Process, by default None. If the list is None, or if any
element of the list is None, all elements for all blocks (or for the
specific block) are used in the Gaussian process. If an element of the list
is an empty array, then no Gaussian process is run on that block; its
observed values are used without any smoothing applied to them.
return_all : bool, optional
If True, in addition to returning ana_corrected, also return the
diagonal of the mc_rot matrix, the smoothed diagonal, and a list
of the Gaussian process regression objects for each block,
by default False.
Returns
-------
(nblock*nbin, nblock*nbin) np.ndarray, {(nblock*nbin,) np.ndarray,
(nblock*nbin,) np.ndarray, (nblock,) list of sklearn.GaussianProcessRegressor
objects}
ana_corrected as above, and if return_all, np.diag(mc_rot),
GP(np.diag(mc_rot)), and a list of the GPs for each block. If for any
block the idx_arrs_by_block array is empty, the returned GP is None
since no GP was actually used for that block.
"""
sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
inv_sqrt_an_full_cov = np.linalg.inv(sqrt_an_full_cov)
res = inv_sqrt_an_full_cov @ mc_full_cov @ inv_sqrt_an_full_cov.T # res should be close to the identity if an_full_cov is good
res_diag = np.diag(res)

n_spec = len(res_diag) // len(lb)
if ell_cuts is None:
ell_cuts = [0] * n_spec

res_diags = np.split(res_diag, n_spec)

if var_eigenspectrum_ratios_by_block is None:
var_eigenspectrum_ratios_by_block = [None] * n_spec

if idx_arrs_by_block is None:
idx_arrs_by_block = [None] * n_spec

smoothed_res_diags = []
for i, res in enumerate(res_diags):
smoothed_res_diags.append(smooth_gp_diag(lb, r, ell_cut=ell_cuts[i]) for r in res_diags)
smooth_res_diag = np.hstack(smoothed_res_diags)
gprs = []
for i in range(n_spec):
smoothed_gp_diag, gpr = smooth_gp_diag(lb, res_diags[i], var_eigenspectrum_ratios_by_block[i],
idx_arr=idx_arrs_by_block[i], return_gpr=True)
smoothed_res_diags.append(smoothed_gp_diag)
gprs.append(gpr)

corrected_cov = sqrt_an_full_cov @ np.diag(smooth_res_diag) @ sqrt_an_full_cov.T
smoothed_res_diag = np.hstack(smoothed_res_diags)

if return_diag:
return corrected_cov, res_diag
corrected_cov = sqrt_an_full_cov @ np.diag(smoothed_res_diag) @ sqrt_an_full_cov.T

if return_all:
return corrected_cov, res_diag, smoothed_res_diag, gprs
else:
return corrected_cov


def smooth_gp_diag(lb, arr_diag, var_diag, ell_cut=0, length_scale=500.0,
length_scale_bounds=(100, 1e4), n_restarts_optimizer=10):

kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds) # + \
# WhiteKernel(noise_level=noise_level, noise_level_bounds=noise_level_bounds)

out = np.zeros_like(arr_diag)

def smooth_gp_diag(lb, arr_diag, var_diag=None, idx_arr=None,
length_scale=200.0, length_scale_bounds=(2, 2e4),
noise_level=0.01, noise_level_bounds=(1e-6, 1e1),
n_restarts_optimizer=5, remove_mean=True,
return_gpr=False):
"""Fit a Gaussian process (GP) to a 1-dimensional target from
a 1-dimensional feature set. The GP uses a radial basis function (RBF)
kernel to model the signal and a white noise kernel to model the noise.
The RBF kernel has a constant kernel prefactor.
i_cut = np.where(lb > ell_cut)[0][0] # idx of first bin greater than ell_cut
out[:i_cut] = arr_diag[:i_cut] # below i_cut, do nothing
Parameters
----------
lb : (n,) np.ndarray
The featureset.
arr_diag : (n,) np.ndarray
The target.
var_diag : (n,) np.ndarray, optional
The white-noise variance in the target, by default None. If None, the
noise level in the target is estimated from the target values
themselves by the Gaussian process.
idx_arr : np.ndarray, optional
An array of size 0..n that indexes which data points are fit by the GP,
by default None. If None, all datapoints are used. Any unused datapoints
are not fit, but rather the target values are simply adopted at face
value.
length_scale : float, optional
The initial RBF scale guess, by default 200.0.
length_scale_bounds : tuple, optional
The bounds of the RBF scale fit, by default (2, 2e4).
noise_level : float, optional
The initial white noise level guess, by default 0.01. Only used if
var_diag is None.
noise_level_bounds : tuple, optional
The bounds of the white noise level guess, by default (1e-6, 1e1).
Only used if var_diag is None.
n_restarts_optimizer : int, optional
The number of optimizers to run to try to find a gloval minimum,
by default 5.
remove_mean : bool, optional
If True, subtract the mean of the target prior to the fit and add it
back in the prediction, by default True.
return_gpr : bool, optional
If True, also return the sklearn.GaussianProcessRegressor object, by
default False.
# fit the first GP on the bins above the ell_cut
X_train = lb[i_cut:, np.newaxis]
y_train = arr_diag[i_cut:] - 1 # prior mean of 1
var_train = var_diag[i_cut:]
gpr = GaussianProcessRegressor(kernel=kernel, alpha=var_train,
n_restarts_optimizer=n_restarts_optimizer)
gpr.fit(X_train, y_train)
out[i_cut:] = gpr.predict(X_train, return_std=False) + 1 # prior mean of 1

# # fit an exponential at the low end
# X_train = lb[:i_cut]
# y_train = np.abs(arr_diag - y_mean_high)[:i_cut]
# z = np.polyfit(X_train, np.log(y_train), 1)
# f = np.poly1d(z)
# y_mean_high[:i_cut] += np.exp(f(lb[:i_cut]))
Returns
-------
(n,) np.ndarray, {sklearn.GaussianProcessRegressor object}
The smoothed target, and optionally the object used to smooth.
"""
if idx_arr is None:
idx_arr = np.arange(lb.size)

out = arr_diag.copy()

if len(idx_arr) > 0:
# fit only on the indexed data bins
X_train = lb[idx_arr, None]
y_train = arr_diag[idx_arr]

# remove mean
if remove_mean:
prior_mean = y_train.mean()
y_train = y_train - prior_mean

# get the fit
if var_diag is None:
kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds) + \
WhiteKernel(noise_level=noise_level, noise_level_bounds=noise_level_bounds)
gpr = GaussianProcessRegressor(kernel=kernel, alpha=0,
n_restarts_optimizer=n_restarts_optimizer)
else:
kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds)
gpr = GaussianProcessRegressor(kernel=kernel, alpha=var_diag[idx_arr],
n_restarts_optimizer=n_restarts_optimizer)
gpr.fit(X_train, y_train)
y_fit = gpr.predict(X_train, return_std=False)

if remove_mean:
y_fit += prior_mean

out[idx_arr] = y_fit
else:
gpr = None

return out
if return_gpr:
return out, gpr
else:
return out


def canonize_connected_2pt(leg1, leg2, all_legs):
Expand Down

0 comments on commit a46b113

Please sign in to comment.