diff --git a/src/integrators.py b/src/integrators.py
index 0000126..3088f99 100644
--- a/src/integrators.py
+++ b/src/integrators.py
@@ -6,6 +6,38 @@
 
 import gvar
 import numpy as np
+import os
+import torch.distributed as dist
+import socket
+
+
+def get_ip() -> str:
+    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+        s.connect(("8.8.8.8", 80))  # The address doesn't need to be reachable
+        return s.getsockname()[0]
+
+
+def get_open_port() -> int:
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("", 0))
+        return s.getsockname()[1]
+
+
+def setup():
+    # Initialize the default process group. With no init_method given, torch.distributed
+    # falls back to "env://" and reads MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK
+    # from the environment (set by torchrun or the SLURM launcher).
+    # A TCP rendezvous built from the helpers above could be passed instead, e.g.
+    #   init_method=f"tcp://{get_ip()}:{get_open_port()}"
+    #   (the address would have to be created on rank 0 and shared with the other ranks).
+    dist.init_process_group(backend="gloo")
+    # Pin this process to the GPU assigned to its local rank.
+    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
+
+
+def cleanup():
+    dist.destroy_process_group()
+
 
 class Integrator:
     """
@@ -76,7 +108,7 @@ def __init__(
     ):
         super().__init__(maps, bounds, q0, neval, nbatch, device, dtype)
 
-    def __call__(self, f: Callable, f_dim: int = 1, **kwargs):
+    def __call__(self, f: Callable, f_dim: int = 1, multigpu: bool = False, **kwargs):
         x, _ = self.sample(self.nbatch)
         fx = torch.empty((self.nbatch, f_dim), dtype=self.dtype, device=self.device)
 
@@ -92,15 +124,38 @@ def __call__(self, f: Callable, f_dim: int = 1, **kwargs):
             integ_values += fx / epoch
 
         results = np.array([RAvg() for _ in range(f_dim)])
-        for i in range(f_dim):
-            _mean = integ_values[:, i].mean().item()
-            _var = integ_values[:, i].var().item() / self.nbatch
-            results[i].update(_mean, _var, self.neval)
+        if multigpu:
+            self.multi_gpu_statistic(integ_values, f_dim, results)
+        else:
+            for i in range(f_dim):
+                _mean = integ_values[:, i].mean().item()
+                _var = integ_values[:, i].var().item() / self.nbatch
+                results[i].update(_mean, _var, self.neval)
 
         if f_dim == 1:
             return results[0]
         else:
             return results
 
+    def multi_gpu_statistic(self, values, f_dim, results):
+        _mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        for i in range(f_dim):
+            _total_mean[i] = values[:, i].mean()
+            _mean[i] = _total_mean[i]
+            _var[i] = values[:, i].var() / self.nbatch
+
+        dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
+        _total_mean /= dist.get_world_size()
+        _var_between_batch = torch.square(_mean - _total_mean)
+        dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
+        _var_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var, op=dist.ReduceOp.SUM)
+        _var /= dist.get_world_size()
+        _var = _var + _var_between_batch
+        for i in range(f_dim):
+            results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
+
 
 def random_walk(dim, bounds, device, dtype, u, **kwargs):
     rangebounds = bounds[:, 1] - bounds[:, 0]
@@ -147,6 +202,7 @@ def __call__(
         proposal_dist: Callable = uniform,
         mix_rate=0.5,
         meas_freq: int = 1,
+        multigpu: bool = False,
         **kwargs,
     ):
         epsilon = 1e-16  # Small value to ensure numerical stability
@@ -211,15 +267,16 @@ def one_step(current_y, current_x, current_weight, current_jac):
 
         results = np.array([RAvg() for _ in range(f_dim)])
         results_ref = RAvg()
-
-        mean_ref = refvalues.mean().item()
-        var_ref = refvalues.var().item() / self.nbatch
-
-        results_ref.update(mean_ref, var_ref, self.neval)
-        for i in range(f_dim):
-            _mean = values[:, i].mean().item()
-            _var = values[:, i].var().item() / self.nbatch
-            results[i].update(_mean, _var, self.neval)
+        if multigpu:
+            self.multi_gpu_statistic(values, refvalues, results, results_ref, f_dim)
+        else:
+            mean_ref = refvalues.mean().item()
+            var_ref = refvalues.var().item() / self.nbatch
+            results_ref.update(mean_ref, var_ref, self.neval)
+            for i in range(f_dim):
+                _mean = values[:, i].mean().item()
+                _var = values[:, i].var().item() / self.nbatch
+                results[i].update(_mean, _var, self.neval)
 
         if f_dim == 1:
             res = results[0] / results_ref * self._rangebounds.prod()
@@ -227,3 +284,38 @@ def one_step(current_y, current_x, current_weight, current_jac):
             return result
         else:
             return results / results_ref * self._rangebounds.prod().item()
+
+    def multi_gpu_statistic(self, values, refvalues, results, results_ref, f_dim):
+        # collect multigpu statistics for values
+        _mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _total_mean = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        _var = torch.zeros(f_dim, device=self.device, dtype=self.dtype)
+        for i in range(f_dim):
+            _total_mean[i] = values[:, i].mean()
+            _mean[i] = _total_mean[i]
+            _var[i] = values[:, i].var() / self.nbatch
+
+        dist.all_reduce(_total_mean, op=dist.ReduceOp.SUM)
+        _total_mean /= dist.get_world_size()
+        _var_between_batch = torch.square(_mean - _total_mean)
+        dist.all_reduce(_var_between_batch, op=dist.ReduceOp.SUM)
+        _var_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var, op=dist.ReduceOp.SUM)
+        _var /= dist.get_world_size()
+        _var = _var + _var_between_batch
+        for i in range(f_dim):
+            results[i].update(_total_mean[i].item(), _var[i].item(), self.neval)
+
+        # collect multigpu statistics for refvalues
+        _mean_ref = refvalues.mean()
+        _total_mean_ref = _mean_ref.clone().detach()
+        _var_ref = refvalues.var() / self.nbatch
+        dist.all_reduce(_total_mean_ref, op=dist.ReduceOp.SUM)
+        _total_mean_ref /= dist.get_world_size()
+        _var_ref_between_batch = torch.square(_mean_ref - _total_mean_ref)
+        dist.all_reduce(_var_ref_between_batch, op=dist.ReduceOp.SUM)
+        _var_ref_between_batch /= dist.get_world_size()
+        dist.all_reduce(_var_ref, op=dist.ReduceOp.SUM)
+        _var_ref /= dist.get_world_size()
+        _var_ref = _var_ref + _var_ref_between_batch
+        results_ref.update(_total_mean_ref.item(), _var_ref.item(), self.neval)
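
Note on the error estimate assembled by multi_gpu_statistic() above: each rank contributes its per-batch mean m_r and the variance of that mean v_r = values.var() / nbatch; the pooled mean is the rank-average of the m_r, and the reported variance is the rank-average of the v_r plus the rank-average of (m_r - pooled mean)^2. A minimal single-process sketch of that combination rule (illustrative names, no process group required):

    import torch

    def combine(per_rank_values, nbatch):
        # per_rank_values: one 1-d tensor of integrand values per simulated rank
        means = torch.stack([v.mean() for v in per_rank_values])                 # m_r
        vars_of_mean = torch.stack([v.var() / nbatch for v in per_rank_values])  # v_r
        pooled_mean = means.mean()                     # all_reduce(SUM) / world_size
        within = vars_of_mean.mean()                   # averaged per-rank variance of the mean
        between = ((means - pooled_mean) ** 2).mean()  # spread of the per-rank means
        return pooled_mean, within + between

    torch.manual_seed(0)
    print(combine([torch.rand(10000) for _ in range(4)], nbatch=10000))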
diff --git a/src/maps.py b/src/maps.py
index 7e5b335..d1f5244 100644
--- a/src/maps.py
+++ b/src/maps.py
@@ -92,7 +92,16 @@ def __init__(self, bounds, ninc=1000, alpha=0.5, device="cpu", dtype=torch.float
         self._A = self.bounds[:, 1] - self.bounds[:, 0]
         self._jaclinear = torch.prod(self._A)
 
-    def train(self, nsamples, f, f_dim=1, dtype=torch.float64, epoch=5, alpha=0.5):
+    def train(
+        self,
+        nsamples,
+        f,
+        f_dim=1,
+        dtype=torch.float64,
+        epoch=5,
+        alpha=0.5,
+        multigpu=False,
+    ):
         q0 = Uniform(self.bounds, device=self.device, dtype=self.dtype)
         u, log_detJ0 = q0.sample(nsamples)
 
@@ -105,7 +114,7 @@ def train(self, nsamples, f, f_dim=1, dtype=torch.float64, epoch=5, alpha=0.5):
-            self.add_training_data(u, f2)
+            self.add_training_data(u, f2, multigpu=multigpu)
             self.adapt(alpha)
 
-    def add_training_data(self, u, fval):
+    def add_training_data(self, u, fval, multigpu=False):
         """Add training data ``f`` for ``u``-space points ``u``.
 
         Accumulates training data for later use by ``self.adapt()``.
@@ -130,6 +139,9 @@ def add_training_data(self, u, fval):
             indices = iu[:, d]
             self.sum_f[d].scatter_add_(0, indices, fval.abs())
             self.n_f[d].scatter_add_(0, indices, torch.ones_like(fval))
+        if multigpu:
+            torch.distributed.all_reduce(self.sum_f, op=torch.distributed.ReduceOp.SUM)
+            torch.distributed.all_reduce(self.n_f, op=torch.distributed.ReduceOp.SUM)
 
     def adapt(self, alpha=0.0):
         """Adapt grid to accumulated training data.
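
The maps.py change pools the per-increment training statistics across ranks before the grid is adapted, so that every process refines the same VEGAS grid rather than a rank-local one. A minimal sketch of that pooling step, assuming (as the patch does) that sum_f and n_f are accumulator tensors and that the process group has already been initialized:

    import torch
    import torch.distributed as dist

    def pool_accumulators(sum_f: torch.Tensor, n_f: torch.Tensor) -> None:
        # Sum the rank-local histograms so adapt() sees identical data on every rank.
        dist.all_reduce(sum_f, op=dist.ReduceOp.SUM)  # pooled |f| mass per increment
        dist.all_reduce(n_f, op=dist.ReduceOp.SUM)    # pooled sample counts per increment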
diff --git a/src/mc_multigpu_test.py b/src/mc_multigpu_test.py
new file mode 100644
index 0000000..a873bcc
--- /dev/null
+++ b/src/mc_multigpu_test.py
@@ -0,0 +1,228 @@
+import torch
+from integrators import MonteCarlo, MCMC, setup, cleanup
+from maps import Vegas, Linear
+from utils import set_seed, get_device
+
+# set_seed(42)
+# device = get_device()
+setup()
+# device = torch.device("cpu")
+device = torch.cuda.current_device()
+print(device)
+
+
+def test_nothing():
+    pass
+
+
+def unit_circle_integrand(x, f):
+    f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
+    return f[:, 0]
+
+
+def half_sphere_integrand(x, f):
+    f[:, 0] = torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0) * 2
+    return f[:, 0]
+
+
+def two_integrands(x, f):
+    f[:, 0] = (x[:, 0] ** 2 + x[:, 1] ** 2 < 1).double()
+    f[:, 1] = -torch.clamp(1 - (x[:, 0] ** 2 + x[:, 1] ** 2), min=0)
+    return f.mean(dim=-1)
+
+
+def sharp_integrands(x, f, dim=4):
+    f.zero_()
+    for d in range(dim):
+        f[:, 0] += (x[:, d] - 0.5) ** 2
+    f[:, 0] *= -200
+    f[:, 0].exp_()
+    f[:, 1] = f[:, 0] * x[:, 0]
+    f[:, 2] = f[:, 0] * x[:, 0] ** 2
+    return f.mean(dim=-1)
+
+
+dim = 2
+bounds = [(-1, 1), (-1, 1)]
+n_eval = 400000
+n_batch = 10000
+n_therm = 10
+
+
+vegas_map = Vegas(bounds, device=device, ninc=10)
+
+
+# True value of pi
+print(f"pi = {torch.pi} \n")
+
+# Start Monte Carlo integration, including plain-MC, MCMC, vegas, and vegas-MCMC
+mc_integrator = MonteCarlo(
+    bounds=bounds,
+    neval=n_eval,
+    nbatch=n_batch,
+    device=device,
+)
+mcmc_integrator = MCMC(
+    bounds=bounds, neval=n_eval, nbatch=n_batch, nburnin=n_therm, device=device
+)
+
+print("Calculate the area of the unit circle f(x1, x2) in the bounds [-1, 1]^2...")
+res = mc_integrator(unit_circle_integrand, multigpu=True)
+print("Plain MC Integral results: ", res)
+
+res = mcmc_integrator(unit_circle_integrand, mix_rate=0.5, multigpu=True)
+print("MCMC Integral results: ", res)
+
+vegas_map.train(20000, unit_circle_integrand, alpha=0.5, multigpu=True)
+vegas_integrator = MonteCarlo(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+res = vegas_integrator(unit_circle_integrand)
+print("VEGAS Integral results: ", res)
+
+vegasmcmc_integrator = MCMC(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    nburnin=n_therm,
+    device=device,
+)
+res = vegasmcmc_integrator(unit_circle_integrand, mix_rate=0.5)
+print("VEGAS-MCMC Integral results: ", res, "\n")
+
+
+print(
+    r"Calculate the integral g(x1, x2) = $2 \max(1-(x_1^2+x_2^2), 0)$ in the bounds [-1, 1]^2..."
+)
+
+res = mc_integrator(half_sphere_integrand)
+print("Plain MC Integral results: ", res)
+
+res = mcmc_integrator(half_sphere_integrand, mix_rate=0.5)
+print("MCMC Integral results:", res)
+
+vegas_map.make_uniform()
+# train the vegas map
+vegas_map.train(20000, half_sphere_integrand, epoch=10, alpha=0.5, multigpu=True)
+
+res = vegas_integrator(half_sphere_integrand)
+print("VEGAS Integral results: ", res)
+
+res = vegasmcmc_integrator(half_sphere_integrand, mix_rate=0.5)
+print("VEGAS-MCMC Integral results: ", res)
+
+
+print("\nCalculate the integral [f(x1, x2), g(x1, x2)/2] in the bounds [-1, 1]^2")
+# Two integrands
+res = mc_integrator(two_integrands, f_dim=2)
+print("Plain MC Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+res = mcmc_integrator(two_integrands, f_dim=2, mix_rate=0.5)
+print("MCMC Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+# print("VEGAS map is trained for g(x1, x2)")
+vegas_map.make_uniform()
+vegas_map.train(20000, two_integrands, f_dim=2, epoch=10, alpha=0.5, multigpu=True)
+res = vegas_integrator(two_integrands, f_dim=2)
+print("VEGAS Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+res = vegasmcmc_integrator(two_integrands, f_dim=2, mix_rate=0.5)
+print("VEGAS-MCMC Integral results:")
+print(" Integral 1: ", res[0])
+print(" Integral 2: ", res[1])
+
+print("\nCalculate the integral [h(X), x1 * h(X), x1^2 * h(X)] in the bounds [0, 1]^4")
+print("h(X) = exp(-200 * (x1^2 + x2^2 + x3^2 + x4^2))")
+
+bounds = [(0, 1)] * 4
+mc_integrator = MonteCarlo(
+    bounds=bounds,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+mcmc_integrator = MCMC(
+    bounds=bounds, neval=n_eval, nbatch=n_batch, nburnin=n_therm, device=device
+)
+print("Plain MC Integral results:")
+res = mc_integrator(sharp_integrands, f_dim=3)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+print("MCMC Integral results:")
+res = mcmc_integrator(sharp_integrands, f_dim=3, mix_rate=0.5)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+
+vegas_map = Vegas(bounds, device=device)
+print("train VEGAS map for h(X)...")
+# vegas_map.train(20000, sharp_peak, epoch=10, alpha=0.5)
+vegas_map.train(20000, sharp_integrands, f_dim=3, epoch=10, alpha=0.5, multigpu=True)
+
+print("VEGAS Integral results:")
+vegas_integrator = MonteCarlo(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    # nbatch=n_eval,
+    device=device,
+)
+res = vegas_integrator(sharp_integrands, f_dim=3)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+
+print("VEGAS-MCMC Integral results:")
+vegasmcmc_integrator = MCMC(
+    maps=vegas_map,
+    neval=n_eval,
+    nbatch=n_batch,
+    nburnin=n_therm,
+    device=device,
+)
+res = vegasmcmc_integrator(sharp_integrands, f_dim=3, mix_rate=0.5)
+print(
+    " I[0] =",
+    res[0],
+    " I[1] =",
+    res[1],
+    " I[2] =",
+    res[2],
+    " I[1]/I[0] =",
+    res[1] / res[0],
+)
+
+cleanup()
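
The test script is meant to be launched with one process per GPU; setup() relies on the rendezvous variables that torchrun (or an equivalent SLURM launcher) exports: MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK and LOCAL_RANK. Every rank executes the prints above, so each result is reported once per process; a small guard like the following (a sketch, not part of the patch) restricts output to rank 0:

    # Launch, for example:  torchrun --nproc_per_node=<num_gpus> src/mc_multigpu_test.py
    import torch.distributed as dist

    def print0(*args, **kwargs):
        # Print only on rank 0 (or when no process group has been initialized).
        if not dist.is_initialized() or dist.get_rank() == 0:
            print(*args, **kwargs)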