Merge pull request #5 from petrux/dev
Added input module
petrux committed Apr 27, 2017
2 parents 3e86d14 + b6bbbc1 commit e0e4845
Showing 3 changed files with 319 additions and 1 deletion.
135 changes: 135 additions & 0 deletions liteflow/input.py
@@ -0,0 +1,135 @@
"""Utilities for input pipelines."""

import tensorflow as tf


def shuffle(tensors,
capacity=32,
min_after_dequeue=16,
num_threads=1,
dtypes=None,
shapes=None,
seed=None,
shared_name=None,
name='shuffle'):
"""Wrapper around a `tf.RandomShuffleQueue` creation.
Return a dequeue op that dequeues elements from `tensors` in a
random order, through a `tf.RandomShuffleQueue` -- see for further
documentation.
Arguments:
tensors: an iterable of tensors.
capacity: (Optional) the capacity of the queue; default value set to 32.
num_threads: (Optional) the number of threads to be used for the queue runner;
default value set to 1.
min_after_dequeue: (Optional) minimum number of elements to remain in the
queue after a `dequeue` or `dequeue_many` has been performed,
in order to ensure better mixing of elements; default value set to 16.
dtypes: (Optional) list of `DType` objects, one for each tensor in `tensors`;
if not provided, will be inferred from `tensors`.
shapes: (Optional) list of shapes, one for each tensor in `tensors`.
seed: (Optional) seed for random shuffling.
shared_name: (Optional) If non-empty, this queue will be shared under
the given name across multiple sessions.
name: Optional name scope for the ops.
Returns:
The tuple of tensors that was randomly dequeued from `tensors`.
"""

tensors = list(tensors)
with tf.name_scope(name, values=tensors):
dtypes = dtypes or list([t.dtype for t in tensors])
queue = tf.RandomShuffleQueue(
seed=seed,
shared_name=shared_name,
name='random_shuffle_queue',
dtypes=dtypes,
shapes=shapes,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
enqueue = queue.enqueue(tensors)
runner = tf.train.QueueRunner(queue, [enqueue] * num_threads)
tf.train.add_queue_runner(runner)
dequeue = queue.dequeue()
return dequeue


def shuffle_batch(tensors,
batch_size,
capacity=32,
num_threads=1,
min_after_dequeue=16,
dtypes=None,
shapes=None,
seed=None,
enqueue_many=False,
dynamic_pad=True,
allow_smaller_final_batch=False,
shared_name=None,
name='shuffle_batch'):
"""Create shuffled and padded batches of tensors in `tensors`.
Dequeue elements from `tensors` shuffling, batching and dynamically
padding them. First a `tf.RandomShuffleQueue` is created and fed with
`tensors` (using the `dket.input.shuffle` function); the dequeued tensors
shapes are then set and fed into a `tf.train.batch` function that provides
batching and dynamic padding.
Arguments:
tensors: an iterable of tensors.
batch_size: an `int` representing the batch size.
capacity: (Optional) the capacity of the queues; default value set to 32.
num_threads: (Optional) the number of threads to be used for the queue runner;
default value set to 1.
min_after_dequeue: (Optional) minimum number of elements to remain in the
shuffling queue after a `dequeue` or `dequeue_many` has been performed,
in order to ensure better mixing of elements; default value set to 16.
dtypes: (Optional) list of `DType` objects, one for each tensor in `tensors`;
if not provided, will be inferred from `tensors`.
shapes: (Optional) list of shapes, one for each tensor in `tensors`.
seed: (Optional) seed for random shuffling.
enqueue_many: Whether each tensor in tensors is a single example.
dynamic_pad: Boolean. Allow variable dimensions in input shapes.
The given dimensions are padded upon dequeue so that tensors within
a batch have the same shapes.
allow_smaller_final_batch: (Optional) Boolean. If True, allow the final
batch to be smaller if there are insufficient items left in the queue.
shared_name: if set, the queues will be shared under the given name
across different sessions.
name: scope name for the given ops.
Returns:
A batch of tensors from `tensors`, shuffled and padded.
"""

tensors = list(tensors)
with tf.name_scope(name, values=tensors):
dtypes = dtypes or list([t.dtype for t in tensors])
shapes = shapes or list([t.get_shape() for t in tensors])
inputs = shuffle(tensors,
seed=seed,
dtypes=dtypes,
capacity=capacity,
num_threads=num_threads,
min_after_dequeue=min_after_dequeue,
shared_name=shared_name,
name='shuffle')

# fix the shapes
for tensor, shape in zip(inputs, shapes):
tensor.set_shape(shape)

minibatch = tf.train.batch(
tensors=inputs,
batch_size=batch_size,
num_threads=num_threads,
capacity=capacity,
dynamic_pad=dynamic_pad,
allow_smaller_final_batch=allow_smaller_final_batch,
shared_name=shared_name,
enqueue_many=enqueue_many,
name='batch')
return minibatch
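
A minimal usage sketch of the new `shuffle_batch` helper. The TFRecord path,
the feature names (`key`, `vector`) and the epoch/batch sizes are placeholders;
the test module below shows a complete, working pipeline.

```python
import tensorflow as tf
from liteflow import input as linput

# Hypothetical reader pipeline: one TFRecord file with an int64 'key'
# and a variable-length int64 'vector' per example.
queue = tf.train.string_input_producer(['/tmp/data.rio'], num_epochs=4)
_, serialized = tf.TFRecordReader().read(queue)
parsed = tf.parse_single_example(serialized, features={
    'key': tf.FixedLenFeature([], tf.int64),
    'vector': tf.VarLenFeature(tf.int64)})
key = parsed['key']
vector = tf.sparse_tensor_to_dense(parsed['vector'])

# Shuffle, batch and dynamically pad the per-example tensors.
batch = linput.shuffle_batch([key, vector], batch_size=3)

with tf.Session() as sess:
    sess.run([tf.local_variables_initializer(), tf.global_variables_initializer()])
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        keys, vectors = sess.run(batch)  # `vectors` is padded along its second axis
    finally:
        coord.request_stop()
        coord.join(threads)
```
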
119 changes: 119 additions & 0 deletions liteflow/tests/test_input.py
@@ -0,0 +1,119 @@
"""Test module for the `dket.input` module."""

import datetime
import os

import tensorflow as tf

from liteflow import input as linput


def _timestamp():
frmt = "%Y-%m-%d--%H-%M-%S.%f"
stamp = datetime.datetime.now().strftime(frmt)
print('STAMP: ' + stamp)
return stamp


def _encode(key, vector):
example = tf.train.Example(
features=tf.train.Features(
feature={
'key': tf.train.Feature(
int64_list=tf.train.Int64List(
value=[key])),
'vector': tf.train.Feature(
int64_list=tf.train.Int64List(
value=vector))}))
return example


def _decode(message):
features = {
'key': tf.FixedLenFeature([], tf.int64),
'vector': tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(
serialized=message,
features=features)
key = parsed['key']
vector = tf.sparse_tensor_to_dense(parsed['vector'])
return key, vector


def _save_records(fpath, *records):
with tf.python_io.TFRecordWriter(fpath) as fout:
for record in records:
fout.write(record.SerializeToString())


def _read(fpath, num_epochs=None, shuffle=True):
queue = tf.train.string_input_producer(
string_tensor=[fpath],
num_epochs=num_epochs,
shuffle=shuffle)
reader = tf.TFRecordReader()
_, value = reader.read(queue)
key, vector = _decode(value)
return key, vector


class ShuffleBatchTest(tf.test.TestCase):
"""."""

TMP_DIR = '/tmp'


def test_base(self):
"""."""

# NOTA BENE: the whole test depends on the value
# used for the random seed, so if you change it
# you HAVE TO re-run the generation and manually
# check the output in order to update the expected results.
# Bottom line: DON'T CHANGE THE RANDOM SEED.
tf.reset_default_graph()
tf.set_random_seed(23)

filename = os.path.join(self.TMP_DIR, _timestamp() + '.rio')
data = [
(1, [1]),
(2, [2, 2]),
(3, [3, 3, 3]),
(4, [4, 4, 4, 4]),
(5, [5, 5, 5, 5, 5]),
(6, [6, 6, 6, 6, 6, 6])]
examples = [_encode(k, v) for k, v in data]
_save_records(filename, *examples)
tensors = _read(filename, num_epochs=4, shuffle=False)

batch_size = 3
batch = linput.shuffle_batch(tensors, batch_size)

actual_keys = []
expected_keys = [2, 5, 6, 1, 3, 6, 3, 4, 5, 1, 4, 1, 5, 6, 3, 2, 2, 4, 6, 1, 4, 5, 3, 2]

with tf.Session() as sess:
sess.run(tf.local_variables_initializer())
sess.run(tf.global_variables_initializer())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while True:
bkey, bvector = sess.run(batch)
bkey = bkey.tolist()
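# each vector in the test data has as many entries as its key, so the
# padded width of the batch equals the largest key in the batch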
length = max(bkey)
self.assertEqual((batch_size, length), bvector.shape)
actual_keys = actual_keys + bkey

except tf.errors.OutOfRangeError as ex:
coord.request_stop(ex=ex)
finally:
coord.request_stop()
coord.join(threads)

self.assertEqual(actual_keys, expected_keys)
os.remove(filename)

if __name__ == '__main__':
tf.test.main()
66 changes: 65 additions & 1 deletion liteflow/utils.py
@@ -4,6 +4,35 @@


def as_scope(scope):
"""Get the proper variable scope.
Given an object that can represent a `tf.VariableScope`,
namely a `str` or a `tf.VariableScope`, performs type checking
and return a proper `tf.VariableScope` object. Such function is
hancy when a function accepts an argument serving as a variable
scope but doesn's know its proper type.
Arguments:
scope: a `str` or a `tf.VariableScope` representing a variable scope.
Returns:
a `tf.VariableScope` instance.
Raises:
ValueError: if `scope` is `None`.
TypeError: if `scope` is neither `str` nor `tf.VariableScope`.
Example:
```python
from liteflow import utils
def do_something(scope):
scope = utils.as_scope(scope or 'DefaultScope')
with tf.variable_scope(scope) as scope:
# do something
pass
```
"""
if scope is None:
raise ValueError('Cannot create a scope from a None.')
if isinstance(scope, str):
@@ -16,7 +45,42 @@ def as_scope(scope):


def get_variables(prefix=None):
"""Get variables by their name prefix."""
"""Get variables by their name prefix.
Arguments:
prefix: a `str` or a `tf.VariableScope` instance.
Returns:
a list of `tf.Variable` with their name starting with the
given prefix, i.e. all those variables under the scope
specified by the prefix.
"""
prefix = prefix or tf.get_variable_scope().name
return [var for var in tf.global_variables()
if var.name.startswith(prefix)]


def dypes(tensors):
"""Get the `dtype` for tensors in a list.
Arguments:
tensors: an iterable of `tf.Tensor`.
Returns:
a `list` of `dtype`s, one for each tensor in `tensors`,
representing their `dtype`.
"""
return [t.dtype for t in tensors]


def shapes(tensors):
"""Get the static shapes of tensors in a list.
Arguments:
tensors: an iterable of `tf.Tensor`.
Returns:
a `list` of `tf.TensorShape`, one for each tensor in `tensors`,
representing their static shape (via `tf.Tensor.get_shape()`).
"""
return [t.get_shape() for t in tensors]
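
A short illustrative sketch of the new `liteflow.utils` helpers; the scope and
variable names (`Encoder`, `kernel`) and the placeholder tensors are
hypothetical.

```python
import tensorflow as tf
from liteflow import utils

# Create a variable under a named scope, then look it up by prefix.
with tf.variable_scope(utils.as_scope('Encoder')):
    _ = tf.get_variable('kernel', shape=[3, 3], dtype=tf.float32)
print([v.name for v in utils.get_variables('Encoder')])  # expected: ['Encoder/kernel:0']

# Inspect dtypes and static shapes of a list of tensors.
tensors = [tf.placeholder(tf.int64, shape=[]),
           tf.placeholder(tf.int64, shape=[None])]
print(utils.dypes(tensors))   # two int64 dtypes
print(utils.shapes(tensors))  # a scalar shape and a 1-D shape of unknown length
```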
