
Conversation

@d4l3k (Member) commented May 13, 2025

This adds support for using torch dtypes in CUDA kernels when building PyTorch.

Tested with: pytorch/pytorch#153406
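
For illustration, a minimal sketch (not part of this PR) of what the dtype support is for, assuming the companion PyTorch change above is built in and a transport with CUDA support is configured:

```py
import torch
import torch.distributed as dist

dist.init_process_group("gloo")

# bfloat16 is a torch dtype rather than a native Gloo type; with this change
# the Gloo CUDA kernels can operate on it directly.
t = torch.ones(1024, dtype=torch.bfloat16, device="cuda")
dist.all_reduce(t)
```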

Test plan:

```py
import os
import time

transport = "TCP"
#transport = "IBVERBS"

os.environ["GLOO_DEVICE_TRANSPORT"] = transport
rank = int(os.environ["RANK"])
os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)  # pin this rank to its own GPU

# pick the InfiniBand NIC (name:port) for this rank
ibv = "mlx5_0:1,mlx5_3:1,mlx5_4:1,mlx5_5:1,mlx5_6:1,mlx5_9:1,mlx5_10:1,mlx5_11:1".split(",")[rank]
ibv_name, ibv_port = ibv.split(":")
os.environ["TORCH_GLOO_IBV_NAME"] = ibv_name
os.environ["TORCH_GLOO_IBV_PORT"] = ibv_port
os.environ["TORCH_GLOO_IBV_INDEX"] = "3"

import torch
import torch.distributed as dist

dist.init_process_group("gloo")

rank = dist.get_rank()

# initial sanity check
#device = "cpu"
#t = torch.zeros(10, device=device)
#dist.all_reduce(t)
#print("sanity complete")

device = "cpu"

iters = 10
warmup_iters = 2

for nelem in [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000]:
    t = torch.zeros(nelem, device=device)

    torch.cuda.current_stream().synchronize()
    for i in range(warmup_iters):
        dist.all_reduce(t)

    torch.cuda.current_stream().synchronize()

    start = time.perf_counter()

    for i in range(iters):
        dist.all_reduce(t)

    torch.cuda.current_stream().synchronize()

    dur = (time.perf_counter() - start)
    qps = iters/dur

    bandwidth_gb = t.nbytes * iters / dur / 1e9  # effective bandwidth in GB/s

    gb = t.nbytes / 1e9  # tensor size in GB

    if rank == 0:
        print(f"{transport=} {device=} {iters=} {nelem=} {qps=} {gb=} {bandwidth_gb=}\n", end="")
```

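The script takes its rank from the RANK environment variable, so it can be launched with a standard launcher such as torchrun (for example `torchrun --nproc_per_node=8 bench.py`, where bench.py is a hypothetical name for the snippet above); each rank then pins itself to its own GPU and IB NIC.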
@d4l3k force-pushed the d4l3k/torch_dtypes branch from 80cc076 to ff3d11a on May 14, 2025 17:56
@fduwjj (Contributor) left a comment


LGTM. Is there any other dtype we want to support aside from bf16?


```cmake
message(STATUS "GLOO_USE_TORCH_DTYPES : ${GLOO_USE_TORCH_DTYPES} ${GLOO_TORCH_DIR}")
if(GLOO_USE_TORCH_DTYPES)
  target_include_directories(gloo_hip PRIVATE ${GLOO_TORCH_DIR})
```
Contributor

Interesting... Gloo also supports ROCm?

Member Author

Yup -- it's a bit of a mess, but yes. It uses HIP the same way PyTorch does.

@d4l3k merged commit fe67c4b into main May 15, 2025
7 checks passed
@d4l3k deleted the d4l3k/torch_dtypes branch May 15, 2025 04:49
pytorchmergebot pushed a commit to pytorch/pytorch that referenced this pull request May 16, 2025
This enables Gloo CUDA when used with a backend that supports GPUDirect, which is currently only the IBVERBS backend.

This requires some changes to Gloo which are in pytorch/gloo#441

Since we're now depending on gloo_cuda, we need to split ProcessGroupGloo into two pieces: one with the CPU bits (in libtorch_cpu) and one with the CUDA kernels (in libtorch_cuda). This unfortunately requires some major refactoring, as some CPU code is shared across both.

The gloo submodule is updated to depend on the new Gloo changes.
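
Before the test plan below, a minimal sketch of exercising the GPU path described above; the environment variables mirror the ones in the test plan, the NIC name is only an example, and an IBVERBS-capable fabric is assumed:

```py
import os

# GPUDirect is only available on the IBVERBS transport
os.environ["GLOO_DEVICE_TRANSPORT"] = "IBVERBS"
os.environ["TORCH_GLOO_IBV_NAME"] = "mlx5_0"  # example NIC; pick the right one per rank
os.environ["TORCH_GLOO_IBV_PORT"] = "1"
os.environ["TORCH_GLOO_IBV_INDEX"] = "3"

import torch
import torch.distributed as dist

dist.init_process_group("gloo")
torch.cuda.set_device(dist.get_rank() % torch.cuda.device_count())

# with the ProcessGroupGloo split, CUDA tensors go through the gloo_cuda kernels
t = torch.ones(1_000_000, device="cuda")
dist.all_reduce(t)
```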

Test plan:

```py
import os
import time

transport = "TCP"
#transport = "IBVERBS"

os.environ["GLOO_DEVICE_TRANSPORT"] = transport
rank = int(os.environ["RANK"])
os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)

ibv = "mlx5_0:1,mlx5_3:1,mlx5_4:1,mlx5_5:1,mlx5_6:1,mlx5_9:1,mlx5_10:1,mlx5_11:1".split(",")[rank]
ibv_name, ibv_port = ibv.split(":")
os.environ["TORCH_GLOO_IBV_NAME"] = ibv_name
os.environ["TORCH_GLOO_IBV_PORT"] = ibv_port
os.environ["TORCH_GLOO_IBV_INDEX"] = "3"

import torch
import torch.distributed as dist

dist.init_process_group("gloo")

rank = dist.get_rank()

# initial sanity check
#device = "cpu"
#t = torch.zeros(10, device=device)
#dist.all_reduce(t)
#print("sanity complete")

device = "cpu"

iters = 10
warmup_iters = 2

for nelem in [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000]:
    t = torch.zeros(nelem, device=device)

    torch.cuda.current_stream().synchronize()
    for i in range(warmup_iters):
        dist.all_reduce(t)

    torch.cuda.current_stream().synchronize()

    start = time.perf_counter()

    for i in range(iters):
        dist.all_reduce(t)

    torch.cuda.current_stream().synchronize()

    dur = (time.perf_counter() - start)
    qps = iters/dur

    bandwidth_gb = t.nbytes * iters / dur / 1e9

    gb = t.nbytes / 1e9

    if rank == 0:
        print(f"{transport=} {device=} {iters=} {nelem=} {qps=} {gb=} {bandwidth_gb=}\n", end="")
```

Pull Request resolved: #153406
Approved by: https://github.com/fduwjj