TensorFlow provides a set of high-performance benchmark models that use several flexible distributed training mechanisms, such as 'parameter server', 'replicated' and 'cross replica'. Here I take a deep dive into how they work.

They provide a class called 'BenchmarkCNN' to do the whole benchmarking work. We'll analyze it first.

In [None]:
class BenchmarkCNN(object):
    '''Class for benchmarking a cnn network.

    Construction resolves the cluster layout (single node or distributed),
    builds the device strings used for op placement, and selects the variable
    manager that matches FLAGS.variable_update / FLAGS.staged_vars.
    '''

    def __init__(self):
        '''Initialize the parameters used for model training.

        Raises:
            ValueError: if FLAGS.variable_update is not a known mode, or the
                mode is incompatible with whether job_name is set.
        '''
        # NOTE(review): the bare attribute expressions in this method are
        # walkthrough placeholders -- as written they only read the attribute.
        # Presumably each one is assigned from FLAGS / the model config in the
        # complete implementation; confirm before running this code.
        self.model
        self.model_conf
        self.trace_filename
        self.data_format
        self.num_batches
        self.num_warmup_batches
        self.graph_file
        self.resize_method
        self.sync_queue_counter
        self.num_gpus
        # Global batch size: per-device batch size times the number of GPUs.
        self.batch_size = self.model_conf.get_batch_size() * self.num_gpus
        assert self.model_conf.get_learning_rate() > 0.0

        # Distributed training parameters (placeholders, see the note above).
        self.job_name
        self.ps_hosts
        self.worker_hosts
        self.dataset
        self.data_name
        self.local_parameter_device_flag

        # Take a different path depending on whether we are doing multi-node
        # training (job_name set) or single-node training.
        if self.job_name:
            self.task_index
            self.cluster = tf.train.ClusterSpec({'ps': self.ps_hosts, 'worker': self.worker_hosts})
            self.server = tf.train.Server(self.cluster, job_name = self.job_name,
                                         task_index = self.task_index,
                                         config = create_config_proto(),
                                         protocol = FLAGS.server_protocol)
            worker_prefix = '/job:worker/task:%s'% self.task_index
            self.param_server_device = tf.train.replica_device_setter(
                    worker_device = worker_prefix + '/cpu:0', cluster = self.cluster)
            # Devices on which the queues for managing synchronization
            # between servers should be stored: the CPU of every 'ps' task.
            num_ps = len(self.ps_hosts)
            self.sync_queue_devices = ['/job:ps/task:%s/cpu:0' % i
                                          for i in range(num_ps)]
        else:
            self.task_index = 0
            self.cluster = None
            self.server = None
            worker_prefix = ''
            self.param_server_device = '/%s:0' % FLAGS.local_parameter_device
            self.sync_queue_devices = [self.param_server_device]

        # Device to use for ops that need to always run on the local worker's CPU.
        self.cpu_device = '%s/cpu:0' % worker_prefix

        # Devices to use for ops that need to always run on the local worker's
        # compute device (like GPU/TPU), and never on a parameter server device.
        self.raw_devices = ['%s/%s:%i' % (worker_prefix, FLAGS.device, i)
                           for i in xrange(FLAGS.num_gpus)]

        # Attach a different 'variable manager' object depending on the
        # 'variable_update' and 'staged_vars' settings. Staged variables are
        # only selectable in the 'parameter_server' update mode.
        if FLAGS.variable_update == 'parameter_server':
            if self.job_name:
                if not FLAGS.staged_vars:
                    self.variable_mgr = variable_mgr.VariableMgrDistributedFetchFromPS(self)
                else:
                    self.variable_mgr = variable_mgr.VariableMgrDistributedFetchFromStagedPS(self)
            else:
                if not FLAGS.staged_vars:
                    self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromPS(self)
                else:
                    self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromStagedPS(self)
        elif FLAGS.variable_update == 'replicated':
            # Single-node only.
            if self.job_name:
                raise ValueError('Invalid--')
            self.variable_mgr = variable_mgr.VariableMgrLocalReplicated(self, FLAGS.use_nccl)
        elif FLAGS.variable_update == 'distributed_replicated':
            # Distributed only.
            if not self.job_name:
                raise ValueError('Invalid--')
            self.variable_mgr = variable_mgr.VariableMgrDistributedReplicated(self)
        elif FLAGS.variable_update == 'independent':
            # Single-node only.
            if self.job_name:
                raise ValueError('Invalid--')
            self.variable_mgr = variable_mgr.VariableMgrIndependent(self)
        else:
            raise ValueError('Invalid--')

        # Devices to use for running on the local worker's compute device, but
        # with variables assigned to parameter server devices.
        self.devices = self.variable_mgr.get_devices()

        # Placement of the global step: on the parameter server device when
        # distributed, otherwise on the local CPU.
        if self.job_name:
            self.global_step_device = self.param_server_device
        else:
            self.global_step_device = self.cpu_device

In [None]:
def run(self):
    '''Entry point of the benchmark; dispatches on the command-line flags.

    A 'ps' job only serves: it blocks in server.join() and then returns.
    Any other job opens a fresh default graph and runs either the evaluation
    path (FLAGS.eval set) or the benchmark/training path.
    '''
    if FLAGS.job_name != 'ps':
        with tf.Graph().as_default():
            if not FLAGS.eval:
                self._benchmark_cnn()
            else:
                self._eval_cnn()
        return

    # Parameter-server process: nothing to build, just serve.
    self.server.join()

In [None]:
def _eval_cnn(self):
    '''Evaluate the model from a checkpoint using the validation dataset.

    Builds the graph, restores variables from FLAGS.train_dir, runs
    self.num_batches evaluation steps and writes the 'eval/Accuracy@1' and
    'eval/Recall@5' summaries to FLAGS.eval_dir.
    '''
    # Build the model using the inner method.
    (enqueue_ops, fetches) = self._build_model()
    # The saver is used to load checkpoints.
    saver = tf.train.Saver(tf.global_variables())
    # The summary writer keeps event logs for later analysis.
    summary_writer = tf.summary.FileWriter(FLAGS.eval_dir, tf.get_default_graph())

    target = ''
    # Session is a class for running tensorflow operations; a 'Session' object
    # encapsulates the environment in which 'Operation' objects are executed
    # and 'Tensor' objects are evaluated.
    # A session may own resources, such as tf.Variable, tf.QueueBase and
    # tf.ReaderBase. It is important to release these resources when they are
    # no longer required: either invoke tf.Session.close(), or use the session
    # as a context manager (as done here).
    # The 'ConfigProto' protocol buffer exposes various configuration options
    # for a session. The constructor interface is:
    #     def __init__(self, target = '', graph = None, config = None)
    # If no 'graph' argument is specified, the default graph is launched in
    # the session. When using more than one graph in the same process, each
    # graph needs its own session, but a graph can be used in many sessions.
    # target: the execution engine to connect to; defaults to an in-process
    # engine, and can also name a distributed one.
    with tf.Session(target = target, config = create_config_proto()) as sess:
        # Prime the pipeline: run the first stage, then the first two, etc.
        for i in xrange(len(enqueue_ops)):
            sess.run(enqueue_ops[:(i + 1)])
        global_step = load_checkpoint(saver, sess, FLAGS.train_dir)

        start_time = time.time()
        count_top_1 = 0.0
        count_top_5 = 0.0
        total_eval_count = self.num_batches * self.batch_size
        for step in xrange(self.num_batches):
            results = sess.run(fetches)
            count_top_1 += results[0]
            count_top_5 += results[1]
            if (step + 1) % FLAGS.display_every == 0:
                # Throughput over the last FLAGS.display_every steps.
                # NOTE(review): examples_per_sec is computed but not reported
                # anywhere in this excerpt.
                duration = time.time() - start_time
                examples_per_sec = self.batch_size * FLAGS.display_every / duration
                start_time = time.time()
        precision_at_1 = count_top_1 / total_eval_count
        recall_at_5 = count_top_5 / total_eval_count
        summary = tf.Summary()
        summary.value.add(tag = 'eval/Accuracy@1', simple_value = precision_at_1)
        summary.value.add(tag = 'eval/Recall@5', simple_value = recall_at_5)
        summary_writer.add_summary(summary, global_step)

In [None]:
def _benchmark_cnn(self):
    '''Run cnn in benchmark mode. When forward_only is on, it only forwards the CNN.

    Builds the graph, wires up the cross-worker synchronization ops, then runs
    warm-up and measured steps inside a Supervisor-managed session. Finally it
    logs the overall images/sec and, on the chief, saves a checkpoint to
    FLAGS.train_dir.
    '''
    # Within this function we can see how the various computing agents sync
    # with each other.
    (enqueue_ops, fetches) = self._build_model()  # build the graph to run
    main_fetch_group = tf.group(*fetches)
    execution_barrier = None  # only used in distributed training mode
    # When doing cluster training, cross_replica_sync False means 'parameter
    # server' mode and True means distributed 'replicated' mode.
    if self.job_name and not FLAGS.cross_replica_sync:
        execution_barrier = self.add_sync_queues_and_barrier('execution_barrier_', [])

    # Get the global step tensor.
    # The global step tensor must be an integer variable. It is looked up
    # first in the 'GLOBAL_STEP' collection, or by the name 'global_step:0'.
    global_step = tf.contrib.framework.get_global_step()
    with tf.device(self.global_step_device):
        # Only bump the global step once all of this step's fetches have run.
        with tf.control_dependencies([main_fetch_group]):
            inc_global_step = global_step.assign_add(1)
            fetches.append(inc_global_step)

    if self.job_name and FLAGS.cross_replica_sync:
        # Block all replicas until all replicas are ready for the next step.
        # Used in the 'distributed replica' training mode.
        fetches.append(self.add_sync_queues_and_barrier('sync_queues_step_end_', [main_fetch_group]))

    variable_mgr_post_init_ops = self.variable_mgr.get_post_init_ops()
    if variable_mgr_post_init_ops:
        post_init_op_group = tf.group(*variable_mgr_post_init_ops)
    else:
        post_init_op_group = None

    local_var_init_op = tf.local_variables_initializer()
    summary_op = tf.summary.merge_all()
    # In distributed mode, only the 'master' node (task 0) records summaries.
    is_chief = (not self.job_name or self.task_index == 0)
    summary_writer = None
    if (is_chief and FLAGS.summary_verbosity and FLAGS.train_dir and FLAGS.save_summaries_steps > 0):
        summary_writer = tf.summary.FileWriter(FLAGS.train_dir, tf.get_default_graph())

    # Run the summaries in the same thread as the training operations by
    # passing None for summary_op, which avoids a summary thread being started.
    # Running summaries and training operations in parallel could run out of
    # GPU memory. The saver is still handed to the Supervisor, so checkpoint
    # saving stays under its control (see save_model_secs).
    sv = tf.train.Supervisor(
        is_chief = is_chief,
        logdir = FLAGS.train_dir,
        saver = tf.train.Saver(tf.global_variables()),
        global_step = global_step,
        summary_op = None,
        save_model_secs = FLAGS.save_model_secs,
        summary_writer = summary_writer)

    step_train_times = []
    with sv.managed_session(
        master = self.server.target if self.server else '',
        config = create_config_proto(),
        start_standard_services = FLAGS.summary_verbosity > 0) as sess:
        # Prime the pipeline: run the first stage, then the first two, etc.
        for i in xrange(len(enqueue_ops)):
            sess.run(enqueue_ops[:(i + 1)])
        sess.run(local_var_init_op)
        if post_init_op_group:
            sess.run(post_init_op_group)

        init_global_step = 0
        if FLAGS.pretrain_dir is not None:
            init_global_step = load_checkpoint(sv.saver, sess, FLAGS.pretrain_dir)
        # Watches the global step between the end of warm-up and the last
        # measured step, summed over all workers.
        global_step_watcher = GlobalStepWatcher(sess, global_step,
                                               len(self.worker_hosts) * self.num_warmup_batches + init_global_step,
                                               len(self.worker_hosts) * (self.num_warmup_batches + self.num_batches) - 1)
        global_step_watcher.start()

        if self.graph_file is not None:
            path, filename = os.path.split(self.graph_file)
            as_text = filename.endswith('txt')
            tf.train.write_graph(sess.graph_def, path, filename, as_text)

        # Negative while warming up; reaches 0 when the measured run starts.
        local_step = -1 * self.num_warmup_batches

        if FLAGS.cross_replica_sync and FLAGS.job_name:
            # In cross-replica sync mode, all workers must run the same number
            # of local steps, or else the workers running the extra step will
            # block.
            done_fn = lambda: local_step == self.num_batches
        else:
            done_fn = lambda: global_step_watcher.done()
        while not done_fn():
            if local_step == 0:
                # Done with warm up.
                if execution_barrier:
                    # Wait for other replicas to finish warm up.
                    assert global_step_watcher.start_time == 0
                    sess.run([execution_barrier])

                assert len(step_train_times) == self.num_warmup_batches
                # Discard the warm-up timings so that only measured steps are
                # accumulated from here on.
                step_train_times = []

            if (summary_writer and (local_step + 1) % FLAGS.save_summaries_steps == 0):
                fetch_summary = summary_op
            else:
                fetch_summary = None
            summary_str = benchmark_one_step(sess, fetches, local_step, self.batch_size, step_train_times,
                                             self.trace_filename, fetch_summary)
            if summary_str is not None and is_chief:
                sv.summary_computed(sess, summary_str)
            local_step += 1

        # Wait for the global step to be done, regardless of done_fn.
        while not global_step_watcher.done():
            time.sleep(.25)
        log_fn('total images/sec: %.2f' % (global_step_watcher.steps_per_second() * self.batch_size))

        # Save the model checkpoint.
        if FLAGS.train_dir is not None and is_chief:
            checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
            if not gfile.Exists(FLAGS.train_dir):
                gfile.MakeDirs(FLAGS.train_dir)
            sv.saver.save(sess, checkpoint_path, global_step)

        if execution_barrier:
            # Wait for other workers to reach the end, so this worker doesn't
            # go away underneath them.
            sess.run([execution_barrier])

    sv.stop()

In [None]:
def _build_model(self):
    '''Build the Tensorflow graph.

    Returns:
        A tuple (enqueue_ops, fetches). enqueue_ops are the grouped staging
        ops of the input/compute/gradient pipeline stages. fetches is what a
        step should run:
          * forward only:  [all_logits] + enqueue_ops
          * evaluation:    [top_1_count, top_5_count] + enqueue_ops
          * training:      [train_op, total_loss] + enqueue_ops

    Raises:
        ValueError: if FLAGS.optimizer is not 'momentum', 'sgd' or 'rmsprop'.
    '''
    image_size = self.model_conf.get_image_size()
    data_type = tf.float32
    input_data_type = tf.float32
    input_nchan = 3
    # Sets the graph-level random seed.
    # Operations that rely on a random seed actually derive it from two seeds:
    # the graph-level and operation-level seeds. This sets the graph-level one.
    # If either the graph-level or the op-level seed is set, the op uses it as
    # its random seed. If both are set, both seeds are used in conjunction to
    # determine the random sequence.
    # To generate different sequences across sessions, set neither seed.
    tf.set_random_seed(1234)
    np.random.seed(4321)
    phase_train = not (FLAGS.eval or FLAGS.forward_only)

    losses = []
    device_grads = []
    all_logits = []
    all_top_1_ops = []
    all_top_5_ops = []
    # The lists below are used for pipeline processing, a classical
    # concurrency model.
    enqueue_ops = []
    gpu_copy_stage_ops = []
    gpu_compute_stage_ops = []
    gpu_grad_stage_ops = []

    # Without a dataset, synthetic images are generated on the device.
    use_synthetic_gpu_images = (self.dataset is None)

    with tf.device(self.global_step_device):
        global_step = tf.contrib.framework.get_or_create_global_step()

    # Build the input processing for the worker.
    with tf.device(self.cpu_device):
        nclass, images_splits, labels_splits = add_image_preprocessing(
                        self.dataset, input_nchan, image_size, self.batch_size,
                        len(self.devices), input_data_type, self.resize_method, not FLAGS.eval)

    update_ops = None
    staging_delta_ops = []

    for device_num in xrange(len(self.devices)):
        # Create the tf.variable_scope around all model graph operations, and
        # one name scope ('tower_<n>') per device.
        with self.variable_mgr.create_outer_variable_scope(device_num):
            with tf.name_scope('tower_%i' % device_num) as name_scope:
                results = self.add_forward_pass_and_gradients(
                    images_splits[device_num], labels_splits[device_num], nclass, phase_train, device_num,
                    input_data_type, data_type, input_nchan, use_synthetic_gpu_images, gpu_copy_stage_ops,
                    gpu_compute_stage_ops, gpu_grad_stage_ops)
                if phase_train:
                    losses.append(results[0])
                    device_grads.append(results[1])
                else:
                    all_logits.append(results[0])
                    all_top_1_ops.append(results[1])
                    all_top_5_ops.append(results[2])

                # retain_tower_updates() tells whether the updates of this
                # tower should be kept (per the walkthrough: device_num == 0
                # and not each_tower_has_variables()).
                if self.variable_mgr.retain_tower_updates(device_num):
                    # Retain the Batch Normalization update operations only
                    # from the first tower. Ideally, we should grab the updates
                    # from all towers but these stats accumulate extremely fast
                    # so we can ignore the stats from the other towers without
                    # significant detriment.
                    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope)
                    staging_delta_ops = list(self.variable_mgr.staging_delta_ops)

    if not update_ops:
        # Fall back to the update ops of the last tower built.
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS, name_scope)
    enqueue_ops.append(tf.group(*gpu_copy_stage_ops))
    if self.variable_mgr.supports_staged_vars():
        for staging_ops in self.variable_mgr.staging_vars_on_devices:
            gpu_compute_stage_ops.extend(
                [put_op for _, (put_op, _) in six.iteritems(staging_ops)])
    enqueue_ops.append(tf.group(*gpu_compute_stage_ops))
    if gpu_grad_stage_ops:
        staging_delta_ops += gpu_grad_stage_ops
    if staging_delta_ops:
        enqueue_ops.append(tf.group(*(staging_delta_ops)))

    if not phase_train:
        if FLAGS.forward_only:
            all_logits = tf.concat(all_logits, 0)
            fetches = [all_logits] + enqueue_ops
        else:
            all_top_1_ops = tf.reduce_sum(all_top_1_ops)
            all_top_5_ops = tf.reduce_sum(all_top_5_ops)
            fetches = [all_top_1_ops, all_top_5_ops] + enqueue_ops
        return (enqueue_ops, fetches)

    extra_nccl_ops = []
    # Preprocess the device gradients prior to applying them.
    apply_gradient_devices, gradient_state = (
        self.variable_mgr.preprocess_device_grads(device_grads))

    training_ops = []
    for d, device in enumerate(apply_gradient_devices):
        # Build the gradient-application ops on the device they belong to.
        with tf.device(device):
            total_loss = tf.reduce_mean(losses)
            # Obtains the [(gradient, variable)] to apply for device_num (d here).
            avg_grads = self.variable_mgr.get_gradients_to_apply(d, gradient_state)

            gradient_clip = FLAGS.gradient_clip
            learning_rate = self.model_conf.get_learning_rate()
            if self.dataset and FLAGS.num_epochs_per_decay > 0:
                num_batches_per_epoch = (
                    self.dataset.num_examples_per_epoch() / self.batch_size)
                decay_steps = int(num_batches_per_epoch * FLAGS.num_epochs_per_decay)

                # Decay the learning rate exponentially based on the number of steps.
                learning_rate = tf.train.exponential_decay(
                    FLAGS.learning_rate, global_step,
                    decay_steps, FLAGS.learning_rate_decay_factor, staircase = True)

            if gradient_clip is not None:
                clipped_grads = [
                    (tf.clip_by_value(grad, -gradient_clip, +gradient_clip), var)
                    for grad, var in avg_grads
                ]
            else:
                clipped_grads = avg_grads

            # Then choose which optimizer type we are going to use.
            if FLAGS.optimizer == 'momentum':
                opt = tf.train.MomentumOptimizer(
                    learning_rate, FLAGS.momentum, use_nesterov = True)
            elif FLAGS.optimizer == 'sgd':
                opt = tf.train.GradientDescentOptimizer(learning_rate)
            elif FLAGS.optimizer == 'rmsprop':
                opt = tf.train.RMSPropOptimizer(learning_rate, FLAGS.rmsprop_decay,
                                               momentum = FLAGS.rmsprop_momentum,
                                               epsilon = FLAGS.rmsprop_epsilon)
            else:
                raise ValueError('Optimizer not found')

            # Adds training ops for grads to 'training_ops'.
            self.variable_mgr.append_apply_gradients_ops(
                gradient_state, opt, clipped_grads, training_ops)

    # Built once, after every device has contributed its training ops.
    train_op = tf.group(*(training_ops + update_ops + extra_nccl_ops))

    with tf.device(self.cpu_device):
        if self.task_index == 0 and FLAGS.summary_verbosity > 0:
            # Summaries use the values left over from the last loop iteration.
            tf.summary.scalar('learning_rate', learning_rate)
            tf.summary.scalar('total_loss', total_loss)
            for grad, var in avg_grads:
                if grad is not None:
                    tf.summary.histogram(var.op.name + '/gradients', grad)
            for var in tf.trainable_variables():
                tf.summary.histogram(var.op.name, var)
    fetches = [train_op, total_loss] + enqueue_ops
    return (enqueue_ops, fetches)

In [None]:
def add_forward_pass_and_gradients(
    self, host_images, host_labels, nclass, phase_train, device_num,
    input_data_type, data_type, input_nchan, use_synthetic_gpu_images,
    gpu_copy_stage_ops, gpu_compute_stage_ops, gpu_grad_stage_ops):
    '''Add ops for forward-pass and gradient computations.

    The staging 'put' ops created here are appended to gpu_copy_stage_ops,
    gpu_compute_stage_ops and gpu_grad_stage_ops (the lists are mutated).

    Returns:
        (logits, top_1_op, top_5_op) when phase_train is false, otherwise
        (loss, gradvars) where gradvars is a list of (gradient, variable)
        pairs for the given device.
    '''
    if not use_synthetic_gpu_images:
        with tf.device(self.cpu_device):
            # First pipeline stage: stage the host batch on the CPU.
            images_shape = host_images.get_shape()
            labels_shape = host_labels.get_shape()
            gpu_copy_stage = data_flow_ops.StagingArea(
                [tf.float32, tf.int32],
                shapes = [images_shape, labels_shape])
            gpu_copy_stage_op = gpu_copy_stage.put(
                [host_images, host_labels])
            gpu_copy_stage_ops.append(gpu_copy_stage_op)
            host_images, host_labels = gpu_copy_stage.get()

    with tf.device(self.raw_devices[device_num]):
        if not use_synthetic_gpu_images:
            # Second pipeline stage: stage the batch on the compute device.
            gpu_compute_stage = data_flow_ops.StagingArea(
                [tf.float32, tf.int32],
                shapes = [images_shape, labels_shape]
                )
            # The CPU-to-GPU copy is triggered here.
            gpu_compute_stage_op = gpu_compute_stage.put(
                [host_images, host_labels])
            images, labels = gpu_compute_stage.get()
            images = tf.reshape(images, shape = images_shape)
            gpu_compute_stage_ops.append(gpu_compute_stage_op)
        else:
            # Minor hack to avoid H2D copy when using synthetic data.
            images = tf.truncated_normal(
                host_images.get_shape(),
                dtype = input_data_type,
                stddev = 1e-1,
                name = 'synthetic_images')
            images = tf.contrib.framework.local_variable(
                images, name = 'gpu_cached_images')
            labels = host_labels

    with tf.device(self.devices[device_num]):
        # Rescale to [0, 1).
        images *= 1. / 256
        # Rescale to [-1, 1] instead of [0, 1).
        images = tf.subtract(images, 0.5)
        images = tf.multiply(images, 2.0)

        if self.data_format == 'NCHW':
            # Move the last axis to position 1 (channels-first layout).
            images = tf.transpose(images, [0, 3, 1, 2])
        if input_data_type != data_type:
            images = tf.cast(images, data_type)
        network = ConvNetBuilder(
            images, input_nchan, phase_train, self.data_format, data_type)
        self.model_conf.add_inference(network)
        # Add the final fully-connected class layer.
        logits = network.affine(nclass, activation = 'linear')
        if not phase_train:
            # Number of examples whose label is within the top 1 / top 5
            # predictions.
            top_1_op = tf.reduce_sum(
                tf.cast(tf.nn.in_top_k(logits, labels, 1), data_type))
            top_5_op = tf.reduce_sum(
                tf.cast(tf.nn.in_top_k(logits, labels, 5), data_type))
            return (logits, top_1_op, top_5_op)
        loss = loss_function(logits, labels)
        params = self.variable_mgr.trainable_variables_on_device(device_num)
        l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in params])
        weight_decay = FLAGS.weight_decay
        if weight_decay is not None and weight_decay != 0:
            loss += weight_decay * l2_loss

        aggmeth = tf.AggregationMethod.DEFAULT
        grads = tf.gradients(loss, params, aggregation_method = aggmeth)

        if FLAGS.staged_vars:
            grad_dtypes = [grad.dtype for grad in grads]
            grad_shapes = [grad.shape for grad in grads]
            grad_stage = data_flow_ops.StagingArea(grad_dtypes, grad_shapes)
            grad_stage_op = grad_stage.put(grads)
            # In general, this decouples the computation of the gradients and
            # the updates of the weights. During the pipeline warm up, this
            # runs enough training to produce the first set of gradients.
            gpu_grad_stage_ops.append(grad_stage_op)
            grads = grad_stage.get()

        param_refs = self.variable_mgr.trainable_variables_on_device(
            device_num, writable = True)
        # Materialize the pairs: a bare zip() is a one-shot iterator on
        # Python 3 and would be empty on a second traversal.
        gradvars = list(zip(grads, param_refs))
        return (loss, gradvars)

In [None]:
def add_sync_queues_and_barrier(self, name_prefix,
                                   enqueue_after_list):
    '''Add ops to enqueue on all worker queues.

    Args:
        name_prefix: prefix for the shared name of the queues.
        enqueue_after_list: ops that the enqueue ops take a control
            dependency on.

    Returns:
        An op that should be used as a control dependency before starting the
        next step. Its queue ops are created under one of
        self.sync_queue_devices, chosen round-robin per call.
    '''
    self.sync_queue_counter += 1
    num_workers = self.cluster.num_tasks('worker')
    with tf.device(self.sync_queue_devices[
            self.sync_queue_counter % len(self.sync_queue_devices)
    ]):
        # One queue per worker, shared across processes through shared_name.
        # Each element is a scalar bool, hence shapes=[[]].
        sync_queues = [tf.FIFOQueue(num_workers, [tf.bool], shapes = [[]], shared_name = '%s%s' % (name_prefix, i))
         for i in range(num_workers)]
        queue_ops = []
        # For each worker, add an entry in a queue, signaling that it can
        # finish this step.
        token = tf.constant(False)
        with tf.control_dependencies(enqueue_after_list):
            # About tf.control_dependencies:
            # It returns a context manager that specifies control
            # dependencies. Used with the 'with' keyword, all operations
            # constructed within the context get control dependencies on
            # 'control_inputs'. For example:
            #
            #     with g.control_dependencies([a, b, c]):
            #         # 'd' and 'e' will only run after 'a', 'b' and 'c'
            #         # have executed.
            #         d = ...
            #         e = ...
            #
            # Multiple calls to control_dependencies() can be nested; a new
            # op then has control dependencies on the union of the
            # 'control_inputs' of all active contexts. Passing None clears
            # the control dependencies.
            # The context applies *only* to ops that are constructed within
            # it. Merely using an op or tensor in the context does not add a
            # control dependency.
            for i, q in enumerate(sync_queues):
                if i == self.task_index:
                    # A worker does not signal itself.
                    queue_ops.append(tf.no_op())
                else:
                    queue_ops.append(q.enqueue(token))

        # Drain tokens off the queue for this worker, one for each other worker.
        queue_ops.append(sync_queues[self.task_index].dequeue_many(len(sync_queues) - 1))

    return tf.group(*queue_ops)