## TensorFlow with GPU

In [2]:
# Common imports
import numpy as np
import os
import tensorflow as tf

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# To plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt


This notebook aims to teach some of the basics of GPU-enabled TensorFlow. It assumes that you already have a GPU-enabled computer with Nvidia's CUDA and cuDNN software installed. You will also need to install the GPU version of TensorFlow. For additional information, see:

https://www.tensorflow.org/install/gpu

## Devices Supported

In TensorFlow, the supported device types are CPU and GPU. They are represented as strings. For example:

`"/cpu:0"`: The CPU of your machine.

`"/device:GPU:0"`: The GPU of your machine, if you have one.

`"/device:GPU:1"`: The second GPU of your machine, etc.

If a TensorFlow operation has both CPU and GPU implementations, the GPU devices will be given priority when the operation is assigned to a device. For example, `matmul` has both CPU and GPU kernels. On a system with devices `cpu:0` and `gpu:0`, `gpu:0` will be selected to run `matmul`.

## Device Placement 

Whenever you run a graph, TensorFlow may need to evaluate a node that has not yet been placed on a device. In that case it uses the *simple placer* to place it, along with all the other nodes that are not placed yet. The simple placer respects the following rules:
 - If a node was already placed in a previous run of the graph, it is left on that device.
 - Else, if the user *pinned* a node to a device, the placer places it on that device.
 - Else, it defaults to GPU 0, or to the CPU if there is no GPU.
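
The three rules can be read as a simple fallback chain. The following is only a plain-Python sketch of that chain, not TensorFlow's actual placer: the function `place_node` and the dictionaries `placed` and `pinned` are hypothetical names introduced for illustration.

```python
def place_node(node, placed, pinned, has_gpu):
    """Pick a device for `node` by applying the three rules in order.

    All names here are illustrative; this is not TensorFlow code.
    """
    if node in placed:       # rule 1: already placed in a previous run
        return placed[node]
    if node in pinned:       # rule 2: pinned by the user
        return pinned[node]
    # rule 3: fall back to the first GPU, or to the CPU if there is none
    return "/device:GPU:0" if has_gpu else "/cpu:0"


placed = {"a": "/cpu:0"}          # hypothetical: "a" was placed earlier
pinned = {"b": "/device:GPU:1"}   # hypothetical: the user pinned "b"

for name in ["a", "b", "c"]:
    print(name, place_node(name, placed, pinned, has_gpu=True))
```

Here `a` stays where it was, `b` goes where it was pinned, and `c` falls through to the default device.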
 
To find out which devices your operations and tensors are assigned to, create the session with the `log_device_placement` configuration option set to `True`:

In [3]:
# Create a 2 by 3 tensor that is named 'a'
a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2,3], name='a')
# Create a 3 by 2 tensor that is named 'b'
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3,2], name='b')
# Multiply both tensors
c = tf.matmul(a, b)
# Creates a session with log_device_placement set to True
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# Run the operation
print(sess.run(c))


[[22. 28.]
 [49. 64.]]


Then check the shell from which the notebook was launched (an Anaconda prompt in my case) for the device placement log.

The lines starting with "I" (for Info) are the log messages. When we create a session, TensorFlow logs a message to tell us that it has found a GPU card. Then, the first time we run the graph (in this case evaluating `c`), the simple placer runs and places each node on the device it was assigned to. In that log, the operations and tensors are assigned to the default device, GPU:0.

### GPU RAM

By default, TensorFlow maps nearly all of the GPU memory of all the GPUs visible to the process (subject to `CUDA_VISIBLE_DEVICES`). In some cases you may want to run each process on different GPU cards. The easiest way to do this is to set the `CUDA_VISIBLE_DEVICES` environment variable so that each process sees only the appropriate GPU cards. For example, if we have 4 GPU cards, we can start 2 programs, program_1.py and program_2.py, each from its own shell: the first with `CUDA_VISIBLE_DEVICES` set to `0,1` so that it uses the first two GPUs,

and the second with `CUDA_VISIBLE_DEVICES` set to `2,3` so that it uses the last two GPUs.
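
The same split can also be prepared from Python instead of a shell. The sketch below is only an illustration: the helper `env_for_gpus` is a hypothetical name, not part of TensorFlow, and the actual launch of `program_1.py` and `program_2.py` (the names used in the text) is left commented out.

```python
import os


def env_for_gpus(gpu_ids, base_env=None):
    """Return a copy of the environment that exposes only the given GPU ids."""
    env = dict(os.environ if base_env is None else base_env)
    env["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in gpu_ids)
    return env


env_program_1 = env_for_gpus([0, 1])  # first two of the four GPUs
env_program_2 = env_for_gpus([2, 3])  # last two of the four GPUs

print(env_program_1["CUDA_VISIBLE_DEVICES"])
print(env_program_2["CUDA_VISIBLE_DEVICES"])

# To actually start the two programs, one could pass these environments on:
# import subprocess, sys
# subprocess.Popen([sys.executable, "program_1.py"], env=env_program_1)
# subprocess.Popen([sys.executable, "program_2.py"], env=env_program_2)
```

The variable has to be set before the process initializes CUDA, which is why it is passed in the environment of each new process rather than changed afterwards.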

Two more options are:
1. The `allow_growth` option, which attempts to allocate only as much GPU memory as is needed at runtime: it starts out allocating very little memory, and as Sessions get run and more GPU memory is needed, the GPU memory region used by the TensorFlow process is extended. Note that memory is never released, since that can lead to even worse memory fragmentation (and so your GPU may still run out of memory). To turn this option on, set `gpu_options.allow_growth` to `True` in the `ConfigProto`.

2. The `per_process_gpu_memory_fraction` option, which determines the fraction of the overall amount of memory that should be allocated on each visible GPU. For example, you can tell TensorFlow to allocate only 40% of the total memory of each GPU by setting `gpu_options.per_process_gpu_memory_fraction` to `0.4` in the `ConfigProto`.
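
Both options are fields of `gpu_options` on the session's `ConfigProto`. The fragment below is a minimal sketch that assumes TensorFlow 1.x, like the rest of this notebook; the variable names are illustrative, and in practice you would normally pick one of the two options rather than create both sessions.

```python
import tensorflow as tf  # assumes TensorFlow 1.x, as in the rest of this notebook

# Option 1: start small and grow the GPU memory region on demand
growth_config = tf.ConfigProto()
growth_config.gpu_options.allow_growth = True
growth_sess = tf.Session(config=growth_config)

# Option 2: cap this process at 40% of the memory of each visible GPU
fraction_config = tf.ConfigProto()
fraction_config.gpu_options.per_process_gpu_memory_fraction = 0.4
fraction_sess = tf.Session(config=fraction_config)
```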

The second option is the more deterministic approach.

### Pinning to devices

If you don't "pin" to a device then the whole graph will be placed on the default device. To pin nodes to a device, you must create a device block using the `device()` function:

In [6]:
reset_graph()

# Here we create a "device" block: only a and b are pinned to cpu:0,
# since only they are created inside the context block
with tf.device("/cpu:0"):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)

# This multiplication node is not pinned to any device, so it will be placed
# on the default device.
c = a * b

## Reading in Data

Before the Data API, reading data efficiently in TensorFlow meant managing queues and reader threads yourself. Since TensorFlow 1.4 we can use the Data API (`tf.data`) instead: you build a source dataset, for example from in-memory tensors with `from_tensor_slices()` or `from_tensors()`, or from text files as below, and it handles most of the complexity that we had to deal with previously (e.g., threads).

More information on the Data API can be found here:
https://www.tensorflow.org/guide/datasets

In [16]:
reset_graph()
# Write a small CSV file; the first data row leaves x2 empty (a missing value)
with open("my_test.csv", "w") as test_csv:
    test_csv.write("x1, x2 , target\n")
    test_csv.write("1.,, 0\n")
    test_csv.write("4., 5. , 1\n")
    test_csv.write("7., 8. , 0\n")

In [17]:
filenames = ["my_test.csv"]

In [18]:
dataset = tf.data.TextLineDataset(filenames)

We still need to tell it how to decode each line:

In [19]:
def decode_csv_line(line):
    x1, x2, y = tf.decode_csv(
        line, record_defaults=[[-1.], [-1.], [-1.]])
    X = tf.stack([x1, x2])
    return X, y
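
To make the role of `record_defaults` concrete, here is a plain-Python analogy of the decoding step. It is only a sketch of the idea (an empty field is replaced by its default), not how `tf.decode_csv` is implemented; `decode_line` is a hypothetical helper.

```python
def decode_line(line, record_defaults):
    """Split one CSV line and substitute the default for every empty field."""
    fields = [field.strip() for field in line.split(",")]
    return [float(field) if field else default
            for field, default in zip(fields, record_defaults)]


# The three data rows written to my_test.csv above
rows = ["1.,, 0", "4., 5. , 1", "7., 8. , 0"]

for row in rows:
    x1, x2, y = decode_line(row, record_defaults=[-1.0, -1.0, -1.0])
    print([x1, x2], y)
```

The first row has no value for `x2`, so the default `-1.0` is used in its place.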

Next, we can apply this decoding function to each element in the dataset using `map()`:

In [20]:
dataset = dataset.skip(1).map(decode_csv_line)

Finally, let's create a one-shot iterator using `make_one_shot_iterator()`. A one-shot iterator is the simplest form of iterator: it only supports iterating once through a dataset and needs no explicit initialization. One-shot iterators handle almost all of the cases that the existing queue-based input pipelines support, but they do not support parameterization. We also call its `get_next()` method to get tensors that represent the next element:

In [21]:
it = dataset.make_one_shot_iterator()
X, y = it.get_next()

Let's repeatedly evaluate `X` and `y` to go through the dataset. When there are no more elements, we get an `OutOfRangeError`:

In [22]:
with tf.Session() as sess:
    try:
        while True:
            X_val, y_val = sess.run([X, y])
            print(X_val, y_val)
    except tf.errors.OutOfRangeError:  # raised once the dataset is exhausted
        print("Done")

[ 1. -1.] 0.0
[4. 5.] 1.0
[7. 8.] 0.0
Done
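
The loop above behaves much like draining a Python generator, which can also be consumed only once. The sketch below is a loose analogy under that assumption, with `StopIteration` playing the role of `OutOfRangeError`; `one_shot` is a hypothetical helper and the lines are made-up sample data.

```python
def one_shot(lines):
    """Yield each data line once, skipping the header (like skip(1))."""
    for line in lines[1:]:
        yield line


it = one_shot(["x1,x2,target", "4.,5.,1", "7.,8.,0"])

seen = []
try:
    while True:
        seen.append(next(it))
except StopIteration:  # analogous to tf.errors.OutOfRangeError
    print("Done")

print(seen)
print(list(it))  # the iterator is exhausted, so nothing is left
```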


## Distributed TensorFlow

To run graphs across multiple servers you first need to define a *cluster*. A cluster is composed of one or more TensorFlow servers, called *tasks*, typically spread across several machines. Each task belongs to a *job*. A job is a named group of tasks that typically share a role, such as keeping track of the model parameters (such a job is usually named "ps", for *parameter server*) or performing computations (such a job is usually named "worker").

Every TensorFlow server provides two services: the *master service* and the *worker service*. The master service allows clients to open sessions and use them to run graphs. It coordinates the computations across tasks, relying on the worker service to actually execute computations on the other tasks and get their results.

For more information please go to:
https://www.tensorflow.org/deploy/distributed

To start a TensorFlow server you must create a **Server** object. Let's first create a **Server** object on the local host:

In [24]:
reset_graph()

In [25]:
c = tf.constant("Hello distributed TensorFlow!")
# This creates the Server object called "server" on the local host
server = tf.train.Server.create_local_server()

In [26]:
with tf.Session(server.target) as sess:
    print(sess.run(c))

b'Hello distributed TensorFlow!'


When using a plain local session, like above, each variable's state is managed by the session itself and as soon as it ends the variables are lost. Also, multiple ses

### ClusterSpec

The following cluster specification defines two jobs, "ps" and "worker", containing 2 and 3 tasks respectively. In this example, the machine at 127.0.0.1 listens on five different ports: 2 ports correspond to the ps job and 3 correspond to the worker job.

In [5]:
cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "127.0.0.1:2221",  # /job:ps/task:0
        "127.0.0.1:2222",  # /job:ps/task:1
    ],
    "worker": [
        "127.0.0.1:2223",  # /job:worker/task:0
        "127.0.0.1:2224",  # /job:worker/task:1
        "127.0.0.1:2225",  # /job:worker/task:2
    ]})
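
The comments in the cell above follow a simple convention: the position of an address in its job's list is the task index. As a small plain-Python sketch of that convention (no TensorFlow involved; `task_devices` is a hypothetical helper), the mapping from task name to address can be spelled out like this:

```python
# Same jobs and addresses as in the cluster spec above
cluster = {
    "ps": ["127.0.0.1:2221", "127.0.0.1:2222"],
    "worker": ["127.0.0.1:2223", "127.0.0.1:2224", "127.0.0.1:2225"],
}


def task_devices(cluster):
    """Map each '/job:<name>/task:<index>' string to its address."""
    return {"/job:%s/task:%d" % (job, index): address
            for job, addresses in cluster.items()
            for index, address in enumerate(addresses)}


devices = task_devices(cluster)
for name in sorted(devices):
    print(name, devices[name])
```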

To start a task, we must pass the cluster spec (so that it can communicate with the other servers), its job name, and its task index to the `Server` constructor.

In [6]:
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name="ps", task_index=1)
task_worker0 = tf.train.Server(cluster_spec, job_name="worker", task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name="worker", task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name="worker", task_index=2)

### Pinning operations across devices and servers

You can use device blocks to pin operations to any device managed by any task by specifying the job name and task index. If you omit the task index, TensorFlow defaults to task 0 of that job.

In [7]:
reset_graph()

with tf.device("/job:ps"):
    a = tf.Variable(1.0, name="a")

with tf.device("/job:worker"):
    b = a + 2

with tf.device("/job:worker/task:1"):
    c = a + b

TensorFlow (the client) uses the *gRPC* protocol (Google Remote Procedure Call) to communicate with the server. gRPC is based on HTTP/2, which opens a connection and keeps it open for the whole session, allowing bidirectional communication once it is established. Data is transmitted using *protocol buffers*, a lightweight binary data interchange format.

In [8]:
# Using grpc the client will open a session on the server located at 
# 127.0.0.1:2221 and it will evaluate c
with tf.Session("grpc://127.0.0.1:2221") as sess:
    sess.run(a.initializer)
    print(c.eval())

4.0


A common pattern when training a neural network on a distributed setup is to store the model parameters on a set of parameter servers (the "ps" job) while the other tasks focus on computation (the "worker" job). For models with millions of parameters, it is advisable to shard these parameters across multiple servers so as to avoid saturating a single parameter server's network card. Manually pinning every variable to a different parameter server would be tedious. Instead, TensorFlow provides the `replica_device_setter()` function, which distributes the variables across all the ps tasks in a round-robin fashion. You must pass in the number of ps tasks, and you can also specify the device names of the ps and worker jobs:

In [9]:
reset_graph()

with tf.device(tf.train.replica_device_setter(
        ps_tasks=2,
        ps_device="/job:ps",
        worker_device="/job:worker")):
    v1 = tf.Variable(1.0, name="v1")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    v2 = tf.Variable(2.0, name="v2")  # pinned to /job:ps/task:1 (defaults to /cpu:0)
    v3 = tf.Variable(3.0, name="v3")  # pinned to /job:ps/task:0 (defaults to /cpu:0)
    s = v1 + v2            # pinned to /job:worker (defaults to task:0/cpu:0)
    with tf.device("/task:1"):
        p1 = 2 * s         # pinned to /job:worker/task:1 (defaults to /cpu:0)
        with tf.device("/cpu:0"):
            p2 = 3 * s     # pinned to /job:worker/task:1/cpu:0

config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
    v1.initializer.run()
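
The round-robin assignment noted in the comments above (`v1` on task 0, `v2` on task 1, `v3` back on task 0) can be illustrated without TensorFlow. This is only a sketch of the round-robin idea, not the implementation of `replica_device_setter()`; `round_robin_ps` is a hypothetical helper.

```python
def round_robin_ps(variable_names, ps_tasks, ps_device="/job:ps"):
    """Assign variables to ps tasks in creation order, wrapping around."""
    return {name: "%s/task:%d" % (ps_device, index % ps_tasks)
            for index, name in enumerate(variable_names)}


placement = round_robin_ps(["v1", "v2", "v3"], ps_tasks=2)
for name in sorted(placement):
    print(name, placement[name])
```

With two ps tasks, every other variable lands on the same task, which spreads the parameters (and the network traffic they generate) across the parameter servers.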