## Table of Contents
1. Up and running with AWS
2. Distributing Nodes across devices
    - Distribute a simple three-node graph across the CPU and GPU
    - Benchmark GPU computations against CPU computations
3. Distributing nodes across computers and devices

# Up and running with AWS

We will be running TensorFlow across several devices hosted on the Amazon Web Services (AWS) platform. Each virtual machine is called an EC2 (Elastic Compute Cloud) instance, and instances come in many different software and hardware configurations. Since we're interested in deep learning, we'll spin up a couple of instances with dedicated GPUs and all the software we need to run TensorFlow pre-installed. Specifically, we'll run the Deep Learning AMI (Amazon Machine Image) on a pair of p2.xlarge instances. These are the smallest instances in the P2 family, with one GPU and four vCPUs each.

https://aws.amazon.com/blogs/machine-learning/get-started-with-deep-learning-using-the-aws-deep-learning-ami/

### Spinning up two p2.xlarge instances

- Log on to AWS, go to Services -> EC2 and click Launch Instance
- Select Deep Learning AMI (Ubuntu) Version 6.0 - ami-5b22133e for the AMI
- Click "Next: Configure Instance Details." Under Number of Instances put 2.
- Click "Review and Launch." You will be prompted to create a new key pair if you don't have one, or to select an existing key pair if you do. If you create a new key pair, be sure to save the downloaded `.pem` file somewhere convenient!
- Click through the warnings and at the end click the "View instances" button
- Name your two instances: "PS, Worker 0" and "Worker 1"
- When the instances have finished initializing, record their IP addresses under Machine Details below

### Connecting to the server: *NIX
Use the command 

    ssh -i ~/mykeypair.pem -L 8157:127.0.0.1:8888 ubuntu@ec2-###-##-##-###.compute-1.amazonaws.com
    
to SSH into your new instance and forward local port 8157 to port 8888 on the instance, where Jupyter will listen.

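
If you reconnect often, it helps to keep straight which number in the `-L` flag is which. The sketch below uses a hypothetical helper, `ssh_tunnel_args` (not part of any AWS tool), to assemble the same `ssh` command as a Python argument list: the first port is opened on your own machine, the second is Jupyter's port on the instance.

```python
import os

def ssh_tunnel_args(key_path, host, local_port=8157, remote_port=8888, user="ubuntu"):
    """Build the argv for `ssh -i KEY -L LOCAL:127.0.0.1:REMOTE user@host`.

    local_port is opened on YOUR machine; traffic sent to it is forwarded
    to remote_port on the instance (where Jupyter listens by default).
    """
    forward = "%d:127.0.0.1:%d" % (local_port, remote_port)
    return ["ssh", "-i", os.path.expanduser(key_path), "-L", forward,
            "%s@%s" % (user, host)]

# Placeholder host copied from the command above; substitute your instance's DNS name.
args = ssh_tunnel_args("~/mykeypair.pem", "ec2-###-##-##-###.compute-1.amazonaws.com")
print(" ".join(args))
# To connect for real: import subprocess; subprocess.call(args)
```

Changing `local_port` is all you need when 8157 is already taken on your machine; the remote side stays 8888.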
### Connecting to the server: Windows
Download and install PuTTY (https://www.putty.org/). 
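- Open PuTTY and put your instance's IP address in the "Host Name" field.
- PuTTY cannot read the `.pem` file directly, so first convert it to PuTTY's `.ppk` format with PuTTYgen (installed alongside PuTTY). Then open Connection -> SSH -> Auth and add the path to the `.ppk` file in the "Private key file for authentication" field.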
- Click "Open" and enter the username ubuntu

### Forwarding the Jupyter Port: Windows

- Click the icon in the top-left corner of the PuTTY window (or right-click its title bar) and choose "Change Settings...". Open Connection -> SSH -> Tunnels and, under "Source port", enter a local port to forward to port 8888 on the EC2 instance. Record this port under Machine Details below.
- Under "Destination", enter 127.0.0.1:8888
- Click "Add" and then "Apply"

https://docs.aws.amazon.com/dlami/latest/devguide/setup-jupyter-configure-client-windows.html


### Setting Up Jupyter with Color
After logging into your instance, it can be useful to install Jupyter Themes so that you can tell at a glance which server you're working on. To do so, activate the TensorFlow environment

    source activate tensorflow_p27

and install Jupyter Themes

    pip install jupyterthemes
    
Finally, change your theme (pick a different one on each instance; `jt -l` lists the available themes)

    jt -t chesterish
    
and start your notebook

    jupyter notebook
    
When Jupyter prints the notebook URL (it points at port 8888), replace 8888 with your forwarded local port before opening it in your browser.

### Machine Details

* Name: Bert, Roles: PS, Worker 0, IP:, Jupyter Port:, Color:
* Name: Ernie, Roles: Worker 1, IP:, Jupyter Port:, Color:

# Distributing Nodes Across Devices
The code below can be run locally, but is designed to be run on one of your new p2.xlarge instances. For more information on different configurations look here: https://www.tensorflow.org/programmers_guide/using_gpu

On a standard machine there are multiple devices that can perform computations. In TensorFlow, CPUs and GPUs are represented as strings in the following format:

- `/cpu:0`: The CPU of your machine. By default TensorFlow exposes all CPU cores as this single device, so `/cpu:1` only exists if you explicitly configure additional CPU devices.
- `/device:GPU:0`: The first GPU of your machine, if you have one.
- `/device:GPU:1`: The second GPU of your machine, etc.

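A device name takes the following form (fully qualified names, such as the one in the error message further down, also include a `/replica:<REPLICA_INDEX>` component between the job and the task):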

`/job:<JOB_NAME>/task:<TASK_INDEX>/device:<DEVICE_TYPE>:<DEVICE_INDEX>`
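
To make the format concrete, here is a small illustrative parser. `parse_device_name` is a hypothetical helper written for this notebook, not a TensorFlow API; it also accepts the optional `/replica:` component of fully qualified names and the short `/cpu:0` form listed above.

```python
import re

# Every component is optional, in the order job / replica / task / device.
# The "device:" prefix is optional so the short "/cpu:0" form also parses.
_DEVICE_RE = re.compile(
    r"^(?:/job:(?P<job>[^/]+))?"
    r"(?:/replica:(?P<replica>\d+))?"
    r"(?:/task:(?P<task>\d+))?"
    r"(?:/(?:device:)?(?P<device_type>[A-Za-z_]+):(?P<device_index>\d+))?$"
)

def parse_device_name(name):
    """Split a device name into a dict of components (missing parts are None)."""
    match = _DEVICE_RE.match(name)
    if match is None:
        raise ValueError("not a device name: %r" % name)
    parts = match.groupdict()
    for key in ("replica", "task", "device_index"):
        if parts[key] is not None:
            parts[key] = int(parts[key])
    return parts

print(parse_device_name("/job:worker/task:1/device:GPU:0"))
print(parse_device_name("/job:localhost/replica:0/task:0/device:CPU:0"))
```

The second example is the fully qualified CPU name that TensorFlow reports when a requested GPU is unavailable.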

<img src="TF-CUDA-GPU.png">

In [1]:
# Standard preamble; copy and paste it onto every machine.

# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os
import tensorflow as tf  # reset_graph() below needs it

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# To plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "distributed"

def save_fig(fig_id, tight_layout=True):
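    path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
    # Create images/distributed/ on first use (no exist_ok, so it also works on Python 2)
    if not os.path.isdir(os.path.dirname(path)):
        os.makedirs(os.path.dirname(path))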
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)

In [2]:
import tensorflow as tf

c = tf.constant("Hello Distributed Tensorflow!")
server = tf.train.Server.create_local_server()

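After running the cell above, look at the terminal where you launched Jupyter (the Anaconda console on Windows): TensorFlow logs that it has started a server on a local port, and `server.target` holds its `grpc://localhost:<port>` address.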

In [3]:
with tf.Session(server.target) as sess:
    print(sess.run(c))

b'Hello Distributed Tensorflow!'


### How to do device logging
First, we'll turn on device logging by creating the session with `log_device_placement` set to `True`. The placement log is printed to the terminal running the Jupyter server (your Anaconda console), not to the notebook.

In [4]:
# Creates a graph.
a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
c = tf.matmul(a, b)
# Creates a session with log_device_placement set to True.
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# Runs the op.
print(sess.run(c))


[[22. 28.]
 [49. 64.]]


### Display local devices

Question: What are the XLA_GPU and XLA_CPU doing?

In [5]:
from tensorflow.python.client import device_lib

device_lib.list_local_devices()

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 6114515408153548089]

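By default, TensorFlow claims almost all of the memory on every visible GPU as soon as a session starts. You can configure it to claim less (`per_process_gpu_memory_fraction`) or to grow its allocation on demand (`allow_growth`) through the `gpu_options` field of `tf.ConfigProto`, but memory that has been claimed is not released until the process exits. Left to its defaults, TensorFlow places each operation on the GPU with the lowest ID when the operation has a GPU kernel, and on the CPU otherwise. The device list above was recorded on a machine without a GPU, which is why the cell below fails with an `InvalidArgumentError`; on a p2.xlarge it runs, and creating the session with `tf.ConfigProto(allow_soft_placement=True)` would make TensorFlow fall back to the CPU instead of raising. If you want to pin operations to a specific CPU or GPU, use `tf.device`: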

Question: In what order does tensorflow run nodes of different devices?

In [9]:
tf.reset_default_graph()

with tf.device("/cpu:0"):
    a = tf.Variable(1.0, name="a")
    b = tf.Variable(1.0, name="a")
    
with tf.device("/gpu:0"):
    c = a + b
    
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(c))

InvalidArgumentError: Cannot assign a device for operation 'add': Operation was explicitly assigned to /device:GPU:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
	 [[Node: add = Add[T=DT_FLOAT, _device="/device:GPU:0"](a/read, a_1/read)]]



### Dynamic Placement of Nodes (Geron)
When you create a device block, you can specify a function instead of a device name. TensorFlow will call this function for each operation it needs to place in the device block, and the function must return the name of the device to pin the operation on.

For example, the following code pins all the variable nodes to "/cpu:0" (here, the variables a and b) and all other nodes to "/gpu:0":

In [7]:
def variables_on_cpu(op):
    if op.type == "VariableV2":
        return "/cpu:0"
    else:
        return "/gpu:0"

In [None]:
tf.reset_default_graph()

with tf.device(variables_on_cpu):
    a = tf.Variable(1.0, name="a")
    b = tf.Variable(1.0, name="b")
    c = a + b

In [None]:
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(c))

In [None]:
print(a.device)
print(b.device)
print(c.device)
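
TensorFlow calls the device function internally, once per operation created inside the block, and pins the operation on whatever string comes back. The plain-Python mock below imitates that contract with stand-in operations, so you can see the placement decision without a GPU. `FakeOp` and `place_ops` are hypothetical; a real `tf.Variable` also creates helper operations (its initial value, initializer and read) that this mock leaves out.

```python
from collections import namedtuple

# Stand-in for a TensorFlow operation: only the attributes the device function reads.
FakeOp = namedtuple("FakeOp", ["name", "type"])

def variables_on_cpu(op):
    # Same logic as the cell above: variables to the CPU, everything else to the GPU.
    if op.type == "VariableV2":
        return "/cpu:0"
    else:
        return "/gpu:0"

def place_ops(ops, device_fn):
    """Mimic a device block: ask device_fn once per op and record the answer."""
    return {op.name: device_fn(op) for op in ops}

ops = [FakeOp("a", "VariableV2"), FakeOp("b", "VariableV2"), FakeOp("add", "Add")]
print(place_ops(ops, variables_on_cpu))
```

Because the function sees the whole operation, it can branch on anything the op exposes (type, name), which is what makes this more flexible than a fixed device string.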

### Parallel Execution (Geron)
When TensorFlow runs a graph, it starts by finding out the list of nodes that need to be evaluated, and it counts how many dependencies each of them has. TensorFlow then starts evaluating the nodes with zero dependencies (i.e., source nodes). If these nodes are placed on separate devices, they obviously get evaluated in parallel. If they are placed on the same device, they get evaluated in different threads, so they may run in parallel too (in separate GPU threads or CPU cores).

TensorFlow manages a thread pool on each device to parallelize operations (see Figure 12-5). These are called the inter-op thread pools. Some operations have multithreaded kernels: they can use other thread pools (one per device) called the intra-op thread pools.

<img src="ThreadPool.png">

For example, in Figure 12-5, operations A, B, and C are source ops, so they can immediately be evaluated. Operations A and B are placed on GPU 0, so they are sent to this device's inter-op thread pool and immediately evaluated in parallel. Operation A happens to have a multithreaded kernel; its computations are split into three parts, which are executed in parallel by the intra-op thread pool. Operation C goes to GPU 1's inter-op thread pool. As soon as operation C finishes, the dependency counters of operations D and E are decremented and both reach 0, so both operations are sent to the inter-op thread pool to be executed.

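The dependency counting described above is essentially a topological sort. The plain-Python sketch below (no TensorFlow) groups operations into the waves in which their counters reach zero. It simplifies the real scheduler, which dispatches each operation the moment its own counter hits zero rather than waiting for a whole wave, and it encodes only the edges stated in the text, assuming D and E each wait on C alone.

```python
def execution_waves(deps):
    """deps maps each op to the list of ops it depends on.

    Returns lists of ops whose dependency counters reach zero together.
    """
    # Count unresolved dependencies per op and record who consumes each op's output.
    remaining = {op: len(inputs) for op, inputs in deps.items()}
    consumers = {op: [] for op in deps}
    for op, inputs in deps.items():
        for src in inputs:
            consumers[src].append(op)

    ready = sorted(op for op, count in remaining.items() if count == 0)  # source ops
    waves = []
    while ready:
        waves.append(ready)
        next_ready = []
        for op in ready:
            # "Finishing" op decrements the counter of every op that consumes it.
            for consumer in consumers[op]:
                remaining[consumer] -= 1
                if remaining[consumer] == 0:
                    next_ready.append(consumer)
        ready = sorted(next_ready)

    if any(count > 0 for count in remaining.values()):
        raise ValueError("graph has a cycle")
    return waves

figure_graph = {"A": [], "B": [], "C": [], "D": ["C"], "E": ["C"]}
print(execution_waves(figure_graph))  # [['A', 'B', 'C'], ['D', 'E']]
```

Which device each op runs on does not change this order; placement only decides which inter-op thread pool receives an op once its counter reaches zero.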
### A quick benchmark of GPU computations vs CPU

We will perform a couple of matrix calculations to show the difference in speed between GPU computations and CPU computations.

https://gist.github.com/j-min/69aae99be6f6acfadf2073817c2f61b0

First, let's see which devices we can actually access:

In [3]:
from tensorflow.python.client import device_lib

device_lib.list_local_devices()

[name: "/device:CPU:0"
 device_type: "CPU"
 memory_limit: 268435456
 locality {
 }
 incarnation: 5866721366184997421]

In [None]:
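import datetime
import numpy as np
import tensorflow as tf

# Two random 10000x10000 float32 matrices (about 400 MB each)
A = np.random.rand(10000,10000).astype('float32')
B = np.random.rand(10000,10000).astype('float32')

# List that collects the result tensors of a benchmark graph
c1 = []

n = 1  # number of tf.matmul ops per matpow() call

def matpow(M, n):
    # Builds n chained tf.matmul ops, i.e. computes M^(n+1)
    if n < 1:  # Base case: n < 1 returns M itself
        return M
    else: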
        return tf.matmul(M, matpow(M, n-1))
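
Note that `matpow(M, n)` performs `n` multiplications and therefore returns M to the power n + 1; with `n = 1`, each call is the single 10000x10000 product M x M. A plain-Python mirror of the same recursion on a small matrix makes the exponent easy to check. `list_matmul` and `list_matpow` are hypothetical list-based helpers, named so they do not overwrite the TensorFlow `matpow` above if run in the same kernel.

```python
def list_matmul(X, Y):
    """Product of two matrices stored as lists of rows (pure Python)."""
    result = []
    for row in X:
        new_row = []
        for col in zip(*Y):  # columns of Y
            total = 0
            for x, y in zip(row, col):
                total += x * y
            new_row.append(total)
        result.append(new_row)
    return result

def list_matpow(M, n):
    # Same recursion as matpow(): n < 1 is the base case and returns M itself.
    if n < 1:
        return M
    return list_matmul(M, list_matpow(M, n - 1))

shear = [[1, 1], [0, 1]]  # shear**k == [[1, k], [0, 1]], so the exponent is easy to read
print(list_matpow(shear, 1))  # [[1, 2], [0, 1]], i.e. shear**2
print(list_matpow(shear, 2))  # [[1, 3], [0, 1]], i.e. shear**3
```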

In [None]:
'''
Single CPU computing
'''
tf.reset_default_graph()
c1 = []  # fresh list: tensors from an earlier graph cannot be mixed into this one

with tf.device('/cpu:0'):
    a = tf.placeholder(tf.float32, [10000,10000])
    b = tf.placeholder(tf.float32, [10000,10000])
    c1.append(matpow(a,n))
    c1.append(matpow(b,n))
    summ = tf.add_n(c1)

t1_1 = datetime.datetime.now()
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
    # Run the op.
    sess.run(summ, {a:A, b:B})
t2_1 = datetime.datetime.now()

print("Single CPU computation time: " + str(t2_1-t1_1))

In [None]:
'''
CPU + GPU computing (one matrix power on each device)
'''
tf.reset_default_graph()
c1 = []  # fresh list: tensors from an earlier graph cannot be mixed into this one

with tf.device('/cpu:0'):
    a = tf.placeholder(tf.float32, [10000,10000])
    c1.append(matpow(a,n))
    
with tf.device('/gpu:0'):
    b = tf.placeholder(tf.float32, [10000,10000])
    c1.append(matpow(b,n))
    summ = tf.add_n(c1)  # named summ to avoid shadowing Python's built-in sum

t1_1 = datetime.datetime.now()
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
    # Run the op.
    sess.run(summ, {a:A, b:B})
t2_1 = datetime.datetime.now()

print("CPU + GPU computation time: " + str(t2_1-t1_1))

In [None]:
'''
GPU computing
'''
tf.reset_default_graph()
c1 = []  # fresh list: tensors from an earlier graph cannot be mixed into this one

with tf.device('/GPU:0'):
    a = tf.placeholder(tf.float32, [10000,10000])
    c1.append(matpow(a,n))
    b = tf.placeholder(tf.float32, [10000,10000])
    c1.append(matpow(b,n))
    summ = tf.add_n(c1)  # named summ to avoid shadowing Python's built-in sum

t1_1 = datetime.datetime.now()
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
    # Run the op.
    sess.run(summ, {a:A, b:B})
t2_1 = datetime.datetime.now()

print("GPU computation time: " + str(t2_1-t1_1))
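
The timings above measure everything inside the `datetime` window: building the session, copying A and B in through `feed_dict`, and any one-time GPU setup on the first run, so they overstate the cost of the matrix products themselves. A common remedy is to run the op once untimed and keep the best of several timed runs. The sketch below is a minimal version; `benchmark` is a hypothetical helper, and it uses `timeit.default_timer` so it also runs in the Python 2.7 `tensorflow_p27` environment.

```python
from timeit import default_timer as timer  # wall-clock timer on Python 2.7 and 3

def benchmark(fn, warmup=1, repeats=3):
    """Run fn `warmup` times untimed, then return the fastest of `repeats` timed runs (seconds)."""
    for _ in range(warmup):
        fn()  # absorbs one-time costs such as first-run setup
    timings = []
    for _ in range(repeats):
        start = timer()
        fn()
        timings.append(timer() - start)
    return min(timings)

# Hypothetical usage in a benchmark cell, with the session created once, outside the timing
# (op is the tensor to time and feed its feed_dict):
#     with tf.Session() as sess:
#         print("best of 3: %.3f s" % benchmark(lambda: sess.run(op, feed)))
print("sort best of 3: %.4f s" % benchmark(lambda: sorted(range(100000), reverse=True)))
```

Taking the minimum rather than the mean filters out runs slowed by unrelated background work, which is usually what you want when comparing devices.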

# Distributed Tensorflow
TensorFlow lets us set up a distributed cluster almost as easily as we spread operations across devices. All we need to do is give each machine the same cluster specification (a directory of which task lives at which address) and the same graph structure, and the cluster will start chugging away. There are, of course, a lot of design choices involved: How should the graph be split across machines? Where do we store the data? Do we wait for every node to finish an entire epoch, or do we allow some nodes to queue up operations ahead of the others?


### Setup Network Permissions
We will have to open the firewalls on both instances so that they can talk to each other. Go back to the AWS console, select your instance and click "Edit Security Group."....
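
Every task in the cluster must be started with the same cluster specification. The cells below use `localhost`, which only works while all three tasks run on one machine. Once the PS and worker 0 live on Bert and worker 1 on Ernie, each entry needs an address the other instance can reach (typically its private IP, since both instances were launched together), and ports 2222 to 2224 must be open in the security group. This sketch builds the dictionary passed to `tf.train.ClusterSpec`; `make_cluster_dict` and the IP placeholders are hypothetical, and the port layout copies the cells below.

```python
def make_cluster_dict(ps_hosts, worker_hosts, first_port=2222):
    """Give each task the next consecutive port (PS tasks first, then workers)."""
    port = first_port
    cluster = {"ps": [], "worker": []}
    for host in ps_hosts:
        cluster["ps"].append("%s:%d" % (host, port))
        port += 1
    for host in worker_hosts:
        cluster["worker"].append("%s:%d" % (host, port))
        port += 1
    return cluster

# Placeholders: substitute the IPs recorded under "Machine Details".
BERT, ERNIE = "BERT_PRIVATE_IP", "ERNIE_PRIVATE_IP"
cluster = make_cluster_dict(ps_hosts=[BERT], worker_hosts=[BERT, ERNIE])
print(cluster)
# Then, identically on every task: cluster_spec = tf.train.ClusterSpec(cluster)
```

Generating the dictionary from one list of hosts makes it harder for the two machines to end up with mismatched specifications.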

## On Bert (our PS Session)

In [None]:
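import tensorflow as tf

# One parameter server (ps) and two workers. 'localhost' only works when all three tasks
# run on the same machine; across instances, replace each entry with that instance's IP.
cluster_spec = tf.train.ClusterSpec({'ps' : ['localhost:2222'],'worker' : ['localhost:2223','localhost:2224']})
ps = tf.train.Server(cluster_spec, job_name='ps', task_index=0)
ps.join()  # blocks forever, so run the PS in its own notebook or process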

## On Bert (our Chief Worker)

In [None]:
from __future__ import print_function
import tensorflow as tf
import time

task_index=0

cluster_spec = tf.train.ClusterSpec({'ps' : ['localhost:2222'],'worker' : ['localhost:2223','localhost:2224']})

server = tf.train.Server(cluster_spec,job_name='worker',task_index=task_index)

# Define the graph (identical on both workers)

with tf.device('/job:ps/task:0'):
    a = tf.Variable([0.], name = 'a') # a lives on the parameter server
    b = tf.constant([100.])
    c = tf.constant([120.])
    
with tf.device('/job:worker/task:1'):
    loss2 = tf.abs(a-c)
    
with tf.device('/job:worker/task:0'):
    loss1 = tf.abs(a-b)
    optimizer = tf.train.GradientDescentOptimizer(.1)
    
    grads = optimizer.compute_gradients(loss1*loss1+loss2*loss2)
    
    global_update = optimizer.apply_gradients(grads)
    
with tf.device('/job:worker/task:%d'%task_index):
    init_global = tf.global_variables_initializer()
    
sess = tf.Session(target=server.target)

print(a.device)
print(b.device)
print(c.device)
print(loss1.device)
print(loss2.device)

## On Ernie

In [None]:
from __future__ import print_function
import tensorflow as tf
import time

task_index=1  # Ernie runs worker task 1

cluster_spec = tf.train.ClusterSpec({'ps' : ['localhost:2222'],'worker' : ['localhost:2223','localhost:2224']})

server = tf.train.Server(cluster_spec,job_name='worker',task_index=task_index)

# Define the graph (identical on both workers)

with tf.device('/job:ps/task:0'):
    a = tf.Variable([0.], name = 'a') # a lives on the parameter server
    b = tf.constant([100.])
    c = tf.constant([120.])
    
with tf.device('/job:worker/task:1'):
    loss2 = tf.abs(a-c)
    
with tf.device('/job:worker/task:0'):
    loss1 = tf.abs(a-b)
    optimizer = tf.train.GradientDescentOptimizer(.1)
    
    grads = optimizer.compute_gradients(loss1*loss1+loss2*loss2)
    
    global_update = optimizer.apply_gradients(grads)
    
with tf.device('/job:worker/task:%d'%task_index):
    init_global = tf.global_variables_initializer()
    
sess = tf.Session(target=server.target)

## On Bert (Chief Worker Session)

In [None]:
sess.run(init_global)

for i in range(1000):
    sess.run(global_update)
    
sess.run(a)
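
Because `loss1 = |a - b|` and `loss2 = |a - c|`, the optimizer minimizes (a - 100)^2 + (a - 120)^2. Its gradient is 4a - 440, so each step with learning rate 0.1 maps a to 0.6a + 44, which converges to a = 110, the midpoint of b and c. The plain-Python sketch below (no TensorFlow; `train` is a hypothetical helper) repeats that update 1000 times as a sanity check for the value `sess.run(a)` should return, up to float32 rounding.

```python
def train(a=0.0, b=100.0, c=120.0, learning_rate=0.1, steps=1000):
    """Gradient descent on (a - b)**2 + (a - c)**2, the loss the cluster optimizes."""
    for _ in range(steps):
        grad = 2 * (a - b) + 2 * (a - c)  # derivative of loss1*loss1 + loss2*loss2 w.r.t. a
        a -= learning_rate * grad
    return a

print(train())  # approximately 110.0, i.e. (b + c) / 2
```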