
# Strategies for distributed execution
To distribute the training of a single model across multiple devices or nodes, there are
two strategies:
* **Model Parallel**: \\
Divide the model into multiple subgraphs and place the separate
graphs on different nodes or devices. The subgraphs perform their computation
and exchange the variables as required.
* **Data Parallel**: \\
Divide the data into batches and run the same model on multiple
nodes or devices, combining the parameters on a master node. Thus the worker
nodes train the model on batches of data and send the parameter updates to the
master node, also known as the parameter server.

![](https://github.com/armando-fandango/Mastering-TensorFlow/blob/master/images/ch-15/15-01.png?raw=true)

This diagram shows the data parallel approach where the model replicas read the partitions of data in batches and send the parameter updates to the parameter servers, and the parameter servers send the updated parameters back to the model replicas for the next batched computation of updates.
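The gradient-averaging step of the data-parallel scheme can be sketched in plain Python. This is a minimal, framework-free illustration, assuming a one-parameter linear model `y = w * x` with a mean-squared-error loss and equal-sized shards; `mse_grad` and `shard` are hypothetical helpers, not TensorFlow functions. Averaging the per-worker gradients gives the same value as computing the gradient on the whole batch on one device.

```python
# Toy data parallelism: each "worker" computes the gradient on its own shard,
# and the "parameter server" averages the shard gradients.

def mse_grad(w, xs, ys):
    """Gradient of mean((w*x - y)**2) with respect to w."""
    n = len(xs)
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / n

def shard(data, n_workers):
    """Split data into n_workers contiguous, equal-sized shards."""
    size = len(data) // n_workers
    return [data[i * size:(i + 1) * size] for i in range(n_workers)]

xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [2.0, 4.1, 5.9, 8.2, 9.8, 12.1]
w = 0.5

# gradient a single device would compute on the full batch
full = mse_grad(w, xs, ys)

# gradients computed by three workers on their own shards
worker_grads = [mse_grad(w, xs_s, ys_s)
                for xs_s, ys_s in zip(shard(xs, 3), shard(ys, 3))]

# what the parameter server would apply
averaged = sum(worker_grads) / len(worker_grads)
print(full, averaged)
```

With unequal shard sizes, the average would have to be weighted by the number of examples in each shard to match the full-batch gradient.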

In TensorFlow, there are two ways to replicate the model on multiple nodes/devices under the data parallel strategy:
* **In-Graph Replication**: \\
There is a single client task that owns the
model parameters and assigns the model calculations to multiple worker tasks.
* **Between-Graph Replication**: \\
Each client task connects to its
own worker in order to assign the model calculation, but all workers update the
same shared model. In this approach, one worker is designated as the chief worker
(in the code in this chapter, the worker with task index 0) so that the model
parameters are initialized only once, by the chief worker.

Within both these approaches, the parameters on the parameter servers can be updated in
two different ways:
* **Synchronous Update**: \\
The parameter servers wait to
receive the updates from all the workers before updating the parameters. The
parameter server aggregates the updates, for example by calculating their mean,
and applies the result to the parameters. After the update, the
parameters are sent to all the workers simultaneously. The disadvantage of this
method is that one slow worker may slow down the updates for everyone.
* **Asynchronous Update**: \\
The workers send their
updates to the parameter server(s) as soon as they are ready, and the parameter server
applies each update as it receives it and sends the new parameters back. The disadvantage of
this method is that by the time a worker computes its updates and sends
them back, the parameters may already have been updated several times by other
workers, so the updates are based on stale parameters. This problem can be alleviated
by methods such as lowering the batch size or lowering the learning rate.
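A toy model makes the staleness problem concrete. The sketch below is a simplification and not TensorFlow code: it assumes three workers, each minimizing `0.5 * (w - t)**2` for its own target `t`, so the combined optimum is the mean of the targets (3.0). In a synchronous round the gradients are averaged and applied once; in an asynchronous round every worker read `w` at the start, but its gradient is applied on its own after other workers have already moved `w`.

```python
TARGETS = (1.0, 2.0, 6.0)   # one target per worker; their mean is 3.0

def sync_round(w, lr):
    # every worker computes its gradient at the same w; the parameter server
    # waits for all of them, averages, and applies a single update
    grads = [w - t for t in TARGETS]
    return w - lr * sum(grads) / len(grads)

def async_round(w, lr):
    # every worker read w at the start of the round, but the parameter server
    # applies each gradient as soon as it arrives, so later ones are stale
    stale_w = w
    for t in TARGETS:
        w = w - lr * (stale_w - t)
    return w

def train(round_fn, lr, rounds=20, w=0.0):
    for _ in range(rounds):
        w = round_fn(w, lr)
    return w

print('sync,  lr=0.8:', train(sync_round, 0.8))
print('async, lr=0.8:', train(async_round, 0.8))   # oscillates and diverges
print('async, lr=0.2:', train(async_round, 0.2))   # converges again
```

In this toy setting, each asynchronous round applies three stale steps instead of one averaged step, so the learning rate that works for synchronous updates is too large; lowering it from 0.8 to 0.2 restores convergence.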

# TensorFlow clusters
A *TensorFlow (TF) cluster* is one mechanism that implements the distributed strategies.

At the logical level, a TF cluster runs one or more *jobs*, and each *job*
consists of one or more *tasks*. Thus a *job* is just a logical grouping of the *tasks*. At the process
level, each task runs as a TF server. At the machine level, each physical machine or node can
run more than one task by running more than one server, one server per task. The *client*
creates the graph on different servers and starts the execution of the graph on one server by
calling the remote session.

![alt text](https://github.com/armando-fandango/Mastering-TensorFlow/blob/master/images/ch-15/15-02.png?raw=true)

The diagram shows two clients connected to two jobs named `ml`. The two nodes run three tasks each; the job `w1` is spread across both nodes,
while the other jobs are each contained within a single node.

To create and train a model with data parallelism, follow these steps:
1. Define the cluster specifications
2. Create a server to host a task
3. Define the variable nodes to be assigned to parameter server tasks
4. Define the operation nodes to be replicated on all worker tasks
5. Create a remote session
6. Train the model in the remote session
7. Use the model for prediction

## Define the cluster specification
The cluster specification
generally consists of two jobs: 
1. `ps` to create parameter server tasks
2. `worker` to create worker tasks.

    
    import tensorflow as tf  # TensorFlow 1.x API

    clusterSpec = tf.train.ClusterSpec({
        'ps': [
            'master0.neurasights.com:2222',  # /job:ps/task:0
            'master1.neurasights.com:2222',  # /job:ps/task:1
              ],
        'worker': [
            'worker0.neurasights.com:2222',  # /job:worker/task:0
            'worker1.neurasights.com:2222',  # /job:worker/task:1
            'worker0.neurasights.com:2223',  # /job:worker/task:2
            'worker1.neurasights.com:2223',  # /job:worker/task:3
                  ]
    })
    
This specification creates two jobs, with two tasks in job `ps` spread across two physical
nodes and four tasks in job `worker` spread across two physical nodes.

We can also create all tasks on localhost, using different ports:

    ps = [
        'localhost:9001',    # /job:ps/task:0
        ]
    workers = [
        'localhost:9002', # /job:worker/task:0
        'localhost:9003', # /job:worker/task:1
        'localhost:9004', # /job:worker/task:2
        ]
    clusterSpec = tf.train.ClusterSpec({'ps':ps, 'worker':workers})

Each task is identified by a name of the form `/job:<job name>/task:<task index>`.
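The naming convention can be illustrated with a small helper that enumerates every task in a cluster dictionary. `task_names` is a hypothetical helper written for this example; `tf.train.ClusterSpec` accepts the same dictionary shape, but this code does not call TensorFlow.

```python
def task_names(cluster):
    """Map each '/job:<name>/task:<index>' string to its 'host:port' address.

    `cluster` is a plain dict from job names to lists of 'host:port' strings,
    the same shape that is passed to tf.train.ClusterSpec.
    """
    names = {}
    for job, addresses in cluster.items():
        # the task index is the position of the address in the job's list
        for index, address in enumerate(addresses):
            names['/job:{}/task:{}'.format(job, index)] = address
    return names

cluster = {
    'ps': ['localhost:9001'],
    'worker': ['localhost:9002', 'localhost:9003', 'localhost:9004'],
}
for name, address in task_names(cluster).items():
    print(name, '->', address)
```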

## Create the server instances
The cluster runs one server instance per task. On every physical node, start the
servers for the tasks hosted on that node, passing each server the cluster specification,
its own job name, and its task index. The servers use the cluster specification to find
out which other nodes are involved in the computation.

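    # each server runs in its own process, on the node that hosts its task
    server = tf.train.Server(clusterSpec, job_name="ps", task_index=0)      # /job:ps/task:0
    server = tf.train.Server(clusterSpec, job_name="worker", task_index=0)  # /job:worker/task:0
    server = tf.train.Server(clusterSpec, job_name="worker", task_index=1)  # /job:worker/task:1
    server = tf.train.Server(clusterSpec, job_name="worker", task_index=2)  # /job:worker/task:2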
    
To run the same Python file on all the physical machines, read the job name and task index from command-line flags:

    server = tf.train.Server(clusterSpec,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index,
                             config=config)
                             
The `job_name` and `task_index` values are taken from the arguments passed on
the command line. The `tf.flags` module is a command-line flag parser that gives access to these
arguments through `tf.flags.FLAGS`.
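For comparison, the same flags can be parsed with Python's standard `argparse` module. This is a swapped-in technique, not what `tf.flags` does internally; the flag names and defaults mirror the `tf.flags.DEFINE_*` calls used in the training script, and `parse_flags` is a hypothetical helper.

```python
import argparse

def parse_flags(argv=None):
    """Parse the job/task flags; argv=None means use sys.argv[1:]."""
    parser = argparse.ArgumentParser(description='distributed training task')
    parser.add_argument('--job_name', default='ps',
                        help='name of the job, default ps')
    parser.add_argument('--task_index', type=int, default=0,
                        help='index of the task within its job, default 0')
    parser.add_argument('--ps_hosts', default='localhost:9001',
                        help='comma-separated list of hostname:port pairs')
    parser.add_argument('--worker_hosts', default='localhost:9002',
                        help='comma-separated list of hostname:port pairs')
    args = parser.parse_args(argv)
    # split the host lists the same way the training script does
    args.ps_hosts = args.ps_hosts.split(',')
    args.worker_hosts = args.worker_hosts.split(',')
    return args

flags = parse_flags(['--job_name=worker', '--task_index=2',
                     '--worker_hosts=localhost:9002,localhost:9003,localhost:9004'])
print(flags.job_name, flags.task_index, flags.worker_hosts)
```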

Run the file once per task, on the physical node that hosts the task:

    # run the script on each physical node,
    # using the appropriate arguments for each task
    $ python3 model.py --job_name='ps' --task_index=0
    $ python3 model.py --job_name='worker' --task_index=0
    $ python3 model.py --job_name='worker' --task_index=1
    $ python3 model.py --job_name='worker' --task_index=2

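To make the parameter server use only the CPU while the worker tasks use the GPU, initialize the configuration object as follows. Setting `device_count['GPU']=0` hides the GPUs from the `ps` task, and `per_process_gpu_memory_fraction = 0.2` limits each worker to 20% of the GPU memory, so that several workers on one machine can share a GPU: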

    import sys

    config = tf.ConfigProto()
    # place an op on the CPU if it cannot run on the requested device
    config.allow_soft_placement = True
    if FLAGS.job_name=='ps':
        # hide all GPUs from the parameter server
        config.device_count['GPU']=0
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
        # block forever, serving the variables to the workers
        server.join()
        sys.exit(0)
    elif FLAGS.job_name=='worker':
        # let each worker use at most 20% of the GPU memory
        config.gpu_options.per_process_gpu_memory_fraction = 0.2
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
                                 
`server.join()` blocks the parameter server process so that it keeps serving the variables, while the worker tasks train
the model and then exit.

## Place the parameters and operations across servers and devices
* Use the `tf.device()` function to place the parameters on the `ps` tasks and the computing nodes of the graph on the `worker` tasks. \\
Note that you can also place the graph nodes on specific devices by
adding the device string to the task string as follows: \\
`/job:<job name>/task:<task index>/device:<device type>:<device index>`
* Alternatively, use `tf.train.replica_device_setter()` to place the variables on the `ps` tasks and the operations on the current worker task:
  1. define the worker device to be the current worker: \\
  `worker_device='/job:worker/task:{}'.format(FLAGS.task_index)`
  2. define a device function using `replica_device_setter`: \\
  `device_func = tf.train.replica_device_setter(worker_device=worker_device, cluster=clusterSpec)`
  3. create the graph inside the `with tf.device(device_func):` block and train it. The creation and training of the graph differ between synchronous and asynchronous updates.
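By default, `tf.train.replica_device_setter()` assigns the parameter-server ops to the `ps` tasks round-robin, in the order they are created, and leaves the other ops on `worker_device`. The plain-Python sketch below imitates that behaviour under simplifying assumptions: it looks only at an op type string and treats only `Variable`/`VariableV2` as parameter-server ops. `make_device_setter` is a hypothetical name, not the TensorFlow implementation.

```python
import itertools

def make_device_setter(num_ps_tasks, worker_device):
    """Return a function that maps an op type to a device string."""
    ps_tasks = itertools.cycle(range(num_ps_tasks))   # round-robin over ps tasks

    def device_for(op_type):
        if op_type in ('Variable', 'VariableV2'):
            # variables go to the next ps task in turn
            return '/job:ps/task:{}'.format(next(ps_tasks))
        # every other op stays on the current worker
        return worker_device
    return device_for

device_for = make_device_setter(num_ps_tasks=2, worker_device='/job:worker/task:1')
ops = [('global_step', 'VariableV2'), ('w', 'VariableV2'), ('b', 'VariableV2'),
       ('MatMul', 'MatMul'), ('add', 'Add')]
placement = {name: device_for(op_type) for name, op_type in ops}
for name, device in placement.items():
    print('{:12s} -> {}'.format(name, device))
```

Spreading the variables over several `ps` tasks spreads both their memory and the network traffic of the parameter updates.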

## Define and train the graph for asynchronous updates
In asynchronous updates, each worker task sends its parameter updates as soon as they are ready, and the parameter server
updates the parameters and sends them back. There is no synchronization,
waiting, or aggregation of parameter updates:

![alt text](https://github.com/armando-fandango/Mastering-TensorFlow/blob/master/images/ch-15/15-04.png?raw=true)
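The relation between each worker's local step and the shared global step in asynchronous training can be imitated with Python threads. This is a toy analogy, assuming a single scalar parameter and a lock-protected `ParameterServer` class that is hypothetical and unrelated to `tf.train.Server`. Because every update is applied immediately, the final global step is the sum of all the workers' local steps.

```python
import threading

class ParameterServer:
    """Toy parameter server holding one scalar parameter and a global step."""
    def __init__(self, w=0.0):
        self.w = w
        self.global_step = 0
        self._lock = threading.Lock()

    def read(self):
        with self._lock:
            return self.w

    def apply(self, grad, lr):
        # applied immediately: no waiting for, or averaging with, other workers
        with self._lock:
            self.w -= lr * grad
            self.global_step += 1
            return self.global_step

def worker(ps, target, n_steps, lr, log):
    local_step = 0
    gstep = 0
    for _ in range(n_steps):
        w = ps.read()                 # may already be stale when the update lands
        grad = w - target             # gradient of 0.5 * (w - target)**2
        gstep = ps.apply(grad, lr)
        local_step += 1
    log.append((target, local_step, gstep))

ps = ParameterServer()
log = []
threads = [threading.Thread(target=worker, args=(ps, t, 100, 0.05, log))
           for t in (1.0, 2.0, 6.0)]
for th in threads:
    th.start()
for th in threads:
    th.join()

print('global_step =', ps.global_step)
for target, lstep, gstep in log:
    print('target={} local_step={} last global_step seen={}'.format(target, lstep, gstep))
```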

In [0]:
import sys
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

In [0]:
FLAGS = tf.flags.FLAGS

tf.flags.DEFINE_string('job_name','ps','name of the job, default ps')
tf.flags.DEFINE_integer('task_index',0,'index of the job, default 0')
tf.flags.DEFINE_string('ps_hosts','localhost:9001','Comma-separated list of hostname:port pairs, default localhost:9001')
tf.flags.DEFINE_string('worker_hosts','localhost:9002','Comma-separated list of hostname:port pairs, default localhost:9002')

For asynchronous updates, the graph is created and trained with the following steps:

1. Define the graph inside the `with tf.device(device_func):` block.
2. Create a global step variable with the built-in function `tf.train.get_or_create_global_step()`.
3. Define the dataset and the hyperparameters.
4. Define the placeholders, weights, biases, logits, cross-entropy, loss op, train op, accuracy op, and so on.
5. Create a `tf.train.Supervisor` object. The supervisor class helps create sessions for training
and is very useful in a distributed training setting.
6. Use the supervisor object to create a session, and run the training inside this
session block.

In [0]:
def main(_):
    mnist = input_data.read_data_sets('/home/armando/datasets/mnist', one_hot=True)

    ps = FLAGS.ps_hosts.split(',')
    workers = FLAGS.worker_hosts.split(',')

    clusterSpec = tf.train.ClusterSpec({'ps': ps, 'worker': workers})

    config = tf.ConfigProto()
    config.allow_soft_placement = True

    if FLAGS.job_name=='ps':
        # hide the GPUs so that the parameter server runs on the CPU only
        config.device_count['GPU']=0
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
        # block forever, serving the variables to the workers
        server.join()
        sys.exit(0)
    elif FLAGS.job_name=='worker':
        config.gpu_options.per_process_gpu_memory_fraction = 0.2
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
        is_chief = (FLAGS.task_index==0)

        worker_device='/job:worker/task:{}'.format(FLAGS.task_index)
        device_func = tf.train.replica_device_setter(worker_device=worker_device,
                                                     cluster=clusterSpec
                                                     )
        # the default values are: ps_device='/job:ps', worker_device='/job:worker'
        with tf.device(device_func):

            global_step = tf.train.get_or_create_global_step()
            #tf.Variable(0,name='global_step',trainable=False)
            x_test = mnist.test.images
            y_test = mnist.test.labels

            # parameters
            n_outputs = 10  # 0-9 digits
            n_inputs = 784  # total pixels

            learning_rate = 0.01
            n_epochs = 50
            batch_size = 100
            n_batches = int(mnist.train.num_examples/batch_size)
            n_epochs_print=10

            # input images
            x_p = tf.placeholder(dtype=tf.float32,
                                 name='x_p',
                                 shape=[None, n_inputs])
            # target output
            y_p = tf.placeholder(dtype=tf.float32,
                                 name='y_p',
                                 shape=[None, n_outputs])

            w = tf.Variable(tf.random_normal([n_inputs, n_outputs],
                                             name='w'
                                             )
                            )
            b = tf.Variable(tf.random_normal([n_outputs],
                                             name='b'
                                             )
                            )
            logits = tf.matmul(x_p,w) + b
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_p,
                                                                    logits=logits
                                                                    )
            loss_op = tf.reduce_mean(cross_entropy)
            optimizer = tf.train.GradientDescentOptimizer(learning_rate)
            train_op = optimizer.minimize(loss_op,global_step=global_step)
            correct_pred = tf.equal(tf.argmax(logits, 1), tf.argmax(y_p, 1))
            accuracy_op = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        sv = tf.train.Supervisor(is_chief=is_chief,
                                 init_op = tf.global_variables_initializer(),
                                 global_step=global_step)



        with sv.prepare_or_wait_for_session(server.target) as mts:
            lstep = 0

            for epoch in range(n_epochs):
                for batch in range(n_batches):
                    x_batch, y_batch = mnist.train.next_batch(batch_size)
                    feed_dict={x_p:x_batch,y_p:y_batch}
                    _,loss,gstep=mts.run([train_op,loss_op,global_step],
                                         feed_dict=feed_dict)
                    lstep +=1
                if (epoch+1)%n_epochs_print==0:
                    print('worker={},epoch={},global_step={}, local_step={}, loss = {}'.
                          format(FLAGS.task_index,epoch,gstep,lstep,loss))
            feed_dict={x_p:x_test,y_p:y_test}
            accuracy = mts.run(accuracy_op, feed_dict=feed_dict)
            print('worker={}, final accuracy = {}'.format(FLAGS.task_index,accuracy))
        sv.stop()

if __name__ == '__main__':

  tf.app.run()

## Define and train the graph for synchronous updates
In synchronous updates, the worker tasks
send their updates to the parameter servers, and the `ps` tasks wait until the updates from all workers have been
received, aggregate them, and then update the parameters. The worker tasks wait for the
updated parameters before proceeding to the next iteration of computing parameter updates:

![alt text](https://github.com/armando-fandango/Mastering-TensorFlow/blob/master/images/ch-15/15-05.png?raw=true)
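For contrast with the asynchronous toy above, here is a toy synchronous parameter server. `threading.Barrier` stands in for the aggregation that `SyncReplicasOptimizer` performs; this is only an analogy (the real optimizer uses gradient accumulators and a token queue), and `SyncParameterServer` is a hypothetical class. All workers compute their gradients at the same parameter value, one averaged update is applied per round, and so the global step equals each worker's local step count instead of their sum.

```python
import threading

class SyncParameterServer:
    """Toy synchronous parameter server: one update per round of all workers."""
    def __init__(self, n_workers, w=0.0, lr=0.5):
        self.w = w
        self.lr = lr
        self.global_step = 0
        self._grads = []
        self._lock = threading.Lock()
        # the barrier action runs once, in one thread, after all workers arrive
        self._barrier = threading.Barrier(n_workers, action=self._apply)

    def _apply(self):
        # aggregate (average) the gradients from all workers, then update
        self.w -= self.lr * sum(self._grads) / len(self._grads)
        self._grads.clear()
        self.global_step += 1

    def push_and_wait(self, grad):
        with self._lock:
            self._grads.append(grad)
        self._barrier.wait()          # block until every worker has pushed
        return self.w                 # the freshly updated parameter

def worker(ps, target, n_steps, log):
    w = ps.w
    local_step = 0
    for _ in range(n_steps):
        grad = w - target             # gradient of 0.5 * (w - target)**2
        w = ps.push_and_wait(grad)
        local_step += 1
    log.append((target, local_step, ps.global_step))

N_STEPS = 50
ps = SyncParameterServer(n_workers=3, lr=0.5)
log = []
threads = [threading.Thread(target=worker, args=(ps, t, N_STEPS, log))
           for t in (1.0, 2.0, 6.0)]
for th in threads:
    th.start()
for th in threads:
    th.join()
print('global_step =', ps.global_step, 'w =', ps.w)
```

Since every worker sees the same parameter value in each round, the result does not depend on thread scheduling, which is the main appeal of synchronous updates; the price is that each round waits for the slowest worker.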

For synchronous updates, the following modifications need to be made to the code:
1. The optimizer needs to be wrapped in `tf.train.SyncReplicasOptimizer`. After
defining the optimizer, add the following code:


    # SYNC: next line added for making it sync update
    optimizer = tf.train.SyncReplicasOptimizer(optimizer,
                                               replicas_to_aggregate=len(workers),
                                               total_num_replicas=len(workers),
                                               )
2. This should be followed by adding the training operation as before:

    
    train_op = optimizer.minimize(loss_op,global_step=global_step)

3. Next, define the initialization ops that are specific to the synchronous
update method:
    
    
    # chief_init_op and local_step_init_op are attributes (ops), not methods
    if is_chief:
        local_init_op = optimizer.chief_init_op
    else:
        local_init_op = optimizer.local_step_init_op
    chief_queue_runner = optimizer.get_chief_queue_runner()
    init_token_op = optimizer.get_init_tokens_op()
    
4. The supervisor object is also created differently, with two additional initialization
arguments, `local_init_op` and `ready_for_local_init_op`:

    
    # SYNC: sv is initialized differently for sync update
    sv = tf.train.Supervisor(is_chief=is_chief,
                             init_op = tf.global_variables_initializer(),
                             local_init_op = local_init_op,
                             ready_for_local_init_op = optimizer.ready_for_local_init_op,
                             global_step=global_step)
5. Finally, within the training session block, if the task is the chief worker, run the op
that fills the sync token queue and start the chief queue runner:


    # SYNC: if block added to make it sync update
    if is_chief:
        mts.run(init_token_op)
        sv.start_queue_runners(mts, [chief_queue_runner])

The rest of the code remains the same as in the asynchronous version.

In [0]:
FLAGS = tf.flags.FLAGS

tf.flags.DEFINE_string('job_name','ps','name of the job, default ps')
tf.flags.DEFINE_integer('task_index',0,'index of the job, default 0')

def main(_):
    mnist = input_data.read_data_sets('/home/armando/datasets/mnist', one_hot=True)

    ps = [
            'localhost:9001',  # /job:ps/task:0
         ]
    workers = [
            'localhost:9002',  # /job:worker/task:0
            'localhost:9003',  # /job:worker/task:1
            'localhost:9004',  # /job:worker/task:2
            ]
    clusterSpec = tf.train.ClusterSpec({'ps': ps, 'worker': workers})

    config = tf.ConfigProto()
    config.allow_soft_placement = True

    if FLAGS.job_name=='ps':
        # hide the GPUs so that the parameter server runs on the CPU only
        config.device_count['GPU']=0
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
        # block forever, serving the variables to the workers
        server.join()
        sys.exit(0)
    elif FLAGS.job_name=='worker':
        config.gpu_options.per_process_gpu_memory_fraction = 0.2
        server = tf.train.Server(clusterSpec,
                                 job_name=FLAGS.job_name,
                                 task_index=FLAGS.task_index,
                                 config=config
                                 )
        is_chief = (FLAGS.task_index==0)

        worker_device='/job:worker/task:{}'.format(FLAGS.task_index)
        device_func = tf.train.replica_device_setter(worker_device=worker_device,
                                                     cluster=clusterSpec
                                                     )
        # the default values are: ps_device='/job:ps', worker_device='/job:worker'
        with tf.device(device_func):

            global_step = tf.train.get_or_create_global_step()
            #tf.Variable(0,name='global_step',trainable=False)
            x_test = mnist.test.images
            y_test = mnist.test.labels

            # parameters
            n_outputs = 10  # 0-9 digits
            n_inputs = 784  # total pixels

            learning_rate = 0.01
            n_epochs = 50
            batch_size = 100
            n_batches = int(mnist.train.num_examples/batch_size)
            n_epochs_print=10

            # input images
            x_p = tf.placeholder(dtype=tf.float32,
                                 name='x_p',
                                 shape=[None, n_inputs])
            # target output
            y_p = tf.placeholder(dtype=tf.float32,
                                 name='y_p',
                                 shape=[None, n_outputs])

            w = tf.Variable(tf.random_normal([n_inputs, n_outputs],
                                             name='w'
                                             )
                            )

            b = tf.Variable(tf.random_normal([n_outputs],
                                             name='b'
                                             )
                            )

            logits = tf.matmul(x_p,w) + b

            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y_p,
                                                                    logits=logits
                                                                    )
            loss_op = tf.reduce_mean(cross_entropy)

            optimizer = tf.train.GradientDescentOptimizer(learning_rate)

            # SYNC: next line added for making it sync update
            optimizer = tf.train.SyncReplicasOptimizer(optimizer,
                                                       replicas_to_aggregate=len(workers),
                                                       total_num_replicas=len(workers),
                                                       )

            train_op = optimizer.minimize(loss_op,global_step=global_step)

            # SYNC: next 6 lines added for making it sync update
            if is_chief:
                local_init_op = optimizer.chief_init_op
            else:
                local_init_op = optimizer.local_step_init_op
            chief_queue_runner = optimizer.get_chief_queue_runner()
            init_token_op = optimizer.get_init_tokens_op()

            correct_pred = tf.equal(tf.argmax(logits, 1), tf.argmax(y_p, 1))
            accuracy_op = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        # SYNC: sv is initialized differently for sync update
        sv = tf.train.Supervisor(is_chief=is_chief,
                                 init_op = tf.global_variables_initializer(),
                                 local_init_op = local_init_op,
                                 ready_for_local_init_op = optimizer.ready_for_local_init_op,
                                 global_step=global_step)


        with sv.prepare_or_wait_for_session(server.target) as mts:

            # SYNC: if block added to make it sync update
            if is_chief:
                mts.run(init_token_op)
                sv.start_queue_runners(mts, [chief_queue_runner])

            lstep = 0

            for epoch in range(n_epochs):
                for batch in range(n_batches):
                    x_batch, y_batch = mnist.train.next_batch(batch_size)
                    feed_dict={x_p:x_batch,y_p:y_batch}
                    _,loss,gstep=mts.run([train_op,loss_op,global_step], feed_dict=feed_dict)
                    lstep +=1
                if (epoch+1)%n_epochs_print==0:
                    print('worker={},epoch={},global_step={}, local_step={}, loss = {}'.format(FLAGS.task_index,epoch,gstep,lstep,loss))
            feed_dict={x_p:x_test,y_p:y_test}
            accuracy = mts.run(accuracy_op, feed_dict=feed_dict)
            print('worker={}, final accuracy = {}'.format(FLAGS.task_index,accuracy))
#    sv.stop()

if __name__ == '__main__':

  tf.app.run()