diff --git a/pymc/gp/hsgp_approx.py b/pymc/gp/hsgp_approx.py
index 62596e27a0..9b85c24baf 100644
--- a/pymc/gp/hsgp_approx.py
+++ b/pymc/gp/hsgp_approx.py
@@ -71,18 +71,17 @@ def calc_basis_periodic(
     Xs: TensorLike,
     period: TensorLike,
     m: int,
-    tl: ModuleType = np,
 ):
     """
     Calculate basis vectors for the cosine series expansion of the periodic covariance function.
     These are derived from the Taylor series representation of the covariance.
     """
-    w0 = (2 * np.pi) / period  # angular frequency defining the periodicity
-    m1 = tl.tile(w0 * Xs, m)
-    m2 = tl.diag(tl.arange(0, m, 1))
+    w0 = (2 * pt.pi) / period  # angular frequency defining the periodicity
+    m1 = pt.tile(w0 * Xs, m)
+    m2 = pt.diag(pt.arange(0, m, 1))
     mw0x = m1 @ m2
-    phi_cos = tl.cos(mw0x)
-    phi_sin = tl.sin(mw0x)
+    phi_cos = pt.cos(mw0x)
+    phi_sin = pt.sin(mw0x)

     return phi_cos, phi_sin

@@ -121,8 +120,8 @@ class HSGP(Base):
         provided. Further information can be found in Ruitort-Mayol et al.
     drop_first: bool
         Default `False`. Sometimes the first basis vector is quite "flat" and very similar to
-        the intercept term. When there is an intercept in the model, ignoring the first basis
-        vector may improve sampling. This argument will be deprecated in future versions.
+        the intercept term. When there is an intercept in the model, removing the first basis
+        vector may improve sampling.
     parameterization: str
         Whether to use `centered` or `noncentered` parameterization when multiplying the basis
        by the coefficients.
@@ -210,13 +209,6 @@ def __init__(
         if parameterization not in ["centered", "noncentered"]:
             raise ValueError("`parameterization` must be either 'centered' or 'noncentered'.")

-        if drop_first:
-            warnings.warn(
-                "The drop_first argument will be deprecated in future versions."
-                " See https://github.com/pymc-devs/pymc/pull/6877",
-                DeprecationWarning,
-            )
-
         self._drop_first = drop_first
         self._m = m
         self._m_star = int(np.prod(self._m))
@@ -474,11 +466,15 @@ def __init__(
         self,
         m: int,
         scale: Optional[Union[float, TensorLike]] = 1.0,
+        drop_first=False,
         *,
         mean_func: Mean = Zero(),
         cov_func: Periodic,
     ):
-        arg_err_msg = "`m` must be a positive integer as the `Periodic` kernel approximation is only implemented for 1-dimensional case."
+        arg_err_msg = (
+            "`m` must be a positive integer as the `Periodic` kernel approximation is "
+            "only implemented for 1-dimensional case."
+        )

         if not isinstance(m, int):
             raise ValueError(arg_err_msg)

@@ -488,7 +484,8 @@ def __init__(

         if not isinstance(cov_func, Periodic):
             raise ValueError(
-                "`cov_func` must be an instance of a `Periodic` kernel only. Use the `scale` parameter to control the variance."
+                "`cov_func` must be an instance of a `Periodic` kernel only. Use the `scale` "
+                "parameter to control the variance."
             )

         if cov_func.n_dims > 1:
@@ -498,6 +495,7 @@ def __init__(

         self._m = m
         self.scale = scale
+        self.drop_first = drop_first

         super().__init__(mean_func=mean_func, cov_func=cov_func)

@@ -577,8 +575,7 @@ def prior_linearized(self, Xs: TensorLike):
             ppc = pm.sample_posterior_predictive(idata, var_names=["f"])
         """
         Xs, _ = self.cov_func._slice(Xs)
-
-        phi_cos, phi_sin = calc_basis_periodic(Xs, self.cov_func.period, self._m, tl=pt)
+        phi_cos, phi_sin = calc_basis_periodic(Xs, self.cov_func.period, self._m)
         J = pt.arange(0, self._m, 1)
         # rescale basis coefficients by the sqrt variance term
         psd = self.scale * self.cov_func.power_spectral_density_approx(J)
@@ -603,21 +600,28 @@ def prior(self, name: str, X: TensorLike, dims: Optional[str] = None):  # type: ignore
         (phi_cos, phi_sin), psd = self.prior_linearized(X - self._X_mean)

         m = self._m
-        self._beta = pm.Normal(f"{name}_hsgp_coeffs_", size=(m * 2 - 1))
-        # The first eigenfunction for the sine component is zero
-        # and so does not contribute to the approximation.
-        f = (
-            self.mean_func(X)
-            + phi_cos @ (psd * self._beta[:m])  # type: ignore
-            + phi_sin[..., 1:] @ (psd[1:] * self._beta[m:])  # type: ignore
-        )
-        self.f = pm.Deterministic(name, f, dims=dims)
+        if self.drop_first:
+            # Drop first sine term (all zeros), drop first cos term (all ones)
+            beta = pm.Normal(f"{name}_hsgp_coeffs_", size=(m * 2) - 2)
+            self._beta_cos = pt.concatenate(([0.0], beta[: m - 1]))
+            self._beta_sin = pt.concatenate(([0.0], beta[m - 1 :]))
+
+        else:
+            # The first eigenfunction for the sine component is zero
+            # and so does not contribute to the approximation.
+            beta = pm.Normal(f"{name}_hsgp_coeffs_", size=(m * 2) - 1)
+            self._beta_cos = beta[:m]
+            self._beta_sin = pt.concatenate(([0.0], beta[m:]))
+
+        cos_term = phi_cos @ (psd * self._beta_cos)
+        sin_term = phi_sin @ (psd * self._beta_sin)
+        self.f = pm.Deterministic(name, self.mean_func(X) + cos_term + sin_term, dims=dims)
         return self.f

     def _build_conditional(self, Xnew):
         try:
-            beta, X_mean = self._beta, self._X_mean
+            X_mean = self._X_mean

         except AttributeError:
             raise ValueError(
@@ -626,14 +630,15 @@ def _build_conditional(self, Xnew):

         Xnew, _ = self.cov_func._slice(Xnew)

-        phi_cos, phi_sin = calc_basis_periodic(Xnew - X_mean, self.cov_func.period, self._m, tl=pt)
+        phi_cos, phi_sin = calc_basis_periodic(Xnew - X_mean, self.cov_func.period, self._m)
         m = self._m
         J = pt.arange(0, m, 1)
         # rescale basis coefficients by the sqrt variance term
         psd = self.scale * self.cov_func.power_spectral_density_approx(J)

-        phi = phi_cos @ (psd * beta[:m]) + phi_sin[..., 1:] @ (psd[1:] * beta[m:])
-        return self.mean_func(Xnew) + phi
+        cos_term = phi_cos @ (psd * self._beta_cos)
+        sin_term = phi_sin @ (psd * self._beta_sin)
+        return self.mean_func(Xnew) + cos_term + sin_term

     def conditional(self, name: str, Xnew: TensorLike, dims: Optional[str] = None):  # type: ignore
         R"""
diff --git a/tests/gp/test_hsgp_approx.py b/tests/gp/test_hsgp_approx.py
index 0474899dbf..a6be5c1263 100644
--- a/tests/gp/test_hsgp_approx.py
+++ b/tests/gp/test_hsgp_approx.py
@@ -229,35 +229,61 @@ def test_conditional(self, model, cov_func, X1, parameterization):

 class TestHSGPPeriodic(_BaseFixtures):
     def test_parametrization(self):
-        err_msg = "`m` must be a positive integer as the `Periodic` kernel approximation is only implemented for 1-dimensional case."
+        err_msg = (
+            "`m` must be a positive integer as the `Periodic` kernel approximation is only "
+            "implemented for 1-dimensional case."
+        )
         with pytest.raises(ValueError, match=err_msg):
             # `m` must be a positive integer, not a list
-            cov_func = pm.gp.cov.Periodic(1, period=1, ls=0.1)
+            cov_func = pm.gp.cov.Periodic(1, period=1, ls=1.0)
             pm.gp.HSGPPeriodic(m=[500], cov_func=cov_func)

         with pytest.raises(ValueError, match=err_msg):
             # `m` must be a positive integer
-            cov_func = pm.gp.cov.Periodic(1, period=1, ls=0.1)
+            cov_func = pm.gp.cov.Periodic(1, period=1, ls=1.0)
             pm.gp.HSGPPeriodic(m=-1, cov_func=cov_func)

         with pytest.raises(
             ValueError,
-            match="`cov_func` must be an instance of a `Periodic` kernel only. Use the `scale` parameter to control the variance.",
+            match=(
+                "`cov_func` must be an instance of a `Periodic` kernel only. Use the `scale` "
+                "parameter to control the variance."
+            ),
         ):
             # `cov_func` must be `Periodic` only
-            cov_func = 5.0 * pm.gp.cov.Periodic(1, period=1, ls=0.1)
+            cov_func = 5.0 * pm.gp.cov.Periodic(1, period=1, ls=1.0)
             pm.gp.HSGPPeriodic(m=500, cov_func=cov_func)

         with pytest.raises(
             ValueError,
-            match="HSGP approximation for `Periodic` kernel only implemented for 1-dimensional case.",
+            match=(
+                "HSGP approximation for `Periodic` kernel only implemented for 1-dimensional case."
+            ),
         ):
             cov_func = pm.gp.cov.Periodic(2, period=1, ls=[1, 2])
             pm.gp.HSGPPeriodic(m=500, scale=0.5, cov_func=cov_func)

     @pytest.mark.parametrize("cov_func", [pm.gp.cov.Periodic(1, period=1, ls=1)])
-    @pytest.mark.parametrize("eta", [100.0])
+    @pytest.mark.parametrize("drop_first", [True, False])
+    def test_parametrization_drop_first(self, model, cov_func, X1, drop_first):
+        n_basis = 101
+        with model:
+            gp = pm.gp.HSGPPeriodic(m=n_basis, scale=1.0, drop_first=drop_first, cov_func=cov_func)
+            gp.prior("f1", X1)
+        first_cos_zero = gp._beta_cos[0].eval() == 0.0
+        first_sin_zero = gp._beta_sin[0].eval() == 0.0
+        if drop_first:
+            assert (
+                first_cos_zero and first_sin_zero
+            ), "First element of both coefficient vectors should be zero."
+        else:
+            assert (
+                not first_cos_zero and first_sin_zero
+            ), "Only the first element of the sine coefficient vector should be zero."
+
+    @pytest.mark.parametrize("cov_func", [pm.gp.cov.Periodic(1, period=1, ls=1)])
+    @pytest.mark.parametrize("eta", [2.0])
     @pytest.mark.xfail(
         reason="For `pm.gp.cov.Periodic`, this test does not pass.\
         The mmd is around `0.0468`.\
@@ -266,11 +292,11 @@ def test_parametrization(self):
         See https://github.com/pymc-devs/pymc/pull/6877/ for the full discussion."
     )
     def test_prior(self, model, cov_func, eta, X1, rng):
-        """Compare HSGPPeriodic prior to unapproximated GP prior, pm.gp.Latent. Draw samples from the
-        prior and compare them using MMD two sample test.
+        """Compare HSGPPeriodic prior to unapproximated GP prior, pm.gp.Latent. Draw samples from
+        the prior and compare them using MMD two sample test.
""" with model: - hsgp = pm.gp.HSGPPeriodic(m=200, scale=eta, cov_func=cov_func) + hsgp = pm.gp.HSGPPeriodic(m=200, scale=eta, drop_first=False, cov_func=cov_func) f1 = hsgp.prior("f1", X=X1) gp = pm.gp.Latent(cov_func=eta**2 * cov_func) @@ -278,13 +304,16 @@ def test_prior(self, model, cov_func, eta, X1, rng): idata = pm.sample_prior_predictive(samples=1000, random_seed=rng) - samples1 = az.extract(idata.prior["f1"])["f1"].values.T - samples2 = az.extract(idata.prior["f2"])["f2"].values.T + samples1 = az.extract(idata.prior, var_names="f1").values.T + samples2 = az.extract(idata.prior, var_names="f2").values.T h0, mmd, critical_value, reject = two_sample_test( samples1, samples2, n_sims=500, alpha=0.01 ) - assert not reject, f"H0 was rejected, {mmd} even though HSGP and GP priors should match." + assert not reject, ( + f"H0 was rejected, MMD {mmd:.3f} > {critical_value:.3f} even though HSGP and GP priors " + "should match." + ) @pytest.mark.parametrize("cov_func", [pm.gp.cov.Periodic(1, period=1, ls=1)]) def test_conditional_periodic(self, model, cov_func, X1): @@ -299,8 +328,8 @@ def test_conditional_periodic(self, model, cov_func, X1): idata = pm.sample_prior_predictive(samples=1000) - samples1 = az.extract(idata.prior["f"])["f"].values.T - samples2 = az.extract(idata.prior["fc"])["fc"].values.T + samples1 = az.extract(idata.prior, var_names="f").values.T + samples2 = az.extract(idata.prior, var_names="fc").values.T h0, mmd, critical_value, reject = two_sample_test( samples1, samples2, n_sims=500, alpha=0.01