### grp

## Hands-On Machine Learning with Scikit-Learn & TensorFlow

## CHAPTER 12: Distributing TensorFlow Across Devices and Servers

## Distributed TensorFlow:
-  provides full control over how to split or replicate the computation graph across GPUs/CPUs and servers
-  lets you parallelize and synchronize operations using flexible parallelization approaches
-  **note** => training on a single machine with multiple GPUs is often just as fast as training on multiple machines, because network transfer delays can cancel out the extra compute (**depends on network transfer speed**)

### Installation / Setup:
-  https://developer.nvidia.com/cuda-gpus
-  http://max-likelihood.com/2016/06/18/aws-tensorflow-setup/
-  http://timdettmers.com/2019/04/03/which-gpu-for-deep-learning/

##### You must download and install the appropriate versions of the CUDA (Compute Unified Device Architecture) and cuDNN (CUDA Deep Neural Network) libraries and set up environment variables so TF can find them, because TF uses CUDA and cuDNN to control GPU cards and accelerate computations

## CUDA (Compute Unified Device Architecture):
-  https://developer.nvidia.com/cuda-zone

## cuDNN (CUDA Deep Neural Network):
-  https://developer.nvidia.com/cudnn
-  part of NVIDIA's Deep Learning SDK
-  provides common DNN computations including activation layers, normalization, forward/backward convolutions, and pooling

### GPU Service Subscription:
-  Amazon Web Services
-  Microsoft Azure
-  Google Cloud

## TensorFlow Simple Placer:
-  _simple placer_ is used to place nodes that are not assigned to devices yet ...
    -  if a node was already placed on a device in a previous run of the graph, it is left on that device.
    -  else, if the user pinned a node to a device (see pinning nodes onto devices below), the placer places it on that device.
    -  else, it defaults to GPU #0, or the CPU if there is no GPU.
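To make the three rules concrete, here is a pure-Python sketch of their decision order. The function `simple_place` and its dict arguments are hypothetical illustrations; TensorFlow's real placer lives inside its C++ runtime and is not exposed like this.

```python
def simple_place(node, previous_placement, pinned, devices):
    """Return the device for `node` following the simple placer's rules."""
    # Rule 1: a node placed in a previous run stays on that device.
    if node in previous_placement:
        return previous_placement[node]
    # Rule 2: a node the user pinned goes to the pinned device.
    if node in pinned:
        return pinned[node]
    # Rule 3: default to GPU #0, or to the CPU if there is no GPU.
    return "/gpu:0" if "/gpu:0" in devices else "/cpu:0"


devices = ["/cpu:0", "/gpu:0", "/gpu:1"]
print(simple_place("c", {}, {"c": "/gpu:1"}, devices))  # /gpu:1
print(simple_place("d", {}, {}, ["/cpu:0"]))            # /cpu:0
```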

## Parallel Execution:
-  via CPU:
    -  operations in the CPU's evaluation queue are dispatched to a thread pool (**inter-op thread pool**)
    -  performs operations in parallel if the CPU has multiple cores
    -  some operations have multithreaded CPU kernels [**tasks are split into sub-operations**] whose sub-operations are dispatched to a 2nd thread pool (**intra-op thread pool**)
    -  multiple operations and sub-operations can run in parallel on different CPU cores
    -  control number of threads in inter-op pool => **inter_op_parallelism_threads** (set in `tf.ConfigProto`)
    -  control number of threads in intra-op pool => **intra_op_parallelism_threads** (set in `tf.ConfigProto`)
-  via GPU:
    -  operations in the GPU's evaluation queue are evaluated sequentially
    -  most operations have multithreaded GPU kernels, implemented by libraries such as CUDA and cuDNN
    -  these implementations have their own thread pools, which typically use as many GPU threads as they can
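The inter-op idea can be sketched with Python's standard thread pool: every operation whose inputs are ready is submitted to the pool, so independent operations run concurrently. This is an illustration only: `run_graph` and the dict-based graph are hypothetical, it schedules in waves rather than dispatching each op the instant its inputs are ready as TensorFlow does, and it does not model intra-op (multithreaded kernel) parallelism at all.

```python
from concurrent.futures import ThreadPoolExecutor


def run_graph(ops, deps, inter_op_threads=2):
    """Evaluate a dependency graph, running ready ops concurrently.

    ops:  op name -> function(results_dict) -> value
    deps: op name -> list of input op names
    """
    results = {}
    pending = dict(deps)
    with ThreadPoolExecutor(max_workers=inter_op_threads) as pool:
        while pending:
            # Ops whose inputs are all computed are "ready" to be dispatched.
            ready = [name for name, inputs in pending.items()
                     if all(i in results for i in inputs)]
            if not ready:
                raise ValueError("graph has a cycle or a missing input")
            snapshot = dict(results)  # read-only copy shared by this wave
            futures = {name: pool.submit(ops[name], snapshot) for name in ready}
            for name, future in futures.items():
                results[name] = future.result()
                del pending[name]
    return results


ops = {
    "a": lambda r: 1.0,
    "b": lambda r: 2.0,              # a and b are independent: same wave
    "c": lambda r: r["a"] + r["b"],
    "d": lambda r: r["c"] * 2.0,
}
deps = {"a": [], "b": [], "c": ["a", "b"], "d": ["c"]}
print(run_graph(ops, deps)["d"])  # 6.0
```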

## TensorFlow Cluster:
-  http://download.tensorflow.org/paper/whitepaper2015.pdf
-  must define a _cluster_ to execute a tf graph across multiple tf servers possibly within multiple machines:
    -  tf servers are called _tasks_
    -  each _task_ belongs to a _job_
    -  a _job_ is a named group of tasks with a common role (e.g. "ps" for parameter servers, "worker" for computation)
    -  a _client_ is any program that opens a session on a tf server (it can run on any machine)
    -  tf servers provide two services (_master_ and _worker_):
        -  _master_:
            -  allows _clients_ to open sessions and use them to run graphs
            -  coordinates computations across _tasks_
        -  _worker_:
            -  actually executes the computations on its _task_ and returns the results
    -  a _client_ can connect to multiple servers by opening multiple sessions in different threads
    -  can run one _client_ per _task_ or just one _client_ to control all _tasks_
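Each task is addressed by its job name and its index within that job. A minimal sketch of that naming scheme, assuming the cluster is described as a plain dict of job name to list of `host:port` addresses (the same shape later passed to `tf.train.ClusterSpec`); `task_device_names` is a hypothetical helper:

```python
def task_device_names(cluster):
    """Map each task's "/job:<name>/task:<index>" prefix to its address."""
    names = {}
    for job, addresses in cluster.items():
        # A task's index is its position in the job's address list.
        for index, address in enumerate(addresses):
            names[f"/job:{job}/task:{index}"] = address
    return names


cluster = {
    "ps": ["machine-a.example.com:2221"],
    "worker": ["machine-a.example.com:2222", "machine-b.example.com:2222"],
}
for name, address in task_device_names(cluster).items():
    print(name, "->", address)
```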

### Variable Management and Resource Containers:
-  _local session_ => not distributed and each variable is managed by the session itself; variables are lost when session ends
-  _distributed sessions_ => variables are managed by _resource containers_ located on cluster
-  _resource containers_ make it easy to share variables across sessions
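Conceptually, a resource container is server-side storage keyed by container name, so a value written by one session is still there when the next session connects. The toy class below (`ToyResourceContainers`, entirely hypothetical) models only that lifetime rule, not TensorFlow's implementation:

```python
class ToyResourceContainers:
    """Server-side variable storage that outlives individual client sessions."""

    def __init__(self):
        self._containers = {}  # container name -> {variable name: value}

    def read(self, container, name, initial_value):
        # Create the variable on first use, otherwise return the stored value.
        return self._containers.setdefault(container, {}).setdefault(name, initial_value)

    def assign(self, container, name, value):
        self._containers.setdefault(container, {})[name] = value

    def reset(self, container):
        # Drop every variable in the container (compare tf.Session.reset).
        self._containers.pop(container, None)


server = ToyResourceContainers()
for _ in range(2):  # two separate "sessions" connect one after another
    x = server.read("my_problem_1", "x", 0.0)
    server.assign("my_problem_1", "x", x + 1)
print(server.read("my_problem_1", "x", 0.0))  # 2.0
```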

### Asynchronous Communication via TF Queues:
-  synchronous => first system sends a message to the second system and waits for a response
-  asynchronous => first system does not wait for a response from the second system
-  _queues_:
    -  exchange data between multiple sessions
    -  _async queue_ example => **client creates a graph that loads training data pushed to _queue_ while another client creates a graph that pulls data from the _queue_ to train a model**
    -  simplest queue type => First-In First-Out (**FIFO**)

### Enqueuing & Dequeuing:
-  enqueue => push data to a queue
-  dequeue => pull data out of a queue
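The asynchronous producer/consumer pattern can be sketched with Python's standard `queue.Queue`, which is also FIFO: one thread enqueues training instances while another dequeues them. The `None` sentinel is an assumption of this sketch; TensorFlow queues instead signal the end of the data by being closed, after which dequeuing raises `OutOfRangeError`.

```python
import queue
import threading

q = queue.Queue(maxsize=10)  # bounded FIFO queue shared by both threads
received = []


def loader():
    for instance in ([1., 2.], [3., 4.], [5., 6.]):
        q.put(instance)  # enqueue: blocks only if the queue is full
    q.put(None)          # sentinel marking the end of the data


def trainer():
    while True:
        instance = q.get()  # dequeue: blocks until an item is available
        if instance is None:
            break
        received.append(instance)


threads = [threading.Thread(target=loader), threading.Thread(target=trainer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(received)  # [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
```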

### Training Data Load Management:
-  transferring data from the _filesystem to the client to the master task to other tasks_ is inefficient and can put too much stress on the filesystem / network bandwidth
-  **solutions**:
    -  _preloading_ => load training data in memory / assign it to a variable (**if the data fits in RAM**)
    -  _reader operations_ => read training data directly from the filesystem (**if the data does not fit in RAM**):
        -  file formats:
            -  CSV
            -  fixed-length binary records
            -  TFRecords
        -  e.g. the TextLineReader API reads text/CSV files one line at a time
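What a text-line reader plus CSV decoding does to each line can be sketched in plain Python, assuming a header line to skip and one default value per column whose type also fixes the column's type. This mirrors the `skip_header_lines` and `record_defaults` arguments used later with `tf.TextLineReader` and `tf.decode_csv` (which take the defaults as a list of lists, e.g. `[[-1.], [-1.], [-1]]`); `decode_csv_rows` is a hypothetical helper:

```python
import csv


def decode_csv_rows(text, record_defaults, skip_header_lines=1):
    """Yield one record per CSV line; empty fields take the column default."""
    lines = text.splitlines()[skip_header_lines:]
    for row in csv.reader(lines):
        # Each default also fixes the column type (float or int here).
        yield [default if field == "" else type(default)(field)
               for field, default in zip(row, record_defaults)]


data = "x1,x2,target\n1.,2.,0\n,5.,1\n"
for record in decode_csv_rows(data, record_defaults=[-1., -1., -1]):
    print(record)  # [1.0, 2.0, 0] then [-1.0, 5.0, 1]
```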

### Parallelizing Neural Networks on a TensorFlow Cluster:
-  One Neural Network per Device:
    -  solid use case for hyperparameter tuning => **each device in the cluster trains a different model with its own set of hyperparameters**
    -  solid use case for ensemble learning => **each device in the cluster trains a different member of the ensemble**
-  Model Parallelism:
    -  split the model itself into separate chunks that run on different devices
    -  ***tricky to set up, depends heavily on the architecture, and often brings little benefit because of cross-device communication***
-  Data Parallelism:
    -  replicate the model on each device and train each replica on a different mini-batch simultaneously, applying the updates synchronously or asynchronously
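One simple way to give every replica its own data is to cut a large mini-batch into one contiguous shard per device. The helper below is a hypothetical sketch of that assumption; other schemes, such as each replica pulling its own mini-batches from a shared queue, are equally valid:

```python
def shard_batch(batch, n_devices):
    """Split one mini-batch into n_devices contiguous, nearly equal shards."""
    size, extra = divmod(len(batch), n_devices)
    shards, start = [], 0
    for i in range(n_devices):
        # The first `extra` shards each take one leftover instance.
        end = start + size + (1 if i < extra else 0)
        shards.append(batch[start:end])
        start = end
    return shards


print(shard_batch(list(range(10)), 3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```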

### TensorFlow Implementation:
-  _In Graph Replication + Synchronous Updates_:
    -  With in-graph replication + synchronous updates, you build one big graph containing all the model replicas (placed on different devices), and a few nodes to aggregate all their gradients and feed them to an optimizer. Your code opens a session to the cluster and simply runs the training operation repeatedly. - __Aurelien Geron [Hands on ML w SKLearn & TF]__
-  _In Graph Replication + Asynchronous Updates_:
    -  With in-graph replication + asynchronous updates, you also create one big graph, but with one optimizer per replica, and you run one thread per replica, repeatedly running the replica’s optimizer. - __Aurelien Geron [Hands on ML w SKLearn & TF]__
-  _Between Graph Replication + Asynchronous Updates_:
    -  With between-graph replication + asynchronous updates, you run multiple independent clients (typically in separate processes), each training the model replica as if it were alone in the world, but the parameters are actually shared with other replicas (using a resource container). - __Aurelien Geron [Hands on ML w SKLearn & TF]__
-  _Between Graph Replication + Synchronous Updates_:
    -  With between-graph replication + synchronous updates, once again you run multiple clients, each training a model replica based on shared parameters, but this time you wrap the optimizer (e.g., a MomentumOptimizer) within a SyncReplicasOptimizer. Each replica uses this optimizer as it would use any other optimizer, but under the hood this optimizer sends the gradients to a set of queues (one per variable), which is read by one of the replica’s SyncReplicasOptimizer, called the chief. The chief aggregates the gradients and applies them, then writes a token to a token queue for each replica, signaling it that it can go ahead and compute the next gradients. This approach supports having spare replicas. - __Aurelien Geron [Hands on ML w SKLearn & TF]__
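The practical difference between the two update modes shows up on a toy one-parameter problem where replica *i* minimizes `(w - target_i)**2`. The sketch assumes, as a worst case, that all replicas read the parameter at the same moment: the synchronous version averages their gradients and applies one update (as the chief would), while the asynchronous version applies each replica's gradient in turn, so every update after the first is based on a stale value of `w`. All names are hypothetical.

```python
def grad(w, target):
    """Gradient of one replica's loss (w - target)**2."""
    return 2.0 * (w - target)


def synchronous_step(w, targets, lr):
    # All replicas use the same w; their gradients are averaged, applied once.
    g = sum(grad(w, t) for t in targets) / len(targets)
    return w - lr * g


def asynchronous_step(w, targets, lr):
    # All replicas read w at once, then apply their updates one after another,
    # so every update after the first uses a gradient computed from a stale w.
    grads = [grad(w, t) for t in targets]
    for g in grads:
        w = w - lr * g
    return w


print(synchronous_step(0.0, [2.0, 4.0], lr=0.4))   # approximately 2.4
print(asynchronous_step(0.0, [2.0, 4.0], lr=0.4))  # approximately 4.8
```

The optimum here is `w = 3.0`: the synchronous step moves toward it, while the asynchronous step overshoots to about 4.8, which hints at why stale gradients can destabilize training.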

## _Exercises_

In [1]:
import tensorflow as tf
print(tf.__version__)

import sklearn
print(sklearn.__version__)

1.13.1
0.20.0


# Multiple Devices [CPUs/GPUs] on a Single Machine

### verify cuda is installed properly

In [2]:
# nvidia-smi

### install tf w/ gpu support

In [3]:
# source env/bin/activate
# pip3 install --upgrade tensorflow-gpu

### verify tf w/ cuda+cudnn integration

In [4]:
import tensorflow as tf
sess = tf.Session()

### managing gpu ram:
-  **Errors**:
    -  by default TensorFlow grabs all the RAM in **all** available GPUs the first time you run a graph
    -  so you cannot start a second TensorFlow program while the first one is still running
-  **Solutions**:
    -  run each process on different GPU cards
    -  consume only a fraction of the memory available

In [5]:
# CUDA_ERROR_OUT_OF_MEMORY => error when trying to run multiple graphs on GPU

In [6]:
# CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py => program_1 only sees GPU cards 0 and 1
# CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py => program_2 only sees GPU cards 3 and 2 (card 3 becomes its /gpu:0)
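When launching several programs from a Python script instead of a shell, the same restriction can be applied by passing a modified environment to each child process. A minimal sketch; `env_for_gpus` is a hypothetical helper and `program_1.py` is the placeholder name used above:

```python
import os


def env_for_gpus(gpu_ids, base_env=None):
    """Copy an environment so that a child process only sees gpu_ids."""
    env = dict(os.environ if base_env is None else base_env)
    env["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in gpu_ids)
    return env


# e.g. subprocess.Popen(["python3", "program_1.py"], env=env_for_gpus([0, 1]))
print(env_for_gpus([3, 2], base_env={})["CUDA_VISIBLE_DEVICES"])  # 3,2
```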

In [7]:
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4 # use at most 40% of each GPU's memory
session = tf.Session(config=config)

### pinning nodes onto devices

In [8]:
with tf.device("/cpu:0"):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
c = a * b

Instructions for updating:
Colocations handled automatically by placer.


### logging node placements

In [9]:
config = tf.ConfigProto()
config.log_device_placement = True # logs the device each node has been placed on
sess = tf.Session(config=config)
a.initializer.run(session=sess)
sess.run(c)

12.0

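### automated node placement via a user-defined device function

In [10]:
def variables_on_cpu(op):
    # in TF 1.x, tf.Variable creates "VariableV2" ops (older releases used "Variable")
    if op.type in ("Variable", "VariableV2"):
        return "/cpu:0"
    else:
        return "/gpu:0"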

with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
    c = a * b

### kernels => an operation's implementation for a specific device type

In [11]:
# with tf.device("/gpu:0"):
#     i = tf.Variable(3) # integer variable: no GPU kernel available
# sess.run(i.initializer) => raises an error because the device has no kernel for this op

### soft placement => let tf fall back to the CPU when an operation has no kernel for the requested GPU

In [12]:
# with tf.device("/gpu:0"):
#     i = tf.Variable(3)
# config = tf.ConfigProto()
# config.allow_soft_placement = True # ... fall back to the CPU instead of failing
# sess = tf.Session(config=config)
# sess.run(i.initializer)

### control dependencies:
-  ex => postpone evaluating an operation that uses a lot of RAM until its result is actually needed, so it does not needlessly hold RAM that other operations may need

In [13]:
a = tf.constant(1.0)
b = a + 2.0
with tf.control_dependencies([a, b]): # x and y are only evaluated after a and b
    x = tf.constant(3.0)
    y = tf.constant(4.0)
z = x + y

# Multiple Devices [CPUs/GPUs] Across Multiple Servers

### cluster specification:
-  simplest and recommended: run one task per machine
-  running several servers on one machine requires splitting the GPU RAM between them (e.g. via per_process_gpu_memory_fraction) so that one server does not consume it all

In [14]:
cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "machine-a.example.com:2221",  # /job:ps/task:0
    ],
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222",  # /job:worker/task:1
    ]})

### start tf server

In [15]:
# server = tf.train.Server(cluster_spec, job_name="worker", task_index=0)

### keep the server running (block the main thread until the server stops)

In [16]:
# server.join() # blocks until the server stops (i.e. never)

### open session:
-  a client session can be opened on any server in the cluster, from any machine
-  the client uses the _gRPC_ (Google Remote Procedure Call) protocol to communicate with the server
-  _every tf server may need to communicate with any other server in the cluster, so the appropriate ports must be open (e.g. on firewalls)_

In [17]:
# a = tf.constant(1.0)
# b = a + 2
# c = a * 3
# with tf.Session("grpc://machine-b.example.com:2222") as sess: # master
    # print(c.eval())

### pinning operations across tasks

In [18]:
# with tf.device("/job:ps/task:0/cpu:0"):
    # a = tf.constant(1.0)
# with tf.device("/job:worker/task:0/gpu:1"):
    # b = a + 2
# c = a + b

### pinning "sharding" variables to servers:
-  best practice for large models to store model parameters on a set of parameter servers
-  _best to store parameters on CPUs and computation on GPUs_

In [19]:
# with tf.device(tf.train.replica_device_setter(ps_tasks=2)): # distributes param across tasks in round-robin method
    # v1 = tf.Variable(1.0) # pinned to /job:ps/task:0
    # v2 = tf.Variable(2.0) # pinned to /job:ps/task:1
    # v3 = tf.Variable(3.0) # pinned to /job:ps/task:0
    # v4 = tf.Variable(4.0) # pinned to /job:ps/task:1
    # v5 = tf.Variable(5.0) # pinned to /job:ps/task:0

In [20]:
# with tf.device(tf.train.replica_device_setter(ps_tasks=2)):
    # v1 = tf.Variable(1.0) # pinned to /job:ps/task:0 (+ defaults to /cpu:0)
    # v2 = tf.Variable(2.0) # pinned to /job:ps/task:1 (+ defaults to /cpu:0)
    # v3 = tf.Variable(3.0) # pinned to /job:ps/task:0 (+ defaults to /cpu:0)
    # s = v1 + v2 # pinned to /job:worker (+ defaults to task:0/gpu:0)
    # with tf.device("/gpu:1"):
        # p1 = 2 * s # pinned to /job:worker/gpu:1 (+ defaults to /task:0)
        # with tf.device("/task:1"):
            # p2 = 3 * s # pinned to /job:worker/task:1/gpu:1
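The round-robin assignment shown in the comments above can be reproduced in a few lines, which helps predict where each variable ends up. This is a hypothetical sketch of the default placement order only; `tf.train.replica_device_setter` also sends non-variable ops to the worker job and accepts a custom `ps_strategy`, neither of which is modeled here.

```python
from itertools import cycle


def round_robin_ps_placement(variable_names, ps_tasks):
    """Assign variables to parameter-server tasks in creation order, round-robin."""
    tasks = cycle(range(ps_tasks))
    return {name: f"/job:ps/task:{next(tasks)}" for name in variable_names}


placement = round_robin_ps_placement(["v1", "v2", "v3", "v4", "v5"], ps_tasks=2)
print(placement["v5"])  # /job:ps/task:0
```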

### variable and resource container management

In [21]:
# simple_client.py
# import tensorflow as tf
# import sys
# x = tf.Variable(0.0, name="x")
# increment_x = tf.assign(x, x + 1)
# with tf.Session(sys.argv[1]) as sess:
#     if sys.argv[2:] == ["init"]:
#         sess.run(x.initializer)
#     sess.run(increment_x)
#     print(x.eval())

# $ python3 simple_client.py grpc://machine-a.example.com:2222 init
#   1.0
# $ python3 simple_client.py grpc://machine-b.example.com:2222
#   2.0

### executing independent computations on same cluster

# with tf.variable_scope("my_problem_1"):
#   [...] # Construction phase of problem 1 (option 1: separate variable scope)
# with tf.container("my_problem_1"):
#   [...] # Construction phase of problem 1 (option 2: separate resource container)

### free all resources in the container + close all sessions open on the server
# tf.Session.reset("grpc://machine-a.example.com:2222", ["my_problem_1"])

### fifo queue

In [22]:
q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]],
                     name="q", shared_name="shared_q")

### enqueuing data

In [23]:
# training_data_loader.py
# import tensorflow as tf
# q = tf.FIFOQueue(capacity=10, [...], shared_name="shared_q")
# training_instance = tf.placeholder(tf.float32, shape=[2])
# enqueue = q.enqueue([training_instance])
# with tf.Session("grpc://machine-a.example.com:2222") as sess:
    # sess.run(enqueue, feed_dict={training_instance: [1., 2.]})
    # sess.run(enqueue, feed_dict={training_instance: [3., 4.]})
    # sess.run(enqueue, feed_dict={training_instance: [5., 6.]})
# [...]
# training_instances = tf.placeholder(tf.float32, shape=(None, 2))
# enqueue_many = q.enqueue_many([training_instances])
# with tf.Session("grpc://machine-a.example.com:2222") as sess:
    # sess.run(enqueue_many,
             # feed_dict={training_instances: [[1., 2.], [3., 4.], [5., 6.]]})

### dequeuing data

In [24]:
# trainer.py
# import tensorflow as tf
# q = tf.FIFOQueue(capacity=10, [...], shared_name="shared_q")
# dequeue = q.dequeue()
# with tf.Session("grpc://machine-a.example.com:2222") as sess:
    # print(sess.run(dequeue)) # [1., 2.]
    # print(sess.run(dequeue)) # [3., 4.]
    # print(sess.run(dequeue)) # [5., 6.]
# [...]
# batch_size = 2
# dequeue_mini_batch = q.dequeue_many(batch_size)
# with tf.Session("grpc://machine-a.example.com:2222") as sess:
    # print(sess.run(dequeue_mini_batch)) # [[1., 2.], [3., 4.]]
    # print(sess.run(dequeue_mini_batch)) # blocked waiting for another instance

### tuple queues

In [25]:
# tensors w/ various types and shapes
q = tf.FIFOQueue(capacity=10, dtypes=[tf.int32, tf.float32], shapes=[[],[3,2]],
                     name="q", shared_name="shared_q")

a = tf.placeholder(tf.int32, shape=())
b = tf.placeholder(tf.float32, shape=(3, 2))
enqueue = q.enqueue((a, b))

# with tf.Session([...]) as sess:
    # sess.run(enqueue, feed_dict={a: 10, b:[[1., 2.], [3., 4.], [5., 6.]]})
    # sess.run(enqueue, feed_dict={a: 11, b:[[2., 4.], [6., 8.], [0., 2.]]})
    # sess.run(enqueue, feed_dict={a: 12, b:[[3., 6.], [9., 2.], [5., 8.]]})

In [26]:
# dequeue_a, dequeue_b = q.dequeue()
# with tf.Session([...]) as sess:
    # a_val, b_val = sess.run([dequeue_a, dequeue_b])
    # print(a_val) # 10
    # print(b_val) # [[1., 2.], [3., 4.], [5., 6.]]

In [27]:
batch_size = 2
dequeue_as, dequeue_bs = q.dequeue_many(batch_size)

# with tf.Session([...]) as sess:
    # a, b = sess.run([dequeue_as, dequeue_bs])
    # print(a) # [10, 11]
    # print(b) # [[[1., 2.], [3., 4.], [5., 6.]], [[2., 4.], [6., 8.], [0., 2.]]]
    # a, b = sess.run([dequeue_as, dequeue_bs]) # blocked waiting for another pair

### close queue

In [28]:
close_q = q.close()
# with tf.Session([...]) as sess:
    # [...]
    # sess.run(close_q)

### random shuffle queue:
-  useful for shuffling training instances at each epoch during training

In [29]:
q = tf.RandomShuffleQueue(capacity=50, min_after_dequeue=10,
                              dtypes=[tf.float32], shapes=[()],
                              name="q", shared_name="shared_q")

dequeue = q.dequeue_many(5)

# with tf.Session([...]) as sess:
    # print(sess.run(dequeue)) # [ 20. 15. 11. 12. 4.] (17 items left)
    # print(sess.run(dequeue)) # [ 5. 13. 6. 0. 17.] (12 items left)
    # print(sess.run(dequeue)) # 12 - 5 < 10: blocked waiting for 3 more instances
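The blocking rule can be mimicked in plain Python to check the item counts in the comments above (22 items enqueued, batches of 5, `min_after_dequeue=10`). `ToyRandomShuffleQueue` is hypothetical: it returns `None` where the real queue would block, and it ignores queue closing, after which TensorFlow lets the remaining items drain.

```python
import random


class ToyRandomShuffleQueue:
    """Dequeue random items but refuse to drop below min_after_dequeue."""

    def __init__(self, capacity, min_after_dequeue, seed=None):
        self.capacity = capacity
        self.min_after_dequeue = min_after_dequeue
        self.items = []
        self.rng = random.Random(seed)

    def enqueue_many(self, values):
        if len(self.items) + len(values) > self.capacity:
            raise OverflowError("queue full (a real queue would block)")
        self.items.extend(values)

    def dequeue_many(self, n):
        if len(self.items) - n < self.min_after_dequeue:
            return None  # a real queue would block until more items arrive
        return [self.items.pop(self.rng.randrange(len(self.items)))
                for _ in range(n)]


q = ToyRandomShuffleQueue(capacity=50, min_after_dequeue=10, seed=0)
q.enqueue_many([float(i) for i in range(22)])
print(q.dequeue_many(5), len(q.items))  # 5 random items, 17 left
print(q.dequeue_many(5), len(q.items))  # 5 random items, 12 left
print(q.dequeue_many(5))                # None: 12 - 5 < 10, would block
```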

### padding fifo queue:
-  useful for variable length inputs like sequences of words

In [30]:
q = tf.PaddingFIFOQueue(capacity=50, dtypes=[tf.float32], shapes=[(None, None)],
                            name="q", shared_name="shared_q")

v = tf.placeholder(tf.float32, shape=(None, None))
enqueue = q.enqueue([v])

# with tf.Session([...]) as sess:
    # sess.run(enqueue, feed_dict={v: [[1., 2.], [3., 4.], [5., 6.]]}) # 3*2
    # sess.run(enqueue, feed_dict={v: [[1.]]}) # 1*1
    # sess.run(enqueue, feed_dict={v: [[7., 8., 9., 5.], [6., 7., 8., 9.]]}) # 2*4
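If the three tensors above were dequeued together (e.g. with `dequeue_many(3)`), the padding queue would zero-pad every dimension to the largest size in the batch, giving a 3x3x4 result. A plain-Python sketch of that padding, assuming 2-D inputs; `pad_batch` is hypothetical:

```python
def pad_batch(tensors, pad_value=0.0):
    """Pad a list of 2-D lists to the largest rows x cols found in the batch."""
    max_rows = max(len(t) for t in tensors)
    max_cols = max(len(row) for t in tensors for row in t)
    padded = []
    for t in tensors:
        # Pad each row on the right, then append all-padding rows at the bottom.
        rows = [row + [pad_value] * (max_cols - len(row)) for row in t]
        rows += [[pad_value] * max_cols for _ in range(max_rows - len(t))]
        padded.append(rows)
    return padded


batch = [[[1., 2.], [3., 4.], [5., 6.]], [[1.]], [[7., 8., 9., 5.], [6., 7., 8., 9.]]]
print(pad_batch(batch)[1])  # [[1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0]]
```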

### preloading

In [31]:
# training_set_init = tf.placeholder(tf.float32, shape=(None, n_features))
# training_set = tf.Variable(training_set_init, trainable=False, collections=[],
#                               name="training_set")
# with tf.Session([...]) as sess:
    # data = [...] # load the training data from the datastore
    # sess.run(training_set.initializer, feed_dict={training_set_init: data})

### reader (graph dedicated to reading training instances from CSV files)

In [32]:
reader = tf.TextLineReader(skip_header_lines=1)

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

# push training instances to shuffle queue
instance_queue = tf.RandomShuffleQueue(
        capacity=10, min_after_dequeue=2,
        dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
        name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

# execute graph / iterate through file 1 row at a time until empty
# with tf.Session([...]) as sess:
    # sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    # sess.run(close_filename_queue)
    # try:
        # while True:
            # sess.run(enqueue_instance)
    # except tf.errors.OutOfRangeError as ex:
        # pass # no more records in the current file and no more files to read
    # sess.run(close_instance_queue)

# instance_queue = tf.RandomShuffleQueue([...], shared_name="shared_instance_q")
# mini_batch_instances, mini_batch_targets = instance_queue.dequeue_up_to(2)
# [...] # use the mini_batch instances and targets to build the training graph
# training_op = [...]
# with tf.Session([...]) as sess:
    # try:
        # for step in range(max_steps):
            # sess.run(training_op)
    # except tf.errors.OutOfRangeError as ex:
        # pass # no more training instances

Instructions for updating:
Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.TextLineDataset`.


### multithreaded reader (higher throughput via multiple threads reading data simultaneously with many readers):
-  classes:
    -  _Coordinator_ => coordinates stopping multiple threads
    -  _QueueRunner_ => runs multiple threads each executing enqueue operations repeatedly

In [33]:
coord = tf.train.Coordinator()

# while not coord.should_stop():
    # [...] do something

coord.request_stop()
# coord.join(list_of_threads)

In [34]:
# [...] # same construction phase as earlier
# queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * 5)
# with tf.Session() as sess:
    # sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    # sess.run(close_filename_queue)
    # coord = tf.train.Coordinator()
    # enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)

### reading simultaneously from multiple files

In [35]:
def read_and_push_instance(filename_queue, instance_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    key, value = reader.read(filename_queue)
    x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
    features = tf.stack([x1, x2])
    enqueue_instance = instance_queue.enqueue([features, target])
    return enqueue_instance

# define queues
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()
# instance_queue = tf.RandomShuffleQueue([...])

# read_and_enqueue_ops = [
    # read_and_push_instance(filename_queue, instance_queue)
    # for i in range(5)]
# queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)
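The same pattern (several reader threads sharing one filename queue and one instance queue, with a coordinator-style stop flag) can be sketched with Python's standard library. `run_readers` is hypothetical, the files are simulated as in-memory lists of lines, and a `threading.Event` stands in for `tf.train.Coordinator`.

```python
import queue
import threading


def run_readers(files, n_readers=3):
    """Let n_readers threads drain a shared filename queue into one instance queue.

    files maps a (simulated) filename to its list of lines.
    """
    filename_queue = queue.Queue()
    for name in files:
        filename_queue.put(name)
    instance_queue = queue.Queue()
    stop = threading.Event()  # stop.is_set() ~ coord.should_stop(); stop.set() ~ coord.request_stop()

    def reader():
        while not stop.is_set():
            try:
                name = filename_queue.get_nowait()
            except queue.Empty:
                return  # no more files: this reader is done
            for line in files[name]:
                instance_queue.put(line)

    threads = [threading.Thread(target=reader) for _ in range(n_readers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # like coord.join(threads)
    return [instance_queue.get() for _ in range(instance_queue.qsize())]


files = {"a.csv": ["1", "2"], "b.csv": ["3"], "c.csv": ["4", "5"]}
print(sorted(run_readers(files)))  # ['1', '2', '3', '4', '5']
```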

### additional exercises:

https://github.com/ageron/handson-ml/blob/master/12_distributed_tensorflow.ipynb

1. If you get a CUDA_ERROR_OUT_OF_MEMORY when starting your TensorFlow program, what is probably going on? What can you do about it?
2. What is the difference between pinning an operation on a device and placing an operation on a device?
3. If you are running on a GPU-enabled TensorFlow installation, and you just use the default placement, will all operations be placed on the first GPU?
4. If you pin a variable to "/gpu:0", can it be used by operations placed on /gpu:1? Or by operations placed on "/cpu:0"? Or by operations pinned to devices located on other servers?
5. Can two operations placed on the same device run in parallel?
6. What is a control dependency and when would you want to use one?
7. Suppose you train a DNN for days on a TensorFlow cluster, and immediately after your training program ends you realize that you forgot to save the model using a Saver. Is your trained model lost?
8. Train several DNNs in parallel on a TensorFlow cluster, using different hyperparameter values. This could be DNNs for MNIST classification or any other task you are interested in. The simplest option is to write a single client program that trains only one DNN, then run this program in multiple processes in parallel, with different hyperparameter values for each client. The program should have command-line options to control what server and device the DNN should be placed on, and what resource container and hyperparameter values to use (make sure to use a different resource container for each DNN). Use a validation set or cross-validation to select the top three models.
9. Create an ensemble using the top three models from the previous exercise. Define it in a single graph, ensuring that each DNN runs on a different device. Evaluate it on the validation set: does the ensemble perform better than the individual DNNs?
10. Train a DNN using between-graph replication and data parallelism with asynchronous updates, timing how long it takes to reach a satisfying performance. Next, try again using synchronous updates. Do synchronous updates produce a better model? Is training faster? Split the DNN vertically and place each vertical slice on a different device, and train the model again. Is training any faster? Is the performance any different?
