In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import pandas as pd
import sklearn
import os
import time
import sys
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)

for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

2.0.0
sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)
matplotlib 3.1.2
numpy 1.17.4
pandas 0.25.3
sklearn 0.22
tensorflow 2.0.0
tensorflow_core.keras 2.2.4-tf


In [6]:
source_dir = './generate_csv/'

def get_filenamed_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results
train_filenames = get_filenamed_by_prefix(source_dir, 'train')
valid_filenames = get_filenamed_by_prefix(source_dir, 'valid')
test_filenames = get_filenamed_by_prefix(source_dir, 'test')
print(train_filenames)

['./generate_csv/train_15.csv', './generate_csv/train_01.csv', './generate_csv/train_00.csv', './generate_csv/train_14.csv', './generate_csv/train_02.csv', './generate_csv/train_16.csv', './generate_csv/train_17.csv', './generate_csv/train_03.csv', './generate_csv/train_07.csv', './generate_csv/train_13.csv', './generate_csv/train_12.csv', './generate_csv/train_06.csv', './generate_csv/train_10.csv', './generate_csv/train_04.csv', './generate_csv/train_05.csv', './generate_csv/train_11.csv', './generate_csv/train_08.csv', './generate_csv/train_09.csv', './generate_csv/train_19.csv', './generate_csv/train_18.csv']


In [7]:
def parse_csv_line(line, n_fields=9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y

def csv_reader_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5, shuffle_buffer_size=10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers)
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_csv_line, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

batch_size =32
train_set = csv_reader_dataset(train_filenames)
valid_set = csv_reader_dataset(valid_filenames)
test_set = csv_reader_dataset(test_filenames)

In [9]:
def serialize_example(x, y):
    """converts x, y to tf.train.Example and serialize"""
    input_features = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(float_list = input_features),
            "label": tf.train.Feature(float_list = label)
        }
    )
    example = tf.train.Example(features = features)
    return example.SerializeToString()


def csv_dataset_to_tfrecords(base_filename, dataset, n_shards, steps_per_shard, 
                            compression_type = None):
    options = tf.io.TFRecordOptions(compression_type = compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames
    

In [10]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = 'generate_tfrecords'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, 'train')
valid_basename = os.path.join(output_dir, 'valid')
test_basename = os.path.join(output_dir, 'test')

train_tfrecord_filnames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, None)
test_tfrecord_filenames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, None)

In [11]:
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards

output_dir = 'generate_tfrecords_zip'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, 'train')
valid_basename = os.path.join(output_dir, 'valid')
test_basename = os.path.join(output_dir, 'test')

train_tfrecord_filnames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, compression_type='GZIP')
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, compression_type='GZIP')
test_tfrecord_filenames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, compression_type='GZIP')

In [14]:
expected_features = {
    'input_features': tf.io.FixedLenFeature([8], dtype=tf.float32),
    'label': tf.io.FixedLenFeature([1], dtype=tf.float32)
}

def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example, expected_features)
    return example['input_features'], example['label']

def tfrecords_reader_dataset(filenames, n_readers = 5, batch_size = 32, n_parse_threads = 5,
                            shuffle_buffer_size = 10000):
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(
        filename, compression_type = 'GZIP'),
        cycle_length = n_readers)
    dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parse_example, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset

tfrecords_train = tfrecords_reader_dataset(train_tfrecord_filnames, batch_size=3)

for x_batch, y_batch in tfrecords_train.take(2):
    print(x_batch)
    print(y_batch)

tf.Tensor(
[[ 0.04326301 -1.0895426  -0.38878718 -0.10789865 -0.68186635 -0.0723871
  -0.8883662   0.8213992 ]
 [-0.82195884  1.8741661   0.1821235  -0.03170019 -0.6011179  -0.14337493
   1.0852206  -0.8613995 ]
 [-1.1157656   0.99306357 -0.334192   -0.06535219 -0.32893205  0.04343066
  -0.12785879  0.30707204]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[1.426]
 [1.054]
 [0.524]], shape=(3, 1), dtype=float32)
tf.Tensor(
[[-1.119975   -1.3298433   0.14190045  0.4658137  -0.10301778 -0.10744184
  -0.7950524   1.5304717 ]
 [-1.0591781   1.3935647  -0.02633197 -0.1100676  -0.6138199  -0.09695935
   0.3247131  -0.03747724]
 [-0.66722274 -0.04823952  0.34529406  0.53826684  1.8521839  -0.06112538
  -0.8417093   1.5204847 ]], shape=(3, 8), dtype=float32)
tf.Tensor(
[[0.66 ]
 [0.672]
 [1.59 ]], shape=(3, 1), dtype=float32)


In [None]:
batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(train_tfrecord_filnames, batch_size=batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(valid_tfrecord_filenames, batch_size=batch_size)
tf