# Distributing TensorFlow Across Devices and Servers
page 315<br>
For details, see
- https://github.com/ageron/handson-ml/blob/master/12_distributed_tensorflow.ipynb (GitHub),
- https://aws.amazon.com/de/getting-started/tutorials/build-train-deploy-machine-learning-model-sagemaker (AWS; see also the file *TensorFlow_on_AWS.key* in the folder *Amazon_AWS*),
- https://colab.research.google.com/notebooks/welcome.ipynb (Google Colab),
- https://towardsdatascience.com/getting-started-with-google-colab-f2fff97f594c (introduction to Google Colab),
- https://www.tensorflow.org/install/gpu (installation of TensorFlow-GPU),
- https://en.wikibooks.org/wiki/Python_Programming/Creating_Python_Programs (on **how to run a python script**; but also check out the comments under the heading **Managing the GPU RAM** further below in this notebook), and
- https://www.w3schools.com/python/python_try_except.asp (on `try` and `except`).

Despite sped-up training thanks to proper initialization of trainable variables, good (e.g., non-saturating) activation functions, avoidance of vanishing/exploding gradients via batch normalization, and performant optimizers (see Chapter 11), **training may still be relatively slow on a single CPU**. This chapter describes how to distribute operations across different devices (CPUs and/or GPUs) and run them in parallel. We will first deal with the distribution on a single machine and then with the distribution across multiple devices on several machines. These are disciplines where TensorFlow really shines: it gives the user a lot of control over parallelization as well as over the allocation and usage of available resources.

Instead of waiting several weeks for a calculation to finish, one may end up waiting just a few hours. So in the same time, one could try out many more hyperparameter configurations. But let's start with GPU usage on a single machine!
## Multiple Devices on a Single Machine
page 316<br>
It is often enough to train on a **single machine** with several GPUs. Due to the communication overhead between machines, training on a single machine with 8 GPUs is about as quick as training on two separate machines with a total of 16 GPUs (8 per machine). So this section is on GPU usage on a single machine.
### Installation
page 316<br>
In order to run TensorFlow on a GPU, the GPU requires an NVidia Compute Capability of 3.0 or greater, see https://developer.nvidia.com/cuda-gpus. The appropriate versions of NVidia's *Compute Unified Device Architecture* (CUDA) and *CUDA Deep Neural Network* (cuDNN) libraries need to be downloaded and installed. In short,
- CUDA makes the GPU usable to applications other than graphics, including (but not limited to) the training of DNNs and
- cuDNN is a GPU-accelerated library of DNN primitives.

A GPU version of TensorFlow is also required. Depending on other python applications on the system, an isolated conda environment may be appropriate. Details on this can be found on the conda project webpage: https://docs.conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html.

The entire installation process can be a bit tricky and changes frequently due to version updates. Fortunately, there are many guides on the internet and on the TensorFlow website (https://www.tensorflow.org/install/gpu). If running the command `nvidia-smi` in a shell / terminal leads to a list of the available GPU cards, then CUDA is properly installed. (The picture below is a screenshot taken from the *Machine Learning Machine*: it lists an NVidia RTX 2060 with 6GB RAM and an NVidia GTX 1050 Ti with 4GB RAM.)

<img src="images/nvidia-smi_MLM.png" width="500">

**Suggestion or Tip**<br>
If you don't own any GPU cards, you can use a hosting service with GPU capability such as Amazon AWS. Detailed instructions to set up TensorFlow 0.9 with Python 3.5 on an Amazon AWS GPU instance are available in Žiga Avsec's helpful blog post (http://goo.gl/kbge5b [currently not available – but this may be helpful: https://aws.amazon.com/de/getting-started/tutorials/build-train-deploy-machine-learning-model-sagemaker or our own introduction **TensorFlow_on_AWS.key**]). It should not be too hard to update it to the latest version of TensorFlow. Google also released a cloud service called *Cloud Machine Learning* (https://cloud.google.com/ml) to run TensorFlow graphs. In May 2016, they announced that their platform now includes servers equipped with *tensor processing units* (TPUs), processors specialized for Machine Learning that are much faster than GPUs for many ML tasks. Of course, another option is simply to buy your own GPU card. Tim Dettmers wrote a great blog post (https://goo.gl/pCtSAn) to help you choose, and he updates it fairly regularly.
### Managing the GPU RAM
page 319<br>
By default, TensorFlow grabs all the GPU memory on every available GPU the first time a graph is run. As a consequence, no second TensorFlow program can be run in parallel. There are several ways to allocate GPU memory, and for all of them it is useful to check the GPU and memory allocation via the terminal command `$ nvidia-smi`. To **start a python script from the terminal**, simply navigate in the terminal to the folder that contains the script, e.g. *LRS_MNIST.py* in the folder *other_resources/chapter12*, and then run `python3 ./LRS_MNIST.py` (Mac) or `python3 LRS_MNIST.py` (Linux) in it. The python file can be created with a simple editor; it just needs the `.py` ending. Simply typing the commands in the same way as in an `.ipynb` file will do.
 1. Run separate python scripts from separate terminals with the commands<br>
`$ CUDA_VISIBLE_DEVICES=0 python3 LRS_MNIST.py`<br>
`$ CUDA_VISIBLE_DEVICES=1 python3 LRS_MNIST.py` (or `.._DEVICES=0,1,2,3 pyth..` if more GPUs shall be allocated). All of the specified GPUs' memory will be allocated to the TensorFlow graph that is being run.
 2. One can simultaneously run different graphs on shared GPUs by specifying the allocated GPU RAM fraction for each graph. Of course, the total allocation should not exceed 1. To do so, simply replace `with tf.Session() as sess:` by<br>
`config = tf.ConfigProto()`<br>
`config.gpu_options.per_process_gpu_memory_fraction=0.4`<br>
`with tf.Session(config=config) as sess:`.<br>
We can run another graph in another .ipynb notebook or in another shell as long as we allocate it a low enough GPU memory fraction, e.g., also 0.4. This is specified in the file *LRS_MNIST_ConfigProto.py*.
 3. By changing the second line above to `config.gpu_options.allow_growth=True` one allows TensorFlow to only grab memory (or *grow* in memory) when necessary. However, TensorFlow does not release memory just because it doesn't need it anymore! An implementation is shown in the file *LRS_MNIST_AllowGrowth.py*.
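
Options 1 and 2 can also be prepared from a small launcher script. The following is a minimal sketch under stated assumptions: the helper names `gpu_env` and `fractions_fit` are hypothetical (they are not part of TensorFlow), and the sketch relies only on the `CUDA_VISIBLE_DEVICES` environment variable and on the rule that the memory fractions on a shared GPU should not add up to more than 1.

```python
import os


def gpu_env(gpu_ids, base_env=None):
    """Return a copy of the environment that exposes only the given GPU ids."""
    env = dict(os.environ if base_env is None else base_env)
    env["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in gpu_ids)
    return env


def fractions_fit(fractions):
    """Check that per-process memory fractions on one shared GPU sum to at most 1."""
    return all(0.0 < f <= 1.0 for f in fractions) and sum(fractions) <= 1.0 + 1e-9


# Hypothetical usage: hand the environment to a child process, e.g.
#   subprocess.run(["python3", "LRS_MNIST.py"], env=gpu_env([0]))
print(gpu_env([0, 1], base_env={})["CUDA_VISIBLE_DEVICES"])  # 0,1
print(fractions_fit([0.4, 0.4]))                             # True
print(fractions_fit([0.7, 0.4]))                             # False
```

Building the environment in one place keeps the GPU assignment of each script explicit, which makes it easier to compare with the output of `nvidia-smi`.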

These three options have been implemented and run on a machine learning machine with two GPUs (an RTX 2060 and a GTX 1050 Ti). Illustrative screenshots are shown in the image below, ordered according to the above list.
<img src="images/Managing_GPU-RAM.png">
### Placing Operations on Devices
page 320<br>
The original TensorFlow whitepaper (http://goo.gl/vSjA14) introduced a *dynamic placer* algorithm that automatically distributes operations over devices by taking into account many variables (e.g., measured computation time in previous runs, RAM available for each device, communication delay, user-imposed constraints, etc.). But to the best of current knowledge, this dynamic placer has never been released to the public, possibly because the *simple placer* is easy to use and at the same time makes very good use of the available resources.<br>
Whenever you run a graph, if TensorFlow needs to evaluate a node that is not placed on a device yet, it uses the **simple placer** to place it, along with all other nodes that are not placed yet. It follows three rules (earlier rules take precedence):
 1. Do not change the placement of a node that is already placed.
 2. If the user *pinned* (described next) a node to a device, this device shall be used.
 3. Otherwise, the node is placed on GPU #0 (or to the CPU if there is no GPU).
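
The three rules can be written down as a tiny decision function. This is only a toy model in plain Python, not TensorFlow's implementation; the function name `simple_place` and the dictionaries `placed` and `pinned` are hypothetical.

```python
def simple_place(node, placed, pinned, has_gpu):
    """Return the device for `node`, checking the three rules in order."""
    if node in placed:                          # rule 1: keep an existing placement
        return placed[node]
    if node in pinned:                          # rule 2: honor a user-pinned device
        return pinned[node]
    return "/gpu:0" if has_gpu else "/cpu:0"    # rule 3: default device


placed = {"a": "/cpu:0"}                        # "a" was placed in an earlier run
pinned = {"a": "/gpu:1", "b": "/gpu:1"}         # the user pinned "a" and "b"
for name in ["a", "b", "c"]:
    print(name, simple_place(name, placed, pinned, has_gpu=True))
# a /cpu:0
# b /gpu:1
# c /gpu:0
```

Node `a` stays on the CPU even though it is pinned to a GPU, because an existing placement is never changed.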

So apart from the default placement to GPU #0 or to the CPU, placement is mainly up to the user. This can be done with a device block using the `device()` function as follows.

In [1]:
import tensorflow as tf
import numpy as np
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)
reset_graph()
with tf.device("/cpu:0"):   # place nodes on device (CPU)
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
with tf.Session() as sess:  # run session
    sess.run(a.initializer) # initialize variable "a"
    print(sess.run(a * b))  # run calculation and print result

12.0


**General remark**<br>
The `"/cpu:0"` device aggregates all CPUs on a multi-CPU system. There is currently no way to pin nodes on specific CPUs or to use just a subset of all CPUs.

**Logging placements** is achieved by replacing `with tf.Session() as sess:` by<br>
`config = tf.ConfigProto()`<br>
`config.log_device_placement = True`<br>
`with tf.Session(config=config) as sess:`.<br>
The terminal lines that start with `I` (for "Info") are the log messages. All nodes but the multiplication (`mul`) have been placed on the CPU. In contrast to the Mac, the Linux machine does have a GPU (even two), so there the multiplication is placed on GPU #0, according to the third rule of simple placement (see above). The screenshots below show the terminal outputs of the file *LoggingSimplePlacement.py*, run on Mac (left) and Linux (right).

<img src="images/LoggingPlacements.png">

The following list is a collection of further ideas related to the placing of operations on devices. For each list item, there is also a python script (to be run on a device with a GPU) and a screenshot below that shall help explain how the idea works.
- **Dynamic placement function**s are an alternative to specifying device names when placing operations (nodes) on a device. The example shown in the screenshot below is rather basic but more sophisticated functions are possible. (*DynamicPlacement.py*)
- **Operations and kernels** enable specific TensorFlow operations on specific devices. But not every TensorFlow operation has a kernel for every device. For example, TensorFlow does not have a GPU kernel for integer variables, only for floating point numbers. This must be taken into account when attempting to initialize variables on a GPU. (*OperationsAndKernels.py*)
- **Soft placement** can alleviate the problem of not having a kernel for a certain device: instead of insisting on placing an operation on a specific device, soft placement will place it on the CPU instead. (*SoftPlacement.py*)

<img src="images/FurtherPlacementStuff.png" width="800">

### Parallel Execution
page 323<br>
When TensorFlow first goes through a graph, it counts how many dependencies each node has. The calculation starts with those nodes that have zero dependencies (e.g. nodes A, B, and C in the figure below, which is similar to Figure 12-5 on page 324 of the book). These will be evaluated in parallel, either on different devices or within separate threads on the same device. On each device, TensorFlow manages a thread pool for the parallelization of operations: the **inter-op thread pool**. Operations A and B may be placed on GPU #0 and run through this device's inter-op thread pool **in parallel**. Operation A happens to have a multithreaded kernel so that it can be broken up into independent threads that run **in parallel** within an **intra-op thread pool**. Operation C is sent to GPU #1 and evaluated in the corresponding inter-op thread pool. As soon as its evaluation is finished, the dependency counters of nodes D and E decrement to 0. Then, these nodes will be evaluated. Node F will be evaluated once the evaluation of nodes A, B, D, and E is complete.

<img src="images/ParallelExecution.png" width="500">
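
The dependency counting can be mimicked in a few lines of plain Python. This is a sketch under assumptions, not TensorFlow's scheduler: the edges are taken from the description above (D and E depend on C), and the final node, here called `F`, is assumed to depend on A, B, D, and E. The sketch also groups nodes into "waves", which is a simplification, since a real scheduler starts a node as soon as its own counter reaches zero.

```python
# node -> list of nodes it depends on (assumed from the description above)
deps = {"A": [], "B": [], "C": [], "D": ["C"], "E": ["C"], "F": ["A", "B", "D", "E"]}


def evaluation_waves(deps):
    """Group nodes into waves; all nodes within one wave could run in parallel."""
    counter = {node: len(inputs) for node, inputs in deps.items()}
    consumers = {node: [] for node in deps}
    for node, inputs in deps.items():
        for source in inputs:
            consumers[source].append(node)
    ready = sorted(node for node, count in counter.items() if count == 0)
    waves = []
    while ready:
        waves.append(ready)
        next_ready = []
        for node in ready:
            for consumer in consumers[node]:
                counter[consumer] -= 1          # one dependency has been satisfied
                if counter[consumer] == 0:
                    next_ready.append(consumer)
        ready = sorted(next_ready)
    return waves


print(evaluation_waves(deps))  # [['A', 'B', 'C'], ['D', 'E'], ['F']]
```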

**Suggestion or Tip**<br>
You can control the number of threads per inter-op pool by setting the `inter_op_parallelism_threads` option. Note that the first session you start creates the inter-op thread pools. All other sessions will just reuse them unless you set the `use_per_session_threads` option to `True`. You can control the number of threads per intra-op pool by setting the `intra_op_parallelism_threads` option.

### Control Dependencies
page 325<br>
In some cases, it might be favorable to delay the evaluation of a node despite its dependency counter being zero (i.e., it could be evaluated immediately). For example, an operation may require a lot of memory, so that executing it in parallel with other operations that also need memory could end up bottlenecking the communications bandwidth. Very inefficient! *Control dependencies* make it possible to postpone the evaluation of some nodes. In the example below, `x` and `y` – despite being independent of `a` and `b` – are only evaluated after those nodes.

In [2]:
a = tf.constant(1.0)
b = a + 2.0                           # b depends on a
# wait until operations a and b have been evaluated
with tf.control_dependencies([a, b]): # [a, b] could be replaced by only [b], since b depends on a
    x = tf.constant(3.0)
    y = tf.constant(4.0)
z = x + y                             # z can only be evaluated after x and y
# run z in session and print result
with tf.Session() as sess:
    print(sess.run(z))

7.0


We have learned
- how to place operations on devices (*placing operations on devices*),
- how parallel execution works (*parallel execution*), and
- how to optimize parallel execution (*control dependencies*).

With these tools, the use of available resources on a single machine (possibly with different devices) can already be optimized quite a bit. So let's have a look at multiple machines (servers)!
## Multiple Devices Across Multiple Servers
page 325<br>
Before distributing operations across multiple servers, one needs to define a *cluster*. A **cluster is composed of TensorFlow *servers***, also called *tasks*, that are **typically spread over multiple machines**, as shown in the figure below (similar to Figure 12-6 on page 326 of the book). Each task belongs to a **job**. As shown in the picture, it is possible to have more than one task / server assigned to a machine. In that case however, one must ensure that the first server does not grab all the available GPU memory, as mentioned earlier. So usually, one task / server per machine is preferred. (The CPU memory is shared by all tasks on the same machine.)
<img src="images/TF_Cluster.png" width="500">

In [3]:
cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "machine-a.example.com:2221", # /job:ps/task:0
    ],                                # ps = parameter server: keeps track of the model parameters (by convention)
    "worker": [
        "machine-a.example.com:2222", # /job:worker/task:0
        "machine-b.example.com:2222", # /job:worker/task:1
    ]})                               # worker: performs computations (by convention)
cluster_spec

<tensorflow.python.training.server_lib.ClusterSpec at 0x108ee62b0>
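
The device names in the comments of the cluster specification follow directly from the order of the addresses in each job. The sketch below reproduces that naming scheme in plain Python; the helper `task_devices` is hypothetical and only illustrates the `/job:<name>/task:<index>` convention, it does not contact any server.

```python
def task_devices(cluster):
    """Map each address of a cluster dictionary to its /job:<name>/task:<index> string."""
    return {
        address: "/job:{}/task:{}".format(job, index)
        for job, addresses in cluster.items()
        for index, address in enumerate(addresses)
    }


cluster = {
    "ps": ["machine-a.example.com:2221"],
    "worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"],
}
for address, device in task_devices(cluster).items():
    print(address, "->", device)
# machine-a.example.com:2221 -> /job:ps/task:0
# machine-a.example.com:2222 -> /job:worker/task:0
# machine-b.example.com:2222 -> /job:worker/task:1
```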

To start a TensorFlow server, one instantiates a `Server` object and passes it the *cluster specification* as well as the *job* and the *task*:<br>
`server = tf.train.Server(cluster_spec, job_name="worker", task_index=0)`.

In the book, there is also a comment on how `server.join()` can be used to block the main thread from a process. This may be helpful if the process is supposed to run the server and nothing else.

### Opening a Session
page 327<br>
Once everything has been set up (construction phase), a session can be started from any server / task on any machine (execution phase). The code below creates a simple graph and then opens a session on the specified server. That server will be the **master** and it will place operations on the appropriate (e.g., according to specification) devices.

In [4]:
print("This code runs only once. Restart kernel from here if necessary.")
### construction
# the servers can be created only once per kernel (probably because they keep their ports bound); restart here if necessary: all imports etc. are here
import tensorflow as tf
import numpy as np
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)
# specify the cluster (jobs and tasks)
cluster_spec = tf.train.ClusterSpec({
    "ps": [                 # ps = parameter server: keeps track of the model parameters (by convention)
        "127.0.0.1:2221",   # /job:ps/task:0
    ],
    "worker": [             # worker: performs computations (by convention)
        "127.0.0.1:2222",   # /job:worker/task:0
        "127.0.0.1:2223",   # /job:worker/task:1
    ]})
# make servers (=tasks, in TensorFlow, see picture above)
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_worker0 = tf.train.Server(cluster_spec, job_name="worker", task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name="worker", task_index=1)
# distribute servers to devices and make a little calculation
reset_graph()
with tf.device("/job:ps"):
    a = tf.Variable(1.0, name="a")
with tf.device("/job:worker"):
    b = a + 2
with tf.device("/job:worker/task:1"):
    c = a + b
### execution
# choose any server (=task) on any machine
with tf.Session("grpc://127.0.0.1:2222") as sess:
    sess.run(a.initializer) # initialize a
    print(c.eval())         # evaluate and print c

This code runs only once. Restart kernel from here if necessary.
4.0


### The Master and Worker Services
page 327<br>
All TensorFlow servers provide two services: the *master service* and the *worker service*. The master service allows clients to open sessions and run graphs in them. It also coordinates the computations. The worker service executes the computations. Communications between clients and servers are managed by the **gRPC** (*Google Remote Procedure Call*) protocol. It is based on HTTP/2 and allows highly efficient bidirectional communication via *protocol buffers* (another Google technology). This entire architecture offers the user a lot of flexibility.

**Warning / caution**<br>
All servers in a TensorFlow cluster may communicate with any other server in the cluster, so make sure to open the appropriate ports on your firewall.
### Pinning Operations Across Tasks
page 328<br>
In the code on *Opening a Session* above, we have already used device blocks to assign certain operations (nodes) to certain jobs and tasks, but we have not assigned the actual device. In principle, it should work with the code shown below.

`with tf.device("/job:ps/cpu:0"):            # assign CPU #0`<br>
`....a = tf.Variable(1.0, name="a")`<br>
`with tf.device("/job:worker/gpu:0"):        # assign GPU #0`<br>
`....b = a + 2`<br>
`with tf.device("/job:worker/task:1/gpu:1"): # assign GPU #1`<br>
`....c = a + b`

If the task index is omitted (as is done twice above), TensorFlow defaults to `"/task:0"`, i.e., to `"/job:ps/task:0"` and `"/job:worker/task:0"` here. Above, this is appropriate. If the job name and the task index are omitted, TensorFlow defaults to the session's master task.
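
The defaulting behavior can be illustrated with a small string helper. This is a hedged sketch, not TensorFlow's device-string parser: the function `resolve_device` and its `master` argument are hypothetical, and the handling of a task index without a job name (falling back to the master's job) is an assumption made for this illustration.

```python
def resolve_device(spec, master=("worker", 0)):
    """Fill in job and task of a partial device string such as "/job:ps/cpu:0"."""
    fields = dict(part.split(":") for part in spec.strip("/").split("/") if part)
    if "job" not in fields and "task" not in fields:
        job, task = master                    # neither given: use the session's master task
    else:
        job = fields.get("job", master[0])    # assumption: missing job falls back to the master's job
        task = int(fields.get("task", 0))     # task omitted: default to task 0
    resolved = "/job:{}/task:{}".format(job, task)
    for kind in ("cpu", "gpu"):
        if kind in fields:
            resolved += "/{}:{}".format(kind, fields[kind])
    return resolved


print(resolve_device("/job:ps/cpu:0"))             # /job:ps/task:0/cpu:0
print(resolve_device("/job:worker/gpu:0"))         # /job:worker/task:0/gpu:0
print(resolve_device("/job:worker/task:1/gpu:1"))  # /job:worker/task:1/gpu:1
print(resolve_device("/gpu:0"))                    # /job:worker/task:0/gpu:0
```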
### Sharding Variables Across Multiple Parameter Servers
page 329<br>
As mentioned before, a common way to handle parameters in a large cluster is to have dedicated parameter servers (*ps*) separated from servers that perform calculations. If the cluster is very big, using only a single parameter server might bottleneck that server's network card. Not good! A popular solution is to have several parameter servers, but manually pinning each variable to a specific server (*sharding*) would be a hell of a task. Fortunately, this is handled *automatically* by TensorFlow's `replica_device_setter()` function, which distributes variables across all `ps` tasks in a round-robin (https://en.wikipedia.org/wiki/Round-robin_scheduling) fashion. The upper part (until "see text below") in the following code shows how it would be implemented.

`with tf.device(tf.train.replica_device_setter(ps_tasks=2)):`<br>
`....v1 = tf.Variable(1.0) # pinned to /job:ps/task:0`<br>
`....v2 = tf.Variable(2.0) # pinned to /job:ps/task:1`<br>
`....v3 = tf.Variable(3.0) # pinned to /job:ps/task:0`<br>
`....v4 = tf.Variable(4.0) # pinned to /job:ps/task:1`<br>
`....v5 = tf.Variable(5.0) # pinned to /job:ps/task:0`<br>
`....# see text below`<br>
`....s = v1 + v2           # pinned to /job:worker (+ defaults to /task:0/gpu:0)`<br>
`....with tf.device("/gpu:1"):`<br>
`........p1 = 2 * s        # pinned to /job:worker/gpu:1 (+ defaults to /task:0)`<br>
`........with tf.device("/task:1"):`<br>
`............p2 = 3 * s    # pinned to /job:worker/task:1/gpu:1`

Instead of `ps_tasks`, one may also pass `cluster=cluster_spec` and TensorFlow will infer `ps_tasks` from that.<br>
If the block contains other operations than just variables, TensorFlow will pin them to `/job:worker`, i.e., by default to the first device managed by the first task of the `worker` job. This can be controlled by setting the `worker_device` parameter, but an even better option is to use embedded device blocks as shown in the lower part of the above code.
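
Round-robin assignment itself is simple to reproduce. The sketch below is plain Python and only mimics the idea; the helper `round_robin_pins` is hypothetical and is not how `replica_device_setter()` is implemented internally. With two `ps` tasks, the assignment wraps around after every second variable.

```python
from itertools import cycle


def round_robin_pins(variable_names, ps_tasks):
    """Assign variables to parameter-server tasks in round-robin order."""
    tasks = cycle(range(ps_tasks))
    return {name: "/job:ps/task:{}".format(next(tasks)) for name in variable_names}


pins = round_robin_pins(["v1", "v2", "v3", "v4", "v5"], ps_tasks=2)
for name, device in pins.items():
    print(name, device)
# v1 /job:ps/task:0
# v2 /job:ps/task:1
# v3 /job:ps/task:0
# v4 /job:ps/task:1
# v5 /job:ps/task:0
```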

**General note**<br>
This example assumes that the parameter servers are CPU-only, which is typically the case since they only need to store and communicate parameters, not perform intensive computations.

### Sharing State Across Sessions Using Resource Containers
page 330<br>
A local session (that is, a default session as we have used it most of the time) handles the variables' states itself. So the variable states do not exist outside the session. Even if another local session runs the same graph at the same time, the variable state cannot be shared: each session has its own copy. But **distributed sessions** handle variables with *resource containers* that exist outside the distributed session on the cluster itself. Consider the following (purely illustrative) code.

`# simple_client.py (we do not actually build such a python file – this is only for illustration)`<br>
`import tensorflow as tf`<br>
`import sys`<br>
`x = tf.Variable(0.0, name="x")`<br>
`increment_x = tf.assign(x, x + 1)`<br>
`with tf.Session(sys.argv[1]) as sess:`<br>
`....if sys.argv[2:]==["init"]:`<br>
`........sess.run(x.initializer)`<br>
`....sess.run(increment_x)     # increment x on every run`<br>
`....print(x.eval())`

Now suppose that we have a TensorFlow cluster running on machines A and B with port 2222. We could first run the above code / file against machine A and then against machine B by typing

`$ python3 ./simple_client.py grpc://machine-a.example.com:2222 init`<br>
`1.0`<br>
`$ python3 ./simple_client.py grpc://machine-b.example.com:2222`<br>
`2.0`

In the first run, the variable `x` has been initialized (to 0) and incremented (by +1). If we run the code a second time, on the same or on a different machine (above we use machine B, i.e., a different one), then there will be no initialization (so it will stay 1) but there will be an increment (of +1). So the result will be 2. But this solution is a double-edged sword! It is great to share variables across multiple sessions, but running completely independent calculations is tricky because one has to carefully avoid reusing variable names (because those would be unintentionally shared). To avoid cutting oneself, one may construct separate problems in separate variable scopes with unique names, or one may use container blocks:

`with tf.variable_scope("my_problem_1"):     # variable scope`<br>
`....[...] # construction of problem_1       # variable scope`,

`with tf.container("my_problem_1"):          # container block`<br>
`....[...] # construction of problem_1       # container block`.
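
The effect of cluster-side state and of containers can be imitated without TensorFlow. The following toy class is purely illustrative and hypothetical (`ToyCluster` is not a TensorFlow object): it keeps the state outside of any "session" and keys it by container and variable name, so that equal names in different containers do not collide.

```python
class ToyCluster:
    """Keeps variable state outside of any session, keyed by (container, name)."""

    def __init__(self):
        self.state = {}

    def increment(self, name, container="", init=False):
        key = (container, name)
        if init:
            self.state[key] = 0.0        # corresponds to running the initializer
        self.state[key] += 1.0           # corresponds to running the increment op
        return self.state[key]


cluster = ToyCluster()
print(cluster.increment("x", init=True))                            # 1.0
print(cluster.increment("x"))                                       # 2.0 (state is shared)
print(cluster.increment("x", container="my_problem_1", init=True))  # 1.0 (isolated)
```

The second call plays the role of a second client that does not initialize `x` again, and the third call shows that the same name in another container starts from its own state.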

Positive side effects of using a dedicated container are short variable names and the ability to reset a container. For example, the command `tf.Session.reset("grpc://machine-a.example.com:2222", ["my_problem_1"])` will connect to the server on machine A and reset the container `my_problem_1`, freeing all its resources (all its variables are reset). Resource containers make it possible to share variables across sessions in many different scenarios (see Figure 12-7 in the book and surrounding text for details). In addition, they can also preserve the state of other stateful operations, namely queues and readers. Let's check them out! We start with queues.
### Asynchronous Communication Using TensorFlow Queues
page 331<br>
Queues are a great tool for sharing data across multiple sessions within a cluster. An important application is to have one client that creates a graph for preprocessing data and loading it into the queue and another client that pulls data from that queue and trains on it within a training graph. This can speed up training significantly because **preprocessing and training happen in parallel** rather than in sequence.<br>
TensorFlow has implementations for different kinds of queues. The simplest one is the **first in first out** (FIFO) queue. In the following example, it can hold up to 10 tensors containing two float values each (see https://www.tensorflow.org/api_docs/python/tf/queue/FIFOQueue for details):

`q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]], name="q", shared_name="shared_q")`.

<img src="images/Queues.png" width="500">

**Warning / caution**<br>
To share variables across sessions, all you had to do was to specify the same name and container on both ends. With queues TensorFlow does not use the `name` attribute but instead uses `shared_name`, so it is important to specify it (even if it is the same as the `name`). And, of course, use the same container.

**Enqueuing data**, i.e., putting data in the queue, requires an `enqueue` operation. Code that pushes three instances to the queue would look somewhat like this:

`# training_data_loader.py`<br>
`import tensorflow as tf`<br>
`q = [...] # specify the queue (e.g. FIFOQueue from above)`<br>
`training_instance = tf.placeholder(tf.float32, shape=(2))`<br>
`enqueue = q.enqueue([training_instance])`<br>
`with tf.Session("grpc://machine-a.example.com:2222") as sess:`<br>
`....sess.run(enqueue, feed_dict={training_instance: [1., 2.]})`<br>
`....sess.run(enqueue, feed_dict={training_instance: [3., 4.]})`<br>
`....sess.run(enqueue, feed_dict={training_instance: [5., 6.]})`.

Instead of enqueuing individual instances one after another, one may also enqueue many at once via the `enqueue_many` operation:

`[...]`<br>
`training_instances = tf.placeholder(tf.float32, shape=(None, 2))`<br>
`enqueue_many = q.enqueue_many([training_instances])`<br>
`with tf.Session("grpc://machine-a.example.com:2222") as sess:`<br>
`....sess.run(enqueue_many, feed_dict={training_instances: [[1., 2.], [3., 4.], [5., 6.]]})`.

Both examples above achieve the same goal: enqueuing three tensors in the queue. **Dequeuing data**, i.e., taking data out of the queue, is necessary to make use of the enqueued data, e.g., for training. This is achieved via the `dequeue` operation. In code, it would look somewhat like this:

`# trainer.py`<br>
`import tensorflow as tf`<br>
`q = [...]`<br>
`dequeue = q.dequeue()`<br>
`with tf.Session("grpc://machine-a.example.com:2222") as sess:`<br>
`....print(sess.run(dequeue)) # [1., 2.]`<br>
`....print(sess.run(dequeue)) # [3., 4.]`<br>
`....print(sess.run(dequeue)) # [5., 6.]`.

Pulling entire batches via `dequeue_many` will usually be more convenient (similar to `enqueue_many`):

`[...]`<br>
`batch_size = 2 # adjust this as appropriate`<br>
`dequeue_mini_batch = q.dequeue_many(batch_size)`<br>
`with tf.Session("grpc://machine-a.example.com:2222") as sess:`<br>
`....print(sess.run(dequeue_mini_batch)) # [[1., 2.], [3., 4.]]`<br>
`....print(sess.run(dequeue_mini_batch)) # blocked (waiting for another instance to match batch_size)`.

When a queue is full (i.e., when its `capacity` has been reached, see `q = tf.FIFOQueue(...)` above), enqueuing will be blocked until data has been pulled from it via a dequeue operation. Likewise, dequeuing will be blocked as long as the queue is empty or contains fewer instances than required for a `dequeue_many` batch.
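
The same blocking behavior can be observed with Python's standard-library `queue.Queue`, which serves here only as an analogy for a TensorFlow queue (it is a different class with a different API). The timeouts are an assumption added so that the sketch terminates instead of waiting forever.

```python
import queue

q = queue.Queue(maxsize=2)            # capacity of two items
q.put([1.0, 2.0])
q.put([3.0, 4.0])

try:
    q.put([5.0, 6.0], timeout=0.1)    # queue is full: without a timeout this would block
    enqueue_blocked = False
except queue.Full:
    enqueue_blocked = True

first = q.get()                       # FIFO order: the first item comes out first
q.get()
try:
    q.get(timeout=0.1)                # queue is empty: without a timeout this would block
    dequeue_blocked = False
except queue.Empty:
    dequeue_blocked = True

print(first, enqueue_blocked, dequeue_blocked)  # [1.0, 2.0] True True
```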

**Queues of tuples** make it possible to feed more than one tensor per step (one such tuple is just one unit of the queue's *capacity*) so that all data required for, e.g., one training iteration can be queued and dequeued at once. The following queue stores pairs of tensors of different shape where one has type `int32` and the other has type `float32`:

`q = tf.FIFOQueue(capacity=10, dtypes=[tf.int32, tf.float32], shapes=[[], [3, 2]], name="q", shared_name="shared_q")`<br>
`a = tf.placeholder(tf.int32, shape=())`<br>
`b = tf.placeholder(tf.float32, shape=(3, 2))`<br>
`enqueue = q.enqueue((a, b))`<br>
`with tf.Session([...]) as sess:`<br>
`....sess.run(enqueue, feed_dict={a: 10, b:[[1., 2.], [3., 4.], [5., 6.]]})`<br>
`....sess.run(enqueue, feed_dict={a: 11, b:[[2., 4.], [6., 8.], [0., 2.]]})`<br>
`....sess.run(enqueue, feed_dict={a: 12, b:[[3., 6.], [9., 2.], [5., 8.]]})`.

Note that the enqueue operation is given **pairs** of tensors! In the following, the `dequeue()` function creates a **pair** of dequeue operations:

`dequeue_a, dequeue_b = q.dequeue() # it makes sense to run these two together in the upcoming session`<br>
`with tf.Session([...]) as sess:`<br>
`....a_val, b_val = sess.run([dequeue_a, dequeue_b])`<br>
`....print(a_val) # 10`<br>
`....print(b_val) # [[1., 2.], [3., 4.], [5., 6.]]`.

**Warning / caution**<br>
If you run `dequeue_a` on its own, it will dequeue a pair and return only the first element; the second element will be lost (and similarly, if you run `dequeue_b` on its own, the first element will be lost).

The `enqueue_many()` and `dequeue_many()` functions also work like this. Syntax for the latter is shown below:

`batch_size = 2`<br>
`dequeue_as, dequeue_bs = q.dequeue_many(batch_size)`<br>
`with tf.Session([...]) as sess:`<br>
`....a_vals, b_vals = sess.run([dequeue_as, dequeue_bs]) # "as" is a reserved word in Python, hence a_vals`<br>
`....print(a_vals) # [10, 11]                    # works: see enqueuing above`<br>
`....print(b_vals) # [[[1., 2.], [3., 4.], [5., 6.]], [[2., 4.], [6., 8.], [0., 2.]]]`<br>
`....a_vals, b_vals = sess.run([dequeue_as, dequeue_bs]) # blocked waiting for another pair`.

**Closing a queue** – presumably in any session – via

`close_q = q.close()`<br>
`with tf.Session([...]) as sess:`<br>
`....[...]`<br>
`....sess.run(close_q)`

is useful to signal to other sessions relying on that queue that no more data will be enqueued. Pending enqueue requests will still be executed unless one specifies `q.close(cancel_pending_enqueues=True)`. Dequeuing will go on until finished, i.e., until the queue is empty (for `dequeue`) or there are not enough instances for the specified batch size (for `dequeue_many`). In the latter case, one might prefer to use the `dequeue_up_to` operation. As the name suggests, it dequeues as many instances as specified via `batch_size` unless fewer instances are in the queue, in which case it dequeues the rest.
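
The difference between the two batch operations on a closed queue can be sketched with a plain `collections.deque`. The two functions below are hypothetical stand-ins that only imitate the batching logic; in particular, returning `None` is a simplification of this sketch, since TensorFlow signals the "not enough instances" case with an error instead.

```python
from collections import deque


def dequeue_many(items, batch_size):
    """Return a full batch, or None if fewer than batch_size items remain."""
    if len(items) < batch_size:
        return None
    return [items.popleft() for _ in range(batch_size)]


def dequeue_up_to(items, batch_size):
    """Return up to batch_size items; the last batch may be smaller."""
    return [items.popleft() for _ in range(min(batch_size, len(items)))]


closed_queue = deque([1, 2, 3, 4, 5])     # a closed queue holding five leftover items
print(dequeue_many(closed_queue, 2))      # [1, 2]
print(dequeue_many(closed_queue, 2))      # [3, 4]
print(dequeue_many(closed_queue, 2))      # None: one item left, not enough for a batch
print(dequeue_up_to(closed_queue, 2))     # [5]
```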

**RandomShuffleQueue**, created with the TensorFlow function `RandomShuffleQueue`, is an alternative to `FIFOQueue` that shuffles the enqueued instances before dequeuing. This may be useful to shuffle instances differently in different training epochs. The `min_after_dequeue` argument in

`q = tf.RandomShuffleQueue(capacity=50, min_after_dequeue=10, dtypes=[tf.float32],`<br>
`..........................shapes=[()], name="q", shared_name="shared_q")`

is used to ensure enough options for shuffling the instances. Either further instances will be enqueued so that dequeuing can continue while satisfying the minimum number of instances for shuffling, or the queue will eventually be closed. In the latter case, the `min_after_dequeue` argument is discarded and the remaining instances are dequeued. Suppose floats `1.` to `22.` have been enqueued. The dequeuing may look like this:

`dequeue = q.dequeue_many(5)`<br>
`with tf.Session([...]) as sess:`<br>
`....print(sess.run(dequeue)) # [20. 15. 11. 12.  4.] (17 items left)`<br>
`....print(sess.run(dequeue)) # [ 5. 13.  6.  0. 17.] (12 items left)`<br>
`....print(sess.run(dequeue)) # 12 - 5 = 7 < 10: dequeuing blocked (wait for 3 more instances so that 10 would remain)`.
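
The arithmetic behind the blocked third dequeue can be checked with a few lines of plain Python. This is a sketch of the rule as described above, not TensorFlow code; `can_dequeue` is a hypothetical helper, and the `closed` flag models the statement that `min_after_dequeue` is ignored once the queue has been closed.

```python
def can_dequeue(items_in_queue, batch_size, min_after_dequeue, closed=False):
    """Return True if dequeuing `batch_size` items would not block."""
    if closed:
        # once the queue is closed, min_after_dequeue no longer applies
        return items_in_queue >= batch_size
    return items_in_queue - batch_size >= min_after_dequeue

items = 22                     # floats 1. to 22. have been enqueued
history = []                   # items left after each successful dequeue
while can_dequeue(items, batch_size=5, min_after_dequeue=10):
    items -= 5
    history.append(items)

blocked_while_open = not can_dequeue(items, 5, 10)
allowed_when_closed = can_dequeue(items, 5, 10, closed=True)
print(history, blocked_while_open, allowed_when_closed)
```

Two dequeues succeed (17 and then 12 items left). The third would leave only 7 items and is therefore blocked until either 3 more instances arrive or the queue is closed.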

**PaddingFIFOQueue** is a queue that accepts tensors of varying dimensions but with constant rank. When dequeuing a mini-batch with `dequeue_many` or `dequeue_up_to`, all instances have the same shape, namely the maximum in each dimension (for the current mini-batch). Smaller tensors will be padded with zeros. When dequeuing with `dequeue`, each "mini-batch" consists of only one instance so the original shape will always be kept. An example where padding is applied would look somewhat like this:

`q = tf.PaddingFIFOQueue(capacity=50, dtypes=[tf.float32], shapes=[(None, None)], name="q", shared_name="shared_q")`<br>
`v = tf.placeholder(tf.float32, shape=(None, None))`<br>
`enqueue = q.enqueue([v])`<br>
`with tf.Session([...]) as sess:`<br>
`....sess.run(enqueue, feed_dict={v: [[1., 2.], [3., 4.], [5., 6.]]})       # shape 3x2`<br>
`....sess.run(enqueue, feed_dict={v: [[1.]]})                               # shape 1x1`<br>
`....sess.run(enqueue, feed_dict={v: [[7., 8., 9., 5.], [6., 7., 8., 9.]]}) # shape 2x4`.
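
What a `dequeue_many(3)` on this queue should return can be illustrated with nested Python lists. The helper `pad_batch` below is hypothetical and only mimics the padding rule described above (pad every tensor with zeros up to the maximum size in each dimension of the current mini-batch). It is not the TensorFlow implementation.

```python
def pad_batch(tensors, pad_value=0.0):
    """Pad a list of 2D nested lists to the maximum number of rows and columns."""
    max_rows = max(len(t) for t in tensors)
    max_cols = max(len(row) for t in tensors for row in t)
    padded = []
    for t in tensors:
        # pad every existing row on the right ...
        rows = [list(row) + [pad_value] * (max_cols - len(row)) for row in t]
        # ... and append all-zero rows at the bottom
        rows += [[pad_value] * max_cols for _ in range(max_rows - len(t))]
        padded.append(rows)
    return padded

batch = pad_batch([
    [[1., 2.], [3., 4.], [5., 6.]],          # shape 3x2
    [[1.]],                                  # shape 1x1
    [[7., 8., 9., 5.], [6., 7., 8., 9.]],    # shape 2x4
])
for tensor in batch:
    print(tensor)                            # every tensor now has shape 3x4
```

The maximum is taken per dimension (3 rows from the first tensor, 4 columns from the third), so all three instances end up with shape 3x4.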

Such a queue is useful when the data have varying shape, as is common, e.g., in *Natural Language Processing* (NLP, see Chapter 14). We have already learned
- how to **distribute operations** across tasks / servers,
- how to **share and communicate variables** between servers, and
- how to **enqueue and dequeue data** in sessions different from computational sessions so that these **things can be done in parallel**, thus speeding up the overall process.

One very important step is left. We should also know how to efficiently feed the graph with training data.
### Loading Data Directly from the Graph
page 337<br>
Up to now, we have assumed that the client feeds the data to the cluster via placeholders. That works well in many cases but it involves moving the data several times:
- from filesystem to client,
- from client to master task, and
- possibly from master task to another task.

And maybe several clients shall train different neural networks at the same time, e.g., for hyperparameter optimization. This might saturate the network bandwidth. Fortunately, there is a solution!

If a dataset fits in memory, a good solution is to simply **preload the data into a variable** once and then just use this variable in the graph. The data will be transferred from the client to the cluster only once (but moving it from a master task to some other task might still be necessary). An implementation would look somewhat like this:

`training_set_init = tf.placeholder(tf.float32, shape=(None, n_features))`<br>
`training_set = tf.Variable(training_set_init, trainable=False, collections=[], name="training_set")`<br>
`with tf.Session([...]) as sess:`<br>
`....data = [...] # load the training data from the data storage`<br>
`....sess.run(training_set.initializer, feed_dict={training_set_init: data})`.

The argument `trainable=False` is necessary to keep optimizers from meddling with the input data. Additionally, one ought to set `collections=[]`, thus removing it from collections, in particular from `GraphKeys.GLOBAL_VARIABLES`. Otherwise it would unnecessarily be saved in a checkpoint.

**General note**<br>
This example assumes that all of your training set (including the labels) consists only of `float32` values. If that's not the case, you will need one variable per type.

**Reading the training data directly from the graph** via *reader operations* is useful if the training set does not fit in memory. Also, the data never passes through the client, thus avoiding unnecessary transfers. TensorFlow provides readers for (i) CSV files, (ii) fixed-length binary records, and (iii) TensorFlow's own `TFRecords` format, which is based on protocol buffers (https://en.wikipedia.org/wiki/Protocol_Buffers). Figure 12-9 on page 340 of the book shows the overall idea. But it is not reproduced here because that idea is also illustrated by the slightly more complicated Figure 12-10 on page 342 in the book. That figure is reproduced further below. Using just one thread between the filename queue and the instance queue corresponds to Figure 12-9.<br>
Let's begin by illustrating *reader operations* with the very simple CSV file *my_test.csv* (displayed below)!
<img src="images/My_Test_CSV.png" width="100">
It is obvious that the first line is a header. But it is not explicitly marked as such, so we need to work around that when reading data from this file. Also, the first and second column contain floats and appear to be features (see header). The third column only contains integers that seem to indicate a binary target class. We will read this file with a `TextLineReader`. This is an object that reads text files line by line. And it is stateful: it keeps track of what file it is working on and what its position in this file is, so it can be called step-by-step (or line-by-line). The file name(s) shall be fed to the reader via a queue with enqueued placeholders that will be filled at runtime. The queue shall be explicitly closed when finished.<br>
Once the line reader knows what file to open, it shall retrieve a line (*value*) and unique metadata about the line (*key*). This is a **key/value pair**. For the `TextLineReader`, the key contains the filename, a colon (`:`), and the line number. The value is just the content of that line. The value string does contain the features and target / label but we need to extract them from the string. Here, this is done with

`x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])`<br>
`features = tf.stack([x1, x2])`,

where the second argument of the first line specifies (i) the default value in case there is no data in a column and (ii) the format (here: float, float, int). The second line simply stacks the features of one instance. Now that the instance has been read, it can be pushed to a `RandomShuffleQueue` that will shuffle the instances and make them available to some kind of operation, e.g., to a training operation. That queue shall also be closed when done. To finish off, we also need a session that runs these queues (`filename_queue` and `instance_queue`), thus pulling filenames (only one, here) from one queue and pushing instance features and targets to the other queue. When all filenames have been pulled from `filename_queue`, TensorFlow would just wait for a new filename to be fed to the queue. But if the queue is already closed, an `OutOfRangeError` will be raised. By default, this would lead to an error message. Yet, we handle this error with `try` and `except` (see link above). Then, we close `instance_queue` and generate mini-batches from it until it is empty. Since that queue is also already closed, we would again get an `OutOfRangeError`. But we handle this one in the same manner as the previous one. All mini-batches are printed to the screen.

In [5]:
# make / open a simple CSV file (my_test.csv)
test_csv = open("./other_resources/chapter12/my_test.csv", "w")
test_csv.write("x1, x2 , target\n")             # header
test_csv.write("1., 2., 0\n")                   # instance 1 with features "1.", "2.", and class "0"
test_csv.write("4., 5 , 1\n")                   # instance 2
test_csv.write("7.,   , 0\n")                   # instance 3
test_csv.close()
# always a good idea
reset_graph()
# TextLineReader
reader = tf.TextLineReader(skip_header_lines=1) # skip the header
# queue to feed file names to reader
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string],shapes=[()]) # make a first-in-first-out queue
filename = tf.placeholder(tf.string)                                       # placeholder for file name
enqueue_filename = filename_queue.enqueue([filename])                      # enqueue the placeholder
close_filename_queue = filename_queue.close()                              # queue closing operation
# read in the data
key, value = reader.read(filename_queue)                                   # a key-value pair from specified file
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.],[-1]]) # decode the value and specify defaults ...
features = tf.stack([x1, x2])                                              # ... and formats; then stack features
# queue that shuffles retrieved instances and makes them available for a (training) graph
instance_queue = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=[tf.float32, tf.int32],
                                       shapes=[[2], []], name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])              # enqueue features and target (see above)
close_instance_queue = instance_queue.close()                              # queue closing operation
# page 340 in the book
mini_batch_instances, mini_batch_targets = instance_queue.dequeue_up_to(2) # dequeue mini-batches via "dequeue_up_to"
# run the queues in a session
with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={                                 # ...there is only ...
        filename: "./other_resources/chapter12/my_test.csv"})              # ... one filename instance ...
    sess.run(close_filename_queue)                                         # ... so we can already close the queue
    # details on try - except: https://www.w3schools.com/python/python_try_except.asp
    try:                                                               # as long as there are instances in the ...
        while True:                                                    # ... filename_queue, try to enqueue them ...
            sess.run(enqueue_instance)                                 # ... in the RandomShuffleQueue
    except tf.errors.OutOfRangeError:                                  # if there is an OutOfRangeError (because ...
        print("No more files to read")                                 # ... "filename_queue" has ended) print a ...
    sess.run(close_instance_queue)                                     # ... message and close the RandomShuffleQueue
    # all below is from github
    try:                                                               # as long as there are instances in the ...
        while True:                                                    # ... RandomShuffleQueue, try to make mini- ...
            print(sess.run([mini_batch_instances,mini_batch_targets])) # ... batches out of them and print them
    except tf.errors.OutOfRangeError:                                  # if there is an OutOfRangeError (because ...
        print("No more training instances")                            # ... the queue has ended) print a message

No more files to read
[array([[4., 5.],
       [1., 2.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[ 7.00000000e+00, -1.28134154e-20]], dtype=float32), array([0], dtype=int32)]
No more training instances


Note that the first mini-batch consists of two instances (as specified by `instance_queue.dequeue_up_to(2)`):
- first, features $[4.,\,5.]$ and label $[1]$ and
- second, features $[1.,\,2.]$ and label $[0]$.

So the order has been randomized, as expected from `RandomShuffleQueue` (but this may vary from run to run). The second mini-batch consists of only one instance, with features $[7.,\,-1.28\cdot10^{-20}]$ and label $[0]$. This demonstrates the use of `dequeue_up_to(2)`: `dequeue_many(2)` would not have returned an incomplete mini-batch. It seems that the `record_defaults=[[-1.], [-1.], [-1]]` argument does not work as intended: the value $-1.28\cdot10^{-20}$ filled in for the last instance's missing second feature should actually read $-1$. One possible explanation (not verified here) is that this field in *my_test.csv* contains spaces rather than being truly empty, so it may not be treated as missing.
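
What `record_defaults` is meant to achieve can be sketched in plain Python. The helper `decode_line` is hypothetical and is not a TensorFlow function. It strips whitespace before testing whether a field is empty, which is an assumption about the intended behaviour. Whether `tf.decode_csv` treats whitespace-only fields as empty is not verified here.

```python
def decode_line(line, record_defaults):
    """Split a CSV line and fall back to a default for empty fields.

    Each default also fixes the type of its column (float or int here).
    Fields are stripped first, so whitespace-only fields count as empty
    (an assumption of this sketch).
    """
    fields = [field.strip() for field in line.split(",")]
    values = []
    for field, default in zip(fields, record_defaults):
        caster = type(default)               # float for -1., int for -1
        values.append(caster(field) if field else default)
    return values

defaults = [-1., -1., -1]
complete = decode_line("4., 5 , 1", defaults)     # all fields present
incomplete = decode_line("7.,   , 0", defaults)   # second field missing
print(complete, incomplete)
```

In this sketch the missing second feature of the last line is replaced by the default `-1.0`, which is the value one would have expected in the output above.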

In the above code, we did not actually use the mini-batches as input to a network that would be trained via a training operation. To do that, one would further design a graph with, e.g., fully connected layers that takes the mini-batches as input and also has some training operation (`training_op = [...]`). This training operation would then be part of a session, somewhat like this:

`with tf.Session([...]) as sess:`<br>
`....try:`<br>
`........for step in range(max_steps):`<br>
`............sess.run(training_op)`<br>
`....except tf.errors.OutOfRangeError:`<br>
`........pass # no more training instances (same procedure as in code demonstration above)`.

**Warning / caution**<br>
TensorFlow queues don't handle sparse tensors well, so if your training instances are sparse you should parse the records after the instance queue.

**Multithreaded readers using a Coordinator and a QueueRunner** can be employed for data loading as shown above, yet with much larger throughput. In principle, simultaneously reading instances via multiple threads could be achieved with Python's `threading` module. But it's much more convenient with TensorFlow's `Coordinator` and `QueueRunner` classes.

The sole job of a `Coordinator` is to **coordinate the stopping of multiple threads**. Any thread that is managed by the coordinator can request at any time that all threads managed by the coordinator stop, simply by calling the coordinator's `request_stop()` method. Then every thread will stop as soon as it finishes its current iteration. To let the main program continue only after all threads have stopped, one passes a list of threads to the coordinator's `join()` method, e.g., `coord.join(list_of_threads)` for a coordinator created via `coord = tf.train.Coordinator()`.
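
The stopping protocol can be imitated with Python's `threading` module. The class `SimpleCoordinator` below is a toy stand-in written for illustration only (it is not `tf.train.Coordinator`). It merely mirrors the three ideas described above: a shared stop flag, a way for any thread to set it, and a `join()` that waits for all threads.

```python
import threading

class SimpleCoordinator:
    """Toy stand-in for a coordinator: a shared stop flag plus join()."""
    def __init__(self):
        self._stop_event = threading.Event()

    def request_stop(self):
        self._stop_event.set()

    def should_stop(self):
        return self._stop_event.is_set()

    def join(self, threads):
        for thread in threads:
            thread.join()

def worker(coord, counts, index, limit):
    # The flag is checked before every iteration, so a thread stops
    # right after finishing its current iteration.
    while not coord.should_stop():
        counts[index] += 1
        if counts[index] >= limit:
            coord.request_stop()      # any thread may ask all threads to stop

coord = SimpleCoordinator()
counts = [0, 0, 0]                    # one iteration counter per thread
threads = [threading.Thread(target=worker, args=(coord, counts, i, 1000))
           for i in range(3)]
for thread in threads:
    thread.start()
coord.join(threads)                   # continue only after all threads stopped
print(coord.should_stop(), [t.is_alive() for t in threads])
```

Whichever thread reaches its limit first requests the stop, and the other threads finish their current iteration and exit, which is the behaviour that `QueueRunner` threads rely on.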

A `QueueRunner` runs multiple threads, each of which repeatedly runs an enqueue operation, thus filling a queue as fast as possible. As soon as the queue gets closed, the next thread that tries to push an item to it receives an `OutOfRangeError`. Via a `Coordinator`, the thread that received the error tells all other threads within the `QueueRunner` to stop. The total implementation would look somewhat as follows:

`# construction as in code above (# make / open a simple CSV file)`<br>
`enqueue_instance = instance_queue.enqueue([features, target])                    # taken from above`<br>
`close_instance_queue = instance_queue.close()                                    # taken from above`<br>
`queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * 5)      # to run 5 threads in parallel`<br>
`with tf.Session() as sess:                                                       # start session`<br>
`....sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})              # enqueue "my_test.csv" into queue`<br>
`....sess.run(close_filename_queue)                                               # close filename queue`<br>
`....coord = tf.train.Coordinator()                                               # make a coordinator instance`<br>
`....enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True) # use 5 parallel threads for ...`<br>
`.................................................................................# ... enqueuing instances`.

Above, the threads in the `QueueRunner` run the same `enqueue_instance` operation over and over. The `Coordinator` would be used to tell the `QueueRunner` to stop gracefully, as explained above. Then, the parallel threads are created. This will work fine and a bit faster than the above code. But all threads are reading from the same file. We can achieve higher throughput by **simultaneously reading from different files**. The process is illustrated in the picture below (similar to Figure 12-10 on page 342 of the book).

<img src="images/Queue+Coord.png" width="600">

To actually do this, we use a function that receives the queues (filename and instance), takes a filename from the filename queue, opens the corresponding file, reads a line, creates an instance, and pushes it to the instance queue. The rest is pretty much as outlined above but with different files. An example is implemented below (with comments as appropriate).

In [6]:
### book
# always a good idea
reset_graph()
# take filename, open file, read line, make instance, and push it to instance_queue
def read_and_push_instance(filename_queue, instance_queue):
    reader = tf.TextLineReader(skip_header_lines=1)                             # reader shall skip file header
    key, value = reader.read(filename_queue)                                    # read current line
    x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]]) # decode value
    features = tf.stack([x1, x2])                                               # stack features
    enqueue_instance = instance_queue.enqueue([features, target])               # enqueue features and labels, ...
    return enqueue_instance                                                     # ... and return that command
# queue for filenames
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])     # create queue for filenames
filename = tf.placeholder(tf.string)                                            # placeholder for filename instances
enqueue_filename = filename_queue.enqueue([filename])                           # enqueue the placeholder
close_filename_queue = filename_queue.close()                                   # run this to close the queue
# queue for instances (as above)
instance_queue = tf.RandomShuffleQueue(capacity=10, min_after_dequeue=2, dtypes=[tf.float32, tf.int32],
                                       shapes=[[2],[]], name="instance_q", shared_name="shared_instance_q")
# dequeue mini batches via "dequeue_up_to" (as shown above and on page 340 of the book)
mini_batch_instances, mini_batch_targets = instance_queue.dequeue_up_to(2)

# make queue runner with a list of 5 read-and-enqueue ops (each with its own reader, all sharing the same queues)
read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue)  # this uses the above function
                        for i in range(5)]
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)       # specify queue and list of enqueue ops
### github
# start session and show that this works
with tf.Session() as sess:
    sess.run(enqueue_filename,
             feed_dict={filename: "other_resources/chapter12/my_test.csv"})     # enqueue the specified filename
    sess.run(close_filename_queue)                                              # close the filename queue
    coord = tf.train.Coordinator()                                              # build coordinator
    enqueue_threads = queue_runner.create_threads(sess, coord=coord,start=True) # create 5 enqueuing threads with ...
                                                                                # ... queue runner and coordinator
    try:                                                                        # details on try - except: see above
        while True:
            print(sess.run([mini_batch_instances, mini_batch_targets]))         # do mini batches until queue is empty
    except tf.errors.OutOfRangeError:
        print("No more training instances")

[array([[4., 5.],
       [1., 2.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[ 7.00000000e+00, -1.28134154e-20]], dtype=float32), array([0], dtype=int32)]
No more training instances


A quite different approach is shown on GitHub (see link above). We shall take it into consideration and go through it very quickly! Here it is.

In [7]:
### everything github
reset_graph()                                         # always a good idea
filenames = ["other_resources/chapter12/my_test.csv"] # list of filenames
dataset = tf.data.TextLineDataset(filenames)          # transform to dataset
def decode_csv_line(line):                            # function receives a line and returns features and labels
    x1, x2, y = tf.decode_csv(line, record_defaults=[[-1.], [-1.], [-1.]]) # tf.decode_csv is the same as above
    X = tf.stack([x1, x2])                            # stack features
    return X, y                                       # return features and labels
dataset = dataset.skip(1).map(decode_csv_line)        # transform dataset line-by-line (skip header) to tensor
it = dataset.make_one_shot_iterator()                 # make an iterator over dataset
X, y = it.get_next()                                  # this will return the next iteration
with tf.Session() as sess:                            # start session
    try:                                              # as above
        while True:                                   # as above
            X_val, y_val = sess.run([X, y])           # almost the same idea as above
            print(X_val, y_val)                       # almost the same idea as above
    except tf.errors.OutOfRangeError:                 # as above
        print("Done")                                 # as above

[1. 2.] 0.0
[4. 5.] 1.0
[ 7.00000000e+00 -1.28134154e-20] 0.0
Done


**Other convenience functions** that simplify common tasks when reading training instances are also included in the TensorFlow library (see https://www.tensorflow.org/api_docs/python for a full list). Here we briefly outline a few functions that are useful in the context of loading data directly from the graph. The `string_input_producer()` function takes a 1D tensor (list) of filenames, creates a thread that pushes one filename at a time to a filename queue, and then closes the queue. When the number of epochs is specified, it cycles through the filenames once per epoch before closing the queue; by default, it shuffles the filenames at each epoch. It also creates a `QueueRunner` to manage its thread. Similar *producer* functions are `input_producer()`, `range_input_producer()`, and `slice_input_producer()`.

The dedicated collection `GraphKeys.QUEUE_RUNNERS` contains queue runners. Those included in that collection can be started via the `tf.train.start_queue_runners()` function. If a queue runner has not been started, its queue will remain open and empty, thus blocking the reader forever.

The `shuffle_batch()` function takes a list of tensors (like `[features, targets]`) and creates (i) a `RandomShuffleQueue`, (ii) a `QueueRunner` that enqueues tensors to the queue (added to the `GraphKeys.QUEUE_RUNNERS` collection), and (iii) a `dequeue_many` operation to retrieve a mini-batch from the queue. That makes it very easy to create a multithreaded input pipeline that feeds a queue and a training pipeline that reads mini-batches from that queue (see also the latest figure above, similar to Figure 12-10 on page 342 in the book: the training pipeline would be added at the right end, after the instance queue). The `batch()`, `batch_join()`, and `shuffle_batch_join()` functions are apparently also worth checking out in this context.

To summarize, we have learned how to
- use multiple GPUs,
- set up and start a TensorFlow cluster,
- distribute computations across multiple devices and servers,
- share variables and other stateful operations (e.g., queues and readers) across sessions via containers,
- coordinate multiple graphs asynchronously using queues, and
- read input efficiently via readers, queue runners, and coordinators.

Let's use all this to parallelize neural networks!
## Parallelizing Neural Networks on a TensorFlow Cluster
page 344<br>
We will first discuss the parallelization of several models over a cluster (with one model per server) and then we will discuss the much more elaborate task of parallelizing one model over a cluster.
### One Neural Network per Device
page 344<br>
A model that runs on a single machine will also run on a cluster: simply specify the master server's address – and during construction optionally the device (possibly a GPU) – when creating the session. That is it. The model will run on the specified server (and device).

Running the same model in parallel on different servers of a cluster, where each server uses different devices, is perfect for **hyperparameter searching**. This concept is illustrated in the figure below (similar to Figure 12-11 on page 344 of the book). The speed-up is almost linear: training 100 copies of the model with different hyperparameters on 50 servers with 2 devices each will only take about as long as the slowest model (the speed depends on the device and the hyperparameters). So the more compute is available, the more hyperparameter configurations can be tried out in roughly the same time.

<img src="images/MonoNN_Cluster.png" width="350">

Running the same model on different servers of a cluster is also very helpful when a service that relies on inference instead of training shall process a high number of *queries per second* (QPS). One can simply replicate the model over as many servers as required, no matter how high the number of QPS. Note that adding more servers in this manner (one copy of the model per server) will not speed up individual inference instances.

**General remark**<br>
Another option is to serve your neural networks using *TensorFlow Serving*. It is an open source system, released by Google in February 2016, designed to serve a high volume of queries to Machine Learning models (typically built with TensorFlow). It handles model versioning, so you can easily deploy a new version of your network to production, or experiment with various algorithms without interrupting your service, and it can sustain a heavy load by adding more servers. For more details, check out https://tensorflow.github.io/serving [or https://www.tensorflow.org/tfx/guide/serving].
### In-Graph Versus Between-Graph Replication
page 345<br>
If an ensemble of (usually different) networks shall be trained – e.g., to aggregate predictions and then make a final prediction based on all aggregated predictions (via *winner-takes-all*, relative majority, a trained perceptron, or whatever) – it is possible to train each predictor completely independently of all other predictors. Yet, for inference, all predictions need to be aggregated. This requires coordination. And there are two approaches to it, both of which are shown in the figure below (similar to Figures 12-12 and 12-13 on page 346 of the book):
- **In-graph replication** relies on *a single graph that contains everything*: preprocessing pipelines (queues), the independent models (in parallel, see figure below), and the aggregation plus prediction pipeline (queue). All these steps are distributed from one client to different servers. In particular, the independent networks run in parallel on different servers within a single session, see the left part of the picture below.
- **Between-graph replication** uses *independent graphs for independent networks*. Preprocessing and aggregation (plus final inference) can be achieved via additional pipelines / queues. The idea is nicely illustrated in the right part of the picture below.

<img src="images/GraphRep.png" width="900">

Both approaches have their pros and cons. In-graph replication is simpler to implement because one needs to handle only one graph and one client. However, between-graph replication allows for more flexibility: each network can be trained completely independently of the others, which makes it easier to develop and test. Additionally one can, e.g., add a time-out that allows the model to proceed even if one of the parallel networks does not produce a usable output (for example because its server crashed). Such a time-out can be implemented with `run()`, `RunOptions`, and `timeout_in_ms` as shown here:

`with tf.Session([...]) as sess:`<br>
`....[...]`<br>
`....run_options = tf.RunOptions()`<br>
`....run_options.timeout_in_ms = 1000                             # 1s timeout`<br>
`....try:`<br>
`........pred = sess.run(dequeue_prediction, options=run_options) # pass run_options to sess.run()`<br>
`....except tf.errors.DeadlineExceededError:`<br>
`........[...]                                                    # the dequeue operation timed out after 1s`.
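
The idea of proceeding without a missing prediction can be illustrated with Python's standard `queue` module. This is an analogy only: `queue.Empty` plays the role of `DeadlineExceededError`, the helper `collect_predictions` is hypothetical, and the relative-majority vote is just one of the aggregation options mentioned above.

```python
import queue
from collections import Counter

def collect_predictions(prediction_queue, n_expected, timeout_s):
    """Collect up to n_expected predictions; stop waiting after a time-out."""
    predictions = []
    for _ in range(n_expected):
        try:
            predictions.append(prediction_queue.get(timeout=timeout_s))
        except queue.Empty:           # stdlib analogue of a timed-out dequeue
            break                     # proceed with the predictions we have
    return predictions

pred_q = queue.Queue()
for class_id in [1, 0, 1]:            # only 3 of the 4 networks have answered
    pred_q.put(class_id)

received = collect_predictions(pred_q, n_expected=4, timeout_s=0.1)
final_class = Counter(received).most_common(1)[0][0]   # relative majority
print(received, final_class)
```

The fourth `get()` times out after 0.1 s, so the final prediction is made from the three predictions that did arrive.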

Another option is to set a time limit to *any* operation inside the session via `operation_timeout_in_ms` (before starting the session) and `run()`. This is implemented below.

In [8]:
### everything github
reset_graph()                                                   # always a good idea
q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()]) # make a queue
v = tf.placeholder(tf.float32)                                  # placeholder for value
enqueue = q.enqueue([v])                                        # operation for enqueuing the placeholder
dequeue = q.dequeue()                                           # dequeuing operation
output = dequeue + 1                                            # make an output
config = tf.ConfigProto()                   # configure the time-out ...
config.operation_timeout_in_ms = 1000       # ... time in ms for every ...
with tf.Session(config=config) as sess:     # ... operation in this session
    sess.run(enqueue, feed_dict={v: 1.0})           # enqueue 1 => will be 2
    sess.run(enqueue, feed_dict={v: 2.0})           # enqueue 2 => will be 3
    sess.run(enqueue, feed_dict={v: 3.0})           # enqueue 3 => will be 4
    print(sess.run(output))                         # dequeue and print 2 (from enqueued 1)
    print(sess.run(output, feed_dict={dequeue: 5})) # print 6: feeding 5 replaces the dequeue (queue untouched)
    print(sess.run(output))                         # dequeue and print 3 (from enqueued 2)
    print(sess.run(output))                         # dequeue and print 4 (from enqueued 3)
    try:                                    # try-except as further above (see also link at top)
        print(sess.run(output))             # the queue "q" is empty and waits for an input so this will raise ...
    except tf.errors.DeadlineExceededError: # ... exactly this error (due to time out)
        print("Timed out while dequeuing.") # this is exactly what happened

2.0
6.0
3.0
4.0
Timed out while dequeuing.


### Model Parallelism
page 347<br>
Chopping a model into several parts such that those parts can be run in parallel (at least in part) on different devices is called *model parallelism*. But this is tricky, since the dependence of each part on the other parts is crucial. So it depends on the model architecture. Running a higher dense layer in parallel to a lower dense layer does not really come with an advantage because the higher layer would need to wait for the output of the lower layer. So fully connected layers pose a problem to model parallelism, whereas layers that are only partially connected, such that there are basically two halves, can be split more easily, see the figure below (similar to Figures 12-14 and 12-15 on page 348 of the book). The latter applies to the different filters in a given layer of a convolutional neural network (CNN, see Chapter 13).

<img src="images/SplitNNs/SplitFPNNs.png" width="850">

Recurrent neural networks (RNNs, see Chapter 14) can actually be split vertically. Here, each layer produces an output that is fed upwards to the next higher level but also forward to itself for the next time step. So as soon as a layer has produced an output (or *state*), the next higher layer can chew on that output / state in parallel to the original layer that now processes the next time step. Obviously, there is a lot of communication overhead. But if the individual cells (i.e., the calculation of one layer for one time step) are demanding enough, the parallelization advantage will dominate over the communication overhead disadvantage. The idea is illustrated in the figure below (similar to Figure 12-16 on page 349 of the book).

<img src="images/SplitNNs/SplitRNNs.png" width="450">

To summarize, model parallelism can work really well if the computational speed-up dominates over the communication drag that is implied by different parts of the model depending on each other. So parts / layers that communicate a lot of data amongst each other should usually run on the same device.
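
The trade-off between computational speed-up and communication drag can be made tangible with a deliberately idealized timing model for a deep RNN with one layer per device. All assumptions are made for this sketch only: every cell takes the same time `cell_time`, passing an output to the next layer costs `comm_time` per cell and is not overlapped with computation, and the layers form a simple pipeline. The function names are hypothetical.

```python
def single_device_time(n_layers, n_steps, cell_time):
    """All cells run one after another on one device (no communication)."""
    return n_layers * n_steps * cell_time

def pipelined_time(n_layers, n_steps, cell_time, comm_time):
    """One layer per device, organized as a pipeline.

    Each stage needs cell_time + comm_time per time step, and the last
    layer finishes n_layers - 1 stages after the first one.
    """
    return (n_steps + n_layers - 1) * (cell_time + comm_time)

t_single = single_device_time(n_layers=4, n_steps=100, cell_time=1.0)
for comm_time in (0.1, 1.0, 3.0):
    t_pipe = pipelined_time(4, 100, 1.0, comm_time)
    print(comm_time, t_single, t_pipe, round(t_single / t_pipe, 2))
```

In this model, splitting pays off as long as the communication cost per cell is small compared to the cell's computation time. Once the communication cost reaches about three times the computation time, the four-device pipeline becomes slower than a single device.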
### Data Parallelism
page 349<br>
Another way to **parallelize the training** of a network is to feed different training batches to different copies of the model, run these models in parallel, aggregate their results (only for *synchronous updates*), update the trainable variables, communicate the updates to all parallel models (such that they remain identical), and start over again. The procedure is illustrated in the figure below (similar to Figure 12-17 on page 349 of the book). There are two approaches to this: *synchronous updates* and *asynchronous updates*.

<img src="images/DataParallelism.png" width="650">
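
The loop described above can be sketched in plain Python for a one-parameter model $y = w x$ with a mean-squared-error loss. This is only an illustration of the aggregation step, not TensorFlow code; the names `mse_gradient` and `data_parallel_step` and all numbers are made up.

```python
def mse_gradient(w, batch):
    """Gradient of the mean squared error of y = w * x with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def data_parallel_step(w, shards, learning_rate):
    # Every replica holds the same w but sees a different shard of the batch.
    gradients = [mse_gradient(w, shard) for shard in shards]
    mean_gradient = sum(gradients) / len(gradients)  # the aggregation step
    return w - learning_rate * mean_gradient         # new w, sent to all replicas


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # generated by y = 2 * x
shards = [data[:2], data[2:]]                            # two replicas
w = 0.0
for _ in range(100):
    w = data_parallel_step(w, shards, learning_rate=0.02)
print(round(w, 4))
```

Because the shards have equal size, the mean of the per-replica gradients equals the gradient over the combined batch, so the replicas jointly perform ordinary mini-batch gradient descent.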

**Synchronous updates** of the trainable variables require the aggregation of all gradients before updating. This leads to a suboptimal use of resources if one device finishes its calculation before another. After updating, the updated variables are communicated to all devices at the same time. So the parameter server's bandwidth might be unsaturated most of the time while bottlenecking during the updates because of all replicas getting updated at the same time.

**Suggestion or Tip**<br>
To reduce the waiting time at each step, you could ignore the gradients from the slowest few replicas (typically ~10%). For example, you could run 20 replicas, but only aggregate the gradients from the fastest 18 replicas at each step, and just ignore the gradients from the last 2. As soon as the parameters are updated, the first 18 replicas can start working again immediately, without having to wait for the 2 slowest replicas. This setup is generally described as having 18 replicas plus 2 *spare replicas*. [Unless 2 devices are really much slower than the others, all replicas will in general be taken into account sooner or later.]
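
A minimal sketch of this tip, assuming that each replica reports a finish time together with its gradient. The helper `aggregate_fastest` and the timings are hypothetical and only serve to show how much waiting the spare replicas save.

```python
def aggregate_fastest(results, n_required):
    """Average the gradients of the n_required replicas that finish first.

    results: list of (finish_time, gradient) pairs, one pair per replica.
    Returns the mean gradient and the time at which the update can happen.
    """
    fastest = sorted(results, key=lambda r: r[0])[:n_required]
    mean_gradient = sum(g for _, g in fastest) / n_required
    update_time = fastest[-1][0]  # wait only for the slowest replica that is used
    return mean_gradient, update_time


# 20 replicas with made-up finish times (seconds); two of them are stragglers.
results = [(1.0 + 0.01 * i, 0.5) for i in range(18)] + [(3.0, 0.5), (5.0, 0.5)]
_, wait_all = aggregate_fastest(results, n_required=20)
mean_gradient, wait_18 = aggregate_fastest(results, n_required=18)
print(wait_all, wait_18)
```

With these numbers, waiting for all 20 replicas takes as long as the slowest straggler, while aggregating only the fastest 18 lets the update happen shortly after one second.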

**Asynchronous updates** of the trainable variables are achieved by making one training step whenever one replica submits its gradients. So there is no aggregation and the *mean* in the above picture is omitted. Not waiting for other replicas means less idle time, so the average update frequency will be higher than for synchronous updates. Whenever an update has been made, the new variables become available to all replicas, each of which only retrieves the latest variables right before starting a new iteration. The replicas do this asynchronously and thus, in general, at different times, so the danger of bottlenecking the parameter server's bandwidth is reduced.

If there are $N$ replicas, then on average $N-1$ variable updates (from all the other replicas) occur during one iteration of any specific replica. So this replica's gradients will be outdated by the time they have been computed (see figure below, similar to Figure 12-18 on page 351 of the book). The reason asynchronous updates work at all is that, *on average*, these gradients still point in the right direction. When gradients are so outdated that they no longer point in a meaningful direction, they are called **stale gradients** (*stale*: no longer fresh; e.g., "stale bread"). Stale gradients can compromise training, but there are a number of countermeasures:
- Reduction of the learning rate.
- Dropping or scaling down of stale gradients.
- Adjustment of the mini-batch size (and possibly other hyperparameters).
- Starting with a *warm-up phase* that uses only one replica at the beginning of training, when gradients tend to be large and are not yet on track towards a valley of the cost function. This avoids gradients from different replicas pointing in very different directions.

<img src="images/StaleGradients.png" width="400">
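
The effect of outdated gradients can be reproduced with a toy simulation. It assumes that the replicas submit their gradients in strict round-robin order (real replicas finish at irregular times) and minimizes the made-up cost function $f(w) = (w - 3)^2$. The function `async_round_robin` is hypothetical.

```python
def async_round_robin(n_replicas, n_updates, learning_rate, target=3.0):
    """Simulate asynchronous updates for f(w) = (w - target)**2.

    Replicas submit gradients in round-robin order (a simplifying assumption).
    Each gradient is computed from the parameter value the replica fetched
    right after its own previous submission.
    """
    w, version = 0.0, 0
    fetched = [(w, version)] * n_replicas  # (value, version) held by each replica
    staleness = []
    for k in range(n_updates):
        r = k % n_replicas
        w_seen, v_seen = fetched[r]
        gradient = 2.0 * (w_seen - target)  # based on a possibly outdated value
        staleness.append(version - v_seen)  # updates made by others in the meantime
        w -= learning_rate * gradient
        version += 1
        fetched[r] = (w, version)           # the replica fetches the latest value
    return w, staleness


w, staleness = async_round_robin(n_replicas=4, n_updates=400, learning_rate=0.05)
print(round(w, 3), staleness[-1])
```

After the first round, every gradient is $N-1 = 3$ updates old, yet training still converges with the small learning rate. In this toy setting, a learning rate of 0.3 still converges with a single replica but oscillates with growing amplitude with four replicas, which is in line with the first countermeasure in the list above.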

A 2016 paper by the Google Brain team (http://goo.gl/9GCiPb) summarizes the team's benchmarks on various approaches to data parallelism. The conclusion is that synchronous updates with a few spare replicas (see *Suggestion or Tip* above; the cited paper refers to *backup workers* instead of *spare replicas*) perform best. But a lot of time has passed since then, so this conclusion might not be up-to-date.

**Bandwidth saturation** is an issue common to synchronous and asynchronous data parallelism. Both approaches require that (i) parameters / variables be communicated to each replica prior to a new iteration and that (ii) gradients be communicated back from the replica to the parameter server at the end of an iteration. So *at some point*, adding *another GPU* for further parallelization (more replicas) *no longer reduces the overall training time because data transmission is already saturated*.
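
A deliberately crude cost model illustrates why the speedup levels off. It assumes that every replica transfers the parameters once and the gradients once per iteration, and that all transfers share a single link to the parameter server. The function `throughput` and all numbers are hypothetical; real systems overlap computation and communication.

```python
def throughput(n_replicas, compute_s, model_bytes, bandwidth_bytes_per_s):
    """Batches per second under a deliberately simple cost model.

    Assumption: per iteration, every replica downloads the parameters and
    uploads the gradients once, and all transfers share one link.
    """
    transfer_s = n_replicas * 2 * model_bytes / bandwidth_bytes_per_s
    return n_replicas / (compute_s + transfer_s)


# Made-up numbers: 100 MB of parameters, a 1 GB/s link, 0.5 s of compute per batch.
baseline = throughput(1, 0.5, 100e6, 1e9)
for n in (1, 2, 4, 8, 16, 32):
    speedup = throughput(n, 0.5, 100e6, 1e9) / baseline
    print(n, round(speedup, 2))
```

In this model the speedup never exceeds 3.5x, no matter how many replicas are added, because the shared link becomes the limiting resource.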

**Suggestion or Tip**<br>
For some models, typically relatively small and trained on a very large training set, you are often better off training the model on a single machine with a single GPU.

Saturation is more severe the larger or the denser a model is because such models have more nonzero parameters (variables and gradients) to communicate. Zeros can be communicated efficiently. With 50 (500) GPUs, Jeff Dean (the former head of Google Brain) reported a typical speedup of 25-40x (300x) on dense (sparse) models. So sparse models really appear to scale better. Further examples (as of Q1 2016):
- 6x speedup for Neural Machine Translation (NMT) on 8 GPUs,
- 32x speedup for Inception/ImageNet on 50 GPUs, and
- 300x speedup for Rankbrain on 500 GPUs.

Saturation sets in and performance degrades around a few hundred (dozen) GPUs for sparse (dense) models. But due to continued research in this area, these findings may need to be updated. Below follow a few measures that promise to reduce saturation.
- Group the GPUs on a few servers rather than many servers to reduce communication.
- Shard the parameters across several parameter servers (see above).
- Cut the amount of data in half by moving from `tf.float32` (32-bit precision) to `tf.float16` (16-bit precision). Model performance and convergence will usually remain almost unaffected.
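
The last point is simple arithmetic, sketched here with Python's `struct` module, whose format codes `"f"` and `"e"` stand for 32-bit and 16-bit floats. The parameter count is a made-up example.

```python
import struct

n_parameters = 10_000_000                            # hypothetical model size
bytes_float32 = n_parameters * struct.calcsize("f")  # 4 bytes per value
bytes_float16 = n_parameters * struct.calcsize("e")  # 2 bytes per value

# Round-trip one value through 16-bit precision to see the rounding error.
value = 0.1234567
as_float16 = struct.unpack("e", struct.pack("e", value))[0]
print(bytes_float32, bytes_float16, as_float16)
```

The traffic per iteration is halved, at the price of keeping only about three significant decimal digits per value.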

**Suggestion or Tip**<br>
Although 16-bit precision is the minimum for training [a] neural network, you can actually drop down to 8-bit precision after training to reduce the size of the model and speed up computations. This is called *quantizing* the neural network. It is particularly useful for deploying and running pretrained models on mobile phones. See Pete Warden's great post (http://goo.gl/09Cb6v) on the subject.
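
The idea of quantization can be sketched with a simple min/max linear scheme that maps each weight to one of 256 integer codes. This is one common approach and not necessarily what the linked post or TensorFlow implements; the functions `quantize` and `dequantize` and the weights are made up.

```python
def quantize(weights):
    """Map floats linearly onto the integer codes 0..255 (8 bits)."""
    lo, hi = min(weights), max(weights)
    scale = (hi - lo) / 255 or 1.0  # fall back to 1.0 if all weights are equal
    codes = [round((w - lo) / scale) for w in weights]
    return codes, lo, scale


def dequantize(codes, lo, scale):
    """Recover approximate floats from the 8-bit codes."""
    return [lo + c * scale for c in codes]


weights = [-1.2, -0.3, 0.0, 0.4, 0.9, 2.5]  # made-up values
codes, lo, scale = quantize(weights)
restored = dequantize(codes, lo, scale)
max_error = max(abs(w - r) for w, r in zip(weights, restored))
print(codes, round(max_error, 4))
```

Each weight then needs one byte instead of four, and the reconstruction error is bounded by half a quantization step (`scale / 2`).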

**TensorFlow implementation**s of data parallelism (and no model parallelism) depend on the kind of replication (for the replicas, each of which submits its gradient for a global variable update): in-graph or between-graph. Below, we comment on all of them. The parameters (trainable variables) are always handled by a dedicated parameter server (or by several parameter servers when the parameters are sharded, see above).
- *In-graph replication and synchronous updates*
Build one big graph that contains all the replicas – placed on different devices – and a few nodes to aggregate their gradients and feed them to the optimizer. The code would open only one session on the cluster and run the training operation repeatedly.
- *In-graph replication and asynchronous updates*
Also build one big graph with all the replicas but also have a dedicated optimizer for each replica. Run one thread per replica and optimizer.
- *Between-graph replication and asynchronous updates*
Run multiple independent clients in separate processes (*each in their own world*), each training one replica. The parameters are shared between replicas via a resource container.
- *Between-graph replication and synchronous updates*
By and large the same as *between-graph + asynchronous*, but now the optimizer (e.g., `MomentumOptimizer`) is wrapped within a `SyncReplicasOptimizer`. Each replica uses this optimizer normally, but under the hood it sends the gradients into queues (one per variable). Some *chief* of the `SyncReplicasOptimizer` aggregates the gradients, applies them, and then returns the results to the replicas via *token queues* (one per replica). [Apparently, this technique supports having *spare replicas*.]
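
The queue-and-token mechanism of the last variant can be imitated with Python threads and `queue.Queue`. This is only an analogy in plain Python and not how `SyncReplicasOptimizer` is implemented: there is a single gradient queue for a single parameter, replica `i` minimizes the made-up cost $(w - t_i)^2$, and the function `train_sync` is hypothetical.

```python
import queue
import threading


def train_sync(targets, steps=50, learning_rate=0.1):
    """Toy synchronous training: workers send gradients, a chief aggregates."""
    n = len(targets)
    params = {"w": 0.0}                               # stands in for the parameter server
    grad_queue = queue.Queue()                        # workers -> chief
    token_queues = [queue.Queue() for _ in range(n)]  # chief -> each worker

    def worker(i):
        for _ in range(steps):
            token_queues[i].get()                     # wait until the chief says "go"
            w = params["w"]                           # read the current parameter
            grad_queue.put(2.0 * (w - targets[i]))    # gradient of (w - t_i)**2

    def chief():
        for step in range(steps):
            for q in token_queues:
                q.put(step)                           # release every worker
            grads = [grad_queue.get() for _ in range(n)]  # block until all arrived
            params["w"] -= learning_rate * sum(grads) / n  # apply the mean gradient

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    threads.append(threading.Thread(target=chief))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return params["w"]


w_final = train_sync([1.0, 2.0, 3.0, 6.0])
print(round(w_final, 3))
```

The chief only changes the parameter after all gradients of a step have arrived, and the workers only read it after receiving a token, so all workers always compute their gradients from the same value. Here `w` approaches the mean of the targets. Collecting fewer than `n` gradients per step would correspond to the spare replicas mentioned above, but it would also require discarding late gradients, which this sketch does not do.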

## Extra Material
Some extra material is available on GitHub (see link above).

## Exercises
page 354
### 1.-7.
Solutions are shown in Appendix A of the book and in the separate notebook *ExercisesWithoutCode*.
### 8. Train several DNNs in parallel on a TensorFlow cluster, using different hyperparameter values. This could be DNNs for MNIST classification or any other task you are interested in. The simplest option is to write a single client program that trains only one DNN, then run this program in multiple processes in parallel, with different hyperparameter values for each client. The program could have command-line options to control what server and device the DNN should be placed on, and what resource container and hyperparameter values to use (make sure to use a different resource container for each DNN). Use a validation set or cross-validation to select the top three models.

In [9]:
print("A solution may be implemented later.")

A solution may be implemented later.


### 9. Create an ensemble using the top three models from the previous exercise. Define it in a single graph, ensuring that each DNN runs on a different device. Evaluate it on the validation set: does the ensemble perform better than the individual DNNs?

In [10]:
print("A solution may be implemented later.")

A solution may be implemented later.


### 10. Train a DNN using between-graph replication and data parallelism with asynchronous updates, timing how long it takes to reach a satisfying performance. Next, try again using synchronous updates. Do synchronous updates produce a better model? Is training faster? Split the DNN vertically and place each vertical slice on a different device, and train the model again. Is training any faster? Is the performance any different?

In [11]:
print("A solution may be implemented later.")

A solution may be implemented later.
