## 准备数据

In [3]:
# define reader
def load_mnist(path, prefix='train'):
    """Load an MNIST-format (gzip-compressed idx) image/label split from `path`.

    Reads ``<prefix>-labels-idx1-ubyte.gz`` and ``<prefix>-images-idx3-ubyte.gz``
    from the directory ``path`` (e.g. ``prefix='train'`` or ``prefix='t10k'``).

    Args:
        path: Directory containing the two gzip files.
        prefix: File-name prefix selecting the split.

    Returns:
        (images, labels): ``images`` is a uint8 array of shape ``(n, 784)``
        (one flattened 28x28 image per row) and ``labels`` is a uint8 array of
        shape ``(n,)``. Both are read-only views created with ``np.frombuffer``.
    """
    # The docstring must be the first statement to become ``__doc__``; imports follow it.
    import os
    import gzip
    import numpy as np

    labels_path = os.path.join(path, '%s-labels-idx1-ubyte.gz' % prefix)
    images_path = os.path.join(path, '%s-images-idx3-ubyte.gz' % prefix)

    # Skip the 8-byte idx1 header (magic number + item count); the rest is one byte per label.
    with gzip.open(labels_path, 'rb') as lbpath:
        labels = np.frombuffer(lbpath.read(), dtype=np.uint8, offset=8)

    # Skip the 16-byte idx3 header (magic, count, rows, cols); the payload is reshaped
    # to one 784-value (28x28) row per label.
    with gzip.open(images_path, 'rb') as imgpath:
        images = np.frombuffer(imgpath.read(), dtype=np.uint8,
                               offset=16).reshape(len(labels), 784)

    return images, labels

In [4]:
# Load the local Fashion-MNIST splits and report the array shapes.
data_path = '../classification/cv/datasets/fashion'

# 'train' is the training split; 't10k' is the file prefix of the test split.
train_images, train_labels = load_mnist(data_path, prefix='train')
test_images, test_labels = load_mnist(data_path, prefix='t10k')

train_summary = "train_images shape: {}, train_labels shape: {}".format(
    train_images.shape, train_labels.shape)
test_summary = "test_images shape: {}, test_labels shape: {}".format(
    test_images.shape, test_labels.shape)
print(train_summary)
print(test_summary)

train_images shape: (60000, 784), train_labels shape: (60000,)
test_images shape: (10000, 784), test_labels shape: (10000,)


### 构建 DataSets （Numpy -> Datasets）

In [5]:
import tensorflow as tf
# Slice the NumPy arrays along axis 0: each dataset element is one (flat image, label) pair.
image_label_pairs = (train_images, train_labels)
train_datasets = tf.data.Dataset.from_tensor_slices(image_label_pairs)
train_datasets

<TensorSliceDataset shapes: ((784,), ()), types: (tf.uint8, tf.uint8)>

In [6]:
# Human-readable names for the integer labels 0-9 (list index == label value).
class_names = [
    'T-shirt/top',  # 0
    'Trouser',      # 1
    'Pullover',     # 2
    'Dress',        # 3
    'Coat',         # 4
    'Sandal',       # 5
    'Shirt',        # 6
    'Sneaker',      # 7
    'Bag',          # 8
    'Ankle boot',   # 9
]

In [7]:
# Input-pipeline settings: shuffle-buffer size (in elements) and batch size.
BUFFER_SIZE, BATCH_SIZE = 10000, 64

# Scale pixel values from the uint8 range [0, 255] to float32 values in [0.0, 1.0].
def scale(image, label):
    """Cast ``image`` to float32 and divide by 255; ``label`` is returned unchanged.

    Intended for ``tf.data.Dataset.map`` over (image, label) pairs.
    """
    image = tf.cast(image, tf.float32) / 255.0
    return image, label
    
# Restore image geometry: flat (784,) vector -> (28, 28, 1) single-channel image.
def reshape(image, label):
    """Reshape ``image`` to ``[28, 28, 1]`` for the Conv2D input; pass ``label`` through."""
    return tf.reshape(image, [28, 28, 1]), label

# Preprocessing pipeline: reshape -> scale -> cache -> shuffle -> batch.
# Caching before shuffling keeps the map work one-off while shuffle still runs every epoch.
local_train_datasets = (
    train_datasets
    .map(reshape)
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [8]:
# Display the batched pipeline's element spec: images (None, 28, 28, 1) float32, labels (None,) uint8.
local_train_datasets

<BatchDataset shapes: ((None, 28, 28, 1), (None,)), types: (tf.float32, tf.uint8)>

In [9]:
# Number of batches per epoch: ceil(60000 / 64) = 938 with the settings above.
# Query the dataset's static cardinality instead of `len(list(...))`, which would
# iterate over and materialise every batch just to count them.
num_batches = int(tf.data.experimental.cardinality(local_train_datasets))
if num_batches == tf.data.experimental.UNKNOWN_CARDINALITY:
    # The size cannot be inferred statically; fall back to counting the batches.
    num_batches = sum(1 for _ in local_train_datasets)
num_batches

938

### 测试单节点训练

In [10]:
# Build the CNN classifier used by both the single-worker and the multi-worker runs.
def build_and_compile_cnn_model():
    """Create and compile a small CNN for (28, 28, 1) inputs with 10 output logits.

    Architecture: Conv2D(32 filters, 3x3, ReLU) -> MaxPooling2D -> Flatten ->
    Dense(64, ReLU) -> Dense(10) with no activation (raw logits).
    Compiled with sparse categorical cross-entropy on logits, SGD with
    learning rate 0.001, and an accuracy metric.

    Returns:
        A compiled ``tf.keras.Sequential`` model.
    """
    keras = tf.keras
    stack = [
        keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        keras.layers.MaxPooling2D(),
        keras.layers.Flatten(),
        keras.layers.Dense(64, activation='relu'),
        # Raw logits: the loss below is configured with from_logits=True.
        keras.layers.Dense(10),
    ]
    model = keras.Sequential(stack)

    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = keras.optimizers.SGD(learning_rate=0.001)
    model.compile(loss=loss_fn, optimizer=optimizer, metrics=['accuracy'])
    return model

In [None]:
# Run single-worker training. Fit on the preprocessed pipeline (reshaped to 28x28x1,
# scaled to float32, shuffled and batched). The raw `train_datasets` yields unbatched
# (784,) uint8 vectors, which do not match the model's input_shape=(28, 28, 1).
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(x=local_train_datasets, epochs=3, steps_per_epoch=5)

# 多 worker 模式

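## 配置 TF_CONFIG ENV

注意：`TF_CONFIG` 必须在创建 `MultiWorkerMirroredStrategy` 之前设置。下方单元格的执行计数从 1 重新开始，说明这一部分是在新的 kernel 中运行的。集群中每个 worker 使用相同的 `cluster` 配置，但 `task.index` 不同：这里配置的是 index 为 1 的 worker，另一个 worker 进程需要以 index 0 启动。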

In [1]:
import os, json
# Describe a two-worker cluster on localhost; this process is worker index 1.
tf_config = {
    'cluster': {'worker': ["localhost:2222", "localhost:2223"]},
    'task': {'type': 'worker', 'index': 1},
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Echo the serialized value to confirm what the strategy will read.
os.getenv('TF_CONFIG')

'{"cluster": {"worker": ["localhost:2222", "localhost:2223"]}, "task": {"type": "worker", "index": 1}}'

## 使用 MultiWorkerMirroredStrategy 训练模型
`MultiWorkerMirroredStrategy` 通过 `communication` 参数（取值为 `CollectiveCommunication`）提供多种集合通信（collective）实现：`RING` 使用 gRPC 作为跨主机通信层，实现基于环（ring）的集合通信；`NCCL` 使用 NVIDIA 的 NCCL 库实现集合通信；`AUTO` 则把选择推迟到运行时。集合通信实现的最佳选择取决于 GPU 的数量和种类，以及集群中的网络互连方式。

In [2]:
import tensorflow as tf
# MultiWorkerMirroredStrategy.__init__() reads TF_CONFIG to initialise itself and set up
# cluster communication; the log below shows the parsed cluster_spec, task_type/task_id
# and the default communication = CollectiveCommunication.AUTO.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:1/device:CPU:0', '/job:worker/replica:0/task:1/device:XLA_CPU:0']
INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:1',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['localhost:2222', 'localhost:2223']}, task_type = 'worker', task_id = 1, num_workers = 2, local_devices = ('/job:worker/task:1',), communication = CollectiveCommunication.AUTO


In [None]:
# NOTE(review): the execution counts restart at In [1] for this section, i.e. it runs in a
# fresh kernel, yet this cell uses `train_datasets`, `reshape`, `scale` and `BUFFER_SIZE`
# from the single-worker section. Re-run those definition cells (after the strategy has
# been created) before running this one.

# Per-worker batch size, the same 64 used for single-worker training.
PER_WORKER_BATCH_SIZE = 64
# `tf.data.Dataset.batch` needs the *global* batch size, so scale the per-worker size by
# the number of replicas training in sync. Reading it from the strategy (which parsed
# TF_CONFIG) keeps it consistent with the cluster spec instead of hard-coding 2. With one
# device per worker, as in the strategy log, this equals the number of workers.
NUM_WORKERS = strategy.num_replicas_in_sync
GLOBAL_BATCH_SIZE = PER_WORKER_BATCH_SIZE * NUM_WORKERS

# The dataset must be created after MultiWorkerMirroredStrategy has been instantiated.
dist_train_datasets = (
    train_datasets
    .map(reshape)
    .map(scale)
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(GLOBAL_BATCH_SIZE)
)

# Model construction and compilation must happen inside `strategy.scope()`, so the
# model's variables are created under the multi-worker strategy.
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()

# `model.fit()` trains for a fixed number of epochs and steps per epoch. These values are
# for demonstration only and are far too small to produce a good model.
multi_worker_model.fit(x=dist_train_datasets, epochs=3, steps_per_epoch=5)