Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions examples/mnist/keras/mnist_mlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from __future__ import print_function


def main_fun(args, ctx):
import numpy
import os
Expand All @@ -16,12 +17,9 @@ def main_fun(args, ctx):
from tensorflow.contrib.keras.api.keras.layers import Dense, Dropout
from tensorflow.contrib.keras.api.keras.optimizers import RMSprop
from tensorflow.contrib.keras.python.keras.callbacks import LambdaCallback, TensorBoard

from tensorflow.python.saved_model import builder as saved_model_builder
from tensorflow.python.saved_model import tag_constants
from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def


from tensorflowonspark import TFNode

cluster, server = TFNode.start_cluster_server(ctx)
Expand All @@ -44,8 +42,8 @@ def generate_rdd_data(tf_feed, batch_size):
yield (images, labels)

with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % ctx.task_index,
cluster=cluster)):
worker_device="/job:worker/task:%d" % ctx.task_index,
cluster=cluster)):

IMAGE_PIXELS = 28
batch_size = 100
Expand Down Expand Up @@ -98,21 +96,20 @@ def save_checkpoint(epoch, logs=None):

if args.input_mode == 'tf':
# train & validate on in-memory data
history = model.fit(x_train, y_train,
batch_size=batch_size,
epochs=args.epochs,
verbose=1,
validation_data=(x_test, y_test),
callbacks=callbacks)
model.fit(x_train, y_train,
batch_size=batch_size,
epochs=args.epochs,
verbose=1,
validation_data=(x_test, y_test),
callbacks=callbacks)
else: # args.input_mode == 'spark':
# train on data read from a generator which is producing data from a Spark RDD
tf_feed = TFNode.DataFeed(ctx.mgr)
history = model.fit_generator(
generator=generate_rdd_data(tf_feed, batch_size),
steps_per_epoch=args.steps_per_epoch,
epochs=args.epochs,
verbose=1,
callbacks=callbacks)
model.fit_generator(generator=generate_rdd_data(tf_feed, batch_size),
steps_per_epoch=args.steps_per_epoch,
epochs=args.epochs,
verbose=1,
callbacks=callbacks)

if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
# save a local Keras model, so we can reload it with an inferencing learning_phase
Expand All @@ -125,11 +122,11 @@ def save_checkpoint(epoch, logs=None):
# export a saved_model for inferencing
builder = saved_model_builder.SavedModelBuilder(args.export_dir)
signature = predict_signature_def(inputs={'images': new_model.input},
outputs={'scores': new_model.output})
outputs={'scores': new_model.output})
builder.add_meta_graph_and_variables(sess=sess,
tags=[tag_constants.SERVING],
signature_def_map={'predict': signature},
clear_devices=True)
tags=[tag_constants.SERVING],
signature_def_map={'predict': signature},
clear_devices=True)
builder.save()

if args.input_mode == 'spark':
Expand Down Expand Up @@ -160,7 +157,7 @@ def save_checkpoint(epoch, logs=None):
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

args = parser.parse_args()
print("args:",args)
print("args:", args)

if args.input_mode == 'tf':
cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model_dir)
Expand Down
43 changes: 23 additions & 20 deletions examples/mnist/spark/mnist_dist.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Copyright 2018 Yahoo Inc.
# Copyright 2018 Yahoo Inc.
# Licensed under the terms of the Apache 2.0 license.
# Please see LICENSE file in the project root for terms.

Expand All @@ -9,9 +9,11 @@
from __future__ import nested_scopes
from __future__ import print_function


def print_log(worker_num, arg):
print("{0}: {1}".format(worker_num, arg))


def map_fun(args, ctx):
from datetime import datetime
import math
Expand All @@ -30,7 +32,7 @@ def map_fun(args, ctx):
# Parameters
IMAGE_PIXELS = 28
hidden_units = 128
batch_size = args.batch_size
batch_size = args.batch_size

# Get TF cluster and server instances
cluster, server = ctx.start_cluster_server(1, args.rdma)
Expand All @@ -55,28 +57,28 @@ def feed_dict(batch):

# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):

# Placeholders or QueueRunner/Readers for input data
# Placeholders or QueueRunner/Readers for input data
with tf.name_scope('inputs'):
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS] , name="x")
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS], name="x")
y_ = tf.placeholder(tf.float32, [None, 10], name="y_")

x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1])
tf.summary.image("x_img", x_img)

with tf.name_scope('layer'):
# Variables of the hidden layer
with tf.name_scope('hidden_layer'):
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)

# Variables of the softmax layer
with tf.name_scope('softmax_layer'):
with tf.name_scope('softmax_layer'):
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
tf.summary.histogram("softmax_weights", sm_w)
Expand All @@ -93,7 +95,7 @@ def feed_dict(batch):

# Test trained model
label = tf.argmax(y_, 1, name="label")
prediction = tf.argmax(y, 1,name="prediction")
prediction = tf.argmax(y, 1, name="prediction")
correct_prediction = tf.equal(prediction, label)

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
Expand All @@ -102,10 +104,9 @@ def feed_dict(batch):
summary_op = tf.summary.merge_all()

logdir = ctx.absolute_path(args.model)
# logdir = args.model
print("tensorflow model path: {0}".format(logdir))
hooks = [tf.train.StopAtStepHook(last_step=100000)]

if job_name == "worker" and task_index == 0:
summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

Expand All @@ -119,9 +120,9 @@ def feed_dict(batch):
step = 0
tf_feed = ctx.get_data_feed(args.mode == "train")
while not mon_sess.should_stop() and not tf_feed.should_stop() and step < args.steps:
# Run a training step asynchronously
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
# Run a training step asynchronously
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.

# using feed_dict
batch_xs, batch_ys = feed_dict(tf_feed.next_batch(batch_size))
Expand All @@ -132,14 +133,14 @@ def feed_dict(batch):
_, summary, step = mon_sess.run([train_op, summary_op, global_step], feed_dict=feed)
# print accuracy and save model checkpoint to HDFS every 100 steps
if (step % 100 == 0):
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, mon_sess.run(accuracy,{x: batch_xs, y_: batch_ys})))
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, mon_sess.run(accuracy, {x: batch_xs, y_: batch_ys})))

if task_index == 0:
summary_writer.add_summary(summary, step)
else: # args.mode == "inference"
labels, preds, acc = mon_sess.run([label, prediction, accuracy], feed_dict=feed)

results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)]
results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)]
tf_feed.batch_results(results)
print("results: {0}, acc: {1}".format(results, acc))

Expand All @@ -148,4 +149,6 @@ def feed_dict(batch):

# Ask for all the services to stop.
print("{0} stopping MonitoredTrainingSession".format(datetime.now().isoformat()))
summary_writer.close()

if job_name == "worker" and task_index == 0:
summary_writer.close()
15 changes: 8 additions & 7 deletions examples/mnist/spark/mnist_dist_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
from __future__ import nested_scopes
from __future__ import print_function


def print_log(worker_num, arg):
print("{0}: {1}".format(worker_num, arg))


def map_fun(args, ctx):
from tensorflowonspark import TFNode
from datetime import datetime
Expand Down Expand Up @@ -48,8 +50,8 @@ def rdd_generator():

# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):

# Dataset for input data
ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]), tf.TensorShape([10]))).batch(args.batch_size)
Expand All @@ -58,13 +60,13 @@ def rdd_generator():

# Variables of the hidden layer
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)

# Variables of the softmax layer
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10],
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
tf.summary.histogram("softmax_weights", sm_w)

Expand All @@ -90,7 +92,7 @@ def rdd_generator():

# Test trained model
label = tf.argmax(y_, 1, name="label")
prediction = tf.argmax(y, 1,name="prediction")
prediction = tf.argmax(y, 1, name="prediction")
correct_prediction = tf.equal(prediction, label)

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
Expand Down Expand Up @@ -146,7 +148,7 @@ def rdd_generator():
else: # args.mode == "inference"
labels, preds, acc = sess.run([label, prediction, accuracy])

results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l,p in zip(labels,preds)]
results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)]
tf_feed.batch_results(results)
print("acc: {0}".format(acc))

Expand All @@ -156,4 +158,3 @@ def rdd_generator():
# Ask for all the services to stop.
print("{0} stopping supervisor".format(datetime.now().isoformat()))
sv.stop()

25 changes: 13 additions & 12 deletions examples/mnist/spark/mnist_dist_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
from __future__ import nested_scopes
from __future__ import print_function


def print_log(worker_num, arg):
print("{0}: {1}".format(worker_num, arg))


def map_fun(args, ctx):
from tensorflowonspark import TFNode
from datetime import datetime
Expand All @@ -32,7 +34,7 @@ def map_fun(args, ctx):

# Parameters
hidden_units = 128
batch_size = args.batch_size
batch_size = args.batch_size

# Get TF cluster and server instances
cluster, server = TFNode.start_cluster_server(ctx, 1, args.protocol == 'rdma')
Expand All @@ -54,18 +56,18 @@ def feed_dict(batch):

# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):

# Variables of the hidden layer
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)

# Variables of the softmax layer
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10],
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
tf.summary.histogram("softmax_weights", sm_w)

Expand All @@ -91,7 +93,7 @@ def feed_dict(batch):

# Test trained model
label = tf.argmax(y_, 1, name="label")
prediction = tf.argmax(y, 1,name="prediction")
prediction = tf.argmax(y, 1, name="prediction")
correct_prediction = tf.equal(prediction, label)

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
Expand Down Expand Up @@ -122,7 +124,6 @@ def feed_dict(batch):

# Loop until the supervisor shuts down or 1000000 steps have completed.
step = 0
#tf_feed = TFNode.DataFeed(ctx.mgr)
tf_feed = TFNode.DataFeed(ctx.mgr, input_mapping=args.input_mapping)
while not sv.should_stop() and not tf_feed.should_stop() and step < args.steps:
# Run a training step asynchronously.
Expand All @@ -137,7 +138,7 @@ def feed_dict(batch):
_, summary, step = sess.run([train_op, summary_op, global_step], feed_dict=feed)
# print accuracy and save model checkpoint to HDFS every 100 steps
if (step % 100 == 0):
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy,{x: batch_xs, y_: batch_ys})))
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy, {x: batch_xs, y_: batch_ys})))

if sv.is_chief:
summary_writer.add_summary(summary, step)
Expand All @@ -150,13 +151,13 @@ def feed_dict(batch):
# exported signatures defined in code
signatures = {
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: {
'inputs': { 'image': x },
'outputs': { 'prediction': prediction },
'inputs': {'image': x},
'outputs': {'prediction': prediction},
'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME
},
'featurize': {
'inputs': { 'image': x },
'outputs': { 'features': hid },
'inputs': {'image': x},
'outputs': {'features': hid},
'method_name': 'featurize'
}
}
Expand Down
Loading