In [5]:
import itertools
import numpy as np
import pandas as pd
import tensorflow as tf

class FrameConverter:
    """Builds ``tf.data`` input pipelines over frame-level TFRecord files.

    Records are parsed as ``SequenceExample`` protos with per-frame ``rgb`` and
    ``audio`` byte strings plus a context-level ``labels`` feature. They are
    served as ``(X, y)`` pairs, where ``X`` is a dict keyed by ``'rgb'`` and
    ``'audio'`` and ``y`` is a dense label vector.
    """

    def __init__(self, X_transforms=None, y_transforms=None, repeat_count=1, n_parallel=1,
                 n_classes=3862):
        """Store the pipeline configuration and the feature specs.

        Args:
            X_transforms: callables applied in order to the feature dict; each
                receives the dict and returns its replacement. ``None`` means
                no transforms.
            y_transforms: callables applied in order to the label tensor.
                ``None`` means no transforms.
            repeat_count: value passed to ``Dataset.repeat``.
            n_parallel: value passed as ``num_parallel_calls`` to ``Dataset.map``.
            n_classes: length of the dense label vector built for each record.
        """
        self.filename_base = '/home/data/full/frame/{}{}.tfrecord'
        # ``None`` rather than ``[]`` as the default: a list literal in the
        # signature is created once and would be shared by every instance.
        self.X_transforms = [] if X_transforms is None else X_transforms
        self.y_transforms = [] if y_transforms is None else y_transforms
        self.repeat_count = repeat_count
        self.n_parallel = n_parallel
        self.n_classes = n_classes

        # Sequence (per-frame) features: one byte string per frame.
        self.keys_to_features = {
            'rgb': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
            'audio': tf.FixedLenSequenceFeature([], tf.string, allow_missing=True),
        }
        # Context (per-record) features: a variable-length list of label ids.
        self.key_to_label = {
            'labels': tf.VarLenFeature(tf.int64)
        }

    def get_data(self, filename):
        """Parse one serialized record into an ``(X, y)`` pair.

        Args:
            filename: despite the name, this is the serialized record that
                ``Dataset.map`` passes in from ``make_provider``; the parameter
                name is kept for compatibility.

        Returns:
            ``(X, y)``: ``X`` is a dict with float32 ``'audio'`` and ``'rgb'``
            tensors (after ``X_transforms``); ``y`` is a vector of length
            ``n_classes`` holding 1 at every label index (after
            ``y_transforms``).
        """
        y, X = tf.parse_single_sequence_example(filename,
                                                context_features=self.key_to_label,
                                                sequence_features=self.keys_to_features)
        # Each frame is stored as raw bytes; decode to uint8 and cast to float.
        # Presumably 128 (audio) and 1024 (rgb) values per frame, matching the
        # model inputs defined later in the notebook -- confirm against the data.
        X['audio'] = tf.cast(tf.decode_raw(X['audio'], tf.uint8), tf.float32)
        X['rgb'] = tf.cast(tf.decode_raw(X['rgb'], tf.uint8), tf.float32)

        # User-supplied feature transforms, applied in order.
        for transform in self.X_transforms:
            X = transform(X)

        # Scatter the label ids into a dense vector with 1 at each label index.
        y = tf.sparse_to_dense(y['labels'].values, [self.n_classes], 1)
        for transform in self.y_transforms:
            y = transform(y)
        return X, y

    def make_provider(self, subset, record_indices):
        """Build a one-shot iterator over the requested TFRecord files.

        Args:
            subset: first value substituted into the file-name template
                (e.g. ``'train'``).
            record_indices: iterable of values substituted as the second
                template field, one file per value.

        Returns:
            A one-shot iterator yielding batches of size 1.
        """
        filenames = [self.filename_base.format(subset, index) for index in record_indices]

        dataset = tf.data.TFRecordDataset(filenames)
        dataset = dataset.map(self.get_data,
                              num_parallel_calls=self.n_parallel)
        # NOTE(review): repeat() runs before shuffle(), so the 256-element
        # buffer may mix records from consecutive passes over the files.
        dataset = dataset.repeat(self.repeat_count)
        dataset = dataset.shuffle(buffer_size=256)

        dataset = dataset.batch(1)
        dataset = dataset.prefetch(1)
        iterator = dataset.make_one_shot_iterator()
        return iterator

    def make_generator(self, subset, record_indices):
        """Yield evaluated ``(X, y)`` batches until the pipeline is exhausted.

        This is a generator function, so the pipeline and the session are
        created on first iteration rather than at call time. The session is
        closed when the data runs out or when the generator is closed or
        garbage-collected.

        Args:
            subset: forwarded to ``make_provider``.
            record_indices: forwarded to ``make_provider``.

        Yields:
            The result of ``sess.run`` on the iterator's next element.
        """
        provider = self.make_provider(subset, record_indices)
        # Kept as an attribute for callers that inspect it, but the local is
        # what gets read below: every generator made from this converter
        # overwrites ``self.provider``.
        self.provider = provider
        sess = tf.Session()
        try:
            next_sample = provider.get_next()
            while True:
                try:
                    yield sess.run(next_sample)
                except tf.errors.OutOfRangeError:
                    print("Iterations exhausted")
                    break
        finally:
            # Release the session's resources instead of leaking them.
            sess.close()
    
# One converter feeds both splits. make_generator is a generator function, so
# no pipeline or session is built until each generator is first iterated.
frame_converter = FrameConverter(n_parallel=3)
train_generator = frame_converter.make_generator(
    subset='train',
    record_indices=[2500, 2501, 2502],
)
valid_generator = frame_converter.make_generator(
    subset='validate',
    record_indices=[2000, 2001, 2002],
)

In [6]:
from tensorflow.python.keras.layers import Input, Dense, GRU, Flatten, Add
from tensorflow.python.keras.models import Model

n_classes = 3862

# Two recurrent branches, one per modality. The input names match the keys of
# the feature dict yielded by the frame generators ('rgb' and 'audio').
rgb_in = Input((None, 1024), name='rgb')
audio_in = Input((None, 128), name='audio')
rgb_mid = GRU(512, activation='relu')(rgb_in)
audio_mid = GRU(512, activation='relu')(audio_in)

# Both branches emit 512 units, so they can be summed element-wise.
combined_mid = Add()([rgb_mid, audio_mid])
out = Dense(32, activation='relu')(combined_mid)

# Sigmoid rather than softmax: the targets are multi-hot vectors (several
# labels may be 1 at once) and the loss is binary cross-entropy, so each class
# needs an independent probability. Softmax would force the scores to sum to 1.
out = Dense(n_classes, activation='sigmoid')(out)

model = Model([rgb_in, audio_in], out)
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['acc'])

In [3]:
import time
# Wall-clock timing of a short run: 2 epochs of 30 training steps, each
# followed by 20 validation steps.
start_time = time.time()
model.fit_generator(
    train_generator,
    steps_per_epoch=30,
    epochs=2,
    validation_data=valid_generator,
    validation_steps=20,
)
print(time.time() - start_time)

Epoch 1/2
Epoch 2/2
26.148629426956177


In [7]:
import time
# Wall-clock timing of a short run: 2 epochs of 30 training steps, each
# followed by 20 validation steps. The generators are shared module-level
# objects, so this continues from wherever they were last left.
start_time = time.time()
model.fit_generator(
    train_generator,
    steps_per_epoch=30,
    epochs=2,
    validation_data=valid_generator,
    validation_steps=20,
)
print(time.time() - start_time)

Epoch 1/2
Epoch 2/2
26.875075101852417


In [8]:
import time
from glob import glob
from keras.models import Sequential
from keras.layers import Dense

import tensorflow as tf

def parser(record, training=True):
    """Parse one serialized video-level ``tf.Example``.

    Args:
        record: serialized example, as supplied by ``Dataset.map``.
        training: when true, the labels are parsed too and returned alongside
            the features; when false only the features are returned.

    Returns:
        ``(x, y)`` in training mode, otherwise just ``x``. ``x`` is the
        1024-element ``mean_rgb`` vector followed by the 128-element
        ``mean_audio`` vector. ``y`` is a length-3862 vector holding 1 at every
        label index.
    """
    keys_to_features = {
        "mean_rgb": tf.FixedLenFeature([1024], tf.float32),
        "mean_audio": tf.FixedLenFeature([128], tf.float32),
    }
    if training:
        # Labels are only requested when they will actually be returned.
        keys_to_features["labels"] = tf.VarLenFeature(tf.int64)

    parsed = tf.parse_single_example(record, keys_to_features)

    # Built once for both modes (previously the non-training branch recomputed
    # the identical concat, leaving a redundant op in the graph).
    x = tf.concat([parsed["mean_rgb"], parsed["mean_audio"]], axis=0)
    if not training:
        return x

    # Scatter the label ids into a dense vector with 1 at each label index.
    y = tf.sparse_to_dense(parsed["labels"].values, [3862], 1)
    return x, y
    
def make_datasetprovider(tf_records, repeats=1000, num_parallel_calls=12,
                         batch_size=32):
    """Build a one-shot iterator over video-level TFRecord files.

    Args:
        tf_records: list of strings -- the TFRecord files to read.
        repeats: how many times to iterate over the data.
        num_parallel_calls: forwarded to ``Dataset.map`` for the parse step.
        batch_size: number of parsed records per batch; the same value is also
            used as the prefetch buffer size.

    Returns:
        The one-shot iterator of the assembled dataset.
    """
    # Stage order: read -> parse -> repeat -> shuffle -> batch -> prefetch.
    pipeline = (
        tf.data.TFRecordDataset(tf_records)
        .map(map_func=parser, num_parallel_calls=num_parallel_calls)
        .repeat(repeats)
        .shuffle(buffer_size=1000)
        .batch(batch_size)
        .prefetch(batch_size)
    )
    return pipeline.make_one_shot_iterator()

def data_generator(tf_records, batch_size=32, repeats=1000, num_parallel_calls=1):
    """Yield evaluated batches from the given TFRecord files.

    This is a generator function, so the dataset iterator and the session are
    created on first iteration rather than at call time. The session is closed
    when the data runs out or when the generator is closed or garbage-collected.

    Args:
        tf_records: list of TFRecord file paths, forwarded to
            ``make_datasetprovider``.
        batch_size: forwarded to ``make_datasetprovider``.
        repeats: forwarded to ``make_datasetprovider``.
        num_parallel_calls: forwarded to ``make_datasetprovider``.

    Yields:
        The result of ``sess.run`` on the iterator's next element.
    """
    tf_provider = make_datasetprovider(tf_records, repeats=repeats,
                                       num_parallel_calls=num_parallel_calls,
                                       batch_size=batch_size)
    sess = tf.Session()
    try:
        next_el = tf_provider.get_next()
        while True:
            try:
                yield sess.run(next_el)
            except tf.errors.OutOfRangeError:
                print("Iterations exhausted")
                break
    finally:
        # Release the session's resources instead of leaking them.
        sess.close()
            
def fetch_model():
    """Build and compile the dense baseline over video-level features.

    Returns:
        A compiled ``Sequential`` model: a 2048-unit ReLU layer over the
        1152-value input (1024 + 128), followed by a 3862-unit sigmoid output,
        trained with Adam on binary cross-entropy.
    """
    n_inputs = 1024 + 128
    network = Sequential([
        Dense(2048, activation="relu", input_shape=(n_inputs,)),
        Dense(3862, activation="sigmoid"),
    ])
    network.compile("adam", loss="binary_crossentropy")
    return network

# Single-shard file lists. glob() only returns paths that exist, so a missing
# file shows up here as an empty list. Note that the eval list is also taken
# from a train shard (train2001).
train_data = glob("/home/data/full//video/train2000.tfrecord")
eval_data = glob("/home/data/full/video/train2001.tfrecord")

my_train_iter = data_generator(tf_records=train_data)
my_eval_iter = data_generator(tf_records=eval_data)
model = fetch_model()

# Wall-clock timing: 10 epochs of 300 training steps, each followed by
# 20 validation steps.
start_time = time.time()
model.fit_generator(
    my_train_iter,
    steps_per_epoch=300,
    epochs=10,
    validation_data=my_eval_iter,
    validation_steps=20,
)
print(time.time() - start_time)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
30.176326990127563
