In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json
import os
import sys
import tensorflow as tf

In [2]:
# Load the TensorBoard notebook extension.
# NOTE(review): no `%tensorboard` call appears in this notebook; drop this if unused.
%load_ext tensorboard

In [3]:
# Hide every GPU so this kernel and the workers it launches run on CPU only
# (the strategy log further down reports only '/device:CPU:0').
os.environ.update(CUDA_VISIBLE_DEVICES='-1')

In [4]:
# Drop any TF_CONFIG inherited by this kernel so TensorFlow inside the
# notebook itself runs as a single worker (no error if it is absent).
os.environ.pop('TF_CONFIG',None)

In [5]:
# Make modules written to the working directory (e.g. mnist.py) importable.
if not any(entry == '.' for entry in sys.path):
  sys.path.insert(0, '.')

In [6]:
import tensorflow as tf

In [7]:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  """Build the MNIST training input pipeline.

  Pixel values are divided by 255 (as float32) and labels are cast to int64.
  The examples are shuffled with a 60000-element buffer, repeated without
  end, and grouped into batches of `batch_size`.

  Args:
    batch_size: number of examples per batch.

  Returns:
    A `tf.data.Dataset` of `(images, labels)` batches.
  """
  train_split, _unused_test_split = tf.keras.datasets.mnist.load_data(path='mnist.npz')
  images, labels = train_split
  images = images / np.float32(255)
  labels = labels.astype(np.int64)

  dataset = tf.data.Dataset.from_tensor_slices((images, labels))
  dataset = dataset.shuffle(60000)
  dataset = dataset.repeat()
  dataset = dataset.batch(batch_size)
  return dataset

def build_and_compile_cnn_model():
  """Create and compile a small CNN for 28x28 single-channel images.

  Layers: reshape to (28, 28, 1) -> Conv2D(32 filters, 3x3, relu) ->
  Flatten -> Dense(128, relu) -> Dense(10), the last producing logits.
  Compiled with sparse categorical cross-entropy on logits, plain SGD at
  learning rate 0.001, and an accuracy metric.

  Returns:
    A compiled `tf.keras.Sequential` model.
  """
  layers = tf.keras.layers
  model = tf.keras.Sequential([
      layers.InputLayer(input_shape=(28, 28)),
      layers.Reshape(target_shape=(28, 28, 1)),
      layers.Conv2D(32, 3, activation='relu'),
      layers.Flatten(),
      layers.Dense(128, activation='relu'),
      layers.Dense(10),
  ])

  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)
  model.compile(loss=loss_fn, optimizer=optimizer, metrics=['accuracy'])
  return model

Writing mnist.py


In [8]:
import mnist

batch_size = 64

# Baseline: train on this process alone, without any distribution strategy.
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset,
                        epochs=3,
                        steps_per_epoch=70)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f7742945dd0>

In [9]:
# Two-worker cluster on localhost; this description is for worker index 0.
tf_config = dict(
    cluster=dict(worker=['localhost:12345', 'localhost:23456']),
    task=dict(type='worker', index=0),
)

In [10]:
# Preview the JSON string form of `tf_config`; this same serialization is
# exported as the TF_CONFIG environment variable in later cells.
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'

In [11]:
# Demonstrate that variables set via os.environ reach shell subprocesses.
os.environ.update(GREETINGS='Hello TensorFlow!')

In [12]:
%%bash
# The bash subprocess inherits the kernel's environment, including GREETINGS.
echo ${GREETINGS}

Hello TensorFlow!


In [13]:
# TF_CONFIG was removed from this kernel's environment earlier, so this
# strategy runs as a single worker (see the INFO line in the output).
strategy=tf.distribute.MultiWorkerMirroredStrategy()

INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


In [None]:
with strategy.scope():
  # Create and compile the model (and its variables) under the strategy's scope.
  multi_worker_model = mnist.build_and_compile_cnn_model()

In [None]:
%%writefile main.py
import os
import json
import tensorflow as tf

per_worker_batch_size=64
tf_config=json.loads(os.environ['TF_CONFIG'])
num_workers=len(tf_config['cluster']['worker'])

strategy=tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,epochs=3,steps_per_epoch=70)

In [None]:
%%bash
# Confirm the %%writefile cells produced mnist.py and main.py in the working directory.
ls *.py

In [None]:
# Export the worker-0 cluster description so processes launched from this
# kernel (main.py) can read it; the kernel's own strategy is already built.
os.environ.update(TF_CONFIG=json.dumps(tf_config))

In [None]:
# Kill any background scripts previously started via `%%bash --bg`
# (e.g. a worker left over from an earlier run) before launching new ones.
%killbgscripts

In [None]:
import time
# Pause after killing background scripts before relaunching the workers.
# NOTE(review): 10 s is an arbitrary wait, presumably to let old processes
# exit and release their ports -- confirm it is sufficient.
time.sleep(10)

In [None]:
%%bash --bg
# Launch worker 0 in the background with stdout and stderr redirected to
# job_0.log, so the notebook can go on to start worker 1.
python main.py &>job_0.log

In [None]:
%%bash
# Show worker 0's output so far (its stdout/stderr were redirected to
# job_0.log when it was launched in the background).
cat job_0.log

In [None]:
# Same cluster, but this time describe worker 1 for the foreground process.
tf_config['task'].update(index=1)
os.environ.update(TF_CONFIG=json.dumps(tf_config))

In [None]:
%%bash
# Run worker 1 in the foreground; TF_CONFIG now carries task index 1, so it
# pairs with the background worker 0 to form the two-worker cluster.
python main.py

In [None]:
%%bash
# Inspect worker 0's log again after worker 1's run has finished.
cat job_0.log

In [None]:
# Tear down the multi-worker demo: remove TF_CONFIG from this kernel's
# environment and kill any background workers still running.
os.environ.pop('TF_CONFIG', None)
%killbgscripts

In [None]:
# Turn off tf.data auto-sharding (AutoShardPolicy.OFF performs no sharding).
options = tf.data.Options()
# AutoShardPolicy is an enum: reference the member directly. Calling
# `AutoShardPolicy()` with no value raises a TypeError.
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=global_batch_size)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

In [None]:
# Request NCCL for cross-worker collectives. `CollectiveCommunication` is a
# deprecated alias; `CommunicationImplementation` is the supported enum.
# NOTE(review): NCCL is a GPU backend, but GPUs are hidden via
# CUDA_VISIBLE_DEVICES='-1' earlier, and these options are not passed to any
# strategy in this notebook. To take effect, pass them as
# `tf.distribute.MultiWorkerMirroredStrategy(communication_options=communication_options)`.
communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)

In [None]:
# Where the chief saves the model; lives under /tmp like the checkpoint
# (/tmp/ckpt) and backup (/tmp/backup) directories used later.
model_path = '/tmp/keras-model'


def _is_chief(task_type, task_id):
  """Return True if this task should write the canonical model/checkpoint.

  Only `worker` tasks are configured here, so worker 0 acts as chief.
  `task_type is None` is accepted as chief too; presumably that is what the
  cluster resolver reports when no TF_CONFIG cluster is set (single-worker
  run) -- confirm for other setups. If a dedicated `chief` task type is
  used, this should instead return `task_type == 'chief'`.
  """
  return (task_type == 'worker' and task_id == 0) or task_type is None


# Backward-compatible alias: later cells call the original misspelled name.
_is_cheif = _is_chief


def _get_temp_dir(dirpath, task_id):
  """Create (if needed) and return `<dirpath>/workertemp_<task_id>`.

  This gives a non-chief task its own scratch directory.
  """
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  """Return the path this task should write `filepath` to.

  The chief writes to `filepath` unchanged. Every other task gets the same
  basename inside its own `workertemp_<task_id>` directory, so that tasks
  never write to the same location.
  """
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

# Ask the strategy's cluster resolver which task this process is, then
# pick the path this task should save the model to.
task_type = strategy.cluster_resolver.task_type
task_id = strategy.cluster_resolver.task_id
write_model_path = write_filepath(model_path, task_type, task_id)

In [None]:
# Every task saves. The chief writes to `model_path`; other tasks write to
# the per-task temp path chosen by `write_filepath`, which the next cell deletes.
multi_worker_model.save(write_model_path)

In [None]:
if not _is_cheif(task_type, task_id):
  # Non-chief tasks saved into their own workertemp_<id> directory;
  # delete that whole directory so only the chief's copy remains.
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

In [None]:
# Reload the chief's saved model (no strategy scope) and keep training it
# on the single-worker dataset.
loaded_model = tf.keras.models.load_model(model_path)
loaded_model.fit(single_worker_dataset,
                 epochs=2,
                 steps_per_epoch=20)

In [None]:
checkpoint_dir = '/tmp/ckpt'

# Track the model in a checkpoint. Non-chief tasks are redirected to a
# per-task temp directory by write_filepath; at most one checkpoint is kept.
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(checkpoint,
                                                write_checkpoint_dir,
                                                max_to_keep=1)

In [None]:
checkpoint_manager.save()

# Only the chief's checkpoint should persist; discard the copy a non-chief
# task wrote into its temp directory.
if not _is_cheif(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

In [None]:
# Restore from the newest checkpoint in the chief's directory, then resume training.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset,
                       epochs=2,
                       steps_per_epoch=20)

In [None]:
# Fault tolerance: BackupAndRestore keeps training state under /tmp/backup
# so an interrupted fit() can resume from it.
# `tf.keras.callbacks.experimental.BackupAndRestore` is a deprecated endpoint
# (removed in newer Keras), so prefer the stable name. Fall back to the
# experimental alias only on older TF 2.x releases that lack it.
backup_and_restore_cls = getattr(tf.keras.callbacks, 'BackupAndRestore', None)
if backup_and_restore_cls is None:
  backup_and_restore_cls = tf.keras.callbacks.experimental.BackupAndRestore
callbacks = [backup_and_restore_cls(backup_dir='/tmp/backup')]

with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)