-
Notifications
You must be signed in to change notification settings - Fork 0
/
tensorflow_ray_train_mnist_example.py
139 lines (114 loc) · 4.08 KB
/
tensorflow_ray_train_mnist_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Example taken from Ray
# https://docs.ray.io/en/latest/train/examples/tensorflow_mnist_example.html
# This example showcases how to use TensorFlow with Ray Train.
# Original code:
# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
import json
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from ray.train import Trainer
class TrainReportCallback(Callback):
    """Keras callback that forwards each epoch's metrics to Ray Train."""

    def on_epoch_end(self, epoch, logs=None):
        """Report the metrics collected for the epoch that just finished.

        Args:
            epoch: Index of the finished epoch (not used here).
            logs: Mapping of metric name to value passed in by Keras. May be
                ``None``; an empty report is sent in that case so that the
                call does not fail on ``**None``.
        """
        # Imported locally so the callback does not rely on a module-level
        # ``ray`` name; this file only binds ``ray`` inside the ``__main__``
        # guard, which is skipped when the module is imported.
        import ray.train

        ray.train.report(**(logs or {}))
def mnist_dataset(batch_size):
    """Return a shuffled, endlessly repeating, batched MNIST training set.

    Args:
        batch_size: Number of examples per batch emitted by the dataset.

    Returns:
        A ``tf.data.Dataset`` of ``(image, label)`` batches whose auto-shard
        policy is set to ``AutoShardPolicy.DATA``.
    """
    (images, labels), _ = tf.keras.datasets.mnist.load_data()

    # The image arrays are uint8 with values in [0, 255]; convert them to
    # float32 with values in the [0, 1] range.
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    # Build the input pipeline one stage at a time.
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)

    shard_options = tf.data.Options()
    shard_options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    return dataset.with_options(shard_options)
def build_and_compile_cnn_model(config):
    """Create and compile the small CNN classifier used for MNIST.

    Args:
        config: Mapping of hyperparameters. Only ``"lr"`` is read here
            (SGD learning rate, defaulting to ``0.001``).

    Returns:
        A compiled ``tf.keras.Sequential`` model that takes ``(28, 28)``
        inputs and produces 10 raw logits per example.
    """
    lr = config.get("lr", 0.001)
    layers = tf.keras.layers

    layer_stack = [
        tf.keras.Input(shape=(28, 28)),
        # Add an explicit channel axis so Conv2D receives (28, 28, 1).
        layers.Reshape(target_shape=(28, 28, 1)),
        layers.Conv2D(32, 3, activation="relu"),
        layers.Flatten(),
        layers.Dense(128, activation="relu"),
        # No activation on the final layer: the loss consumes logits.
        layers.Dense(10),
    ]
    cnn = tf.keras.Sequential(layer_stack)

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    sgd = tf.keras.optimizers.SGD(learning_rate=lr)
    cnn.compile(loss=loss_fn, optimizer=sgd, metrics=["accuracy"])
    return cnn
def train_func(config):
    """Train the MNIST CNN on one worker of a multi-worker job.

    Args:
        config: Mapping of hyperparameters. Reads ``"batch_size"`` (per
            worker, default 64), ``"epochs"`` (default 3) and
            ``"steps_per_epoch"`` (default 70); the whole mapping is also
            passed to ``build_and_compile_cnn_model``.

    Returns:
        The ``history`` dict of the Keras ``History`` object returned by
        ``fit`` (metric name -> list of per-epoch values).

    Raises:
        KeyError: If the ``TF_CONFIG`` environment variable is missing.
    """
    batch_per_worker = config.get("batch_size", 64)
    n_epochs = config.get("epochs", 3)
    n_steps = config.get("steps_per_epoch", 70)

    # The worker count comes from the cluster description in TF_CONFIG.
    # NOTE(review): presumably the Ray Train TensorFlow backend sets this
    # variable on each worker -- confirm.
    cluster_spec = json.loads(os.environ["TF_CONFIG"])["cluster"]
    worker_count = len(cluster_spec["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    # The dataset is batched at the global size (per-worker size x workers).
    dataset = mnist_dataset(batch_per_worker * worker_count)

    # Model building/compiling need to be within `strategy.scope()`.
    with strategy.scope():
        model = build_and_compile_cnn_model(config)

    fit_history = model.fit(
        dataset,
        epochs=n_epochs,
        steps_per_epoch=n_steps,
        callbacks=[TrainReportCallback()],
        verbose=False,
    )
    return fit_history.history
def train_tensorflow_mnist(num_workers=2, use_gpu=False, epochs=4):
    """Run ``train_func`` on a Ray Train ``Trainer`` and print the result.

    Args:
        num_workers: Number of training workers to start.
        use_gpu: Whether the workers should use GPUs.
        epochs: Number of epochs, passed to ``train_func`` via its config.

    The trainer is always shut down once it has been started, even if the
    training run raises; the exception then propagates to the caller.
    """
    trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    trainer.start()
    try:
        results = trainer.run(
            train_func=train_func,
            config={"lr": 1e-3, "batch_size": 64, "epochs": epochs},
        )
    finally:
        # Release the workers regardless of whether training succeeded.
        trainer.shutdown()
    # Only the first entry of the results is printed.
    print(f"Results: {results[0]}")
if __name__ == "__main__":
    # Command-line entry point: parse options, connect to Ray, then train.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    cli.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    cli.add_argument(
        "--use-gpu", action="store_true", default=False, help="Enables GPU training"
    )
    cli.add_argument(
        "--epochs", type=int, default=3, help="Number of epochs to train for."
    )
    cli.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )
    # Unrecognised arguments are tolerated and discarded.
    options, _ = cli.parse_known_args()

    # Kept at module scope inside the guard: this binds the global ``ray``
    # name when the file is run as a script.
    import ray

    if not options.smoke_test:
        ray.init(address=options.address)
        train_tensorflow_mnist(
            num_workers=options.num_workers,
            use_gpu=options.use_gpu,
            epochs=options.epochs,
        )
    else:
        # Smoke test: local 2-CPU Ray instance and the function's defaults;
        # the other command-line options are not used on this path.
        ray.init(num_cpus=2)
        train_tensorflow_mnist()