# 5. Model Training on Google Cloud ML [Cloud-Datalab]

This notebook submits a training job to [Google Cloud Machine Learning](https://cloud.google.com/ml/) and must be run inside a [Google Cloud Datalab](https://cloud.google.com/datalab/) Docker container.


In [1]:
# Print the installed TensorFlow version; the modules below use pre-1.0 APIs
# such as tf.pack, tf.mul, tf.scalar_summary and tf.train.SummaryWriter.
import tensorflow as tf
print(tf.__version__)

0.11.0rc0


In [35]:
%%mlalpha module --name svhn
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from scipy.ndimage.interpolation import zoom
import datetime
import traceback
import time
import numpy as np
import random
import tensorflow as tf
import os
import math
import google.cloud.ml as ml
import argparse
import json
import multiprocessing
from tensorflow.contrib.metrics.python.ops import metric_ops

# Input image size in pixels (the TFRecord 'data' feature is parsed as 64 x 64 x 3).
img_w = img_h = 64
# Op-level seed used for the dropout in get_fc.
SEED = 66478
# NOTE(review): not referenced in the visible code; the records and conv1 use 3 channels.
num_channels = 1
# Maximum digits per number: one digit classifier per position and num_labels length classes.
num_labels = 6


def log(msg):
    """Send ``msg`` to the TensorFlow INFO log first, then to standard output."""
    for sink in (tf.logging.info, print):
        sink(msg)


def tf_get_length_value(length_class_tensor):
    """Turn length class ids [0,5] into length values [1,6] (returned as a tensor via tf.add)."""
    class_to_length_offset = 1
    return tf.add(length_class_tensor, class_to_length_offset)


def batch_normalize(x, n_out, phase_train, scope='bn', decay=0.5, epsilon=1e-3):
    """
    Batch normalization on convolutional maps.
    Ref.: http://stackoverflow.com/questions/33949786/how-could-i-use-batch-normalization-in-tensorflow

    In the training phase the per-channel batch statistics are used, and they are also folded
    into exponential moving averages. In the inference phase those moving averages are used.

    Args:
        x:           Tensor, 4D BHWD input maps
        n_out:       integer, depth of input maps
        phase_train: boolean scalar tensor; True selects the training phase
        scope:       string, variable scope
        decay:       float, decay of the moving averages of mean/variance
                     (0.5 keeps the previously hard-coded behaviour)
        epsilon:     float, variance epsilon passed to tf.nn.batch_normalization
    Return:
        normed:      batch-normalized maps
    """
    with tf.variable_scope(scope):
        # Learned per-channel offset and scale.
        beta = tf.Variable(tf.constant(0.0, shape=[n_out]),
                           name='beta', trainable=True)
        gamma = tf.Variable(tf.constant(1.0, shape=[n_out]),
                            name='gamma', trainable=True)
        # Moments over batch, height and width -> one mean/variance per channel.
        batch_mean, batch_var = tf.nn.moments(x, [0, 1, 2], name='moments')
        ema = tf.train.ExponentialMovingAverage(decay=decay)

        def mean_var_with_update():
            # Update the moving averages, then hand back the batch statistics.
            ema_apply_op = ema.apply([batch_mean, batch_var])
            with tf.control_dependencies([ema_apply_op]):
                return tf.identity(batch_mean), tf.identity(batch_var)

        # tf.cond builds the true branch first; ema.apply there creates the averages that
        # ema.average() reads in the inference branch.
        mean, var = tf.cond(phase_train,
                            mean_var_with_update,
                            lambda: (ema.average(batch_mean), ema.average(batch_var)))
        normed = tf.nn.batch_normalization(x, mean, var, beta, gamma, epsilon)
    return normed


def get_conv2d(name, data, patch, d_in, d_out, stride=[1, 1, 1, 1], pooling_size=None, pooling_stride=None,
               padding='SAME', batch_norm=False, is_training=True, pool_type="max", keep_prob=1.0):
    """Build one conv block: conv -> bias -> ReLU [-> batch norm] [-> pool] [-> dropout].

    Args:
        name:           name scope and prefix of the op/variable names.
        data:           input tensor in the tf.nn.conv2d default (NHWC) layout.
        patch:          square filter size.
        d_in:           input depth.
        d_out:          output depth (number of filters).
        stride:         conv strides; the default list is only read, never mutated.
        pooling_size:   pooling window; pooling runs only when it and pooling_stride are given.
        pooling_stride: pooling strides.
        padding:        padding used by both the convolution and the pooling.
        batch_norm:     apply batch_normalize after the ReLU.
        is_training:    bool selecting the batch-norm phase (converted to a bool tensor).
        pool_type:      "max" for max pooling, anything else for average pooling.
        keep_prob:      dropout keep probability; dropout is skipped when it is >= 1.0.
    Returns:
        (filters, biases, layer): the filter and bias variables and the block output.
    """
    with tf.name_scope(str('%s' % name)):
        # He et al. initialisation: a conv unit's fan-in is the kernel area times the input
        # depth (patch * patch * d_in), not the image size.
        filters = tf.Variable(tf.truncated_normal([patch, patch, d_in, d_out],
                                                  stddev=get_conv2d_filters_init_stddev(patch, patch, d_in)),
                              name=str('%s_filters' % name))
        biases = tf.Variable(tf.zeros([d_out]), name=str('%s_b' % name))
        layer = tf.nn.conv2d(data, filters, stride, padding=padding, name=str('%s_layers' % name))
        layer = layer + biases
        layer = tf.nn.relu(layer)
        if batch_norm:
            layer = batch_normalize(layer, d_out, tf.convert_to_tensor(is_training, dtype=tf.bool),
                                    str('%s_bn' % name))
        if pooling_stride is not None and pooling_size is not None:
            if pool_type == "max":
                layer = tf.nn.max_pool(layer, pooling_size, pooling_stride, padding=padding)
            else:
                layer = tf.nn.avg_pool(layer, pooling_size, pooling_stride, padding=padding)
        if keep_prob < 1.0:
            layer = tf.nn.dropout(layer, keep_prob=keep_prob)
    return filters, biases, layer


def get_fc(name, data, depth, relu=True, keep_prob=1):
    """Build a fully connected layer: data @ weights + biases [-> ReLU] [-> dropout].

    Args:
        name:      name scope and prefix of the op/variable names.
        data:      2-D input tensor; its static second dimension is the fan-in.
        depth:     number of output units.
        relu:      apply ReLU only when this is exactly ``True`` (identity check).
        keep_prob: dropout keep probability; dropout is skipped when it is >= 1.
    Returns:
        (weights, biases, layer)
    """
    with tf.name_scope(str('%s' % name)):
        inbound = int(data.get_shape()[1])
        # He-style init: stddev = sqrt(2 / fan_in).
        # NOTE(review): `name=` is given to tf.truncated_normal, not to tf.Variable, so the weight
        # variable keeps TF's default name. Renaming it would change checkpoint keys.
        weights = tf.Variable(
            tf.truncated_normal([inbound, depth], stddev=math.sqrt(2.0 / inbound), name=str('%s_w' % name)))
        biases = tf.Variable(tf.zeros([depth]), name=str('%s_b' % name))
        layer = tf.matmul(data, weights) + biases
        if relu is True:
            layer = tf.nn.relu(layer)
        if keep_prob < 1:
            # Fixed op-level seed (module constant SEED) for the dropout mask.
            layer = tf.nn.dropout(layer, keep_prob=keep_prob, seed=SEED)
        return weights, biases, layer


def get_conv2d_filters_init_stddev(w, h, d_in):
    """Standard deviation for He et al. initialisation of conv filters.

    Implements std = sqrt(2 / n) with n = w * h * d_in, following
    https://arxiv.org/pdf/1502.01852v1.pdf
    """
    fan_in = w * h * d_in
    variance = 2.0 / fan_in
    return math.sqrt(variance)


def conv_to_fc(conv):
    """Flatten a conv output to 2-D: (batch, dim1 * dim2 * dim3).

    Uses the static shape, so dimensions 1-3 must be known when the graph is built.
    """
    dims = conv.get_shape().as_list()
    flat_size = dims[1] * dims[2] * dims[3]
    return tf.reshape(conv, [-1, flat_size])


def get_features(data, keep_prob=0.7, is_training=True):
    """Build the ten-layer convolutional feature extractor.

    Every layer is conv -> bias -> ReLU -> batch norm. conv1, 3, 5, 7, 9 and 10 add 2x2 pooling
    (max for conv1-5, average for conv7-10). conv3, 5, 7, 9 and 10 add dropout.

    Args:
        data:        input image batch; conv1 expects 3 input channels.
        keep_prob:   dropout keep probability of the dropout layers.
        is_training: batch-norm phase (batch statistics vs moving averages).
    Returns:
        (features, regularizers): conv10 flattened to 2-D, and the summed tf.nn.l2_loss of
        all filters followed by all biases.
    """
    # name, patch, d_in, d_out, pooling type (None = no pooling), dropout
    layer_specs = (
        ('conv1', 5, 3, 48, "max", False),
        ('conv2', 5, 48, 64, None, False),
        ('conv3', 5, 64, 128, "max", True),
        ('conv4', 5, 128, 160, None, False),
        ('conv5', 5, 160, 192, "max", True),
        ('conv6', 3, 192, 192, None, False),
        ('conv7', 3, 192, 192, "avg", True),
        ('conv8', 3, 192, 192, None, False),
        ('conv9', 3, 192, 384, "avg", True),
        ('conv10', 2, 384, 768, "avg", True),
    )
    with tf.name_scope('layers'):
        filter_vars = []
        bias_vars = []
        layer = data
        for layer_name, patch, d_in, d_out, pooling, dropout in layer_specs:
            filters, biases, layer = get_conv2d(
                layer_name, data=layer, patch=patch, d_in=d_in, d_out=d_out,
                stride=[1, 1, 1, 1],
                pooling_size=[1, 2, 2, 1] if pooling else None,
                pooling_stride=[1, 2, 2, 1] if pooling else None,
                padding='SAME', batch_norm=True, is_training=is_training,
                pool_type=pooling or "max",
                keep_prob=keep_prob if dropout else 1.0)
            filter_vars.append(filters)
            bias_vars.append(biases)

        # Summed in a fixed order: all filters, then all biases.
        regularizers = 0
        for variable in filter_vars + bias_vars:
            regularizers += tf.nn.l2_loss(variable)
        return conv_to_fc(layer), regularizers


def get_logits(features):
    """Build the output heads on top of the shared features.

    Returns:
        logits_length: length-class logits (num_labels classes; class k means length k + 1).
        logits_digits: the num_labels 10-way digit-head logits packed along a new first axis.
    """
    with tf.name_scope('logits'):
        # Sequence-length head.
        _, _, logits_length = get_fc('logits_L', features, num_labels, relu=False)

        # One 10-way classifier per digit position.
        digit_heads = []
        for position in range(num_labels):
            _, _, head_logits = get_fc(str("logits_D%d" % position), features, 10, relu=False)
            digit_heads.append(head_logits)
        logits_digits = tf.pack(digit_heads)
        return logits_length, logits_digits


def get_loss(logits_length, logits_digits, labels_length, digit_labels_t, l2, batch_size, regularizers):
    """Total loss: length cross-entropy + l2 * regularizers + per-position digit cross-entropies.

    Args:
        logits_length:  length-class logits, one row per example.
        logits_digits:  packed digit logits; logits_digits[i] holds the logits for position i.
        labels_length:  int32 length class per example (length - 1).
        digit_labels_t: digit labels indexed [position, example].
        l2:             weight of the L2 regularization term.
        batch_size:     examples per batch. It must equal the real batch size because the
                        position masks are built with this static size.
        regularizers:   scalar L2 penalty, scaled by ``l2``.
    Returns:
        (loss, masks, lmasked, dmasked):
            loss    -- scalar total loss (also exported as the "loss" scalar summary)
            masks   -- packed bool tensor; masks[i][j] is True when example j has a digit at position i
            lmasked -- per position, labels of the examples that have a digit there
            dmasked -- per position, logits of the examples that have a digit there
    """
    with tf.name_scope('loss'):
        # Length classifier cross-entropy, averaged over the batch.
        loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits_length, labels_length))

        # Add the regularization term to the loss.
        loss += l2 * regularizers
        # fills has shape (num_labels, batch_size); row i is filled with the value i, the digit
        # position it stands for. Comparing it with the lengths yields the position masks.

        fills = tf.pack([tf.fill([batch_size], i) for i in range(num_labels)])

        masks = list()
        dmasked = list()
        lmasked = list()

        for i in range(num_labels):
            # Position i exists for an example when i < length (length = class id + 1).
            # Masking keeps those examples only: (batch, 10) logits -> (count(i < length), 10).
            masks.append(tf.less(fills[i], tf_get_length_value(labels_length)))
            dmasked.append(tf.boolean_mask(logits_digits[i], masks[i]))
            lmasked.append(tf.boolean_mask(digit_labels_t[i], masks[i]))
            # Add 0 when no example has a digit at position i (the mean of an empty vector is NaN).
            # The lambdas read `i` late, but tf.cond calls them right here, in this iteration.
            loss += tf.cond(tf.less(0, tf.shape(lmasked[i])[0]),
                            lambda: tf.reduce_mean(
                                tf.nn.sparse_softmax_cross_entropy_with_logits(dmasked[i], lmasked[i])),
                            lambda: tf.constant(0.0))

        masks = tf.pack(masks)
        tf.scalar_summary("loss", loss)
        return loss, masks, lmasked, dmasked


def train(learning_rate, loss):
    """Create the global step counter and an Adam op that minimizes ``loss``.

    Args:
        learning_rate: Adam learning rate; also exported as the "learning rate" scalar summary.
        loss:          scalar loss tensor to minimize.
    Returns:
        (global_step, train_op); running train_op also increments global_step.
    """
    with tf.name_scope('train_op'):
        # A counter, not a model parameter: keep it out of the trainable variables so that no
        # optimizer ever includes it in its var_list. Its name (and checkpoint key) is unchanged.
        global_step = tf.Variable(0, name="global_step", trainable=False)
        tf.scalar_summary("learning rate", learning_rate)
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, name='Adam')
        train_op = optimizer.minimize(loss, global_step=global_step)
        return global_step, train_op


def preds_and_accuracy(logits_length, logits_digits, masks, lmasked, dmasked, labels_length, digit_labels_t):
    """Build prediction, correctness and accuracy tensors plus their scalar summaries.

    Args:
        logits_length:  length-class logits.
        logits_digits:  packed per-position digit logits.
        masks:          packed bool masks from get_loss; masks[i][j] is True when example j has
                        a digit at position i.
        lmasked:        per position, labels of the examples that have a digit there.
        dmasked:        per position, logits of those same examples.
        labels_length:  int32 length class per example.
        digit_labels_t: int32 digit labels indexed [position, example].
    Returns:
        preds_l:    softmax over length classes.
        preds_d:    list of per-position softmax outputs over all examples.
        accuracy_l: length accuracy in percent.
        accuracy_d: packed per-position accuracy in percent, over examples that have a digit at
                    that position only (NaN when a batch has none there).
        accuracy:   whole-number accuracy in percent.
        correct:    bool per example, True when every present digit is predicted correctly.
    """
    # length prediction
    with tf.name_scope('predictions'):
        preds_l = tf.nn.softmax(logits_length)
        # digits predictions (including non existing digits)
        preds_d = [tf.nn.softmax(logits_digits[i]) for i in range(num_labels)]

    ### ACCURACY REPORTING ###
    with tf.name_scope('correct'):
        # (num_labels, batch) tensor: True where the digit at position i of example j is right.
        correct_preds_d = tf.pack(
            [tf.equal(tf.cast(tf.argmax(preds_d[i], 1), tf.int32), digit_labels_t[i]) for i in range(num_labels)])

        # (batch, num_labels) 0/1 matrix: 1 when the digit is present AND predicted correctly.
        correctness_matrix = tf.transpose(tf.mul(tf.cast(correct_preds_d, tf.int32), tf.cast(masks, tf.int32)))

        # A number is correct when the count of correct present digits equals its length.
        correct = tf.equal(tf.reduce_sum(correctness_matrix, 1), tf.cast(tf_get_length_value(labels_length), tf.int32))
        correct_l = tf.equal(tf.cast(tf.argmax(preds_l, 1), tf.int32), labels_length)

        # Per-position correctness over present digits only; dmasked already holds the masked
        # logits, so the mask does not have to be applied a second time.
        correct_d = [tf.equal(tf.cast(tf.argmax(tf.nn.softmax(dmasked[i]), 1), tf.int32), lmasked[i])
                     for i in range(num_labels)]

    with tf.name_scope('accuracy'):
        accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) * 100
        accuracy_l = tf.reduce_mean(tf.cast(correct_l, tf.float32)) * 100
        accuracy_d = tf.pack([tf.reduce_mean(tf.cast(correct_d[i], tf.float32)) * 100 for i in range(num_labels)])
        tf.scalar_summary("accuracy", accuracy)
        tf.scalar_summary("accuracy length", accuracy_l)
        for i in range(num_labels):
            tf.scalar_summary(("accuracy digit %d" % i), accuracy_d[i])

    return preds_l, preds_d, accuracy_l, accuracy_d, accuracy, correct


def read_and_decode_single_example(filenames, num_epochs=None):
    """Read and parse one serialized example from a queue of TFRecord files.

    Args:
        filenames:  list of TFRecord file paths.
        num_epochs: passes over ``filenames`` before the filename queue raises OutOfRangeError;
                    None (default) cycles forever.
    Returns:
        (data, labels_digits, labels_length, labels_value) for a single example: data is a
        float32 [64, 64, 3] tensor, labels_digits an int64 [6] tensor, the others int64 scalars.
    """
    # num_epochs must be forwarded here; hard-coding None would make --num_epochs a no-op.
    filename_queue = tf.train.string_input_producer(filenames, num_epochs=num_epochs)
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(
        serialized_example,
        features={
            'labels_value': tf.FixedLenFeature([], tf.int64),
            'labels_length': tf.FixedLenFeature([], tf.int64),
            'labels_digits': tf.FixedLenFeature([6], tf.int64),
            'data': tf.FixedLenFeature([64, 64, 3], tf.float32),
        })
    return features['data'], features['labels_digits'], features['labels_length'], features['labels_value']


def get_inputs(files, batch_size=16, num_epochs=None, is_training=True):
    """Build the batched input pipeline.

    Args:
        files:       list of TFRecord paths.
        batch_size:  examples per batch.
        num_epochs:  forwarded to read_and_decode_single_example.
        is_training: batch with tf.train.shuffle_batch when True, tf.train.batch otherwise.
    Returns:
        (data, digit_labels, labels_length, labels_value):
            data          -- batched images
            digit_labels  -- int32 digit labels, transposed so they are indexed [position, example]
            labels_length -- int32 length class per example
            labels_value  -- int32 value per example
    """
    # One enqueue thread per CPU core.
    n_threads = multiprocessing.cpu_count()
    # Minimum number of examples left in the shuffle queue after a dequeue (used only by
    # shuffle_batch): more gives better mixing at the cost of memory.
    min_after_dequeue = 100
    # Queue capacity: the minimum above plus room for (threads + 3) batches.
    capacity = min_after_dequeue + (n_threads + 3) * batch_size

    with tf.name_scope('input'):
        single_example = list(read_and_decode_single_example(files, num_epochs))
        if is_training:
            batched = tf.train.shuffle_batch(single_example,
                                             batch_size=batch_size,
                                             capacity=capacity,
                                             min_after_dequeue=min_after_dequeue,
                                             num_threads=n_threads)
        else:
            batched = tf.train.batch(single_example,
                                     batch_size=batch_size,
                                     capacity=capacity,
                                     num_threads=n_threads)
        data, digit_labels, labels_length, labels_value = batched

        labels_length = tf.cast(labels_length, tf.int32, name="cast_labels_lengths")
        labels_value = tf.cast(labels_value, tf.int32, name="cast_labels_value")
        digit_labels = tf.transpose(tf.cast(digit_labels, tf.int32, name="cast_digit_labels"),
                                    name="transpose_digit_labels")
        log(data.get_shape())
        return data, digit_labels, labels_length, labels_value


def evaluate(args, trial_id, glob_step, test=False):
    """Evaluate the latest training checkpoint on the validation or test set.

    Builds a separate inference graph (no dropout, batch norm in inference mode) and restores
    the newest checkpoint from ``<checkpoints_dir>/train``. It then runs batches until at least
    ``dataset_size`` examples have been read; the input cycles, so the last batch may wrap around.
    Per-batch accuracies are averaged. Per-digit accuracies are averaged only over the batches in
    which that digit position occurs at all. The results are written as scalar summaries under
    ``<summaries_path>/eval`` (or ``/test``).

    Args:
        args:      parsed command-line arguments.
        trial_id:  hyperparameter-tuning trial id (not used here).
        glob_step: training step the summaries are recorded at; negative values are clamped to 0.
        test:      evaluate ``args.test_data_paths`` instead of ``args.eval_data_paths``.
    """
    with tf.Graph().as_default() as graph_eval:
        log("Running eval on %s set." % ('test' if test else 'valid'))
        paths = args.test_data_paths if test else args.eval_data_paths
        dataset_size = args.test_dataset_size if test else args.eval_dataset_size
        data, digit_labels_t, labels_length, labels_value = get_inputs(paths,
                                                                       batch_size=args.eval_batch_size, num_epochs=None,
                                                                       is_training=False)
        features, regularizers = get_features(data, keep_prob=1, is_training=False)
        logits_length, logits_digits = get_logits(features)
        # get_loss is needed for the position masks it builds.
        loss, masks, lmasked, dmasked = get_loss(logits_length, logits_digits, labels_length, digit_labels_t,
                                                 args.l2_beta, args.eval_batch_size, regularizers)
        _, _, accuracy_l, accuracy_d, accuracy, _ = preds_and_accuracy(logits_length, logits_digits,
                                                                       masks, lmasked, dmasked,
                                                                       labels_length, digit_labels_t)

        saver = tf.train.Saver()
        subdir = 'eval' if not test else 'test'
        log('sub %s' % subdir)
        summary_writer = tf.train.SummaryWriter(os.path.join(args.summaries_path, subdir), graph=graph_eval)
        sv = tf.train.Supervisor(graph=graph_eval,
                                 logdir=os.path.join(args.checkpoints_dir, subdir),
                                 summary_op=None,
                                 summary_writer=summary_writer,
                                 global_step=None,
                                 save_model_secs=0,
                                 saver=saver)

        num_batches = 0
        acc = 0.0
        acc_l = 0.0
        # A batch with no digit at position i reports NaN accuracy there (mean of an empty
        # selection), so per-position sums and counts skip NaN values.
        acc_d_sum = np.zeros(num_labels)
        acc_d_count = np.zeros(num_labels)
        with sv.managed_session(master="", start_standard_services=False) as sess_eval:
            last_checkpoint = tf.train.latest_checkpoint(os.path.join(args.checkpoints_dir, 'train'))
            if last_checkpoint is None:
                log("No checkpoint available for model evaluation.")
                return
            sv.saver.restore(sess_eval, last_checkpoint)
            sv.start_queue_runners(sess_eval)
            while not sv.should_stop() and num_batches * args.eval_batch_size < dataset_size:
                vacc_d, vacc_l, vacc = sess_eval.run([accuracy_d, accuracy_l, accuracy])
                acc += vacc
                acc_l += vacc_l
                seen = ~np.isnan(vacc_d)
                acc_d_sum[seen] += vacc_d[seen]
                acc_d_count += seen
                num_batches += 1

            if num_batches == 0:
                # Nothing was evaluated (e.g. a dataset size of 0): avoid dividing by zero.
                log("No batches evaluated (dataset size %d); skipping summary." % dataset_size)
                return

            # Write the summary once all batches are evaluated.
            acc /= num_batches
            acc_l /= num_batches
            # Positions that never occurred in any evaluated batch stay NaN.
            acc_d = np.where(acc_d_count > 0, acc_d_sum / np.maximum(acc_d_count, 1), np.nan)

            log("Valid. acc: %.1f%% len: %.1f%% d0: %.1f%% d1: %.1f%% d2: %.1f%% d3: %.1f%% d4: %.1f%% d5: %.1f%%"
                % (acc, acc_l, acc_d[0], acc_d[1], acc_d[2], acc_d[3], acc_d[4], acc_d[5]))
            summary = tf.Summary(value=[
                tf.Summary.Value(tag="accuracy", simple_value=acc),
                tf.Summary.Value(tag="accuracy length", simple_value=acc_l),
                tf.Summary.Value(tag="accuracy digit 0", simple_value=acc_d[0]),
                tf.Summary.Value(tag="accuracy digit 1", simple_value=acc_d[1]),
                tf.Summary.Value(tag="accuracy digit 2", simple_value=acc_d[2]),
                tf.Summary.Value(tag="accuracy digit 3", simple_value=acc_d[3]),
                tf.Summary.Value(tag="accuracy digit 4", simple_value=acc_d[4]),
                tf.Summary.Value(tag="accuracy digit 5", simple_value=acc_d[5]),
            ])
            summary_writer.add_summary(summary, max(0, glob_step))
        log("eval finished")

def get_length_from_class(class_id):
    """Convert a 0-based length class id into the 1-based number of digits."""
    class_to_length_offset = 1
    return class_id + class_to_length_offset


def run_training(args, target, is_chief, device_fn, trial_id):
    """Build the training graph and run the training loop under a tf.train.Supervisor.

    The Supervisor manages variable initialisation/restoration under ``<checkpoints_dir>/train``
    and periodic checkpoints and summaries. The chief also runs a validation pass about every
    ``args.eval_model_secs`` seconds. When the loop ends, the chief saves a final checkpoint and
    evaluates on the test set.

    Args:
        args:      parsed command-line arguments.
        target:    session target ("" runs in-process).
        is_chief:  whether this replica is the chief (final save + evaluation).
        device_fn: device function or device string used to place the graph's ops.
        trial_id:  hyperparameter-tuning trial id, forwarded to evaluate().
    """
    with tf.Graph().as_default() as graph:
        # Assigns ops to the local worker by default.
        with tf.device(device_fn):
            data, digit_labels_t, labels_length, _ = get_inputs(args.train_data_paths, batch_size=args.batch_size,
                                                                num_epochs=args.num_epochs)
            features, regularizers = get_features(data, keep_prob=args.keep_prob, is_training=True)
            logits_length, logits_digits = get_logits(features)
            loss, masks, lmasked, dmasked = get_loss(logits_length, logits_digits, labels_length, digit_labels_t,
                                                     args.l2_beta, args.batch_size, regularizers)
            global_step, train_op = train(args.learning_rate, loss)

            # Built for its accuracy scalar summaries, which summary_op below collects.
            preds_and_accuracy(logits_length, logits_digits, masks, lmasked, dmasked,
                               labels_length, digit_labels_t)

            # Merge all summaries into a single op that the Supervisor runs periodically.
            summary_op = tf.merge_all_summaries()

            init_op = tf.initialize_all_variables()
            summary_writer = tf.train.SummaryWriter(os.path.join(args.summaries_path, 'train'), graph=graph)
            saver = tf.train.Saver()

            sv = tf.train.Supervisor(is_chief=is_chief,
                                     logdir=os.path.join(args.checkpoints_dir, 'train'),
                                     init_op=init_op,
                                     saver=saver,
                                     summary_writer=summary_writer,
                                     summary_op=summary_op,
                                     global_step=global_step,
                                     save_summaries_secs=args.save_summaries_secs,
                                     save_model_secs=args.save_model_secs)

            # managed_session stops the Supervisor (threads, summary writer) when the block exits,
            # so no explicit sv.stop() is needed afterwards.
            with sv.managed_session(target) as sess:
                sess_start_time = time.time()
                eval_counter = time.time()
                glob_step = 0
                while not sv.should_stop() and glob_step <= args.max_steps:
                    try:
                        _, loss_value, glob_step = sess.run([train_op, loss, global_step])
                    except tf.errors.OutOfRangeError:
                        # Input exhausted (possible when num_epochs is limited): leave the loop so
                        # the final save/evaluation below still runs.
                        log("Input data exhausted at step %d." % glob_step)
                        break

                    if glob_step % 100 == 0:
                        log("Step %d: loss = %.2f (%.3f sec)" % (glob_step, loss_value, time.time() - sess_start_time))

                    if time.time() - eval_counter > args.eval_model_secs and is_chief:
                        start_time2 = time.time()
                        evaluate(args, trial_id, glob_step)
                        log("Step %d: saving and reporting took %.3f sec" % (glob_step, time.time() - start_time2))
                        eval_counter = time.time()

                if is_chief:
                    # Force a save at the end of our loop.
                    log("Saving before closing")
                    sv.saver.save(sess, sv.save_path, global_step=global_step, write_meta_graph=False)
                    evaluate(args, trial_id, args.max_steps, test=True)
                    # TODO: export the model for inference (export_model is not defined in this module).

    tf.logging.info("Done training.")


In [36]:
%%mlalpha module --name task --main

import datetime
import random
import tensorflow as tf
import os
from . import svhn
import google.cloud.ml as ml
import argparse
import json
import multiprocessing

def start_server(cluster, task):
  """Create and start a gRPC tf.train.Server for this task.

  Args:
    cluster: cluster specification taken from TF_CONFIG.
    task: dict whose 'type' is the job name and whose 'index' is the task index.
  Returns:
    The tf.train.Server instance.
  """
  job_name = task['type']
  task_index = task['index']
  return tf.train.Server(cluster, protocol="grpc", job_name=job_name, task_index=task_index)

def dispatch(args, cluster, task, job, trial_id):
  """Run this process's role: local run, parameter server, worker or master.

  Without a cluster spec, everything runs in-process as the chief. In a cluster, "ps" tasks only
  serve variables. "worker" and "master" tasks train, and master task 0 is the chief. When no
  --train_data_paths are given, the task only evaluates on the test set.

  Args:
    args: parsed command-line arguments.
    cluster: cluster spec from TF_CONFIG, or a falsy value for a local run.
    task: dict with 'type' and 'index' describing this task.
    job: job description from TF_CONFIG (not used here).
    trial_id: hyperparameter-tuning trial id, forwarded to the svhn functions.
  Raises:
    ValueError: if task['type'] is not "ps", "worker" or "master".
  """
  def train_or_evaluate(target, is_chief, device_fn):
    # A missing or empty training set means "evaluate on the test set only".
    if not args.train_data_paths:
      svhn.evaluate(args, trial_id, -1, test=True)
    else:
      svhn.run_training(args, target, is_chief, device_fn, trial_id)

  if not cluster:
    # Run locally.
    train_or_evaluate("", True, "")
    return

  role = task['type']
  if role == "ps":
    start_server(cluster, task).join()
    return
  if role not in ("worker", "master"):
    raise ValueError("invalid job_type %s" % task['type'])

  server = start_server(cluster, task)
  device_fn = tf.train.replica_device_setter(
      ps_device="/job:ps",
      worker_device="/job:%s/task:%d" % (role, task['index']),
      cluster=cluster)
  # Only master task 0 is the chief; workers never are.
  is_chief = role == "master" and task['index'] == 0
  train_or_evaluate(server.target, is_chief, device_fn)
    

def parse_arguments(argv=None):
  """Parse the trainer's command-line flags.

  Args:
    argv: list of argument strings; None (default) parses sys.argv[1:].
  Returns:
    argparse.Namespace with one attribute per flag. The *_data_paths flags may be repeated
    and are collected into lists (None when absent).
  """
  def str_to_bool(value):
    # argparse's type=bool turns every non-empty string, including "False", into True.
    lowered = str(value).strip().lower()
    if lowered in ('true', 't', 'yes', 'y', '1'):
      return True
    if lowered in ('false', 'f', 'no', 'n', '0'):
      return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' % value)

  parser = argparse.ArgumentParser()
  parser.add_argument('--train_data_paths', type=str, action='append')
  parser.add_argument('--eval_data_paths', type=str, action='append')
  parser.add_argument('--test_data_paths', type=str, action='append')
  parser.add_argument('--metadata_path', type=str)
  parser.add_argument('--output_path', type=str)
  parser.add_argument('--max_steps', type=int, default=5000)
  parser.add_argument('--verbose', type=str_to_bool, default=True)
  parser.add_argument('--learning_rate', type=float, default=0.001)
  parser.add_argument('--l2_beta', type=float, default=0.001)
  parser.add_argument('--batch_size', type=int, default=32)
  parser.add_argument('--eval_batch_size', type=int, default=128)
  parser.add_argument('--summaries_path', type=str)
  parser.add_argument('--checkpoints_dir', type=str)
  parser.add_argument('--num_epochs', type=int)
  parser.add_argument('--keep_prob', type=float, default=0.5)
  parser.add_argument('--save_model_secs', type=int, default=120)
  parser.add_argument('--save_summaries_secs', type=int, default=120)
  parser.add_argument('--eval_model_secs', type=int, default=600)
  parser.add_argument('--eval_dataset_size', type=int, default=0)
  parser.add_argument('--test_dataset_size', type=int, default=0)
  return parser.parse_args(argv)

def main():
  """Entry point: read TF_CONFIG and the command line, then dispatch this task's role.

  TF_CONFIG is optional JSON with 'cluster', 'task' and 'job' entries. When the task has a
  hyperparameter-tuning 'trial' id, checkpoints go to <checkpoints_dir>/<trial>, so that
  concurrent trials do not overwrite each other's checkpoints.
  """
  config = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster = config.get('cluster', None)
  job = config.get('job', None)
  # Fall back to a single local master when TF_CONFIG has no (or an empty) task entry.
  task_data = config.get('task', None) or {'type': 'master', 'index': 0}
  args = parse_arguments()
  trial_id = task_data.get('trial')
  if trial_id is not None and args.checkpoints_dir is not None:
    args.checkpoints_dir = os.path.join(args.checkpoints_dir, str(trial_id))
  dispatch(args, cluster, task_data, job, trial_id)

if __name__ == '__main__':
  # Show INFO-level TensorFlow log messages (svhn.log writes through tf.logging.info).
  tf.logging.set_verbosity(tf.logging.INFO)
  main()

Define the path variables used for local training.

In [37]:
import os
import re
# Root of the project checkout inside the Datalab container
# (alternative checkout location: "/content/git/TensorFlow-Udacity-Course/final_project/final").
project_dir = "/content/git/tensorflow/final_project/final"
package_local_dir = os.path.join(project_dir, 'tmp', 'distributed-rgb')
package_path_local = os.path.join(package_local_dir, "trainer-0.1.tar.gz")
tfrecords_dir_local = os.path.join(project_dir, 'data', 'tfrecords', 'rgb')


def list_tfrecords(split, root=tfrecords_dir_local):
  """Return the sorted paths of the files ending in '.tfrecords' in ``<root>/<split>``.

  Sorting makes the order independent of os.listdir's arbitrary ordering.
  """
  split_dir = os.path.join(root, split)
  return [str(os.path.join(split_dir, name))
          for name in sorted(os.listdir(split_dir))
          if name.endswith('.tfrecords')]


train_paths_local = list_tfrecords('train')
eval_paths_local = list_tfrecords('valid')
test_paths_local = list_tfrecords('test')
cropped_test_paths_local = list_tfrecords('test-cropped')

checkpoints_dir_local = os.path.join(project_dir, 'output', 'checkpoints')
summaries_path_local = os.path.join(project_dir, 'output', 'summaries')

# Create the local output directories if they do not exist yet.
for local_dir in (package_local_dir, summaries_path_local, checkpoints_dir_local):
  if not os.path.exists(local_dir):
    os.makedirs(local_dir)

Export the trainer package as a tarball.

In [38]:
%%mlalpha package --out $package_local_dir --name trainer


Package created at /content/git/tensorflow/final_project/final/tmp/distributed-rgb/trainer-0.1.tar.gz.


Start the training job locally with the parameters defined above.

In [None]:
%%mlalpha train
package_uris : $package_path_local
python_module: trainer.task
parameter_server_count: 1
worker_count: 1
args:
  train_data_paths : $train_paths_local
  eval_data_paths :  $eval_paths_local 
  test_data_paths :  $cropped_test_paths_local   
  checkpoints_dir : $checkpoints_dir_local  
  summaries_path: $summaries_path_local   
  learning_rate: 0.0001
  l2_beta: 16e-4
  max_steps : 30
  batch_size : 32
  keep_prob : 0.65
  eval_dataset_size : 3335
  eval_batch_size : 110
  save_summaries_sec : 30
  save_model_secs : 60
  eval_model_secs : 60

Run an evaluation pass locally. If no training paths are passed as arguments, the program only runs an evaluation on the test files.

In [40]:
%%mlalpha train
package_uris : $package_path_local
python_module: trainer.task
args:
  test_data_paths :  $cropped_test_paths_local   
  checkpoints_dir : $checkpoints_dir_local  
  summaries_path: $summaries_path_local  
  eval_batch_size : 16
  test_dataset_size : 160

## Training on the cloud with Google Cloud ML
- install gcloud from https://cloud.google.com/sdk/
- login with the command `gcloud auth login`
- set your current Google Cloud project with `gcloud config set project PROJECT_ID`
- to use TensorBoard you must provide Google Cloud Storage credentials with: `gcloud auth application-default login`
- Make sure the Cloud ML APIs are enabled:
  https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,dataflow,compute_component,logging,storage_component,storage_api,bigquery
- Then grant your model access to your bucket. Change BUCKET_NAME and paste the commands below into your shell:

        BUCKET_NAME="svhn-data"
        PROJECT_ID=$(gcloud config list project --format "value(core.project)")
        AUTH_TOKEN=$(gcloud auth print-access-token)
        SVC_ACCOUNT=$(curl -X GET -H "Content-Type: application/json" \
            -H "Authorization: Bearer $AUTH_TOKEN" \
            https://ml.googleapis.com/v1beta1/projects/${PROJECT_ID}:getConfig \
            | python -c "import json; import sys; response = json.load(sys.stdin); \
            print(response['serviceAccount'])")
        gsutil -m defacl ch -u $SVC_ACCOUNT:R gs://$BUCKET_NAME
        gsutil -m acl ch -u $SVC_ACCOUNT:R -r gs://$BUCKET_NAME
        gsutil -m acl ch -u $SVC_ACCOUNT:W gs://$BUCKET_NAME

In [29]:
import os
import posixpath

# Highest shard INDEX of each split (shards are numbered 0..N inclusive), so
# train_file_max = 619 means 620 training .tfrecords files.
train_file_max = 619
valid_file_max = 3
test_file_max = 13

bucket = 'gs://' + 'svhn-data'
# gs:// URIs always use '/' separators, so they are joined with posixpath rather than
# os.path (which would use the local OS separator).
data_root_remote = posixpath.join(bucket, 'data', 'rgb')


def _gcs_shard_paths(split_dir, file_prefix, max_index):
    """Return the GCS URIs of shards 0..max_index (inclusive) of one dataset split.

    Args:
        split_dir:   sub-directory of data_root_remote holding the split (e.g. 'train').
        file_prefix: shard file name prefix (e.g. 'svhn-64x64-rgb-train').
        max_index:   highest shard index; max_index + 1 paths are returned.
    Returns:
        list of str, e.g. '<data_root_remote>/train/svhn-64x64-rgb-train.0.tfrecords'.
    """
    # str() is kept from the original comprehensions (plain str elements, presumably so the
    # lists render cleanly when interpolated into the %%mlalpha configs below — confirm).
    return [str(posixpath.join(data_root_remote, split_dir, '%s.%d.tfrecords' % (file_prefix, i)))
            for i in range(max_index + 1)]


# Trainer package location on GCS.
package_dir = posixpath.join(bucket, 'model')
package_path_remote = posixpath.join(package_dir, 'trainer-regularized-3.tar.gz')

# Dataset shards on GCS.
train_paths_remote = _gcs_shard_paths('train', 'svhn-64x64-rgb-train', train_file_max)
eval_paths_remote = _gcs_shard_paths('valid', 'svhn-64x64-rgb-valid', valid_file_max)
test_paths_remote = _gcs_shard_paths('test', 'svhn-64x64-rgb-test', test_file_max)
test_cropped_paths_remote = _gcs_shard_paths('test-cropped', 'svhn-64x64-rgb-test', test_file_max)

# Job outputs on GCS, and the local directories they are mirrored to for backup.
# project_dir is a local path defined elsewhere in the notebook, so os.path is correct here.
checkpoints_dir_remote = posixpath.join(bucket, 'output', 'checkpoints-regmodel-3')
checkpoints_dir_export = os.path.join(project_dir, 'output', 'export', 'checkpoints-regmodel-3')
summaries_path_remote = posixpath.join(bucket, 'output', 'summaries-regmodel-3')
summaries_path_export = os.path.join(project_dir, 'output', 'export', 'summaries-regmodel-3')

for local_export_dir in (checkpoints_dir_export, summaries_path_export):
    if not os.path.exists(local_export_dir):
        os.makedirs(local_export_dir)

Copy the trainer package built above to the Google Cloud Storage location defined in the previous cell.

In [30]:
# Upload the locally built trainer package (package_path_local, defined elsewhere in the
# notebook) to its GCS location so Cloud ML jobs can reference it via package_uris.
!gsutil cp $package_path_local $package_path_remote

Copying file:///content/git/tensorflow/final_project/final/tmp/distributed-rgb/trainer-0.1.tar.gz [Content-Type=application/x-tar]...
- [1 files][  7.5 KiB/  7.5 KiB]                                                
Operation completed over 1 objects/7.5 KiB.                                      


Optionally, clean up outputs from previous jobs (uncomment the commands to run them).

In [None]:
# Optional cleanup of a previous job's remote outputs; uncomment to execute.
# WARNING: these commands delete objects from the bucket.
# NOTE(review): the `*.*` wildcard only matches object names containing a dot, so an
# extensionless object (e.g. a `checkpoint` state file, if the trainer uses tf.train.Saver)
# would be left behind — confirm this is intended.
# !gsutil -m rm -r $summaries_path_remote
# !gsutil -m rm -r $checkpoints_dir_remote/*.*

Start the training job in the cloud

In [31]:
%%mlalpha train --cloud
# Cloud ML training job. This must be a cell magic (%%) so the YAML below is handed to
# mlalpha as the job config instead of being executed as Python code.
package_uris: $package_path_remote
python_module: trainer.task
scale_tier: STANDARD_1
region: us-central1
args:
  train_data_paths : $train_paths_remote
  eval_data_paths :  $eval_paths_remote 
  test_data_paths : $test_paths_remote
  checkpoints_dir : $checkpoints_dir_remote  
  summaries_path: $summaries_path_remote
  learning_rate: 0.0001
  l2_beta: 6e-4
  max_steps : 200500
  batch_size : 32
  eval_dataset_size : 3300
  test_dataset_size : 13000
  eval_batch_size : 110  
  save_summaries_sec : 120
  save_model_secs : 120
  eval_model_secs : 900

Print the TensorBoard command line for this run.
Authorize TensorBoard with `gcloud auth application-default login` before launching it.

In [33]:
# Assemble TensorBoard's "name:path,name:path" --logdir spec, one run per summaries
# sub-directory, and print the command so it can be pasted into a shell.
tensorboard_runs = ("train", "eval", "test")
logdir_spec = ",".join("%s:%s" % (run_name, os.path.join(summaries_path_remote, run_name))
                       for run_name in tensorboard_runs)
print("tensorboard start --logdir=" + logdir_spec)

tensorboard start --logdir=train:gs://svhn-data/output/summaries-regmodel-3/train,eval:gs://svhn-data/output/summaries-regmodel-3/eval,test:gs://svhn-data/output/summaries-regmodel-3/test


## Evaluate the model
First, back up the remote summaries and checkpoints locally.

In [None]:
# Mirror the remote summaries and checkpoints into the local export directories.
# rsync -r recurses into sub-directories; -d deletes local files that no longer exist
# remotely, so the local copies become exact mirrors.
!gsutil -m rsync -d -r $summaries_path_remote $summaries_path_export
!gsutil -m rsync -d -r $checkpoints_dir_remote $checkpoints_dir_export

Run an evaluation on the (cropped) test set by omitting the training data paths from the arguments.

In [42]:
%%mlalpha train --cloud
# Evaluation-only Cloud ML job: no train_data_paths are passed, so the trainer runs an
# evaluation on test_data_paths (the cropped test shards). Must be a cell magic (%%) so
# the YAML below is handed to mlalpha as the job config rather than run as Python.
package_uris: $package_path_remote
python_module: trainer.task
scale_tier: BASIC
region: us-central1
args:
  test_data_paths : $test_cropped_paths_remote 
  checkpoints_dir : $checkpoints_dir_remote
  summaries_path: $summaries_path_remote
  eval_batch_size : 130
  test_dataset_size : 13000