From b133b8e9cf50a3b4fc922ff0f3835607a6f6bbe7 Mon Sep 17 00:00:00 2001 From: Lee Yang Date: Tue, 20 Mar 2018 13:36:14 -0700 Subject: [PATCH] pep8 for mnist examples; minor fix for latest spark/mnist_dist.py --- examples/mnist/keras/mnist_mlp.py | 41 +++++++-------- examples/mnist/spark/mnist_dist.py | 43 ++++++++-------- examples/mnist/spark/mnist_dist_dataset.py | 15 +++--- examples/mnist/spark/mnist_dist_pipeline.py | 25 ++++----- examples/mnist/spark/mnist_spark.py | 11 ++-- examples/mnist/spark/mnist_spark_dataset.py | 11 ++-- examples/mnist/spark/mnist_spark_pipeline.py | 52 +++++++++---------- examples/mnist/streaming/mnist_dist.py | 28 +++++------ examples/mnist/tf/mnist_dist.py | 26 +++++----- examples/mnist/tf/mnist_dist_dataset.py | 19 +++---- examples/mnist/tf/mnist_dist_pipeline.py | 37 +++++++------- examples/mnist/tf/mnist_spark.py | 5 +- examples/mnist/tf/mnist_spark_dataset.py | 5 +- examples/mnist/tf/mnist_spark_pipeline.py | 53 +++++++++----------- 14 files changed, 186 insertions(+), 185 deletions(-) diff --git a/examples/mnist/keras/mnist_mlp.py b/examples/mnist/keras/mnist_mlp.py index 5e0187db..4e52dd60 100644 --- a/examples/mnist/keras/mnist_mlp.py +++ b/examples/mnist/keras/mnist_mlp.py @@ -6,6 +6,7 @@ from __future__ import print_function + def main_fun(args, ctx): import numpy import os @@ -16,12 +17,9 @@ def main_fun(args, ctx): from tensorflow.contrib.keras.api.keras.layers import Dense, Dropout from tensorflow.contrib.keras.api.keras.optimizers import RMSprop from tensorflow.contrib.keras.python.keras.callbacks import LambdaCallback, TensorBoard - from tensorflow.python.saved_model import builder as saved_model_builder from tensorflow.python.saved_model import tag_constants from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def - - from tensorflowonspark import TFNode cluster, server = TFNode.start_cluster_server(ctx) @@ -44,8 +42,8 @@ def generate_rdd_data(tf_feed, batch_size): yield (images, labels) with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % ctx.task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % ctx.task_index, + cluster=cluster)): IMAGE_PIXELS = 28 batch_size = 100 @@ -98,21 +96,20 @@ def save_checkpoint(epoch, logs=None): if args.input_mode == 'tf': # train & validate on in-memory data - history = model.fit(x_train, y_train, - batch_size=batch_size, - epochs=args.epochs, - verbose=1, - validation_data=(x_test, y_test), - callbacks=callbacks) + model.fit(x_train, y_train, + batch_size=batch_size, + epochs=args.epochs, + verbose=1, + validation_data=(x_test, y_test), + callbacks=callbacks) else: # args.input_mode == 'spark': # train on data read from a generator which is producing data from a Spark RDD tf_feed = TFNode.DataFeed(ctx.mgr) - history = model.fit_generator( - generator=generate_rdd_data(tf_feed, batch_size), - steps_per_epoch=args.steps_per_epoch, - epochs=args.epochs, - verbose=1, - callbacks=callbacks) + model.fit_generator(generator=generate_rdd_data(tf_feed, batch_size), + steps_per_epoch=args.steps_per_epoch, + epochs=args.epochs, + verbose=1, + callbacks=callbacks) if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0: # save a local Keras model, so we can reload it with an inferencing learning_phase @@ -125,11 +122,11 @@ def save_checkpoint(epoch, logs=None): # export a saved_model for inferencing builder = saved_model_builder.SavedModelBuilder(args.export_dir) signature = predict_signature_def(inputs={'images': new_model.input}, - outputs={'scores': new_model.output}) + outputs={'scores': new_model.output}) builder.add_meta_graph_and_variables(sess=sess, - tags=[tag_constants.SERVING], - signature_def_map={'predict': signature}, - clear_devices=True) + tags=[tag_constants.SERVING], + signature_def_map={'predict': signature}, + clear_devices=True) builder.save() if args.input_mode == 'spark': @@ -160,7 +157,7 @@ def save_checkpoint(epoch, logs=None): parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") args = parser.parse_args() - print("args:",args) + print("args:", args) if args.input_mode == 'tf': cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir) diff --git a/examples/mnist/spark/mnist_dist.py b/examples/mnist/spark/mnist_dist.py index d8afc835..031b2dc1 100755 --- a/examples/mnist/spark/mnist_dist.py +++ b/examples/mnist/spark/mnist_dist.py @@ -1,4 +1,4 @@ -#Copyright 2018 Yahoo Inc. +# Copyright 2018 Yahoo Inc. # Licensed under the terms of the Apache 2.0 license. # Please see LICENSE file in the project root for terms. @@ -9,9 +9,11 @@ from __future__ import nested_scopes from __future__ import print_function + def print_log(worker_num, arg): print("{0}: {1}".format(worker_num, arg)) + def map_fun(args, ctx): from datetime import datetime import math @@ -30,7 +32,7 @@ def map_fun(args, ctx): # Parameters IMAGE_PIXELS = 28 hidden_units = 128 - batch_size = args.batch_size + batch_size = args.batch_size # Get TF cluster and server instances cluster, server = ctx.start_cluster_server(1, args.rdma) @@ -55,28 +57,28 @@ def feed_dict(batch): # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): - # Placeholders or QueueRunner/Readers for input data + # Placeholders or QueueRunner/Readers for input data with tf.name_scope('inputs'): - x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS] , name="x") + x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS], name="x") y_ = tf.placeholder(tf.float32, [None, 10], name="y_") - + x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1]) tf.summary.image("x_img", x_img) - + with tf.name_scope('layer'): # Variables of the hidden layer with tf.name_scope('hidden_layer'): - hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w") + hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b) hid = tf.nn.relu(hid_lin) - + # Variables of the softmax layer - with tf.name_scope('softmax_layer'): + with tf.name_scope('softmax_layer'): sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -93,7 +95,7 @@ def feed_dict(batch): # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") @@ -102,10 +104,9 @@ def feed_dict(batch): summary_op = tf.summary.merge_all() logdir = ctx.absolute_path(args.model) - # logdir = args.model print("tensorflow model path: {0}".format(logdir)) hooks = [tf.train.StopAtStepHook(last_step=100000)] - + if job_name == "worker" and task_index == 0: summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph()) @@ -119,9 +120,9 @@ def feed_dict(batch): step = 0 tf_feed = ctx.get_data_feed(args.mode == "train") while not mon_sess.should_stop() and not tf_feed.should_stop() and step < args.steps: - # Run a training step asynchronously - # See `tf.train.SyncReplicasOptimizer` for additional details on how to - # perform *synchronous* training. + # Run a training step asynchronously + # See `tf.train.SyncReplicasOptimizer` for additional details on how to + # perform *synchronous* training. # using feed_dict batch_xs, batch_ys = feed_dict(tf_feed.next_batch(batch_size)) @@ -132,14 +133,14 @@ def feed_dict(batch): _, summary, step = mon_sess.run([train_op, summary_op, global_step], feed_dict=feed) # print accuracy and save model checkpoint to HDFS every 100 steps if (step % 100 == 0): - print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, mon_sess.run(accuracy,{x: batch_xs, y_: batch_ys}))) + print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, mon_sess.run(accuracy, {x: batch_xs, y_: batch_ys}))) if task_index == 0: summary_writer.add_summary(summary, step) else: # args.mode == "inference" labels, preds, acc = mon_sess.run([label, prediction, accuracy], feed_dict=feed) - results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)] + results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)] tf_feed.batch_results(results) print("results: {0}, acc: {1}".format(results, acc)) @@ -148,4 +149,6 @@ def feed_dict(batch): # Ask for all the services to stop. print("{0} stopping MonitoredTrainingSession".format(datetime.now().isoformat())) - summary_writer.close() + + if job_name == "worker" and task_index == 0: + summary_writer.close() diff --git a/examples/mnist/spark/mnist_dist_dataset.py b/examples/mnist/spark/mnist_dist_dataset.py index 0092044b..3a0d9c4c 100755 --- a/examples/mnist/spark/mnist_dist_dataset.py +++ b/examples/mnist/spark/mnist_dist_dataset.py @@ -9,9 +9,11 @@ from __future__ import nested_scopes from __future__ import print_function + def print_log(worker_num, arg): print("{0}: {1}".format(worker_num, arg)) + def map_fun(args, ctx): from tensorflowonspark import TFNode from datetime import datetime @@ -48,8 +50,8 @@ def rdd_generator(): # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Dataset for input data ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]), tf.TensorShape([10]))).batch(args.batch_size) @@ -58,13 +60,13 @@ def rdd_generator(): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -90,7 +92,7 @@ def rdd_generator(): # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") @@ -146,7 +148,7 @@ def rdd_generator(): else: # args.mode == "inference" labels, preds, acc = sess.run([label, prediction, accuracy]) - results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)] + results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)] tf_feed.batch_results(results) print("acc: {0}".format(acc)) @@ -156,4 +158,3 @@ def rdd_generator(): # Ask for all the services to stop. print("{0} stopping supervisor".format(datetime.now().isoformat())) sv.stop() - diff --git a/examples/mnist/spark/mnist_dist_pipeline.py b/examples/mnist/spark/mnist_dist_pipeline.py index 45252fb2..9ab5f5db 100755 --- a/examples/mnist/spark/mnist_dist_pipeline.py +++ b/examples/mnist/spark/mnist_dist_pipeline.py @@ -9,9 +9,11 @@ from __future__ import nested_scopes from __future__ import print_function + def print_log(worker_num, arg): print("{0}: {1}".format(worker_num, arg)) + def map_fun(args, ctx): from tensorflowonspark import TFNode from datetime import datetime @@ -32,7 +34,7 @@ def map_fun(args, ctx): # Parameters hidden_units = 128 - batch_size = args.batch_size + batch_size = args.batch_size # Get TF cluster and server instances cluster, server = TFNode.start_cluster_server(ctx, 1, args.protocol == 'rdma') @@ -54,18 +56,18 @@ def feed_dict(batch): # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -91,7 +93,7 @@ def feed_dict(batch): # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") @@ -122,7 +124,6 @@ def feed_dict(batch): # Loop until the supervisor shuts down or 1000000 steps have completed. step = 0 - #tf_feed = TFNode.DataFeed(ctx.mgr) tf_feed = TFNode.DataFeed(ctx.mgr, input_mapping=args.input_mapping) while not sv.should_stop() and not tf_feed.should_stop() and step < args.steps: # Run a training step asynchronously. @@ -137,7 +138,7 @@ def feed_dict(batch): _, summary, step = sess.run([train_op, summary_op, global_step], feed_dict=feed) # print accuracy and save model checkpoint to HDFS every 100 steps if (step % 100 == 0): - print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy,{x: batch_xs, y_: batch_ys}))) + print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy, {x: batch_xs, y_: batch_ys}))) if sv.is_chief: summary_writer.add_summary(summary, step) @@ -150,13 +151,13 @@ def feed_dict(batch): # exported signatures defined in code signatures = { tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: { - 'inputs': { 'image': x }, - 'outputs': { 'prediction': prediction }, + 'inputs': {'image': x}, + 'outputs': {'prediction': prediction}, 'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME }, 'featurize': { - 'inputs': { 'image': x }, - 'outputs': { 'features': hid }, + 'inputs': {'image': x}, + 'outputs': {'features': hid}, 'method_name': 'featurize' } } diff --git a/examples/mnist/spark/mnist_spark.py b/examples/mnist/spark/mnist_spark.py index c5e69056..4730ef26 100755 --- a/examples/mnist/spark/mnist_spark.py +++ b/examples/mnist/spark/mnist_spark.py @@ -25,7 +25,7 @@ parser = argparse.ArgumentParser() parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100) parser.add_argument("--epochs", help="number of epochs", type=int, default=1) -parser.add_argument("--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv") +parser.add_argument("--format", help="example format: (csv|pickle|tfr)", choices=["csv", "pickle", "tfr"], default="csv") parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("--model", help="HDFS path to save/load model during train/inference", default="mnist_model") @@ -37,14 +37,15 @@ parser.add_argument("--mode", help="train|inference", default="train") parser.add_argument("--rdma", help="use rdma connection", default=False) args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) if args.format == "tfr": images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat", - keyClass="org.apache.hadoop.io.BytesWritable", - valueClass="org.apache.hadoop.io.NullWritable") + keyClass="org.apache.hadoop.io.BytesWritable", + valueClass="org.apache.hadoop.io.NullWritable") + def toNumpy(bytestr): example = tf.train.Example() example.ParseFromString(bytestr) @@ -52,6 +53,7 @@ def toNumpy(bytestr): image = numpy.array(features['image'].int64_list.value) label = numpy.array(features['label'].int64_list.value) return (image, label) + dataRDD = images.map(lambda x: toNumpy(str(x[0]))) else: if args.format == "csv": @@ -72,4 +74,3 @@ def toNumpy(bytestr): cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) - diff --git a/examples/mnist/spark/mnist_spark_dataset.py b/examples/mnist/spark/mnist_spark_dataset.py index 2192bf00..311493ee 100755 --- a/examples/mnist/spark/mnist_spark_dataset.py +++ b/examples/mnist/spark/mnist_spark_dataset.py @@ -25,7 +25,7 @@ parser = argparse.ArgumentParser() parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100) parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1) -parser.add_argument("-f", "--format", help="example format: (csv|tfr)", choices=["csv","tfr"], default="csv") +parser.add_argument("-f", "--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv") parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model") @@ -37,14 +37,15 @@ parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) if args.format == "tfr": images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat", - keyClass="org.apache.hadoop.io.BytesWritable", - valueClass="org.apache.hadoop.io.NullWritable") + keyClass="org.apache.hadoop.io.BytesWritable", + valueClass="org.apache.hadoop.io.NullWritable") + def toNumpy(bytestr): example = tf.train.Example() example.ParseFromString(bytestr) @@ -52,6 +53,7 @@ def toNumpy(bytestr): image = numpy.array(features['image'].int64_list.value) label = numpy.array(features['label'].int64_list.value) return (image, label) + dataRDD = images.map(lambda x: toNumpy(str(x[0]))) else: # args.format == "csv": images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')]) @@ -68,4 +70,3 @@ def toNumpy(bytestr): cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) - diff --git a/examples/mnist/spark/mnist_spark_pipeline.py b/examples/mnist/spark/mnist_spark_pipeline.py index 4314399a..2c3ad0a4 100644 --- a/examples/mnist/spark/mnist_spark_pipeline.py +++ b/examples/mnist/spark/mnist_spark_pipeline.py @@ -28,9 +28,7 @@ parser = argparse.ArgumentParser() -######## ARGS ######## - -## TFoS/cluster +# TFoS/cluster parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100) parser.add_argument("--epochs", help="number of epochs", type=int, default=1) parser.add_argument("--model_dir", help="HDFS path to save/load model during train/inference", type=str) @@ -42,18 +40,18 @@ parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") # Spark input/output -parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv","tfr"], default="csv") +parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv") parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions") # Execution Modes parser.add_argument("--train", help="train a model using Estimator", action="store_true") -parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct|checkpoint)", choices=["none","signature","direct","checkpoint"], default="none") -parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions","features"], default="predictions") +parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct|checkpoint)", choices=["none", "signature", "direct", "checkpoint"], default="none") +parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions", "features"], default="predictions") args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) @@ -73,24 +71,24 @@ # train a model using Spark Estimator fitted to a DataFrame print("{0} ===== Estimator.fit()".format(datetime.now().isoformat())) # dummy tf args (from imagenet/inception example) - tf_args = { 'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94 } + tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94} estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \ - .setInputMapping({'image':'image', 'label':'label'}) \ - .setModelDir(args.model_dir) \ - .setExportDir(args.export_dir) \ - .setClusterSize(args.cluster_size) \ - .setNumPS(args.num_ps) \ - .setProtocol(args.protocol) \ - .setTensorboard(args.tensorboard) \ - .setEpochs(args.epochs) \ - .setBatchSize(args.batch_size) \ - .setSteps(args.steps) + .setInputMapping({'image': 'image', 'label': 'label'}) \ + .setModelDir(args.model_dir) \ + .setExportDir(args.export_dir) \ + .setClusterSize(args.cluster_size) \ + .setNumPS(args.num_ps) \ + .setProtocol(args.protocol) \ + .setTensorboard(args.tensorboard) \ + .setEpochs(args.epochs) \ + .setBatchSize(args.batch_size) \ + .setSteps(args.steps) model = estimator.fit(df) else: # use a previously trained/exported model model = TFModel(args) \ - .setExportDir(args.export_dir) \ - .setBatchSize(args.batch_size) + .setExportDir(args.export_dir) \ + .setBatchSize(args.batch_size) # NO INFERENCING if args.inference_mode == 'none': @@ -100,31 +98,31 @@ elif args.inference_mode == 'checkpoint': model.setModelDir(args.model_dir) # load model from checkpoint at args.model_dir model.setExportDir(None) - model.setInputMapping({'image':'x'}) # map DataFrame 'image' column to the 'x' input tensor + model.setInputMapping({'image': 'x'}) # map DataFrame 'image' column to the 'x' input tensor if args.inference_output == 'predictions': - model.setOutputMapping({'prediction':'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column + model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column else: # args.inference_output == 'features': - model.setOutputMapping({'prediction':'col_out', 'Relu':'col_out2'}) # add 'Relu' output tensor to output DataFrame 'col_out2' column + model.setOutputMapping({'prediction': 'col_out', 'Relu': 'col_out2'}) # add 'Relu' output tensor to output DataFrame 'col_out2' column # INFER USING TENSORFLOW SAVED_MODEL WITH EXPORTED SIGNATURES elif args.inference_mode == 'signature': model.setModelDir(None) model.setExportDir(args.export_dir) # load saved_model from args.export_dir model.setTagSet(tf.saved_model.tag_constants.SERVING) # using default SERVING tagset - model.setInputMapping({'image':'image'}) # map DataFrame 'image' column to the 'image' input tensor alias of signature + model.setInputMapping({'image': 'image'}) # map DataFrame 'image' column to the 'image' input tensor alias of signature if args.inference_output == 'predictions': model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) # default signature def key, i.e. 'predict' - model.setOutputMapping({'prediction':'col_out'}) # map 'prediction' output tensor alias to output DataFrame 'col_out' column + model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor alias to output DataFrame 'col_out' column else: # args.inference_output == 'features' model.setSignatureDefKey('featurize') # custom signature def key - model.setOutputMapping({'features':'col_out'}) # map 'features' output tensor alias to output DataFrame 'col_out' column + model.setOutputMapping({'features': 'col_out'}) # map 'features' output tensor alias to output DataFrame 'col_out' column # INFER USING TENSORFLOW SAVED_MODEL, IGNORING EXPORTED SIGNATURES else: # args.inference_mode == 'direct': model.setModelDir(None) model.setExportDir(args.export_dir) # load saved_model from args.export_dir model.setTagSet(tf.saved_model.tag_constants.SERVING) # using default SERVING tagset - model.setInputMapping({'image':'x'}) # map DataFrame 'image' column to the 'x' input tensor + model.setInputMapping({'image': 'x'}) # map DataFrame 'image' column to the 'x' input tensor if args.inference_output == 'predictions': model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column else: # args.inference_output == 'features' diff --git a/examples/mnist/streaming/mnist_dist.py b/examples/mnist/streaming/mnist_dist.py index 2323767a..98782ca3 100755 --- a/examples/mnist/streaming/mnist_dist.py +++ b/examples/mnist/streaming/mnist_dist.py @@ -9,9 +9,11 @@ from __future__ import nested_scopes from __future__ import print_function + def print_log(worker_num, arg): print("{0}: {1}".format(worker_num, arg)) + def map_fun(args, ctx): from tensorflowonspark import TFNode from datetime import datetime @@ -23,9 +25,8 @@ def map_fun(args, ctx): worker_num = ctx.worker_num job_name = ctx.job_name task_index = ctx.task_index - cluster_spec = ctx.cluster_spec - IMAGE_PIXELS=28 + IMAGE_PIXELS = 28 # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict) if job_name == "ps": @@ -33,7 +34,7 @@ def map_fun(args, ctx): # Parameters hidden_units = 128 - batch_size = args.batch_size + batch_size = args.batch_size # Get TF cluster and server instances cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma) @@ -47,7 +48,7 @@ def feed_dict(batch): labels.append(item[1]) xs = numpy.array(images) xs = xs.astype(numpy.float32) - xs = xs/255.0 + xs = xs / 255.0 ys = numpy.array(labels) ys = ys.astype(numpy.uint8) return (xs, ys) @@ -58,18 +59,18 @@ def feed_dict(batch): # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -95,7 +96,7 @@ def feed_dict(batch): # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") @@ -108,7 +109,7 @@ def feed_dict(batch): # Create a "supervisor", which oversees the training process and stores model state into HDFS logdir = TFNode.hdfs_path(ctx, args.model) print("tensorflow model path: {0}".format(logdir)) - summary_writer = tf.summary.FileWriter("tensorboard_%d" %(worker_num), graph=tf.get_default_graph()) + summary_writer = tf.summary.FileWriter("tensorboard_%d" % worker_num, graph=tf.get_default_graph()) if args.mode == "train": sv = tf.train.Supervisor(is_chief=(task_index == 0), @@ -150,14 +151,14 @@ def feed_dict(batch): _, summary, step = sess.run([train_op, summary_op, global_step], feed_dict=feed) # print accuracy and save model checkpoint to HDFS every 100 steps if (step % 100 == 0): - print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy,{x: batch_xs, y_: batch_ys}))) + print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy, {x: batch_xs, y_: batch_ys}))) if sv.is_chief: summary_writer.add_summary(summary, step) - else: # args.mode == "inference" + else: # args.mode == "inference" labels, preds, acc = sess.run([label, prediction, accuracy], feed_dict=feed) - results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)] + results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)] tf_feed.batch_results(results) print("acc: {0}".format(acc)) @@ -167,4 +168,3 @@ def feed_dict(batch): # Ask for all the services to stop. print("{0} stopping supervisor".format(datetime.now().isoformat())) sv.stop() - diff --git a/examples/mnist/tf/mnist_dist.py b/examples/mnist/tf/mnist_dist.py index dcac4935..bcb31881 100644 --- a/examples/mnist/tf/mnist_dist.py +++ b/examples/mnist/tf/mnist_dist.py @@ -8,10 +8,12 @@ from __future__ import division from __future__ import print_function + def print_log(worker_num, arg): print("%d: " % worker_num, end=" ") print(arg) + def map_fun(args, ctx): from datetime import datetime import math @@ -32,7 +34,6 @@ def map_fun(args, ctx): # Parameters IMAGE_PIXELS = 28 hidden_units = 128 - batch_size = 100 # Get TF cluster and server instances cluster, server = ctx.start_cluster_server(1, args.rdma) @@ -54,7 +55,7 @@ def read_csv_examples(image_dir, label_dir, batch_size=100, num_epochs=None, tas # Setup reader for image queue img_reader = tf.TextLineReader(name="img_reader") _, img_csv = img_reader.read(image_queue) - image_defaults = [ [1.0] for col in range(784) ] + image_defaults = [[1.0] for col in range(784)] img = tf.stack(tf.decode_csv(img_csv, image_defaults)) # Normalize values to [0,1] norm = tf.constant(255, dtype=tf.float32, shape=(784,)) @@ -64,12 +65,12 @@ def read_csv_examples(image_dir, label_dir, batch_size=100, num_epochs=None, tas # Setup reader for label queue label_reader = tf.TextLineReader(name="label_reader") _, label_csv = label_reader.read(label_queue) - label_defaults = [ [1.0] for col in range(10) ] + label_defaults = [[1.0] for col in range(10)] label = tf.stack(tf.decode_csv(label_csv, label_defaults)) print_log(worker_num, "label: {0}".format(label)) # Return a batch of examples - return tf.train.batch([image,label], batch_size, num_threads=args.readers, name="batch_csv") + return tf.train.batch([image, label], batch_size, num_threads=args.readers, name="batch_csv") def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, num_workers=None): print_log(worker_num, "num_epochs: {0}".format(num_epochs)) @@ -91,7 +92,7 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, nu # Setup reader for examples reader = tf.TFRecordReader(name="reader") _, serialized = reader.read(file_queue) - feature_def = {'label': tf.FixedLenFeature([10], tf.int64), 'image': tf.FixedLenFeature([784], tf.int64) } + feature_def = {'label': tf.FixedLenFeature([10], tf.int64), 'image': tf.FixedLenFeature([784], tf.int64)} features = tf.parse_single_example(serialized, feature_def) norm = tf.constant(255, dtype=tf.float32, shape=(784,)) image = tf.div(tf.to_float(features['image']), norm) @@ -100,25 +101,25 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, nu print_log(worker_num, "label: {0}".format(label)) # Return a batch of examples - return tf.train.batch([image,label], batch_size, num_threads=args.readers, name="batch") + return tf.train.batch([image, label], batch_size, num_threads=args.readers, name="batch") if job_name == "ps": server.join() elif job_name == "worker": # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -154,7 +155,7 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, nu # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") tf.summary.scalar("acc", accuracy) @@ -213,7 +214,7 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, nu summary_writer.add_summary(summary, step) else: # args.mode == "inference" labels, pred, acc = sess.run([label, prediction, accuracy]) - #print("label: {0}, pred: {1}".format(labels, pred)) + # print("label: {0}, pred: {1}".format(labels, pred)) print("acc: {0}".format(acc)) for i in range(len(labels)): count += 1 @@ -230,4 +231,3 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, nu # Ask for all the services to stop. print("{0} stopping supervisor".format(datetime.now().isoformat())) sv.stop() - diff --git a/examples/mnist/tf/mnist_dist_dataset.py b/examples/mnist/tf/mnist_dist_dataset.py index bcefa802..93d3d9db 100644 --- a/examples/mnist/tf/mnist_dist_dataset.py +++ b/examples/mnist/tf/mnist_dist_dataset.py @@ -8,10 +8,12 @@ from __future__ import division from __future__ import print_function + def print_log(worker_num, arg): print("%d: " % worker_num, end=" ") print(arg) + def map_fun(args, ctx): from tensorflowonspark import TFNode from datetime import datetime @@ -35,7 +37,7 @@ def _parse_csv(ln): splits = tf.string_split([ln], delimiter='|') lbl = splits.values[0] img = splits.values[1] - image_defaults = [ [0.0] for col in range(IMAGE_PIXELS * IMAGE_PIXELS) ] + image_defaults = [[0.0] for col in range(IMAGE_PIXELS * IMAGE_PIXELS)] image = tf.stack(tf.decode_csv(img, record_defaults=image_defaults)) norm = tf.constant(255, dtype=tf.float32, shape=(784,)) normalized_image = tf.div(image, norm) @@ -46,7 +48,7 @@ def _parse_csv(ln): def _parse_tfr(example_proto): print("example_proto: {}".format(example_proto)) feature_def = {"label": tf.FixedLenFeature(10, tf.int64), - "image": tf.FixedLenFeature(IMAGE_PIXELS * IMAGE_PIXELS, tf.int64)} + "image": tf.FixedLenFeature(IMAGE_PIXELS * IMAGE_PIXELS, tf.int64)} features = tf.parse_single_example(example_proto, feature_def) norm = tf.constant(255, dtype=tf.float32, shape=(784,)) image = tf.div(tf.to_float(features['image']), norm) @@ -58,8 +60,8 @@ def _parse_tfr(example_proto): elif job_name == "worker": # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Dataset for input data image_dir = TFNode.hdfs_path(ctx, args.images) @@ -73,13 +75,13 @@ def _parse_tfr(example_proto): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -155,12 +157,12 @@ def _parse_tfr(example_proto): if (step % 100 == 0): print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy))) _, summary, step, yv = sess.run([train_op, summary_op, global_step, y_val]) - #print("yval: {}".format(yv)) + # print("yval: {}".format(yv)) if sv.is_chief: summary_writer.add_summary(summary, step) else: # args.mode == "inference" labels, pred, acc = sess.run([label, prediction, accuracy]) - #print("label: {0}, pred: {1}".format(labels, pred)) + # print("label: {0}, pred: {1}".format(labels, pred)) print("acc: {0}".format(acc)) for i in range(len(labels)): count += 1 @@ -177,4 +179,3 @@ def _parse_tfr(example_proto): # Ask for all the services to stop. print("{0} stopping supervisor".format(datetime.now().isoformat())) sv.stop() - diff --git a/examples/mnist/tf/mnist_dist_pipeline.py b/examples/mnist/tf/mnist_dist_pipeline.py index 57b40561..022f0960 100644 --- a/examples/mnist/tf/mnist_dist_pipeline.py +++ b/examples/mnist/tf/mnist_dist_pipeline.py @@ -17,9 +17,10 @@ # Parameters hidden_units = 128 -batch_size = 100 +batch_size = 100 IMAGE_PIXELS = 28 + def map_fun(args, ctx): worker_num = ctx.worker_num job_name = ctx.job_name @@ -33,18 +34,18 @@ def map_fun(args, ctx): elif job_name == "worker": # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( - worker_device="/job:worker/task:%d" % task_index, - cluster=cluster)): + worker_device="/job:worker/task:%d" % task_index, + cluster=cluster)): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) @@ -70,7 +71,7 @@ def map_fun(args, ctx): # Test trained model label = tf.argmax(y_, 1, name="label") - prediction = tf.argmax(y, 1,name="prediction") + prediction = tf.argmax(y, 1, name="prediction") correct_prediction = tf.equal(prediction, label) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy") tf.summary.scalar("acc", accuracy) @@ -116,6 +117,7 @@ def map_fun(args, ctx): print("{0} stopping supervisor".format(datetime.now().isoformat())) sv.stop() + def export_fun(args): """Define/export a single-node TF graph for inferencing""" # Input placeholder for inferencing @@ -123,13 +125,13 @@ def export_fun(args): # Variables of the hidden layer hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], - stddev=1.0 / IMAGE_PIXELS), name="hid_w") + stddev=1.0 / IMAGE_PIXELS), name="hid_w") hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b") tf.summary.histogram("hidden_weights", hid_w) # Variables of the softmax layer sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], - stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") + stddev=1.0 / math.sqrt(hidden_units)), name="sm_w") sm_b = tf.Variable(tf.zeros([10]), name="sm_b") hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b) @@ -151,13 +153,13 @@ def export_fun(args): # exported signatures defined in code signatures = { tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: { - 'inputs': { 'image': x }, - 'outputs': { 'prediction': prediction }, + 'inputs': {'image': x}, + 'outputs': {'prediction': prediction}, 'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME }, 'featurize': { - 'inputs': { 'image': x }, - 'outputs': { 'features': hid }, + 'inputs': {'image': x}, + 'outputs': {'features': hid}, 'method_name': 'featurize' } } @@ -185,7 +187,7 @@ def read_csv_examples(image_dir, label_dir, batch_size=100, num_readers=1, num_e # Setup reader for image queue img_reader = tf.TextLineReader(name="img_reader") _, img_csv = img_reader.read(image_queue) - image_defaults = [ [1.0] for col in range(784) ] + image_defaults = [[1.0] for col in range(784)] img = tf.stack(tf.decode_csv(img_csv, image_defaults)) # Normalize values to [0,1] norm = tf.constant(255, dtype=tf.float32, shape=(784,)) @@ -195,12 +197,13 @@ def read_csv_examples(image_dir, label_dir, batch_size=100, num_readers=1, num_e # Setup reader for label queue label_reader = tf.TextLineReader(name="label_reader") _, label_csv = label_reader.read(label_queue) - label_defaults = [ [1.0] for col in range(10) ] + label_defaults = [[1.0] for col in range(10)] label = tf.stack(tf.decode_csv(label_csv, label_defaults)) logging.info("label: {0}".format(label)) # Return a batch of examples - return tf.train.batch([image,label], batch_size, num_threads=num_readers, name="batch_csv") + return tf.train.batch([image, label], batch_size, num_threads=num_readers, name="batch_csv") + def read_tfr_examples(path, batch_size=100, num_epochs=None, num_readers=1, task_index=None, num_workers=None): logging.info("num_epochs: {0}".format(num_epochs)) @@ -222,7 +225,7 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, num_readers=1, task # Setup reader for examples reader = tf.TFRecordReader(name="reader") _, serialized = reader.read(file_queue) - feature_def = {'label': tf.FixedLenFeature([10], tf.int64), 'image': tf.FixedLenFeature([784], tf.int64) } + feature_def = {'label': tf.FixedLenFeature([10], tf.int64), 'image': tf.FixedLenFeature([784], tf.int64)} features = tf.parse_single_example(serialized, feature_def) norm = tf.constant(255, dtype=tf.float32, shape=(784,)) image = tf.div(tf.to_float(features['image']), norm) @@ -231,4 +234,4 @@ def read_tfr_examples(path, batch_size=100, num_epochs=None, num_readers=1, task logging.info("label: {0}".format(label)) # Return a batch of examples - return tf.train.batch([image,label], batch_size, num_threads=num_readers, name="batch") + return tf.train.batch([image, label], batch_size, num_threads=num_readers, name="batch") diff --git a/examples/mnist/tf/mnist_spark.py b/examples/mnist/tf/mnist_spark.py index 48a2ce88..32da2b20 100644 --- a/examples/mnist/tf/mnist_spark.py +++ b/examples/mnist/tf/mnist_spark.py @@ -22,7 +22,7 @@ parser = argparse.ArgumentParser() parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=0) -parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="tfr") +parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv", "pickle", "tfr"], default="tfr") parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default="mnist_model") @@ -35,7 +35,7 @@ parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) @@ -44,4 +44,3 @@ cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) - diff --git a/examples/mnist/tf/mnist_spark_dataset.py b/examples/mnist/tf/mnist_spark_dataset.py index 6dd3cda4..58cf31ef 100644 --- a/examples/mnist/tf/mnist_spark_dataset.py +++ b/examples/mnist/tf/mnist_spark_dataset.py @@ -23,7 +23,7 @@ parser = argparse.ArgumentParser() parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100) parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=0) -parser.add_argument("-f", "--format", help="example format: (csv2|tfr)", choices=["csv2","tfr"], default="tfr") +parser.add_argument("-f", "--format", help="example format: (csv2|tfr)", choices=["csv2", "tfr"], default="tfr") parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default="mnist_model") @@ -37,7 +37,7 @@ parser.add_argument("-p", "--driver_ps_nodes", help="""run tensorflow PS node on driver locally. You will need to set cluster_size = num_executors + num_ps""", default=False) args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) @@ -46,4 +46,3 @@ cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) - diff --git a/examples/mnist/tf/mnist_spark_pipeline.py b/examples/mnist/tf/mnist_spark_pipeline.py index 558d627d..6b932949 100644 --- a/examples/mnist/tf/mnist_spark_pipeline.py +++ b/examples/mnist/tf/mnist_spark_pipeline.py @@ -28,9 +28,7 @@ parser = argparse.ArgumentParser() -######## ARGS ######## - -## TFoS/cluster +# TFoS/cluster parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100) parser.add_argument("--epochs", help="number of epochs", type=int, default=1) parser.add_argument("--model_dir", help="HDFS path to save/load model during train/inference", type=str) @@ -46,18 +44,18 @@ parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") # Spark input/output -parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv","tfr"], default="csv") +parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv") parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format") parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format") parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions") # Execution Modes parser.add_argument("--train", help="train a model using Estimator", action="store_true") -parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct)", choices=["none","signature","direct"], default="none") -parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions","features"], default="predictions") +parser.add_argument("--inference_mode", help="type of inferencing (none|signature|direct)", choices=["none", "signature", "direct"], default="none") +parser.add_argument("--inference_output", help="output of inferencing (predictions|features)", choices=["predictions", "features"], default="predictions") args = parser.parse_args() -print("args:",args) +print("args:", args) print("{0} ===== Start".format(datetime.now().isoformat())) @@ -77,27 +75,27 @@ # train a model using Spark Estimator fitted to a DataFrame print("{0} ===== Estimator.fit()".format(datetime.now().isoformat())) # dummy tf args (from imagenet/inception example) - tf_args = { 'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94 } + tf_args = {'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94} estimator = TFEstimator(mnist_dist_pipeline.map_fun, args, export_fn=mnist_dist_pipeline.export_fun) \ - .setModelDir(args.model_dir) \ - .setExportDir(args.export_dir) \ - .setClusterSize(args.cluster_size) \ - .setNumPS(args.num_ps) \ - .setDriverPSNodes(args.driver_ps_nodes) \ - .setInputMode(TFCluster.InputMode.TENSORFLOW) \ - .setTFRecordDir(args.tfrecord_dir) \ - .setProtocol(args.protocol) \ - .setReaders(args.readers) \ - .setTensorboard(args.tensorboard) \ - .setEpochs(args.epochs) \ - .setBatchSize(args.batch_size) \ - .setSteps(args.steps) + .setModelDir(args.model_dir) \ + .setExportDir(args.export_dir) \ + .setClusterSize(args.cluster_size) \ + .setNumPS(args.num_ps) \ + .setDriverPSNodes(args.driver_ps_nodes) \ + .setInputMode(TFCluster.InputMode.TENSORFLOW) \ + .setTFRecordDir(args.tfrecord_dir) \ + .setProtocol(args.protocol) \ + .setReaders(args.readers) \ + .setTensorboard(args.tensorboard) \ + .setEpochs(args.epochs) \ + .setBatchSize(args.batch_size) \ + .setSteps(args.steps) model = estimator.fit(df) else: # use a previously trained/exported model model = TFModel(args) \ - .setExportDir(args.export_dir) \ - .setBatchSize(args.batch_size) + .setExportDir(args.export_dir) \ + .setBatchSize(args.batch_size) # NO INFERENCING if args.inference_mode == 'none': @@ -108,20 +106,20 @@ model.setModelDir(None) model.setExportDir(args.export_dir) # load saved_model from args.export_dir model.setTagSet(tf.saved_model.tag_constants.SERVING) # using default SERVING tagset - model.setInputMapping({'image':'image'}) # map DataFrame 'image' column to the 'image' input tensor alias of signature + model.setInputMapping({'image': 'image'}) # map DataFrame 'image' column to the 'image' input tensor alias of signature if args.inference_output == 'predictions': model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY) # default signature def key, i.e. 'predict' - model.setOutputMapping({'prediction':'col_out'}) # map 'prediction' output tensor alias to output DataFrame 'col_out' column + model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor alias to output DataFrame 'col_out' column else: # args.inference_output == 'features' model.setSignatureDefKey('featurize') # custom signature def key - model.setOutputMapping({'features':'col_out'}) # map 'features' output tensor alias to output DataFrame 'col_out' column + model.setOutputMapping({'features': 'col_out'}) # map 'features' output tensor alias to output DataFrame 'col_out' column # INFER USING TENSORFLOW SAVED_MODEL, IGNORING EXPORTED SIGNATURES else: # args.inference_mode == 'direct': model.setModelDir(None) model.setExportDir(args.export_dir) # load saved_model from args.export_dir model.setTagSet(tf.saved_model.tag_constants.SERVING) # using default SERVING tagset - model.setInputMapping({'image':'x'}) # map DataFrame 'image' column to the 'x' input tensor + model.setInputMapping({'image': 'x'}) # map DataFrame 'image' column to the 'x' input tensor if args.inference_output == 'predictions': model.setOutputMapping({'prediction': 'col_out'}) # map 'prediction' output tensor to output DataFrame 'col_out' column else: # args.inference_output == 'features' @@ -132,4 +130,3 @@ preds.write.json(args.output) print("{0} ===== Stop".format(datetime.now().isoformat())) -