Skip to content

Commit

Permalink
Support reducescatter in torch ut (#251)
Browse files: browse the repository at this point in the history
* Support reducescatter in torch ut
  • Loading branch information
gaopengff committed Aug 16, 2023
1 parent 51aa425 commit a7e580a
Showing 1 changed file with 42 additions and 43 deletions.
85 changes: 42 additions & 43 deletions test/parallel/test_torch.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -628,7 +628,7 @@ def test_horovod_allreduce_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -866,7 +866,7 @@ def test_horovod_allreduce_grad_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1035,7 +1035,7 @@ def test_horovod_grouped_allreduce_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1180,7 +1180,7 @@ def test_horovod_grouped_allreduce_grad_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1360,7 +1360,7 @@ def test_horovod_allgather_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1536,7 +1536,7 @@ def test_horovod_allgather_grad_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1638,7 +1638,7 @@ def test_horovod_grouped_allgather_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

even_ranks = [rk for rk in range(0, size) if rk % 2 == 0]
Expand Down Expand Up @@ -1824,7 +1824,7 @@ def test_horovod_broadcast_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -2012,7 +2012,7 @@ def test_horovod_broadcast_grad_process_sets(self):
rank = hvd.rank()
size = hvd.size()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -2208,7 +2208,7 @@ def test_horovod_alltoall_process_sets(self):
if hvd.nccl_built() and hvd.nccl_built() < 2700:
self.skipTest("NCCL-based Alltoall requires NCCL version >= 2.7.0.")

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -2469,7 +2469,7 @@ def test_horovod_alltoall_grad_process_sets(self):
if hvd.nccl_built() and hvd.nccl_built() < 2700:
self.skipTest("NCCL-based Alltoall requires NCCL version >= 2.7.0.")

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -2776,7 +2776,7 @@ def test_broadcast_object(self):
def test_broadcast_object_process_sets(self):
hvd.init()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -3569,7 +3569,7 @@ def test_async_sparse_allreduce_process_sets(self):
"""Test that allgather over indices and values is equivalent to allreduce if restricted to process sets."""
hvd.init()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -3613,7 +3613,7 @@ def test_optimizer_process_sets(self):
Note that this test makes the most sense when running with > 2 processes."""
hvd.init()

if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Multiple process sets currently do not support CCL.")

# This test does not apply if there is only one worker.
Expand Down Expand Up @@ -3773,7 +3773,7 @@ def test_barrier_with_multiple_collectives(self):

def test_horovod_reducescatter(self):
"""Test that reducescatter correctly sums and scatters 1D, 2D, 3D tensors."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -3821,7 +3821,7 @@ def test_horovod_reducescatter(self):

def test_horovod_reducescatter_average(self):
"""Test that reducescatter correctly averages and scatters 1D, 2D, 3D tensors."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -3864,7 +3864,7 @@ def test_horovod_reducescatter_average(self):

def test_horovod_reducescatter_prescale(self):
"""Test that reducescatter correctly sums and scatters 1D, 2D, 3D tensors with prescaling."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -3936,7 +3936,7 @@ def test_horovod_reducescatter_prescale(self):

def test_horovod_reducescatter_postscale(self):
"""Test that reducescatter correctly sums and scatters 1D, 2D, 3D tensors with postscaling."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4008,7 +4008,7 @@ def test_horovod_reducescatter_postscale(self):
assert max_difference <= threshold, 'hvd.reducescatter produces incorrect results'

def test_horovod_reducescatter_scalar_error(self):
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand All @@ -4020,7 +4020,7 @@ def test_horovod_reducescatter_scalar_error(self):

def test_horovod_reducescatter_adasum(self):
"""Test that the reducescatter raises an error if we use Adasum operation."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand All @@ -4033,10 +4033,6 @@ def test_horovod_reducescatter_adasum(self):
dtypes += [torch.cuda.IntTensor, torch.cuda.LongTensor,
torch.cuda.FloatTensor, torch.cuda.DoubleTensor,
torch.cuda.HalfTensor]
elif self.is_xpu_available():
dtypes += [torch.xpu.IntTensor, torch.xpu.LongTensor,
torch.xpu.FloatTensor, torch.xpu.DoubleTensor,
torch.xpu.HalfTensor]
dims = [1, 2, 3]
for dtype, dim in itertools.product(dtypes, dims):
torch.manual_seed(1234)
Expand All @@ -4053,7 +4049,7 @@ def test_horovod_reducescatter_adasum(self):
def test_horovod_reducescatter_async_fused(self):
"""Test that the reducescatter correctly sums 1D, 2D, 3D tensors
with Tensor Fusion."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4114,7 +4110,7 @@ def test_horovod_reducescatter_async_fused(self):
def test_horovod_reducescatter_error(self):
"""Test that the reducescatter raises an error if different ranks try to
send tensors of different rank or dimension."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4152,7 +4148,7 @@ def test_horovod_reducescatter_error(self):
def test_horovod_reducescatter_type_error(self):
"""Test that the reducescatter raises an error if different ranks try to
send tensors of different type."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4180,7 +4176,7 @@ def test_horovod_reducescatter_type_error(self):
def test_horovod_reducescatter_duplicate_name_error(self):
"""Test that the reducescatter raises an error if there are
two concurrent operations with the same name."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4214,7 +4210,7 @@ def test_horovod_reducescatter_duplicate_name_error(self):

def test_horovod_reducescatter_grad(self):
"""Test the correctness of the reducescatter gradient."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4247,7 +4243,7 @@ def test_horovod_reducescatter_grad(self):

def test_horovod_reducescatter_grad_average(self):
"""Test the correctness of the reducescatter averaged gradient."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4281,7 +4277,7 @@ def test_horovod_reducescatter_grad_average(self):
def test_horovod_reducescatter_process_sets(self):
"""Test that reducescatter correctly sums and scatters 1D, 2D, 3D tensors if restricted
to non-global process sets."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4347,7 +4343,7 @@ def test_horovod_reducescatter_process_sets(self):

def test_horovod_reducescatter_grad_process_sets(self):
"""Test the correctness of the reducescatter gradient if restricted to non-global process sets."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4399,7 +4395,7 @@ def test_horovod_reducescatter_grad_process_sets(self):

def test_horovod_grouped_reducescatter(self):
"""Test that grouped reducescatter correctly sums and scatters 1D, 2D, 3D tensors."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4446,7 +4442,7 @@ def test_horovod_grouped_reducescatter(self):

def test_horovod_grouped_reducescatter_average(self):
"""Test that grouped reducescatter correctly averages and scatters 1D, 2D, 3D tensors."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4493,7 +4489,7 @@ def test_horovod_grouped_reducescatter_average(self):

def test_horovod_grouped_reducescatter_prescale(self):
"""Test that grouped reducescatter correctly sums and scatters 1D, 2D, 3D tensors with prescaling."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4566,7 +4562,7 @@ def test_horovod_grouped_reducescatter_prescale(self):

def test_horovod_grouped_reducescatter_postscale(self):
"""Test that grouped reducescatter correctly sums and scatters 1D, 2D, 3D tensors with postscaling."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand All @@ -4583,13 +4579,13 @@ def test_horovod_grouped_reducescatter_postscale(self):
dtypes += [torch.cuda.IntTensor, torch.cuda.LongTensor,
torch.cuda.FloatTensor, torch.cuda.DoubleTensor,
torch.cuda.HalfTensor]
int_types += [torch.cuda.IntTensor, torch.LongTensor]
int_types += [torch.cuda.IntTensor, torch.cuda.LongTensor]
half_types += [torch.cuda.HalfTensor]
elif self.is_xpu_available():
dtypes += [torch.xpu.IntTensor, torch.xpu.LongTensor,
torch.xpu.FloatTensor, torch.xpu.DoubleTensor,
torch.xpu.HalfTensor]
int_types += [torch.xpu.IntTensor, torch.LongTensor]
int_types += [torch.xpu.IntTensor, torch.xpu.LongTensor]
half_types += [torch.xpu.HalfTensor]
dims = [1, 2, 3]
np.random.seed(12345)
Expand All @@ -4601,8 +4597,11 @@ def test_horovod_grouped_reducescatter_postscale(self):
summed_list = hvd.grouped_reducescatter(tensors, op=hvd.Sum, postscale_factor=factor)

factor = torch.tensor(factor, dtype=torch.float64)
factor = factor.cuda(hvd.local_rank()) if dtype.is_cuda else factor
if dtype.is_cuda and not int(os.environ.get('HOROVOD_MIXED_INSTALL', 0)):
if dtype.is_cuda:
factor = factor.cuda(hvd.local_rank())
elif dtype.is_xpu:
factor = factor.xpu('xpu:{}'.format(hvd.local_rank()))
if (dtype.is_cuda or dtype.is_xpu) and not int(os.environ.get('HOROVOD_MIXED_INSTALL', 0)):
# For integer types, scaling done in FP64
factor = factor.type(torch.float64 if dtype in int_types else dtype)
tensors = [tensor.type(torch.float64 if dtype in int_types else dtype) for tensor in tensors]
Expand Down Expand Up @@ -4636,7 +4635,7 @@ def test_horovod_grouped_reducescatter_postscale(self):


def test_horovod_grouped_reducescatter_scalar_error(self):
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand All @@ -4649,7 +4648,7 @@ def test_horovod_grouped_reducescatter_scalar_error(self):

def test_horovod_grouped_reducescatter_process_sets(self):
"""Test that grouped reducescatter correctly sums and scatters 1D, 2D, 3D tensors if restricted to process sets."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down Expand Up @@ -4712,7 +4711,7 @@ def test_horovod_grouped_reducescatter_process_sets(self):

def test_horovod_grouped_reducescatter_grad(self):
"""Test the correctness of the grouped reducescatter gradient."""
if hvd.ccl_built():
if hvd.ccl_built() and not hvd.sycl_built():
self.skipTest("Reducescatter is not supported yet with oneCCL operations.")
if _is_mac and hvd.gloo_built() and not hvd.mpi_built():
self.skipTest("ReducescatterGloo is not supported on macOS")
Expand Down

0 comments on commit a7e580a

Please sign in to comment.