Merge pull request #99 from sony/feature/20171218-collectives
New collectives and MPI-like group concept
TakuyaNarihira committed Feb 14, 2018
2 parents 5991f6d + 5dbe98a commit 1cee743
Showing 16 changed files with 1,198 additions and 202 deletions.
157 changes: 78 additions & 79 deletions doc/python/tutorial/multi_device_training.rst
@@ -7,8 +7,8 @@ multiple devices. It is normally used for gradients exchange in data
parallel distributed training. Basically, there are two types of
distributed training in the Neural Network literature: Data Parallel and
Model Parallel. Here we only focus on the former, Data Parallel
Training. Data Parallel Distributed Training is based on the very
simple equation used for the optimization of a neural network called
Training. Data Parallel Distributed Training is based on the very simple
equation used for the optimization of a neural network called
(Mini-Batch) Stochastic Gradient Descent.
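As a reminder of why gradient exchange is all that is needed, here is a
minimal sketch of the data-parallel update (our notation, not taken from
this tutorial): with :math:`N` devices, each computing a gradient over its
own shard of :math:`B` samples, the synchronized update is equivalent to
plain mini-batch SGD on the combined batch of :math:`N \times B` samples.

.. math::

   \theta \leftarrow \theta - \eta \, \frac{1}{N} \sum_{n=1}^{N}
   \frac{1}{B} \sum_{i \in \mathcal{B}_n} \nabla_{\theta} \ell(x_i, \theta),

where :math:`\mathcal{B}_n` is the mini-batch processed on device
:math:`n` and :math:`\eta` is the learning rate.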

In the optimization process, the objective one tries to minimize is
@@ -63,7 +63,7 @@ NOTE

This tutorial depends on **IPython Cluster**, thus when you want to run
the following excerpts of the scripts on Jupyter Notebook, follow
`this <https://ipython.org/ipython-doc/3/parallel/parallel_process.html#using-ipcluster-in-mpiexec-mpirun-mode>`_
`this <https://ipython.org/ipython-doc/3/parallel/parallel_process.html#using-ipcluster-in-mpiexec-mpirun-mode>`__
to enable mpiexec/mpirun mode, then launch a corresponding IPython
Cluster on the IPython Clusters tab.
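For reference, the setting described on the linked page boils down to one
line in the IPython profile's ``ipcluster_config.py`` (shown here as an
assumption for IPython 3; check the linked documentation for your version):

.. code:: python

    # ~/.ipython/profile_default/ipcluster_config.py (profile path may differ)
    # Launch the engines through mpiexec/mpirun so that each engine is an MPI process.
    c.IPClusterEngines.engine_launcher_class = 'MPIEngineSetLauncher'

After that, ``ipcluster start -n 2`` (or starting the cluster from the
IPython Clusters tab) gives you one engine per MPI rank.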

@@ -125,10 +125,10 @@ Check different ranks are assigned to different devices
[stdout:0]
n_devices=2
mpi_rank=0
mpi_rank=1
[stdout:1]
n_devices=2
mpi_rank=1
mpi_rank=0
Create data points and a very simple neural network
@@ -174,19 +174,16 @@ Create data points and a very simple neural network
pred = PF.affine(h, n_class, w_init=w_init)
loss = F.mean(F.softmax_cross_entropy(pred, y))
**Important to notice** here is that ``w_init`` is passed to parametric
**Important to note** here is that ``w_init`` is passed to parametric
functions to let the network on each GPU start from the same values of
trainable parameters in the optimization process.
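A common way to guarantee this (a sketch with assumed layer sizes, not
code from this tutorial) is to build ``w_init`` from an initializer whose
random source is seeded identically on every process:

.. code:: python

    import numpy as np
    import nnabla.initializer as I

    # The same seed on every process -> the same initial weights on every GPU.
    # The (inmaps, outmaps, kernel) values below are placeholders for this sketch.
    rng = np.random.RandomState(0)
    lim = I.calc_uniform_lim_glorot(1, 64, kernel=(3, 3))
    w_init = I.UniformInitializer(lim, rng=rng)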

Add trainable parameters and create a solver.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Create a solver.
~~~~~~~~~~~~~~~~

.. code:: python
%%px
# Add parameters to communicator
comm.add_context_and_parameters((ctx, nn.get_parameters()))
# Solver and add parameters
solver = S.Adam()
solver.set_parameters(nn.get_parameters())
@@ -202,15 +199,16 @@ it is
3. loss.backward()
4. solver.update()

In use of ``C.MultiProcessDataParalellCommunicator``, these steps are performed in
different GPUs, and the **only difference** from these steps is
``comm.allreduce()`` Thus, in case of ``C.MultiProcessDataParalellCommunicator``
training steps are as follows,
When using ``C.MultiProcessDataParalellCommunicator``, these steps are
performed on different GPUs, and the **only difference** from these
steps is ``comm.all_reduce()``. Thus, with
``C.MultiProcessDataParalellCommunicator`` the training steps are as
follows (a minimal sketch of the full iteration is shown right after
the list),

1. loss.forward()
2. solver.zero\_grad()
3. loss.backward()
4. **comm.allreduce()**
4. **comm.all\_reduce([x.grad for x in nn.get\_parameters().values()])**
5. solver.update()
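Before walking through each step, here is a minimal sketch of one full
data-parallel iteration (variable names as defined above; feeding each
process's mini-batch shard into ``x.d`` and ``y.d`` is assumed to happen
elsewhere):

.. code:: python

    # One training iteration on each process (sketch).
    loss.forward()
    solver.zero_grad()
    loss.backward()
    # Exchange gradients: sum over devices, or average with division=True.
    comm.all_reduce([v.grad for v in nn.get_parameters().values()], division=True)
    solver.update()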

First, forward, zero\_grad, and backward,
@@ -235,44 +233,46 @@ Check gradients of weights once,
.. parsed-literal::
[stdout:0]
('conv/W', array([[[[ 0.06888472, 0.03302665, 0.00224538],
[ 0.10095084, 0.36394489, 0.00659006],
[ 0.15155329, 0.36173904, 0.20400617]]]], dtype=float32))
('conv/b', array([ 0.09519047], dtype=float32))
('affine/W', array([[ 0.23829283, -0.23829281],
[ 0.25489166, -0.25489166],
[ 0.07387832, -0.0738783 ],
...,
[ 0.34147066, -0.34147066],
[ 0.33993909, -0.33993909],
[ 0.07020829, -0.07020829]], dtype=float32))
('affine/b', array([ 0.18422271, -0.1842227 ], dtype=float32))
('conv/W', array([[[[ 5.0180483, 0.457942 , -2.8701296],
[ 2.0715926, 3.0698593, -1.6650047],
[-2.5591214, 6.4248834, 9.881935 ]]]], dtype=float32))
('conv/b', array([8.658947], dtype=float32))
('affine/W', array([[-0.93160367, 0.9316036 ],
[-1.376812 , 1.376812 ],
[-1.8957546 , 1.8957543 ],
...,
[-0.33000934, 0.33000934],
[-0.7211893 , 0.72118926],
[-0.25237036, 0.25237036]], dtype=float32))
('affine/b', array([-0.48865744, 0.48865741], dtype=float32))
[stdout:1]
('conv/W', array([[[[ 0.28718406, 0.19707698, 0.21287963],
[ 0.27262157, 0.48162708, 0.58341372],
[ 0.09545794, 0.37022409, 0.39285854]]]], dtype=float32))
('conv/b', array([ 0.45548177], dtype=float32))
('affine/W', array([[ 0.19560671, -0.19560665],
[ 0.5929324 , -0.59293228],
[ 0.81732005, -0.81731993],
...,
[ 0.30037487, -0.30037481],
[ 0.33988202, -0.33988199],
[ 0.1787488 , -0.1787488 ]], dtype=float32))
('affine/b', array([ 0.23541948, -0.23541945], dtype=float32))
You can see the different values on each device.
('conv/W', array([[[[ -1.2505884 , -0.87151337, -8.685524 ],
[ 10.738419 , 14.676786 , 7.483423 ],
[ 5.612471 , -12.880402 , 19.141157 ]]]], dtype=float32))
('conv/b', array([13.196114], dtype=float32))
('affine/W', array([[-1.6865108 , 1.6865108 ],
[-0.938529 , 0.938529 ],
[-1.028422 , 1.028422 ],
...,
[-0.98217344, 0.98217344],
[-0.97528917, 0.97528917],
[-0.413546 , 0.413546 ]], dtype=float32))
('affine/b', array([-0.7447065, 0.7447065], dtype=float32))
You can see different values on each device; now call
``all_reduce``,

.. code:: python
%%px
comm.allreduce(division=True)
comm.all_reduce([x.grad for x in nn.get_parameters().values()], division=True)
Commonly, ``allreduce`` only means the sum; however, ``comm.allreduce``
addresses both cases: summation and summation division.
Commonly, ``all_reduce`` means only the sum; ``comm.all_reduce``,
however, covers both cases: plain summation (``division=False``) and
summation followed by division by the number of devices, i.e.
averaging (``division=True``).
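Concretely, with two devices the effect of ``division`` can be pictured
with plain NumPy (an illustration only, not the communicator's
implementation):

.. code:: python

    import numpy as np

    g0 = np.array([1., 2.])      # gradient computed on device 0
    g1 = np.array([3., 4.])      # gradient computed on device 1

    summed = g0 + g1             # division=False: both devices end up with [4., 6.]
    averaged = (g0 + g1) / 2.0   # division=True:  both devices end up with [2., 3.]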

Check gradients of weights again,
Again, check gradients of weights,

.. code:: python
@@ -284,34 +284,34 @@
.. parsed-literal::
[stdout:0]
('conv/W', array([[[[ 0.17803439, 0.11505181, 0.1075625 ],
[ 0.1867862 , 0.422786 , 0.29500189],
[ 0.12350561, 0.36598158, 0.29843235]]]], dtype=float32))
('conv/b', array([ 0.27533612], dtype=float32))
('affine/W', array([[ 0.21694976, -0.21694973],
[ 0.42391205, -0.42391199],
[ 0.4455992 , -0.44559911],
...,
[ 0.32092276, -0.32092273],
[ 0.33991057, -0.33991054],
[ 0.12447855, -0.12447855]], dtype=float32))
('affine/b', array([ 0.20982111, -0.20982108], dtype=float32))
('conv/W', array([[[[ 1.8837299 , -0.20678568, -5.777827 ],
[ 6.4050055 , 8.8733225 , 2.9092093 ],
[ 1.5266749 , -3.2277591 , 14.511546 ]]]], dtype=float32))
('conv/b', array([21.85506], dtype=float32))
('affine/W', array([[-2.6181145, 2.6181145],
[-2.315341 , 2.315341 ],
[-2.9241767, 2.9241762],
...,
[-1.3121828, 1.3121828],
[-1.6964785, 1.6964784],
[-0.6659163, 0.6659163]], dtype=float32))
('affine/b', array([-1.233364 , 1.2333639], dtype=float32))
[stdout:1]
('conv/W', array([[[[ 0.17803439, 0.11505181, 0.1075625 ],
[ 0.1867862 , 0.422786 , 0.29500189],
[ 0.12350561, 0.36598158, 0.29843235]]]], dtype=float32))
('conv/b', array([ 0.27533612], dtype=float32))
('affine/W', array([[ 0.21694976, -0.21694973],
[ 0.42391205, -0.42391199],
[ 0.4455992 , -0.44559911],
...,
[ 0.32092276, -0.32092273],
[ 0.33991057, -0.33991054],
[ 0.12447855, -0.12447855]], dtype=float32))
('affine/b', array([ 0.20982111, -0.20982108], dtype=float32))
You can see the same values over the devices because of ``allreuce``.
('conv/W', array([[[[ 1.8837299 , -0.20678568, -5.777827 ],
[ 6.4050055 , 8.8733225 , 2.9092093 ],
[ 1.5266749 , -3.2277591 , 14.511546 ]]]], dtype=float32))
('conv/b', array([21.85506], dtype=float32))
('affine/W', array([[-2.6181145, 2.6181145],
[-2.315341 , 2.315341 ],
[-2.9241767, 2.9241762],
...,
[-1.3121828, 1.3121828],
[-1.6964785, 1.6964784],
[-0.6659163, 0.6659163]], dtype=float32))
('affine/b', array([-1.233364 , 1.2333639], dtype=float32))
You can see the same values over the devices because of ``all_reduce``.

Update weights,

@@ -320,14 +320,13 @@
%%px
solver.update()
This concludes the usage of ``C.MultiProcessDataParalellCommunicator`` for
Data Parallel Distributed Training.
This concludes the usage of ``C.MultiProcessDataParalellCommunicator``
for Data Parallel Distributed Training.

Now you should have an understanding of how to use ``C.MultiProcessDataParalellCommunicator``, go to
the cifar10 example,
Now that you understand how to use
``C.MultiProcessDataParalellCommunicator``, go to the cifar10 example,

1. **multi\_device\_multi\_process\_classification.sh**
2. **multi\_device\_multi\_process\_classification.py**

for more details.

101 changes: 81 additions & 20 deletions include/nbla/communicator.hpp
@@ -90,59 +90,120 @@ class NBLA_API Communicator {
*/
virtual void init();

/** Create group
*/
virtual string new_group(pair<string, vector<int>> name_ranks_pair);

/** List groups
*/
virtual unordered_map<string, vector<int>> list_groups();

/** Find groups
*/
virtual vector<int> find_group(const string &group);

/** Check difference of the array classes.
* Check the difference between the array class of the context and that of
* the synced_array. If they differ, an error occurs.
*/
void check_array_class(Context ctx, VariablePtr vp);

/** reduce.
@param division Divide the reduced value.
/** reduce over parameters added.
@param ndarray_list Vector of NdArrayPtr.
@param dst Destination rank.
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void reduce(bool division = false);
virtual void reduce(const vector<NdArrayPtr> &ndarray_list, int dst,
bool division = false, bool inplace = false,
const string &group = "world");

/** reduce over parameters added.
@param data NdArrayPtr.
@param dst Destination rank.
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void reduce(NdArrayPtr ndarray, int dst, bool division = false,
bool inplace = false, const string &group = "world");

/** allreduce over parameters added.
Currently, \e iallreduce is applied to gradient regions.
Deprecated. Use all_reduce.
Currently, \e allreduce is applied to gradient regions.
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if flase.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void allreduce(bool division = false, bool inplace = false);

/** all_reduce over parameters added.
Currently, \e iallreduce is applied to gradient regions.
@param ndarray_list Vector of NdArrayPtr
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if flase.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void all_reduce(vector<NdArrayPtr> ndarray_list,
bool division = false, bool inplace = false);
virtual void all_reduce(const vector<NdArrayPtr> &ndarray_list,
bool division = false, bool inplace = false,
const string &group = "world");

/** all_reduce over parameters added.
Currently, \e iallreduce is applied to gradient regions.
@param data NdArrayPtr
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if flase.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void all_reduce(NdArrayPtr ndarray, bool division = false,
bool inplace = false);
bool inplace = false, const string &group = "world");

/** reducescatter.
/** reduce_scatter.
@param ndarray_list Vector of NdArrayPtr
@param ndarray NdArrayPtr
@param division Divide the reduced value.
@param group Name of a group.
*/
virtual void reducescatter(bool division = false);
virtual void reduce_scatter(const vector<NdArrayPtr> &ndarray_list,
NdArrayPtr ndarray, bool division = false,
const string &group = "world");

/** broadcast.
*
@param ndarray_list Vector of NdArrayPtr.
@param src Source rank.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void bcast();
virtual void bcast(const vector<NdArrayPtr> &ndarray_list, int src,
bool inplace = false, const string &group = "world");

/** allgather.
*
/** broadcast.
@param data NdArrayPtr.
@param src Source rank.
@param inplace Pack the arrays into one large array if false.
@param group Name of a group.
*/
virtual void bcast(NdArrayPtr ndarray, int src, bool inplace = false,
const string &group = "world");

/** all_gather.
@param ndarray data to be sent.
@param ndarray_list Vector of NdArrayPtr to receive data.
@param group Name of a group.
*/
virtual void allgather();
virtual void all_gather(NdArrayPtr ndarray,
const vector<NdArrayPtr> &ndarray_list,
const string &group = "world");

/** reduce asynchronously.
@param division Divide the reduced value.
@@ -151,7 +212,7 @@ Currently, \e iallreduce is applied to gradient regions.

/** reduce asynchronously.
@param division Divide the reduced value.
@param inplace Pack the arrays into one large array if flase.
@param inplace Pack the arrays into one large array if false.
*/
virtual void allreduce_async(bool division = false, bool inplace = true);
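To see how the group concept and the new collectives declared above fit
together, here is a sketch driven from Python, assuming the Python
bindings mirror these C++ signatures (the group name, rank lists, and
four-process setup are illustrative):

.. code:: python

    import nnabla as nn

    # Sketch only: assumes `comm` is an initialized communicator over
    # 4 processes (ranks 0..3) whose Python API mirrors the C++ header above.
    comm.new_group(('half0', [0, 1]))    # create a named sub-group
    print(comm.list_groups())            # e.g. {'world': [0, 1, 2, 3], 'half0': [0, 1]}

    grads = [v.grad for v in nn.get_parameters().values()]
    if comm.rank in comm.find_group('half0'):
        # all_reduce restricted to the ranks of 'half0'
        comm.all_reduce(grads, division=True, group='half0')

    # Broadcast rank 0's parameters to every rank of the default 'world' group.
    comm.bcast([v.data for v in nn.get_parameters().values()], 0, group='world')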
