From 625088cf85161f60f3a4916cba0be4924cf6735b Mon Sep 17 00:00:00 2001 From: Yilei Cai Date: Tue, 3 Jul 2018 12:24:30 -0700 Subject: [PATCH 1/2] rm shard() --- examples/mnist/tf/mnist_dist.py | 8 ++++---- examples/mnist/tf/mnist_dist_dataset.py | 7 ++++++- examples/mnist/tf/mnist_dist_pipeline.py | 8 ++++++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/examples/mnist/tf/mnist_dist.py b/examples/mnist/tf/mnist_dist.py index bcb31881..5048a483 100644 --- a/examples/mnist/tf/mnist_dist.py +++ b/examples/mnist/tf/mnist_dist.py @@ -41,14 +41,14 @@ def map_fun(args, ctx): def read_csv_examples(image_dir, label_dir, batch_size=100, num_epochs=None, task_index=None, num_workers=None): print_log(worker_num, "num_epochs: {0}".format(num_epochs)) # Setup queue of csv image filenames - tf_record_pattern = os.path.join(image_dir, 'part-*') - images = tf.gfile.Glob(tf_record_pattern) + csv_file_pattern = os.path.join(image_dir, 'part-*') + images = tf.gfile.Glob(csv_file_pattern) print_log(worker_num, "images: {0}".format(images)) image_queue = tf.train.string_input_producer(images, shuffle=False, capacity=1000, num_epochs=num_epochs, name="image_queue") # Setup queue of csv label filenames - tf_record_pattern = os.path.join(label_dir, 'part-*') - labels = tf.gfile.Glob(tf_record_pattern) + csv_file_pattern = os.path.join(label_dir, 'part-*') + labels = tf.gfile.Glob(csv_file_pattern) print_log(worker_num, "labels: {0}".format(labels)) label_queue = tf.train.string_input_producer(labels, shuffle=False, capacity=1000, num_epochs=num_epochs, name="label_queue") diff --git a/examples/mnist/tf/mnist_dist_dataset.py b/examples/mnist/tf/mnist_dist_dataset.py index 3d189971..57aa2954 100644 --- a/examples/mnist/tf/mnist_dist_dataset.py +++ b/examples/mnist/tf/mnist_dist_dataset.py @@ -68,6 +68,11 @@ def _parse_tfr(example_proto): file_pattern = os.path.join(image_dir, 'part-*') files = tf.gfile.Glob(file_pattern) + # Divide the data for each worker + if task_index is not None and num_workers is not None: + num_files = len(files) + files = files[task_index:num_files:num_workers] + if args.format == 'csv2': ds = tf.data.TextLineDataset(files) parse_fn = _parse_csv @@ -75,7 +80,7 @@ def _parse_tfr(example_proto): ds = tf.data.TFRecordDataset(files) parse_fn = _parse_tfr - ds = ds.shard(num_workers, task_index).repeat(args.epochs).shuffle(args.shuffle_size) + ds = ds.repeat(args.epochs).shuffle(args.shuffle_size) ds = ds.map(parse_fn).batch(args.batch_size) iterator = ds.make_initializable_iterator() x, y_ = iterator.get_next() diff --git a/examples/mnist/tf/mnist_dist_pipeline.py b/examples/mnist/tf/mnist_dist_pipeline.py index 64a362f2..0418b4a8 100644 --- a/examples/mnist/tf/mnist_dist_pipeline.py +++ b/examples/mnist/tf/mnist_dist_pipeline.py @@ -59,12 +59,16 @@ def _parse_tfr(example_proto): sm_b = tf.Variable(tf.zeros([10]), name="sm_b") tf.summary.histogram("softmax_weights", sm_w) - # read from saved tf records + # Read from saved tf records images = TFNode.hdfs_path(ctx, args.tfrecord_dir) tf_record_pattern = os.path.join(images, 'part-*') tfr_files = tf.gfile.Glob(tf_record_pattern) + # Divide the data for each worker + if task_index is not None and num_workers is not None: + num_files = len(tfr_files) + tfr_files = tfr_files[task_index:num_files:num_workers] ds = tf.data.TFRecordDataset(tfr_files) - ds = ds.shard(num_workers, task_index).repeat(args.epochs).shuffle(args.shuffle_size) + ds = ds.repeat(args.epochs).shuffle(args.shuffle_size) ds = ds.map(_parse_tfr).batch(args.batch_size) iterator = ds.make_initializable_iterator() x, y_ = iterator.get_next() From ed49d7d7a6576d825bed21330f5877d387f3c898 Mon Sep 17 00:00:00 2001 From: Yilei Cai Date: Thu, 5 Jul 2018 15:24:13 -0700 Subject: [PATCH 2/2] use dataset list_files and interleave as tf doc example --- examples/mnist/tf/mnist_dist_dataset.py | 15 ++++----------- examples/mnist/tf/mnist_dist_pipeline.py | 11 +++-------- examples/mnist/tf/mnist_spark_dataset.py | 2 +- examples/mnist/tf/mnist_spark_pipeline.py | 2 +- 4 files changed, 9 insertions(+), 21 deletions(-) diff --git a/examples/mnist/tf/mnist_dist_dataset.py b/examples/mnist/tf/mnist_dist_dataset.py index 57aa2954..d85ea35d 100644 --- a/examples/mnist/tf/mnist_dist_dataset.py +++ b/examples/mnist/tf/mnist_dist_dataset.py @@ -66,21 +66,15 @@ def _parse_tfr(example_proto): # Dataset for input data image_dir = TFNode.hdfs_path(ctx, args.images_labels) file_pattern = os.path.join(image_dir, 'part-*') - files = tf.gfile.Glob(file_pattern) - - # Divide the data for each worker - if task_index is not None and num_workers is not None: - num_files = len(files) - files = files[task_index:num_files:num_workers] + ds = tf.data.Dataset.list_files(file_pattern) + ds = ds.shard(num_workers, task_index).repeat(args.epochs).shuffle(args.shuffle_size) if args.format == 'csv2': - ds = tf.data.TextLineDataset(files) + ds = ds.interleave(tf.data.TextLineDataset, cycle_length=args.readers, block_length=1) parse_fn = _parse_csv else: # args.format == 'tfr' - ds = tf.data.TFRecordDataset(files) + ds = ds.interleave(tf.data.TFRecordDataset, cycle_length=args.readers, block_length=1) parse_fn = _parse_tfr - - ds = ds.repeat(args.epochs).shuffle(args.shuffle_size) ds = ds.map(parse_fn).batch(args.batch_size) iterator = ds.make_initializable_iterator() x, y_ = iterator.get_next() @@ -164,7 +158,6 @@ def _parse_tfr(example_proto): # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. - # using QueueRunners/Readers if args.mode == "train": if (step % 100 == 0): print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy))) diff --git a/examples/mnist/tf/mnist_dist_pipeline.py b/examples/mnist/tf/mnist_dist_pipeline.py index 0418b4a8..4145f717 100644 --- a/examples/mnist/tf/mnist_dist_pipeline.py +++ b/examples/mnist/tf/mnist_dist_pipeline.py @@ -62,13 +62,9 @@ def _parse_tfr(example_proto): # Read from saved tf records images = TFNode.hdfs_path(ctx, args.tfrecord_dir) tf_record_pattern = os.path.join(images, 'part-*') - tfr_files = tf.gfile.Glob(tf_record_pattern) - # Divide the data for each worker - if task_index is not None and num_workers is not None: - num_files = len(tfr_files) - tfr_files = tfr_files[task_index:num_files:num_workers] - ds = tf.data.TFRecordDataset(tfr_files) - ds = ds.repeat(args.epochs).shuffle(args.shuffle_size) + ds = tf.data.Dataset.list_files(tf_record_pattern) + ds = ds.shard(num_workers, task_index).repeat(args.epochs).shuffle(args.shuffle_size) + ds = ds.interleave(tf.data.TFRecordDataset, cycle_length=args.readers, block_length=1) ds = ds.map(_parse_tfr).batch(args.batch_size) iterator = ds.make_initializable_iterator() x, y_ = iterator.get_next() @@ -126,7 +122,6 @@ def _parse_tfr(example_proto): # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. - # using QueueRunners/Readers if (step % 100 == 0): print("{0} step: {1} accuracy: {2}".format(datetime.now().isoformat(), step, sess.run(accuracy))) _, summary, step = sess.run([train_op, summary_op, global_step]) diff --git a/examples/mnist/tf/mnist_spark_dataset.py b/examples/mnist/tf/mnist_spark_dataset.py index e738c833..8a03ab15 100644 --- a/examples/mnist/tf/mnist_spark_dataset.py +++ b/examples/mnist/tf/mnist_spark_dataset.py @@ -33,7 +33,7 @@ parser.add_argument("--num_ps", help="number of ps nodes", default=1) parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions") parser.add_argument("--rdma", help="use rdma connection", default=False) -parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1) +parser.add_argument("--readers", help="number of reader/enqueue threads per worker", type=int, default=10) parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int, default=1000) parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") diff --git a/examples/mnist/tf/mnist_spark_pipeline.py b/examples/mnist/tf/mnist_spark_pipeline.py index 1b023e77..92ef417c 100644 --- a/examples/mnist/tf/mnist_spark_pipeline.py +++ b/examples/mnist/tf/mnist_spark_pipeline.py @@ -39,7 +39,7 @@ parser.add_argument("-p", "--driver_ps_nodes", help="""run tensorflow PS node on driver locally. You will need to set cluster_size = num_executors + num_ps""", default=False) parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc") -parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1) +parser.add_argument("--readers", help="number of reader/enqueue threads per worker", type=int, default=10) parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("--shuffle_size", help="size of shuffle buffer", type=int, default=1000)