In [84]:
%matplotlib inline

In [1]:
import argparse
import tempfile
import time
from typing import List

import pandas
import pyarrow

import ray
from ray.data.dataset_pipeline import DatasetPipeline

import numpy as np
import tensorflow as tf

2022-02-17 15:51:57.006407: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/lib64:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
2022-02-17 15:51:57.006435: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
def mnist_dataset(batch_size):
    """Return a shuffled, endlessly repeating, batched tf.data pipeline over MNIST.

    Args:
        batch_size: number of (image, label) pairs per emitted batch.

    Returns:
        A ``tf.data.Dataset`` built from the first split returned by
        ``tf.keras.datasets.mnist.load_data()``; the second split is discarded.
    """
    train_split, _ = tf.keras.datasets.mnist.load_data()
    images, labels = train_split
    # The image arrays are uint8 with values in [0, 255]; dividing by a
    # float32 scalar rescales them into [0, 1].
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    pipeline = tf.data.Dataset.from_tensor_slices((images, labels))
    pipeline = pipeline.shuffle(60000)
    pipeline = pipeline.repeat()
    pipeline = pipeline.batch(batch_size)
    return pipeline

In [None]:
# =======================================

In [2]:
def mnist_dataset_ray_distributed():
    """Load MNIST and return it as a Ray dataset with one row per image.

    Each image is flattened to 28*28 columns (labelled by the default integer
    column labels pandas assigns) and the class label is stored in an extra
    column named ``'y'``.

    Returns:
        The result of ``ray.data.from_pandas`` on that DataFrame.
    """
    (images, labels), _ = tf.keras.datasets.mnist.load_data()
    # The image arrays are uint8 with values in [0, 255]; dividing by a
    # float32 scalar rescales them into [0, 1].
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    # A DataFrame is two-dimensional, so every 28x28 image becomes one flat row.
    flat_images = images.reshape(len(images), 28 * 28)
    label_column = labels.reshape(-1, 1)

    frame = pandas.DataFrame(flat_images)
    frame['y'] = label_column
    return ray.data.from_pandas(frame)

In [3]:
def create_shuffle_pipeline(num_epochs: int, num_wokers: int) -> List[DatasetPipeline]:
    """Build the per-worker dataset pipelines.

    Args:
        num_epochs: how many times the dataset is repeated.
        num_wokers: how many shards the pipeline is split into.

    Returns:
        The result of ``split(num_wokers, equal=True)`` on the repeated,
        per-window shuffled pipeline.
    """
    dataset = mnist_dataset_ray_distributed()
    repeated = dataset.repeat(num_epochs)
    shuffled = repeated.random_shuffle_each_window()
    return shuffled.split(num_wokers, equal=True)

In [4]:
@ray.remote
class TrainingWorker:
    """Ray actor that fits its own Keras CNN on one shard of the dataset pipeline.

    Every actor builds a separate model in ``__init__``; nothing in this class
    exchanges weights between workers.
    """

    def __init__(self, rank: int, shard: DatasetPipeline):
        """Store the worker's rank and pipeline shard and build the model.

        Args:
            rank: index of this worker; only used in the progress message.
            shard: the pipeline split this worker iterates over in ``train``.
        """
        self.rank = rank
        self.shard = shard
        self.model = self.build_cnn_model()

    def train(self):
        """Fit the model on every batch of every epoch yielded by the shard.

        Returns:
            None. Progress is reported through ``print`` and Keras' own output.
        """
        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):
            # Following code emulates epoch based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for batch in training_dataset.iter_batches(batch_format="pandas"):
                # TODO: replace the code for real training.
                # Drop the label column instead of using
                # `batch.columns.difference(['y'])`: Index.difference returns
                # its result sorted, so with string column labels ("10" sorts
                # before "2") the pixel columns would be reordered before the
                # reshape below. `drop` keeps the columns in their stored order.
                features = batch.drop(columns=["y"]).to_numpy()
                x_3d = np.reshape(features, (len(features), 28, 28))
                y_1d = batch["y"].to_numpy().reshape(-1)
                self.model.fit(x_3d, y_1d, epochs=1)

    def build_cnn_model(self):
        """Create and compile the CNN used by this worker.

        The network takes (28, 28) inputs, adds a channel axis, applies one
        3x3 convolution with 32 filters, and ends in a 10-unit layer that
        emits logits (the loss is configured with ``from_logits=True``).

        Returns:
            The compiled ``tf.keras.Sequential`` model.
        """
        model = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation='relu'),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10)
        ])
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
            metrics=['accuracy'])
        return model

In [5]:
# Connect this notebook session to the remote Ray cluster at the given
# ray:// address; the returned context object is shown as the cell output.
ray.init(address="ray://10.100.3.60:10001")

ClientContext(dashboard_url='192.168.95.118:8265', python_version='3.7.7', ray_version='1.10.0', ray_commit='5ea565317a8104c04ae7892bb9bb41c6d72f12df', protocol_version='2021-12-07', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7f6fd60dcf50>)

In [6]:
# Number of TrainingWorker actors, and therefore of pipeline shards.
NUM_TRAINING_WORKERS = 3
# Number of times the dataset is repeated by the pipeline.
NUM_EPOCHS = 2

In [7]:
# Build the repeated, shuffled pipeline and split it into one shard per worker.
splits = create_shuffle_pipeline(
    NUM_EPOCHS,
    NUM_TRAINING_WORKERS,
)

Stage 0:   0%|          | 0/2 [00:00<?, ?it/s]=3590)[0m 
  0%|          | 0/2 [00:00<?, ?it/s][Aor pid=3590)[0m 
Stage 1:   0%|          | 0/2 [00:00<?, ?it/s][A90)[0m 


In [8]:
# Start one TrainingWorker actor per shard; a shard's position in `splits`
# is passed as the worker's rank.
training_workers = [
    TrainingWorker.remote(*rank_and_shard)
    for rank_and_shard in enumerate(splits)
]

[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m 2022-02-17 07:52:25.567465: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m 2022-02-17 07:52:25.567487: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m 2022-02-17 07:52:25.567510: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ray-test-cluster-ray-worker-type-z4scc): /proc/driver/nvidia/version does not exist
[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m 2022-02-17 07:52:25.567697: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural N

In [9]:
# Run the end-to-end pipeline: start train() on every worker and wait for all
# of them, timing the whole thing.
start = time.time()

# ray.get blocks until each remote train() call has returned; train() returns
# None, so this prints one None per worker.
print(ray.get([worker.train.remote() for worker in training_workers]))

# The original "{%.4f}s" mixed str.format-style braces into a %-format string,
# so the braces were printed literally (e.g. "{70.7700}s").
print(f"total ingestion time: {time.time() - start:.4f}s")

Stage 0:  50%|█████     | 1/2 [00:13<00:13, 13.25s/it]0m 
[2m[36m(PipelineSplitExecutorCoordinator pid=3590)[0m 
Stage 0: 100%|██████████| 2/2 [00:14<00:00,  6.45s/it][A
[2m[36m(PipelineSplitExecutorCoordinator pid=3590)[0m 
Stage 1: 100%|██████████| 2/2 [00:17<00:00,  7.44s/it][A


[2m[36m(TrainingWorker pid=267, ip=192.168.43.62)[0m Training... worker: 0, epoch: 0
[2m[36m(TrainingWorker pid=268, ip=192.168.43.62)[0m Training... worker: 2, epoch: 0


Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]3.62)[0m 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]3.62)[0m 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]0.1)[0m 


[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m Training... worker: 1, epoch: 0


[2m[36m(TrainingWorker pid=267, ip=192.168.43.62)[0m 2022-02-17 07:52:38.126703: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


  1/625 [..............................] - ETA: 4:00 - loss: 2.3069 - accuracy: 0.0938
  4/625 [..............................] - ETA: 30s - loss: 2.3095 - accuracy: 0.0938
  7/625 [..............................] - ETA: 25s - loss: 2.3028 - accuracy: 0.1027
  9/625 [..............................] - ETA: 26s - loss: 2.3047 - accuracy: 0.1007


Stage 0: 100%|██████████| 1/1 [00:01<00:00,  1.15s/it] 
[2m[36m(TrainingWorker pid=268, ip=192.168.43.62)[0m 2022-02-17 07:52:38.989026: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


 11/625 [..............................] - ETA: 27s - loss: 2.3037 - accuracy: 0.0966
 13/625 [..............................] - ETA: 27s - loss: 2.3006 - accuracy: 0.1130
 15/625 [..............................] - ETA: 28s - loss: 2.3010 - accuracy: 0.1104
 17/625 [..............................] - ETA: 28s - loss: 2.3008 - accuracy: 0.1140
 20/625 [..............................] - ETA: 26s - loss: 2.3007 - accuracy: 0.1109
 23/625 [>.............................] - ETA: 25s - loss: 2.3008 - accuracy: 0.1168


[2m[36m(TrainingWorker pid=847, ip=192.168.30.1)[0m 2022-02-17 07:52:39.447161: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


 26/625 [>.............................] - ETA: 24s - loss: 2.3010 - accuracy: 0.1202
 30/625 [>.............................] - ETA: 23s - loss: 2.2997 - accuracy: 0.1156
  1/625 [..............................] - ETA: 3:19 - loss: 2.2988 - accuracy: 0.1562
  4/625 [..............................] - ETA: 13s - loss: 2.3026 - accuracy: 0.1406 
 35/625 [>.............................] - ETA: 23s - loss: 2.2979 - accuracy: 0.1161
 38/625 [>.............................] - ETA: 22s - loss: 2.2966 - accuracy: 0.1176
  5/625 [..............................] - ETA: 19s - loss: 2.3041 - accuracy: 0.1187
 41/625 [>.............................] - ETA: 22s - loss: 2.2963 - accuracy: 0.1181
 12/625 [..............................] - ETA: 17s - loss: 2.3069 - accuracy: 0.1172
 44/625 [=>............................] - ETA: 22s - loss: 2.2951 - accuracy: 0.1193
 16/625 [..............................] - ETA: 14s - loss: 2.3054 - accuracy: 0.1094
 17/625 [..............................] - ETA: 16s 

Stage 0: 100%|██████████| 1/1 [00:18<00:00, 18.17s/it] 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]0.1)[0m 


  4/625 [..............................] - ETA: 10s - loss: 1.5929 - accuracy: 0.7812
  7/625 [..............................] - ETA: 13s - loss: 1.5921 - accuracy: 0.7723
[2m[36m(TrainingWorker pid=268, ip=192.168.43.62)[0m  - ETA: 16s - loss: 2.1758 - accuracy: 0.4112
 11/625 [..............................] - ETA: 14s - loss: 1.5963 - accuracy: 0.7670
 16/625 [..............................] - ETA: 13s - loss: 1.6059 - accuracy: 0.7656
 23/625 [>.............................] - ETA: 14s - loss: 1.6044 - accuracy: 0.7649
 27/625 [>.............................] - ETA: 14s - loss: 1.6001 - accuracy: 0.7662
 31/625 [>.............................] - ETA: 14s - loss: 1.5879 - accuracy: 0.7692
 34/625 [>.............................] - ETA: 14s - loss: 1.5860 - accuracy: 0.7675
 38/625 [>.............................] - ETA: 14s - loss: 1.5821 - accuracy: 0.7623
 42/625 [=>............................] - ETA: 14s - loss: 1.5720 - accuracy: 0.7723
 46/625 [=>...........................

Stage 0: 100%|██████████| 1/1 [00:32<00:00, 32.88s/it]m 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]3.62)[0m 




Stage 0: 100%|██████████| 1/1 [00:00<00:00,  9.70it/s]m 


  1/625 [..............................] - ETA: 46s - loss: 1.5897 - accuracy: 0.8750
  3/625 [..............................] - ETA: 28s - loss: 1.6747 - accuracy: 0.8021
  5/625 [..............................] - ETA: 29s - loss: 1.6556 - accuracy: 0.7812
  7/625 [..............................] - ETA: 30s - loss: 1.6438 - accuracy: 0.7857
  9/625 [..............................] - ETA: 30s - loss: 1.6599 - accuracy: 0.7604
 11/625 [..............................] - ETA: 30s - loss: 1.6554 - accuracy: 0.7642
 13/625 [..............................] - ETA: 30s - loss: 1.6504 - accuracy: 0.7716
 14/625 [..............................] - ETA: 32s - loss: 1.6472 - accuracy: 0.7790
 16/625 [..............................] - ETA: 31s - loss: 1.6561 - accuracy: 0.7617
 21/625 [>.............................] - ETA: 29s - loss: 1.6549 - accuracy: 0.7500
[2m[36m(TrainingWorker pid=268, ip=192.168.43.62)[0m Training... worker: 2, epoch: 1
 24/625 [>.............................] - ETA: 28s 

Stage 0: 100%|██████████| 1/1 [00:34<00:00, 34.69s/it]m 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]3.62)[0m 
Stage 0: 100%|██████████| 1/1 [00:00<00:00,  9.72it/s]m 
Stage 0: 100%|██████████| 1/1 [00:16<00:00, 16.11s/it] 


 26/625 [>.............................] - ETA: 28s - loss: 1.6682 - accuracy: 0.7272
 29/625 [>.............................] - ETA: 27s - loss: 1.6692 - accuracy: 0.7274
 31/625 [>.............................] - ETA: 27s - loss: 1.6697 - accuracy: 0.7278
 34/625 [>.............................] - ETA: 26s - loss: 1.6635 - accuracy: 0.7279
 36/625 [>.............................] - ETA: 26s - loss: 1.6567 - accuracy: 0.7326
 39/625 [>.............................] - ETA: 25s - loss: 1.6600 - accuracy: 0.7276
 42/625 [=>............................] - ETA: 25s - loss: 1.6594 - accuracy: 0.7240
  2/625 [..............................] - ETA: 48s - loss: 1.3517 - accuracy: 0.8594
 44/625 [=>............................] - ETA: 25s - loss: 1.6567 - accuracy: 0.7244
  3/625 [..............................] - ETA: 47s - loss: 1.4507 - accuracy: 0.7604
 46/625 [=>............................] - ETA: 25s - loss: 1.6516 - accuracy: 0.7289
  5/625 [..............................] - ETA: 40s - 

Stage 0: 100%|██████████| 1/1 [00:32<00:00, 32.20s/it]m 


[None, None, None]
total ingestion time: {70.7700}s


Stage 0: 100%|██████████| 1/1 [00:31<00:00, 31.38s/it]m 


In [10]:
# Disconnect from the Ray cluster now that the run has finished.
ray.shutdown()

In [None]:
# Test Code =================================

In [None]:
# Scratch check: flatten the MNIST images into a DataFrame with a 'y' label
# column, then recover the original arrays from that DataFrame.
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)

print(len(x_train), x_train.shape)
print(len(x_train[0]), x_train[0].shape)
print(len(x_train[0][0]), x_train[0][0].shape)

# One flat row of 28*28 values per image.
x_2d = x_train.reshape(len(x_train), 28 * 28)
print(len(x_2d), x_2d.shape)

y_train_re = y_train.reshape(-1, 1)
print(len(y_train), y_train.shape)
print(len(y_train_re), y_train_re.shape)

df = pandas.DataFrame(x_2d)
df['y'] = y_train_re
y_1d = np.reshape(df['y'].to_numpy(), len(df['y']))
print(df['y'], df['y'].shape)
print(y_1d, y_1d.shape)

print(df, df.shape)

# Drop the label column rather than using `df.columns.difference(['y'])`:
# Index.difference returns a sorted result, so the feature order would depend
# on how the column labels compare; `drop` keeps the stored column order.
trans_x = df.drop(columns=['y'])
trans_x_np = trans_x.to_numpy()
print(trans_x_np, trans_x_np.shape)

x_3d = np.reshape(trans_x_np, (len(trans_x), 28, 28))
print(x_3d, x_3d.shape)
print(x_3d[0], x_3d[0].shape)

# Check the round trip explicitly instead of relying on reading the prints.
assert np.array_equal(y_1d, y_train)
assert np.array_equal(x_3d, x_train)