In [1]:
import keras

import tensorflow as tf
import numpy as np
from concurrent.futures import ThreadPoolExecutor, wait
import time
from threading import Thread

Using TensorFlow backend.


In [2]:
data_x = np.random.rand(100,64,64,3)
data_y = np.random.randint(26,size=(100,26))
print(data_y)

[[ 7 16  4 ..., 15 21 10]
 [14  2 12 ..., 14 17  1]
 [ 1  3 16 ..., 11 20 25]
 ..., 
 [15 18 15 ...,  4  6  6]
 [15  5 18 ...,  7  4  1]
 [ 9 22  3 ...,  0  1  6]]


In [3]:
class Dataset():
    def __init__(self,batch_size):
        self.batch_size = batch_size
        self.len_data = 100
        self._make_inputs()
        self.idx = -1
        self.num_threads = 2
        self.num_batch = self.len_data // self.batch_size + 1
    def _make_inputs(self):
        
        self.inputs = tf.placeholder(shape=[64,64,3],dtype=tf.float32,name='data_x')
        self.labels = tf.placeholder(shape=[26],dtype=tf.int32,name='data_y')
        self.queue = tf.FIFOQueue(shapes=[[64,64,3],[26]],dtypes=[tf.float32,tf.int32],
                                  shared_name="fifoqueue",capacity=self.batch_size*2)
        self.enqueue_op = self.queue.enqueue([self.inputs,self.labels])
        

        self._queue_close = self.queue.close(cancel_pending_enqueues=True)

    def next_batch(self):
        
        batch_x , batch_y = self.queue.dequeue_many(self.batch_size)
        return batch_x,batch_y
    


    def close_queue(self, session):

        session.run(self._queue_close)
        
    def _pre_batch_queue(self,sess,coord):
        
        while not coord.should_stop():
            self.idx += 1
            index = self.idx % self.len_data
            feature = data_x[index]
            label = data_y[index]

            sess.run(self.enqueue_op,feed_dict = {self.inputs : feature,self.labels: label})

    
    def start_queue_threads(self,sess,coord):
        queue_threads = [Thread(target=self._pre_batch_queue, args=(sess, coord))
                         for i in range(self.num_threads)]
        
        for queue_thread in queue_threads:
            coord.register_thread(queue_thread)
            queue_thread.daemon = True
            queue_thread.start()

        return queue_threads
    


In [4]:
def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)


def model(x):
    

    conv = tf.nn.conv2d(x,weight_variable([5,5,3,64]),strides = [1,1,1,1],padding='VALID')
    bias = bias_variable([64])
    h_conv1 = tf.nn.relu(conv + bias)

    conv2 = tf.nn.conv2d(h_conv1,weight_variable([5,5,64,32]),strides = [1,1,1,1],padding='VALID')
    bias2 = bias_variable([32])
    h_conv2 = tf.nn.relu(conv2 + bias2)
    shape = h_conv2.shape.as_list()
    h_conv_flat = tf.reshape(h_conv2,[-1,shape[1]*shape[2]*32])
    logits  = tf.matmul(h_conv_flat,weight_variable([100352,26]) + bias_variable([26]))
    return logits


In [5]:
def get_loss(batch_x,batch_y):
    logits = model(batch_x)

    cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits,
                                                    labels=tf.cast(batch_y, tf.float32),
                                                      name="cross-entropy")
    loss = tf.reduce_mean(cross_entropy, name='loss')
    return loss
    

In [6]:
def get_tower_results(dataset,ngpus,optimizer):
    tower_gradients = []
    tower_loss = []

    for i in range(ngpus):
        with tf.variable_scope(tf.get_variable_scope(),reuse= i > 0):
             with tf.device('/gpu:%d'%i):
                with tf.name_scope('tower_%d' % i) as scope:
                    batch_x ,batch_y = dataset.next_batch()
                    loss = get_loss(batch_x,batch_y)
                    print(batch_x)
                    print('\n')
                    gradient = optimizer.compute_gradients(loss)
                    print(gradient)
                    print('\n')
                    tower_gradients.append(gradient)
                    tower_loss.append(loss)
                    
    return tower_gradients,tf.reduce_mean(tower_loss)

In [7]:
def average_gradients(tower_gradients):
    average_grads = []

    # Run this on cpu_device to conserve GPU memory
    with tf.device('/cpu:0'):
        # Loop over gradient/variable pairs from all towers
        for grad_and_vars in zip(*tower_gradients):
            # Introduce grads to store the gradients for the current variable
            grads = []

            # Loop over the gradients for the current variable
            for g, _ in grad_and_vars:
                # Add 0 dimension to the gradients to represent the tower.
                if g is not None:
                    expanded_g = tf.expand_dims(g, 0)
                    # Append on a 'tower' dimension which we will average over below.
                    grads.append(expanded_g)

            # Average over the 'tower' dimension
            grad = tf.concat(grads, 0)
            grad = tf.reduce_mean(grad, 0)

            # Create a gradient/variable tuple for the current variable with its average gradient
            grad_and_var = (grad, grad_and_vars[0][1])

            # Add the current tuple to average_grads
            average_grads.append(grad_and_var)

    # Return result to caller
    return average_grads

In [8]:

dataset = Dataset(32)
global_step = tf.get_variable(
        'global_step', [],
        initializer=tf.constant_initializer(0), trainable=False)
optimizer = tf.train.AdamOptimizer(learning_rate=0.0001)
tower_gradient , loss = get_tower_results(dataset,2,optimizer)


Tensor("tower_0/fifo_queue_DequeueMany:0", shape=(32, 64, 64, 3), dtype=float32, device=/device:GPU:0)


[(<tf.Tensor 'tower_0/gradients/tower_0/Conv2D_grad/tuple/control_dependency_1:0' shape=(5, 5, 3, 64) dtype=float32>, <tf.Variable 'tower_0/Variable:0' shape=(5, 5, 3, 64) dtype=float32_ref>), (<tf.Tensor 'tower_0/gradients/tower_0/add_grad/tuple/control_dependency_1:0' shape=(64,) dtype=float32>, <tf.Variable 'tower_0/Variable_1:0' shape=(64,) dtype=float32_ref>), (<tf.Tensor 'tower_0/gradients/tower_0/Conv2D_1_grad/tuple/control_dependency_1:0' shape=(5, 5, 64, 32) dtype=float32>, <tf.Variable 'tower_0/Variable_2:0' shape=(5, 5, 64, 32) dtype=float32_ref>), (<tf.Tensor 'tower_0/gradients/tower_0/add_1_grad/tuple/control_dependency_1:0' shape=(32,) dtype=float32>, <tf.Variable 'tower_0/Variable_3:0' shape=(32,) dtype=float32_ref>), (<tf.Tensor 'tower_0/gradients/tower_0/add_2_grad/tuple/control_dependency:0' shape=(100352, 26) dtype=float32>, <tf.Variable 'tower_0/Variable_4:0' sha

In [9]:
avarage_grad = average_gradients(tower_gradient)

In [10]:
train_op = optimizer.apply_gradients(avarage_grad)

In [11]:

# sess = tf.Session()
# coord = tf.train.Coordinator()
# dataset.start_queue_threads(sess,coord)

In [14]:
config = tf.ConfigProto(allow_soft_placement=True)
sess = tf.Session(config=config)
sess.run(tf.global_variables_initializer())
coord = tf.train.Coordinator()
dataset.start_queue_threads(sess,coord)

[<Thread(Thread-6, started daemon 140706060433152)>,
 <Thread(Thread-7, started daemon 140706052040448)>]

In [15]:
for e in range(10):
    for i in range(dataset.num_batch):
        _, cost = sess.run([train_op,loss])
        print(cost)

16549.4
16281.2
17291.9
16853.7
17114.1
17491.7
17844.0
18437.4
17930.4
18679.4
19061.2
18770.7
19357.5
20094.2
20005.4
20236.8
21303.5
21405.9
22026.5
21477.5
22906.7
22960.5
22951.0
24188.3
24428.7
24651.3
24912.6
25837.3
25859.6
26217.0
26802.5
27893.7
28401.5
27958.0
29679.1
29615.0
31635.5
30859.9
31425.0
32155.9
