In [1]:
def mnist_fun(args, ctx):
    """TensorFlowOnSpark map function run once per Spark executor.

    Parameter-server ("ps") nodes block in ``server.join()``. Worker nodes build a
    convolutional network that predicts the A/B channels of a Lab image from its
    L channel (read from ``part-*`` TFRecords under ``args.images``) and train it
    with a ``tf.train.Supervisor`` that checkpoints into the HopsWorks TensorBoard
    logdir.

    Args:
        args: argparse namespace from the driver cell; reads ``rdma``, ``readers``,
            ``mode``, ``epochs``, ``images``, ``output`` and ``steps``.
        ctx: TensorFlowOnSpark node context; reads ``worker_num``, ``job_name``,
            ``task_index`` and ``cluster_spec``.
    """

    def print_log(worker_num, arg):
        """Print ``arg`` preceded by a line holding the worker number."""
        print("%d: " % worker_num)
        print(arg)

    from tensorflowonspark import TFNode
    from datetime import datetime
    import os
    import tensorflow as tf
    import time
    # Only referenced by the commented-out tf.py_func conversion in extractImage.
    from skimage.color import lab2rgb

    # Used to get the TensorBoard logdir so that TensorBoard shows up in HopsWorks
    from hops import tensorboard

    worker_num = ctx.worker_num
    job_name = ctx.job_name
    task_index = ctx.task_index
    cluster_spec = ctx.cluster_spec
    num_workers = len(cluster_spec['worker'])

    # Delay PS nodes a bit, since workers seem to reserve GPUs more quickly/reliably (w/o conflict)
    if job_name == "ps":
        time.sleep((worker_num + 1) * 5)

    # Parameters
    batch_size = 100
    learning_rate = 0.00001
    dropout_keep_prob = 0.75  # keep probability after each ReLU conv layer, training only

    # Get TF cluster and server instances
    cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

    def read_tfr_examples(path, batch_size=100, num_epochs=None, task_index=None, num_workers=None):
        """Build a queue-based input pipeline yielding batches of (L, AB) tensors.

        Each TFRecord example holds float features 'L', 'A' and 'B' of 65536 values,
        reshaped to [256, 256, 1]; A and B are concatenated into a [256, 256, 2] target.
        When both ``task_index`` and ``num_workers`` are given, the sorted file list is
        strided so each worker reads its own subset.

        Raises:
            ValueError: if no ``part-*`` file is assigned to this worker.
        """
        print_log(worker_num, "num_epochs: {0}".format(num_epochs))

        # Setup queue of TFRecord filenames. Sorting gives every worker the same order,
        # so the strided split below produces disjoint shards that cover all files.
        tf_record_pattern = os.path.join(path, 'part-*')
        files = sorted(tf.gfile.Glob(tf_record_pattern))
        queue_name = "file_queue"

        # split input files across workers, if specified
        if task_index is not None and num_workers is not None:
            files = files[task_index::num_workers]
            queue_name = "file_queue_{0}".format(task_index)

        print_log(worker_num, "files: {0}".format(files))
        if not files:
            # Fail fast here rather than later inside the input pipeline.
            raise ValueError("No TFRecord files matching {0} for this worker".format(tf_record_pattern))
        file_queue = tf.train.string_input_producer(files, shuffle=False, capacity=1000, num_epochs=num_epochs, name=queue_name)

        # Setup reader for examples
        reader = tf.TFRecordReader(name="reader")
        _, serialized = reader.read(file_queue)
        feature_def = {
            'L': tf.FixedLenFeature([65536], tf.float32),
            'A': tf.FixedLenFeature([65536], tf.float32),
            'B': tf.FixedLenFeature([65536], tf.float32)
        }

        # Features are already float32 per feature_def, so only a reshape is needed.
        features = tf.parse_single_example(serialized, feature_def)
        L = tf.reshape(features['L'], [256, 256, 1])
        print_log(worker_num, "L: {0}".format(L))
        A = tf.reshape(features['A'], [256, 256, 1])
        print_log(worker_num, "A: {0}".format(A))
        B = tf.reshape(features['B'], [256, 256, 1])
        print_log(worker_num, "B: {0}".format(B))
        AB = tf.concat([A, B], 2)

        # Return a batch of examples
        return tf.train.batch([L, AB], batch_size, num_threads=args.readers, name="batch")

    def conv2DRelu(X, W, B, strides, padding):
        """conv2d + bias followed by ReLU."""
        # strides: [batch_step, height_step, width_step, channel_step]
        return tf.nn.relu(tf.nn.conv2d(X, W, strides=strides, padding=padding) + B)

    def conv2DTanh(X, W, B, strides, padding):
        """conv2d + bias followed by tanh (output range [-1, 1])."""
        # strides: [batch_step, height_step, width_step, channel_step]
        return tf.nn.tanh(tf.nn.conv2d(X, W, strides=strides, padding=padding) + B)

    def weight(width, height, input_channels, output_channels, variable_name):
        """Create a truncated-normal (stddev 0.1) filter variable and log a histogram of it."""
        # [width, height, input channel, output channel]
        W = tf.Variable(tf.truncated_normal([width, height, input_channels, output_channels], stddev=0.1), name=variable_name)
        tf.summary.histogram(variable_name, W)
        return W

    def bias(outputChannels, variable_name):
        """Create a zero-initialised bias (one per output channel) and log a histogram of it."""
        B = tf.Variable(tf.zeros([outputChannels]), name=variable_name)
        tf.summary.histogram(variable_name, B)
        return B

    def upSampling2D(X, height, width):
        """Nearest-neighbour resize of X to [height, width]."""
        return tf.image.resize_images(X, [height, width], tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    def upSampleToOriginalSize(X, size):
        """Nearest-neighbour resize of X to ``size`` ([height, width])."""
        return tf.image.resize_images(X, size, tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    def Conv2D(X, input_channels, output_channels, scan=3, activation='relu', padding='SAME', strides=1, layer_name="layer", keep_prob=None):
        """Square ``scan`` x ``scan`` conv layer with its own weight and bias variables.

        'relu' layers are followed by dropout. ``keep_prob`` defaults to
        ``dropout_keep_prob`` in train mode and 1.0 (dropout disabled) in any other
        mode. Any other ``activation`` value gives a tanh layer without dropout.
        """
        W = weight(scan, scan, input_channels, output_channels, layer_name + "/weight")
        B = bias(output_channels, layer_name + "/bias")
        layer_strides = [1, strides, strides, 1]
        if activation == 'relu':
            if keep_prob is None:
                keep_prob = dropout_keep_prob if args.mode == "train" else 1.0
            return tf.nn.dropout(conv2DRelu(X, W, B, layer_strides, padding), keep_prob)
        return conv2DTanh(X, W, B, layer_strides, padding)

    def extractImage(L, Y):
        """Stack the L channel with a 2-channel A/B tensor into a 3-channel Lab tensor.

        Y is assumed to be in the tanh range [-1, 1] (the network output, or the
        target it is trained against); it is scaled by 128 exactly once, presumably
        to the Lab a/b range -- TODO confirm against the dataset preprocessing.
        """
        A, B = tf.split(tf.scalar_mul(128.0, Y), [1, 1], axis=3)
        LAB = tf.concat([L, A, B], 3)
        #return tf.py_func(lab2rgb, [LAB], LAB.dtype)#tf.float32)
        return LAB

    if job_name == "ps":
        server.join()
    elif job_name == "worker":
        # Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):

            # Placeholders or QueueRunner/Readers for input data
            num_epochs = 1 if args.mode == "inference" else None if args.epochs == 0 else args.epochs
            index = task_index if args.mode == "inference" else None
            workers = num_workers if args.mode == "inference" else None

            images = TFNode.hdfs_path(ctx, args.images)
            X, Y_ = read_tfr_examples(images, batch_size, num_epochs, index, workers)

            # Stride-2 convs (input_layer, hidden_layer_3, hidden_layer_5) take the 256x256
            # input down to 32x32; nearest-neighbour upsampling restores 256x256 before the
            # 2-channel tanh output layer.
            Y1 = Conv2D(X, 1, 8, 3, 'relu', 'SAME', 2, "input_layer")
            Y2 = Conv2D(Y1, 8, 8, 3, 'relu', 'SAME', 1, "hidden_layer_1")
            Y3 = Conv2D(Y2, 8, 16, 3, 'relu', 'SAME', 1, "hidden_layer_2")
            Y4 = Conv2D(Y3, 16, 16, 3, 'relu', 'SAME', 2, "hidden_layer_3")
            Y5 = Conv2D(Y4, 16, 32, 3, 'relu', 'SAME', 1, "hidden_layer_4")
            Y6 = Conv2D(Y5, 32, 32, 3, 'relu', 'SAME', 2, "hidden_layer_5")
            Y7 = upSampling2D(Y6, 64, 64)
            Y8 = Conv2D(Y7, 32, 32, 3, 'relu', 'SAME', 1, "hidden_layer_6")
            Y9 = upSampling2D(Y8, 128, 128)
            Y10 = Conv2D(Y9, 32, 16, 3, 'relu', 'SAME', 1, "hidden_layer_7")
            Y11 = upSampling2D(Y10, 256, 256)
            Y = Conv2D(Y11, 16, 2, 3, 'tanh', 'SAME', 1, "output_layer")

            # Display pics
            tf.summary.image("bw_img", X)
            #tf.summary.image("color_img", extractImage(X, Y_))
            #tf.summary.image("colorized_img", extractImage(X, Y))

            # Mean squared error over every element, reduced to a scalar. The previous
            # axis-1-only mean fed a non-scalar loss to minimize(); Adam is insensitive to
            # that constant rescaling, and the logged mean_loss value is unchanged.
            loss = tf.reduce_mean(tf.squared_difference(Y, Y_))
            tf.summary.scalar("mean_loss", loss)

            # trainable=False keeps the step counter out of the optimizer's variable list.
            global_step = tf.Variable(0, name="global_step", trainable=False)
            # Define an optimizer
            train_op = tf.train.AdamOptimizer(learning_rate).minimize(loss, global_step=global_step)

            # TODO: implement model evaluation (the MNIST accuracy sketch below does not apply here).
            ## Test trained model
            #label = tf.argmax(y_, 1, name="label")
            #prediction = tf.argmax(y, 1,name="prediction")
            #correct_prediction = tf.equal(prediction, label)
            #accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
            #tf.summary.scalar("acc", accuracy)

            saver = tf.train.Saver()
            summary_op = tf.summary.merge_all()
            init_op = tf.global_variables_initializer()

            logdir = tensorboard.logdir()
            print("tensorflow model path: {0}".format(logdir))

            # Only the chief (task 0) writes TensorBoard summaries.
            summary_writer = None
            if task_index == 0:
                summary_writer = tf.summary.FileWriter(logdir, graph=tf.get_default_graph())

            # Create a "supervisor", which oversees the training process and stores model state into HDFS
            if args.mode == "train":
                sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         init_op=init_op,
                                         summary_op=None,
                                         summary_writer=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=10)
            else:
                sv = tf.train.Supervisor(is_chief=(task_index == 0),
                                         logdir=logdir,
                                         summary_op=None,
                                         saver=saver,
                                         global_step=global_step,
                                         stop_grace_secs=300,
                                         save_model_secs=0)

            # Predictions are only written in inference mode, so only open the output file then.
            output_file = None
            if args.mode == "inference":
                output_dir = TFNode.hdfs_path(ctx, args.output)
                output_file = tf.gfile.Open("{0}/part-{1:05d}".format(output_dir, worker_num), mode='w')

        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        try:
            with sv.managed_session(server.target) as sess:
                print("{0} session ready".format(datetime.now().isoformat()))

                # Loop until the supervisor shuts down or args.steps global steps have completed.
                step = 0
                while not sv.should_stop() and step < args.steps:
                    # Run a training step asynchronously.
                    # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                    # perform *synchronous* training.
                    if args.mode == "train":
                        if summary_writer is not None:
                            _, summary, step = sess.run([train_op, summary_op, global_step])
                            summary_writer.add_summary(summary, step)
                        else:
                            # Non-chief workers skip evaluating summaries they would discard.
                            _, step = sess.run([train_op, global_step])
                    else:
                        # TODO: inference is not implemented yet. Earlier MNIST-style sketch:
                        #     labels, pred = sess.run([label, prediction])
                        #     for i in range(len(labels)):
                        #         output_file.write("{0} {1}\n".format(labels[i], pred[i]))
                        # `step` never advances in this branch, so leave the loop instead of
                        # spinning until the supervisor happens to stop.
                        print_log(worker_num, "mode '{0}' is not implemented; skipping the step loop".format(args.mode))
                        break

                if args.mode == "inference" and task_index == 0:
                    # Delay chief worker from shutting down supervisor during inference, since it can load model,
                    # start session, run inference and request stop before the other workers even start/sync their sessions.
                    time.sleep(60)

                # Ask for all the services to stop.
                print("{0} stopping supervisor".format(datetime.now().isoformat()))
                sv.stop()
        finally:
            # Flush pending summaries and release the output file even if the session fails.
            if summary_writer is not None:
                summary_writer.close()
            if output_file is not None:
                output_file.close()

Starting Spark application


The code failed because of a fatal error:
	Session 4769 unexpectedly reached final status 'dead'. See logs:
stdout: 
18/01/03 01:08:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/01/03 01:08:45 INFO RMProxy: Connecting to ResourceManager at /10.0.104.190:8032
18/01/03 01:08:45 INFO Client: Requesting a new application from cluster with 30 NodeManagers
18/01/03 01:08:45 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (216000 MB per container)
18/01/03 01:08:45 INFO Client: Will allocate AM container, with 17740 MB memory including 1612 MB overhead
18/01/03 01:08:45 INFO Client: Setting up container launch context for our AM
18/01/03 01:08:45 INFO Client: Setting up the launch environment for our AM container
18/01/03 01:08:45 INFO Client: Preparing resources for our AM container
18/01/03 01:08:47 WARN Client: Neither spark.yarn.jars nor spar

In [2]:
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
from datetime import datetime
from hops import util
from hops import hdfs

from tensorflowonspark import TFCluster

sc = spark.sparkContext
num_executors = util.num_executors(spark)
num_ps = util.num_param_servers(spark)

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=0)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format", default='/Projects/' + hdfs.project_name() + '/imbd_face_dataset/small_processed')
#parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format", default='/Projects/' + hdfs.project_name() + '/mnist/train/images')
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format", default = '/Projects/' + hdfs.project_name() + '/mnist/train/labels')
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default="mnist_model")
#parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/test", default='/Projects/' + hdfs.project_name() + '/imbd_face_dataset/model')
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
# store_true: a plain default=False would turn any passed value (even "False") into a truthy string.
parser.add_argument("-c", "--rdma", help="use rdma connection", action="store_true")
# A notebook has no command line of its own: sys.argv belongs to the kernel launcher
# (ipykernel, for example, passes "-f <connection file>", which collides with --format).
# Parse an empty list so only the defaults above apply; edit them to configure a run.
args = parser.parse_args([])
print("args:", args)


print("{0} ===== Start".format(datetime.now().isoformat()))

cluster = TFCluster.run(sc, mnist_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))


The code failed because of a fatal error:
	Session 4769 unexpectedly reached final status 'dead'. See logs:
stdout: 
18/01/03 01:08:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/01/03 01:08:45 INFO RMProxy: Connecting to ResourceManager at /10.0.104.190:8032
18/01/03 01:08:45 INFO Client: Requesting a new application from cluster with 30 NodeManagers
18/01/03 01:08:45 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (216000 MB per container)
18/01/03 01:08:45 INFO Client: Will allocate AM container, with 17740 MB memory including 1612 MB overhead
18/01/03 01:08:45 INFO Client: Setting up container launch context for our AM
18/01/03 01:08:45 INFO Client: Setting up the launch environment for our AM container
18/01/03 01:08:45 INFO Client: Preparing resources for our AM container
18/01/03 01:08:47 WARN Client: Neither spark.yarn.jars nor spar