# Distributed TensorFlow


![TensorFlowing](./files/tensors_flowing.gif)


In [1]:
import tensorflow as tf

In [2]:
!pip install version_information

%load_ext version_information
%version_information numpy, scipy, matplotlib, pandas, tensorflow, sklearn, skflow



| Software | Version |
|---|---|
| Python | 3.5.2 64bit [GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] |
| IPython | 5.1.0 |
| OS | Linux 3.16.0 4 amd64 x86_64 with debian jessie sid |
| numpy | 1.11.2 |
| scipy | 0.18.1 |
| matplotlib | 1.5.3 |
| pandas | 0.19.1 |
| tensorflow | 0.11.0 |
| sklearn | 0.18.1 |
| skflow | The 'skflow' distribution was not found and is required by the application |


## Overview of Components

### Cluster

To define a distributed computation in TensorFlow we need to specify two kinds of jobs:

- worker jobs
- parameter server (ps) jobs

Each **job** is defined by one or more **tasks**. Each task is usually identified by a simple numerical index, e.g. `0, 1, 2, 3, ...`.

In [3]:
!kubectl get pod

NAME                       READY     STATUS    RESTARTS   AGE
prediction-codegen-k114g   1/1       Running   0          6d
prediction-pmml-uafkm      1/1       Running   0          7d
turbine-u6588              1/1       Running   0          12d
weavescope-app-nrc6o       1/1       Running   0          12d
weavescope-probe-5rogi     1/1       Running   0          12d


In [4]:
CLUSTER_SPEC= """
{
    'ps' : ['clustered-tensorflow-ps0:2222', 'clustered-tensorflow-ps1:2222'],
    'worker' : [ 'clustered-tensorflow-worker0:2222','clustered-tensorflow-worker1:2222']
}
"""

In [5]:
import ast

cluster_def = ast.literal_eval(CLUSTER_SPEC)

In [6]:
cluster_def

{'ps': ['clustered-tensorflow-ps0:2222', 'clustered-tensorflow-ps1:2222'],
 'worker': ['clustered-tensorflow-worker0:2222',
  'clustered-tensorflow-worker1:2222']}

In [7]:
clusterSpec = tf.train.ClusterSpec(cluster_def)

In [8]:
clusterSpec.jobs

['worker', 'ps']

In [9]:
for job in clusterSpec.jobs:
    print(job, clusterSpec.job_tasks(job))

worker ['clustered-tensorflow-worker0:2222', 'clustered-tensorflow-worker1:2222']
ps ['clustered-tensorflow-ps0:2222', 'clustered-tensorflow-ps1:2222']
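
Task addresses are normally given in `host:port` form, and an entry without a port is easy to miss in a hand-written cluster definition. A minimal sketch of a sanity check in plain Python follows; `find_bad_addresses` is a hypothetical helper (not part of TensorFlow) and the host names in the example are made up.

```python
def find_bad_addresses(cluster_def):
    """Return (job, task_index, address) for entries not in host:port form."""
    bad = []
    for job, addresses in sorted(cluster_def.items()):
        for task_index, address in enumerate(addresses):
            host, sep, port = address.rpartition(':')
            # Without a ':' rpartition leaves `sep` and `host` empty.
            if not (sep and host and port.isdigit()):
                bad.append((job, task_index, address))
    return bad


# Made-up example: worker0 lacks a port on purpose.
example = {
    'ps': ['ps0:2222'],
    'worker': ['worker0', 'worker1:2222'],
}
print(find_bad_addresses(example))
```

Running such a check before building the `ClusterSpec` is one way to catch the problem before any server is started.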


In [10]:
workers = ['/job:worker/task:{}'.format(i) for i in range(len(cluster_def['worker']))]
param_servers = ['/job:ps/task:{}'.format(i) for i in range(len(cluster_def['ps']))]

In [11]:
workers

['/job:worker/task:0', '/job:worker/task:1']

In [12]:
param_servers

['/job:ps/task:0', '/job:ps/task:1']

### Pinning of Variables
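Each variable is assigned to a device. If no device is requested explicitly, the `device` attribute is an empty string and TensorFlow chooses the placement at run time.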

In [13]:
l = tf.Variable("local_cpu")
l.device

''

We can pin a variable to a specific device with the `tf.device` context manager.

In [14]:
for ps in param_servers:
    with tf.device(ps):
        v = tf.Variable("my_var")
v.device

'/job:ps/task:1'
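
The loop above rebinds `v` on every iteration, so only the last parameter server shows up in `v.device`. To spread several variables across the parameter servers, one common choice is round-robin assignment. The following is a minimal sketch in plain Python with no TensorFlow calls; `round_robin_placement` and the variable names `w1`, `b1`, `w2` are hypothetical.

```python
from itertools import cycle


def round_robin_placement(var_names, devices):
    """Map each variable name to a device, cycling through the devices."""
    return dict(zip(var_names, cycle(devices)))


ps_devices = ['/job:ps/task:0', '/job:ps/task:1']
placement = round_robin_placement(['w1', 'b1', 'w2'], ps_devices)
for name in sorted(placement):
    print(name, '->', placement[name])
```

Each device string could then be passed to `tf.device` when the corresponding variable is created. `tf.train.replica_device_setter`, used later in this notebook, automates a similar assignment.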

## TensorFlow Server

The server handles the actual communication. On each node of the cluster we spawn a simple gRPC server.

```python
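def launch_worker(job_name, task_id, cluster_def):
    # Start the in-process gRPC server for this task of the cluster.
    server = tf.train.Server(
        cluster_def,
        job_name=job_name,
        task_index=task_id
    )
    # Block and keep serving requests from other tasks and clients.
    server.join()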
```

### Connecting to a Server

To connect to _any_ server, pass the `ip:port` address of that server as the `target` when creating a [Session](https://www.tensorflow.org/versions/r0.8/api_docs/python/client.html#Session) object.

Note that the server is generic and can assume the role of either parameter server or worker. The role is decided by the cluster configuration and the job name the server is started with.
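
As an illustration, the session target for a given task can be derived from the cluster definition. This is a minimal sketch: `session_target` is a hypothetical helper, the host names are made up, and the `grpc://` prefix follows the target string used in the run cell later in this notebook.

```python
def session_target(cluster_def, job_name, task_index):
    """Build the gRPC target string for one task of the cluster."""
    address = cluster_def[job_name][task_index]
    return 'grpc://' + address


# Made-up cluster definition, only for illustration.
example_cluster = {
    'ps': ['ps0.example:2222'],
    'worker': ['worker0.example:2222', 'worker1.example:2222'],
}
print(session_target(example_cluster, 'worker', 1))
```

The resulting string could be passed as the first argument to `tf.Session`.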

![ps workers](./ps_workers.png)

The best practice is to create a single image that launches the TensorFlow worker.

Environment variables then specify the exact role for the worker at run time.
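
A minimal sketch of how the role might be read at start-up is shown below. The variable names `TF_JOB_NAME` and `TF_TASK_INDEX` and the helper `role_from_env` are assumptions made for this example, not names defined by TensorFlow or by this notebook.

```python
import os


def role_from_env(environ=os.environ):
    """Read the job name and task index from environment variables."""
    # Both variable names are assumptions made for this sketch.
    job_name = environ.get('TF_JOB_NAME', 'worker')
    task_index = int(environ.get('TF_TASK_INDEX', '0'))
    if job_name not in ('ps', 'worker'):
        raise ValueError('unknown job name: {!r}'.format(job_name))
    return job_name, task_index


# Pass a plain dict instead of the real environment to try it out.
job_name, task_index = role_from_env({'TF_JOB_NAME': 'ps', 'TF_TASK_INDEX': '1'})
print(job_name, task_index)
```

The returned pair could then be handed to `launch_worker(job_name, task_index, cluster_def)` from the previous section, so the same image serves both roles.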

### gRPC

[gRPC](http://www.grpc.io) is a Remote Procedure Call protocol based on [Protocol Buffers](https://developers.google.com/protocol-buffers/).


Each object in TensorFlow that has to be sent over the wire has a Protocol Buffers definition.

1. The Client figures out which variables need to be serialized.
1. The Client makes the gRPC remote call to the Server and sends the values.
1. If the Server accepts the call, the serialized tensors are deserialized.
1. The Server runs the requested operation on the graph and all its dependencies.
1. The Server serializes the result and sends it back to the Client on the same connection.
1. The Client receives the result and deserializes it.
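
The steps above can be sketched as a toy round trip in plain Python. This is not gRPC: JSON stands in for Protocol Buffers, a direct function call stands in for the network, and `toy_client` and `toy_server` are hypothetical names used only for illustration.

```python
import json


def serialize(values):
    """Stand-in for protobuf encoding: turn a dict into bytes."""
    return json.dumps(values, sort_keys=True).encode('utf-8')


def deserialize(payload):
    """Stand-in for protobuf decoding: turn bytes back into a dict."""
    return json.loads(payload.decode('utf-8'))


def toy_server(request_bytes):
    """Steps 3-5: decode the request, run the operation, encode the result."""
    feeds = deserialize(request_bytes)
    result = {'sum': sum(feeds['input'])}  # the "requested operation"
    return serialize(result)


def toy_client(feeds):
    """Steps 1-2 and 6: encode the feeds, call the server, decode the reply."""
    request_bytes = serialize(feeds)
    reply_bytes = toy_server(request_bytes)  # a real client would use the network here
    return deserialize(reply_bytes)


print(toy_client({'input': [1, 2, 3]}))
```

Only bytes cross the boundary between `toy_client` and `toy_server`, which mirrors why every object that travels between tasks needs a serializable definition.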

![gRPC Communication](./grpc_communication.png)

Example of the Protocol Buffers declaration for a [Variable](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/framework/variable.proto):


```protobuf
syntax = "proto3";

package tensorflow;

// Protocol buffer representing a Variable.
message VariableDef {
  // Name of the variable tensor.
  string variable_name = 1;

  // Name of the initializer op.
  string initializer_name = 2;

  // Name of the snapshot tensor.
  string snapshot_name = 3;

}
```

Each variable can then be serialized using the `to_proto` method:

In [15]:
v.to_proto()

variable_name: "Variable_2:0"
initializer_name: "Variable_2/Assign"
snapshot_name: "Variable_2/read:0"

## Simple reduce sum Example

In [16]:
batch_size = 1000

#graph = tf.Graph()
#with graph.as_default():        
#    with tf.device('/job:ps/task:0'):
#        input_array = tf.placeholder(tf.int32, shape=[None])
#        final_result = tf.Variable(0)
        
    # divide the input across the cluster:
#    all_reduce = []
#    splitted = tf.split(0, len(workers), input_array)
#    for idx, (portion, worker) in enumerate(zip(splitted,workers)):
#        with tf.device(worker):
#           print(worker)
#           local_reduce = tf.reduce_sum(portion)
#           local_reduce = tf.Print(portion, [local_reduce], message="portion is")
#           all_reduce.append(local_reduce)
    
#    final_result = tf.reduce_sum(tf.pack(all_reduce))

#    print(final_result)

In [17]:
batch_size = 1000

graph = tf.Graph()
with graph.as_default():
    all_reduce = []

    initializer = tf.truncated_normal_initializer(mean=0.0, stddev=1.0, seed=None, dtype=tf.float32)

    # Let TensorFlow place variables on the parameter servers.
    with tf.device(tf.train.replica_device_setter(cluster=clusterSpec)):
        input_array = tf.placeholder(tf.int32, shape=[None])
        final_result = tf.Variable(0)

    # Divide the input across the cluster.
    # TF 0.11 signature: tf.split(split_dim, num_split, value).
    splitted = tf.split(0, len(workers), input_array)
    for idx, (portion, worker) in enumerate(zip(splitted, workers)):
        with tf.device(worker):
            print(worker)
            local_reduce = tf.reduce_sum(portion)
            # tf.Print returns its first argument, so pass the sum through it.
            local_reduce = tf.Print(local_reduce, [local_reduce], message="portion sum is ")
            all_reduce.append(local_reduce)

    # Combine the per-worker sums into the final result.
    final_result = tf.reduce_sum(tf.pack(all_reduce))

/job:worker/task:0
/job:worker/task:1
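
The same split, local reduce, and final reduce pattern can be checked without a cluster. The sketch below uses plain Python lists in place of tensors; `split_evenly` and `distributed_sum` are hypothetical helpers, and the even-split requirement is an assumption that mirrors splitting the input into equal parts.

```python
def split_evenly(values, num_parts):
    """Split a list into num_parts equal chunks (length must divide evenly)."""
    if len(values) % num_parts != 0:
        raise ValueError('input length must be divisible by the number of parts')
    size = len(values) // num_parts
    return [values[i * size:(i + 1) * size] for i in range(num_parts)]


def distributed_sum(values, num_workers):
    """Sum each portion separately, then sum the partial results."""
    portions = split_evenly(values, num_workers)
    local_sums = [sum(portion) for portion in portions]  # one per worker
    return sum(local_sums)                               # final reduce


print(distributed_sum([1] * 1000, num_workers=2))
```

For an input of 1000 ones split across two workers, each partial sum is 500 and the total is 1000.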


In [18]:
sess_config = tf.ConfigProto(
    allow_soft_placement=True,
    log_device_placement=True)

We can now run the graph against one of the workers:

In [None]:
import numpy as np
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)

#, config=sess_config
with tf.Session("grpc://clustered-tensorflow-worker0:2222", graph=graph) as session:
    result = session.run(final_result, feed_dict={ input_array: np.ones([1000]) }, options=run_options)
    print(result)

We can also inspect the device assigned to any remote variable or tensor:

In [None]:
final_result.device