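## TensorFlow Dataset API

In TensorFlow, passing data to a model through feed_dict is slow, while an input pipeline transfers data considerably faster. TensorFlow ships a built-in API for this, tf.data, which makes it convenient to process data and feed it to a model efficiently.
This post covers the basic mechanics of the tf.data API and some of its commonly used operations.

The tf.data API has two main objects: tf.data.Dataset and tf.data.Iterator.  
tf.data.Dataset: represents a sequence of elements, where each element consists of one or more Tensor objects.  
tf.data.Iterator: the main way to access the elements of a Dataset.  

### Outline
* Create Dataset
* Create Iterator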



#### Create Dataset
A Dataset stores our data elements.

In [0]:
import numpy as np
import tensorflow as tf

1. From NumPy

In [4]:
# Create random data
x = np.random.sample((5,2))
# Create the Dataset
dataset = tf.data.Dataset.from_tensor_slices(x)

dataset

<DatasetV1Adapter shapes: (2,), types: tf.float64>

In [5]:
# Create two NumPy arrays
x, y = np.random.sample((5,2)), np.random.sample((5,1))
# Create a Dataset from both NumPy arrays at once
dataset = tf.data.Dataset.from_tensor_slices((x, y))

dataset

<DatasetV1Adapter shapes: ((2,), (1,)), types: (tf.float64, tf.float64)>
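
The shapes reported above follow from how from_tensor_slices treats its input: it slices along the first axis, so each row becomes one element. As a rough illustration, the NumPy-only sketch below does that slicing by hand. It does not call TensorFlow, and the helper name `slice_first_axis` is hypothetical, introduced only for this example.

```python
import numpy as np

def slice_first_axis(*arrays):
    """Hypothetical helper: pair up the i-th row of every array, mimicking
    how from_tensor_slices turns the first axis into dataset elements."""
    length = arrays[0].shape[0]
    # All inputs must agree on the size of the first axis.
    assert all(a.shape[0] == length for a in arrays)
    return [tuple(a[i] for a in arrays) for i in range(length)]

x, y = np.random.sample((5, 2)), np.random.sample((5, 1))
elements = slice_first_axis(x, y)

print(len(elements))                                # 5
print(elements[0][0].shape, elements[0][1].shape)   # (2,) (1,)
```

Five rows give five elements, and each element pairs a (2,) feature slice with a (1,) label slice, which is the structure shown in the Dataset repr above.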

2. From a Tensor object

In [6]:
a = tf.random_uniform([5,2])
dataset = tf.data.Dataset.from_tensor_slices(a)

dataset

<DatasetV1Adapter shapes: (2,), types: tf.float32>

3. From Placeholder

In [7]:
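# Creating the Dataset from a placeholder lets you change the data source dynamically, which is very useful when training a model, e.g. to switch between the training and test datasets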
input = tf.placeholder(tf.float64, shape=[None, 2])
dataset = tf.data.Dataset.from_tensor_slices(input)

dataset

<DatasetV1Adapter shapes: (2,), types: tf.float64>

4. From a CSV file

In [8]:
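# Create a Dataset from the data in a CSV file
# Note: tf.contrib is deprecated; the warning printed below recommends
# tf.data.experimental.make_csv_dataset(...) instead
csv_file = 'GSPC.csv'
dataset = tf.contrib.data.make_csv_dataset(csv_file, batch_size=32)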

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

Date, Open = next_element['Date'], next_element['Open']

with tf.Session() as sess:
  print(sess.run([Date, Open]))


For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
Use `tf.data.experimental.make_csv_dataset(...)`.
[array([b'2019-02-07', b'2019-02-19', b'2019-02-14', b'2019-02-27',
       b'2019-02-20', b'2019-03-05', b'2019-03-06', b'2019-02-25',
       b'2019-02-15', b'2019-02-26', b'2019-03-04', b'2019-02-21',
       b'2019-02-08', b'2019-02-12', b'2019-03-01', b'2019-02-13',
       b'2019-02-28', b'2019-02-11', b'2019-02-22', b'2019-02-13',
       b'2019-03-04', b'2019-02-20', b'2019-02-25', b'2019-02-12',
       b'2019-02-22', b'2019-02-21', b'2019-03-01', b'2019-02-07',
       b'2019-02-15', b'2019-02-14', b'2019-02-26', b'2019-02-28'],
      dtype=object), array([2717.53, 2769.28, 2743.5 , 2787.5 , 2779.05, 2794.41, 2790.27,
       2804.35, 2760.24, 2792.36, 2814.37, 2780.24, 2692.3

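#### Create Iterator
An Iterator is used to access and retrieve each element of a Dataset.

1. One-shot Iterator  
The one-shot iterator is the simplest kind of Iterator and covers most input pipeline use cases. It walks through the Dataset once and needs no explicit initialization.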

In [9]:
# Create the Dataset
x = np.random.sample((5,2))
dataset = tf.data.Dataset.from_tensor_slices(x)

# Create the Iterator
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
  for _ in range(5):
    # Fetch each element
    print(sess.run(next_element))

[0.85679913 0.10143722]
[0.88175387 0.55230242]
[0.00634388 0.84738638]
[0.75764302 0.8207592 ]
[0.60181308 0.59532544]
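
The loop above runs exactly five times because the Dataset holds five elements; a sixth sess.run(next_element) would raise tf.errors.OutOfRangeError. As a loose analogy only (plain Python, not TensorFlow code), a one-shot iterator behaves like a Python iterator that cannot be rewound:

```python
data = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]  # stand-in for a small Dataset

one_shot = iter(data)  # analogous to make_one_shot_iterator()
consumed = []
while True:
    try:
        # Analogous to sess.run(next_element)
        consumed.append(next(one_shot))
    except StopIteration:
        # TensorFlow signals the same condition with tf.errors.OutOfRangeError
        break

print(len(consumed))                      # 3
exhausted = next(one_shot, None) is None  # nothing left, and no way to restart
print(exhausted)                          # True
```

In the TensorFlow version, the usual pattern is to wrap the loop in try/except for tf.errors.OutOfRangeError instead of hard-coding the number of elements.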


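2. Initializable Iterator  
An initializable iterator can be initialized with different data while the Dataset itself stays the same, for example first with a training set and then with a test set.  
It is used together with a placeholder.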

In [22]:
# Create the Dataset
input = tf.placeholder(tf.float64,shape=[None, 2])
dataset = tf.data.Dataset.from_tensor_slices(input)

train_x = np.random.sample((5,2))
test_x = np.random.sample((3,2))

# Create the Iterator
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
  # train the model
  # Initialize the Iterator with the training dataset
  sess.run(iterator.initializer, feed_dict={input: train_x})
  for _ in range(5):
    print(sess.run(next_element))

  print("--------")

  # evaluate the model
  # Initialize the Iterator again, this time with the test dataset
  sess.run(iterator.initializer, feed_dict={input: test_x})
  for _ in range(3):
    print(sess.run(next_element))

[0.58777345 0.34376131]
[0.07595064 0.3270094 ]
[0.47187587 0.13764359]
[0.93592063 0.983476  ]
[0.21084482 0.88479164]
--------
[0.59344643 0.7610382 ]
[0.75931599 0.21857005]
[0.69312244 0.24552298]


3. Reinitializable Iterator  
A reinitializable iterator can be initialized from different Dataset objects, provided they share the same structure (output types and shapes).

In [11]:
# Create the Datasets
training_dataset = tf.data.Dataset.from_tensor_slices(np.random.sample((5,2)))
test_dataset = tf.data.Dataset.from_tensor_slices(np.random.sample((3,2)))

# Create the Iterator
iterator = tf.data.Iterator.from_structure(training_dataset.output_types,
                                          training_dataset.output_shapes)
next_element = iterator.get_next()

# Create an initializer op for each of the two datasets
training_init_op = iterator.make_initializer(training_dataset)
test_init_op = iterator.make_initializer(test_dataset)

with tf.Session() as sess:
  sess.run(training_init_op)
  for _ in range(5):
    print(sess.run(next_element))

  print("---------")

  sess.run(test_init_op)
  for _ in range(3):
    print(sess.run(next_element))

[0.45166403 0.74806626]
[0.75083313 0.80168765]
[0.34276602 0.71042542]
[0.45131295 0.32137104]
[0.92210205 0.04995708]
---------
[0.18816433 0.22128761]
[0.32184117 0.61118122]
[0.62841448 0.75044657]


4. Feedable Iterator  
A feedable iterator is similar to a reinitializable one, but it creates two separate Iterators, one per dataset, and switching between them does not require re-initialization.

In [23]:
training_dataset = tf.data.Dataset.from_tensor_slices(np.random.sample((5,2)))
test_dataset = tf.data.Dataset.from_tensor_slices(np.random.sample((3,2)))

handle = tf.placeholder(tf.string, shape=[])
iterator = tf.data.Iterator.from_string_handle(handle,
                                               training_dataset.output_types,
                                              training_dataset.output_shapes)
next_element = iterator.get_next()

training_iterator = training_dataset.make_one_shot_iterator()
test_iterator = test_dataset.make_initializable_iterator()

with tf.Session() as sess:
  training_handle = sess.run(training_iterator.string_handle())
  test_handle = sess.run(test_iterator.string_handle())
  
  for _ in range(5):
    print(sess.run(next_element, feed_dict={handle: training_handle}))
  
  print("----------")
  
  sess.run(test_iterator.initializer)
  for _ in range(3):
    print(sess.run(next_element, feed_dict={handle: test_handle}))

[0.7223556  0.94651226]
[0.77856537 0.51863231]
[0.91456479 0.18817305]
[0.60656066 0.78488042]
[0.50343049 0.98391472]
----------
[0.9246092  0.39914318]
[0.8977477  0.16831537]
[0.76376902 0.30904837]
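
To make the "no re-initialization when switching" point concrete, here is a loose plain-Python analogy (not TensorFlow code; the names `iterators` and `get_next` are hypothetical and exist only in this sketch). Each handle maps to its own iterator state, so alternating between handles does not reset the position of either one:

```python
train_data = ["train-0", "train-1", "train-2"]
test_data = ["test-0", "test-1"]

# One independent iterator per handle, analogous to the string_handle() values.
iterators = {"train": iter(train_data), "test": iter(test_data)}

def get_next(handle):
    """Hypothetical stand-in for sess.run(next_element, feed_dict={handle: ...})."""
    return next(iterators[handle])

# Interleave the two sources; each one resumes where it left off.
order = [get_next("train"), get_next("test"), get_next("train"), get_next("test")]
print(order)  # ['train-0', 'test-0', 'train-1', 'test-1']
```

This is the property that makes the feedable iterator handy for running a few validation batches in the middle of training without losing the training position.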


#### Examples
The previous examples only printed the value of get_next() inside a Session. The examples below pass data from a Dataset to a model for training.

##### One-shot example

In [45]:
# Generate data
features = np.random.sample((100, 1))
labels = 2 * features + 1.5

epoches = 10
batch_size = 32

# Create the Dataset
dataset = tf.data.Dataset.from_tensor_slices((features, labels))
dataset = dataset.batch(batch_size).repeat()

iterator = dataset.make_one_shot_iterator()
x, y = iterator.get_next()

# Build the model
# Note: the sigmoid output is bounded to (0, 1) while the labels lie in [1.5, 3.5),
# so the loss cannot approach zero; activation=None would suit this regression target
layer1 = tf.layers.dense(x, 4, activation=tf.tanh)
layer2 = tf.layers.dense(layer1, 4, activation=tf.tanh)
predictions = tf.layers.dense(layer2,1, activation=tf.sigmoid)

loss = tf.losses.mean_squared_error(labels=y, predictions=predictions)
optimizer = tf.train.AdamOptimizer().minimize(loss)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  # Each iteration runs one training step on a single batch
  for i in range(epoches):
    _, l = sess.run([optimizer, loss])
    print("Epoch: {}, Loss: {}".format((i+1), l))

Epoch: 1, Loss: 4.268806457519531
Epoch: 2, Loss: 3.5479159355163574
Epoch: 3, Loss: 3.6658082008361816
Epoch: 4, Loss: 4.615617752075195
Epoch: 5, Loss: 4.222705841064453
Epoch: 6, Loss: 3.5081288814544678
Epoch: 7, Loss: 3.623868703842163
Epoch: 8, Loss: 4.564029693603516
Epoch: 9, Loss: 4.177005767822266
Epoch: 10, Loss: 3.4687089920043945
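
Each line of the output above corresponds to one batch rather than one full pass over the data. The plain-Python sketch below (no TensorFlow; `batch_sizes` is a hypothetical helper) models batch(32).repeat() over 100 samples to show which batch sizes the ten steps see. It assumes the default batching behavior, where the final partial batch is kept.

```python
from itertools import cycle, islice

def batch_sizes(num_samples, batch_size):
    """Hypothetical helper: sizes of the batches in one pass over the data,
    keeping the final partial batch."""
    full, remainder = divmod(num_samples, batch_size)
    return [batch_size] * full + ([remainder] if remainder else [])

one_pass = batch_sizes(100, 32)
print(one_pass)                  # [32, 32, 32, 4]

# repeat() restarts the pass indefinitely; take the first 10 steps.
ten_steps = list(islice(cycle(one_pass), 10))
print(ten_steps)                 # [32, 32, 32, 4, 32, 32, 32, 4, 32, 32]
print(sum(ten_steps) / 100)      # 2.64 passes over the data
```

One pass takes four steps, which is consistent with the period-4 pattern in the printed losses: the ten iterations amount to roughly 2.6 passes over the 100 samples, not ten epochs.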


##### Initializable example

In [50]:
# Generate data
training_data = (np.random.sample((100,2)), np.random.sample((100, 1)))
test_data = (np.random.sample((30, 2)), np.random.sample((30,1)))

epoches = 10
batch_size = 32

input, labels = tf.placeholder(tf.float32, shape=[None, 2]), tf.placeholder(tf.float32, shape=[None,1])
dataset = tf.data.Dataset.from_tensor_slices((input, labels)).batch(batch_size).repeat()


iterator = dataset.make_initializable_iterator()
x, y = iterator.get_next()

# Build the model
layer1 = tf.layers.dense(x, 4, activation=tf.tanh)
layer2 = tf.layers.dense(layer1, 4, activation=tf.tanh)
predictions = tf.layers.dense(layer2,1, activation=tf.sigmoid)

loss = tf.losses.mean_squared_error(labels=y, predictions=predictions)
optimizer = tf.train.AdamOptimizer().minimize(loss)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  sess.run(iterator.initializer, feed_dict={input: training_data[0], 
                                           labels: training_data[1]})
  print("Training...")
  for i in range(epoches):
    total_loss = 0
    for _ in range(10):
      _, l = sess.run([optimizer, loss])
      total_loss += l
    print("Epoches: {}, Loss: {}".format(i, total_loss / 10))
  
  print("Testing...")
  sess.run(iterator.initializer, feed_dict={input: test_data[0],
                                           labels: test_data[1]})
  print("Test loss: {}".format(sess.run(loss)))
    

Training...
Epoches: 0, Loss: 0.10809506624937057
Epoches: 1, Loss: 0.11347251757979393
Epoches: 2, Loss: 0.10589925572276115
Epoches: 3, Loss: 0.11198231056332589
Epoches: 4, Loss: 0.10426002144813537
Epoches: 5, Loss: 0.11087124645709992
Epoches: 6, Loss: 0.10295571163296699
Epoches: 7, Loss: 0.11003697961568833
Epoches: 8, Loss: 0.10191557928919792
Epoches: 9, Loss: 0.10939777716994285
Testing...
Test loss: 0.08818259835243225


The Dataset API gives us a quick and efficient way to build input pipelines for training, evaluating, and testing a model.