In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import json
import os

In [2]:
JOB = os.environ.get('JOB')
TASK = int(os.environ.get('TASK'))

In [3]:
with open('/eslap/legacy/cluster.json','r') as f:
        cluster=json.load(f)
        cluster_spec=tf.train.ClusterSpec(cluster)
        
workers = ['/job:worker/task:'+str(i) for i in range(len(cluster['worker']))]
param_servers = ['/job:ps/task:'+str(i) for i in range(len(cluster['ps']))]
cluster

{'ps': ['tf-ps-0:2222', 'tf-ps-1:2222'],
 'worker': ['tf-worker-0:2222',
  'tf-worker-1:2222',
  'tf-worker-2:2222',
  'tf-worker-3:2222'],
 'master': ['tf-master-0:2222']}

In [4]:
server = tf.train.Server(cluster_spec, job_name=JOB, task_index=TASK)

In [5]:
all_data=pd.read_csv('/eslap/legacy/mnist.csv')
train,test=train_test_split(all_data,test_size=0.25)
del all_data 
batch_size=128
num_batches=int(train.shape[0]/batch_size)

In [7]:
def to_one_hot(targets,num_classes):
    aux = np.zeros((targets.shape[0], num_classes))
    aux[np.arange(targets.shape[0]), targets] = 1
    return aux

In [8]:
Y_train = to_one_hot(train['label'].values, num_classes = 10)
Y_test = to_one_hot(test['label'].values, num_classes = 10)
X_train = train.drop(labels = ['label'],axis = 1).values/255.0
X_test= test.drop(labels = ['label'],axis = 1).values/255.0
X_train.shape,Y_train.shape,X_test.shape,Y_test.shape

((31500, 784), (31500, 10), (10500, 784), (10500, 10))

In [9]:
# Set parameters
learning_rate = 0.01
training_iteration = 30
batch_size = 128
display_step = 2

# TF graph input
x = tf.placeholder(tf.float64, [None, 784]) # mnist data image of shape 28*28=784
y = tf.placeholder(tf.float64, [None, 10]) # 0-9 digits recognition => 10 classes

x_list=tf.split(x, len(workers))
y_list=tf.split(y, len(workers))

# Create a model
with tf.device(param_servers[0]):
    W_1 = tf.Variable(tf.truncated_normal([784, 64],dtype=tf.float64))
    b_1 = tf.Variable(tf.zeros([64],dtype=tf.float64))
    
with tf.device(param_servers[1]):
    W_2 = tf.Variable(tf.truncated_normal([64, 10],dtype=tf.float64))
    b_2 = tf.Variable(tf.zeros([10],dtype=tf.float64))

losses=[]
accuracies=[]
for i,worker in enumerate(workers):
    with tf.device(worker):    
        # Construct a linear model
        h_1=tf.nn.relu(tf.matmul(x_list[i], W_1) + b_1)
        h_2=tf.matmul(h_1, W_2) + b_2
        model = tf.nn.softmax(h_2)  # Softmax

        loss = -tf.reduce_sum(y_list[i]*tf.log(model))
        losses.append(loss)

        predictions = tf.equal(tf.argmax(model, 1), tf.argmax(y_list[i], 1))
        accuracy = tf.reduce_mean(tf.cast(predictions, "float"))
        accuracies.append(accuracy)

average_loss_op = tf.add_n(losses) / tf.convert_to_tensor(len(losses), dtype=tf.float64)
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)
    

average_accuracy_op = tf.add_n(accuracies) / tf.convert_to_tensor(len(accuracies), dtype=tf.float32)

init = tf.global_variables_initializer()


In [10]:
# Launch the graph
with tf.Session(server.target) as sess:
    sess.run(init)
    # Training cycle
    epsilon=0.9
    test_acc = 0.0
    iteration=0
    while test_acc < epsilon and iteration < 30:
        avg_acc = 0.0
        avg_cost = 0.0
        for i in range(num_batches):
            lower_index=batch_size*i
            if i==num_batches-1:
                batch = X_train[lower_index:]
                batch_labels=Y_train[lower_index:]
            else:
                upper_index=batch_size*(i+1)
                batch = X_train[lower_index:upper_index]
                batch_labels=Y_train[lower_index:upper_index]
            
            sess.run(train_step, feed_dict={x: batch, y: batch_labels})
            
            avg_cost += sess.run(average_loss_op, feed_dict={x: batch, y: batch_labels})/float(num_batches)
            avg_acc += sess.run(average_accuracy_op, feed_dict={x: batch, y: batch_labels})/float(num_batches)
            
        test_loss = sess.run(average_loss_op, feed_dict={x: X_test, y: Y_test})
        test_acc = sess.run(average_accuracy_op, feed_dict={x: X_test, y: Y_test})
        
        print ("Iteration:%d"%iteration, "training loss= %f"%avg_cost, "training accuracy= %f"%avg_acc,\
               "test accuracy= %f"%test_acc)
        iteration+=1
        
    print("Training completed!")


Iteration:0 training loss= 71.135869 training accuracy= 0.640943 test accuracy= 0.714381
Iteration:1 training loss= 25.506088 training accuracy= 0.778150 test accuracy= 0.777048
Iteration:2 training loss= 20.917178 training accuracy= 0.821458 test accuracy= 0.798190
Iteration:3 training loss= 18.625326 training accuracy= 0.843299 test accuracy= 0.815048
Iteration:4 training loss= 17.285545 training accuracy= 0.860477 test accuracy= 0.825714
Iteration:5 training loss= 16.368682 training accuracy= 0.871304 test accuracy= 0.837429
Iteration:6 training loss= 15.712874 training accuracy= 0.878796 test accuracy= 0.846952
Iteration:7 training loss= 15.338522 training accuracy= 0.884383 test accuracy= 0.854095
Iteration:8 training loss= 15.020151 training accuracy= 0.888824 test accuracy= 0.860381
Iteration:9 training loss= 14.827269 training accuracy= 0.892820 test accuracy= 0.862667
Iteration:10 training loss= 14.692618 training accuracy= 0.896345 test accuracy= 0.864667
Iteration:11 trainin