Generate test checkpoint data automatically (#1738)
* Generate test checkpoint automatically

* Commit the python script that generates the mnist checkpoint file

* Reformat code by pre-commit

* Restore the integration tests for training and evaluation

* Fix the path of python scripts for client_test.sh

* Fix the command to generate checkpoint

* Add log for checkpoint

* Add log for checkpoints

* fix mount path

* print the mount dir content

* Remove the code that prints logs

* Remove a backslash

* Only create train end task for training
workingloong committed Feb 18, 2020
1 parent 791484f commit 36af4e8
Showing 10 changed files with 132 additions and 59 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
@@ -55,7 +55,9 @@ jobs:
      - |
        JOB_TYPES=(
          odps
          train # TODO: fix issue 1737
          train
          evaluate
          predict
          local
        )
        for JOB_TYPE in "${JOB_TYPES[@]}"; do
2 changes: 2 additions & 0 deletions elasticdl/python/master/task_dispatcher.py
@@ -190,6 +190,8 @@ def _create_train_end_callback_task(self):
        SavedModelExporter to execute on_train_end, we include
        a shard of data in this task.
        """
        if not self._training_shards:
            return

        self.reset_job_counters(elasticdl_pb2.TRAIN_END_CALLBACK)
        shards = self._training_shards
20 changes: 12 additions & 8 deletions elasticdl/python/tests/callbacks_test.py
@@ -12,6 +12,7 @@
from elasticdl.python.elasticdl.callbacks.saved_model_exporter import (
SavedModelExporter,
)
from elasticdl.python.tests.test_utils import save_checkpoint_without_embedding
from elasticdl.python.worker.task_data_service import TaskDataService


@@ -50,17 +51,20 @@ def test_on_train_end(self):
            elasticdl_pb2.TRAIN_END_CALLBACK,
        )
        task_data_service.get_dataset_by_task = mock.Mock(return_value=dataset)
        model_handler = ModelHandler.get_model_handler(
            distribution_strategy=DistributionStrategy.PARAMETER_SERVER,
            checkpoint_dir="elasticdl/python/tests/testdata/functional_ckpt/",
        )
        saved_model_exporter = SavedModelExporter(
            task_data_service, dataset_fn, model_handler
        )

        with tempfile.TemporaryDirectory() as temp_dir_name:
            checkpoint_dir = os.path.join(temp_dir_name, "checkpoint")
            model = custom_model_with_embedding_layer()
            save_checkpoint_without_embedding(model, checkpoint_dir)
            model_handler = ModelHandler.get_model_handler(
                distribution_strategy=DistributionStrategy.PARAMETER_SERVER,
                checkpoint_dir=checkpoint_dir,
            )
            saved_model_exporter = SavedModelExporter(
                task_data_service, dataset_fn, model_handler
            )
            saved_model_path = os.path.join(temp_dir_name, "test_exporter")
            params = {"batch_size": 10, "saved_model_path": saved_model_path}
            model = custom_model_with_embedding_layer()
            saved_model_exporter.set_params(params)
            saved_model_exporter.set_model(model)
            saved_model_exporter.on_train_end()
106 changes: 59 additions & 47 deletions elasticdl/python/tests/model_handler_test.py
@@ -1,3 +1,5 @@
import os
import tempfile
import unittest

import numpy as np
@@ -147,7 +149,7 @@ def setUp(self):
        tf.keras.backend.clear_session()
        self.model_handler = ModelHandler.get_model_handler(
            distribution_strategy=DistributionStrategy.PARAMETER_SERVER,
            checkpoint_dir="elasticdl/python/tests/testdata/functional_ckpt/",
            checkpoint_dir="",
        )

    def _save_model(self, is_subclass):
@@ -185,68 +187,78 @@ def test_get_model_with_embedding_column_to_train(self):
        )

    def test_get_model_to_export(self):
        self._save_model(False)
        model_inst = custom_model_with_embedding_layer()
        train_model = self.model_handler.get_model_to_train(model_inst)
        export_model = self.model_handler.get_model_to_export(
            train_model, dataset=None
        )
        with tempfile.TemporaryDirectory() as temp_dir:
            self.model_handler._checkpoint_dir = os.path.join(
                temp_dir, "test_export"
            )
            self._save_model(False)
            model_inst = custom_model_with_embedding_layer()
            train_model = self.model_handler.get_model_to_train(model_inst)
            export_model = self.model_handler.get_model_to_export(
                train_model, dataset=None
            )

        test_data = tf.constant([0])
        result = export_model.call(test_data).numpy()
        self.assertEqual(result[0][0], 3.0)
            test_data = tf.constant([0])
            result = export_model.call(test_data).numpy()
            self.assertEqual(result[0][0], 3.0)

    def test_get_subclass_model_to_export(self):
        self.model_handler._checkpoint_dir = (
            "elasticdl/python/tests/testdata/subclass_ckpt/"
        )
        self._save_model(True)

        def _get_dataset():
            dataset = tf.data.Dataset.from_tensor_slices(
                np.random.randint(0, 10, (10, 4))
        with tempfile.TemporaryDirectory() as temp_dir:
            self.model_handler._checkpoint_dir = os.path.join(
                temp_dir, "test_export"
            )
            dataset = dataset.batch(2)
            return dataset
            self._save_model(True)

        model_inst = CustomModel()
        dataset = _get_dataset()
            def _get_dataset():
                dataset = tf.data.Dataset.from_tensor_slices(
                    np.random.randint(0, 10, (10, 4))
                )
                dataset = dataset.batch(2)
                return dataset

        train_model = self.model_handler.get_model_to_train(model_inst)
        self.assertEqual(type(train_model.embedding), Embedding)
            model_inst = CustomModel()
            dataset = _get_dataset()

        export_model = self.model_handler.get_model_to_export(
            train_model, dataset=dataset
        )
            train_model = self.model_handler.get_model_to_train(model_inst)
            self.assertEqual(type(train_model.embedding), Embedding)

        test_data = tf.constant([0])
        result = export_model.call(test_data).numpy()
        self.assertEqual(result[0][0], 3.0)
            export_model = self.model_handler.get_model_to_export(
                train_model, dataset=dataset
            )

            test_data = tf.constant([0])
            result = export_model.call(test_data).numpy()
            self.assertEqual(result[0][0], 3.0)

    def test_get_model_with_sparse_to_train(self):
        model_inst = custom_model_with_sparse_embedding()
        model_inst = self.model_handler.get_model_to_train(model_inst)
        self.assertEqual(type(model_inst.layers[1]), Embedding)

    def test_get_model_with_sparse_to_export(self):
        self._save_model(False)
        model_inst = custom_model_with_sparse_embedding()
        train_model = self.model_handler.get_model_to_train(model_inst)

        # Model handler will restore model parameters from the checkpoint
        # directory and assign parameters to train_model.
        export_model = self.model_handler.get_model_to_export(
            train_model, dataset=None
        )
        test_data = tf.SparseTensor(
            indices=[[0, 0]], values=[0], dense_shape=(1, 1)
        )
        result = export_model.call(test_data).numpy()
        with tempfile.TemporaryDirectory() as temp_dir:
            self.model_handler._checkpoint_dir = os.path.join(
                temp_dir, "test_export"
            )
            self._save_model(False)
            model_inst = custom_model_with_sparse_embedding()
            train_model = self.model_handler.get_model_to_train(model_inst)

            # Model handler will restore model parameters from the checkpoint
            # directory and assign parameters to train_model.
            export_model = self.model_handler.get_model_to_export(
                train_model, dataset=None
            )
            test_data = tf.SparseTensor(
                indices=[[0, 0]], values=[0], dense_shape=(1, 1)
            )
            result = export_model.call(test_data).numpy()

        # The embedding table in checkpoint file is
        # [[1.0, 1.0], [1.0, 1.0], [1.0,1.0], [1.0, 1.0]], weights in the dense
        # layer is [[1.0],[1.0]], bias is [1.0]. So the result is 3.0.
        self.assertEqual(result[0][0], 3.0)
            # The embedding table in checkpoint file is
            # [[1.0, 1.0], [1.0, 1.0], [1.0, 1.0], [1.0, 1.0]], weights in the
            # dense layer is [[1.0],[1.0]], bias is [1.0]. So the result
            # is 3.0.
            self.assertEqual(result[0][0], 3.0)


if __name__ == "__main__":
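
The expected value of 3.0 asserted in these tests follows from the values quoted in the comment above; a standalone sketch of the arithmetic in plain NumPy (independent of ElasticDL):

    import numpy as np

    embedding_table = np.ones((4, 2))        # checkpoint embedding table
    dense_kernel = np.array([[1.0], [1.0]])  # dense layer weights
    bias = np.array([1.0])                   # dense layer bias

    embedded = embedding_table[0]            # lookup of id 0 -> [1.0, 1.0]
    output = embedded @ dense_kernel + bias  # 1*1 + 1*1 + 1 = 3.0
    print(output)                            # [3.]
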
13 changes: 12 additions & 1 deletion elasticdl/python/tests/test_utils.py
@@ -23,13 +23,14 @@
    get_module_file_path,
    load_module,
)
from elasticdl.python.common.save_utils import CheckpointSaver
from elasticdl.python.data.recordio_gen.frappe_recordio_gen import (
    load_raw_data,
)
from elasticdl.python.master.evaluation_service import EvaluationService
from elasticdl.python.master.servicer import MasterServicer
from elasticdl.python.master.task_dispatcher import _TaskDispatcher
from elasticdl.python.ps.parameter_server import ParameterServer
from elasticdl.python.ps.parameter_server import Parameters, ParameterServer
from elasticdl.python.tests.in_process_master import InProcessMaster
from elasticdl.python.worker.worker import Worker

@@ -631,3 +632,13 @@ def get_frappe_dataset(batch_size):
    test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    test_db = test_db.batch(batch_size)
    return db, test_db


def save_checkpoint_without_embedding(model, checkpoint_dir, version=100):
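    """Save all trainable variables of `model` as non-embedding parameters
    in a checkpoint written to `checkpoint_dir` with the given version."""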
    checkpoint_saver = CheckpointSaver(checkpoint_dir, 0, 0, False)
    params = Parameters()
    for var in model.trainable_variables:
        params.non_embedding_params[var.name] = var
    params.version = version
    model_pb = params.to_model_pb()
    checkpoint_saver.save(version, model_pb, False)
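
A minimal usage sketch of this helper, mirroring callbacks_test.py above (the model factory and the ModelHandler call are taken from that test):

    with tempfile.TemporaryDirectory() as temp_dir_name:
        checkpoint_dir = os.path.join(temp_dir_name, "checkpoint")
        model = custom_model_with_embedding_layer()
        save_checkpoint_without_embedding(model, checkpoint_dir)
        model_handler = ModelHandler.get_model_handler(
            distribution_strategy=DistributionStrategy.PARAMETER_SERVER,
            checkpoint_dir=checkpoint_dir,
        )
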
9 changes: 7 additions & 2 deletions scripts/client_test.sh
@@ -4,6 +4,11 @@ JOB_TYPE=$1
PS_NUM=$2
WORKER_NUM=$3

# Generate checkpoint for mnist to test evaluation and prediction
MNIST_CKPT_DIR=model_zoo/test_data/mnist_ckpt/
python -m scripts.gen_mnist_checkpoint --checkpoint_dir=${MNIST_CKPT_DIR}
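# save_checkpoint_without_embedding writes the checkpoint with version=100 by
# default, so the evaluate and predict jobs below initialize from the
# generated version-100 checkpoint.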


if [[ "$JOB_TYPE" == "train" ]]; then
elasticdl train \
--image_base=elasticdl:ci \
@@ -37,7 +42,7 @@ elif [[ "$JOB_TYPE" == "evaluate" ]]; then
--image_base=elasticdl:ci \
--model_zoo=model_zoo \
--model_def=mnist_functional_api.mnist_functional_api.custom_model \
--checkpoint_dir_for_init=elasticdl/python/tests/testdata/mnist_functional_api_model/version-110 \
--checkpoint_dir_for_init=model_zoo/${MNIST_CKPT_DIR}/version-100 \
--validation_data=/data/mnist/test \
--num_epochs=1 \
--master_resource_request="cpu=0.3,memory=1024Mi" \
@@ -60,7 +65,7 @@ elif [[ "$JOB_TYPE" == "predict" ]]; then
--image_base=elasticdl:ci \
--model_zoo=model_zoo \
--model_def=mnist_functional_api.mnist_functional_api.custom_model \
--checkpoint_dir_for_init=elasticdl/python/tests/testdata/mnist_functional_api_model/version-110 \
--checkpoint_dir_for_init=model_zoo/${MNIST_CKPT_DIR}/version-100 \
--prediction_data=/data/mnist/test \
--master_resource_request="cpu=0.2,memory=1024Mi" \
--master_resource_limit="cpu=1,memory=2048Mi" \
37 changes: 37 additions & 0 deletions scripts/gen_mnist_checkpoint.py
@@ -0,0 +1,37 @@
import argparse

import tensorflow as tf

from elasticdl.python.tests.test_utils import save_checkpoint_without_embedding


def mnist_custom_model():
    inputs = tf.keras.Input(shape=(28, 28), name="image")
    x = tf.keras.layers.Reshape((28, 28, 1))(inputs)
    x = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu")(x)
    x = tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = tf.keras.layers.Dropout(0.25)(x)
    x = tf.keras.layers.Flatten()(x)
    outputs = tf.keras.layers.Dense(10)(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs, name="mnist_model")


def add_params(parser):
    parser.add_argument(
        "--checkpoint_dir",
        help="The directory to store the mnist checkpoint",
        default="",
        type=str,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    add_params(parser)
    args, _ = parser.parse_known_args()
    print(args)
    model = mnist_custom_model()
    save_checkpoint_without_embedding(model, args.checkpoint_dir)
