39 changes: 22 additions & 17 deletions examples/mnist/mnist_data_setup.py
@@ -8,35 +8,39 @@

import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist


def toTFExample(image, label):
"""Serializes an image/label as a TFExample byte string"""
example = tf.train.Example(
features = tf.train.Features(
feature = {
features=tf.train.Features(
feature={
'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
}
)
)
return example.SerializeToString()


def fromTFExample(bytestr):
"""Deserializes a TFExample from a byte string"""
example = tf.train.Example()
example.ParseFromString(bytestr)
return example
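
For orientation, here is a minimal round trip through the two helpers above. This is an editorial sketch, not part of the diff; the shapes assume the flattened 28x28 MNIST images and one-hot labels produced by the loader in this script, and `numpy` is already imported at the top of the file.

# Illustrative round trip through toTFExample/fromTFExample (shapes assumed):
image = numpy.zeros(784)                # flattened 28x28 image
label = numpy.zeros(10)                 # one-hot label vector
label[7] = 1
serialized = toTFExample(image, label)  # TFExample as a byte string
example = fromTFExample(serialized)     # parsed tf.train.Example proto
pixels = example.features.feature['image'].int64_list.value  # 784 int64 values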


def toCSV(vec):
"""Converts a vector/array into a CSV string"""
return ','.join([str(i) for i in vec])


def fromCSV(s):
"""Converts a CSV string to a vector/array"""
return [float(x) for x in s.split(',') if len(s) > 0]


def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
"""Writes MNIST image/label vectors into parallelized files on HDFS"""
# load MNIST gzip into memory
@@ -69,12 +73,12 @@ def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
labelRDD.map(toCSV).saveAsTextFile(output_labels)
elif format == "csv2":
imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
else: # format == "tfr":
else: # format == "tfr":
tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
# Note: this creates TFRecord files w/o requiring a custom Input/Output format
# else: # format == "tfr":
# def writeTFRecords(index, iter):
@@ -86,6 +90,7 @@ def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
# tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
# tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
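
Filled out, that commented-out alternative would look roughly like the sketch below. It is not part of this diff; it writes each partition straight to a part file with tf.python_io.TFRecordWriter (a TF 1.x API), so the tensorflow-hadoop jar is not needed.

def writeTFRecords(index, iter):
  output_path = "{0}/part-{1:05d}".format(output, index)  # one file per partition
  writer = tf.python_io.TFRecordWriter(output_path)
  for example in iter:      # each element is a serialized TFExample byte string
    writer.write(example)
  writer.close()
  return [output_path]

tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()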


def readMNIST(sc, output, format):
"""Reads/verifies previously created output"""

@@ -100,12 +105,12 @@ def readMNIST(sc, output, format):
elif format == "csv":
imageRDD = sc.textFile(output_images).map(fromCSV)
labelRDD = sc.textFile(output_labels).map(fromCSV)
else: # format.startswith("tf"):
else: # format.startswith("tf"):
# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
imageRDD = tfRDD.map(lambda x: fromTFExample(bytes(x[0])))

num_images = imageRDD.count()
num_labels = labelRDD.count() if labelRDD is not None else num_images
@@ -114,21 +119,22 @@ def readMNIST(sc, output, format):
print("num_labels: ", num_labels)
print("samples: ", samples)


if __name__ == "__main__":
import argparse

from pyspark.context import SparkContext
from pyspark.conf import SparkConf

parser = argparse.ArgumentParser()
parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
parser.add_argument("--format", help="output format", choices=["csv", "csv2", "pickle", "tf", "tfr"], default="csv")
parser.add_argument("--num-partitions", help="Number of output partitions", type=int, default=10)
parser.add_argument("--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
parser.add_argument("--read", help="read previously saved examples", action="store_true")
parser.add_argument("--verify", help="verify saved examples after writing", action="store_true")

args = parser.parse_args()
print("args:",args)
print("args:", args)

sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

@@ -139,4 +145,3 @@ def readMNIST(sc, output, format):

if args.read or args.verify:
readMNIST(sc, args.output + "/train", args.format)

2 changes: 1 addition & 1 deletion examples/mnist/spark/mnist_spark.py
@@ -54,7 +54,7 @@ def toNumpy(bytestr):
label = numpy.array(features['label'].int64_list.value)
return (image, label)

dataRDD = images.map(lambda x: toNumpy(str(x[0])))
dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))
else:
if args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
28 changes: 14 additions & 14 deletions examples/mnist/spark/mnist_spark_dataset.py
@@ -23,19 +23,19 @@
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv", "tfr"], default="csv")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("--mode", help="train|inference", default="train")
parser.add_argument("--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:", args)

@@ -54,7 +54,7 @@ def toNumpy(bytestr):
label = numpy.array(features['label'].int64_list.value)
return (image, label)

dataRDD = images.map(lambda x: toNumpy(str(x[0])))
dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))
else: # args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
20 changes: 13 additions & 7 deletions examples/mnist/tf/mnist_dist_dataset.py
@@ -22,6 +22,7 @@ def map_fun(args, ctx):
import tensorflow as tf
import time

num_workers = args.cluster_size if args.driver_ps_nodes else args.cluster_size - args.num_ps
worker_num = ctx.worker_num
job_name = ctx.job_name
task_index = ctx.task_index
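
To make the added num_workers arithmetic concrete, here is a tiny hypothetical helper (expected_workers is not in the diff; it just mirrors the expression above):

def expected_workers(cluster_size, num_ps, driver_ps_nodes):
  # When the PS runs on the driver, every executor hosts a worker;
  # otherwise num_ps executors are consumed by PS tasks.
  return cluster_size if driver_ps_nodes else cluster_size - num_ps

assert expected_workers(4, 1, False) == 3  # 3 workers, 1 executor is the PS
assert expected_workers(3, 1, True) == 3   # all 3 executors are workers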
@@ -43,10 +44,9 @@ def _parse_csv(ln):
normalized_image = tf.div(image, norm)
label_value = tf.string_to_number(lbl, tf.int32)
label = tf.one_hot(label_value, 10)
return (normalized_image, label, label_value)
return (normalized_image, label)

def _parse_tfr(example_proto):
print("example_proto: {}".format(example_proto))
feature_def = {"label": tf.FixedLenFeature(10, tf.int64),
"image": tf.FixedLenFeature(IMAGE_PIXELS * IMAGE_PIXELS, tf.int64)}
features = tf.parse_single_example(example_proto, feature_def)
@@ -68,10 +68,17 @@ def _parse_tfr(example_proto):
file_pattern = os.path.join(image_dir, 'part-*')
files = tf.gfile.Glob(file_pattern)

parse_fn = _parse_tfr if args.format == 'tfr' else _parse_csv
ds = tf.data.TextLineDataset(files).map(parse_fn).batch(args.batch_size)
if args.format == 'csv2':
ds = tf.data.TextLineDataset(files)
parse_fn = _parse_csv
else: # args.format == 'tfr'
ds = tf.data.TFRecordDataset(files)
parse_fn = _parse_tfr

ds = ds.shard(num_workers, task_index).repeat(args.epochs).shuffle(args.shuffle_size)
ds = ds.map(parse_fn).batch(args.batch_size)
iterator = ds.make_initializable_iterator()
x, y_, y_val = iterator.get_next()
x, y_ = iterator.get_next()
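
To see what the ds.shard(num_workers, task_index) step does, here is a standalone toy sketch (editorial, TF 1.x API as above). Sharding before repeat/shuffle means each worker shuffles only its own interleaved slice of the records, so no two workers read the same example in an epoch.

# Toy illustration of Dataset.shard (values illustrative):
toy = tf.data.Dataset.range(10).shard(num_shards=3, index=1)
it = toy.make_one_shot_iterator()
nxt = it.get_next()
with tf.Session() as sess:
  # Worker 1 of 3 sees every 3rd record, offset by its index.
  print(sess.run(nxt), sess.run(nxt), sess.run(nxt))  # 1 4 7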

# Variables of the hidden layer
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
@@ -156,8 +163,7 @@ def _parse_tfr(example_proto):
if args.mode == "train":
if (step % 100 == 0):
print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy)))
_, summary, step, yv = sess.run([train_op, summary_op, global_step, y_val])
# print("yval: {}".format(yv))
_, summary, step = sess.run([train_op, summary_op, global_step])
if sv.is_chief:
summary_writer.add_summary(summary, step)
else: # args.mode == "inference"
26 changes: 13 additions & 13 deletions examples/mnist/tf/mnist_spark.py
@@ -21,19 +21,19 @@
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=0)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv", "pickle", "tfr"], default="tfr")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False)
parser.add_argument("--epochs", help="number of epochs", type=int, default=0)
parser.add_argument("--format", help="example format: (csv|pickle|tfr)", choices=["csv", "pickle", "tfr"], default="tfr")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--model", help="HDFS path to save/load model during train/test", default="mnist_model")
parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("--mode", help="train|inference", default="train")
parser.add_argument("--rdma", help="use rdma connection", default=False)
parser.add_argument("--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False)
args = parser.parse_args()
print("args:", args)

32 changes: 17 additions & 15 deletions examples/mnist/tf/mnist_spark_dataset.py
@@ -21,27 +21,29 @@
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=0)
parser.add_argument("-f", "--format", help="example format: (csv2|tfr)", choices=["csv2", "tfr"], default="tfr")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
parser.add_argument("-p", "--driver_ps_nodes", help="""run tensorflow PS node on driver locally.
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("--driver_ps_nodes", help="""run tensorflow PS node on driver locally.
You will need to set cluster_size = num_executors + num_ps""", default=False)
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--format", help="example format: (csv2|tfr)", choices=["csv2", "tfr"], default="tfr")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--mode", help="train|inference", default="train")
parser.add_argument("--model", help="HDFS path to save/load model during train/test", default="mnist_model")
parser.add_argument("--num_ps", help="number of ps nodes", default=1)
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--rdma", help="use rdma connection", default=False)
parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int, default=1000)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
args = parser.parse_args()
print("args:", args)


print("{0} ===== Start".format(datetime.now().isoformat()))
cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard,
cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, args.num_ps, args.tensorboard,
TFCluster.InputMode.TENSORFLOW, driver_ps_nodes=args.driver_ps_nodes)
cluster.shutdown()
