# 导入飞桨的库
同步数据读取：数据读取与模型训练串行。当模型需要数据时，才运行数据读取函数获得当前批次的数据。在读取数据期间，模型一直等待数据读取结束才进行训练，数据读取速度相对较慢。
异步数据读取：数据读取和模型训练并行。读取到的数据不断的放入缓存区，无需等待模型训练就可以启动下一轮数据读取。当模型训练完一个批次后，不用等待数据读取过程，直接从缓存区获得下一批次数据进行训练，从而加快了数据读取速度。
异步队列：数据读取和模型训练交互的仓库，二者均可以从仓库中读取数据，它的存在使得两者的工作节奏可以解耦。

In [1]:
import paddle
import json
import gzip
import numpy as np
from paddle.nn import Linear
import paddle.nn.functional as F

# 创建一个类MnistDataset，继承paddle.io.Dataset 这个类
# MnistDataset的作用和上面load_data()函数的作用相同，均是构建一个迭代器
class MnistDataset(paddle.io.Dataset):
    def __init__(self, mode):
        datafile = 'mnist.json.gz'
        data = json.load(gzip.open(datafile))
        # 读取到的数据区分训练集，验证集，测试集
        train_set, val_set, eval_set = data
        if mode=='train':
            # 获得训练数据集
            imgs, labels = train_set[0], train_set[1]
        elif mode=='valid':
            # 获得验证数据集
            imgs, labels = val_set[0], val_set[1]
        elif mode=='eval':
            # 获得测试数据集
            imgs, labels = eval_set[0], eval_set[1]
        else:
            raise Exception("mode can only be one of ['train', 'valid', 'eval']")
        
        # 校验数据
        imgs_length = len(imgs)
        assert len(imgs) == len(labels), \
            "length of train_imgs({}) should be the same as train_labels({})".format(len(imgs), len(labels))
        
        self.imgs = imgs
        self.labels = labels

    def __getitem__(self, idx):
        img = np.array(self.imgs[idx]).astype('float32')
        label = np.array(self.labels[idx]).astype('float32')
        
        return img, label

    def __len__(self):
        return len(self.imgs)

In [6]:
# 声明数据加载函数，使用MnistDataset数据集
train_dataset = MnistDataset(mode='train')
# 使用paddle.io.DataLoader 定义DataLoader对象用于加载Python生成器产生的数据，
# DataLoader 返回的是一个批次数据迭代器，并且是异步的；
data_loader = paddle.io.DataLoader(train_dataset, batch_size=100, shuffle=True)
# 迭代的读取数据并打印数据的形状
for i, data in enumerate(data_loader()):
    images, labels = data
    print(i, images.shape, labels.shape)
    if i>=2:
        break

0 [100, 784] [100]
1 [100, 784] [100]
2 [100, 784] [100]


In [7]:
#数据处理部分之后的代码，数据读取的部分调用Load_data函数
#定义网络结构，同上一节所使用的网络结构
class MNIST(paddle.nn.Layer):
    def __init__(self):
        super(MNIST, self).__init__()
        # 定义一层全连接层，输出维度是1
        self.fc = paddle.nn.Linear(in_features=784, out_features=1)

    def forward(self, inputs):
        outputs = self.fc(inputs)
        return outputs
# 异步数据读取并训练的完整案例代码如下所示
def train(model):
    model = MNIST()
    model.train()
    opt = paddle.optimizer.SGD(learning_rate=0.001, parameters=model.parameters())
    EPOCH_NUM = 10
    for epoch_id in range(EPOCH_NUM):
        for batch_id, data in enumerate(data_loader()):
            images, labels = data
            images = paddle.to_tensor(images)
            labels = paddle.to_tensor(labels).astype('float32')
            
            #前向计算的过程  
            predicts = model(images)

            #计算损失，取一个批次样本损失的平均值
            loss = F.square_error_cost(predicts, labels)
            avg_loss = paddle.mean(loss)       
            
            #每训练了200批次的数据，打印下当前Loss的情况
            if batch_id % 200 == 0:
                print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy()))
            
            #后向传播，更新参数的过程
            avg_loss.backward()
            opt.step()
            opt.clear_grad()

    #保存模型参数
    paddle.save(model.state_dict(), 'mnist')

#创建模型
model = MNIST()
#启动训练过程
train(model)

epoch: 0, batch: 0, loss is: 22.108009338378906
epoch: 0, batch: 200, loss is: 8.29177188873291
epoch: 0, batch: 400, loss is: 7.531145095825195
epoch: 1, batch: 0, loss is: 8.61501407623291
epoch: 1, batch: 200, loss is: 9.100568771362305
epoch: 1, batch: 400, loss is: 8.53562068939209
epoch: 2, batch: 0, loss is: 8.085770606994629
epoch: 2, batch: 200, loss is: 8.245512962341309
epoch: 2, batch: 400, loss is: 7.882917404174805
epoch: 3, batch: 0, loss is: 7.482028961181641
epoch: 3, batch: 200, loss is: 9.862553596496582
epoch: 3, batch: 400, loss is: 10.190206527709961
epoch: 4, batch: 0, loss is: 7.704078197479248
epoch: 4, batch: 200, loss is: 7.877786159515381
epoch: 4, batch: 400, loss is: 9.184771537780762
epoch: 5, batch: 0, loss is: 8.642525672912598
epoch: 5, batch: 200, loss is: 8.431878089904785
epoch: 5, batch: 400, loss is: 7.731142044067383
epoch: 6, batch: 0, loss is: 7.777571678161621
epoch: 6, batch: 200, loss is: 9.009143829345703
epoch: 6, batch: 400, loss is: 9.26

从异步数据读取的训练结果来看，损失函数下降与同步数据读取训练结果一致。注意，异步读取数据只在数据量规模巨大时会带来显著的性能提升，对于多数场景采用同步数据读取的方式已经足够。