PyTorch goes distributed #241

Closed
apaszke opened this Issue Nov 20, 2016 · 13 comments

@apaszke
Member

apaszke commented Nov 20, 2016

Together with @0mp, @VirrageS and @jytug we're developing a torch.distributed package for PyTorch. All the work is being done in a fork on the thd branch (we didn't want to make a lot of unnecessary noise in the main repo). We're creating this issue so we can gather feedback on the API design from all of you.

We plan for the package to have two modes; the user has to choose one of them as part of the initialisation.

Process group mode

This is very similar to the API defined by MPI. We assume all processes are equal, assign them ranks, and later on allow them to use a well-known set of communication collectives like reduce, broadcast, allReduce, gather, scatter, etc.

Example:

import torch.distributed

# join the process group over the chosen data channel (TCP here)
torch.distributed.init_process_group(backend='tcp')
my_rank = torch.distributed.get_rank()
num_processes = torch.distributed.get_num_processes()

...

if my_rank == 0:
    torch.distributed.send(tensor, 1)    # point-to-point send to rank 1
else:
    tensor = torch.distributed.recv(0)   # receive a tensor from rank 0

...

result = torch.distributed.all_reduce(tensor)   # collective reduction across all processes
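
As a hypothetical illustration of how these collectives could be combined for data-parallel training - the API names below follow this proposal (not a final interface), and the model and data are toy placeholders:

import torch
import torch.distributed
import torch.nn as nn
import torch.optim as optim

torch.distributed.init_process_group(backend='tcp')
world_size = torch.distributed.get_num_processes()

model = nn.Linear(10, 1)                      # toy model, replicated on every rank
optimizer = optim.SGD(model.parameters(), lr=0.1)

for step in range(100):
    inputs = torch.randn(32, 10)              # stand-in for this rank's data shard
    loss = model(inputs).pow(2).mean()
    optimizer.zero_grad()
    loss.backward()
    for p in model.parameters():
        # sum this gradient over all ranks, then average it
        p.grad.data = torch.distributed.all_reduce(p.grad.data) / world_size
    optimizer.step()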

Master-worker mode

This would provide an API very similar to the torch.cuda package. At the beginning of your script you would have to call torch.distributed.init_master_worker(backend='mpi').

Operation execution is asynchronous w.r.t. the master process; we'll implement a CUDA-like concurrency model (streams + events). Until then, the only sync points are copies between master and workers.

Example:

import torch.distributed
torch.distributed.init_master_worker(backend='tcp')

x = torch.distributed.FloatTensor(20, 20).fill_(4)
y = torch.randn(20, 20).dist_send()
z = x + y
# z.get_node(), z.get_device() == 0, -1 (i.e. CPU)
cuda_x = x.cuda()
# cuda_x.get_node(), cuda_x.get_device() == 0, 0
with torch.distributed.node(1):
    a = torch.distributed.FloatTensor(10, device=1)
    # a.get_node(), a.get_device() == 1, 1
    cuda_y = y.cuda()
    # cuda_y.get_node(), cuda_y.get_device() == 0, 0
    q = cuda_x + cuda_y
    # q.get_node(), q.get_device() == 0, 0

How to launch the jobs

We'll provide a pytorch_exec utility that will spawn the process groups in a similar fashion to mpiexec.

Decoupling data backends from other logic

You might have noticed that both init_process_group and init_master_worker accept a backend argument. We're aware that the best strategy for sending the data might be different for every user, and picking a good one will be crucial to limit communication overhead. This is why we decided to introduce a DataChannel interface: users will be able to pick one of the provided implementations (initially MPI and raw TCP sockets, later RDMA etc.) or add custom ones, so they can easily achieve the lowest possible overhead in their setup.
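
As a purely conceptual sketch of the abstraction (the real interface lives in the THD C++ code and its exact methods will differ), a DataChannel boils down to something like:

import abc

class DataChannel(abc.ABC):
    """Moves tensor data between processes; the command logic above it never changes."""

    @abc.abstractmethod
    def send(self, tensor, dst_rank): ...

    @abc.abstractmethod
    def recv(self, tensor, src_rank): ...

    @abc.abstractmethod
    def all_reduce(self, tensor): ...

class TCPDataChannel(DataChannel):
    ...  # raw sockets: simple and portable, not tuned for throughput

class MPIDataChannel(DataChannel):
    ...  # delegates to an MPI library, which picks the transport (shared memory, IB, ...)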

Please let us know what you think! Thanks!

@shubho

shubho commented Nov 21, 2016

Shubho here from SVAIL @ Baidu

One long-term thing to keep in mind is interfacing with a job scheduler - SLURM is pretty standard. I think using salloc with pytorch_exec should be fairly easy.

The framework that I helped architect at Baidu follows a peer-to-peer model - all workers are peers - no master and no slave - each peer has access to 1 GPU - and has a synchronous view of the parameter space that is completely replicated on each worker. Workers use MPI to communicate - and the MPI byte-transport layer deals with IBVerb and CUDA shared-memory transport (if GPUs are on the same PCI-E root complex, and many are). This peer-to-peer architecture is relatively simple and scales really well - at least to 256 GPUs and possibly more (at this point people started losing friends for hogging the cluster). It can also sustain about 70% of InfiniBand's peak bandwidth (for FDR InfiniBand), but I had to reimplement the MPI collective operations since they are not optimized. This simple architecture has served us well for anything we train - various recurrent nets with or without attention, WaveNet - and I don't see why convnets should be an issue. This setup is however not fault tolerant - if a peer dies, the training comes to a halt, and SLURM will time it out and reschedule; this is fairly painless since we save checkpoints and it restarts from the last checkpoint. People rarely notice these failures in practice.

Possibly torch.distributed is more full-featured than this... I haven't started looking at the code yet, but will soon...

Happy to test on our cluster, since we have an FDR InfiniBand backplane with OpenMPI and SLURM, and also to contribute our learnings and code.

@thatguymike

thatguymike commented Nov 21, 2016

@shubho - I agree with respect to the scheduler, but I also don't see a large issue there. However, we want to take some care about being reliant on MPI, and about checkpoint support. I like MPI because it gives you a good default support base, but it can have interesting performance issues. We tend to go right to verbs and sockets because performance can be much better.

What we have found works the best currently is fast on node reduction and then reduction across nodes. However, in practice this means not reducing across PCIe root complexes and treating each root complex that GPUs and hopefully a NIC are attached to as a node. But if you have more complex system configs, for example more IB cards, it gets more complex. Or if you have a custom interconnect... ;-)

The scaling you can achieve will depend a whole lot on the speed of each "node" and the model, but I would expect speech models specifically to scale pretty well, especially because you can increase the global batch pretty aggressively and maintain convergence. However, that can make checkpoint and recovery on a differently sized machine more problematic if that machine can't fit the global batch of the original. All the hyperparams generally need to change to adjust.

But I like the general direction and idea. I think the key is really figuring out an abstraction that allows the transport layer to be changed easily (even if by extension) and supports hierarchical setups.

More complex still will be model-parallel and hybrid-parallel designs.

However, the most complicated thing we tend to run into is the input IO pipeline and synchronization across a filesystem. It can make for some interesting interactions, especially if your IO is on the same NICs you are using for reduction...
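
To make the two-level idea concrete, here is a rough, illustrative sketch (mpi4py + NumPy; node_id is assumed to identify a node or PCIe root complex - this is not anyone's production code):

from mpi4py import MPI
import numpy as np

def hierarchical_allreduce(grad, node_id):
    """Sum `grad` (a 1-D NumPy array) across all ranks in two stages."""
    world = MPI.COMM_WORLD
    rank = world.Get_rank()

    # Intra-node communicator: one group per node_id.
    node = world.Split(color=node_id, key=rank)
    is_leader = node.Get_rank() == 0

    # Inter-node communicator containing only the node leaders.
    leaders = world.Split(color=0 if is_leader else MPI.UNDEFINED, key=rank)

    local_sum = np.empty_like(grad)
    node.Reduce(grad, local_sum, op=MPI.SUM, root=0)      # 1. reduce on the node

    if is_leader:
        total = np.empty_like(grad)
        leaders.Allreduce(local_sum, total, op=MPI.SUM)   # 2. reduce across nodes
        local_sum = total

    node.Bcast(local_sum, root=0)                         # 3. fan the result back out
    return local_sum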

@shubho

shubho commented Nov 21, 2016

We have thought about doing more topology-aware reduction like you suggested, but then backed off because our collective turned out fast enough - the ring-reduce algorithm is really good for large matrices. The ring-reduce conceptually treats every neighboring link as the same - though sometimes they will span the QPI bus and sometimes go to the IB switch. Thankfully ignoring this difference hasn't bitten us... yet... :) In some sense I am just using MPI as a thin layer over all the different transports, with just calls to Send and Recv along with their non-blocking versions. In some sense I want an OpenMPI-lite that only has Send, Recv, ISend, IRecv and SendRecv - the rest of the collectives are best written by hand, and then you can even play around with sending in reduced precision etc.

Actually, speech is not that easy to scale - anything above a global batch of 1024 hasn't worked so far. So we rarely train beyond 64 GPUs, though we can easily scale to 256 GPUs with weak linear scaling but worse convergence. So far hyperparameter or model search hasn't yielded something amenable above 1024 :(

Yes, input I/O is the 800 lb gorilla in the room for training that nobody seems to talk about. Designing networked read-only filesystems that can saturate the GPUs is an unsolved problem. We have solved it through various hacks - they are not pretty, not very scalable, and have been a source of a lot of frustration. And our checkpoint writing can also stress the filesystem. I think this is a broad research topic - training data input and checkpoint data output - but we should worry about this later.
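
For readers who haven't seen it, here is a rough illustration of a ring all-reduce built only from SendRecv, in the spirit of the "OpenMPI-lite" above (mpi4py + NumPy; assumes a 1-D array whose length divides evenly by the number of ranks - not Baidu's actual code):

from mpi4py import MPI
import numpy as np

def ring_allreduce(x, comm=MPI.COMM_WORLD):
    rank, size = comm.Get_rank(), comm.Get_size()
    right, left = (rank + 1) % size, (rank - 1) % size
    chunks = [c.copy() for c in np.array_split(x, size)]   # one equal chunk per peer
    recv = np.empty_like(chunks[0])

    # Reduce-scatter: after size-1 steps each rank owns one fully summed chunk.
    for step in range(size - 1):
        s = (rank - step) % size               # chunk we send this step
        r = (rank - step - 1) % size           # chunk we receive and accumulate
        comm.Sendrecv(chunks[s], dest=right, recvbuf=recv, source=left)
        chunks[r] += recv

    # All-gather: circulate the summed chunks until every rank has all of them.
    for step in range(size - 1):
        s = (rank - step + 1) % size
        r = (rank - step) % size
        comm.Sendrecv(chunks[s], dest=right, recvbuf=recv, source=left)
        chunks[r][:] = recv

    return np.concatenate(chunks)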

@apaszke

Member

apaszke commented Nov 21, 2016

Actually, we’ve abstracted the code used for transmitting the data into a DataChannel interface. Right now we have two implementations - TCP and MPI. But afterwards, we can probably do something like what either/both of you have converged to - using NCCL at the PCIe-root level, and MPI / IBVerbs at the node level.

However, the whole point is that we want you to be able to customize it even further by writing custom backends. We tried to make it so that the changes to THD code would be minimal. Later, your own implementations can be selected at startup at the Python level by passing its identifier to an init method (and the rest of the python code remains completely unchanged). This can even be maintained as a separate internal library.
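
Hypothetically (the identifier below is made up), switching to such a custom channel would only change the string passed at init:

import torch.distributed

# 'my_rdma_channel' stands in for a user-registered DataChannel implementation;
# everything after this line stays identical to the TCP / MPI case.
torch.distributed.init_process_group(backend='my_rdma_channel')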

At the initial stages we are focusing on correctness, so we're implementing a simple, stupid, working version - simply calling into MPI or writing non-performant TCP routines. Once we get past the unit-test stage and write all the Python bindings, we'll aggressively focus on perf as well.

Also, RPC calls in master-worker mode always use ZMQ for communication, so it won’t mess up any logic in your custom DataChannels. You can be sure they’ll only be used for tensor data. Since all messages will be very very small (10-100B) we decided that it’s not necessary to make the command channel interchangeable.

@shubho - it seems that the approach you used is compatible with the process group mode. It’s pretty much the same as using raw MPI - all workers are equal and there’s no master. We’re adding master-worker mode just because it might be simpler for some people to use an API they know from our CUDA packages. It’s pretty much the same principle - you can make your code automatically distributed by only calling .dist_send(). No need to rearrange stuff and interleave it with MPI collectives. It’s probably not going to keep 100 workers busy, but as long as you keep it <20 it should be ok (workers can use multiple GPUs).

About SLURM, if you use the MPI backend, you wouldn't even need to use pytorch_exec, you can simply use mpiexec.

If any of you have been browsing the code and would like to ask any questions, feel free to reach out to me on Slack or by email, and I can give you a tour or explain what the plan is at that moment.

@seba-1511

seba-1511 commented Jan 18, 2017

All links to the distributed branch are unreachable. Is it possible to have access to the current source?

Otherwise, I love the chosen abstraction level. One additional (future) feature I'd like to experiment with is RDMA in process group mode. Similar to MPI's single-sided communication.

Thanks a lot!
Edit: Grammar

@apaszke

Member

apaszke commented Jan 18, 2017

Yeah, sorry. Once we made the repo public, all the forks got disconnected. I made it public now, so you should be able to access it. Right now we only have MPI and custom simple TCP code, but by extending the DataChannel you can make it use any other way of sending the data. Once the library is finished we'll document this for sure.

@soumith soumith added the 24hr+ label Apr 18, 2017

@apaszke apaszke added this to the v0.2 milestone May 22, 2017

@soumith

Member

soumith commented Jun 13, 2017

Process group mode is now merged into master and enabled by default.
Master-worker mode is going to remain experimental; a lot more work on perf needs to be done there, and the use cases aren't as obvious yet.

@soumith soumith modified the milestone: v0.2 Jun 13, 2017

@dai-dao

dai-dao commented Aug 23, 2017

Hi,

I really enjoy the new PyTorch release. However, I tried the torch.distributed interface and here's the error I'm getting - any advice? Thank you!

import torch.distributed as dist

dist.init_process_group(backend='mpi',
                        world_size=4)

print('Hello from process {} (out of {})!'.format(
        dist.get_rank(), dist.get_world_size()))

RuntimeError: the MPI backend is not available; try to recompile the THD package with MPI support at /tmp/pip-4omfpcm6-build/torch/lib/THD/process_group/General.cpp:17

@soumith soumith added this to Uncategorized in Issue Status Aug 23, 2017

@apaszke

Member

apaszke commented Aug 25, 2017

I think the issue is that the binaries are compiled without MPI (we'd need to ship them with an MPI library otherwise). You'll need to install from source if you want to use this backend.
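
For reference, a minimal sketch of initializing without MPI, assuming the TCP backend is available in your build (the address, port, rank and world size below are placeholders):

import torch.distributed as dist

# every process must use the same init_method and a distinct rank
dist.init_process_group(backend='tcp',
                        init_method='tcp://127.0.0.1:23456',
                        rank=0,
                        world_size=4)

print('Hello from process {} (out of {})!'.format(
    dist.get_rank(), dist.get_world_size()))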

@apaszke

Member

apaszke commented Aug 25, 2017

I'm also closing this issue because it's not needed anymore.

@KaiyuYue

Contributor

KaiyuYue commented Apr 5, 2018

How to launch the jobs
We'll provide a pytorch_exec utility that will spawn the process groups in a similar fashion to mpiexec.

Hi @apaszke, sorry to bother you. Where can I find the pytorch_exec utility, and how do I execute it to launch distributed jobs?

@fmassa

Member

fmassa commented Apr 5, 2018

@KaiyuYue look for torch.distributed.launch

@teng-li

Contributor

teng-li commented Apr 6, 2018

@KaiyuYue also look at pytorch/examples#306 for reference on how to use torch.distributed.launch
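
Roughly, the pattern is that the launcher spawns one process per GPU and passes each one a --local_rank argument; a minimal sketch of a script meant to be started that way (the choice of backend is an assumption):

# launched with e.g.: python -m torch.distributed.launch --nproc_per_node=4 train.py
import argparse
import torch
import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)   # filled in by the launcher
args = parser.parse_args()

torch.cuda.set_device(args.local_rank)
dist.init_process_group(backend='nccl', init_method='env://')  # launcher sets the env vars
print('rank {} of {}'.format(dist.get_rank(), dist.get_world_size()))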

bddppq pushed a commit that referenced this issue Oct 15, 2018

remove value proto message since it's not being used in any model. (#241)

* remove value proto message since it's not being used in any model.

* remove changes to onnx.proto (3)

* adding onnx protos back. :)