From a46b11327214374a9b50e8fa2b201231ca8cfce7 Mon Sep 17 00:00:00 2001
From: Zachary Atkins
Date: Wed, 1 May 2024 11:52:54 -0400
Subject: [PATCH] complete Gaussian process (GP) smoothing for sim-based correction

---
 pspipe_utils/covariance.py | 224 ++++++++++++++++++++++++++++++-------
 1 file changed, 185 insertions(+), 39 deletions(-)

diff --git a/pspipe_utils/covariance.py b/pspipe_utils/covariance.py
index 4f329d4..481038f 100644
--- a/pspipe_utils/covariance.py
+++ b/pspipe_utils/covariance.py
@@ -451,7 +451,32 @@ def skew(cov, dir=1):
 
     return corrected_cov
 
-def correct_analytical_cov_keep_res_diag(an_full_cov, mc_full_cov, return_diag=False):
+def correct_analytical_cov_eigenspectrum_ratio(an_full_cov, mc_full_cov, return_all=False):
+    """Correct an analytical covariance matrix with a monte carlo covariance
+    matrix. Assumes the following rotated monte carlo matrix is diagonal:
+
+        mc_rot = (ana**-.5) @ mc @ (ana**-.5).T
+
+    and then simply replaces the diagonal in that basis with the observed
+    values:
+
+        ana_corrected = (ana**.5) @ np.diag(np.diag(mc_rot)) @ (ana**.5).T
+
+    Parameters
+    ----------
+    an_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
+        Analytic covariance matrix to be corrected.
+    mc_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
+        Noisy monte carlo matrix to use to correct the analytic matrix.
+    return_all : bool, optional
+        If True, in addition to returning ana_corrected, also return the
+        diagonal of the mc_rot matrix, by default False.
+
+    Returns
+    -------
+    (nblock*nbin, nblock*nbin) np.ndarray, {(nblock*nbin,) np.ndarray}
+        ana_corrected as above, and if return_all, np.diag(mc_rot)
+    """
     sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
     inv_sqrt_an_full_cov = np.linalg.inv(sqrt_an_full_cov)
     res = inv_sqrt_an_full_cov @ mc_full_cov @ inv_sqrt_an_full_cov.T # res should be close to the identity if an_full_cov is good
@@ -459,65 +484,186 @@ def correct_analytical_cov_keep_res_diag(an_full_cov, mc_full_cov, return_diag=F
 
     corrected_cov = sqrt_an_full_cov @ np.diag(res_diag) @ sqrt_an_full_cov.T
 
-    if return_diag:
+    if return_all:
         return corrected_cov, res_diag
     else:
         return corrected_cov
 
-def correct_analytical_cov_keep_res_diag_gp(an_full_cov, mc_full_cov, lb, ell_cuts=None,
-                                            return_diag=False):
-    sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
+def correct_analytical_cov_eigenspectrum_ratio_gp(lb, an_full_cov, mc_full_cov,
+                                                  var_eigenspectrum_ratios_by_block=None,
+                                                  idx_arrs_by_block=None, return_all=False):
+    """Correct an analytical covariance matrix with a monte carlo covariance
+    matrix. Assumes the following rotated monte carlo matrix is diagonal:
+
+        mc_rot = (ana**-.5) @ mc @ (ana**-.5).T
+
+    and then fits the diagonal in that basis with a Gaussian process (GP) on
+    the observed values:
+
+        ana_corrected = (ana**.5) @ np.diag(GP(np.diag(mc_rot))) @ (ana**.5).T
+
+    The GP is applied to each block diagonal separately.
+
+    Parameters
+    ----------
+    lb : (nbin,) np.ndarray
+        The locations in ell of the bins.
+    an_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
+        Analytic covariance matrix to be corrected.
+    mc_full_cov : (nblock*nbin, nblock*nbin) np.ndarray
+        Noisy monte carlo matrix to use to correct the analytic matrix.
+    var_eigenspectrum_ratios_by_block : (nblock, nbin) np.ndarray, optional
+        The variance of the diagonal elements of mc_rot, by default None. These
+        would need to be precomputed in the rotated basis. If None, the
+        noise level in the diagonal elements of mc_rot is estimated from the
+        elements themselves by the Gaussian process.
+    idx_arrs_by_block : (nblock,) list, optional
+        A list of np.ndarrays, each of which may have between 0 and nbin elements,
+        giving for each block the bin indices that should actually be used
+        in the Gaussian process, by default None. If the list is None, or if any
+        element of the list is None, all elements for all blocks (or for the
+        specific block) are used in the Gaussian process. If an element of the list
+        is an empty array, then no Gaussian process is run on that block; its
+        observed values are used without any smoothing applied to them.
+    return_all : bool, optional
+        If True, in addition to returning ana_corrected, also return the
+        diagonal of the mc_rot matrix, the smoothed diagonal, and a list
+        of the Gaussian process regression objects for each block,
+        by default False.
+
+    Returns
+    -------
+    (nblock*nbin, nblock*nbin) np.ndarray, {(nblock*nbin,) np.ndarray,
+    (nblock*nbin,) np.ndarray, (nblock,) list of sklearn.GaussianProcessRegressor
+    objects}
+        ana_corrected as above, and if return_all, np.diag(mc_rot),
+        GP(np.diag(mc_rot)), and a list of the GPs for each block. If for any
+        block the idx_arrs_by_block array is empty, the returned GP is None
+        since no GP was actually used for that block.
+    """
+    sqrt_an_full_cov = utils.eigpow(an_full_cov, 0.5)
     inv_sqrt_an_full_cov = np.linalg.inv(sqrt_an_full_cov)
     res = inv_sqrt_an_full_cov @ mc_full_cov @ inv_sqrt_an_full_cov.T # res should be close to the identity if an_full_cov is good
     res_diag = np.diag(res)
     n_spec = len(res_diag) // len(lb)
-    if ell_cuts is None:
-        ell_cuts = [0] * n_spec
+    res_diags = np.split(res_diag, n_spec)
+
+    if var_eigenspectrum_ratios_by_block is None:
+        var_eigenspectrum_ratios_by_block = [None] * n_spec
+
+    if idx_arrs_by_block is None:
+        idx_arrs_by_block = [None] * n_spec
+
     smoothed_res_diags = []
-    for i, res in enumerate(res_diags):
-        smoothed_res_diags.append(smooth_gp_diag(lb, r, ell_cut=ell_cuts[i]) for r in res_diags)
-    smooth_res_diag = np.hstack(smoothed_res_diags)
+    gprs = []
+    for i in range(n_spec):
+        smoothed_gp_diag, gpr = smooth_gp_diag(lb, res_diags[i], var_eigenspectrum_ratios_by_block[i],
+                                               idx_arr=idx_arrs_by_block[i], return_gpr=True)
+        smoothed_res_diags.append(smoothed_gp_diag)
+        gprs.append(gpr)
 
-    corrected_cov = sqrt_an_full_cov @ np.diag(smooth_res_diag) @ sqrt_an_full_cov.T
+    smoothed_res_diag = np.hstack(smoothed_res_diags)
 
-    if return_diag:
-        return corrected_cov, res_diag
+    corrected_cov = sqrt_an_full_cov @ np.diag(smoothed_res_diag) @ sqrt_an_full_cov.T
+
+    if return_all:
+        return corrected_cov, res_diag, smoothed_res_diag, gprs
     else:
        return corrected_cov
 
-def smooth_gp_diag(lb, arr_diag, var_diag, ell_cut=0, length_scale=500.0,
-                   length_scale_bounds=(100, 1e4), n_restarts_optimizer=10):
-
-    kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds) # + \
-        # WhiteKernel(noise_level=noise_level, noise_level_bounds=noise_level_bounds)
-
-    out = np.zeros_like(arr_diag)
-
+def smooth_gp_diag(lb, arr_diag, var_diag=None, idx_arr=None,
+                   length_scale=200.0, length_scale_bounds=(2, 2e4),
+                   noise_level=0.01, noise_level_bounds=(1e-6, 1e1),
+                   n_restarts_optimizer=5, remove_mean=True,
+                   return_gpr=False):
+    """Fit a Gaussian process (GP) to a 1-dimensional target from
+    a 1-dimensional feature set. The GP uses a radial basis function (RBF)
+    kernel to model the signal and a white noise kernel to model the noise.
+    The RBF kernel has a constant kernel prefactor.
 
-    i_cut = np.where(lb > ell_cut)[0][0] # idx of first bin greater than ell_cut
-    out[:i_cut] = arr_diag[:i_cut] # below i_cut, do nothing
 
+    Parameters
+    ----------
+    lb : (n,) np.ndarray
+        The feature set.
+    arr_diag : (n,) np.ndarray
+        The target.
+    var_diag : (n,) np.ndarray, optional
+        The white-noise variance in the target, by default None. If None, the
+        noise level in the target is estimated from the target values
+        themselves by the Gaussian process.
+    idx_arr : np.ndarray, optional
+        An array with between 0 and n elements that indexes which data points
+        are fit by the GP, by default None. If None, all data points are used.
+        Any unused data points are not fit; their target values are simply
+        adopted at face value.
+    length_scale : float, optional
+        The initial RBF scale guess, by default 200.0.
+    length_scale_bounds : tuple, optional
+        The bounds of the RBF scale fit, by default (2, 2e4).
+    noise_level : float, optional
+        The initial white noise level guess, by default 0.01. Only used if
+        var_diag is None.
+    noise_level_bounds : tuple, optional
+        The bounds of the white noise level guess, by default (1e-6, 1e1).
+        Only used if var_diag is None.
+    n_restarts_optimizer : int, optional
+        The number of optimizer restarts to run to try to find a global minimum,
+        by default 5.
+    remove_mean : bool, optional
+        If True, subtract the mean of the target prior to the fit and add it
+        back in the prediction, by default True.
+    return_gpr : bool, optional
+        If True, also return the sklearn.GaussianProcessRegressor object, by
+        default False.
 
-    # fit the first GP on the bins above the ell_cut
-    X_train = lb[i_cut:, np.newaxis]
-    y_train = arr_diag[i_cut:] - 1 # prior mean of 1
-    var_train = var_diag[i_cut:]
-    gpr = GaussianProcessRegressor(kernel=kernel, alpha=var_train,
-                                   n_restarts_optimizer=n_restarts_optimizer)
-    gpr.fit(X_train, y_train)
-    out[i_cut:] = gpr.predict(X_train, return_std=False) + 1 # prior mean of 1
-
-    # # fit an exponential at the low end
-    # X_train = lb[:i_cut]
-    # y_train = np.abs(arr_diag - y_mean_high)[:i_cut]
-    # z = np.polyfit(X_train, np.log(y_train), 1)
-    # f = np.poly1d(z)
-    # y_mean_high[:i_cut] += np.exp(f(lb[:i_cut]))
+    Returns
+    -------
+    (n,) np.ndarray, {sklearn.GaussianProcessRegressor object}
+        The smoothed target, and optionally the object used to smooth.
+    """
+    if idx_arr is None:
+        idx_arr = np.arange(lb.size)
+
+    out = arr_diag.copy()
+
+    if len(idx_arr) > 0:
+        # fit only on the indexed data bins
+        X_train = lb[idx_arr, None]
+        y_train = arr_diag[idx_arr]
+
+        # remove mean
+        if remove_mean:
+            prior_mean = y_train.mean()
+            y_train = y_train - prior_mean
+
+        # get the fit
+        if var_diag is None:
+            kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds) + \
+                     WhiteKernel(noise_level=noise_level, noise_level_bounds=noise_level_bounds)
+            gpr = GaussianProcessRegressor(kernel=kernel, alpha=0,
+                                           n_restarts_optimizer=n_restarts_optimizer)
+        else:
+            kernel = 1.0 * RBF(length_scale=length_scale, length_scale_bounds=length_scale_bounds)
+            gpr = GaussianProcessRegressor(kernel=kernel, alpha=var_diag[idx_arr],
+                                           n_restarts_optimizer=n_restarts_optimizer)
+        gpr.fit(X_train, y_train)
+        y_fit = gpr.predict(X_train, return_std=False)
+
+        if remove_mean:
+            y_fit += prior_mean
+
+        out[idx_arr] = y_fit
+    else:
+        gpr = None
 
-    return out
+    if return_gpr:
+        return out, gpr
+    else:
+        return out
 
 
 def canonize_connected_2pt(leg1, leg2, all_legs):
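
A rough usage sketch of the new correct_analytical_cov_eigenspectrum_ratio_gp, assuming this patch is applied to pspipe_utils. The bin centers, block count, and the toy analytic / monte carlo covariances below are synthetic stand-ins rather than pipeline outputs; the empty index array for the last block is only there to show the GP-skipping behaviour described in the docstring.

import numpy as np
from pspipe_utils import covariance

nbin, nblock = 50, 3
lb = np.linspace(50, 5000, nbin)                     # toy bin centers in ell

rng = np.random.default_rng(0)
A = rng.standard_normal((nblock * nbin, nblock * nbin))
an_full_cov = A @ A.T + nblock * nbin * np.eye(nblock * nbin)   # synthetic SPD "analytic" covariance
B = rng.standard_normal((nblock * nbin, nblock * nbin))
mc_full_cov = an_full_cov + 0.05 * (B + B.T)                    # synthetic noisy "monte carlo" covariance

# use all bins for the first two blocks, skip the GP entirely for the last block
idx_arrs_by_block = [None, None, np.array([], dtype=int)]

corrected, res_diag, smoothed_res_diag, gprs = \
    covariance.correct_analytical_cov_eigenspectrum_ratio_gp(
        lb, an_full_cov, mc_full_cov,
        idx_arrs_by_block=idx_arrs_by_block, return_all=True)

print(corrected.shape)   # (150, 150)
print(gprs[-1])          # None, since the last block skipped the GP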
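
For reference, the smoothing step inside smooth_gp_diag reduces, for a single block with var_diag=None, to roughly the following standalone scikit-learn sketch. The kernel settings mirror the new defaults in the patch; the eigenspectrum-ratio diagonal below is fabricated purely for illustration.

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel

rng = np.random.default_rng(1)
lb = np.linspace(50, 5000, 50)                                                   # toy bin centers
res_diag = 1 + 0.05 * np.sin(lb / 1500) + 0.02 * rng.standard_normal(lb.size)    # fake mc_rot diagonal

prior_mean = res_diag.mean()                                # remove_mean=True behaviour
kernel = 1.0 * RBF(length_scale=200.0, length_scale_bounds=(2, 2e4)) + \
         WhiteKernel(noise_level=0.01, noise_level_bounds=(1e-6, 1e1))
gpr = GaussianProcessRegressor(kernel=kernel, alpha=0, n_restarts_optimizer=5)

gpr.fit(lb[:, None], res_diag - prior_mean)                 # fit the de-meaned ratios vs. ell
smoothed_res_diag = gpr.predict(lb[:, None]) + prior_mean   # smoothed eigenspectrum ratio

When var_diag is supplied instead, the patch drops the WhiteKernel and passes the per-bin variances through alpha, so the noise level is fixed rather than fitted.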