# TensorFlow queues and input pipelines

## Initialization

In [20]:
import numpy as np
import tensorflow as tf
import threading
import time

import os
from cStringIO import StringIO
import numpy as np
from functools import partial

from IPython.display import clear_output, Image, display, HTML

import tensorflow as tf

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add() 
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                tensor.tensor_content = "<stripped %d bytes>"%size
    return strip_def
  
def rename_nodes(graph_def, rename_func):
    res_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = res_def.node.add() 
        n.MergeFrom(n0)
        n.name = rename_func(n.name)
        for i, s in enumerate(n.input):
            n.input[i] = rename_func(s) if s[0]!='^' else '^'+rename_func(s[1:])
    return res_def
def show_graph(graph_def, max_const_size=32):
    """Visualize TensorFlow graph."""
    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(str(strip_def)), id='graph'+str(np.random.rand()))
  
    iframe = """
        <iframe seamless style="width:1200px;height:620px;border:0" srcdoc="{}"></iframe>
    """.format(code.replace('"', '&quot;'))
    display(HTML(iframe))
    
def create_session():
    sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=3000))
    return sess


In [36]:
tf.reset_default_graph()
a = tf.constant(1)
b = tf.constant(2)
c = a+b
d = c*a
e = d-c

In [37]:
show_graph(tf.get_default_graph().as_graph_def())

# Simple queue

In [24]:
tf.reset_default_graph()
q = tf.FIFOQueue(capacity=20, dtypes=[tf.int32])
enqueue_placeholder = tf.placeholder(tf.int32)
enqueue_op = q.enqueue(enqueue_placeholder)
sess = create_session()
for i in range(10):
    sess.run(enqueue_op, feed_dict={enqueue_placeholder:i})
    print "Queue size is now: "+str(sess.run(q.size()))
sess.run(q.close())

Queue size is now: 1
Queue size is now: 2
Queue size is now: 3
Queue size is now: 4
Queue size is now: 5
Queue size is now: 6
Queue size is now: 7
Queue size is now: 8
Queue size is now: 9
Queue size is now: 10


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x1103fbbd0>> ignored


In [26]:
try:
    for i in range(20):
        print sess.run(q.dequeue())
except tf.errors.OutOfRangeError:
    print "Done"

0
1
2
3
4
5
6
7
8
9
Done


# Simple queue with multiple enqueues in parallel

In [23]:
tf.reset_default_graph()
random_number = tf.random_uniform(shape=())
q = tf.FIFOQueue(capacity=20, dtypes=[tf.float32])
enqueue_op = q.enqueue(random_number)

sess = create_session()
print sess.run(q.size())
def run():
  for i in range(5):
    sess.run(enqueue_op)

threads = [threading.Thread(target=run, args=()) for i in range(2)]
[t.start() for t in threads]
print sess.run(q.size())
time.sleep(0.5)
print sess.run(q.size())


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x1103f3c10>> ignored


0
4
10


# Setting up range_input_producer

In [26]:
inn = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)

In [29]:
show_graph(tf.get_default_graph().as_graph_def())

## Lookup name of size operation in GraphDef

If you don't want to create new Node with "op.size", you can find name of exiting node by looking for with op="QueueSize" in the graph definition, and fetching it directly. In this case, proper run call will look like this -- sess.run("input_producer/fraction_of_32_full/fraction_of_32_full_Size:0")

In [30]:
tf.get_default_graph().as_graph_def()

node {
  name: "input_producer/range/start"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 0
      }
    }
  }
}
node {
  name: "input_producer/range/limit"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 10
      }
    }
  }
}
node {
  name: "input_producer/range/delta"
  op: "Const"
  attr {
    key: "dtype"
    value {
      type: DT_INT32
    }
  }
  attr {
    key: "value"
    value {
      tensor {
        dtype: DT_INT32
        tensor_shape {
        }
        int_val: 1
      }
    }
  }
}
node {
  name: "input_producer/range"
  op: "Range"
  input: "input_producer/range/start"
  input: "input_producer/range/limit"
  input: "input_producer/range/delta"
}
nod

In [47]:
tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)

[<tensorflow.python.training.queue_runner.QueueRunner at 0x110f32050>]

In [48]:
threads = tf.train.start_queue_runners()

In [53]:
sess.graph.version

23

In [54]:
sess.run(inn.size())

10

In [55]:
sess.graph.version

24

In [50]:
"input_producer/fraction_of_32_full/fraction_of_32_full_Size:0"

<tensorflow.python.ops.data_flow_ops.FIFOQueue at 0x110e85610>

# Alternatives to "wait forever"

## Set session timeout

In [31]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
try:
    sess.run(queue.dequeue())
except tf.errors.DeadlineExceededError:
    print "DeadlineExceededError"

Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x1103fbc90>> ignored


DeadlineExceededError


## Close the queue before reading

In [29]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
sess.run(queue.close())
try: 
    sess.run(queue.dequeue())
except tf.errors.OutOfRangeError as e:
    print "OutOfRangeError"

OutOfRangeError


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x11002a510>> ignored


## Close the queue before writing

In [30]:
tf.reset_default_graph()
queue = tf.FIFOQueue(capacity=5, dtypes=[tf.int32])
config = tf.ConfigProto()
config.operation_timeout_in_ms=2000
sess = tf.InteractiveSession("", config=config)
sess.run(queue.close())
try:
    sess.run(queue.enqueue(5))
except tf.errors.AbortedError:
    print "AbortedError"

AbortedError


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x11002ae90>> ignored


# Simple batching example

In [73]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
numeric_batch = tf.train.batch_join([[number]]*3, batch_size=2)
batch_number = numeric_batch

sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x110efbc50>> ignored


[<Thread(Thread-30, started daemon 4586786816)>,
 <Thread(Thread-31, started daemon 4590993408)>,
 <Thread(Thread-32, started daemon 4595200000)>,
 <Thread(Thread-33, started daemon 4599406592)>]

In [74]:
tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)

[<tensorflow.python.training.queue_runner.QueueRunner at 0x111160c50>,
 <tensorflow.python.training.queue_runner.QueueRunner at 0x1111dd850>]

### Python/TensorFlow gotcha

In [76]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([number]*3)

[0, 0, 0]


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x1110b33d0>> ignored


### Solution 1

In [77]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([ranges.dequeue()]*3)

[0, 0, 0]


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x1110f7f10>> ignored


### Solution 2

In [78]:
tf.reset_default_graph()
ranges = tf.train.range_input_producer(limit=10, num_epochs=1, shuffle=False)
number = ranges.dequeue()
sess = tf.InteractiveSession(config=tf.ConfigProto(operation_timeout_in_ms=6000))
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

print sess.run([ranges.dequeue(), ranges.dequeue(), ranges.dequeue()])

[1, 2, 0]


Exception AssertionError: AssertionError() in <bound method InteractiveSession.__del__ of <tensorflow.python.client.session.InteractiveSession object at 0x111257a50>> ignored
