Adaptive SGD that combines model averaging and S-SGD. (#221)
* add new kungfu optimizer

* add optimizer to init

* add missing self

* use lambda in tf.conf

* Working adaptive SGD.

* rebroadcast variables when switching from SMA to SSGD.

* fix format check

* make it clearer how to use kungfu-run on multiple nodes.

* fix reinit bug by using a hook

* use _tf_hook

* make path absolute

* clean code

* Fix format

* try to fix lint error

* clean
marwage committed Dec 9, 2019
1 parent 67c4ed3 commit 5c7bcfd
Showing 5 changed files with 81 additions and 87 deletions.
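In short, the new `AdaptiveSGDOptimizer` runs synchronous model averaging (SMA) until the global step reaches `change_step`, then switches to synchronous SGD (S-SGD), and the new `AdaSGDHook` rebroadcasts all variables at the switch so every worker continues from the same model. A minimal TF1-style usage sketch, assuming a toy model and a standard training loop; the hook is imported from its module path (this diff does not re-export `AdaSGDHook` from `kungfu.tensorflow.optimizers`):

```python
import tensorflow as tf
from kungfu.tensorflow.optimizers import AdaptiveSGDOptimizer
from kungfu.tensorflow.optimizers.ada_sgd import AdaSGDHook  # module path taken from this diff

# Toy objective; any model producing a scalar loss is wired up the same way.
x = tf.Variable(tf.random.normal([10, 1]))
loss = tf.reduce_mean(tf.square(x))

change_step = 300  # use SMA for the first 300 steps, S-SGD afterwards
opt = AdaptiveSGDOptimizer(tf.train.GradientDescentOptimizer(0.01),
                           change_step=change_step)
global_step = tf.train.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)

# AdaSGDHook rebroadcasts every global variable once global_step reaches
# change_step, so all workers enter the S-SGD phase with identical weights.
with tf.train.MonitoredTrainingSession(hooks=[AdaSGDHook(change_step)]) as sess:
    for _ in range(500):
        sess.run(train_op)
```

Launch it with `kungfu-run` as shown in the README changes below.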
5 changes: 3 additions & 2 deletions README.md
@@ -73,7 +73,8 @@ cd KungFu
 pip3 install --no-index -U .
 ```

-KungFu provides ``kungfu-run`` to launch a training program on a multi-GPU server.
+KungFu provides ``kungfu-run`` to launch a KungFu process on a multi-GPU server.
+In a cluster, we need to launch ``kungfu-run`` on each node.

 ```bash
 # Build and install kungfu-run in a given GOBIN directory.
@@ -98,7 +99,7 @@ Download the MNIST dataset ([script](scripts/download-mnist.sh)) first and then
 kungfu-run -np 4 python3 examples/tf1_mnist_session.py --data-dir=./mnist
 ```

-You can run this example on two machines (each with 8 GPUs):
+You can run this example on two machines (assuming each has 8 GPUs) using the command below (note: the command must be run on each machine):

 ```bash
 # Assume the machines have NIC eth0 and their IPs are 192.168.0.1 and 192.168.0.2.
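The launch command itself sits in the collapsed remainder of this hunk and is not reproduced above. Purely as an illustration (the flag names `-np`, `-H`, and `-nic` are assumptions about `kungfu-run`, not text quoted from this diff), a two-machine launch of the MNIST example would look roughly like this, with the same command executed on both machines:

```bash
# Hypothetical sketch only -- not the README's exact command.
# 16 workers in total: two hosts, 8 slots each, communicating over eth0.
kungfu-run -np 16 \
    -H 192.168.0.1:8,192.168.0.2:8 \
    -nic eth0 \
    python3 examples/tf1_mnist_session.py --data-dir=./mnist
```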
7 changes: 3 additions & 4 deletions examples/tf1_mnist_session.py
@@ -79,10 +79,9 @@ def build_optimizer(name, batch_size):
     elif name == 'sma':
         from kungfu.tensorflow.optimizers import SynchronousAveragingOptimizer
         return SynchronousAveragingOptimizer(optimizer)
-    elif name == 'noise-scale':
-        from kungfu.tensorflow.optimizers import SyncSGDWithGradNoiseScaleOptimizer
-        return SyncSGDWithGradNoiseScaleOptimizer(optimizer,
-                                                  device_batch_size=batch_size)
+    elif name == 'ada-sgd':
+        from kungfu.tensorflow.optimizers import AdaptiveSGDOptimizer
+        return AdaptiveSGDOptimizer(optimizer, change_step=300)
     else:
         raise RuntimeError('unknown optimizer: %s' % name)

1 change: 1 addition & 0 deletions srcs/python/kungfu/tensorflow/ops/collective.py
@@ -1,3 +1,4 @@
+import tensorflow as tf
 from kungfu._utils import map_maybe

 from ._tf_oplib import _op_lib
2 changes: 1 addition & 1 deletion srcs/python/kungfu/tensorflow/optimizers/__init__.py
@@ -1,4 +1,4 @@
-# from .ada_sgd import AdaptiveSGDOptimizer
+from .ada_sgd import AdaptiveSGDOptimizer
 from .async_sgd import PairAveragingOptimizer
 from .elastic import ElasticSyncSGDOptimizer
 from .grad_noise_scale import MonitorGradientNoiseScaleOptimizer
153 changes: 73 additions & 80 deletions srcs/python/kungfu/tensorflow/optimizers/ada_sgd.py
@@ -1,90 +1,83 @@
 import tensorflow as tf
-from kungfu.tensorflow.ops import (barrier, broadcast, current_cluster_size,
-                                   current_rank, defuse, fuse,
-                                   group_all_reduce,
-                                   request_variable_with_template,
-                                   save_variable)
-
-from .async_sgd import get_random_peer
-from .core import KungFuOptimizer
-
-
-class AdaptiveSGDOptimizer(KungFuOptimizer):
-    """AdaptiveSGDOptimizer.
-    Args:
-      optimizer:
-        Optimizer to use for computing gradients and applying updates.
-      name:
-        Optional name prefix for the operations created when applying
-        gradients. Defaults to "KungFu" followed by the provided
-        optimizer type.
-      use_locking:
-        Whether to use locking when updating variables.
-        See Optimizer.__init__ for more info.
-    """
-    def __init__(self, optimizer, interval, name=None, use_locking=False):
-        super(AdaptiveSGDOptimizer, self).__init__(optimizer,
-                                                   name,
-                                                   use_locking=use_locking)
+from kungfu._utils import map_maybe
+from kungfu.tensorflow.compat import _tf_assign, _tf_hook
+from kungfu.tensorflow.initializer import BroadcastGlobalVariablesOp
+from kungfu.tensorflow.ops import (counter, current_cluster_size,
+                                   group_all_reduce)
+from kungfu.tensorflow.optimizers.core import (_create_kungfu_keras_optimizer,
+                                               _create_kungfu_optimizer,
+                                               _KungFuAlgorithm)
+
+
+def AdaptiveSGDOptimizer(optimizer,
+                         change_step,
+                         alpha=0.1,
+                         name=None,
+                         use_locking=False,
+                         with_keras=False):
+
+    algo = _AdaptiveSGD(change_step, alpha)
+    if not with_keras:
+        return _create_kungfu_optimizer(optimizer, algo, name, use_locking)
+    else:
+        return _create_kungfu_keras_optimizer(optimizer, algo)
+
+
+class _AdaptiveSGD(_KungFuAlgorithm):
+    def __init__(self, change_step, alpha):
         self._num_workers = current_cluster_size()
-        self._rank = current_rank()
-        self._step = tf.Variable(0, trainable=False, dtype=tf.int32)
-        self._interval = interval
-
-    def _build_request_and_save_ops(self, target, variables):
-        var_fused = fuse(variables)
-        save_model_op = save_variable(var_fused)
-        other_peer_var_fused = request_variable_with_template(
-            target, var_fused)
-        other_peer_vars = defuse(other_peer_var_fused,
-                                 [v.shape for v in variables])
-        self._save_model_op = save_model_op  # save for _get_initializer_op
-        return other_peer_vars, save_model_op
-
-    # Asynchronous decentralised parallel SGD
-    def _async_ma_sgd(self, grads_and_vars, **kwargs):
-        target = get_random_peer(self._num_workers, self._rank)
-        variables = [v for _g, v in grads_and_vars]
-        other_peer_vars, save_model_op = self._build_request_and_save_ops(
-            target, variables)
+        self._alpha = alpha
+        self._change_step = change_step
+        self._global_step = tf.train.get_or_create_global_step()

-        assign_ops = [
-            tf.assign(v, 0.5 * (v + other_v))
-            for v, other_v in zip(variables, other_peer_vars)
-        ]
+    def _ssgd(self, apply_grads_func, gradients, variables, **kwargs):
+        sum_grads = group_all_reduce(gradients)
+        avg_grads = map_maybe(lambda g: g / self._num_workers, sum_grads)

-        apply_op = self._optimizer.apply_gradients(grads_and_vars, **kwargs)
-        with tf.control_dependencies(assign_ops):
-            with tf.control_dependencies([apply_op]):
-                with tf.control_dependencies([save_model_op]):
-                    return tf.group(apply_op)
+        # We need to re-zip gradients and variables as grads_and_vars can be only unzipped once.
+        grads_and_vars = zip(avg_grads, variables)
+
+        return apply_grads_func(grads_and_vars, **kwargs)

-    # Synchronous model averaging SGD (SMA)
-    def _sync_ma_sgd(self, grads_and_vars, **kwargs):
-        _, variables = list(zip(*grads_and_vars))
+    def _sma(self, apply_grads_func, gradients, variables, **kwargs):
         # It is important to apply model averaging every iteration [2]
         sum_vars = group_all_reduce(variables)
-        avg_vars = [g / self._num_workers for g in sum_vars]
+        avg_vars = [v / self._num_workers for v in sum_vars]

         # TODO: Apply momentum to the averaged model [2]
         assign_ops = [
-            tf.assign(v, avg_v) for v, avg_v in zip(variables, avg_vars)
+            _tf_assign(v, (1 - self._alpha) * v + self._alpha * avg_v)
+            for v, avg_v in zip(variables, avg_vars)
         ]

+        # We need to re-zip gradients and variables as grads_and_vars can be only unzipped once.
+        grads_and_vars = zip(gradients, variables)
+
         # We can overlap model averaging and local SGD [2].
         with tf.control_dependencies(assign_ops):
-            return self._optimizer.apply_gradients(grads_and_vars, **kwargs)
-
-    def apply_gradients(self, grads_and_vars, **kwargs):
-        cond_op = tf.equal(tf.mod(self._step, self._interval), 0)
-        with tf.control_dependencies([tf.assign_add(self._step, 1)]):
-            return tf.cond(
-                cond_op, lambda: self._sync_ma_sgd(grads_and_vars, **kwargs),
-                lambda: self._async_ma_sgd(grads_and_vars, **kwargs))
-
-    def distributed_initializer(self):
-        bcast_ops = []
-        for v in self.variables():
-            bcast_ops.append(tf.assign(v, broadcast(v)))
-
-        with tf.control_dependencies(bcast_ops):
-            with tf.control_dependencies([self._save_model_op]):
-                return barrier()
+            return apply_grads_func(grads_and_vars, **kwargs)
+
+    def apply_gradients(self, apply_grads_func, grads_and_vars, **kwargs):
+        g, v = list(zip(*grads_and_vars))
+
+        return tf.cond(tf.math.less(self._global_step, self._change_step),
+                       lambda: self._sma(apply_grads_func, g, v, **kwargs),
+                       lambda: self._ssgd(apply_grads_func, g, v, **kwargs))
+
+
+class AdaSGDHook(_tf_hook):
+    def __init__(self, change_step):
+        super(AdaSGDHook, self).__init__()
+        self._change_step = change_step
+
+    def begin(self):
+        from kungfu.tensorflow.ops import broadcast
+        self._ops = [tf.assign(v, broadcast(v)) for v in tf.global_variables()]
+
+    def after_create_session(self, session, coord):
+        self._global_step = tf.train.get_global_step()
+
+    def after_run(self, run_context, run_values):
+        global_step = run_context.session.run(self._global_step)
+        if self._change_step == global_step:
+            run_context.session.run(self._ops)

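To make the averaging rule in `_sma` concrete: with the default `alpha=0.1`, each worker's variable is pulled one tenth of the way toward the cluster-wide average every step. A plain-Python illustration (the helper name and toy values are illustrative, not part of the commit):

```python
def sma_update(local_value, peer_values, alpha=0.1):
    # Mirrors _tf_assign(v, (1 - alpha) * v + alpha * avg_v) in _AdaptiveSGD._sma,
    # where avg_v is the all-reduced sum divided by the number of workers.
    avg = sum(peer_values) / len(peer_values)
    return (1 - alpha) * local_value + alpha * avg

workers = [1.0, 2.0, 3.0, 4.0]  # one variable's value on four workers
print([round(sma_update(v, workers), 2) for v in workers])
# [1.15, 2.05, 2.95, 3.85] -- each value moves 10% of the way toward the mean (2.5)
```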