In [1]:
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()

import os, json

In [2]:
BUFFER_SIZE = 10000
BATCH_SIZE = 64


def input_fn(mode, input_context=None):
    """Build the MNIST tf.data pipeline consumed by the Estimator.

    Args:
        mode: a tf.estimator.ModeKeys value. TRAIN selects the 'train' split;
            every other mode selects the 'test' split.
        input_context: optional tf.distribute.InputContext. When provided, the
            split is sharded by (num_input_pipelines, input_pipeline_id) so each
            input pipeline reads its own subset.

    Returns:
        A tf.data.Dataset of (image, label) pairs (as_supervised=True), with
        images cast to float32 and divided by 255, cached, shuffled with a
        BUFFER_SIZE buffer, and batched by BATCH_SIZE (final partial batch kept).
    """
    def _normalize(img, lbl):
        # Convert pixels to float32 and rescale by 1/255; the label is untouched.
        return tf.cast(img, tf.float32) / 255, lbl

    splits, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)
    split_name = 'train' if mode == tf.estimator.ModeKeys.TRAIN else 'test'
    ds = splits[split_name]

    if input_context:
        ds = ds.shard(input_context.num_input_pipelines,
                      input_context.input_pipeline_id)

    ds = ds.map(_normalize)
    ds = ds.cache()
    ds = ds.shuffle(BUFFER_SIZE)
    return ds.batch(BATCH_SIZE)

In [3]:
# Cluster layout shared by every process taking part in the job.
WORKER_ADDRESSES = ['localhost:12345', 'localhost:23456']
# Index of this process within WORKER_ADDRESSES. Defaults to 0 (the original
# hard-coded value); start the other worker with WORKER_INDEX=1 in its
# environment so it registers as task 1 instead of duplicating task 0.
WORKER_INDEX = int(os.environ.get('WORKER_INDEX', '0'))


def make_tf_config(workers, index):
    """Serialize a TF_CONFIG value describing a worker-only cluster.

    Args:
        workers: sequence of "host:port" strings, one per worker.
        index: position of the current process in ``workers``.

    Returns:
        The JSON string to store in the TF_CONFIG environment variable.

    Raises:
        ValueError: if ``index`` does not refer to an entry of ``workers``.
    """
    workers = list(workers)
    if not 0 <= index < len(workers):
        raise ValueError(
            'worker index %d is out of range for %d workers' % (index, len(workers)))
    return json.dumps({
        'cluster': {'worker': workers},
        'task': {'type': 'worker', 'index': index},
    })


# Set this before the distribution strategy is constructed: the strategy picks
# up the cluster spec from TF_CONFIG when it is created.
os.environ['TF_CONFIG'] = make_tf_config(WORKER_ADDRESSES, WORKER_INDEX)

In [4]:
LEARNING_RATE = 1e-4


def model_fn(features, labels, mode):
    """Estimator model_fn for a small convolutional MNIST classifier.

    Args:
        features: batch of images from input_fn. The Keras input_shape below
            implies (batch, 28, 28, 1); this is assumed, not checked here.
        labels: integer class ids for the batch (not used in PREDICT mode).
        mode: a tf.estimator.ModeKeys value.

    Returns:
        A tf.estimator.EstimatorSpec:
          * PREDICT: predictions={'logits': logits}.
          * EVAL: the scaled sparse cross-entropy loss.
          * TRAIN: the same loss plus a plain gradient-descent train_op that
            increments the global step.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    # Run layers in training mode only while training. The current layers act
    # the same either way, but Dropout/BatchNorm added later would not.
    logits = model(features, training=(mode == tf.estimator.ModeKeys.TRAIN))

    if mode == tf.estimator.ModeKeys.PREDICT:
        # EstimatorSpec requires `mode` and has no `labels` argument; passing
        # labels= (and omitting mode) raised a TypeError here.
        return tf.estimator.EstimatorSpec(mode=mode, predictions={'logits': logits})

    per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
    # Sum, then scale by the fixed per-replica BATCH_SIZE. NOTE(review): the
    # last batch of an epoch can be smaller (input_fn keeps the remainder), so
    # its loss is slightly under-weighted; confirm this scaling matches how the
    # strategy aggregates gradients across workers.
    loss = tf.reduce_sum(per_example_loss) * (1. / BATCH_SIZE)

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)

    # The optimizer is only needed for TRAIN, so it is built here rather than
    # for every mode.
    optimizer = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=LEARNING_RATE)
    train_op = optimizer.minimize(loss, tf.compat.v1.train.get_or_create_global_step())
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

In [5]:
# Multi-worker synchronous data-parallel strategy. It takes its cluster spec
# (workers, task type and id) from the TF_CONFIG environment variable, so
# TF_CONFIG must already be set when this line runs.
# NOTE(review): newer TF releases also expose this as
# tf.distribute.MultiWorkerMirroredStrategy; confirm against the installed
# version before switching away from the experimental alias.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:XLA_CPU:0']
INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:0',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['localhost:12345', 'localhost:23456']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO


In [None]:
# Route training through the multi-worker strategy; evaluation is not
# distributed (no eval_distribute is given).
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
    model_fn=model_fn,
    model_dir='/tmp/multiworker',
    config=config,
)

# Both specs share input_fn; it picks the split from the mode the Estimator
# passes in. No max_steps is set on the TrainSpec.
# NOTE(review): TF_CONFIG lists two workers, and the saved run of this cell
# never finished (execution count [None]) -- presumably it waits for the second
# worker to be started; confirm.
train_spec = tf.estimator.TrainSpec(input_fn=input_fn)
eval_spec = tf.estimator.EvalSpec(input_fn=input_fn)
tf.estimator.train_and_evaluate(classifier, train_spec=train_spec, eval_spec=eval_spec)

INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'worker': ['localhost:12345', 'localhost:23456']}, 'task': {'type': 'worker', 'index': 0}}
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x000001C188934248>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': N

Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.


INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


INFO:tensorflow:Calling model_fn.


INFO:tensorflow:Calling model_fn.


INFO:tensorflow:Collective batch_all_reduce: 6 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


INFO:tensorflow:Collective batch_all_reduce: 6 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


INFO:tensorflow:Done calling model_fn.


INFO:tensorflow:Done calling model_fn.


INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_devices = 1, group_size = 2, communication_hint = AUTO, num_packs = 1


Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.



Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.



Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.





INFO:tensorflow:Create CheckpointSaverHook.


INFO:tensorflow:Create CheckpointSaverHook.


INFO:tensorflow:all_hooks [<tensorflow_estimator.python.estimator.util.DistributedIteratorInitializerHook object at 0x000001C18372CAC8>, <tensorflow.python.training.basic_session_run_hooks.NanTensorHook object at 0x000001C189C950C8>, <tensorflow.python.training.basic_session_run_hooks.LoggingTensorHook object at 0x000001C189D3C948>, <tensorflow.python.training.basic_session_run_hooks.StepCounterHook object at 0x000001C189D35648>, <tensorflow.python.training.basic_session_run_hooks.SummarySaverHook object at 0x000001C189D20F88>, <tensorflow.python.training.basic_session_run_hooks.CheckpointSaverHook object at 0x000001C188BDA6C8>]


INFO:tensorflow:all_hooks [<tensorflow_estimator.python.estimator.util.DistributedIteratorInitializerHook object at 0x000001C18372CAC8>, <tensorflow.python.training.basic_session_run_hooks.NanTensorHook object at 0x000001C189C950C8>, <tensorflow.python.training.basic_session_run_hooks.LoggingTensorHook object at 0x000001C189D3C948>, <tensorflow.python.training.basic_session_run_hooks.StepCounterHook object at 0x000001C189D35648>, <tensorflow.python.training.basic_session_run_hooks.SummarySaverHook object at 0x000001C189D20F88>, <tensorflow.python.training.basic_session_run_hooks.CheckpointSaverHook object at 0x000001C188BDA6C8>]


INFO:tensorflow:Creating chief session creator with config: device_filters: "/job:worker/task:0"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"
}



INFO:tensorflow:Creating chief session creator with config: device_filters: "/job:worker/task:0"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"
}



Instructions for updating:
Use the iterator's `initializer` property instead.


Instructions for updating:
Use the iterator's `initializer` property instead.


INFO:tensorflow:Graph was finalized.


INFO:tensorflow:Graph was finalized.
