Skip to content
Permalink
Browse files

Add MNIST training unit test for worker with multi-PS (#1434)

* init

* debug

* clean code

* clean code
  • Loading branch information...
QiJune committed Nov 8, 2019
1 parent 4c5dfcc commit 067f3e64d786b09f3a91aba787eaa9556f21f95b
@@ -18,35 +18,14 @@
)
from elasticdl.python.ps.embedding_table import get_slot_table_name
from elasticdl.python.ps.parameter_server import ParameterServer
from elasticdl.python.tests.test_utils import PserverArgs

_test_model_zoo_path = os.path.dirname(os.path.realpath(__file__))
_module_file = get_module_file_path(
_test_model_zoo_path, "test_module.custom_model"
)


class PserverArgs(object):
    """Argument bag that mimics the parsed CLI flags a ParameterServer
    expects, pre-populated with this test module's model zoo defaults.
    """

    def __init__(
        self,
        grads_to_wait=8,
        lr_staleness_modulation=0,
        use_async=False,
        model_zoo=_test_model_zoo_path,
        model_def="test_module.custom_model",
        optimizer="optimizer",
        port=9999,
        log_level="INFO",
    ):
        # Expose every constructor argument as a same-named attribute,
        # exactly as argparse's Namespace would.
        self.__dict__.update(
            {name: value for name, value in locals().items() if name != "self"}
        )


class PserverServicerTest(unittest.TestCase):
def setUp(self):
self._port = 9999
@@ -80,6 +59,8 @@ def create_server_and_stub(
lr_staleness_modulation=lr_staleness_modulation,
use_async=use_async,
port=self._port,
model_zoo=_test_model_zoo_path,
model_def="test_module.custom_model",
**kwargs,
)
pserver = ParameterServer(args)
@@ -21,6 +21,28 @@
from elasticdl.python.worker.worker import Worker


class PserverArgs(object):
    """Lightweight container of ParameterServer settings for tests.

    Each keyword argument is mirrored onto the instance under the same
    name, so the object can stand in for parsed command-line arguments.
    """

    def __init__(
        self,
        grads_to_wait=8,
        lr_staleness_modulation=0,
        use_async=False,
        model_zoo=None,
        model_def=None,
        optimizer="optimizer",
        port=9999,
        log_level="INFO",
    ):
        # Copy all arguments verbatim onto the instance.
        params = dict(locals())
        del params["self"]
        for name, value in params.items():
            setattr(self, name, value)


class DatasetName(object):
IMAGENET = "imagenet1"
FRAPPE = "frappe1"
@@ -10,6 +10,7 @@
from elasticdl.python.common.hash_utils import string_to_id
from elasticdl.python.common.model_utils import get_model_spec
from elasticdl.python.ps.parameter_server import ParameterServer
from elasticdl.python.tests.test_utils import PserverArgs
from elasticdl.python.worker.worker import Worker


@@ -24,36 +25,15 @@ def random_batch(batch_size):
return images, labels


# Resolve paths relative to this test file: the real model zoo three
# levels up, and the test-local model zoo in this directory.
_this_dir = os.path.dirname(os.path.realpath(__file__))
_model_zoo_path = os.path.join(_this_dir, "../../../model_zoo")
_test_model_zoo_path = os.path.dirname(os.path.realpath(__file__))


class PserverArgs(object):
    """Stand-in for parsed CLI flags when constructing a ParameterServer.

    Every keyword argument becomes an instance attribute of the same name.
    """

    def __init__(
        self,
        grads_to_wait=8,
        lr_staleness_modulation=0,
        use_async=False,
        model_zoo=_test_model_zoo_path,
        model_def="test_module.custom_model",
        optimizer="optimizer",
        port=9999,
        log_level="INFO",
    ):
        captured = locals()
        for name in (
            "grads_to_wait",
            "lr_staleness_modulation",
            "use_async",
            "model_zoo",
            "model_def",
            "optimizer",
            "port",
            "log_level",
        ):
            setattr(self, name, captured[name])


class WorkerMNISTTest(unittest.TestCase):
def setUp(self):
    """Start parameter servers on two ports and open worker channels.

    Using two ports exercises the multi-PS code path that this test
    module is meant to cover.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    self._model_zoo_path = os.path.join(here, "../../../model_zoo")
    self._model_def = (
        "mnist_functional_api.mnist_functional_api.custom_model"
    )
    self._batch_size = 16
    self._pserver, self._channel = self._create_pserver_and_channel(
        [12345, 12346]
    )

@@ -65,7 +45,13 @@ def _create_pserver_and_channel(self, ports):
pservers = []
channels = []
for port in ports:
args = PserverArgs(grads_to_wait=1, use_async=True, port=port,)
args = PserverArgs(
grads_to_wait=1,
use_async=False,
port=port,
model_zoo=self._model_zoo_path,
model_def=self._model_def,
)
pserver = ParameterServer(args)
pserver.prepare()
pservers.append(pserver)
@@ -87,20 +73,17 @@ def _create_pserver_and_channel(self, ports):
channels.append(channel)
return pservers, channels

def test_train(self):
def test_compare_onebatch_train(self):
images, labels = random_batch(self._batch_size)
# TODO(yunjian.lmh): test optimizer wrapper
model_def = "mnist_functional_api.mnist_functional_api.custom_model"
batch_size = 16
images, labels = random_batch(batch_size)

tf.keras.backend.clear_session()
tf.random.set_seed(22)
worker = Worker(
worker_id=0,
job_type=elasticdl_pb2.TRAINING,
minibatch_size=batch_size,
model_zoo=_model_zoo_path,
model_def=model_def,
minibatch_size=self._batch_size,
model_zoo=self._model_zoo_path,
model_def=self._model_def,
ps_channels=self._channel,
)
worker._run_model_call_before_training(images)
@@ -119,8 +102,8 @@ def test_train(self):
eval_metrics_fn,
prediction_outputs_processor,
) = get_model_spec(
model_zoo=_model_zoo_path,
model_def=model_def,
model_zoo=self._model_zoo_path,
model_def=self._model_def,
dataset_fn="dataset_fn",
model_params=None,
loss="loss",
@@ -133,8 +116,8 @@ def test_train(self):
output = model.call(images, training=True)
labels = tf.reshape(labels, [-1])
loss = loss_fn(output, labels)
grads = tape.gradient(loss, model.trainable_variables)
opt_fn().apply_gradients(zip(grads, model.trainable_variables))
grads = tape.gradient(loss, model.trainable_variables)
opt_fn().apply_gradients(zip(grads, model.trainable_variables))

for v in model.trainable_variables:
ps_id = string_to_id(v.name, len(self._channel))
@@ -143,6 +126,106 @@ def test_train(self):
)
np.testing.assert_array_equal(ps_v.numpy(), v.numpy())

def test_compare_mnist_train(self):
    """Train MNIST via the worker/multi-PS path and locally, then check
    the two runs produce identical losses and accuracies step for step.

    Both runs seed TF with 22 and consume the same tf.data pipeline, so
    with synchronous updates the distributed result must reproduce the
    local result exactly.
    """
    # Load MNIST and normalize pixel values to [0, 1].
    (
        (x_train, y_train),
        (x_test, y_test),
    ) = tf.keras.datasets.mnist.load_data()
    x_train = tf.convert_to_tensor(x_train, dtype=tf.float32) / 255.0
    y_train = tf.convert_to_tensor(y_train, dtype=tf.int32)

    x_test = tf.convert_to_tensor(x_test, dtype=tf.float32) / 255.0
    y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)

    # Training set repeated so the step counter can exceed one epoch.
    db = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    db = db.batch(self._batch_size).repeat(10)
    test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    test_db = test_db.batch(self._batch_size)

    acc_meter = tf.keras.metrics.Accuracy()

    # Fixed seed so the worker model and the later local model start
    # from identical initial weights.
    tf.keras.backend.clear_session()
    tf.random.set_seed(22)

    worker = Worker(
        worker_id=0,
        job_type=elasticdl_pb2.TRAINING,
        minibatch_size=self._batch_size,
        model_zoo=self._model_zoo_path,
        model_def=self._model_def,
        ps_channels=self._channel,
    )

    worker_results = []
    for step, (x, y) in enumerate(db):
        if step == 0:
            # First batch: build the model and push the initial
            # variables to the parameter servers.
            worker._run_model_call_before_training(x)
            worker.report_variable()

        worker.get_model(step, elasticdl_pb2.MINIMUM)
        w_loss, w_grads = worker.training_process_eagerly(x, y)
        worker.report_gradient(w_grads)

        if step % 20 == 0:
            # Pull the freshly updated weights before evaluating.
            worker.get_model(step, elasticdl_pb2.MINIMUM)
            for (x, y) in test_db:
                out = worker.forward_process(x)
                acc_meter.update_state(tf.argmax(out, axis=1), y)

            worker_results.append(
                (float(w_loss.numpy()), float(acc_meter.result().numpy()))
            )
            acc_meter.reset_states()

        if step > 50:
            break

    # Second run: identical training performed locally, without the
    # parameter servers, from the same seed.
    tf.keras.backend.clear_session()
    tf.random.set_seed(22)
    acc_meter.reset_states()

    (
        model,
        dataset_fn,
        loss_fn,
        opt_fn,
        eval_metrics_fn,
        prediction_outputs_processor,
    ) = get_model_spec(
        model_zoo=self._model_zoo_path,
        model_def=self._model_def,
        dataset_fn="dataset_fn",
        model_params=None,
        loss="loss",
        optimizer="optimizer",
        eval_metrics_fn="eval_metrics_fn",
        prediction_outputs_processor="PredictionOutputsProcessor",
    )
    local_results = []
    for step, (x, y) in enumerate(db):

        with tf.GradientTape() as tape:
            out = model.call(x, training=True)
            ll = loss_fn(out, y)
        # NOTE(review): opt_fn() constructs a fresh optimizer every
        # step; this only matches the PS behavior for stateless
        # optimizers (plain SGD) — confirm against the model zoo config.
        grads = tape.gradient(ll, model.trainable_variables)
        opt_fn().apply_gradients(zip(grads, model.trainable_variables))

        if step % 20 == 0:
            for (x, y) in test_db:
                out = model.call(x, training=False)
                acc_meter.update_state(tf.argmax(out, axis=1), y)

            local_results.append(
                (float(ll.numpy()), float(acc_meter.result().numpy()))
            )
            acc_meter.reset_states()

        if step > 50:
            break

    # Distributed and local runs must agree exactly at every
    # evaluation point.
    for w, l in zip(worker_results, local_results):
        self.assertTupleEqual(w, l)


# Allow running this test module directly from the command line.
if __name__ == "__main__":
    unittest.main()

0 comments on commit 067f3e6

Please sign in to comment.
You can’t perform that action at this time.