Allow Process Group to support multiple backends #88330

Closed · wants to merge 47 commits

Conversation

@H-Huang (Member) commented on Nov 2, 2022:

Stack from ghstack:

Context:

#86225

### Implementation

Move backend-specific (NCCL, Gloo, etc.) collective implementations to the corresponding `Backend` class. Update `ProcessGroup` to support multiple backends and use the dispatcher to call a backend based on the tensor device type.
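
The following is a minimal Python sketch of that routing idea, with hypothetical class and method names (it is not the actual c10d implementation): the process group keeps one backend per device type and forwards each collective to the backend that matches the tensor's device.

```python
import torch

class FakeBackend:
    """Stand-in for a real backend such as ProcessGroupGloo or ProcessGroupNCCL."""
    def __init__(self, name):
        self.name = name

    def allreduce(self, tensor):
        print(f"allreduce on {tensor.device} handled by {self.name}")

class MultiBackendProcessGroup:
    """Hypothetical sketch of a process group that owns several backends."""
    def __init__(self):
        self._backends = {}  # device type ("cpu", "cuda", ...) -> backend object

    def register_backend(self, device_type, backend):
        self._backends[device_type] = backend

    def allreduce(self, tensor):
        # Dispatch keyed on the tensor's device type, e.g. cpu -> gloo, cuda -> nccl.
        return self._backends[tensor.device.type].allreduce(tensor)

pg = MultiBackendProcessGroup()
pg.register_backend("cpu", FakeBackend("gloo"))
pg.register_backend("cuda", FakeBackend("nccl"))
pg.allreduce(torch.ones(1))  # CPU tensor, so the "gloo" stand-in handles it
```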

### Changes

#### c++ changes (ProcessGroup files, `Ops.cpp`, `init.cpp`)

- Update pybind definitions for the new process group base class and the new backend class
- Update the pybind-bound backend class with collective definitions to keep backward compatibility (BC) with Python PG instances (e.g. `dist.ProcessGroupGloo`, `dist.ProcessGroupNCCL`), which are used in tests
- Switch `ProcessGroupGloo`, `ProcessGroupNCCL`, `ProcessGroupMPI`, and `ProcessGroupUCC` to derive from the `Backend` class
- Update CPU/CUDA `Ops.cpp` and `OpsImpl.cpp` to perform this dispatching by looking up the backend by device type
- Update the internal implementation of `barrier` to use a tensor, which allows the operation to be dispatched (see the sketch after this list)
- Update the `allgather` collective to use `TensorList`. For some reason it was using the default implementation of `allgather` rather than being dispatched correctly; I still don't understand why and had originally filed issue #85122.
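
The barrier change is easiest to see with a small Python sketch. This is not the c10d source; `barrier_via_tensor` is a hypothetical helper that illustrates the idea: all-reducing a one-element tensor gives the dispatcher a device to key on, so `barrier` can be routed to the right backend like any other collective.

```python
import torch
import torch.distributed as dist

def barrier_via_tensor(device="cpu"):
    """Hypothetical helper: block until every rank reaches this point."""
    t = torch.zeros(1, device=device)  # the tensor's device type selects the backend
    dist.all_reduce(t)                 # dispatched like any other collective
    if t.is_cuda:
        # For CUDA backends the collective is asynchronous w.r.t. the host,
        # so wait for the device to catch up before returning.
        torch.cuda.synchronize(t.device)
```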

#### python changes (`distributed_c10d.py`, test files)

- Add a `BackendConfig` class to specify the default configuration of backends, and a `get_backend_config()` API (see the sketch after this list)
- Add a `get_backend()` deprecation warning
- `init_process_group` now returns a generic `ProcessGroup` object that contains a list of backends (the ones listed above) to which it dispatches operations
- `new_group` is updated to return the same
- Update `test_c10d_gloo.py`: update `DistributedDataParallelTest` to use `init_process_group`, update `ReducerTest`, and update `test_broadcast_coalesced_gloo` to move off the PG instance and gloo options
- Update `test_c10d_nccl.py`: update `DistributedDataParallelTest` to use `init_process_group`
- Specific tests updated: `test_Backend_enum_class`
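
As a hedged sketch of how the Python-side pieces fit together (the `"cpu:gloo,cuda:nccl"` string below is an illustrative assumption about what the device-to-backend configuration might look like, not the exact format defined in this PR):

```python
import torch.distributed as dist

# With no backend argument, the default process group is created with a
# backend per device type (e.g. gloo for CPU tensors, nccl for CUDA tensors).
dist.init_process_group()

# New API added in this PR; the returned configuration might look something
# like "cpu:gloo,cuda:nccl" (format assumed here for illustration).
print(dist.get_backend_config())

# new_group() likewise returns a generic, multi-backend ProcessGroup.
pg = dist.new_group()
```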

### Changes missing

- lazy initialization of backends
- support parsing of BackendConfig

### open questions

- Pure Python PG extensions (#66338)

This basic script (using two backends within one process group) is working; CI is running to identify the remaining gaps:

```python
# python -m torch.distributed.run --nnodes=1 --nproc_per_node=2 basic_scenario.py
import torch.distributed as dist
import torch
import os

if __name__ == "__main__":
    rank = os.environ.get("RANK")
    # initialize with both gloo and nccl
    dist.init_process_group()
    # with gloo
    dist.all_reduce(torch.tensor([1.0]))
    print(f"Rank {rank} finished")
    # with nccl
    dist.all_reduce(torch.tensor([1.0], device=f"cuda:{rank}"))
```
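
Both `all_reduce` calls above go through the same default process group: the CPU tensor is serviced by the gloo backend and the CUDA tensor by the nccl backend, with the backend selected from the tensor's device type.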

Differential Revision: D42069829

@pytorch-bot (bot) commented on Nov 2, 2022:

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/88330

Note: Links to docs will display an error until the docs builds have been completed.

✅ No Failures

As of commit db26d70:
💚 Looks good so far! There are no failures yet. 💚

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@pytorch-bot added the "release notes: distributed (c10d)" label on Nov 2, 2022
H-Huang added a commit that referenced this pull request Nov 2, 2022
…pecfic implementations to backend class

ghstack-source-id: 52d6152242ca43f0f51a8872008b205c44cd928b
Pull Request resolved: #88330
@H-Huang changed the title from "[14/N] Allow Process Group to support multiple backends and move PG specfic implementations to backend class" to "Allow Process Group to support multiple backends and move PG specfic implementations to backend class" on Nov 2, 2022
H-Huang added a commit that referenced this pull request Nov 2, 2022
…pecfic implementations to backend class

ghstack-source-id: c1226485796206eb4c7108c1c4b142a3d1dcb835
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 2, 2022
…implementations to backend class

ghstack-source-id: d83622b6a592950634463c07c215d66c49b44488
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 3, 2022
…implementations to backend class

ghstack-source-id: b37b54331d60b7a31b5e7bd1fb98fdf45bbbe943
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 3, 2022
…implementations to backend class

ghstack-source-id: d8f974fb6e5f5457f3989d378ab028cbfbb2167d
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 3, 2022
…implementations to backend class

ghstack-source-id: bf1dade29869c242e0b066013c8a653c625bb646
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 4, 2022
…implementations to backend class

ghstack-source-id: a5365bced8df5ba49db39afda92c5db24118c1c8
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 7, 2022
…implementations to backend class

ghstack-source-id: 58d6d122d253f6ac2db72bc4ad02b4da72e047d2
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 7, 2022
…implementations to backend class

ghstack-source-id: f4f1c5972dd1b626a25ef01d1d41470119836dbd
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 7, 2022
…implementations to backend class

ghstack-source-id: 658cd6f640d72898e48a74f248a9258a482bab16
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Nov 9, 2022
…implementations to backend class

ghstack-source-id: 40c12511cb2641cb840b1caa9ed4db3dcd918a17
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Dec 13, 2022
…implementations to backend class

ghstack-source-id: dd54833d274901aa112be9b4fab244446204ff54
Pull Request resolved: #88330
@H-Huang (Member, Author) commented on Dec 13, 2022:

@H-Huang has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.


@kwen2501 (Contributor) left a comment:

Thanks for the epic PR, @H-Huang!
I only added two minor comments for the changes since my last review. They are not urgent, but it would be great if we can get to them at some point.
Congrats on this large PR!

Comment on lines 119 to 122
```cpp
  if (backendTypeToBackend_.find(backendType) != backendTypeToBackend_.end()) {
    auto backend = backendTypeToBackend_.at(backendType);
    deviceTypeToBackend_[deviceType] = backend;
    return backend;
```
nit: it seems this unnecessary 2nd search is still here. Can you please remove it later? Thanks!
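
Background on the nit: `find()` has already located the entry, so the follow-up `at()` call looks the same key up a second time; reusing the iterator returned by `find()` (i.e. `it->second`) would avoid the repeated search.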

```python
# only create single backend pg when backend is set to gloo, nccl, mpi, etc.
if backend_enum.lower() in Backend.backend_list:
    for device in backend_config.get_device_backend_map().keys():
        pg._register_backend(torch.device(device), backend_type, backend)
```
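
In other words, this is the backward-compatibility path: when the requested backend is one of the classic single backends (gloo, nccl, mpi, ucc), that one backend object is registered for every device type in the device-backend map, so the resulting multi-backend `ProcessGroup` behaves like the old single-backend one.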
nit: if not yet done so, let's create an issue for tracking the move of _register_backend to c++ level.

@kwen2501 changed the title from "Allow Process Group to support multiple backends and move PG specfic implementations to backend class" to "Allow Process Group to support multiple backends" on Dec 14, 2022
H-Huang added a commit that referenced this pull request Dec 15, 2022
ghstack-source-id: 7dccf9b74ad4879ed7e9844bde04612c242a7941
Pull Request resolved: #88330
H-Huang added a commit that referenced this pull request Dec 15, 2022
ghstack-source-id: f6913300a6274e96532ddc4183ec980be567f42e
Pull Request resolved: #88330
@H-Huang (Member, Author) commented on Dec 15, 2022:

@H-Huang has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

H-Huang added a commit that referenced this pull request Dec 15, 2022
ghstack-source-id: df5ac14aee8f9230daf88d0c3584aaf7d2a6d005
Pull Request resolved: #88330
H-Huang added a commit to H-Huang/pytorch that referenced this pull request Dec 16, 2022
Summary:
Pull Request resolved: pytorch#88330

Test Plan: Imported from OSS

Differential Revision: D42069829

Pulled By: H-Huang

fbshipit-source-id: caef748ab7b5c4d951353324f04d0acc75704eba
H-Huang added a commit to H-Huang/pytorch that referenced this pull request Dec 16, 2022
Summary:
Exported PR:
Pull Request resolved: pytorch#90997

Original PR with comments (APPROVED):
Pull Request resolved: pytorch#88330

Test Plan: Imported from OSS

Differential Revision: D42069829

Pulled By: H-Huang

fbshipit-source-id: 8e529f6e2d53180af7e1860b38b4841e5486e369
@facebook-github-bot (Contributor) commented:
@H-Huang merged this pull request in 7a0f29b.

hasanyeganeh pushed a commit to hasanyeganeh/pytorch-pytorch that referenced this pull request Dec 21, 2022
ghstack-source-id: 19a0f5876cd9b9d21c2f8139d16db13df4643c04
Pull Request resolved: pytorch/pytorch#88330
@facebook-github-bot deleted the gh/H-Huang/91/head branch on June 8, 2023 14:34
Labels: ciflow/trunk · Merged · release notes: distributed (c10d) · skip-pr-sanity-checks

4 participants