Converts TFRecord data into numpy. Feel free to modify based on your needs.

Data is saved into pickle files. Every file contains a list of samples. The number of samples per file is set via config['num_samples_in_numpy_list']; the last file may contain fewer.

Loading:
data = pickle.load(open(<i>path-to-pkl-file</i>, 'rb'))

Each sample is a dictionary with the following fields:
<ol>
  <li>'labels': label of the gesture. A unique ID in {0,1,..,19},</li>
  <li>'seq_len': length of the gesture sequence, i.e., # of frames,</li>
  <li>'depth': tensor of depth images (length, height, width, 1),</li>
  <li>'skeleton': tensor of skeleton joints (length, 180),</li>
  <li>'rgb': tensor of rgb images (length, height, width, 3),</li>
  <li>'segmentation': tensor of segmentation masks (length, height, width, 3).</li>
</ol>


Note that samples have different numbers of frames.

In [13]:
import tensorflow as tf
import numpy as np
import os
import time
import datetime
%matplotlib inline
import matplotlib.pyplot as plt
import pickle

In [14]:
def applyMask(img, segmentedUser):
    """
    Mask an image with a segmentation map for visualization.

    Segmentation values strictly greater than 150 are treated as foreground;
    every other position is zeroed in the result.

    - If `img` has more than two dimensions, the segmentation map is
      thresholded element-wise and multiplied with the image.
    - Otherwise the segmentation map is first averaged over axis 2 and the
      averaged map is thresholded, giving a 2-D mask.

    Returns the element-wise product of `img` and the boolean mask.
    """
    threshold = 150

    if len(img.shape) <= 2:
        # Single-plane image: collapse the mask channels before thresholding.
        flat_mask = np.mean(segmentedUser, axis=2) > threshold
        return img * flat_mask

    # Multi-dimensional (e.g. color) image: use the mask channel by channel.
    channel_mask = segmentedUser > threshold
    return img * channel_mask

def read_and_decode_sequence(filename_queue, config):
    """
    Build the graph ops that read and decode one sequence sample.

    Records are read from `filename_queue` with a GZIP-configured
    TFRecordReader and parsed as a SequenceExample.

    The training and validation files contain the following fields:
    - label: label of the sequence, which takes values between 1 and 20.
    - length: length of the sequence, i.e., number of frames.
    - depth: sequence of depth images. [length x height x width x numChannels]
    - rgb: sequence of rgb images. [length x height x width x numChannels]
    - segmentation: sequence of segmentation masks. [length x height x width x numChannels]
    - skeleton: sequence of flattened skeleton joint positions. [length x numJoints]

    The test files don't contain the "label" field.
    NOTE(review): "label" is declared below without a default value, so parsing
    such files presumably fails -- confirm before using this on test data.

    Args:
        filename_queue: queue of TFRecord file names to read from.
        config: dictionary providing 'img_height' and 'img_width'.

    Returns:
        Dictionary of tensors with keys 'rgb', 'depth', 'segmentation',
        'skeleton', 'seq_len' and 'labels'. Image tensors are cast to float;
        'labels' holds the stored label minus one.
    """
    # The reader is configured for GZIP-compressed record files.
    gzip_options = tf.python_io.TFRecordOptions(
        compression_type=tf.python_io.TFRecordCompressionType.GZIP)
    record_reader = tf.TFRecordReader(options=gzip_options)
    _, serialized_example = record_reader.read(filename_queue)

    def frames_as_float(flat_frames, num_channels):
        # [seq_len, height, width, num_channels]; -1 lets the frame count be inferred.
        frame_shape = (-1, config['img_height'], config['img_width'], num_channels)
        return tf.to_float(tf.reshape(flat_frames, frame_shape))

    with tf.name_scope("TFRecordDecoding"):
        # "label" and "length" are encoded as context features.
        context_spec = {
            "label": tf.FixedLenFeature([], dtype=tf.int64),
            "length": tf.FixedLenFeature([], dtype=tf.int64),
        }
        # "depth", "rgb", "segmentation", "skeleton" are encoded as sequence
        # features; each frame is stored as a raw byte string.
        sequence_spec = {
            field: tf.FixedLenSequenceFeature([], dtype=tf.string)
            for field in ("depth", "rgb", "segmentation", "skeleton")
        }
        context_encoded, sequence_encoded = tf.parse_single_sequence_example(
            serialized_example,
            context_features=context_spec,
            sequence_features=sequence_spec)

        # Decode the raw bytes: images as uint8, skeleton joints as float32.
        rgb_bytes = tf.decode_raw(sequence_encoded['rgb'], tf.uint8)
        depth_bytes = tf.decode_raw(sequence_encoded['depth'], tf.uint8)
        segmentation_bytes = tf.decode_raw(sequence_encoded['segmentation'], tf.uint8)
        skeleton_values = tf.decode_raw(sequence_encoded['skeleton'], tf.float32)

        num_frames = tf.to_int32(context_encoded['length'])
        # Tensorflow requires the labels to start from 0. Before you create the
        # submission csv, increment the predictions by 1.
        zero_based_label = context_encoded['label'] - 1

        # [seq_len, num_skeleton_joints]
        skeleton_values = tf.reshape(skeleton_values, (num_frames, 180))

        # Reshape the images and cast them to float.
        rgb_frames = frames_as_float(rgb_bytes, 3)
        depth_frames = frames_as_float(depth_bytes, 1)
        segmentation_frames = frames_as_float(segmentation_bytes, 3)

        return {
            'rgb': rgb_frames,
            'depth': depth_frames,
            'segmentation': segmentation_frames,
            'skeleton': skeleton_values,
            'seq_len': num_frames,
            'labels': zero_based_label,
        }
    

def input_pipeline(filenames, config):
    """
    Build the queue-based input pipeline and return the batched sample op.

    Args:
        filenames: list of TFRecord file paths.
        config: dictionary providing 'num_epochs', 'ip_num_read_threads',
            'batch_size' and 'ip_queue_capacity' (plus whatever
            `read_and_decode_sequence` reads from it).

    Returns:
        The result of `tf.train.batch_join` over the decoded samples.
    """
    with tf.name_scope("input_pipeline"):
        # Queue of TFRecord input files, shuffled, limited to `num_epochs` passes.
        filename_queue = tf.train.string_input_producer(
            filenames, num_epochs=config['num_epochs'], shuffle=True)

        # One decoding sub-graph per reader thread, all fed from the same file queue.
        decoded_samples = []
        for _ in range(config['ip_num_read_threads']):
            decoded_samples.append(read_and_decode_sequence(filename_queue, config))

        # The data consists of variable-length sequences, so dynamic_pad is
        # enabled: "batch_join" pads the sequences w.r.t. the max-length
        # sequence in the batch. Hence, the padded sequence length can differ
        # from batch to batch.
        return tf.train.batch_join(
            decoded_samples,
            batch_size=config['batch_size'],
            capacity=config['ip_queue_capacity'],
            enqueue_many=False,
            dynamic_pad=True,
            name="batch_join_and_pad")

In [15]:
# TODO: You can change these fields.
config = {
    # Directory of the tfrecords.
    'input_dir': "/Users/zhou/Machine_Perception/mp18-dynamic-gesture-recognition/train",
    # File naming
    'input_file_format': "dataTrain_%d.tfrecords",
    # File IDs to be used for training.
    'input_file_ids': list(range(1, 5)),
    # Put 100 samples in a pickle data file. You can put everything in a single file as well.
    'num_samples_in_numpy_list': 100,
}
# Pickle files are written next to the inputs, reusing the input base name.
config['output_dir'] = config['input_dir']
config['output_file_format'] = config['input_file_format'].split(".")[0] + ".pkl"
config['output_file_start_id'] = 1

# Keep these fields fixed.
config.update({
    'img_height': 80,
    'img_width': 80,
    'img_num_channels': 3,
    'num_epochs': 1,
    'batch_size': 1,
})
# Capacity of the queue which contains the samples read by data readers.
# Make sure that it has enough capacity.
config['ip_queue_capacity'] = config['batch_size'] * 10
config['ip_num_read_threads'] = 1


# Create a list of TFRecord input files.
filenames = [
    os.path.join(config['input_dir'], config['input_file_format'] % file_id)
    for file_id in config['input_file_ids']
]

# Create data loading operators. This will be represented as a node in the computational graph.
# Each <key,value> pair in `batch_sample_dict_op` is a tensor produced by the batching queue
# (not a placeholder); it is evaluated with sess.run. Alternatively we could load data into
# memory and feed it to the model by using the feed_dict approach.
batch_sample_dict_op = input_pipeline(filenames, config)

# Create tensorflow session and initialize the variables (if any).
sess = tf.Session()
init_op = tf.group(
    tf.global_variables_initializer(),
    tf.local_variables_initializer(),
)
sess.run(init_op)

# Create threads to prefetch the data.
# https://www.tensorflow.org/programmers_guide/reading_data#creating_threads_to_prefetch_using_queuerunner_objects
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

In [16]:
def _dump_samples(samples, file_id):
    """Pickle the list `samples` into the output file numbered `file_id`."""
    out_path = os.path.join(config['output_dir'], config['output_file_format'] % file_id)
    # The context manager closes the file even if pickling fails, so the
    # handle is never left open and the data is flushed to disk.
    with open(out_path, 'wb') as out_file:
        pickle.dump(samples, out_file)


# Read samples one at a time and write them out in chunks of
# config['num_samples_in_numpy_list'] samples per pickle file.
np_file_id = config['output_file_start_id']
output_list = []
num_samples_read = 0
try:
    while not coord.should_stop():
        batch_sample_dict = sess.run(batch_sample_dict_op)
        num_samples_read += 1
        # Data is in batch format. Get rid of the first dimension.
        data_sample = {
            key: batch_sample_dict[key][0]
            for key in ('rgb', 'depth', 'segmentation', 'skeleton', 'labels', 'seq_len')
        }
        output_list.append(data_sample)

        if num_samples_read % config['num_samples_in_numpy_list'] == 0:
            _dump_samples(output_list, np_file_id)
            np_file_id += 1
            output_list = []

except tf.errors.OutOfRangeError:
    # The input queue is exhausted; the leftover samples are saved below.
    pass
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Save last run. This happens after the loop (not only in the OutOfRangeError
# handler) so that a partial chunk is also kept when the loop ends because the
# coordinator was asked to stop.
if output_list:
    _dump_samples(output_list, np_file_id)
    output_list = []
print('Done.')

# Wait for threads to finish. Re-raises an error reported by a reader thread,
# e.g. a missing input file.
coord.join(threads)

INFO:tensorflow:Error reported to Coordinator: <class 'tensorflow.python.framework.errors_impl.NotFoundError'>, Path to training split./dataTrain_3.tfrecords; No such file or directory
	 [[Node: input_pipeline/ReaderReadV2 = ReaderReadV2[_device="/job:localhost/replica:0/task:0/device:CPU:0"](input_pipeline/TFRecordReaderV2, input_pipeline/input_producer)]]
Done.


NotFoundError: Path to training split./dataTrain_3.tfrecords; No such file or directory
	 [[Node: input_pipeline/ReaderReadV2 = ReaderReadV2[_device="/job:localhost/replica:0/task:0/device:CPU:0"](input_pipeline/TFRecordReaderV2, input_pipeline/input_producer)]]