From c112593bf2c694bb3a38ccfb8f541709df1ba2c0 Mon Sep 17 00:00:00 2001 From: houpc Date: Tue, 29 Oct 2024 20:33:26 +0800 Subject: [PATCH] add unittests and improve several type checkings --- src/base.py | 6 +- src/base_test.py | 103 ++++++++++ src/integrators.py | 16 +- src/integrators_test.py | 404 ++++++++++++++++++++++++++++++++++++++++ src/maps.py | 64 ++++--- src/maps_test.py | 231 +++++++++++++++++++++++ src/mc_test.py | 3 + src/utils.py | 34 +++- src/utils_test.py | 146 +++++++++++++++ src/vegas_test.py | 11 +- 10 files changed, 980 insertions(+), 38 deletions(-) create mode 100644 src/base_test.py create mode 100644 src/integrators_test.py create mode 100644 src/maps_test.py create mode 100644 src/utils_test.py diff --git a/src/base.py b/src/base.py index 07055b1..687e8df 100644 --- a/src/base.py +++ b/src/base.py @@ -12,13 +12,13 @@ class BaseDistribution(nn.Module): def __init__(self, bounds, device="cpu", dtype=torch.float64): super().__init__() self.dtype = dtype - # self.bounds = bounds if isinstance(bounds, (list, np.ndarray)): self.bounds = torch.tensor(bounds, dtype=dtype, device=device) elif isinstance(bounds, torch.Tensor): - self.bounds = bounds + self.bounds = bounds.to(dtype=dtype, device=device) else: - raise ValueError("Unsupported map specification") + raise ValueError("'bounds' must be a list, numpy array, or torch tensor.") + self.dim = self.bounds.shape[0] self.device = device diff --git a/src/base_test.py b/src/base_test.py new file mode 100644 index 0000000..8116b4c --- /dev/null +++ b/src/base_test.py @@ -0,0 +1,103 @@ +import unittest +import torch +import numpy as np +from base import BaseDistribution, Uniform + + +class TestBaseDistribution(unittest.TestCase): + def setUp(self): + # Common setup for all tests + self.bounds_list = [[0.0, 1.0], [2.0, 3.0]] + self.bounds_tensor = torch.tensor([[0.0, 1.0], [2.0, 3.0]]) + self.device = "cpu" + self.dtype = torch.float64 + + def test_init_with_list(self): + base_dist = BaseDistribution(self.bounds_list, self.device, self.dtype) + self.assertEqual(base_dist.bounds.tolist(), self.bounds_list) + self.assertEqual(base_dist.dim, 2) + self.assertEqual(base_dist.device, self.device) + self.assertEqual(base_dist.dtype, self.dtype) + + def test_init_with_tensor(self): + base_dist = BaseDistribution(self.bounds_tensor, self.device, self.dtype) + self.assertTrue(torch.equal(base_dist.bounds, self.bounds_tensor)) + self.assertEqual(base_dist.dim, 2) + self.assertEqual(base_dist.device, self.device) + self.assertEqual(base_dist.dtype, self.dtype) + + def test_init_with_invalid_bounds(self): + with self.assertRaises(ValueError): + BaseDistribution("invalid_bounds", self.device, self.dtype) + + def test_sample_not_implemented(self): + base_dist = BaseDistribution(self.bounds_list, self.device, self.dtype) + with self.assertRaises(NotImplementedError): + base_dist.sample() + + def tearDown(self): + # Common teardown for all tests + pass + + +class TestUniform(unittest.TestCase): + def setUp(self): + # Common setup for all tests + self.bounds_list = [[0.0, 1.0], [2.0, 3.0]] + self.bounds_tensor = torch.tensor([[0.0, 1.0], [2.0, 3.0]]) + self.device = "cpu" + self.dtype = torch.float64 + self.uniform_dist = Uniform(self.bounds_list, self.device, self.dtype) + + def test_init_with_list(self): + self.assertEqual(self.uniform_dist.bounds.tolist(), self.bounds_list) + self.assertEqual(self.uniform_dist.dim, 2) + self.assertEqual(self.uniform_dist.device, self.device) + self.assertEqual(self.uniform_dist.dtype, self.dtype) + + def 
test_init_with_tensor(self): + uniform_dist = Uniform(self.bounds_tensor, self.device, self.dtype) + self.assertTrue(torch.equal(uniform_dist.bounds, self.bounds_tensor)) + self.assertEqual(uniform_dist.dim, 2) + self.assertEqual(uniform_dist.device, self.device) + self.assertEqual(uniform_dist.dtype, self.dtype) + + def test_sample_within_bounds(self): + nsamples = 1000 + samples, log_detJ = self.uniform_dist.sample(nsamples) + self.assertEqual(samples.shape, (nsamples, 2)) + self.assertTrue(torch.all(samples[:, 0] >= 0.0)) + self.assertTrue(torch.all(samples[:, 0] <= 1.0)) + self.assertTrue(torch.all(samples[:, 1] >= 2.0)) + self.assertTrue(torch.all(samples[:, 1] <= 3.0)) + self.assertEqual(log_detJ.shape, (nsamples,)) + self.assertTrue( + torch.allclose( + log_detJ, torch.tensor([np.log(1.0) + np.log(1.0)] * nsamples) + ) + ) + + def test_sample_with_single_sample(self): + samples, log_detJ = self.uniform_dist.sample(1) + self.assertEqual(samples.shape, (1, 2)) + self.assertTrue(torch.all(samples[:, 0] >= 0.0)) + self.assertTrue(torch.all(samples[:, 0] <= 1.0)) + self.assertTrue(torch.all(samples[:, 1] >= 2.0)) + self.assertTrue(torch.all(samples[:, 1] <= 3.0)) + self.assertEqual(log_detJ.shape, (1,)) + self.assertTrue( + torch.allclose(log_detJ, torch.tensor([np.log(1.0) + np.log(1.0)])) + ) + + def test_sample_with_zero_samples(self): + samples, log_detJ = self.uniform_dist.sample(0) + self.assertEqual(samples.shape, (0, 2)) + self.assertEqual(log_detJ.shape, (0,)) + + def tearDown(self): + # Common teardown for all tests + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/src/integrators.py b/src/integrators.py index 4b93ad9..ecec374 100644 --- a/src/integrators.py +++ b/src/integrators.py @@ -103,8 +103,8 @@ def __call__(self, f: Callable, **kwargs): results = np.array([RAvg() for _ in range(f_size)]) for i in range(f_size): _mean = values[:, i].mean().item() - _std = values[:, i].std().item() / self.nbatch**0.5 - results[i].add(gvar.gvar(_mean, _std)) + _var = values[:, i].var().item() / self.nbatch + results[i].update(_mean, _var, self.neval) if f_size == 1: return results[0] else: @@ -247,15 +247,17 @@ def one_step(current_y, current_x, current_weight, current_jac): results_ref = RAvg() mean_ref = refvalues.mean().item() - std_ref = refvalues.std().item() / self.nbatch**0.5 + var_ref = refvalues.var().item() / self.nbatch - results_ref.add(gvar.gvar(mean_ref, std_ref)) + results_ref.update(mean_ref, var_ref, self.neval) for i in range(f_size): _mean = values[:, i].mean().item() - _std = values[:, i].std().item() / self.nbatch**0.5 - results[i].add(gvar.gvar(_mean, _std)) + _var = values[:, i].var().item() / self.nbatch + results[i].update(_mean, _var, self.neval) if f_size == 1: - return results[0] / results_ref * self._rangebounds.prod() + res = results[0] / results_ref * self._rangebounds.prod() + result = RAvg(itn_results=[res], sum_neval=self.neval) + return result else: return results / results_ref * self._rangebounds.prod().item() diff --git a/src/integrators_test.py b/src/integrators_test.py new file mode 100644 index 0000000..fb29d94 --- /dev/null +++ b/src/integrators_test.py @@ -0,0 +1,404 @@ +import unittest +import torch +import numpy as np +from integrators import Integrator, MonteCarlo, MCMC +from utils import RAvg + + +class TestIntegrator(unittest.TestCase): + def setUp(self): + # Common setup for all tests + self.bounds = [[0, 1], [2, 3]] + self.device = "cpu" + self.dtype = torch.float64 + self.neval = 1000 + self.nbatch = 100 + + def 
test_init_with_bounds(self): + integrator = Integrator( + bounds=self.bounds, device=self.device, dtype=self.dtype + ) + self.assertEqual(integrator.bounds.tolist(), self.bounds) + self.assertEqual(integrator.dim, 2) + self.assertEqual(integrator.neval, self.neval) + self.assertEqual(integrator.nbatch, self.neval) + self.assertEqual(integrator.device, self.device) + self.assertEqual(integrator.dtype, self.dtype) + + def test_init_with_maps(self): + class MockMaps: + def __init__(self, bounds, dtype): + self.bounds = torch.tensor(bounds, dtype=dtype) + self.dtype = dtype + + maps = MockMaps(bounds=self.bounds, dtype=self.dtype) + integrator = Integrator(maps=maps, device=self.device, dtype=self.dtype) + self.assertEqual(integrator.bounds.tolist(), self.bounds) + self.assertEqual(integrator.dim, 2) + self.assertEqual(integrator.neval, self.neval) + self.assertEqual(integrator.nbatch, self.neval) + self.assertEqual(integrator.device, self.device) + self.assertEqual(integrator.dtype, self.dtype) + + def test_init_with_mismatched_dtype(self): + class MockMaps: + def __init__(self, bounds, dtype): + self.bounds = bounds + self.dtype = dtype + + maps = MockMaps(bounds=self.bounds, dtype=torch.float32) + with self.assertRaises(ValueError): + Integrator(maps=maps, device=self.device, dtype=self.dtype) + + def test_init_with_invalid_bounds(self): + with self.assertRaises(TypeError): + Integrator(bounds=123, device=self.device, dtype=self.dtype) + + def test_sample_without_maps(self): + integrator = Integrator( + bounds=self.bounds, device=self.device, dtype=self.dtype + ) + u, log_detJ = integrator.sample(100) + self.assertEqual(u.shape, (100, 2)) + self.assertEqual(log_detJ.shape, (100,)) + + def test_sample_with_maps(self): + class MockMaps: + def __init__(self, bounds, dtype): + self.bounds = bounds + self.dtype = dtype + + def forward(self, u): + return u, torch.zeros(u.shape[0], dtype=self.dtype) + + maps = MockMaps(bounds=self.bounds, dtype=self.dtype) + integrator = Integrator(maps=maps, device=self.device, dtype=self.dtype) + u, log_detJ = integrator.sample(100) + self.assertEqual(u.shape, (100, 2)) + self.assertEqual(log_detJ.shape, (100,)) + + +class TestMonteCarlo(unittest.TestCase): + def setUp(self): + # Setup common parameters for tests + self.maps = None + self.bounds = [[0, 1]] + self.q0 = None + self.neval = 1000 + self.nbatch = 100 + self.device = "cpu" + self.dtype = torch.float64 + self.monte_carlo = MonteCarlo( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + self.device, + self.dtype, + ) + + def tearDown(self): + # Teardown after tests + pass + + def test_initialization(self): + # Test if the MonteCarlo class initializes correctly + self.assertIsInstance(self.monte_carlo, MonteCarlo) + self.assertEqual(self.monte_carlo.neval, self.neval) + self.assertEqual(self.monte_carlo.nbatch, self.nbatch) + self.assertEqual(self.monte_carlo.device, self.device) + self.assertEqual(self.monte_carlo.dtype, self.dtype) + self.assertTrue( + torch.equal( + self.monte_carlo.bounds, + torch.tensor(self.bounds, dtype=self.dtype, device=self.device), + ) + ) + self.assertEqual(self.monte_carlo.dim, 1) + + def test_call_single_tensor_output(self): + # Test the __call__ method with a function that returns a single tensor + def f(x): + return x.sum(dim=1) + + result = self.monte_carlo(f) + self.assertIsInstance(result, RAvg) + self.assertEqual(result.shape, ()) + self.assertIsInstance(result.mean, float) + self.assertIsInstance(result.sdev, float) + + def 
test_call_multiple_tensor_output(self): + # Test the __call__ method with a function that returns a list of tensors + def f(x): + return [x.sum(dim=1), x.prod(dim=1)] + + result = self.monte_carlo(f) + self.assertIsInstance(result, np.ndarray) + self.assertEqual(result.shape, (2,)) + self.assertEqual(result.dtype, RAvg) + + def test_call_invalid_output_type(self): + # Test the __call__ method with a function that returns an invalid type + def f(x): + return "invalid" + + with self.assertRaises(TypeError): + self.monte_carlo(f) + + def test_multiply_by_jacobian_single_tensor(self): + # Test the _multiply_by_jacobian method with a single tensor + values = torch.tensor([1.0, 2.0, 3.0], dtype=self.dtype) + jac = torch.tensor([0.5, 0.5, 0.5], dtype=self.dtype) + result = self.monte_carlo._multiply_by_jacobian(values, jac) + expected = torch.tensor([[0.5], [1.0], [1.5]], dtype=self.dtype) + self.assertTrue(torch.allclose(result, expected)) + + def test_multiply_by_jacobian_multiple_tensors(self): + # Test the _multiply_by_jacobian method with a list of tensors + values = [ + torch.tensor([1.0, 2.0, 3.0], dtype=self.dtype), + torch.tensor([4.0, 5.0, 6.0], dtype=self.dtype), + ] + jac = torch.tensor([0.5, 0.5, 0.5], dtype=self.dtype) + result = self.monte_carlo._multiply_by_jacobian(values, jac) + expected = torch.tensor([[0.5, 2.0], [1.0, 2.5], [1.5, 3.0]], dtype=self.dtype) + self.assertTrue(torch.allclose(result, expected)) + + def test_sample_method(self): + # Test the sample method to ensure it returns the correct shape + x, log_detJ = self.monte_carlo.sample(self.nbatch) + self.assertEqual(x.shape, (self.nbatch, 1)) + self.assertEqual(log_detJ.shape, (self.nbatch,)) + + def test_call_with_cuda(self): + # Test the __call__ method with different device values + def f(x): + return x.sum(dim=1) + + # Test with device = "cuda" if available + if torch.cuda.is_available(): + monte_carlo_cuda = MonteCarlo( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + "cuda", + self.dtype, + ) + result_cuda = monte_carlo_cuda(f) + self.assertIsInstance(result_cuda, RAvg) + self.assertIsInstance(result_cuda.mean, float) + self.assertIsInstance(result_cuda.sdev, float) + self.assertEqual(result_cuda.shape, ()) + + def test_call_with_different_dtype(self): + # Test the __call__ method with different dtype values + def f(x): + return x.sum(dim=1) + + # Test with dtype = torch.float32 + monte_carlo_float32 = MonteCarlo( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + self.device, + torch.float32, + ) + result_float32 = monte_carlo_float32(f) + self.assertIsInstance(result_float32, RAvg) + self.assertIsInstance(result_float32.mean, float) + self.assertIsInstance(result_float32.sdev, float) + self.assertEqual(result_float32.shape, ()) + + def test_call_with_different_bounds(self): + # Test the __call__ method with different bounds values + def f(x): + return x.sum(dim=1) + + # Test with bounds = [0, 2] + monte_carlo_bounds_0_2 = MonteCarlo( + self.maps, + [[0, 2]], + self.q0, + self.neval, + self.nbatch, + self.device, + self.dtype, + ) + result_bounds_0_2 = monte_carlo_bounds_0_2(f) + self.assertIsInstance(result_bounds_0_2, RAvg) + self.assertEqual(result_bounds_0_2.shape, ()) + + # Test with bounds = [-1, 1] + monte_carlo_bounds_minus1_1 = MonteCarlo( + self.maps, + [(-1, 1)], + self.q0, + self.neval, + self.nbatch, + self.device, + self.dtype, + ) + result_bounds_minus1_1 = monte_carlo_bounds_minus1_1(f) + self.assertIsInstance(result_bounds_minus1_1, RAvg) + 
self.assertEqual(result_bounds_minus1_1.shape, ()) + + +class TestMCMC(unittest.TestCase): + def setUp(self): + # Setup common parameters for tests + self.maps = None + self.bounds = [(-1.2, 0.6)] + self.q0 = None + self.neval = 10000 + self.nbatch = 100 + self.nburnin = 500 + self.device = "cpu" + self.dtype = torch.float64 + self.mcmc = MCMC( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + self.nburnin, + self.device, + self.dtype, + ) + + def tearDown(self): + # Teardown after tests + pass + + def test_initialization(self): + # Test if the MCMC class initializes correctly + self.assertIsInstance(self.mcmc, MCMC) + self.assertEqual(self.mcmc.neval, self.neval) + self.assertEqual(self.mcmc.nbatch, self.nbatch) + self.assertEqual(self.mcmc.nburnin, self.nburnin) + self.assertEqual(self.mcmc.device, self.device) + self.assertEqual(self.mcmc.dtype, self.dtype) + self.assertTrue( + torch.equal( + self.mcmc.bounds, + torch.tensor(self.bounds, dtype=self.dtype, device=self.device), + ) + ) + self.assertEqual(self.mcmc.dim, 1) + + def test_call_single_tensor_output(self): + # Test the __call__ method with a function that returns a single tensor + def f(x): + return x.sum(dim=1) + + result = self.mcmc(f) + self.assertIsInstance(result, RAvg) + self.assertEqual(result.shape, ()) + self.assertIsInstance(result.mean, float) + self.assertIsInstance(result.sdev, float) + + def test_call_multiple_tensor_output(self): + # Test the __call__ method with a function that returns a list of tensors + def f(x): + return [x.sum(dim=1), x.prod(dim=1)] + + result = self.mcmc(f) + self.assertIsInstance(result, np.ndarray) + self.assertEqual(result.shape, (2,)) + self.assertEqual(result.dtype, RAvg) + + def test_call_invalid_output_type(self): + # Test the __call__ method with a function that returns an invalid type + def f(x): + return "invalid" + + with self.assertRaises(TypeError): + self.mcmc(f) + + def test_call_with_different_device(self): + # Test the __call__ method with different device values + def f(x): + return x.sum(dim=1) + + # Test with device = "cuda" if available + if torch.cuda.is_available(): + mcmc_cuda = MCMC( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + self.nburnin, + "cuda", + self.dtype, + ) + result_cuda = mcmc_cuda(f) + self.assertIsInstance(result_cuda, RAvg) + self.assertEqual(result_cuda.shape, ()) + + def test_call_with_different_dtype(self): + # Test the __call__ method with different dtype values + def f(x): + return x.sum(dim=1) + + # Test with dtype = torch.float32 + mcmc_float32 = MCMC( + self.maps, + self.bounds, + self.q0, + self.neval, + self.nbatch, + self.nburnin, + self.device, + torch.float32, + ) + result_float32 = mcmc_float32(f) + self.assertIsInstance(result_float32, RAvg) + self.assertEqual(result_float32.shape, ()) + self.assertIsInstance(result_float32.mean, float) + self.assertIsInstance(result_float32.sdev, float) + + def test_call_with_different_bounds(self): + # Test the __call__ method with different bounds values + def f(x): + return x.sum(dim=1) + + # Test with bounds = [0, 2] + mcmc_bounds = MCMC( + self.maps, + [(3, 5.2), (-1.1, 0.2)], + self.q0, + self.neval, + self.nbatch, + self.nburnin, + self.device, + self.dtype, + ) + result_bounds = mcmc_bounds(f) + self.assertIsInstance(result_bounds, RAvg) + self.assertEqual(result_bounds.shape, ()) + + def test_call_with_different_proposal_dist(self): + # Test the __call__ method with different proposal distributions + def f(x): + return x.sum(dim=1) + + from 
integrators import random_walk, gaussian + + # Test with uniform proposal distribution + result_rw = self.mcmc(f, proposal_dist=random_walk) + self.assertIsInstance(result_rw, RAvg) + self.assertEqual(result_rw.shape, ()) + + # Test with normal proposal distribution + result_normal = self.mcmc(f, proposal_dist=gaussian) + self.assertIsInstance(result_normal, RAvg) + self.assertEqual(result_normal.shape, ()) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/maps.py b/src/maps.py index 02f5fb1..6f6fc1d 100644 --- a/src/maps.py +++ b/src/maps.py @@ -12,8 +12,11 @@ def __init__(self, bounds, device="cpu", dtype=torch.float64): super().__init__() if isinstance(bounds, (list, np.ndarray)): self.bounds = torch.tensor(bounds, dtype=dtype, device=device) + elif isinstance(bounds, torch.Tensor): + self.bounds = bounds.to(dtype=dtype, device=device) else: - raise ValueError("Unsupported map specification") + raise ValueError("'bounds' must be a list, numpy array, or torch tensor.") + self.dim = self.bounds.shape[0] self.device = device self.dtype = dtype @@ -65,36 +68,26 @@ def inverse(self, x): class Vegas(Map): def __init__(self, bounds, ninc=1000, alpha=0.5, device="cpu", dtype=torch.float64): super().__init__(bounds, device, dtype) - # self.nbin = nbin - self.alpha = alpha + + # Ensure ninc is a tensor of appropriate shape and type if isinstance(ninc, int): - self.ninc = torch.ones(self.dim, dtype=torch.int32, device=device) * ninc - else: + self.ninc = torch.full((self.dim,), ninc, dtype=torch.int32, device=device) + elif isinstance(ninc, (list, np.ndarray)): self.ninc = torch.tensor(ninc, dtype=torch.int32, device=device) + elif isinstance(ninc, torch.Tensor): + self.ninc = ninc.to(dtype=torch.int32, device=device) + else: + raise ValueError("'ninc' must be an int, list, numpy array, or torch tensor.") + + # Ensure ninc has the correct shape + if self.ninc.shape != (self.dim,): + raise ValueError(f"'ninc' must be a scalar or a 1D array of length {self.dim}.") - self.inc = torch.empty( - self.dim, self.ninc.max(), dtype=self.dtype, device=self.device - ) - self.grid = torch.empty( - self.dim, self.ninc.max() + 1, dtype=self.dtype, device=self.device - ) - + self.make_uniform() + self.alpha = alpha self._A = self.bounds[:, 1] - self.bounds[:, 0] self._jaclinear = torch.prod(self._A) - for d in range(self.dim): - self.grid[d, : self.ninc[d] + 1] = torch.linspace( - self.bounds[d, 0], - self.bounds[d, 1], - self.ninc[d] + 1, - dtype=self.dtype, - device=self.device, - ) - self.inc[d, : self.ninc[d]] = ( - self.grid[d, 1 : self.ninc[d] + 1] - self.grid[d, : self.ninc[d]] - ) - self.clear() - def train(self, nsamples, f, epoch=5, alpha=0.5): q0 = Uniform(self.bounds, device=self.device, dtype=self.dtype) u, log_detJ0 = q0.sample(nsamples) @@ -239,6 +232,27 @@ def adapt(self, alpha=0.0): ) self.clear() + def make_uniform(self): + self.inc = torch.empty( + self.dim, self.ninc.max(), dtype=self.dtype, device=self.device + ) + self.grid = torch.empty( + self.dim, self.ninc.max() + 1, dtype=self.dtype, device=self.device + ) + + for d in range(self.dim): + self.grid[d, : self.ninc[d] + 1] = torch.linspace( + self.bounds[d, 0], + self.bounds[d, 1], + self.ninc[d] + 1, + dtype=self.dtype, + device=self.device, + ) + self.inc[d, : self.ninc[d]] = ( + self.grid[d, 1 : self.ninc[d] + 1] - self.grid[d, : self.ninc[d]] + ) + self.clear() + def extract_grid(self): "Return a list of lists specifying the map's grid." 
grid = [] diff --git a/src/maps_test.py b/src/maps_test.py new file mode 100644 index 0000000..97f12ff --- /dev/null +++ b/src/maps_test.py @@ -0,0 +1,231 @@ +import unittest +import torch + +# import numpy as np +from maps import Map, CompositeMap, Linear, Vegas + + +class TestMap(unittest.TestCase): + def setUp(self): + self.bounds = [[0, 1], [2, 3]] + self.device = "cpu" + self.dtype = torch.float64 + self.map = Map(self.bounds, self.device, self.dtype) + + def test_init_with_list(self): + self.assertEqual(self.map.bounds.tolist(), self.bounds) + self.assertEqual(self.map.dim, 2) + self.assertEqual(self.map.device, self.device) + self.assertEqual(self.map.dtype, self.dtype) + + def test_init_with_tensor(self): + bounds_tensor = torch.tensor(self.bounds, dtype=self.dtype, device=self.device) + map_tensor = Map(bounds_tensor, self.device, self.dtype) + self.assertTrue(torch.equal(map_tensor.bounds, bounds_tensor)) + self.assertEqual(map_tensor.dim, 2) + self.assertEqual(map_tensor.device, self.device) + self.assertEqual(map_tensor.dtype, self.dtype) + + def test_init_with_invalid_bounds(self): + with self.assertRaises(ValueError): + Map("invalid_bounds", self.device, self.dtype) + + def test_forward_not_implemented(self): + with self.assertRaises(NotImplementedError): + self.map.forward(torch.tensor([0.5, 0.5], dtype=self.dtype)) + + def test_inverse_not_implemented(self): + with self.assertRaises(NotImplementedError): + self.map.inverse(torch.tensor([0.5, 0.5], dtype=self.dtype)) + + +class TestCompositeMap(unittest.TestCase): + def setUp(self): + self.bounds1 = [[0, 1], [2, 3]] + self.bounds2 = [[1, 2], [3, 4]] + self.map1 = Linear(self.bounds1) + self.map2 = Linear(self.bounds2) + self.composite_map = CompositeMap([self.map1, self.map2]) + + def test_init_with_empty_maps(self): + with self.assertRaises(ValueError): + CompositeMap([]) + + def test_forward(self): + u = torch.tensor([[0.5, 0.5]], dtype=torch.float64) + expected_output = torch.tensor([[1.5, 5.5]], dtype=torch.float64) + expected_log_detJ = torch.tensor([0.0], dtype=torch.float64) + output, log_detJ = self.composite_map.forward(u) + self.assertTrue(torch.equal(output, expected_output)) + self.assertTrue(torch.equal(log_detJ, expected_log_detJ)) + + def test_inverse(self): + x = torch.tensor([[1.5, 5.5]], dtype=torch.float64) + expected_output = torch.tensor([[0.5, 0.5]], dtype=torch.float64) + expected_log_detJ = torch.tensor([0.0], dtype=torch.float64) + output, log_detJ = self.composite_map.inverse(x) + self.assertTrue(torch.equal(output, expected_output)) + self.assertTrue(torch.equal(log_detJ, expected_log_detJ)) + + +class TestLinear(unittest.TestCase): + def setUp(self): + self.bounds = [[0, 1], [2, 3]] + self.linear_map = Linear(self.bounds) + + def test_forward(self): + u = torch.tensor([[0.5, 0.5]], dtype=torch.float64) + expected_output = torch.tensor([[0.5, 2.5]], dtype=torch.float64) + expected_log_detJ = torch.tensor([0.0], dtype=torch.float64) + output, log_detJ = self.linear_map.forward(u) + self.assertTrue(torch.equal(output, expected_output)) + self.assertTrue(torch.equal(log_detJ, expected_log_detJ)) + + def test_inverse(self): + x = torch.tensor([[0.5, 2.5]], dtype=torch.float64) + expected_output = torch.tensor([[0.5, 0.5]], dtype=torch.float64) + expected_log_detJ = torch.tensor([0.0], dtype=torch.float64) + output, log_detJ = self.linear_map.inverse(x) + self.assertTrue(torch.equal(output, expected_output)) + self.assertTrue(torch.equal(log_detJ, expected_log_detJ)) + + +class 
TestVegas(unittest.TestCase): + def setUp(self): + # Setup common parameters for tests + self.bounds = torch.tensor([[0.0, 1.0], [0.0, 1.0]], dtype=torch.float64) + self.ninc = 10 + self.alpha = 0.5 + self.device = "cpu" + self.dtype = torch.float64 + self.vegas = Vegas( + self.bounds, + ninc=self.ninc, + alpha=self.alpha, + device=self.device, + dtype=self.dtype, + ) + grid0 = torch.linspace(0, 1, 11, dtype=self.dtype) + self.init_grid = torch.stack([grid0, grid0]) + + def tearDown(self): + # Teardown after each test + del self.vegas + + def test_initialization(self): + # Test initialization of the Vegas class + self.assertEqual(self.vegas.dim, 2) + self.assertEqual(self.vegas.alpha, self.alpha) + self.assertEqual(self.vegas.ninc.tolist(), [self.ninc, self.ninc]) + self.assertEqual(self.vegas.grid.shape, (2, self.ninc + 1)) + self.assertTrue(torch.equal(self.vegas.grid, self.init_grid)) + self.assertEqual(self.vegas.inc.shape, (2, self.ninc)) + + def test_add_training_data(self): + # Test adding training data + u = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) + fval = torch.tensor([1.0, 2.0], dtype=torch.float64) + self.vegas.add_training_data(u, fval) + self.assertIsNotNone(self.vegas.sum_f) + self.assertIsNotNone(self.vegas.n_f) + + def test_adapt(self): + # Test grid adaptation + u = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) + fval = torch.tensor([1.0, 2.0], dtype=torch.float64) + self.vegas.add_training_data(u, fval) + self.vegas.adapt(alpha=self.alpha) + self.assertEqual(self.vegas.grid.shape, (2, self.ninc + 1)) + self.assertEqual(self.vegas.inc.shape, (2, self.ninc)) + + def test_extract_grid(self): + # Test extracting the grid + grid = self.vegas.extract_grid() + self.assertEqual(len(grid), 2) + self.assertEqual(len(grid[0]), self.ninc + 1) + self.assertEqual(len(grid[1]), self.ninc + 1) + + def test_clear(self): + # Test clearing accumulated data + u = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) + fval = torch.tensor([1.0, 2.0], dtype=torch.float64) + self.vegas.add_training_data(u, fval) + self.vegas.clear() + self.assertIsNone(self.vegas.sum_f) + self.assertIsNone(self.vegas.n_f) + + def test_forward(self): + # Test forward transformation + u = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) + x, log_jac = self.vegas.forward(u) + self.assertEqual(x.shape, u.shape) + self.assertEqual(log_jac.shape, (u.shape[0],)) + + def test_inverse(self): + # Test inverse transformation + x = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) + u, log_jac = self.vegas.inverse(x) + self.assertEqual(u.shape, x.shape) + self.assertEqual(log_jac.shape, (x.shape[0],)) + + def test_train(self): + # Test training the Vegas class + def f(x): + return torch.sum(x, dim=1) + + nsamples = 100 + epoch = 5 + self.vegas.train(nsamples, f, epoch=epoch, alpha=self.alpha) + self.assertEqual(self.vegas.grid.shape, (2, self.ninc + 1)) + self.assertEqual(self.vegas.inc.shape, (2, self.ninc)) + + def test_make_uniform(self): + # Test the make_uniform method + self.vegas.make_uniform() + self.assertEqual(self.vegas.grid.shape, (2, self.ninc + 1)) + self.assertEqual(self.vegas.inc.shape, (2, self.ninc)) + self.assertTrue(torch.equal(self.vegas.grid, self.init_grid)) + self.assertIsNone(self.vegas.sum_f) + self.assertIsNone(self.vegas.n_f) + + def test_edge_cases(self): + # Test edge cases + # Test with ninc as a list + ninc_list = [5, 15] + vegas_list = Vegas( + self.bounds, + ninc=ninc_list, + alpha=self.alpha, + device=self.device, + 
dtype=self.dtype, + ) + self.assertEqual(vegas_list.ninc.tolist(), ninc_list) + del vegas_list + + # Test with alpha < 0 + vegas_neg_alpha = Vegas( + self.bounds, + ninc=self.ninc, + alpha=-0.5, + device=self.device, + dtype=self.dtype, + ) + self.assertEqual(vegas_neg_alpha.alpha, -0.5) + del vegas_neg_alpha + + # Test with different device + if torch.cuda.is_available(): + device_cuda = "cuda" + vegas_cuda = Vegas( + self.bounds, + ninc=self.ninc, + alpha=self.alpha, + device=device_cuda, + dtype=self.dtype, + ) + self.assertEqual(vegas_cuda.device, device_cuda) + del vegas_cuda + + +if __name__ == "__main__": + unittest.main() diff --git a/src/mc_test.py b/src/mc_test.py index 9172c8b..65da182 100644 --- a/src/mc_test.py +++ b/src/mc_test.py @@ -1,3 +1,4 @@ +# Integration tests for MonteCarlo and MCMC integrators class. import torch from integrators import MonteCarlo, MCMC from maps import Vegas, Linear @@ -102,6 +103,7 @@ def sharp_peak(x): res = mcmc_integrator(half_sphere_integrand, mix_rate=0.5) print("MCMC Integral results:", res) +vegas_map.make_uniform() # train the vegas map vegas_map.train(20000, half_sphere_integrand, epoch=10, alpha=0.5) @@ -125,6 +127,7 @@ def sharp_peak(x): print(f" Integral 2: ", res[1]) # print("VEAGS map is trained for g(x1, x2)") +vegas_map.make_uniform() vegas_map.train(20000, integrand_list, epoch=10, alpha=0.5) res = vegas_integrator(integrand_list) print("VEGAS Integral results:") diff --git a/src/utils.py b/src/utils.py index f06df88..c1286ce 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,5 +1,4 @@ import torch -import math import numpy as np import gvar import sys @@ -32,6 +31,11 @@ def __init__(self, weighted=True, itn_results=None, sum_neval=0): self.add(r) self.sum_neval = sum_neval + def update(self, mean, var, last_neval=None): + self.add(gvar.gvar(mean, var**0.5)) + if last_neval is not None: + self.sum_neval += last_neval + def add(self, res): self.itn_results.append(res) if isinstance(res, gvar.GVarRef): @@ -57,6 +61,34 @@ def extend(self, ravg): self.add(r) self.sum_neval += ravg.sum_neval + def __reduce_ex__(self, protocol): + return ( + RAvg, + ( + self.weighted, + gvar.dumps(self.itn_results, protocol=protocol), + self.sum_neval, + ), + ) + + def _remove_gvars(self, gvlist): + tmp = RAvg( + weighted=self.weighted, + itn_results=self.itn_results, + sum_neval=self.sum_neval, + ) + tmp.itn_results = gvar.remove_gvars(tmp.itn_results, gvlist) + tgvar = gvar.gvar_factory() # small cov matrix + super(RAvg, tmp).__init__(*tgvar(0, 0).internaldata) + return tmp + + def _distribute_gvars(self, gvlist): + return RAvg( + weighted=self.weighted, + itn_results=gvar.distribute_gvars(self.itn_results, gvlist), + sum_neval=self.sum_neval, + ) + def _chi2(self): if len(self.itn_results) <= 1: return 0.0 diff --git a/src/utils_test.py b/src/utils_test.py new file mode 100644 index 0000000..37eadc7 --- /dev/null +++ b/src/utils_test.py @@ -0,0 +1,146 @@ +import unittest +import numpy as np +import gvar +from utils import RAvg + + +class TestRAvg(unittest.TestCase): + def setUp(self): + # Initialize common variables for tests + self.weighted_ravg = RAvg(weighted=True) + self.unweighted_ravg = RAvg(weighted=False) + self.test_results = [ + gvar.gvar(1.0, 0.1), + gvar.gvar(2.0, 0.2), + gvar.gvar(3.0, 0.3), + ] + _means = np.array([1.0, 2.0, 3.0]) + _sdevs = np.array([0.1, 0.2, 0.3]) + self.unweighted_sdev = np.sum(_sdevs**2) ** 0.5 / 3 + self.weighted_sdev = np.sum(1 / _sdevs**2) ** -0.5 + self.weighted_mean = np.sum(_means / _sdevs**2) * 
self.weighted_sdev**2 + + def tearDown(self): + # Clean up after tests + pass + + def test_init_weighted(self): + self.assertTrue(self.weighted_ravg.weighted) + self.assertEqual(self.weighted_ravg._wlist, []) + self.assertEqual(self.weighted_ravg._mlist, []) + + def test_init_unweighted(self): + self.assertFalse(self.unweighted_ravg.weighted) + self.assertEqual(self.unweighted_ravg._sum, 0.0) + self.assertEqual(self.unweighted_ravg._varsum, 0.0) + self.assertEqual(self.unweighted_ravg._count, 0) + self.assertEqual(self.unweighted_ravg._mlist, []) + + def test_add_weighted(self): + for res in self.test_results: + self.weighted_ravg.add(res) + self.assertEqual(len(self.weighted_ravg.itn_results), 3) + self.assertEqual(len(self.weighted_ravg._wlist), 3) + self.assertEqual(len(self.weighted_ravg._mlist), 3) + self.assertAlmostEqual(self.weighted_ravg.mean, self.weighted_mean) + self.assertAlmostEqual(self.weighted_ravg.sdev, self.weighted_sdev) + + def test_add_unweighted(self): + for res in self.test_results: + self.unweighted_ravg.add(res) + self.assertEqual(len(self.unweighted_ravg.itn_results), 3) + self.assertEqual(self.unweighted_ravg.mean, 2.0) + self.assertAlmostEqual(self.unweighted_ravg.sdev, self.unweighted_sdev) + self.assertEqual(self.unweighted_ravg._sum, 6.0) + self.assertEqual(self.unweighted_ravg._varsum, 0.14) + self.assertEqual(self.unweighted_ravg._count, 3) + + def test_update(self): + self.weighted_ravg.update(1.0, 0.1, last_neval=10) + self.assertEqual(len(self.weighted_ravg.itn_results), 1) + self.assertEqual(self.weighted_ravg.sum_neval, 10) + self.weighted_ravg.update(2.0, 0.2, last_neval=20) + self.assertEqual(len(self.weighted_ravg.itn_results), 2) + self.assertEqual(self.weighted_ravg.sum_neval, 30) + + self.unweighted_ravg.update(1.0, 0.1, last_neval=10) + self.assertEqual(len(self.unweighted_ravg.itn_results), 1) + self.assertEqual(self.unweighted_ravg.sum_neval, 10) + self.unweighted_ravg.update(2.0, 0.2, last_neval=20) + self.assertEqual(len(self.unweighted_ravg.itn_results), 2) + self.assertEqual(self.unweighted_ravg.sum_neval, 30) + + def test_extend(self): + ravg = RAvg(weighted=True) + for res in self.test_results: + ravg.add(res) + ravg.sum_neval = 30 + self.weighted_ravg.extend(ravg) + self.assertEqual(len(self.weighted_ravg.itn_results), 3) + self.assertEqual(self.weighted_ravg.sum_neval, 30) + + def test_reduce_ex(self): + self.weighted_ravg.sum_neval = 10 + reduced = self.weighted_ravg.__reduce_ex__(2) + self.assertEqual(reduced[0], RAvg) + self.assertTrue(reduced[1][0]) + self.assertEqual(reduced[1][2], 10) + + def test_remove_gvars(self): + self.weighted_ravg.sum_neval = 10 + ravg = self.weighted_ravg._remove_gvars([]) + self.assertEqual(ravg.weighted, self.weighted_ravg.weighted) + self.assertEqual(ravg.sum_neval, self.weighted_ravg.sum_neval) + + def test_distribute_gvars(self): + self.weighted_ravg.sum_neval = 10 + ravg = self.weighted_ravg._distribute_gvars([]) + self.assertEqual(ravg.weighted, self.weighted_ravg.weighted) + self.assertEqual(ravg.sum_neval, self.weighted_ravg.sum_neval) + + def test_chi2(self): + for _ in range(3): + self.weighted_ravg.add(self.test_results[0]) + self.unweighted_ravg.add(self.test_results[0]) + self.assertTrue(np.isclose(self.weighted_ravg.chi2, 0.0)) + self.assertTrue(np.isclose(self.unweighted_ravg.chi2, 0.0)) + + def test_dof(self): + for res in self.test_results: + self.weighted_ravg.add(res) + dof = self.weighted_ravg.dof + self.assertEqual(dof, 2) + + def test_nitn(self): + for res in self.test_results: + 
self.weighted_ravg.add(res)
+        nitn = self.weighted_ravg.nitn
+        self.assertEqual(nitn, 3)
+
+    def test_Q(self):
+        self.weighted_ravg.add(self.test_results[0])
+        Q = self.weighted_ravg.Q
+        self.assertTrue(np.isnan(Q))
+        self.weighted_ravg.add(self.test_results[0])
+        self.assertIsInstance(self.weighted_ravg.Q, float)
+
+    def test_avg_neval(self):
+        nevals = [10, 20, 30]
+        for i, res in enumerate(self.test_results):
+            self.weighted_ravg.update(res.mean, res.var, nevals[i])
+        self.assertEqual(self.weighted_ravg.sum_neval, 60)
+        self.assertEqual(self.weighted_ravg.avg_neval, 20)
+
+    def test_summary(self):
+        for res in self.test_results:
+            self.weighted_ravg.add(res)
+        summary = self.weighted_ravg.summary()
+        self.assertIsInstance(summary, str)
+
+    def test_converged(self):
+        self.weighted_ravg.add(gvar.gvar(1.0, 0.01))
+        self.assertTrue(self.weighted_ravg.converged(0.1, 0.1))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/vegas_test.py b/src/vegas_test.py
index 4ca34a5..addc8fc 100644
--- a/src/vegas_test.py
+++ b/src/vegas_test.py
@@ -1,3 +1,4 @@
+# Integration tests for VEGAS + MonteCarlo/MCMC integral methods.
 import torch
 from integrators import MonteCarlo, MCMC
 from maps import Vegas, Linear
@@ -56,6 +57,7 @@ def func(x):
 )
 res = vegasmcmc_integrator(func, mix_rate=0.5)
 print("VEGAS-MCMC Integral results: ", res)
+print(type(res))
 
 # Start Monte Carlo integration, including plain-MC, MCMC, vegas, and vegas-MCMC
 print("\nCalculate the integral [h(X), x1 * h(X), x1^2 * h(X)] in the bounds [0, 1]^4")
@@ -71,8 +73,7 @@ def func(x):
 vegas_integrator = MonteCarlo(
     maps=vegas_map,
     neval=n_eval,
-    # nbatch=n_batch,
-    nbatch=n_eval,
+    nbatch=n_batch,
     device=device,
 )
 res = vegas_integrator(integrand_list1)
@@ -86,6 +87,12 @@ def func(x):
     " I[1]/I[0] =",
     res[1] / res[0],
 )
+print(type(res))
+print(type(res[0]))
+print(res[0].sum_neval)
+print(res[0].itn_results)
+print(res[0].nitn)
+
 print("VEGAS-MCMC Integral results:")
 vegasmcmc_integrator = MCMC(