In [None]:
import ipcmagic
import ipyparallel as ipp

In [None]:
# Ask the `%ipcluster` magic for its version (the magic is presumably registered by `import ipcmagic` — TODO confirm).
%ipcluster --version

In [None]:
# Start the cluster. `-n 2` presumably requests two engines and `--mpi` an MPI-based launch — confirm against the ipcmagic docs.
%ipcluster start -n 2 --mpi

Repeat a few times in case of `TimeoutError`.
After the cluster starts, the following calls will do nothing but print "IPCluster is already running".
This mimics what the user would do in such a case.

In [None]:
# Deliberate repeat of the start command (retry after a possible `TimeoutError`); expected to be a no-op once the cluster is running.
%ipcluster start -n 2 --mpi

In [None]:
# Deliberate repeat of the start command (retry after a possible `TimeoutError`); expected to be a no-op once the cluster is running.
%ipcluster start -n 2 --mpi

In [None]:
# Deliberate repeat of the start command (retry after a possible `TimeoutError`); expected to be a no-op once the cluster is running.
%ipcluster start -n 2 --mpi

In [None]:
# Deliberate repeat of the start command (retry after a possible `TimeoutError`); expected to be a no-op once the cluster is running.
%ipcluster start -n 2 --mpi

In [None]:
# In cells that take some time, IPyParallel shows a progress bar.
# It can be disabled for a single cell by passing `--progress-after -1` to `%%px`;
# here the same option is given to `%pxconfig` so that it becomes the default
# for the `%%px` cells that follow.
%pxconfig --progress-after -1

In [None]:
# Connect an IPyParallel client to the running cluster and print the ids it
# reports, as a quick check that the engines are reachable.
c = ipp.Client()
print('cluster ids:', c.ids)

In [None]:
import os

# Show this user's processes whose `ps` line contains "ip", to see which
# cluster-related processes are alive on the notebook's host.
# The pipe is opened in a `with` block so that it is closed (and the child
# shell reaped) as soon as its output has been read, instead of being left
# open until garbage collection.
with os.popen("ps -u $USER | grep ip") as ps_pipe:
    print(ps_pipe.read())

In [None]:
%%px
# Runs under `%%px`, i.e. on the engines rather than in the notebook kernel:
# the last expression displays the hostname seen by each engine.
import socket
socket.gethostname()

In [None]:
%%px
import numpy as np
import tensorflow as tf
import horovod.tensorflow.keras as hvd

In [None]:
%%px
# Initialize Horovod on every engine before the distributed optimizer and
# broadcast callback used further down are created.
hvd.init()

In [None]:
%%px
# Synthetic regression data: points on a straight line plus uniform noise.
nsamples = 1000
ref_slope, ref_offset = 2.0, 0.0

# Both draws are uniform on [0, 1); subtracting 0.5 centres them on zero.
# The noise is drawn first and x second, each as an (nsamples, 1) column.
noise = np.random.random(size=(nsamples, 1)) - 0.5
x_train = np.random.random(size=(nsamples, 1)) - 0.5

# Targets follow y = slope * x + offset, perturbed by the noise.
y_train = (ref_slope * x_train + ref_offset) + noise

In [None]:
%%px
# Input pipeline built as one chain: cast both arrays to float32, then
# shuffle with a 1000-element buffer, group into batches of 100 and repeat
# the whole sequence 150 times.
dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_train.astype(np.float32),
                         y_train.astype(np.float32)))
    .shuffle(1000)
    .batch(100)
    .repeat(150)
)

In [None]:
%%px
# One Dense unit with a linear activation on a single input feature, i.e. a
# model of the form y = w * x + b.
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(1, input_shape=(1,), activation='linear'),
])

# `learning_rate` is the supported keyword for the step size; the older `lr`
# alias is deprecated in TF 2.x Keras and rejected by newer Keras releases.
opt = tf.keras.optimizers.SGD(learning_rate=0.5)
# Wrap the optimizer with Horovod's distributed wrapper before compiling.
opt = hvd.DistributedOptimizer(opt)

model.compile(optimizer=opt,
              loss='mse')

In [None]:
%%px
class TrainHistory(tf.keras.callbacks.Callback):
    """Record the model variables and the loss after every training batch.

    Attributes:
        vars: One entry per finished batch; each entry is the list of
            ``v.numpy()`` values for every item in ``self.model.variables``.
        loss: One entry per finished batch; the ``'loss'`` value taken from
            the ``logs`` mapping (``None`` when it is not present).
    """

    def on_train_begin(self, logs=None):
        """Start with empty histories at the beginning of training."""
        self.vars = []
        self.loss = []

    def on_batch_end(self, batch, logs=None):
        """Append the current variable values and the batch loss."""
        # `None` replaces the former mutable `{}` default; fall back to an
        # empty mapping so the lookup below still works without logs.
        logs = logs or {}
        self.vars.append([v.numpy() for v in self.model.variables])
        self.loss.append(logs.get('loss'))

history = TrainHistory()

In [None]:
%%px
# Callback that broadcasts the initial variable values from rank 0, so that
# every worker starts training from the same state.
initial_sync = hvd.callbacks.BroadcastGlobalVariablesCallback(root_rank=0)

In [None]:
%%px
# Train with the broadcast callback first and the history recorder second;
# `verbose=2` asks Keras for its compact per-epoch log output.
fit = model.fit(
    dataset,
    verbose=2,
    callbacks=[initial_sync, history],
)

In [None]:
%%px
# Report the state recorded after the last batch. `history.vars[-1]` is the
# list of variable arrays captured at that point: element 0 is printed as the
# slope and element 1 as the offset (presumably the Dense kernel and bias, in
# that order — confirm), next to the last recorded loss.
print(
    f'slope={history.vars[-1][0][0][0]}   '
    f'offset={history.vars[-1][1][0]}  '
    f'loss={history.loss[-1]}'
)

In [None]:
# Shut the cluster down now that the training run is finished.
%ipcluster stop