2 changes: 1 addition & 1 deletion examples/mnist/keras/README.md
@@ -65,7 +65,7 @@ In this mode, Spark will distribute the MNIST dataset (as CSV) across the worker
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/keras/mnist_mlp.py \
--cluster_size 3 \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--input_mode spark \
--images ${TFoS_HOME}/mnist/csv/train/images \
--labels ${TFoS_HOME}/mnist/csv/train/labels \
8 changes: 6 additions & 2 deletions examples/mnist/keras/mnist_mlp.py
@@ -13,6 +13,7 @@ def main_fun(args, ctx):
import tensorflow as tf
from tensorflow.python import keras
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.datasets import mnist
from tensorflow.python.keras.models import Sequential, load_model, save_model
from tensorflow.python.keras.layers import Dense, Dropout
from tensorflow.python.keras.optimizers import RMSprop
@@ -51,7 +52,6 @@ def generate_rdd_data(tf_feed, batch_size):

# the data, shuffled and split between train and test sets
if args.input_mode == 'tf':
from tensorflow.python.keras.datasets import mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
@@ -64,6 +64,9 @@ def generate_rdd_data(tf_feed, batch_size):
else: # args.mode == 'spark'
x_train = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS], name="x_train")
y_train = tf.placeholder(tf.float32, [None, 10], name="y_train")
(_, _), (x_test, y_test) = mnist.load_data()
x_test = x_test.reshape(10000, 784)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Dense(512, activation='relu', input_shape=(784,)))
@@ -109,6 +112,7 @@ def save_checkpoint(epoch, logs=None):
steps_per_epoch=args.steps_per_epoch,
epochs=args.epochs,
verbose=1,
validation_data=(x_test, y_test),
callbacks=callbacks)

if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
@@ -147,7 +151,7 @@ def save_checkpoint(epoch, logs=None):
parser = argparse.ArgumentParser()
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--epochs", help="number of epochs of training data", type=int, default=20)
parser.add_argument("--export_dir", help="directory to export saved_mode")
parser.add_argument("--export_dir", help="directory to export saved_model")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized CSV format")
parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
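For context, a minimal standalone sketch of the pattern the patch above completes: the MNIST test split is loaded once in memory and passed as `validation_data`, while training batches come from a generator. A dummy random-batch generator stands in here for the Spark feed (`generate_rdd_data`); its names and sizes are illustrative, not part of the patch:

```python
import numpy as np
from tensorflow.python import keras
from tensorflow.python.keras.datasets import mnist
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Dropout

num_classes = 10

# Held-out test set, loaded once in memory (mirrors the 'spark' branch above).
(_, _), (x_test, y_test) = mnist.load_data()
x_test = x_test.reshape(10000, 784).astype(np.float32) / 255.0
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Dense(512, activation='relu', input_shape=(784,)))
model.add(Dropout(0.2))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop',
              metrics=['accuracy'])

# Illustrative stand-in for generate_rdd_data(tf_feed, batch_size): yields
# random batches so the sketch runs without a Spark cluster.
def dummy_generator(batch_size=128):
  while True:
    xs = np.random.rand(batch_size, 784).astype(np.float32)
    ys = keras.utils.to_categorical(
        np.random.randint(0, num_classes, batch_size), num_classes)
    yield (xs, ys)

# validation_data yields per-epoch val_loss/val_acc even though training
# batches arrive via the generator.
model.fit_generator(dummy_generator(),
                    steps_per_epoch=10,
                    epochs=1,
                    verbose=1,
                    validation_data=(x_test, y_test))
```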
24 changes: 6 additions & 18 deletions examples/mnist/keras/mnist_mlp_estimator.py
@@ -3,7 +3,6 @@
from tensorflow.python import keras
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Dropout
from tensorflow.python.keras.optimizers import RMSprop
from tensorflowonspark import TFNode


@@ -31,7 +30,7 @@ def main_fun(args, ctx):
model.add(Dropout(0.2))
model.add(Dense(10, activation='softmax'))
model.compile(loss='categorical_crossentropy',
optimizer=RMSprop(),
optimizer=tf.train.RMSPropOptimizer(learning_rate=0.001),
metrics=['accuracy'])
model.summary()

@@ -43,7 +42,7 @@ def main_fun(args, ctx):
if args.input_mode == 'tf':
# For InputMode.TENSORFLOW, just use data in memory
train_input_fn = tf.estimator.inputs.numpy_input_fn(
x={"dense_1_input": x_train},
x={"dense_input": x_train},
y=y_train,
batch_size=128,
num_epochs=None,
@@ -70,28 +69,17 @@ def train_input_fn():

# eval_input_fn ALWAYS uses data loaded in memory, since InputMode.SPARK can only feed one RDD at a time
eval_input_fn = tf.estimator.inputs.numpy_input_fn(
x={"dense_1_input": x_test},
x={"dense_input": x_test},
y=y_test,
num_epochs=args.epochs,
shuffle=False)

# serving_input_receiver_fn ALWAYS expects serialized TFExamples in a placeholder.
def serving_input_receiver_fn():
"""An input receiver that expects a serialized tf.Example."""
serialized_tf_example = tf.placeholder(dtype=tf.string,
shape=[args.batch_size],
name='input_example_tensor')
receiver_tensors = {'dense_1_input': serialized_tf_example}
feature_spec = {'dense_1_input': tf.FixedLenFeature(784, tf.string)}
features = tf.parse_example(serialized_tf_example, feature_spec)
return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

# setup tf.estimator.train_and_evaluate() w/ FinalExporter
exporter = tf.estimator.FinalExporter("serving", serving_input_receiver_fn=serving_input_receiver_fn)
feature_spec = {'dense_input': tf.FixedLenFeature(784, tf.float32)}
exporter = tf.estimator.FinalExporter("serving", serving_input_receiver_fn=tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec))
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, exporters=exporter)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

else: # mode == 'inference'
if args.input_mode == 'spark':
tf_feed = TFNode.DataFeed(ctx.mgr)
@@ -137,7 +125,7 @@ def predict_input_fn():
parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
parser.add_argument("--model_dir", help="directory to write model checkpoints")
parser.add_argument("--mode", help="(train|inference")
parser.add_argument("--mode", help="(train|inference)", default="train")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
parser.add_argument("--steps", help="max number of steps to train", type=int, default=2000)
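A consolidated sketch of the exporter wiring this diff lands on, plus how a client might construct the serialized tf.Example the receiver expects (TF 1.x APIs; the client-side snippet is an illustration, not part of the patch):

```python
import tensorflow as tf

# Feature spec for the flattened MNIST input; the key must match the input
# name Keras derives from the first Dense layer ("dense_input" above).
feature_spec = {'dense_input': tf.FixedLenFeature(784, tf.float32)}

# Replaces the hand-rolled serving_input_receiver_fn deleted above: the built
# receiver accepts serialized tf.Example protos and parses them with the spec.
receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
exporter = tf.estimator.FinalExporter("serving",
                                      serving_input_receiver_fn=receiver_fn)

# A client would then serialize each image as a tf.Example along these lines:
example = tf.train.Example(features=tf.train.Features(feature={
    'dense_input': tf.train.Feature(
        float_list=tf.train.FloatList(value=[0.0] * 784))}))
serialized = example.SerializeToString()
```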
203 changes: 85 additions & 118 deletions examples/mnist/spark/mnist_dist.py
@@ -1,178 +1,145 @@
# Copyright 2018 Yahoo Inc.
# Copyright 2017 Yahoo Inc.
# Licensed under the terms of the Apache 2.0 license.
# Please see LICENSE file in the project root for terms.

# Distributed MNIST on grid based on TensorFlow MNIST example

from __future__ import absolute_import
from __future__ import division
from __future__ import nested_scopes
from __future__ import print_function

from datetime import datetime
import tensorflow as tf
from tensorflowonspark import TFNode


def print_log(worker_num, arg):
print("{0}: {1}".format(worker_num, arg))


class ExportHook(tf.train.SessionRunHook):
def __init__(self, export_dir, input_tensor, output_tensor):
self.export_dir = export_dir
self.input_tensor = input_tensor
self.output_tensor = output_tensor

def end(self, session):
print("{} ======= Exporting to: {}".format(datetime.now().isoformat(), self.export_dir))
signatures = {
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: {
'inputs': {'image': self.input_tensor},
'outputs': {'prediction': self.output_tensor},
'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME
}
}
TFNode.export_saved_model(session,
self.export_dir,
tf.saved_model.tag_constants.SERVING,
signatures)
print("{} ======= Done exporting".format(datetime.now().isoformat()))


def map_fun(args, ctx):
from datetime import datetime
import math
import numpy
import tensorflow as tf
import time

worker_num = ctx.worker_num
job_name = ctx.job_name
task_index = ctx.task_index

# Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
if job_name == "ps":
time.sleep((worker_num + 1) * 5)

# Parameters
IMAGE_PIXELS = 28
hidden_units = 128
batch_size = args.batch_size

# Get TF cluster and server instances
cluster, server = ctx.start_cluster_server(1, args.rdma)

def feed_dict(batch):
# Convert from [(images, labels)] to two numpy arrays of the proper type
images = []
labels = []
for item in batch:
images.append(item[0])
labels.append(item[1])
xs = numpy.array(images)
xs = xs.astype(numpy.float32)
xs = xs / 255.0
ys = numpy.array(labels)
ys = ys.astype(numpy.uint8)
return (xs, ys)
# Create generator for Spark data feed
tf_feed = ctx.get_data_feed(args.mode == 'train')

def rdd_generator():
while not tf_feed.should_stop():
batch = tf_feed.next_batch(1)
if len(batch) == 0:
return
row = batch[0]
image = numpy.array(row[0]).astype(numpy.float32) / 255.0
label = numpy.array(row[1]).astype(numpy.int64)
yield (image, label)

if job_name == "ps":
server.join()
elif job_name == "worker":

# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):

# Placeholders or QueueRunner/Readers for input data
with tf.name_scope('inputs'):
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS], name="x")
y_ = tf.placeholder(tf.float32, [None, 10], name="y_")

x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1])
tf.summary.image("x_img", x_img)

with tf.name_scope('layer'):
# Variables of the hidden layer
with tf.name_scope('hidden_layer'):
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units], stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)

# Variables of the softmax layer
with tf.name_scope('softmax_layer'):
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10], stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
tf.summary.histogram("softmax_weights", sm_w)
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
# Dataset for input data
ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]), tf.TensorShape([10]))).batch(args.batch_size)
iterator = ds.make_one_shot_iterator()
x, y_ = iterator.get_next()

global_step = tf.train.get_or_create_global_step()
# Variables of the hidden layer
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)

# Variables of the softmax layer
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10],
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
tf.summary.histogram("softmax_weights", sm_w)

x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1])
tf.summary.image("x_img", x_img)

hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)

with tf.name_scope('loss'):
loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
tf.summary.scalar("loss", loss)
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))

with tf.name_scope('train'):
train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
global_step = tf.train.get_or_create_global_step()

loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
tf.summary.scalar("loss", loss)
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)

# Test trained model
label = tf.argmax(y_, 1, name="label")
prediction = tf.argmax(y, 1, name="prediction")
correct_prediction = tf.equal(prediction, label)

accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
tf.summary.scalar("acc", accuracy)

saver = tf.train.Saver()
summary_op = tf.summary.merge_all()
init_op = tf.global_variables_initializer()

# Create a "supervisor", which oversees the training process and stores model state into HDFS
logdir = ctx.absolute_path(args.model)
print("tensorflow model path: {0}".format(logdir))
summary_writer = tf.summary.FileWriter("tensorboard_%d" % worker_num, graph=tf.get_default_graph())

if job_name == "worker" and task_index == 0:
summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

# The MonitoredTrainingSession takes care of session initialization, restoring from
# a checkpoint, and closing when done or an error occurs
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(task_index == 0),
checkpoint_dir=logdir,
save_checkpoint_secs=10,
hooks=[tf.train.StopAtStepHook(last_step=args.steps)],
chief_only_hooks=[ExportHook(ctx.absolute_path(args.export_dir), x, prediction)]) as mon_sess:
is_chief=(task_index == 0),
scaffold=tf.train.Scaffold(init_op=init_op, summary_op=summary_op, saver=saver),
checkpoint_dir=logdir,
hooks=[tf.train.StopAtStepHook(last_step=args.steps)]) as sess:
print("{} session ready".format(datetime.now().isoformat()))

# Loop until the session shuts down or feed has no more data
step = 0
tf_feed = ctx.get_data_feed(args.mode == "train")
while not mon_sess.should_stop() and not tf_feed.should_stop():
# Run a training step asynchronously
while not sess.should_stop() and not tf_feed.should_stop():
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.

# using feed_dict
batch_xs, batch_ys = feed_dict(tf_feed.next_batch(batch_size))
feed = {x: batch_xs, y_: batch_ys}

if len(batch_xs) > 0:
if args.mode == "train":
_, summary, step = mon_sess.run([train_op, summary_op, global_step], feed_dict=feed)
# print accuracy and save model checkpoint to HDFS every 100 steps
if (step % 100 == 0):
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, mon_sess.run(accuracy, {x: batch_xs, y_: batch_ys})))

if task_index == 0:
summary_writer.add_summary(summary, step)
else: # args.mode == "inference"
labels, preds, acc = mon_sess.run([label, prediction, accuracy], feed_dict=feed)

results = ["{0} Label: {1}, Prediction: {2}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)]
tf_feed.batch_results(results)
print("results: {0}, acc: {1}".format(results, acc))

if mon_sess.should_stop() or step >= args.steps:
tf_feed.terminate()

# Ask for all the services to stop.
print("{0} stopping MonitoredTrainingSession".format(datetime.now().isoformat()))

if job_name == "worker" and task_index == 0:
summary_writer.close()
if args.mode == "train":
_, summary, step = sess.run([train_op, summary_op, global_step])
if (step % 100 == 0):
print("{} step: {} accuracy: {}".format(datetime.now().isoformat(), step, sess.run(accuracy)))
if task_index == 0:
summary_writer.add_summary(summary, step)
else: # args.mode == "inference"
labels, preds, acc = sess.run([label, prediction, accuracy])
results = ["{} Label: {}, Prediction: {}".format(datetime.now().isoformat(), l, p) for l, p in zip(labels, preds)]
tf_feed.batch_results(results)
print("acc: {}".format(acc))

print("{} stopping MonitoredTrainingSession".format(datetime.now().isoformat()))

# WORKAROUND FOR https://github.com/tensorflow/tensorflow/issues/21745
# wait for all other nodes to complete (via done files)
done_dir = "{}/{}/done".format(ctx.absolute_path(args.model), args.mode)
print("Writing done file to: {}".format(done_dir))
tf.gfile.MakeDirs(done_dir)
with tf.gfile.GFile("{}/{}".format(done_dir, ctx.task_index), 'w') as done_file:
done_file.write("done")

for i in range(60):
if len(tf.gfile.ListDirectory(done_dir)) < len(ctx.cluster_spec['worker']):
print("{} Waiting for other nodes {}".format(datetime.now().isoformat(), i))
time.sleep(1)
else:
print("{} All nodes done".format(datetime.now().isoformat()))
break
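The core of the mnist_dist.py rewrite replaces feed_dict batching with a tf.data pipeline built from a Python generator. A standalone sketch of that pattern under TF 1.x, with a random-data generator standing in for the Spark feed (tf_feed):

```python
import numpy as np
import tensorflow as tf

IMAGE_PIXELS = 28
batch_size = 32

# Illustrative stand-in for rdd_generator(): yields (image, one-hot label)
# pairs the way the Spark feed would, but from random data.
def rdd_generator():
  for _ in range(256):
    image = np.random.rand(IMAGE_PIXELS * IMAGE_PIXELS).astype(np.float32)
    label = np.zeros(10, dtype=np.float32)
    label[np.random.randint(10)] = 1.0
    yield (image, label)

# Same Dataset construction as the patch: declare dtypes and shapes for the
# generator's elements, then batch.
ds = tf.data.Dataset.from_generator(
    rdd_generator,
    (tf.float32, tf.float32),
    (tf.TensorShape([IMAGE_PIXELS * IMAGE_PIXELS]),
     tf.TensorShape([10]))).batch(batch_size)
iterator = ds.make_one_shot_iterator()
x, y_ = iterator.get_next()

with tf.Session() as sess:
  images, labels = sess.run([x, y_])
  print(images.shape, labels.shape)  # (32, 784) (32, 10)
```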