In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn 
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

print(tf.__version__)
print(sys.version_info)
for module in mpl,np,sklearn,tf,keras:
    print(module.__name__,module.__version__)

2.1.0
sys.version_info(major=3, minor=6, micro=9, releaselevel='final', serial=0)
matplotlib 3.2.1
numpy 1.18.2
sklearn 0.22.2.post1
tensorflow 2.1.0
tensorflow_core.python.keras.api._v2.keras 2.2.4-tf


In [8]:
source_dir="./generate_csv/"
#print(os.listdir(source_dir))

def get_filename_by_prefix(source_dir,prefix_name):
    all_files=os.listdir(source_dir)
    results=[]
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir,filename))
    return results
train_filenames=get_filename_by_prefix(source_dir,"train")
valid_filenames=get_filename_by_prefix(source_dir,"valid")
test_filenames=get_filename_by_prefix(source_dir,"test")

import pprint
pprint.pprint(train_filename)
pprint.pprint(valid_filename)
pprint.pprint(test_filename)

['./generate_csv/train_09.csv',
 './generate_csv/train_18.csv',
 './generate_csv/train_14.csv',
 './generate_csv/train_16.csv',
 './generate_csv/train_12.csv',
 './generate_csv/train_00.csv',
 './generate_csv/train_11.csv',
 './generate_csv/train_10.csv',
 './generate_csv/train_03.csv',
 './generate_csv/train_02.csv',
 './generate_csv/train_15.csv',
 './generate_csv/train_06.csv',
 './generate_csv/train_07.csv',
 './generate_csv/train_17.csv',
 './generate_csv/train_01.csv',
 './generate_csv/train_19.csv',
 './generate_csv/train_04.csv',
 './generate_csv/train_13.csv',
 './generate_csv/train_05.csv',
 './generate_csv/train_08.csv']
['./generate_csv/valid_00.csv',
 './generate_csv/valid_01.csv',
 './generate_csv/valid_09.csv',
 './generate_csv/valid_06.csv',
 './generate_csv/valid_05.csv',
 './generate_csv/valid_03.csv',
 './generate_csv/valid_07.csv',
 './generate_csv/valid_02.csv',
 './generate_csv/valid_04.csv',
 './generate_csv/valid_08.csv']
['./generate_csv/test_06.csv',
 './gener

In [10]:
def parse_csv_line(line,n_fields=9):
    defs=[tf.constant(np.nan)]*n_fields
    parsed_fields=tf.io.decode_csv(line,record_defaults=defs)
    x=tf.stack(parsed_fields[0:-1])
    y=tf.stack(parsed_fields[-1:])
    return x,y

def csv_reader_dataset(filename,n_readers=5,bs=32,n_parse_threads=5,shuffle_buffer_size=10000):
    dataset=tf.data.Dataset.list_files(filename)
    dataset=dataset.repeat()
    dataset=dataset.interleave(
        lambda filename:tf.data.TextLineDataset(filename).skip(1),
        cycle_length=n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset=dataset.map(parse_csv_line,num_parallel_calls=n_parse_threads)
    dataset=dataset.batch(bs)
    return dataset

bs=32
train_set=csv_reader_dataset(train_filenames,bs=bs)
valid_set=csv_reader_dataset(valid_filenames,bs=bs)
test_set=csv_reader_dataset(test_filenames,bs=bs)

In [23]:
def serialize_example(x,y):
    """Converts x,y to tf.train.Example and serialize"""
    input_features=tf.train.FloatList(value=x)
    label=tf.train.FloatList(value=y)
    features=tf.train.Features(
        feature={
            "input_featrues": tf.train.Feature(
                float_list=input_features),
            "label": tf.train.Feature(float_list=label)
        }
    )
    example=tf.train.Example(features=features)
    return example.SerializeToString()

def csv_dataset_to_tfrecords(base_filename,dataset,n_shards,steps_per_shard,compression_type=None):
    options= tf.io.TFRecordOptions(compression_type=compression_type)
    all_filenames=[]
    for shard_id in range(n_shards):
        filename_fullpath='{}_{:05d}-of-{:05d}'.format(base_filename, shard_id,n_shards)
        with tf.io.TFRecordWriter(filename_fullpath,options) as writer:
            for x_batch,y_batch in dataset.take(steps_per_shard):
                for x_example,y_example in zip(x_batch,y_batch):
                    writer.write(serialize_example(x_example,y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames

In [24]:
n_shards=20
train_steps_per_shard=11610//bs//n_shards
valid_steps_per_shard=3880//bs//n_shards
test_steps_per_shard=5170//bs//n_shards

output_dir="generate_tfrecords"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename=os.path.join(output_dir,"train")
valid_basename=os.path.join(output_dir,"valid")
test_basename=os.path.join(output_dir,"test")

train_tfrecord_filenames=csv_dataset_to_tfrecords(train_basename,train_set,n_shards,train_steps_per_shard,None)
valid_tfrecord_filenames=csv_dataset_to_tfrecords(valid_basename,train_set,n_shards,valid_steps_per_shard,None)
test_tfrecord_filenames=csv_dataset_to_tfrecords(test_basename,train_set,n_shards,test_steps_per_shard,None)

In [25]:
n_shards=20
train_steps_per_shard=11610//bs//n_shards
valid_steps_per_shard=3880//bs//n_shards
test_steps_per_shard=5170//bs//n_shards

output_dir="generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
    
train_basename=os.path.join(output_dir,"train")
valid_basename=os.path.join(output_dir,"valid")
test_basename=os.path.join(output_dir,"test")

train_tfrecord_filenames=csv_dataset_to_tfrecords(train_basename,train_set,n_shards,train_steps_per_shard,compression_type="GZIP")
valid_tfrecord_filenames=csv_dataset_to_tfrecords(valid_basename,train_set,n_shards,valid_steps_per_shard,compression_type="GZIP")
test_tfrecord_filenames=csv_dataset_to_tfrecords(test_basename,train_set,n_shards,test_steps_per_shard,compression_type="GZIP")

In [26]:
pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_filenames)

['generate_tfrecords_zip/train_00000-of-00020',
 'generate_tfrecords_zip/train_00001-of-00020',
 'generate_tfrecords_zip/train_00002-of-00020',
 'generate_tfrecords_zip/train_00003-of-00020',
 'generate_tfrecords_zip/train_00004-of-00020',
 'generate_tfrecords_zip/train_00005-of-00020',
 'generate_tfrecords_zip/train_00006-of-00020',
 'generate_tfrecords_zip/train_00007-of-00020',
 'generate_tfrecords_zip/train_00008-of-00020',
 'generate_tfrecords_zip/train_00009-of-00020',
 'generate_tfrecords_zip/train_00010-of-00020',
 'generate_tfrecords_zip/train_00011-of-00020',
 'generate_tfrecords_zip/train_00012-of-00020',
 'generate_tfrecords_zip/train_00013-of-00020',
 'generate_tfrecords_zip/train_00014-of-00020',
 'generate_tfrecords_zip/train_00015-of-00020',
 'generate_tfrecords_zip/train_00016-of-00020',
 'generate_tfrecords_zip/train_00017-of-00020',
 'generate_tfrecords_zip/train_00018-of-00020',
 'generate_tfrecords_zip/train_00019-of-00020']
['generate_tfrecords_zip/valid_00000-of-

In [29]:
expected_features={
    "input_features": tf.io.FixedLenFeature([8],dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1],dtype=tf.float32)
}

def parse(serialized_example):
    example= tf.io.parse_single_example(serialized_example,expected_features)
    return example["input_features"],example["label"]

def tfrecords_reader_dataset(filename,n_readers=5,bs=32,n_parse_threads=5,shuffle_buffer_size=10000):
    dataset=tf.data.Dataset.list_files(filename)
    dataset=dataset.repeat()
    dataset=dataset.interleave(
        lambda filename:tf.data.TFRecordDataset(filename,compression_type="GZIP"),
        cycle_length=n_readers
    )
    dataset.shuffle(shuffle_buffer_size)
    dataset=dataset.map(parse_csv_line,num_parallel_calls=n_parse_threads)
    dataset=dataset.batch(bs)
    return dataset

tfrecord_train=tfrecords_reader_dataset(train_tfrecord_filenames,bs=3)

for x_batch,y_batch in tfrecord_train.take(2):
    print(x_batch)
    print(y_batch)

InvalidArgumentError: Unquoted fields cannot have quotes/CRLFs inside
	 [[{{node DecodeCSV}}]]

In [31]:
bs=32
tfrecords_train_set=tfrecords_reader_dataset(train_tfrecord_filenames,bs=bs)
tfrecords_valid_set=tfrecords_reader_dataset(valid_tfrecord_filenames,bs=bs)
tfrecords_test_set=tfrecords_reader_dataset(test_tfrecord_filenames,bs=bs)

In [35]:

model=keras.models.Sequential([
    keras.layers.Dense(30,activation='relu',input_shape=[8]),
    keras.layers.Dense(1),
])

model.summary()
model.compile(loss='mean_squared_error',optimizer="sgd")

callbacks=[keras.callbacks.EarlyStopping(patience=5,min_delta=1e-2)]



history=model.fit(tfrecords_train_set,validation_data=tfrecords_valid_set,steps_per_epoch=11160//bs,validation_steps=3870//bs,epochs=100,callbacks=callbacks)

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 30)                270       
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 31        
Total params: 301
Trainable params: 301
Non-trainable params: 0
_________________________________________________________________
Train for 348 steps, validate for 120 steps
Epoch 1/100


InvalidArgumentError:  Unquoted fields cannot have quotes/CRLFs inside
	 [[{{node DecodeCSV}}]]
	 [[IteratorGetNext]] [Op:__inference_distributed_function_1806975]

Function call stack:
distributed_function


In [None]:
model.evaluate(test_set,steps=5169//bs)