## 异步读取数据

对于样本量较大，数据读取较慢的场景，应采用异步读取数据的方式：
- 同步数据读取：数据读取与模型训练串行。当模型需要数据时，才运行数据读取函数获得当前批次的数据。在读取数据期间，模型一直等待数据读取结束才进行训练，数据读取速度相对较慢。
- 异步数据读取：数据读取与模型训练并行。读取到的数据不断的放入缓存区，无需等待模型训练就可以启动下一轮数据读取。当模型训练完一个批次后，不用等待数据读取过程，直接从缓存区获得下一批次数据进行训练，从而加快了数据读取速度。
- 异步队列：数据读取和模型训练交互的仓库，二者均可以从仓库中读取数据，它的存在使得两者的工作节奏可以解耦。

In [6]:
# 加载飞浆及相关的库
import paddle
from paddle.nn import Linear
import paddle.nn.functional as F

from paddle.io import Dataset
from paddle.io import DataLoader

import numpy as np
import os
import gzip
import json
import random

In [8]:
"""使用飞浆实现异步数据读取

1.构建一个继承paddle.io.Dataset类的数据读取器。
2.通过paddle.io.DataLoader创建异步数据读取的迭代器。
"""

# 构建一个类，继承paddle.io.Dataset，创建数据读取器
class RandomDataset(Dataset):
    def __init__(self, samples_num):
        # 样本数量
        self.samples_num = samples_num
        
    def __getitem__(self, idx):
        # 随机产生数据和label
        image = np.random.random([784]).astype('float32')
        label = np.random.randint(0, 9, (1, )).astype('float32')
        return image, label
    
    def __len__(self):
        # 返回样本数量
        return self.samples_num
    

# 测试
random_dataset = RandomDataset(10)
for i in range(len(random_dataset)):
    print(dataset[i])

(array([5.97428262e-01, 3.99956912e-01, 9.26904142e-01, 2.97547281e-01,
       1.38531730e-01, 1.13758586e-01, 4.82560873e-01, 8.10693324e-01,
       1.15660779e-01, 3.41449678e-01, 3.67782533e-01, 4.33327854e-01,
       9.90412831e-01, 8.58322918e-01, 3.66193503e-01, 8.34501147e-01,
       3.13597143e-01, 4.16738093e-01, 7.33178318e-01, 5.47362566e-01,
       2.95027733e-01, 6.76932096e-01, 3.27824712e-01, 2.79769540e-01,
       7.14306056e-01, 1.38870567e-01, 9.51374769e-01, 2.34656423e-01,
       3.75501275e-01, 5.11832595e-01, 3.24342638e-01, 4.86201942e-01,
       6.18049324e-01, 6.68327436e-02, 4.68477637e-01, 3.29526037e-01,
       7.20115602e-01, 9.51535627e-03, 6.78442299e-01, 4.39052105e-01,
       4.17510360e-01, 3.11688304e-01, 2.99175441e-01, 9.65059638e-01,
       4.36541915e-01, 7.18894601e-01, 4.03054595e-01, 9.34317231e-01,
       1.53233837e-02, 1.91164613e-01, 2.04065442e-02, 3.09614480e-01,
       1.24083392e-01, 2.22486913e-01, 9.10502315e-01, 8.73941779e-01,
     

In [10]:
# 使用paddle.io.DataLoader API以batch的方式迭代数据
# 通过参数num_workers开启多线程
loader = DataLoader(random_dataset, batch_size=3, shuffle=True, drop_last=False, num_workers=0)
for i, data in enumerate(loader()):
    images, labels = data[0], data[1]
    print('batch_id: {}, 数据shape: {}, 标签shape: {}'.format(i, images.shape, labels.shape))

batch_id: 0, 数据shape: [3, 784], 标签shape: [3, 1]
batch_id: 1, 数据shape: [3, 784], 标签shape: [3, 1]
batch_id: 2, 数据shape: [3, 784], 标签shape: [3, 1]
batch_id: 3, 数据shape: [1, 784], 标签shape: [1, 1]


In [12]:
# 构建一个MnistDataset迭代器，继承paddle.io.Dataset
class MnistDataset(paddle.io.Dataset):
    def __init__(self, mode):
        # 声明数据集文件位置
        datafile = r'../datasets/mnist.json.gz'
        print('Loading mnist dataset from {} ...'.format(datafile))

        # 加载json数据文件
        data = json.load(gzip.open(datafile))
        print('MNIST dataset load done')
    
        # 划分训练集、验证集、测试集
        train_set, val_set, test_set = data
        if mode=='train':
            imgs, labels = train_set[0], train_set[1]
        elif mode=='valid':
            imgs, labels = val_set[0], val_set[1]
        elif mode=='test':
            imgs, labels = test_set[0], test_set[1]
        else:
            raise Exception("Mode can only be one of ['train', 'valid', 'test']")
        print('数据集长度：', len(imgs))
    
        # 校验数据
        imgs_length = len(imgs)
        assert len(imgs) == len(labels), \
            "Length of imgs({}) should be the same as labels({})".format(len(imgs), len(labels))
        
        self.imgs = imgs
        self.labels = labels
        
    def __getitem__(self, idx):
        img = np.array(self.imgs[idx]).astype('float32')
        label = np.array(self.labels[idx]).astype('float32')
        
        return img, label
    
    def __len__(self):
        return len(self.imgs)

In [14]:
# 声明数据加载函数，使用MnistDataset数据集
train_dataset = MnistDataset(mode='train')

# 批次数据生成器，并且是异步的
data_loader = paddle.io.DataLoader(train_dataset, batch_size=100, shuffle=True)

# 测试读取数据
for i, data in enumerate(data_loader()):
    images, labels = data
    
    print(i, images.shape, labels.shape)
    
    if i > 2:
        break

Loading mnist dataset from ../datasets/mnist.json.gz ...
MNIST dataset load done
数据集长度： 50000
0 [100, 784] [100]
1 [100, 784] [100]
2 [100, 784] [100]
3 [100, 784] [100]


In [17]:
# 定义一层网络结构，利用定义好的数据处理函数，完成神经网络的训练
class MNIST(paddle.nn.Layer):
    # 初始化
    def __init__(self):
        # 调用父类初始化方法
        super(MNIST, self).__init__()
        # 定义一层全连接层，输出维度是1
        self.fc = paddle.nn.Linear(in_features=784, out_features=1)
        
    # 定义网络结构的前向计算过程
    def forward(self, inputs):
        outputs = self.fc(inputs)
        return outputs

In [20]:
# 使用异步数据读取并训练
def train(model):
    # 启动训练模式
    model.train()
    # 定义优化算法，学习率设置为0.001
    opt = paddle.optimizer.SGD(learning_rate=0.001, parameters=model.parameters())
    
    # 定义外层循环次数
    EPOCH_NUM = 10
    for epoch in range(EPOCH_NUM):
        for batch_id, data in enumerate(data_loader()):
            # 准备训练数据
            images, labels = data
            images = paddle.to_tensor(images)
            labels = paddle.to_tensor(labels)
            
            # 前向计算的过程
            predicts = model(images)
            
            # 计算损失，取一个批次样本损失的平均值
            loss = F.square_error_cost(predicts, labels)
            avg_loss = paddle.mean(loss)
            
            # 每训练200批次的数据，打印当前Loss情况
            if batch_id % 200 == 0:
                print('epoch_id: {}, batch_id: {}, loss is {}'.format(epoch, batch_id, avg_loss.numpy()))
                
            # 反向传播，更新参数的过程
            avg_loss.backward()
            opt.step()
            opt.clear_grad()
            
    # 保存模型参数
    paddle.save(model.state_dict(), '../models/mnist.pdparams')

In [21]:
# 创建模型实例
model = MNIST()

# 启动训练
train(model)

epoch_id: 0, batch_id: 0, loss is [24.981949]
epoch_id: 0, batch_id: 200, loss is [8.089679]
epoch_id: 0, batch_id: 400, loss is [7.890456]
epoch_id: 1, batch_id: 0, loss is [7.911718]
epoch_id: 1, batch_id: 200, loss is [9.415227]
epoch_id: 1, batch_id: 400, loss is [8.298912]
epoch_id: 2, batch_id: 0, loss is [7.6903133]
epoch_id: 2, batch_id: 200, loss is [8.880564]
epoch_id: 2, batch_id: 400, loss is [9.873153]
epoch_id: 3, batch_id: 0, loss is [8.977135]
epoch_id: 3, batch_id: 200, loss is [9.000279]
epoch_id: 3, batch_id: 400, loss is [8.901228]
epoch_id: 4, batch_id: 0, loss is [8.884409]
epoch_id: 4, batch_id: 200, loss is [7.0588765]
epoch_id: 4, batch_id: 400, loss is [8.302091]
epoch_id: 5, batch_id: 0, loss is [8.271626]
epoch_id: 5, batch_id: 200, loss is [7.588281]
epoch_id: 5, batch_id: 400, loss is [8.591069]
epoch_id: 6, batch_id: 0, loss is [8.7791605]
epoch_id: 6, batch_id: 200, loss is [7.7298517]
epoch_id: 6, batch_id: 400, loss is [6.7005954]
epoch_id: 7, batch_id