From 76e40f5d1e37ce039f0980e5b1b18d010abd1fa6 Mon Sep 17 00:00:00 2001
From: Tao Wang
Date: Tue, 29 Oct 2024 23:51:03 -0400
Subject: [PATCH 1/4] multigpu wip

---
 src/integrators.py      | 80 +++++++++++++++++++++++++++++++++++++++++
 src/mc_multigpu_test.py | 41 +++++++++++++++++++++
 2 files changed, 121 insertions(+)
 create mode 100644 src/mc_multigpu_test.py

diff --git a/src/integrators.py b/src/integrators.py
index ecec374..4ded27e 100644
--- a/src/integrators.py
+++ b/src/integrators.py
@@ -6,6 +6,38 @@
 import gvar
 import numpy as np
 
+import os
+import torch.distributed as dist
+import socket
+
+
+def get_ip() -> str:
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    s.connect(("8.8.8.8", 80))  # Doesn't need to be reachable
+    return s.getsockname()[0]
+
+
+def get_open_port() -> int:
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("", 0))
+        return s.getsockname()[1]
+
+
+def setup():
+    # get IDs of reserved GPU
+    distributed_init_method = f"tcp://{get_ip()}:{get_open_port()}"
+    dist.init_process_group(
+        backend="gloo"
+    )  # , init_method=distributed_init_method, world_size = int(os.environ["WORLD_SIZE"]), rank = int(os.environ["RANK"]))
+    # init_method='env://',
+    # world_size=int(os.environ["WORLD_SIZE"]),
+    # rank=int(os.environ['SLURM_PROCID']))
+    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
+
+
+def cleanup():
+    dist.destroy_process_group()
+
 
 class Integrator:
     """
@@ -110,6 +142,54 @@ def __call__(self, f: Callable, **kwargs):
         else:
             return results
 
+    def gpu_run(self, f: Callable, **kwargs):
+        x, _ = self.sample(1)
+        f_values = f(x)
+        if isinstance(f_values, (list, tuple)) and isinstance(
+            f_values[0], torch.Tensor
+        ):
+            f_size = len(f_values)
+            type_fval = f_values[0].dtype
+        elif isinstance(f_values, torch.Tensor):
+            f_size = 1
+            type_fval = f_values.dtype
+        else:
+            raise TypeError(
+                "f must return a torch.Tensor or a list/tuple of torch.Tensor."
+            )
+
+        epoch = self.neval // self.nbatch
+        values = torch.zeros((self.nbatch, f_size), dtype=type_fval, device=self.device)
+
+        for iepoch in range(epoch):
+            x, log_detJ = self.sample(self.nbatch)
+            f_values = f(x)
+            batch_results = self._multiply_by_jacobian(f_values, torch.exp(log_detJ))
+
+            values += batch_results / epoch
+
+        results = np.array([RAvg() for _ in range(f_size)])
+        for i in range(f_size):
+            _total_mean = values[:, i].mean()
+            _mean = _total_mean.detach().clone()
+            dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
+            _total_mean /= dist.get_world_size()
+
+            _var_between_batch = torch.square(_mean - _total_mean)
+            dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
+            _var_between_batch /= dist.get_world_size()
+
+            _var = values[:, i].var() / self.nbatch
+            dist.all_reduce(_var, op=dist.ReduceOp.SUM)
+            _var /= dist.get_world_size()
+            results[i].update(
+                _mean.item(), (_var + _var_between_batch).item(), self.neval
+            )
+        if f_size == 1:
+            return results[0]
+        else:
+            return results
+
     def _multiply_by_jacobian(self, values, jac):
         # if isinstance(values, dict):
         #     return {k: v * torch.exp(log_det_J) for k, v in values.items()}
diff --git a/src/mc_multigpu_test.py b/src/mc_multigpu_test.py
new file mode 100644
index 0000000..99ebece
--- /dev/null
+++ b/src/mc_multigpu_test.py
@@ -0,0 +1,41 @@
+import torch
+from integrators import MonteCarlo, MCMC, setup
+from maps import Vegas, Linear
+from utils import set_seed, get_device
+
+set_seed(42)
+# device = get_device()
+setup()
+# device = torch.device("cpu")
+device = torch.cuda.current_device()
+
+
+def unit_circle_integrand(x):
+    inside_circle = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
+    return inside_circle
+
+
+dim = 2
+bounds = [(-1, 1), (-1, 1)]
+n_eval = 400000
+n_batch = 10000
+n_therm = 10
+
+
+vegas_map = Vegas(bounds, device=device, ninc=10)
+
+
+# True value of pi
+print(f"pi = {torch.pi} \n")
+
+# Start Monte Carlo integration, including plain-MC, MCMC, vegas, and vegas-MCMC
+mc_integrator = MonteCarlo(
+    bounds=bounds,
+    neval=n_eval,
+    nbatch=n_batch,
+    device=device,
+)
+
+print("Calculate the area of the unit circle f(x1, x2) in the bounds [-1, 1]^2...")
+res = mc_integrator.gpu_run(unit_circle_integrand)
+print("Plain MC Integral results: ", res)

From a2e2817df768ffb16e839493de28485cbbee00c3 Mon Sep 17 00:00:00 2001
From: Tao Wang
Date: Wed, 6 Nov 2024 14:43:30 -0500
Subject: [PATCH 2/4] add multigpu for mcmc

---
 src/integrators.py | 139 +++++++++++++++++++++++++--------------------
 1 file changed, 78 insertions(+), 61 deletions(-)

diff --git a/src/integrators.py b/src/integrators.py
index 4ded27e..eb0d98a 100644
--- a/src/integrators.py
+++ b/src/integrators.py
@@ -106,7 +106,7 @@ def __init__(
     ):
         super().__init__(maps, bounds, q0, neval, nbatch, device, dtype)
 
-    def __call__(self, f: Callable, **kwargs):
+    def __call__(self, f: Callable, multigpu=False, **kwargs):
         x, _ = self.sample(1)
         f_values = f(x)
         if isinstance(f_values, (list, tuple)) and isinstance(
@@ -131,64 +131,40 @@ def __call__(self, f: Callable, multigpu=False, **kwargs):
             batch_results = self._multiply_by_jacobian(f_values, torch.exp(log_detJ))
 
             values += batch_results / epoch
-
-        results = np.array([RAvg() for _ in range(f_size)])
-        for i in range(f_size):
-            _mean = values[:, i].mean().item()
-            _var = values[:, i].var().item() / self.nbatch
-            results[i].update(_mean, _var, self.neval)
+        if multigpu:
+            results = self.multigpu_statistic(values, f_size)
+        else:
+            results = np.array([RAvg() for _ in range(f_size)])
+            for i in range(f_size):
+                _mean = values[:, i].mean().item()
+                _var = values[:, i].var().item() / self.nbatch
+                results[i].update(_mean, _var, self.neval)
         if f_size == 1:
             return results[0]
         else:
             return results
 
-    def gpu_run(self, f: Callable, **kwargs):
-        x, _ = self.sample(1)
-        f_values = f(x)
-        if isinstance(f_values, (list, tuple)) and isinstance(
-            f_values[0], torch.Tensor
-        ):
-            f_size = len(f_values)
-            type_fval = f_values[0].dtype
-        elif isinstance(f_values, torch.Tensor):
-            f_size = 1
-            type_fval = f_values.dtype
-        else:
-            raise TypeError(
-                "f must return a torch.Tensor or a list/tuple of torch.Tensor."
-            )
-
-        epoch = self.neval // self.nbatch
-        values = torch.zeros((self.nbatch, f_size), dtype=type_fval, device=self.device)
-
-        for iepoch in range(epoch):
-            x, log_detJ = self.sample(self.nbatch)
-            f_values = f(x)
-            batch_results = self._multiply_by_jacobian(f_values, torch.exp(log_detJ))
-
-            values += batch_results / epoch
-
+    def multi_gpu_statistic(self, values, f_size):
         results = np.array([RAvg() for _ in range(f_size)])
+        _mean = torch.zeros(f_size)
+        _total_mean = torch.zeros(f_size)
+        _var = torch.zeros(f_size)
         for i in range(f_size):
-            _total_mean = values[:, i].mean()
-            _mean = _total_mean.detach().clone()
-            dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
-            _total_mean /= dist.get_world_size()
-
-            _var_between_batch = torch.square(_mean - _total_mean)
-            dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
-            _var_between_batch /= dist.get_world_size()
-
+            _total_mean[i] = values[:, i].mean()
+            _mean[i] = _total_mean[i]
             _var = values[:, i].var() / self.nbatch
-            dist.all_reduce(_var, op=dist.ReduceOp.SUM)
-            _var /= dist.get_world_size()
-            results[i].update(
-                _mean.item(), (_var + _var_between_batch).item(), self.neval
-            )
-        if f_size == 1:
-            return results[0]
-        else:
-            return results
+
+        dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
+        _total_mean /= dist.get_world_size()
+        _var_between_batch = torch.square(_mean - _total_mean)
+        dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
+        _var_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var, op=dist.ReduceOp.SUM)
+        _var /= dist.get_world_size()
+        _var += _var_between_batch
+        for i in range(f_size):
+            results[i].update(_total_mean[i], _var[i], self.neval)
+        return results
 
     def _multiply_by_jacobian(self, values, jac):
         # if isinstance(values, dict):
         #     return {k: v * torch.exp(log_det_J) for k, v in values.items()}
@@ -243,6 +219,7 @@ def __call__(
         proposal_dist: Callable = uniform,
         thinning=1,
         mix_rate=0.5,
+        multigpu=False,
         **kwargs,
     ):
         epsilon = 1e-16  # Small value to ensure numerical stability
@@ -323,17 +300,19 @@ def one_step(current_y, current_x, current_weight, current_jac):
             values += batch_results / n_meas
             refvalues += batch_results_ref / n_meas
 
-        results = np.array([RAvg() for _ in range(f_size)])
-        results_ref = RAvg()
+        if multigpu:
+            results, results_ref = self.multi_gpu_statistic(values, refvalues, f_size)
+        else:
+            results = np.array([RAvg() for _ in range(f_size)])
+            results_ref = RAvg()
 
-        mean_ref = refvalues.mean().item()
-        var_ref = refvalues.var().item() / self.nbatch
-
-        results_ref.update(mean_ref, var_ref, self.neval)
-        for i in range(f_size):
-            _mean = values[:, i].mean().item()
-            _var = values[:, i].var().item() / self.nbatch
-            results[i].update(_mean, _var, self.neval)
+            mean_ref = refvalues.mean().item()
+            var_ref = refvalues.var().item() / self.nbatch
+            results_ref.update(mean_ref, var_ref, self.neval)
+            for i in range(f_size):
+                _mean = values[:, i].mean().item()
+                _var = values[:, i].var().item() / self.nbatch
+                results[i].update(_mean, _var, self.neval)
 
         if f_size == 1:
             res = results[0] / results_ref * self._rangebounds.prod()
@@ -341,3 +320,41 @@ def one_step(current_y, current_x, current_weight, current_jac):
             return result
         else:
             return results / results_ref * self._rangebounds.prod().item()
+
+    def multi_gpu_statistic(self, values, refvalues, f_size):
+        # collect multigpu statistics for values
+        results = np.array([RAvg() for _ in range(f_size)])
+        _mean = torch.zeros(f_size)
+        _total_mean = torch.zeros(f_size)
+        _var = torch.zeros(f_size)
+        for i in range(f_size):
+            _total_mean[i] = values[:, i].mean()
+            _mean[i] = _total_mean[i]
+            _var = values[:, i].var() / self.nbatch
+
+        dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
+        _total_mean /= dist.get_world_size()
+        _var_between_batch = torch.square(_mean - _total_mean)
+        dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
+        _var_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var, op=dist.ReduceOp.SUM)
+        _var /= dist.get_world_size()
+        _var += _var_between_batch
+        for i in range(f_size):
+            results[i].update(_total_mean[i], _var[i], self.neval)
+
+        # collect multigpu statistics for refvalues
+        results_ref = RAvg()
+        _mean_ref = refvalues.mean().item()
+        _total_mean_ref = _mean_ref.clone().detach()
+        _var_ref = refvalues.var().item() / self.nbatch
+        dist.all_reduce(_total_mean_ref, op=dist.ReduceOp.SUM)
+        _total_mean_ref /= dist.get_world_size()
+        _var_ref_between_batch = torch.square(_mean_ref - _total_mean_ref)
+        dist.all_reduce(_var_ref_between_batch, op=dist.ReduceOp.SUM)
+        _var_ref_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var_ref, op=dist.ReduceOp.SUM)
+        _var_ref /= dist.get_world_size()
+        _var += _var_between_batch
+        results_ref.update(_mean_ref, _var_ref, self.neval)
+        return results, results_ref

From cb227b28471e6f82dc54347df0daaa023afea627 Mon Sep 17 00:00:00 2001
From: Tao Wang
Date: Thu, 7 Nov 2024 13:02:52 -0500
Subject: [PATCH 3/4] add multigpu for vegas

---
 src/integrators.py      |  52 +++++------
 src/maps.py             |  19 ++--
 src/mc_multigpu_test.py | 190 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 227 insertions(+), 34 deletions(-)

diff --git a/src/integrators.py b/src/integrators.py
index eb0d98a..94cc19d 100644
--- a/src/integrators.py
+++ b/src/integrators.py
@@ -131,10 +131,11 @@ def __call__(self, f: Callable, multigpu=False, **kwargs):
             batch_results = self._multiply_by_jacobian(f_values, torch.exp(log_detJ))
 
             values += batch_results / epoch
+
+        results = np.array([RAvg() for _ in range(f_size)])
         if multigpu:
-            results = self.multigpu_statistic(values, f_size)
+            self.multi_gpu_statistic(values, f_size, results)
         else:
-            results = np.array([RAvg() for _ in range(f_size)])
             for i in range(f_size):
                 _mean = values[:, i].mean().item()
                 _var = values[:, i].var().item() / self.nbatch
@@ -144,11 +145,11 @@ def __call__(self, f: Callable, multigpu=False, **kwargs):
         else:
             return results
 
-    def multi_gpu_statistic(self, values, f_size):
-        results = np.array([RAvg() for _ in range(f_size)])
-        _mean = torch.zeros(f_size)
-        _total_mean = torch.zeros(f_size)
-        _var = torch.zeros(f_size)
+    def multi_gpu_statistic(self, values, f_size, results):
+        # results = np.array([RAvg() for _ in range(f_size)])
+        _mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_size, device=self.device, dtype=self.dtype)
         for i in range(f_size):
             _total_mean[i] = values[:, i].mean()
             _mean[i] = _total_mean[i]
@@ -161,10 +162,10 @@ def multi_gpu_statistic(self, values, f_size):
         _var_between_batch /= dist.get_world_size()
         dist.all_reduce(_var, op=dist.ReduceOp.SUM)
         _var /= dist.get_world_size()
-        _var += _var_between_batch
+        _var = _var + _var_between_batch
         for i in range(f_size):
-            results[i].update(_total_mean[i], _var[i], self.neval)
-        return results
+            results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
+        # return results
 
     def _multiply_by_jacobian(self, values, jac):
         # if isinstance(values, dict):
@@ -300,12 +301,13 @@ def one_step(current_y, current_x, current_weight, current_jac):
             values += batch_results / n_meas
             refvalues += batch_results_ref / n_meas
 
+        results = np.array([RAvg() for _ in range(f_size)])
+        results_ref = RAvg()
         if multigpu:
-            results, results_ref = self.multi_gpu_statistic(values, refvalues, f_size)
+            results, results_ref = self.multi_gpu_statistic(
+                values, refvalues, results, results_ref, f_size
+            )
         else:
-            results = np.array([RAvg() for _ in range(f_size)])
-            results_ref = RAvg()
             mean_ref = refvalues.mean().item()
             var_ref = refvalues.var().item() / self.nbatch
             results_ref.update(mean_ref, var_ref, self.neval)
@@ -321,12 +323,11 @@ def one_step(current_y, current_x, current_weight, current_jac):
         else:
             return results / results_ref * self._rangebounds.prod().item()
 
-    def multi_gpu_statistic(self, values, refvalues, f_size):
+    def multi_gpu_statistic(self, values, refvalues, results, results_ref, f_size):
         # collect multigpu statistics for values
-        results = np.array([RAvg() for _ in range(f_size)])
-        _mean = torch.zeros(f_size)
-        _total_mean = torch.zeros(f_size)
-        _var = torch.zeros(f_size)
+        _mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_size, device=self.device, dtype=self.dtype)
         for i in range(f_size):
             _total_mean[i] = values[:, i].mean()
             _mean[i] = _total_mean[i]
@@ -339,15 +340,14 @@ def multi_gpu_statistic(self, values, refvalues, f_size):
         _var_between_batch /= dist.get_world_size()
         dist.all_reduce(_var, op=dist.ReduceOp.SUM)
         _var /= dist.get_world_size()
-        _var += _var_between_batch
+        _var = _var + _var_between_batch
         for i in range(f_size):
-            results[i].update(_total_mean[i], _var[i], self.neval)
+            results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
 
         # collect multigpu statistics for refvalues
-        results_ref = RAvg()
-        _mean_ref = refvalues.mean().item()
+        _mean_ref = refvalues.mean()
         _total_mean_ref = _mean_ref.clone().detach()
-        _var_ref = refvalues.var().item() / self.nbatch
+        _var_ref = refvalues.var() / self.nbatch
         dist.all_reduce(_total_mean_ref, op=dist.ReduceOp.SUM)
         _total_mean_ref /= dist.get_world_size()
         _var_ref_between_batch = torch.square(_mean_ref - _total_mean_ref)
@@ -355,6 +355,6 @@ def multi_gpu_statistic(self, values, refvalues, f_size):
         _var_ref_between_batch /= dist.get_world_size()
         dist.all_reduce(_var_ref, op=dist.ReduceOp.SUM)
         _var_ref /= dist.get_world_size()
-        _var += _var_between_batch
-        results_ref.update(_mean_ref, _var_ref, self.neval)
+        _var_ref = _var_ref + _var_between_batch
+        results_ref.update(_mean_ref.item(), _var_ref.item(), self.neval)
         return results, results_ref
diff --git a/src/maps.py b/src/maps.py
index 6f6fc1d..5f40beb 100644
--- a/src/maps.py
+++ b/src/maps.py
@@ -77,18 +77,22 @@ def __init__(self, bounds, ninc=1000, alpha=0.5, device="cpu", dtype=torch.float
         elif isinstance(ninc, torch.Tensor):
             self.ninc = ninc.to(dtype=torch.int32, device=device)
         else:
-            raise ValueError("'ninc' must be an int, list, numpy array, or torch tensor.")
-
+            raise ValueError(
+                "'ninc' must be an int, list, numpy array, or torch tensor."
+            )
+
         # Ensure ninc has the correct shape
         if self.ninc.shape != (self.dim,):
-            raise ValueError(f"'ninc' must be a scalar or a 1D array of length {self.dim}.")
+            raise ValueError(
+                f"'ninc' must be a scalar or a 1D array of length {self.dim}."
+            )
 
         self.make_uniform()
         self.alpha = alpha
         self._A = self.bounds[:, 1] - self.bounds[:, 0]
         self._jaclinear = torch.prod(self._A)
 
-    def train(self, nsamples, f, epoch=5, alpha=0.5):
+    def train(self, nsamples, f, epoch=5, alpha=0.5, multigpu=False):
         q0 = Uniform(self.bounds, device=self.device, dtype=self.dtype)
         u, log_detJ0 = q0.sample(nsamples)
 
@@ -106,10 +110,10 @@ def _integrand(x):
         for _ in range(epoch):
             x, log_detJ = self.forward(u)
             f2 = torch.exp(2 * (log_detJ + log_detJ0)) * _integrand(x) ** 2
-            self.add_training_data(u, f2)
+            self.add_training_data(u, f2, multigpu=multigpu)
             self.adapt(alpha)
 
-    def add_training_data(self, u, fval):
+    def add_training_data(self, u, fval, multigpu=False):
         """Add training data ``f`` for ``u``-space points ``u``.
 
         Accumulates training data for later use by ``self.adapt()``.
@@ -134,6 +138,9 @@ def add_training_data(self, u, fval):
             indices = iu[:, d]
             self.sum_f[d].scatter_add_(0, indices, fval.abs())
             self.n_f[d].scatter_add_(0, indices, torch.ones_like(fval))
+        if multigpu:
+            torch.distributed.all_reduce(self.sum_f, op=torch.distributed.ReduceOp.SUM)
+            torch.distributed.all_reduce(self.n_f, op=torch.distributed.ReduceOp.SUM)
 
     def adapt(self, alpha=0.0):
         """Adapt grid to accumulated training data.
diff --git a/src/mc_multigpu_test.py b/src/mc_multigpu_test.py
index 99ebece..7cda232 100644
--- a/src/mc_multigpu_test.py
+++ b/src/mc_multigpu_test.py
@@ -3,11 +3,16 @@
 from maps import Vegas, Linear
 from utils import set_seed, get_device
 
-set_seed(42)
+# set_seed(42)
 # device = get_device()
 setup()
 # device = torch.device("cpu")
 device = torch.cuda.current_device()
+print(device)
+
+
+def test_nothing():
+    pass
 
 
 def unit_circle_integrand(x):
@@ -15,6 +20,29 @@ def unit_circle_integrand(x):
     return inside_circle
 
 
+def half_sphere_integrand(x):
+    return torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
+
+
+def integrand_list(x):
+    return [unit_circle_integrand(x), half_sphere_integrand(x) / 2]
+
+
+def integrand_list1(x):
+    dx2 = torch.zeros(x.shape[0], dtype=x.dtype, device=x.device)
+    for d in range(4):
+        dx2 += (x[:, d] - 0.5) ** 2
+    f = torch.exp(-200 * dx2)
+    return [f, f * x[:, 0], f * x[:, 0] ** 2]
+
+
+def sharp_peak(x):
+    dx2 = torch.zeros(x.shape[0], dtype=x.dtype, device=x.device)
+    for d in range(4):
+        dx2 += (x[:, d] - 0.5) ** 2
+    return torch.exp(-200 * dx2)
+
+
 dim = 2
 bounds = [(-1, 1), (-1, 1)]
 n_eval = 400000
@@ -35,7 +63,165 @@
     nbatch=n_batch,
     device=device,
 )
+mcmc_integrator = MCMC(
+    bounds=bounds, neval=n_eval, nbatch=n_batch, nburnin=n_therm, device=device
+)
 
 print("Calculate the area of the unit circle f(x1, x2) in the bounds [-1, 1]^2...")
-res = mc_integrator.gpu_run(unit_circle_integrand)
+res = mc_integrator(unit_circle_integrand, multigpu=True)
 print("Plain MC Integral results: ", res)
+
+res = mcmc_integrator(unit_circle_integrand, mix_rate=0.5, multigpu=True)
+print("MCMC Integral results: ", res)
+
+vegas_map.train(20000, unit_circle_integrand, alpha=0.5, multigpu=True)
+vegas_integrator = MonteCarlo(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+res = vegas_integrator(unit_circle_integrand)
+print("VEGAS Integral results: ", res)
+
+vegasmcmc_integrator = MCMC(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    nburnin=n_therm,
+    device=device,
+)
+res = vegasmcmc_integrator(unit_circle_integrand, mix_rate=0.5)
+print("VEGAS-MCMC Integral results: ", res, "\n")
+
+
+print(
+    r"Calculate the integral g(x1, x2) = $2 \max(1-(x_1^2+x_2^2), 0)$ in the bounds [-1, 1]^2..."
+)
+
+res = mc_integrator(half_sphere_integrand)
+print("Plain MC Integral results: ", res)
+
+res = mcmc_integrator(half_sphere_integrand, mix_rate=0.5)
+print("MCMC Integral results:", res)
+
+vegas_map.make_uniform()
+# train the vegas map
+vegas_map.train(20000, half_sphere_integrand, epoch=10, alpha=0.5, multigpu=True)
+
+res = vegas_integrator(half_sphere_integrand)
+print("VEGAS Integral results: ", res)
+
+res = vegasmcmc_integrator(half_sphere_integrand, mix_rate=0.5)
+print("VEGAS-MCMC Integral results: ", res)
+
+
+print("\nCalculate the integral [f(x1, x2), g(x1, x2)/2] in the bounds [-1, 1]^2")
+# Two integrands
+res = mc_integrator(integrand_list)
+print("Plain MC Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+res = mcmc_integrator(integrand_list, mix_rate=0.5)
+print("MCMC Integral results:")
+print(f" Integral 1: ", res[0])
+print(f" Integral 2: ", res[1])
+
+# print("VEAGS map is trained for g(x1, x2)")
+vegas_map.make_uniform()
+vegas_map.train(20000, integrand_list, epoch=10, alpha=0.5, multigpu=True)
+res = vegas_integrator(integrand_list)
+print("VEGAS Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+res = vegasmcmc_integrator(integrand_list, mix_rate=0.5)
+print("VEGAS-MCMC Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+print("\nCalculate the integral [h(X), x1 * h(X), x1^2 * h(X)] in the bounds [0, 1]^4")
+print("h(X) = exp(-200 * (x1^2 + x2^2 + x3^2 + x4^2))")
+
+bounds = [(0, 1)] * 4
+mc_integrator = MonteCarlo(
+    bounds=bounds,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+mcmc_integrator = MCMC(
+    bounds=bounds, neval=n_eval, nbatch=n_batch, nburnin=n_therm, device=device
+)
+print("Plain MC Integral results:")
+res = mc_integrator(integrand_list1)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+print("MCMC Integral results:")
+res = mcmc_integrator(integrand_list1, mix_rate=0.5)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+
+vegas_map = Vegas(bounds, device=device)
+print("train VEGAS map for h(X)...")
+# vegas_map.train(20000, sharp_peak, epoch=10, alpha=0.5)
+vegas_map.train(20000, integrand_list1, epoch=10, alpha=0.5, multigpu=True)
+
+print("VEGAS Integral results:")
+vegas_integrator = MonteCarlo(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+res = vegas_integrator(integrand_list1)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+
+print("VEGAS-MCMC Integral results:")
+vegasmcmc_integrator = MCMC(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    nburnin=n_therm,
+    device=device,
+)
+res = vegasmcmc_integrator(integrand_list1, mix_rate=0.5)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)

From df1f4e099cd0b92205d20eb9c7c35af446e79965 Mon Sep 17 00:00:00 2001
From: Tao Wang
Date: Thu, 7 Nov 2024 23:23:37 -0500
Subject: [PATCH 4/4] minor change

---
 src/integrators.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/integrators.py b/src/integrators.py
index 8b49a90..3088f99 100644
--- a/src/integrators.py
+++ b/src/integrators.py
@@ -136,11 +136,11 @@ def __call__(self, f: Callable, f_dim: int = 1, multigpu=False, **kwargs):
         else:
             return results
 
-    def multi_gpu_statistic(self, values, f_size, results):
-        _mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        _total_mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        _var = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        for i in range(f_size):
+    def multi_gpu_statistic(self, values, f_dim, results):
+        _mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        for i in range(f_dim):
             _total_mean[i] = values[:, i].mean()
             _mean[i] = _total_mean[i]
             _var = values[:, i].var() / self.nbatch
@@ -153,7 +153,7 @@ def multi_gpu_statistic(self, values, f_size, results):
         dist.all_reduce(_var, op=dist.ReduceOp.SUM)
         _var /= dist.get_world_size()
         _var = _var + _var_between_batch
-        for i in range(f_size):
+        for i in range(f_dim):
             results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
 
 
@@ -285,12 +285,12 @@ def one_step(current_y, current_x, current_weight, current_jac):
         else:
            return results / results_ref * self._rangebounds.prod().item()
 
-    def multi_gpu_statistic(self, values, refvalues, results, results_ref, f_size):
+    def multi_gpu_statistic(self, values, refvalues, results, results_ref, f_dim):
         # collect multigpu statistics for values
-        _mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        _total_mean = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        _var = torch.zeros(f_size, device=self.device, dtype=self.dtype)
-        for i in range(f_size):
+        _mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        for i in range(f_dim):
             _total_mean[i] = values[:, i].mean()
             _mean[i] = _total_mean[i]
             _var = values[:, i].var() / self.nbatch
@@ -303,7 +303,7 @@ def multi_gpu_statistic(self, values, refvalues, results, results_ref, f_size):
         dist.all_reduce(_var, op=dist.ReduceOp.SUM)
         _var /= dist.get_world_size()
         _var = _var + _var_between_batch
-        for i in range(f_size):
+        for i in range(f_dim):
            results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
 
         # collect multigpu statistics for refvalues
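
Note on launching: the setup() helper added in PATCH 1 calls dist.init_process_group(backend="gloo") without an explicit init_method, so it relies on the default env:// rendezvous and expects MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE (plus LOCAL_RANK for torch.cuda.set_device) to be provided by the launcher, e.g. torchrun --nproc_per_node=<num_gpus> src/mc_multigpu_test.py. The snippet below is only a minimal single-process sketch of that environment; the address, port, and process count are illustrative assumptions and not part of the patches.

import os
import torch.distributed as dist

# Variables that torchrun would normally export; set here for a 1-process smoke run.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
os.environ.setdefault("LOCAL_RANK", "0")

dist.init_process_group(backend="gloo")  # env:// rendezvous, as in setup()
print("initialized, world size =", dist.get_world_size())
dist.destroy_process_group()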