In [1]:
!pip install ray

Collecting ray
  Downloading ray-1.12.0-cp37-cp37m-manylinux2014_x86_64.whl (53.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.2/53.2 MB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting msgpack<2.0.0,>=1.0.0
  Downloading msgpack-1.0.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (299 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m299.4/299.4 KB[0m [31m38.7 MB/s[0m eta [36m0:00:00[0m
Collecting virtualenv
  Downloading virtualenv-20.14.1-py2.py3-none-any.whl (8.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.8/8.8 MB[0m [31m93.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
Collecting filelock
  Downloading filelock-3.6.0-py3-none-any.whl (10.0 kB)
Collecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m91.0 MB/s[0m

In [2]:
import ray

In [3]:
!which -a ray

/opt/conda/bin/ray


In [4]:
!conda env list

# conda environments:
#
base                  *  /opt/conda



In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.layers import Flatten, Conv2D, MaxPooling2D
from sklearn.model_selection import train_test_split
import os
from tensorflow.keras.layers import BatchNormalization
import tensorflow as tf
import keras
import tempfile

In [2]:

import numpy as np
import tensorflow as tf


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).batch(batch_size)
    print("HI=========================================================================================================================")
    print(train_dataset.cardinality())
    return train_dataset


def build_and_compile_cnn_model():
    
    model = Sequential()

    model.add(Conv2D(32, 3, activation='relu', padding='same', input_shape=(28, 28, 1)))
    model.add(Conv2D(64, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(2, 2))
    model.add(BatchNormalization())

    model.add(Conv2D(128, 3, activation='relu', padding='same'))
    model.add(Conv2D(128, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(2, 2))
    model.add(BatchNormalization())

    model.add(Conv2D(256, 3, activation='relu', padding='same'))
    model.add(Conv2D(256, 3, activation='relu', padding='same'))
    model.add(Conv2D(256, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(2, 2))
    model.add(BatchNormalization())

    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(2, 1)) # default stride is 2
    model.add(BatchNormalization())

    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(Conv2D(512, 3, activation='relu', padding='same'))
    model.add(MaxPooling2D(2, 1)) # default stride is 2
    model.add(BatchNormalization())

    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))

    model.add(Dense(10, activation='softmax'))
    model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
    return model

In [3]:

import json
import os

def train_func_distributed():
    per_worker_batch_size = 64
    # This environment variable will be set by Ray Train.
    tf_config = json.loads(os.environ['TF_CONFIG'])
    num_workers = len(tf_config['cluster']['worker'])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset, epochs=3)


In [None]:
from ray.train import Trainer

trainer = Trainer(backend="tensorflow", num_workers=1, use_gpu=True)

# For GPU Training, set `use_gpu` to True.
# trainer = Trainer(backend="tensorflow", num_workers=4, use_gpu=True)

trainer.start()
results = trainer.run(train_func_distributed)
trainer.shutdown()


2022-04-18 07:06:18,227	INFO trainer.py:223 -- Trainer logs will be logged in: /home/jupyter/ray_results/train_2022-04-18_07-06-18
2022-04-18 07:06:23,298	INFO trainer.py:229 -- Run results will be logged in: /home/jupyter/ray_results/train_2022-04-18_07-06-18/run_001
[2m[36m(BaseWorkerMixin pid=10036)[0m 2022-04-18 07:06:23.339154: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
[2m[36m(BaseWorkerMixin pid=10036)[0m 2022-04-18 07:06:23.349492: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
[2m[36m(BaseWorkerMixin pid=10036)[0m 2022-04-18 07:06:23.350144: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be 

[2m[36m(BaseWorkerMixin pid=10036)[0m tf.Tensor(938, shape=(), dtype=int64)


[2m[36m(BaseWorkerMixin pid=10036)[0m 2022-04-18 07:06:25.092721: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
[2m[36m(BaseWorkerMixin pid=10036)[0m op: "TensorSliceDataset"
[2m[36m(BaseWorkerMixin pid=10036)[0m input: "Placeholder/_0"
[2m[36m(BaseWorkerMixin pid=10036)[0m input: "Placeholder/_1"
[2m[36m(BaseWorkerMixin pid=10036)[0m attr {
[2m[36m(BaseWorkerMixin pid=10036)[0m   key: "Toutput_types"
[2m[36m(BaseWorkerMixin pid=10036)[0m   value {
[2m[36m(BaseWorkerMixin pid=10036)[0m     list {
[2m[36m(BaseWorkerMixin pid=10036)[0m       type: DT_FLOAT
[2m[36m(BaseWorkerMixin pid=10036)[0m       type: DT_INT64
[2m[36m(BaseWorkerMixin pid=10036)[0m     }
[2m[36m(BaseWorkerMixin pid=10036)[0m   }
[2m[36m(BaseWorkerMixin pid=10036)[0m }
[2

[2m[36m(BaseWorkerMixin pid=10036)[0m Epoch 1/15


[2m[36m(BaseWorkerMixin pid=10036)[0m   return dispatch_target(*args, **kwargs)
[2m[36m(BaseWorkerMixin pid=10036)[0m 2022-04-18 07:06:29.711425: I tensorflow/stream_executor/cuda/cuda_dnn.cc:368] Loaded cuDNN version 8200


  1/938 [..............................] - ETA: 1:45:45 - loss: 2.3664 - accuracy: 0.1250
  3/938 [..............................] - ETA: 36s - loss: 3.4850 - accuracy: 0.1667    
  7/938 [..............................] - ETA: 36s - loss: 3.5849 - accuracy: 0.2277
  9/938 [..............................] - ETA: 36s - loss: 3.1988 - accuracy: 0.2795
 11/938 [..............................] - ETA: 36s - loss: 2.8557 - accuracy: 0.3239
 13/938 [..............................] - ETA: 36s - loss: 2.6211 - accuracy: 0.3630
 17/938 [..............................] - ETA: 35s - loss: 2.3616 - accuracy: 0.4145
 19/938 [..............................] - ETA: 35s - loss: 2.3542 - accuracy: 0.4235
 21/938 [..............................] - ETA: 35s - loss: 2.2758 - accuracy: 0.4353
 25/938 [..............................] - ETA: 35s - loss: 2.1384 - accuracy: 0.4556
 27/938 [..............................] - ETA: 35s - loss: 2.0428 - accuracy: 0.4728
 29/938 [..............................] - ETA