'''
Distributed TensorFlow 0.8.0 example of data parallelism with shared model parameters.
Trains a simple sigmoid neural network on MNIST using one parameter server and the
worker hosts listed below (configured here as a single localhost worker for 50 epochs).
Replace the hardcoded host URLs with your own hosts and run one process per machine,
for example:

pc-01$ python example.py --job_name="ps" --task_index=0
pc-02$ python example.py --job_name="worker" --task_index=0
pc-03$ python example.py --job_name="worker" --task_index=1
pc-04$ python example.py --job_name="worker" --task_index=2

More details here: ischlag.github.io
'''
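# A minimal local run, assuming the localhost defaults kept below (one ps on port 12222,
# one worker on port 12223); this is only a sketch of how the flags select cluster entries,
# not part of the original multi-machine instructions above:
#
#   terminal 1$ python example.py --job_name="ps" --task_index=0
#   terminal 2$ python example.py --job_name="worker" --task_index=0
#
# Each --job_name/--task_index pair picks one entry of the ClusterSpec defined below;
# the ps task holds the shared variables while every worker runs the training loop on
# its own mini-batches (between-graph replication).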
from __future__ import print_function
import tensorflow as tf
import sys
import time
# cluster specification
parameter_servers = ["localhost:12222"]
# workers = ["localhost:12223", "localhost:12224", "localhost:12225"]
workers = ["localhost:12223"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})
# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
# start a server for a specific task
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
# config
batch_size = 5000 # As big as will fit on my gpu
learning_rate = 0.016 # Fast learning
training_epochs = 50
n_hidden = 2000
logs_path = "/tmp/mnist/2"
# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    # Between-graph replication: every worker builds its own copy of the graph, while
    # replica_device_setter places the variables on the parameter server.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        # count the number of updates
        global_step = tf.get_variable('global_step', [],
                                      initializer=tf.constant_initializer(0),
                                      trainable=False)

        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.get_variable('W1',
                                 shape=(784, n_hidden),
                                 initializer=tf.contrib.layers.xavier_initializer())
            W2 = tf.get_variable('W2',
                                 shape=(n_hidden, 10),
                                 initializer=tf.contrib.layers.xavier_initializer())

        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([n_hidden]))
            b2 = tf.Variable(tf.zeros([10]))
        # implement model
        with tf.name_scope("softmax"):
            # softmax_logits is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            logits = tf.add(tf.matmul(a2, W2), b2)
            # dropout (keep_prob = 0.3) is applied to the logits used for the training loss only
            dropout_logits = tf.nn.dropout(logits, 0.3)
            softmax_logits = tf.nn.softmax(logits)

        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=dropout_logits, labels=y_)
            loss = tf.reduce_mean(cross_entropy)

        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            grad_op = tf.train.AdamOptimizer(learning_rate=learning_rate)
            train_op = grad_op.minimize(loss, global_step=global_step)

        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(softmax_logits, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        # create a summary for our cost and accuracy
        tf.scalar_summary("cost", loss)
        tf.scalar_summary("accuracy", accuracy)

        # merge all summaries into a single "operation" which we can execute in a session
        summary_op = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        print("Variables initialized ...")

    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), global_step=global_step, init_op=init_op)

    begin_time = time.time()
    frequency = 100
    with sv.prepare_or_wait_for_session(server.target) as sess:
        # create log writer object (this will log on every machine)
        writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())

        # perform training cycles
        start_time = time.time()
        for epoch in range(training_epochs):

            # number of batches in one epoch
            batch_count = int(mnist.train.num_examples / batch_size)

            count = 0
            for i in range(batch_count):
                batch_x, batch_y = mnist.train.next_batch(batch_size)

                # perform the operations we defined earlier on batch
                _, cost, summary, step, train_accuracy = sess.run(
                    [train_op, loss, summary_op, global_step, accuracy],
                    feed_dict={x: batch_x, y_: batch_y})
                writer.add_summary(summary, step)

                count += 1
                if count % frequency == 0 or i + 1 == batch_count:
                    elapsed_time = time.time() - start_time
                    start_time = time.time()
                    print("Step: %d," % (step + 1), " Epoch: %2d," % (epoch + 1),
                          " Batch: %3d of %3d," % (i + 1, batch_count), " Cost: %.4f," % cost,
                          " Train acc %2.2f" % (train_accuracy * 100),
                          " AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                    count = 0

        print("Test-Accuracy: %2.2f" % (sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}) * 100))
        print("Total Time: %3.2fs" % float(time.time() - begin_time))
        print("Final Cost: %.4f" % cost)

    sv.stop()
    print("done")