## Running Distributed TensorFlow on Slurm Clusters

In this notebook, we show how to run a TensorFlow experiment on a Slurm cluster. Since TensorFlow doesn't yet officially support this task, we developed a simple Python module that automates the configuration: it parses the environment variables set by Slurm and builds a TensorFlow cluster configuration from them. We demonstrate it with a simple image recognition example on CIFAR-10.

In [42]:
# load module slurm
%load_ext slurm_magic

The slurm_magic extension is already loaded. To reload it, use:
  %reload_ext slurm_magic


### Step 1: Create conda environment
Create a new conda environment based on Python 3 (currently 3.7 for TensorFlow 1.15 and 2.0, or 3.6 for TensorFlow 1.14). You can name the environment whatever you want; here we name it new_conda_env.


In [43]:
# First, check the conda environment list.
!conda env list

# conda environments:
#
base                  *  /opt/conda
tf2-gpu                  /opt/conda/envs/tf2-gpu



In [None]:
# If the name you chose is already in use, you can remove the existing environment
!conda env remove -p /lustre/-42ai/work/new_conda_env 
# Or change the name

In [44]:
# create conda environment new_conda_env with package TensorFlow
!yes | conda create --prefix /lustre/-42ai/work/new_conda_env tensorflow-gpu

Collecting package metadata (current_repodata.json): done
Solving environment: failed with repodata from current_repodata.json, will retry with next repodata source.
Collecting package metadata (repodata.json): done
Solving environment: done


  current version: 4.8.2
  latest version: 4.8.3

Please update conda by running

    $ conda update -n base conda



## Package Plan ##

  environment location: /lustre/-42ai/work/new_conda_env

  added / updated specs:
    - tensorflow-gpu


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    absl-py-0.9.0              |   py37hc8dfbb8_1         162 KB  conda-forge
    google-pasta-0.2.0         |     pyh8c360ce_0          42 KB  conda-forge
    grpcio-1.30.0              |   py37hb0870dc_0         2.0 MB  conda-forge
    h5py-2.10.0                |nompi_py37h90cd8ad_103         1.1 MB  conda-forge
    hdf5-1.10.6                |nompi_h3c11f04_100 

### Step 2: Testing of the TensorFlow installation

Now we write a Python file, test_tf.py, to check that TensorFlow was installed successfully; it prints the version of TensorFlow we installed.

Remember to activate the conda environment in your Slurm script.

In [45]:
%%writefile /lustre/-42ai/work/test_tf.py
import tensorflow as tf
print(tf.__version__)
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print(sess.run(hello))

Overwriting /lustre/-42ai/work/test_tf.py


In [46]:
%%writefile /lustre/-42ai/work/test_tf.sh
#! /bin/bash
#SBATCH --output=test_tf.out              # Name of stdout output file
#SBATCH --error=test_tf.err             # Name of stderr error file
#SBATCH --nodes=1                    # Total # of nodes
#SBATCH --ntasks=1                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --time=12:00:00            # Run time (hh:mm:ss)

source activate /lustre/-42ai/work/new_conda_env 
echo "SLURM_JOB_ID $SLURM_JOB_ID  SLURM_JOB_NAME  $SLURM_JOB_NAME SLURM_JOB_NODELIST  $SLURM_JOB_NODELIST  SLURMD_NODENAME $SLURMD_NODENAME   SLURM_JOB_NUM_NODES  $SLURM_JOB_NUM_NODES";
python /lustre/-42ai/work/test_tf.py
echo "Done!"

Overwriting /lustre/-42ai/work/test_tf.sh


In [47]:
# submit the job
%sbatch -D /lustre/-42ai/work /lustre/-42ai/work/test_tf.sh

'Submitted batch job 138\n'

In [66]:
!cat /lustre/-42ai/work/test_tf.out

SLURM_JOB_ID 138  SLURM_JOB_NAME  test_tf.sh SLURM_JOB_NODELIST  slurm-worker-8  SLURMD_NODENAME slurm-worker-8   SLURM_JOB_NUM_NODES  1
1.14.0
b'Hello, TensorFlow!'
Done!


If you do not see "Done!" in the output of the cell above, please wait a moment and rerun that cell until its output contains "Done!".
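
Instead of rerunning the cell by hand, the wait can be automated. The following is a minimal sketch, not part of the original workflow: `wait_for_marker` is a hypothetical helper that polls a file until it contains a marker string, and the temporary file only stands in for test_tf.out.

```python
import os
import tempfile
import time


def wait_for_marker(path, marker="Done!", timeout=60.0, interval=1.0):
    """Return True once `path` contains `marker`, or False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            with open(path) as f:
                if marker in f.read():
                    return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Demonstration on a temporary file standing in for the job's output file.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.out")
    with open(demo_path, "w") as f:
        f.write("1.14.0\nDone!\n")
    finished = wait_for_marker(demo_path, timeout=5.0, interval=0.1)
    # A file that never appears makes the helper give up after the timeout.
    missing = wait_for_marker(os.path.join(tmp, "absent.out"), timeout=0.3, interval=0.1)

print(finished, missing)
```

On the cluster, the path would be the job's output file, with a timeout that matches the expected queue time.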

You can also check the library list in your conda environment

In [69]:
!conda list -p /lustre/-42ai/work/new_conda_env

# packages in environment at /lustre/-42ai/work/new_conda_env:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       0_gnu    conda-forge
_tflow_select             2.1.0                       gpu    defaults
absl-py                   0.9.0            py37hc8dfbb8_1    conda-forge
astor                     0.8.1              pyh9f0ad1d_0    conda-forge
c-ares                    1.15.0            h516909a_1001    conda-forge
ca-certificates           2020.6.20            hecda079_0    conda-forge
certifi                   2020.6.20        py37hc8dfbb8_0    conda-forge
gast                      0.3.3                      py_0    conda-forge
google-pasta              0.2.0              pyh8c360ce_0    conda-forge
grpcio                    1.30.0           py37hb0870dc_0    conda-forge
h5py                      2.10.0          nompi_py37h9

### Step 3: Check Slurm Nodes

When running a Slurm job, we can discover the other nodes taking part by examining environment variables:

SLURMD_NODENAME – name of the current node


SLURM_JOB_NODELIST – list of all nodes allocated to the job

SLURM_JOB_NUM_NODES – number of nodes the job is using
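
From Python, these variables can be read through `os.environ`. The sketch below is illustrative only: `read_slurm_env` is a hypothetical helper, and `fake_env` mimics the values a five-node job would see so that the code also runs outside a Slurm job.

```python
import os


def read_slurm_env(environ=os.environ):
    """Collect the three Slurm variables described above from an environment mapping."""
    return {
        "node_name": environ.get("SLURMD_NODENAME"),
        "node_list": environ.get("SLURM_JOB_NODELIST"),
        "num_nodes": int(environ.get("SLURM_JOB_NUM_NODES", "1")),
    }


# Assumed example values; inside a real job, call read_slurm_env() with no argument.
fake_env = {
    "SLURMD_NODENAME": "slurm-worker-1",
    "SLURM_JOB_NODELIST": "slurm-worker-[1-5]",
    "SLURM_JOB_NUM_NODES": "5",
}
info = read_slurm_env(fake_env)
print(info)
```

Note that the node list arrives in Slurm's compressed form (`slurm-worker-[1-5]`), which has to be expanded before it can be used to address individual nodes.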

Let's look at an example; the output gives us the job's ID, name, node list, and other information:

In [70]:
%%writefile /lustre/-42ai/work/check_slurm.sh
#! /bin/bash
#SBATCH --output=check_slurm.out              # Name of stdout output file
#SBATCH --error=check_slurm.err             # Name of stderr error file
#SBATCH --nodes=1                    # Total # of nodes
#SBATCH --ntasks=1                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --time=12:00:00            # Run time (hh:mm:ss)

source activate /lustre/-42ai/work/new_conda_env 
echo "SLURM_JOB_ID $SLURM_JOB_ID  SLURM_JOB_NAME  $SLURM_JOB_NAME SLURM_JOB_NODELIST  $SLURM_JOB_NODELIST  SLURMD_NODENAME $SLURMD_NODENAME   SLURM_JOB_NUM_NODES  $SLURM_JOB_NUM_NODES";
echo "Done!"

Writing /lustre/-42ai/work/check_slurm.sh


In [71]:
# submit the job
%sbatch -D /lustre/-42ai/work /lustre/-42ai/work/check_slurm.sh

'Submitted batch job 139\n'

In [72]:
!cat /lustre/-42ai/work/check_slurm.out

SLURM_JOB_ID 139  SLURM_JOB_NAME  check_slurm.sh SLURM_JOB_NODELIST  slurm-worker-8  SLURMD_NODENAME slurm-worker-8   SLURM_JOB_NUM_NODES  1
Done!


### Step 3 (continued): Testing of reading data

In [73]:
!mkdir /lustre/-42ai/work/example

mkdir: cannot create directory ‘/lustre/-42ai/work/example’: File exists


Download the dataset

In [74]:
# !wget --directory-prefix=/lustre/-42ai/work/example "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"

In [75]:
# !tar -xvzf /lustre/-42ai/work/example/cifar-10-python.tar.gz --directory /lustre/-42ai/work/example

In [76]:
!ls /lustre/-42ai/work/example/cifar-10-batches-py/

batches.meta  data_batch_2  data_batch_4  readme.html
data_batch_1  data_batch_3  data_batch_5  test_batch


In [77]:
%%writefile  /lustre/-42ai/work/example/check_dataset.py

# check dataset load successfully
import pickle
import os
import numpy as np

data_dir = '/lustre/-42ai/work/example/cifar-10-batches-py'
# try the first two batches in this example
filelist = [os.path.join(data_dir, 'data_batch_1'),
            os.path.join(data_dir, 'data_batch_2')]
data, labels = [], []

for f in filelist:
    with open(f, 'rb') as fo:
        data_elem = pickle.load(fo, encoding='latin1')
        data.append(data_elem['data'])
        labels.extend(data_elem['labels'])
# np.vstack expects a sequence of arrays; passing a generator is deprecated.
data = np.vstack(data)
print('data shape: ', data.shape)


Overwriting /lustre/-42ai/work/example/check_dataset.py


In [79]:
%%writefile /lustre/-42ai/work/check_dataset.sh
#! /bin/bash
#SBATCH --output=check_dataset.out              # Name of stdout output file
#SBATCH --error=check_dataset.error             # Name of stderr error file
#SBATCH --nodes=1                    # Total # of nodes
#SBATCH --ntasks=1                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --time=12:00:00            # Run time (hh:mm:ss)

source activate /lustre/-42ai/work/new_conda_env 
echo "SLURM_JOB_ID $SLURM_JOB_ID  SLURM_JOB_NAME  $SLURM_JOB_NAME SLURM_JOB_NODELIST  $SLURM_JOB_NODELIST  SLURMD_NODENAME $SLURMD_NODENAME   SLURM_JOB_NUM_NODES  $SLURM_JOB_NUM_NODES";
python /lustre/-42ai/work/example/check_dataset.py
echo "Done!"

Overwriting /lustre/-42ai/work/check_dataset.sh


In [80]:
# submit the job
%sbatch -D /lustre/-42ai/work /lustre/-42ai/work/check_dataset.sh

'Submitted batch job 140\n'

In [81]:
!cat /lustre/-42ai/work/check_dataset.out

SLURM_JOB_ID 140  SLURM_JOB_NAME  check_dataset.sh SLURM_JOB_NODELIST  slurm-worker-8  SLURMD_NODENAME slurm-worker-8   SLURM_JOB_NUM_NODES  1
data shape:  (20000, 3072)
Done!


### Step 4: Start server

First we define the cluster spec, which is needed to create the server, along with the job name and task index of the current process. The 'ps_number' parameter specifies how many parameter servers to set up (we use 1). All other nodes work as regular workers, and everything is passed to the tf.train.Server constructor (exposed as tf.distribute.Server in newer TensorFlow releases).



Note: do not run the following two cells repeatedly. If you have started the server and have not killed it, the port cannot be used again, and you will get the error "UnknownError: Could not start gRPC server". Restarting the kernel solves this problem.
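
Before starting a server, it can help to probe whether the port is still occupied on the current node. This is a rough sketch under our own assumptions: `port_is_free` is a hypothetical helper, it only checks the local machine, and a successful probe does not guarantee that the gRPC server will start.

```python
import socket


def port_is_free(port, host=""):
    """Return True if a TCP socket can be bound to (host, port) on this machine."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


# Demonstration: occupy an ephemeral port, then probe it while it is held.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as holder:
    holder.bind(("127.0.0.1", 0))
    holder.listen(1)
    busy_port = holder.getsockname()[1]
    busy_is_free = port_is_free(busy_port, host="127.0.0.1")

print(busy_is_free)
```

In this notebook the port to probe would be 2222, the value used for `port_number` in train.py.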

In [112]:
%%writefile  /lustre/-42ai/work/example/train.py

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division

import os
import pickle
import re
import sys
import time

import numpy as np
import tensorflow as tf

Overwriting /lustre/-42ai/work/example/train.py


In [113]:
%%writefile -a  /lustre/-42ai/work/example/train.py
import traceback
try:

    def _pad_zeros(iterable, length):
        return (str(t).rjust(length, '0') for t in iterable)

    def _expand_ids(ids):
        ids = ids.split(',')
        result = []
        for id in ids:
            if '-' in id:
                begin, end = [int(token) for token in id.split('-')]
                result.extend(_pad_zeros(range(begin, end+1), len( id.split('-')[0] )))
            else:
                result.append(id)
        return result

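    def _expand_nodelist(nodelist):
        # A single-node allocation has no bracket expression, e.g. "slurm-worker-8".
        match = re.match(r"(.*)\[(.*)\]", nodelist)
        if match is None:
            return [nodelist]
        prefix, ids = match.groups()
        return [prefix + str(id) for id in _expand_ids(ids)]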

    nodename = os.environ["SLURMD_NODENAME"]
    nodelist = os.environ["SLURM_JOB_NODELIST"]
    nodelist = _expand_nodelist(nodelist)
    num_nodes = int(os.getenv("SLURM_JOB_NUM_NODES"))

    if len(nodelist) != num_nodes:
        raise ValueError("Number of slurm nodes {} not equal to {}".format(len(nodelist), num_nodes))

    if nodename not in nodelist:
        raise ValueError("Nodename({}) not in nodelist({}). This should not happen! ".format(nodename,nodelist))
except Exception:
    traceback.print_exc()

Appending to /lustre/-42ai/work/example/train.py
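
The node-list expansion can be tried in isolation, without Slurm or TensorFlow. The standalone sketch below mirrors the logic of the cell above; `expand_nodelist` is our own illustrative name, and the sketch assumes a node list with at most one bracket group (lists such as `a[1-2],b[3-4]` are not handled).

```python
import re


def expand_nodelist(nodelist):
    """Expand a compressed Slurm node list such as 'node[08-10,12]' into node names."""
    match = re.match(r"^(.*)\[(.*)\]$", nodelist)
    if match is None:
        # No brackets: the allocation consists of a single node.
        return [nodelist]
    prefix, ids = match.groups()
    names = []
    for part in ids.split(","):
        if "-" in part:
            begin, end = part.split("-")
            width = len(begin)  # preserve zero padding, e.g. 08, 09, 10
            names.extend(prefix + str(i).zfill(width)
                         for i in range(int(begin), int(end) + 1))
        else:
            names.append(prefix + part)
    return names


print(expand_nodelist("slurm-worker-[1-5]"))
print(expand_nodelist("slurm-worker-8"))
print(expand_nodelist("node[08-10,12]"))
```

On a real cluster, `scontrol show hostnames "$SLURM_JOB_NODELIST"` performs the same expansion and covers the cases this sketch leaves out.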


In [114]:
%%writefile -a  /lustre/-42ai/work/example/train.py
try:
    ps_number = 1

    ps_nodes = [node for i, node in enumerate(nodelist) if i < ps_number]
    worker_nodes = [node for i, node in enumerate(nodelist) if i >= ps_number]

    print("Parameter nodes: ", ps_nodes)
    print("Worker nodes: ", worker_nodes)
except Exception:
    traceback.print_exc()

Appending to /lustre/-42ai/work/example/train.py


In [115]:
%%writefile -a  /lustre/-42ai/work/example/train.py
try: 
    if nodename in ps_nodes:
        my_job_name = "ps"
        my_task_index = ps_nodes.index(nodename)
    else:
        my_job_name = "worker"
        my_task_index = worker_nodes.index(nodename)

    print("My job name: ", my_job_name)
    print("My task index: ", my_task_index)
except Exception:
    traceback.print_exc()

Appending to /lustre/-42ai/work/example/train.py


In [116]:
%%writefile -a  /lustre/-42ai/work/example/train.py
try:
    port_number = 2222

    worker_sockets = [":".join([node, str(port_number)]) for node in worker_nodes]
    ps_sockets = [":".join([node, str(port_number)]) for node in ps_nodes]

    cluster = {"worker": worker_sockets, "ps" : ps_sockets}

    print("Cluster: ", cluster)
except Exception:
    traceback.print_exc()

Appending to /lustre/-42ai/work/example/train.py
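
To see how the role assignment and the cluster dictionary fit together, the steps from the last three cells can be condensed into one function. This is a sketch for illustration: `build_cluster` is a hypothetical name, and the node names are assumed example values.

```python
def build_cluster(nodelist, nodename, ps_number=1, port_number=2222):
    """Split the nodes into parameter servers and workers and locate `nodename`."""
    ps_nodes = nodelist[:ps_number]        # the first `ps_number` nodes serve parameters
    worker_nodes = nodelist[ps_number:]    # all remaining nodes are workers
    if nodename in ps_nodes:
        job_name, task_index = "ps", ps_nodes.index(nodename)
    else:
        job_name, task_index = "worker", worker_nodes.index(nodename)
    cluster = {
        "worker": ["{}:{}".format(node, port_number) for node in worker_nodes],
        "ps": ["{}:{}".format(node, port_number) for node in ps_nodes],
    }
    return cluster, job_name, task_index


nodes = ["slurm-worker-{}".format(i) for i in range(1, 6)]
for name in ("slurm-worker-1", "slurm-worker-3"):
    cluster, job_name, task_index = build_cluster(nodes, name)
    print(name, job_name, task_index)
```

Every node computes the same `cluster` dictionary; only the job name and task index differ from node to node.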


In [117]:
%%writefile -a  /lustre/-42ai/work/example/train.py
try:
    cluster_spec = tf.train.ClusterSpec(cluster)
    server = tf.train.Server(server_or_cluster_def=cluster_spec,
                             job_name=my_job_name,
                             task_index=my_task_index)
except Exception:
    traceback.print_exc()


Appending to /lustre/-42ai/work/example/train.py


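Check the cluster, the task index, and the job name. Note that Slurm runs the body of a batch script only on the first allocated node, so the output below comes from slurm-worker-1, which acts as the parameter server; launching the Python script through srun inside the batch script would start one process per task.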

In [92]:
%%writefile /lustre/-42ai/work/example/job.sh
#! /bin/bash
#SBATCH --output=example_1.out              # Name of stdout output file
#SBATCH --error=example_1.err             # Name of stderr error file
#SBATCH --nodes=5                    # Total # of nodes
#SBATCH --ntasks=5                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --mem=4G                 # total memory per node (default is 4 GB per CPU-core)
#SBATCH --gres=gpu:1             # number of gpus per node
#SBATCH --time=00:01:00            # Run time (hh:mm:ss)
source activate /lustre/-42ai/work/new_conda_env

echo "SLURM_JOB_ID " $SLURM_JOB_ID  "; SLURM_JOB_NAME " $SLURM_JOB_NAME "; SLURM_JOB_NODELIST " $SLURM_JOB_NODELIST "; SLURMD_NODENAME " $SLURMD_NODENAME  "; SLURM_JOB_NUM_NODES " $SLURM_JOB_NUM_NODES
python /lustre/-42ai/work/example/train.py
echo "Done!"

Overwriting /lustre/-42ai/work/example/job.sh


In [93]:
%sbatch -D /lustre/-42ai/work/example /lustre/-42ai/work/example/job.sh

'Submitted batch job 142\n'

In [95]:
!cat /lustre/-42ai/work/example/example_1.out

SLURM_JOB_ID  142 ; SLURM_JOB_NAME  job.sh ; SLURM_JOB_NODELIST  slurm-worker-[1-5] ; SLURMD_NODENAME  slurm-worker-1 ; SLURM_JOB_NUM_NODES  5
Parameter nodes:  ['slurm-worker-1']
Worker nodes:  ['slurm-worker-2', 'slurm-worker-3', 'slurm-worker-4', 'slurm-worker-5']
My job name:  ps
My task index:  0
Cluster:  {'worker': ['slurm-worker-2:2222', 'slurm-worker-3:2222', 'slurm-worker-4:2222', 'slurm-worker-5:2222'], 'ps': ['slurm-worker-1:2222']}
Done!


Afterwards we immediately check whether the current job is a parameter server. Since all the work of a parameter server (ps) is handled by the tf.train.Server (which runs in a separate thread), we can just call server.join() and skip the rest of the script.

In [118]:
%%writefile -a  /lustre/-42ai/work/example/train.py

if my_job_name == 'ps':
    print("Current job is a parameter server. Server started")
    server.join()
    sys.exit(0)

Appending to /lustre/-42ai/work/example/train.py


In [119]:
%%writefile /lustre/-42ai/work/example/job.sh
#! /bin/bash
#SBATCH --output=example_2.out              # Name of stdout output file
#SBATCH --error=example_2.err             # Name of stderr error file
#SBATCH --nodes=5                    # Total # of nodes
#SBATCH --ntasks=5                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --mem=4G                 # total memory per node (default is 4 GB per CPU-core)
#SBATCH --gres=gpu:1             # number of gpus per node
#SBATCH --time=00:01:00            # Run time (hh:mm:ss)
source activate /lustre/-42ai/work/new_conda_env

echo "SLURM_JOB_ID " $SLURM_JOB_ID  "; SLURM_JOB_NAME " $SLURM_JOB_NAME "; SLURM_JOB_NODELIST " $SLURM_JOB_NODELIST "; SLURMD_NODENAME " $SLURMD_NODENAME  "; SLURM_JOB_NUM_NODES " $SLURM_JOB_NUM_NODES
python /lustre/-42ai/work/example/train.py
echo "Done!"

Overwriting /lustre/-42ai/work/example/job.sh


In [120]:
%sbatch -D /lustre/-42ai/work/example /lustre/-42ai/work/example/job.sh

'Submitted batch job 145\n'

In [124]:
!cat /lustre/-42ai/work/example/example_2.err

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])

2020-07-16 23:33:38.927284: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA
2020-07-16 23:33:38.982667: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2020-07-16 23:33:38.986062: I tensorflow/compiler/x

You can see that the server has started now.


The message "Your CPU supports instructions that this TensorFlow binary was not compiled to use: ... AVX2 AVX512F FMA" means that the binary does not use these CPU instruction sets, so CPU computations may run slower than they could. It does not affect functionality, but it may affect performance.

### Step 5: Placing the Variables on a parameter server
These two functions are used when defining the model parameters. Note the with tf.device("/job:ps/task:0") statements, which tell TensorFlow to place the variables on the parameter server so that they can be shared between the workers. The "0" index denotes the ID of the parameter server used to store the variable. Here we're using only one server, so all the variables are placed on task "0".

In [125]:
%%writefile -a  /lustre/-42ai/work/example/train.py
    
def weight_variable(shape):
    with tf.device("/job:ps/task:0"):
        initial = tf.compat.v1.truncated_normal(shape, stddev=0.1)
        return tf.Variable(initial)
def bias_variable(shape):
    with tf.device("/job:ps/task:0"):
        initial = tf.constant(0.1, shape=shape)
        return tf.Variable(initial)

Appending to /lustre/-42ai/work/example/train.py
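
With more than one parameter server, the task index in the device string would have to vary per variable. One simple scheme is round-robin assignment, sketched below without TensorFlow; `ps_device` is a hypothetical helper and is not used elsewhere in this notebook.

```python
def ps_device(variable_index, num_ps):
    """Return the device string of the parameter server that stores a variable."""
    return "/job:ps/task:{}".format(variable_index % num_ps)


# With one parameter server (our setup), every variable lands on task 0.
devices_one_ps = [ps_device(i, 1) for i in range(4)]
# With two parameter servers, consecutive variables alternate between tasks 0 and 1.
devices_two_ps = [ps_device(i, 2) for i in range(4)]

print(devices_one_ps)
print(devices_two_ps)
```

TensorFlow 1.x also provides tf.train.replica_device_setter, which automates this kind of placement.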


Read Data

In [126]:
%%writefile -a  /lustre/-42ai/work/example/train.py

data_dir = '/lustre/-42ai/work/example/cifar-10-batches-py'
# try the first two batches in this example
filelist = [os.path.join(data_dir, 'data_batch_1'),
            os.path.join(data_dir, 'data_batch_2')]
data, labels = [], []

for f in filelist:
    with open(f, 'rb') as fo:
        data_elem = pickle.load(fo, encoding='latin1')
        data.append(data_elem['data'])
        labels.extend(data_elem['labels'])
# np.vstack expects a sequence of arrays; passing a generator is deprecated.
data = np.vstack(data)
print('data shape: ', data.shape)


Appending to /lustre/-42ai/work/example/train.py


### Step 6: Build network

In [127]:
%%writefile -a /lustre/-42ai/work/example/train.py

def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1],padding='VALID')

Appending to /lustre/-42ai/work/example/train.py


In [128]:
%%writefile -a /lustre/-42ai/work/example/train.py

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                          strides=[1, 2, 2, 1], padding='SAME')

Appending to /lustre/-42ai/work/example/train.py


In [129]:
%%writefile -a /lustre/-42ai/work/example/train.py
# Pin the placeholders and the ops of the network to this worker; the variables
# are placed on the parameter server by weight_variable() and bias_variable().
with tf.device('/job:worker/task:{}'.format(my_task_index)):
    x = tf.compat.v1.placeholder(tf.float32, shape=[None, 3072], name='x')
    y = tf.compat.v1.placeholder(tf.uint8, shape=[None, 1], name='y')

    # FIRST CONVOLUTIONAL LAYER
    y_one_hot = tf.one_hot(indices=y, depth=10)

    ks = 5
    n_filters1 = 16
    W_conv1 = weight_variable([ks, ks, 3, n_filters1])
    b_conv1 = bias_variable([n_filters1])

    # CIFAR-10 rows are stored channel-first; convert to NHWC and scale to roughly [-1, 1]
    reshaped = tf.reshape(x, [-1, 3, 32, 32])
    transposed = tf.transpose(reshaped, [0, 2, 3, 1])
    x_image = (transposed - 128) / 128

    h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
    h_pool1 = max_pool_2x2(h_conv1)

    # SECOND CONVOLUTIONAL LAYER
    n_filters2 = 64
    W_conv2 = weight_variable([ks, ks, n_filters1, n_filters2])
    b_conv2 = bias_variable([n_filters2])

    h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
    h_pool2 = max_pool_2x2(h_conv2)

    # FULLY CONNECTED LAYER
    hidden_neurons = 512
    W_fc1 = weight_variable([5 * 5 * n_filters2, hidden_neurons])
    b_fc1 = bias_variable([hidden_neurons])

    h_pool2_flat = tf.reshape(h_pool2, [-1, 5 * 5 * n_filters2])
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

    # DROPOUT
    keep_prob = tf.compat.v1.placeholder(tf.float32)
    h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    # SOFTMAX
    W_fc2 = weight_variable([hidden_neurons, 10])
    b_fc2 = bias_variable([10])

Appending to /lustre/-42ai/work/example/train.py
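
The factor 5 * 5 * n_filters2 in the fully connected layer follows from the spatial sizes produced by the two convolution and pooling stages. The short check below reproduces that arithmetic in plain Python; the helper names are ours, and the formulas are the standard output-size rules for 'VALID' and 'SAME' padding.

```python
import math


def conv_valid(size, kernel, stride=1):
    """Output size of a convolution with 'VALID' padding."""
    return (size - kernel) // stride + 1


def pool_same(size, stride=2):
    """Output size of a pooling layer with 'SAME' padding."""
    return math.ceil(size / stride)


size = 32                               # CIFAR-10 images are 32 x 32
size = pool_same(conv_valid(size, 5))   # conv: 32 -> 28, pool: 28 -> 14
size = pool_same(conv_valid(size, 5))   # conv: 14 -> 10, pool: 10 -> 5
flat_features = size * size * 64        # 64 filters in the second conv layer

print(size, flat_features)
```

The final spatial size is 5, so the flattened feature vector has 5 * 5 * 64 = 1600 entries.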


#### Optimizer
Instead of using the plain AdamOptimizer, we wrap it with the SyncReplicasOptimizer. In distributed training, network communication introduces delays, so a worker may compute gradients against parameters that have since been updated; applying such stale gradients makes the model harder to train. SyncReplicasOptimizer prevents this by aggregating the gradients from the workers before applying an update.

In [130]:
%%writefile -a /lustre/-42ai/work/example/train.py

with tf.device('/job:worker/task:{}'.format(my_task_index)):
    y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
    cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=y_conv, labels=y_one_hot)
    loss = tf.reduce_mean(cross_entropy)
    opt = tf.compat.v1.train.AdamOptimizer(1e-3)
    opt = tf.compat.v1.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=len(cluster['worker']),
                                total_num_replicas=len(cluster['worker']))
    global_step = bias_variable([])
    train_step = opt.minimize(loss, global_step=global_step)
    # Only the first worker (task index 0) acts as the chief.
    sync_replicas_hook = opt.make_session_run_hook(my_task_index == 0)
    
    y_hat = tf.round(tf.argmax(tf.nn.softmax(y_conv), 1))
    y_hat = tf.cast(y_hat, tf.uint8)
    y_hat = tf.reshape(y_hat, [-1, 1])
    correct_prediction = tf.equal(y_hat, y)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

Appending to /lustre/-42ai/work/example/train.py
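
Conceptually, synchronous training collects one gradient per worker, averages them, and applies a single update. The toy example below illustrates only that idea in plain Python; it uses a plain gradient-descent step rather than Adam, it is not how TensorFlow implements SyncReplicasOptimizer internally, and all names and numbers are made up for illustration.

```python
def average_gradients(per_worker_grads):
    """Average the gradients element-wise across workers."""
    num_workers = len(per_worker_grads)
    return [sum(grads) / num_workers for grads in zip(*per_worker_grads)]


def apply_update(params, grads, learning_rate):
    """Apply one gradient-descent step to the parameters."""
    return [p - learning_rate * g for p, g in zip(params, grads)]


params = [1.0, -2.0]
# One gradient per worker, all computed against the same parameter values.
worker_grads = [[0.5, 1.0], [1.5, -1.0], [1.0, 0.0], [1.0, 4.0]]

mean_grad = average_gradients(worker_grads)
params = apply_update(params, mean_grad, learning_rate=0.5)

print(mean_grad, params)
```

Because the update waits for all four gradients, no worker can push an update computed against outdated parameters.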


In [131]:
%%writefile -a /lustre/-42ai/work/example/train.py

def batch_generator(data, labels, batch_size=32):
    x_batch, y_batch = [], []
    for d, l in zip(data, labels):
        x_batch.append(d)
        y_batch.append(l)
        if len(x_batch) == batch_size:
            yield np.vstack(x_batch),np.vstack(y_batch)
            x_batch = []
            y_batch = []

Appending to /lustre/-42ai/work/example/train.py


### Creating the session
In a distributed setting we use tf.train.MonitoredTrainingSession instead of the usual tf.Session. This ensures that the variables are properly initialized. It also allows you to restore a previously saved model and to control how summaries and checkpoints are written to disk.

In [132]:
%%writefile -a /lustre/-42ai/work/example/train.py

epochs = 1
batch_size = 128
step = 0
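# Only the first worker (task index 0) is the chief, which initializes the variables.
sess = tf.compat.v1.train.MonitoredTrainingSession(master=server.target,
                                                   is_chief=(my_task_index == 0),
                                                   hooks=[sync_replicas_hook])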

Appending to /lustre/-42ai/work/example/train.py


### Submit job to Slurm to start training

In [133]:
%%writefile -a /lustre/-42ai/work/example/train.py

for i in range(epochs):
    bg = batch_generator(data, labels, batch_size)
    for j, (data_batch, label_batch) in enumerate(bg):
        if (j+i) % len(cluster['worker']) != my_task_index:
            continue
        _, loss_, acc = sess.run([train_step, loss, accuracy],
                                feed_dict={x: data_batch,
                                          y: label_batch.reshape(-1,1),
                                          keep_prob: 0.5})
        step += 1
        print(step, my_task_index, loss_, acc)
        sys.stdout.flush()

Appending to /lustre/-42ai/work/example/train.py
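
The condition `(j+i) % len(cluster['worker']) != my_task_index` in the loop above splits the batches between the workers: each worker keeps only the batches whose shifted index matches its task index. The sketch below lists the batch indices each worker would process; `batches_for_worker` is a hypothetical helper, and the batch and worker counts are example values.

```python
def batches_for_worker(num_batches, num_workers, task_index, epoch=0):
    """Return the indices of the batches that a given worker processes in an epoch."""
    return [j for j in range(num_batches)
            if (j + epoch) % num_workers == task_index]


# Ten batches shared by four workers in the first epoch (epoch index 0).
for task in range(4):
    print(task, batches_for_worker(10, 4, task))

# In the next epoch the assignment shifts by one, so each worker sees different batches.
print(batches_for_worker(10, 4, 0, epoch=1))
```

Shifting by the epoch index rotates the assignment, so over several epochs every worker sees a different slice of the data.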


In [134]:
%%writefile /lustre/-42ai/work/example/job.sh
#! /bin/bash
#SBATCH --output=example_3.out              # Name of stdout output file
#SBATCH --error=example_3.err             # Name of stderr error file
#SBATCH --nodes=5                    # Total # of nodes
#SBATCH --ntasks=5                   # Total # of mpi tasks
#SBATCH --partition=debug                # Queue (partition) name
#SBATCH --ntasks-per-node  1   # Number of tasks per node
#SBATCH --mem=4G                 # total memory per node (default is 4 GB per CPU-core)
#SBATCH --gres=gpu:1             # number of gpus per node
#SBATCH --time=12:00:00            # Run time (hh:mm:ss)
source activate /lustre/-42ai/work/new_conda_env

echo "SLURM_JOB_ID " $SLURM_JOB_ID  "; SLURM_JOB_NAME " $SLURM_JOB_NAME "; SLURM_JOB_NODELIST " $SLURM_JOB_NODELIST "; SLURMD_NODENAME " $SLURMD_NODENAME  "; SLURM_JOB_NUM_NODES " $SLURM_JOB_NUM_NODES
python /lustre/-42ai/work/example/train.py
echo "Done"

Overwriting /lustre/-42ai/work/example/job.sh


In [135]:
%sbatch -D /lustre/-42ai/work/example /lustre/-42ai/work/example/job.sh

'Submitted batch job 146\n'

In [140]:
!cat /lustre/-42ai/work/example/example_3.out

SLURM_JOB_ID  146 ; SLURM_JOB_NAME  job.sh ; SLURM_JOB_NODELIST  slurm-worker-[1-5] ; SLURMD_NODENAME  slurm-worker-1 ; SLURM_JOB_NUM_NODES  5


In [141]:
!cat /lustre/-42ai/work/example/example_3.err

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])

2020-07-16 23:35:45.200106: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA
2020-07-16 23:35:45.254579: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2020-07-16 23:35:45.259961: I tensorflow/compiler/x

In [142]:
%squeue

Unnamed: 0,JOBID,PARTITION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)
0,146,debug,job.sh,jovyan,R,0:52,5,slurm-worker-[1-5]
