Compatibility with Tensorflow slim #140

Closed
gon9031 opened this issue Aug 29, 2018 · 5 comments
Labels
bug Something isn't working

Comments


gon9031 commented Aug 29, 2018

There seems to be a compatibility problem with functions used in TensorFlow slim.

Example code:

    [images, labels] = get_batch_test_dali(FLAGS.batch_size)
    batch_queue = slim.prefetch_queue.prefetch_queue(
        [images, labels], capacity=2 * deploy_config.num_clones)

Output:

    ...
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/contrib/slim/python/slim/data/prefetch_queue.py", line 78, in prefetch_queue
        dtypes = [t.dtype for t in tensor_list]
      File "/usr/local/lib/python3.5/dist-packages/tensorflow/contrib/slim/python/slim/data/prefetch_queue.py", line 78, in <listcomp>
        dtypes = [t.dtype for t in tensor_list]
    AttributeError: 'list' object has no attribute 'dtype'

Have you looked into this?

JanuszL (Contributor) commented Aug 29, 2018

Hi,
Could you provide a full, minimal case to reproduce this (a standalone one we could run without any additional setup)?
Honestly, we have not targeted (or tested) TensorFlow slim, so it could be a bug or just a limitation.
Tracked as DALI-209.

JanuszL added the bug label on Aug 29, 2018
gon9031 (Author) commented Aug 30, 2018

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf
from subprocess import call
import os.path
import numpy as np

slim = tf.contrib.slim

lmdb_folder = "/data/ilsvrc12_train_lmdb"

tfrecord = "/data/imagenet/train-00001-of-01024"
tfrecord_idx = "idx_files/train-00001-of-01024.idx"
tfrecord2idx_script = "tfrecord2idx"

N = 4             # number of GPUs
BATCH_SIZE = 128  # batch size per GPU
ITERATIONS = 32
IMAGE_SIZE = 3

if not os.path.exists("idx_files"):
    os.mkdir("idx_files")

if not os.path.isfile(tfrecord_idx):
    call([tfrecord2idx_script, tfrecord, tfrecord_idx])

class CommonPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)

        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        self.resize = ops.Resize(device = "gpu",
                                 image_type = types.RGB,
                                 interp_type = types.INTERP_LINEAR)
        self.cmn = ops.CropMirrorNormalize(device = "gpu",
                                           output_dtype = types.FLOAT,
                                           crop = (227, 227),
                                           image_type = types.RGB,
                                           mean = [128., 128., 128.],
                                           std = [1., 1., 1.])
        self.uniform = ops.Uniform(range = (0.0, 1.0))
        self.resize_rng = ops.Uniform(range = (256, 480))

    def base_define_graph(self, inputs, labels):
        images = self.decode(inputs)
        images = self.resize(images, resize_shorter = self.resize_rng())
        output = self.cmn(images, crop_pos_x = self.uniform(),
                          crop_pos_y = self.uniform())
        return (output, labels.gpu())

class CaffeReadPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(CaffeReadPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.CaffeReader(path = lmdb_folder,
                                     random_shuffle = True, shard_id = device_id, num_shards = num_gpus)

    def define_graph(self):
        images, labels = self.input()
        return self.base_define_graph(images, labels)

class TFRecordPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.TFRecordReader(path = tfrecord,
                                        index_path = tfrecord_idx,
                                        features = {"image/encoded" : tfrec.FixedLenFeature((), tfrec.string, ""),
                                                    "image/class/label": tfrec.FixedLenFeature([1], tfrec.int64, -1)})

    def define_graph(self):
        inputs = self.input()
        images = inputs["image/encoded"]
        labels = inputs["image/class/label"]
        return self.base_define_graph(images, labels)

def get_batch_test_dali(batch_size, pipe_type):
    pipe_name, label_type, _ = pipe_type
    pipes = [pipe_name(batch_size=batch_size, num_threads=2, device_id = device_id, num_gpus = N) for device_id in range(N)]

    serialized_pipes = [pipe.serialize() for pipe in pipes]
    del pipes
    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d in range(N):
        with tf.device('/gpu:%i' % d):
            image, label = daliop(serialized_pipeline = serialized_pipes[d],
                                  shape = [BATCH_SIZE, 3, 227, 227],
                                  image_type = tf.int32,
                                  label_type = label_type,
                                  device_id = d)
            images.append(image)
            labels.append(label)

    return [images, labels]

pipe_types = [[CaffeReadPipeline, tf.int32, (0, 999)], [TFRecordPipeline, tf.int64, (1, 1000)]]

for pipe_name in pipe_types:
    print("RUN: " + pipe_name[0].__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_name)
    batch_queue = slim.prefetch_queue.prefetch_queue(test_batch)
    x = tf.placeholder(tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name='x')
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = tf.ConfigProto(gpu_options=gpu_options)

    with tf.Session(config=config) as sess:
        for i in range(ITERATIONS):
            imgs, labels = sess.run(test_batch)
            # Testing correctness of labels
            for label in labels:
                ## labels need to be integers
                assert(np.equal(np.mod(label, 1), 0).all())
                ## labels need to be in range pipe_name[2]
                assert((label >= pipe_name[2][0]).all())
                assert((label <= pipe_name[2][1]).all())
    print("OK : " + pipe_name[0].__name__)

JanuszL (Contributor) commented Aug 30, 2018

Hi,
Thank you for the full sample - it has really sped up my investigation. There are two things:

  1. The error is caused by the fact that prefetch_queue expects a list of tensors, while test_batch is a list of lists of tensors: the images and labels returned from define_graph are themselves lists. To make it work, create one queue per image/label pair (see the sketch after this list):

         for elm in zip(test_batch[0], test_batch[1]):
             slim.prefetch_queue.prefetch_queue(elm)

  2. prefetch_queue expects that all tensors have their shapes defined; in our case only the image tensor does, while the labels do not. PR "Define shape of TF label tensor based on image tensor shape" #142 should address this problem.
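
For reference, a minimal sketch combining both points, assuming the test_batch = [images, labels] structure returned by get_batch_test_dali in the script above (the explicit tf.reshape on the label is only needed as long as the label tensor comes back without a static shape):

    import tensorflow as tf

    # slim, BATCH_SIZE and test_batch are the definitions from the script above.
    slim = tf.contrib.slim
    queues = []
    for image, label in zip(test_batch[0], test_batch[1]):
        # prefetch_queue needs fully defined shapes, so give the per-GPU label one explicitly.
        label = tf.reshape(label, (BATCH_SIZE,))
        queues.append(slim.prefetch_queue.prefetch_queue([image, label]))

    # Each tower then dequeues from its own queue, e.g.:
    # image, label = queues[d].dequeue()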

ptrendx (Member) commented Aug 30, 2018

Hmmm, the issues @JanuszL pointed out aside, I don't think there is a reason to use a prefetch queue with DALI, @gon9031. DALI provides you with prefetching out of the box, without any additional work.
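
For illustration, a sketch built on the script above (not code from this thread): because the DALI pipeline already prefetches batches internally, the tensors returned by the DALI TF op can be run directly, with no slim queue in between:

    import tensorflow as tf

    # BATCH_SIZE, ITERATIONS, config, pipe_types and get_batch_test_dali
    # are the definitions from the script above.
    images, labels = get_batch_test_dali(BATCH_SIZE, pipe_types[0])

    with tf.Session(config=config) as sess:
        for _ in range(ITERATIONS):
            # Every sess.run pulls the next batch that DALI has already prepared.
            batch_imgs, batch_labels = sess.run([images, labels])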

JanuszL closed this as completed on Oct 4, 2018
JanuszL (Contributor) commented Oct 8, 2018

One more comment: after all, it looks like defining the label shape causes additional problems in some networks. We will back out that change; you can address your problem by calling:

label = tf.reshape(label, (BATCH_SIZE,))

on the output from the daliop.
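
In the script above, that reshape would go right after the daliop call inside get_batch_test_dali (a sketch only; BATCH_SIZE, label_type, serialized_pipes and daliop are the names used there):

    with tf.device('/gpu:%i' % d):
        image, label = daliop(serialized_pipeline = serialized_pipes[d],
                              shape = [BATCH_SIZE, 3, 227, 227],
                              image_type = tf.int32,
                              label_type = label_type,
                              device_id = d)
        # Give the label a static shape so ops such as slim's prefetch_queue accept it.
        label = tf.reshape(label, (BATCH_SIZE,))
        images.append(image)
        labels.append(label)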
