In [47]:
import os
import numpy as np
import h5py
import pandas as pd
from pathlib import Path
import tensorflow as tf

## TensorFlow Records (TFRecords)
TFRecords are a TensorFlow-specific file format for quickly and efficiently storing data on disk and reading it back in for training. They use [Protocol Buffers](https://developers.google.com/protocol-buffers) (a Google project) to efficiently serialize different data types. TensorFlow provides some convenience (sorta) functions for working with protocol buffers without using the base library directly. This is better explained with an example.
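Here is a minimal sketch of the Protocol Buffer side of things, using made-up feature names `'x'` and `'label'`. It builds one `tf.train.Example` by hand, serializes it to the compact byte string that ends up in a TFRecord file, and parses it back with the protobuf class's `FromString`.

```python
import tensorflow as tf

# An Example is a map from feature names to typed lists of values.
example = tf.train.Example(
    features=tf.train.Features(
        feature={
            'x': tf.train.Feature(float_list=tf.train.FloatList(value=[0.5, -1.25])),
            'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[3])),
        }
    )
)

# Serialize to bytes: this is what a TFRecord file actually stores, record by record.
serialized = example.SerializeToString()

# Parse the bytes back into an Example message.
restored = tf.train.Example.FromString(serialized)
print(restored.features.feature['label'].int64_list.value[0])
```

The values chosen here (0.5 and -1.25) are exactly representable as 32-bit floats, so they round-trip unchanged through `FloatList`.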

So first, the dataset, which we will read in with Pandas (because Pandas makes it easy to look at). The input features are variables in a plane simulation that alter the magnetometer readings.

In [48]:
inputs = pd.read_csv('/opt/data/MagData/EE699_IN.txt')
inputs

|      | cosX      | cosY     | cosZ      | flap angle(rad) | rudder angle(rad) |
|------|-----------|----------|-----------|-----------------|-------------------|
| 0    | -0.364360 | -0.23860 | 0.900170  | 0.783300        | -0.29378          |
| 1    | -0.516630 | -0.56340 | 0.644730  | 0.343350        | 0.59530           |
| 2    | -0.037998 | -0.99223 | -0.118450 | 0.588880        | -0.46642          |
| 3    | 0.629490  | -0.18223 | 0.755340  | -0.115410       | -0.38319          |
| 4    | -0.500480 | 0.11948  | 0.857470  | -0.055621       | 0.52118           |
| ...  | ...       | ...      | ...       | ...             | ...               |
| 9995 | -0.139920 | 0.91155  | 0.386650  | 0.226880        | 0.33034           |
| 9996 | 0.577470  | 0.73689  | -0.351450 | -0.191640       | 0.68059           |
| 9997 | 0.087447  | -0.93206 | -0.351580 | 0.181340        | -0.50669          |
| 9998 | 0.206470  | 0.97796  | 0.030993  | -0.669570       | 0.13727           |


Our output target data is the magnetic field reading of the magnetometer.

In [49]:
outputs = pd.read_csv('/opt/data/MagData/EE699_Out.txt')
outputs

|      | Magnetic Field (nT) |
|------|---------------------|
| 0    | -112.69             |
| 1    | 844.88              |
| 2    | 1851.20             |
| 3    | -1279.80            |
| 4    | -246.01             |
| ...  | ...                 |
| 9995 | -703.47             |
| 9996 | -871.67             |
| 9997 | 1929.20             |
| 9998 | -932.63             |


In [50]:
inputs.mean()

cosX                 -0.016558
 cosY                -0.006312
 cosZ                 0.354091
 flap angle(rad)      0.002300
 rudder angle(rad)   -0.005623
dtype: float64

In [51]:
outputs.mean()

Magnetic Field (nT)   -0.000074
dtype: float64

In [52]:
inputs.std()

cosX                  0.603413
 cosY                 0.598194
 cosZ                 0.390447
 flap angle(rad)      0.450745
 rudder angle(rad)    0.454901
dtype: float64

In [53]:
outputs.std()

Magnetic Field (nT)    1172.838237
dtype: float64

Next we would usually split the data into train, validation, and test sets and preprocess it (normalization, etc.), but right now we are only interested in making our generator, so assume we do that here.
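For reference, a minimal NumPy sketch of that usual preprocessing step, assuming an 80/10/10 split and z-score normalization (both arbitrary choices for illustration), run on random stand-in data rather than the real magnetometer files. The key design point is that the mean and standard deviation are computed from the training split only, so nothing leaks in from the validation or test data.

```python
import numpy as np

rng = np.random.default_rng(0)
# Random stand-in with the same shape as our inputs (NOT the real dataset).
x = rng.normal(loc=0.3, scale=2.0, size=(10_000, 5))

# Shuffle indices once, then carve out train / valid / test.
idx = rng.permutation(len(x))
train_idx, valid_idx, test_idx = np.split(idx, [8_000, 9_000])

# Statistics come from the training split only.
x_mean = x[train_idx].mean(axis=0)
x_std = x[train_idx].std(axis=0)

def normalize(a):
    """Apply the training-set z-score to any split."""
    return (a - x_mean) / x_std

x_train = normalize(x[train_idx])
x_valid = normalize(x[valid_idx])
x_test = normalize(x[test_idx])
```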

In [54]:
# we did some preprocessing by turning into numpy arrays
inputs = inputs.to_numpy()
outputs = outputs.to_numpy()

inputs.shape, outputs.shape

((10000, 5), (10000, 1))

Later we will need to know the dtypes of these two arrays, so let's print them here.

In [55]:
inputs.dtype, outputs.dtype

(dtype('float64'), dtype('float64'))
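Why the dtype matters: once an array is turned into raw bytes, neither its dtype nor its shape travels with it. This small NumPy sketch (using the values of the first input row) shows that decoding with the right dtype recovers the data exactly, that decoding with the wrong one silently produces a different number of meaningless values, and that ND arrays also need their shape stored or known separately.

```python
import numpy as np

# A float64 sample like one row of `inputs`.
x_sample = np.array([-0.36436, -0.2386, 0.90017, 0.7833, -0.29378])
raw = x_sample.tobytes()  # 5 values * 8 bytes = 40 bytes; no dtype or shape stored

# Decoding with the dtype we wrote recovers the values exactly.
good = np.frombuffer(raw, dtype=np.float64)

# Decoding with the wrong dtype does not raise: the same 40 bytes are
# reinterpreted as 10 float32 values, which are meaningless.
bad = np.frombuffer(raw, dtype=np.float32)
print(len(raw), good.shape, bad.shape)  # 40 (5,) (10,)

# For ND data the shape is lost too, so it must be stored or known separately.
grid = np.arange(6, dtype=np.float64).reshape(2, 3)
flat = np.frombuffer(grid.tobytes(), dtype=np.float64)  # shape (6,)
restored = flat.reshape(grid.shape)
```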

# TFRecord helper functions

Before we get into how to make TFRecord objects and files, we will define these helper functions, taken straight from the TFRecord [webpage](https://www.tensorflow.org/tutorials/load_data/tfrecord).

These functions allow us to turn primitives like an `int` into `Feature` objects for the tfrecords.

In [56]:
# The following functions can be used to convert a value to a type compatible
# with tf.train.Example.

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))


def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

## Make Dataset into TFRecords
Now we will take our data and store it in TFRecord files. The idea is that we split our data across several files, which can then be read in parallel. For specifics on the number and size of files, see the TFRecord [webpage](https://www.tensorflow.org/tutorials/load_data/tfrecord).

Our basic game plan is to loop over each file we want to make, and for each file loop over its samples, writing them one at a time to the file via a writer.
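The file-count arithmetic used for this (the ceiling of samples over records per file, then one slice per file) can be checked on its own. This pure-Python sketch uses a hypothetical helper `shard_slices`, and it also tries an uneven sample count to show that the last, shorter file is handled by ordinary slicing, because a slice that runs past the end is simply truncated.

```python
import math

def shard_slices(n_samples, records_per_file):
    """Hypothetical helper: one slice per output file."""
    n_files = math.ceil(n_samples / records_per_file)
    return [slice(i * records_per_file, (i + 1) * records_per_file)
            for i in range(n_files)]

# Our case: 10,000 samples at 1,000 per file -> 10 files.
print(len(shard_slices(10_000, 1000)))  # 10

# Uneven case: 10,500 samples -> 11 files, the last holding only 500.
samples = list(range(10_500))
shards = [samples[s] for s in shard_slices(len(samples), 1000)]
print(len(shards), len(shards[-1]))  # 11 500
```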

One thing to note is that our helper functions expect a single float, int64, or bytes value, not a list. Thus, if we want to store a list of values, we need to either write our own helper functions that work with lists or convert everything to bytes (which is also useful if the data is not 1D but ND).
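As a sketch of the first option, here is a hypothetical list-aware helper, `_float_list_feature` (our own name, not part of TensorFlow). Because the vector length is fixed, it can be parsed straight back with `tf.io.FixedLenFeature([5], tf.float32)` and no byte decoding is needed. The trade-off is precision: `FloatList` holds 32-bit floats, so float64 inputs are rounded to float32.

```python
import numpy as np
import tensorflow as tf

def _float_list_feature(values):
    """Hypothetical helper: store a whole 1D sequence of floats in one Feature.

    FloatList holds 32-bit floats, so float64 inputs are rounded to float32.
    """
    values = np.asarray(values, dtype=np.float32).ravel()
    return tf.train.Feature(float_list=tf.train.FloatList(value=values.tolist()))

x_sample = np.array([-0.36436, -0.2386, 0.90017, 0.7833, -0.29378])
example = tf.train.Example(
    features=tf.train.Features(feature={'input': _float_list_feature(x_sample)})
)
serialized = example.SerializeToString()

# The length is fixed and known, so we parse it directly into a float32 vector.
parsed = tf.io.parse_single_example(
    serialized, {'input': tf.io.FixedLenFeature([5], tf.float32)}
)
print(parsed['input'].numpy())
```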

In [57]:
# make our data path if it doesn't exist
out_path = Path('/opt/data/MagData/tfrecords/')
if not os.path.exists(out_path):
    os.makedirs(out_path)

# set the number of records we want in each file
records_per_file = 1000

# the total number of samples we are going to write
n_samples = inputs.shape[0]

# how many tfrecord files we will get
n_tfrecord_files = int(np.ceil(n_samples / records_per_file))

# a list to store name of each of the files we are going to write
record_files = [out_path / f'train{idx}.tfrecord' for idx in range(n_tfrecord_files)]

# Iterate over all the files we are going to make
for idx, record_file in enumerate(record_files):
    # make a slice for the current samples we are going to write to the file
    slicer = slice(idx * records_per_file, (idx + 1) * records_per_file)
    
    # open a tfrecordwriter to write to the record
    with tf.io.TFRecordWriter(str(record_file)) as writer:
        # individually loop over all the samples from the slice
        for x_sample, y_sample in zip(inputs[slicer], outputs[slicer]):
            # must convert any array types to bytes for storage if we do not want to store values individually
            x_sample_bytes = x_sample.tobytes()
            # Create a dict with the data we want to save in the
            # TFRecords file. You can add more relevant data here.
            data = {
                'input': _bytes_feature(x_sample_bytes),
                # y_sample is a length-1 array; hand FloatList a plain Python float
                'output': _float_feature(float(y_sample[0]))
            }
            # Wrap the data as TensorFlow Features.
            feature = tf.train.Features(feature=data)
            # Wrap again as a TensorFlow Example.
            example = tf.train.Example(features=feature)
            # Serialize the data.
            serialized = example.SerializeToString()
            # Write the serialized data to the TFRecords file.
            writer.write(serialized)

Now, to prove that we are reading from the TFRecords and not from the original data, I am going to delete the original `inputs` and `outputs` variables.

In [58]:
del inputs
del outputs

try:
    print(inputs)
except NameError:
    print("inputs does not exist")

inputs does not exist


## Open our TFRecords
To open the TFRecords we will make a `TFRecordDataset` object. The `TFRecordDataset` simply wants the file names of the TFRecord files, plus a few optional settings for how to read them (compression, buffering, and how many files to read in parallel). However, we cannot do anything useful with the records until we tell the dataset how to parse them. For this we are going to use the `map` function to apply `_parse_function` to each individual sample.

The `_parse_function` has to know how we stored the data in order to parse it. For example, it needs to know that our `input` feature was a vector of `float64` in order to decode the bytes into the right type. If we tried to decode into `float32` we would not get our values back: the 40 bytes of each sample would be reinterpreted as 10 meaningless `float32` values instead of 5 `float64` values. NOTE: this type issue arises because we converted everything to bytes and so lost the original typing.
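The whole write-then-read cycle can be tried end to end on a tiny file. This self-contained sketch writes three made-up samples to a temporary TFRecord file in the same style as above (input vector as float64 bytes, output as a float), then reads them back with `TFRecordDataset` and a `map`-ed parse function that decodes with the matching dtype.

```python
import tempfile
from pathlib import Path

import numpy as np
import tensorflow as tf

# Three tiny made-up samples (float64 inputs, float outputs).
xs = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
ys = np.array([1.0, 2.0, 3.0])

path = str(Path(tempfile.mkdtemp()) / 'tiny.tfrecord')
with tf.io.TFRecordWriter(path) as writer:
    for x, y in zip(xs, ys):
        feature = {
            'input': tf.train.Feature(bytes_list=tf.train.BytesList(value=[x.tobytes()])),
            'output': tf.train.Feature(float_list=tf.train.FloatList(value=[float(y)])),
        }
        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())

spec = {
    'input': tf.io.FixedLenFeature([], tf.string),
    'output': tf.io.FixedLenFeature([], tf.float32),
}

def parse(serialized):
    parsed = tf.io.parse_single_example(serialized, spec)
    # Must match the dtype we wrote (float64); decoding the 16 bytes of each
    # sample as float32 would instead give 4 garbage values.
    x = tf.io.decode_raw(parsed['input'], tf.float64)
    return x, parsed['output']

restored = list(tf.data.TFRecordDataset([path]).map(parse).as_numpy_iterator())
```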

In [59]:
def _parse_function(serialized):
    features = {
        'input': tf.io.FixedLenFeature([], tf.string),
        'output': tf.io.FixedLenFeature([], tf.float32)
    }
    # Parse the serialized data so we get a dict with our data.
    parsed_example = tf.io.parse_single_example(serialized=serialized, features=features)
    # cast the example to our data type
    
    # the vector of x input data
    x_sample_bytes = parsed_example['input']
    # Decode the raw bytes so it becomes a tensor with type.
    # we wrote float64 bytes, so that's what we need to decode
    x_sample_float64 = tf.io.decode_raw(x_sample_bytes, tf.float64)
    # however we usually want float32 for ANNs
    x_sample_float32 = tf.cast(x_sample_float64, tf.float32)
    x_sample = x_sample_float32
    
    # we don't need to do anything special to our y output data because it is one value
    y_sample = parsed_example['output']
    
    # return our samples as a tuple
    return x_sample, y_sample

# annoyingly the TFRecordDataset function wants string not cool Path objects
filename_str = [str(filename) for filename in record_files]
# create a tfrecord dataset from the list of tfrecord files
dataset = tf.data.TFRecordDataset(filenames=filename_str,
                                  compression_type=None,
                                  buffer_size=None,
                                  num_parallel_reads=tf.data.experimental.AUTOTUNE)
# Parse the serialized data in the TFRecords files.
# This returns TensorFlow tensors for the inputs and outputs.
dataset = dataset.map(map_func=_parse_function,
                      num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset

<ParallelMapDataset shapes: ((None,), ()), types: (tf.float32, tf.float32)>

So our dataset object is made, but how do we actually get anything from it?

## The Actual Generator
Anyway, we didn't come here just to store and read data (even though that may be useful for future reference); we actually want a generator. So let's do that next.

Making a generator out of our dataset is not actually necessary when training with Keras, which can consume the dataset directly, but it is *very* useful for debugging. The easiest way is to make a NumPy iterator from our dataset and use the `next()` function to get a sample.
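A small stand-in example of the same debugging pattern, built with `tf.data.Dataset.from_tensor_slices` on made-up arrays shaped like ours (5-feature vectors and scalar targets). Besides `next()` on a fresh iterator, `take(n)` is a convenient way to peek at a handful of elements.

```python
import numpy as np
import tensorflow as tf

# Made-up data shaped like ours: (5-feature vector, scalar target).
features = np.arange(20, dtype=np.float32).reshape(4, 5)
targets = np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)
ds = tf.data.Dataset.from_tensor_slices((features, targets))

# next() on a fresh iterator gives the first element as NumPy values...
first_x, first_y = next(ds.as_numpy_iterator())

# ...and take(n) peeks at just the first n elements.
peek = list(ds.take(2).as_numpy_iterator())
print(first_x, first_y, len(peek))
```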

In [60]:
np_iterator = dataset.as_numpy_iterator()
sample = next(np_iterator)
sample

(array([-0.36436, -0.2386 ,  0.90017,  0.7833 , -0.29378], dtype=float32),
 -112.69)

Now we can see our sample is a tuple of an array of numbers and a single number. This corresponds to our features and label we will use for training. 

# Building a Pipeline
As our dataset is right now, we would not be able to do any useful training. For example, we have not scaled our data or put it into batches; we only get one sample at a time. We could do this up front when storing the data, but then we could not change our scaling or batch size without rewriting the data to disk. An easier method is to chain dataset objects together with pipeline functions.

For example here is how to get batches of data:

In [61]:
dataset_batch = dataset.batch(batch_size=10)
np_iterator = dataset_batch.as_numpy_iterator()
sample = next(np_iterator)
sample

(array([[-0.36436  , -0.2386   ,  0.90017  ,  0.7833   , -0.29378  ],
        [-0.49361  , -0.7813   , -0.382    , -0.64873  ,  0.50364  ],
        [-0.52344  ,  0.80814  ,  0.27003  , -0.49646  ,  0.4856   ],
        [-0.51663  , -0.5634   ,  0.64473  ,  0.34335  ,  0.5953   ],
        [ 0.26176  , -0.91461  ,  0.30819  ,  0.36011  ,  0.16335  ],
        [-0.39692  ,  0.91785  , -0.0011791,  0.77746  , -0.0076665],
        [-0.037998 , -0.99223  , -0.11845  ,  0.58888  , -0.46642  ],
        [-0.83825  ,  0.33201  ,  0.43256  , -0.26341  , -0.65432  ],
        [ 0.68502  , -0.51613  ,  0.51416  ,  0.45831  ,  0.75734  ],
        [ 0.62949  , -0.18223  ,  0.75534  , -0.11541  , -0.38319  ]],
       dtype=float32),
 array([ -112.69 ,  2644.5  ,   -44.774,   844.88 ,   744.13 ,    78.066,
         1851.2  ,   609.59 ,  -632.39 , -1279.8  ], dtype=float32))

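See how our sample is now a batch of data? That is what our training expects in order to run. (Notice also that even without shuffling, the batch is not simply rows 0 through 9: because we set `num_parallel_reads`, the dataset interleaves records from several files, so rows 0, 1, 2 and 3 show up at positions 0, 3, 6 and 9.) An interesting property of the pipeline functions is that they do not modify the original dataset (unless you overwrite the object). So next we can use the original dataset object to do shuffling and batching: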
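A quick sketch of that non-mutation property on a toy `tf.data.Dataset.range`: `batch` returns a new dataset and the original still yields single elements. It also shows `drop_remainder=True`, which discards the short final batch and makes the batch dimension static instead of `None`.

```python
import tensorflow as tf

base = tf.data.Dataset.range(10)  # int64 scalars 0..9
batched = base.batch(4)           # a NEW dataset; `base` is untouched

print([int(v) for v in base.as_numpy_iterator()])         # [0, 1, ..., 9]
print([b.tolist() for b in batched.as_numpy_iterator()])  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# drop_remainder=True discards the short final batch -> fixed batch dimension.
fixed = base.batch(4, drop_remainder=True)
print(batched.element_spec.shape.as_list(), fixed.element_spec.shape.as_list())  # [None] [4]
```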

In [62]:
dataset_batch_shuffled = dataset.shuffle(buffer_size=10*10).batch(batch_size=10)
np_iterator = dataset_batch_shuffled.as_numpy_iterator()
sample = next(np_iterator)
sample

(array([[ 2.2899e-01, -8.3163e-01,  5.0593e-01,  6.2202e-01,  5.1776e-03],
        [-4.1377e-01, -8.2834e-01,  3.7768e-01, -2.9729e-01, -2.5149e-01],
        [ 7.4149e-01, -5.5486e-01, -3.7726e-01,  2.3469e-01, -2.3987e-01],
        [-6.9353e-01, -6.2053e-01,  3.6600e-01, -5.3542e-01, -1.5070e-01],
        [-2.3439e-01,  6.5775e-01,  7.1584e-01, -2.5533e-01, -1.4883e-04],
        [ 8.3862e-01, -5.1312e-02,  5.4229e-01, -6.4154e-01, -7.0942e-01],
        [ 8.9846e-01,  4.3139e-01,  8.1604e-02, -4.2677e-02, -5.0773e-01],
        [-8.5809e-01,  3.6642e-01, -3.5976e-01, -1.7021e-01,  4.2081e-01],
        [-5.2344e-01,  8.0814e-01,  2.7003e-01, -4.9646e-01,  4.8560e-01],
        [ 8.6083e-01, -1.8033e-01,  4.7587e-01, -8.3081e-02,  2.6939e-01]],
       dtype=float32),
 array([  355.17 ,  1460.3  ,   499.79 ,  1585.5  ,  -866.65 , -1382.9  ,
        -1321.9  ,  1515.2  ,   -44.774, -1173.   ], dtype=float32))

Note that before, every time we re-created the iterator over our batched dataset we would get the same first batch. Now we shuffle a buffer of samples and then put them into batches.
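To build intuition for `buffer_size`, here is a simplified pure-Python model of a streaming shuffle buffer. It is an illustration of the idea, not TensorFlow's actual implementation, and `buffered_shuffle` is a hypothetical name. Under this model, with a buffer of 100 the element emitted at position `k` always comes from the first `k + 100` inputs, so a buffer much smaller than the dataset only shuffles locally; a buffer at least as large as the dataset gives a full shuffle.

```python
import random

def buffered_shuffle(iterable, buffer_size, seed=None):
    """Simplified model of a streaming shuffle buffer (illustration only).

    Hold up to `buffer_size` elements; each output is a uniformly random
    pick from the buffer, and the freed slot is refilled from the input.
    """
    rng = random.Random(seed)
    buffer = []
    for item in iterable:
        buffer.append(item)
        if len(buffer) == buffer_size:
            i = rng.randrange(len(buffer))
            buffer[i], buffer[-1] = buffer[-1], buffer[i]
            yield buffer.pop()
    # Input exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer

out = list(buffered_shuffle(range(1000), buffer_size=100, seed=0))
```

With `buffer_size=1` the model degenerates to no shuffling at all, which matches the intuition that a buffer must hold several elements before there is anything to choose between.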

One important point you might not realize is that our dataset is currently finite, as this example will show. We are going to make batches of 1000 and try to get 11 batches, which would go beyond our 10,000 samples.

In [63]:
dataset_batch_shuffled = dataset.shuffle(buffer_size=10*10).batch(batch_size=1000)
np_iterator = dataset_batch_shuffled.as_numpy_iterator()
try:
    for i in range(11):
        sample = next(np_iterator)
    else:
        print("for loop ended")
except StopIteration:
    print("tried to get too many samples")

tried to get too many samples


To fix this, we can call the `repeat` function, which will *repeat* our data forever or a set number of times.
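A toy sketch of `repeat` on `tf.data.Dataset.range(10)`. Repeating before batching means batches can straddle epoch boundaries (the third batch below mixes the end of one pass with the start of the next). An endless `repeat()` never raises `StopIteration`, so only ever pull a bounded number of batches from it, for example with `take`. If you want each epoch to be a complete pass, a common alternative ordering is `shuffle(...)` before `repeat()`.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# repeat(2) then batch(4): 20 elements -> 5 batches, crossing the epoch boundary.
two_epochs = [b.tolist() for b in ds.repeat(2).batch(4).as_numpy_iterator()]
print(two_epochs)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 0, 1], [2, 3, 4, 5], [6, 7, 8, 9]]

# repeat() with no argument never ends; take() bounds how much we pull.
forever = ds.repeat().batch(4)
n_pulled = len(list(forever.take(7).as_numpy_iterator()))
print(n_pulled)  # 7
```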

In [64]:
dataset_batch_shuffled_repeated = dataset.repeat().shuffle(buffer_size=10*10).batch(batch_size=1000)
np_iterator = dataset_batch_shuffled_repeated.as_numpy_iterator()
try:
    for i in range(11):
        sample = next(np_iterator)
    else:
        print("for loop ended")
except StopIteration:
    print("tried to get too many samples")

for loop ended


# Custom Pipeline
The other important function we wanted our pipeline to do was scaling. For this we can make a custom function to do our mapping for us. Note this is the same process we used to parse the data originally but with a different function.

In [65]:
# scale each individual sample: multiply x by its squared length (x . x)
def mapper(x_sample, y_sample):
    x_sample = tf.tensordot(x_sample, x_sample, axes=1) * x_sample
    y_sample = y_sample / 1000
    return x_sample, y_sample

dataset_scaled = dataset.map(map_func=mapper,
                             num_parallel_calls=tf.data.experimental.AUTOTUNE)
np_iterator = dataset_scaled.as_numpy_iterator()
sample = next(np_iterator)
sample

(array([-0.6193609 , -0.40558657,  1.5301629 ,  1.3315002 , -0.49938482],
       dtype=float32),
 -0.11269)

It is important to note that where you put the scaling matters. For example, above we scaled on a per-sample basis. It would probably be more efficient to do this on a batch basis, so that each call to the mapping function processes many samples at once.

In [66]:
# the same per-sample mapper, now applied to batches of shape (batch_size, 5)
def mapper(x_sample, y_sample):
    x_sample = tf.tensordot(x_sample, x_sample, axes=1) * x_sample
    y_sample = y_sample / 1000
    return x_sample, y_sample

try:
    dataset_batch = dataset.batch(batch_size=100)
    dataset_scaled = dataset_batch.map(map_func=mapper,
                                       num_parallel_calls=tf.data.experimental.AUTOTUNE)
    np_iterator = dataset_scaled.as_numpy_iterator()
    sample = next(np_iterator)
except tf.errors.InvalidArgumentError:
    print('failed')


failed


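What happened? Our function doesn't work on a batch of data. `tf.tensordot(a, b, axes=1)` contracts the last axis of `a` with the first axis of `b`. For a single sample of shape `(5,)` that is just the dot product, but for a batch of shape `(100, 5)` it tries to pair the feature axis (size 5) with the batch axis (size 100), which fails. This will happen whenever we are not careful about the shapes of our tensors.

For a working version, `tensordot` was not what we needed, so we switched to an element-wise multiply followed by `reduce_sum` over the feature axis, which gives the same result for every sample in the batch.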
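The same shape issue can be reproduced in plain NumPy, whose `np.tensordot` follows the same axis convention. This sketch uses the first input row, a made-up batch of three scaled copies of it, and `keepdims=True` as an equivalent of the `[:, None]` reshape used in the next cell.

```python
import numpy as np

x = np.array([-0.36436, -0.2386, 0.90017, 0.7833, -0.29378], dtype=np.float32)

# One sample: tensordot with axes=1 is the dot product x . x, a scalar.
per_sample = np.tensordot(x, x, axes=1) * x

# A batch of shape (3, 5): axes=1 pairs the last axis of the first argument (5)
# with the first axis of the second (3), so the shapes do not match.
batch = np.stack([x, 2 * x, 3 * x])
try:
    np.tensordot(batch, batch, axes=1)
    tensordot_failed = False
except ValueError:
    tensordot_failed = True

# Row-wise sum of squares kept as shape (3, 1) so it broadcasts against (3, 5).
sq_norm = np.sum(batch * batch, axis=1, keepdims=True)
batched = sq_norm * batch
print(tensordot_failed, np.allclose(batched[0], per_sample))  # True True
```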

In [67]:
# do the scaling on whole batches of shape (batch_size, 5)
def mapper(x_sample, y_sample):
    # row-wise sum of squares, reshaped to (batch_size, 1) so it broadcasts against x_sample
    x_sample = tf.math.reduce_sum(x_sample * x_sample, axis=1)[:, None] * x_sample
    y_sample = y_sample / 1000
    return x_sample, y_sample

dataset_batch = dataset.batch(batch_size=100)
dataset_scaled = dataset_batch.map(map_func=mapper,
                                   num_parallel_calls=tf.data.experimental.AUTOTUNE)
np_iterator = dataset_scaled.as_numpy_iterator()
sample_batched = next(np_iterator)
sample_batched

(array([[-6.19360924e-01, -4.05586571e-01,  1.53016293e+00,
          1.33150017e+00, -4.99384820e-01],
        [-8.26554060e-01, -1.30829334e+00, -6.39662206e-01,
         -1.08630371e+00,  8.43349397e-01],
        [-7.75882423e-01,  1.19788623e+00,  4.00258899e-01,
         -7.35890567e-01,  7.19793081e-01],
        [-7.60620952e-01, -8.29479158e-01,  9.49219227e-01,
          5.05505264e-01,  8.76444757e-01],
        [ 3.02692264e-01, -1.05763054e+00,  3.56382668e-01,
          4.16421592e-01,  1.88893571e-01],
        [-6.36857450e-01,  1.47268879e+00, -1.89186388e-03,
          1.24743319e+00, -1.23008853e-02],
        [-5.94411045e-02, -1.55216718e+00, -1.85293943e-01,
          9.21197951e-01, -7.29631066e-01],
        [-1.25529718e+00,  4.97192055e-01,  6.47767782e-01,
         -3.94462079e-01, -9.79858100e-01],
        [ 1.22181213e+00, -9.20577228e-01,  9.17063534e-01,
          8.17448676e-01,  1.35080314e+00],
        [ 7.30307877e-01, -2.11415589e-01,  8.76313746e-01,
    

To verify we did this correctly, let's compare the first sample of the batch to our original sample. Remember our samples are tuples of the input and output, so let's compare the inputs (using the zero index), since that is what gave us trouble.

In [68]:
sample[0], sample_batched[0][0]

(array([-0.6193609 , -0.40558657,  1.5301629 ,  1.3315002 , -0.49938482],
       dtype=float32),
 array([-0.6193609 , -0.40558657,  1.5301629 ,  1.3315002 , -0.49938482],
       dtype=float32))

Comparing numbers manually is tedious, so let's use NumPy's `allclose` function.

In [69]:
np.allclose(sample[0], sample_batched[0][0])

True