From c8e2fee7b7c5e4074096a63e5dcf878cccc2934e Mon Sep 17 00:00:00 2001
From: Lee Yang
Date: Mon, 27 Jul 2020 12:46:00 -0700
Subject: [PATCH] fix pipeline example; delete commented test

---
 examples/mnist/estimator/mnist_pipeline.py |  6 +-
 test/test_pipeline.py                      | 76 ----------------------
 2 files changed, 3 insertions(+), 79 deletions(-)

diff --git a/examples/mnist/estimator/mnist_pipeline.py b/examples/mnist/estimator/mnist_pipeline.py
index 87a2c256..7326f211 100644
--- a/examples/mnist/estimator/mnist_pipeline.py
+++ b/examples/mnist/estimator/mnist_pipeline.py
@@ -57,8 +57,8 @@ def scale(image, label):
     return ds.map(scale).batch(BATCH_SIZE)
 
   def serving_input_receiver_fn():
-    features = tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, 28, 28, 1], name='features')
-    receiver_tensors = {'features': features}
+    features = tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, 28, 28, 1], name='conv2d_input')
+    receiver_tensors = {'conv2d_input': features}
     return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)
 
   def model_fn(features, labels, mode):
@@ -179,7 +179,7 @@ def parse(ln):
   else:  # args.mode == 'inference':
     # using a trained/exported model
     model = TFModel(args) \
-        .setInputMapping({'image': 'features'}) \
+        .setInputMapping({'image': 'conv2d_input'}) \
         .setOutputMapping({'logits': 'prediction'}) \
         .setSignatureDefKey('serving_default') \
         .setExportDir(args.export_dir) \
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index e848ae00..e1292edb 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -171,82 +171,6 @@ def rdd_generator():
     expected = np.sum(self.weights)
     self.assertAlmostEqual(pred, expected, 2)
 
-  # def test_spark_sparse_tensor(self):
-  #   """InputMode.SPARK feeding sparse tensors"""
-  #   def sparse_train(args, ctx):
-  #     import tensorflow as tf
-  #
-  #     # reset graph in case we're re-using a Spark python worker (during tests)
-  #     tf.compat.v1.reset_default_graph()
-  #
-  #     cluster, server = ctx.start_cluster_server(ctx)
-  #     if ctx.job_name == "ps":
-  #       server.join()
-  #     elif ctx.job_name == "worker":
-  #       with tf.device(tf.compat.v1.train.replica_device_setter(
-  #               worker_device="/job:worker/task:%d" % ctx.task_index,
-  #               cluster=cluster)):
-  #         y_ = tf.compat.v1.placeholder(tf.float32, name='y_label')
-  #         label = tf.identity(y_, name='label')
-  #
-  #         row_indices = tf.compat.v1.placeholder(tf.int64, name='x_row_indices')
-  #         col_indices = tf.compat.v1.placeholder(tf.int64, name='x_col_indices')
-  #         values = tf.compat.v1.placeholder(tf.float32, name='x_values')
-  #         indices = tf.stack([row_indices[0], col_indices[0]], axis=1)
-  #         data = values[0]
-  #
-  #         x = tf.SparseTensor(indices=indices, values=data, dense_shape=[args.batch_size, 10])
-  #         w = tf.Variable(tf.random.truncated_normal([10, 1]), name='w')
-  #         y = tf.sparse.sparse_dense_matmul(x, w, name='y')
-  #
-  #         global_step = tf.compat.v1.train.get_or_create_global_step()
-  #         cost = tf.reduce_mean(input_tensor=tf.square(y_ - y), name='cost')
-  #         optimizer = tf.compat.v1.train.GradientDescentOptimizer(0.1).minimize(cost, global_step)
-  #
-  #         with tf.compat.v1.train.MonitoredTrainingSession(master=server.target,
-  #                                                          is_chief=(ctx.task_index == 0),
-  #                                                          checkpoint_dir=args.model_dir,
-  #                                                          save_checkpoint_steps=20) as sess:
-  #           tf_feed = ctx.get_data_feed(input_mapping=args.input_mapping)
-  #           while not sess.should_stop() and not tf_feed.should_stop():
-  #             batch = tf_feed.next_batch(args.batch_size)
-  #             if len(batch) > 0:
-  #               print("batch: {}".format(batch))
-  #               feed = {y_: batch['y_label'],
-  #                       row_indices: batch['x_row_indices'],
-  #                       col_indices: batch['x_col_indices'],
-  #                       values: batch['x_values']}
-  #               _, pred, trained_weights = sess.run([optimizer, y, w], feed_dict=feed)
-  #               print("trained_weights: {}".format(trained_weights))
-  #         sess.close()
-  #
-  #     # wait for MonitoredTrainingSession to save last checkpoint
-  #     time.sleep(10)
-  #
-  #   args = {}
-  #   estimator = TFEstimator(sparse_train, args) \
-  #       .setInputMapping({'labels': 'y_label', 'row_indices': 'x_row_indices', 'col_indices': 'x_col_indices', 'values': 'x_values'}) \
-  #       .setInputMode(TFCluster.InputMode.SPARK) \
-  #       .setModelDir(self.model_dir) \
-  #       .setClusterSize(self.num_workers) \
-  #       .setNumPS(1) \
-  #       .setBatchSize(1)
-  #
-  #   model_weights = np.array([[1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0]]).T
-  #   examples = [scipy.sparse.random(1, 10, density=0.5,) for i in range(200)]
-  #   rdd = self.sc.parallelize(examples).map(lambda e: ((e * model_weights).tolist()[0][0], e.row.tolist(), e.col.tolist(), e.data.tolist()))
-  #   df = rdd.toDF(["labels", "row_indices", "col_indices", "values"])
-  #   df.show(5)
-  #   model = estimator.fit(df)
-  #
-  #   model.setOutputMapping({'label': 'label', 'y/SparseTensorDenseMatMul': 'predictions'})
-  #   test_examples = [scipy.sparse.random(1, 10, density=0.5,) for i in range(50)]
-  #   test_rdd = self.sc.parallelize(test_examples).map(lambda e: ((e * model_weights).tolist()[0][0], e.row.tolist(), e.col.tolist(), e.data.tolist()))
-  #   test_df = test_rdd.toDF(["labels", "row_indices", "col_indices", "values"])
-  #   test_df.show(5)
-  #   preds = model.transform(test_df)
-  #   preds.show(5)
-
 
 if __name__ == '__main__':
   unittest.main()
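
Note (reviewer aside, not part of the patch): the rename from 'features' to
'conv2d_input' keeps the serving signature's input key in sync with the key
that setInputMapping({'image': 'conv2d_input'}) references at inference time.
A quick way to confirm the actual signature keys of an exported model is to
inspect the SavedModel directly; a minimal sketch, assuming TF 2.x and a
hypothetical export path:

    # Sketch: print the serving signature keys of the exported SavedModel.
    # 'export_dir' is a hypothetical path; point it at the directory written
    # by the example's export step.
    import tensorflow as tf

    export_dir = '/tmp/mnist_export'  # hypothetical
    sig = tf.saved_model.load(export_dir).signatures['serving_default']
    print(sig.structured_input_signature)  # expect input key 'conv2d_input'
    print(sig.structured_outputs)          # expect output key 'logits'

The same information is available from the command line via:
saved_model_cli show --dir /tmp/mnist_export --tag_set serve --signature_def serving_default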