diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py
index b19f56d4..7ee3a3d9 100755
--- a/tensorflowonspark/TFCluster.py
+++ b/tensorflowonspark/TFCluster.py
@@ -34,6 +34,8 @@
 from . import TFManager
 from . import TFSparkNode
+logger = logging.getLogger(__name__)
+
 # status of TF background job
 tf_status = {}
@@ -73,7 +75,7 @@ def train(self, dataRDD, num_epochs=0, feed_timeout=600, qname='input'):
       :feed_timeout: number of seconds after which data feeding times out (600 sec default)
       :qname: *INTERNAL USE*.
     """
-    logging.info("Feeding training data")
+    logger.info("Feeding training data")
     assert self.input_mode == InputMode.SPARK, "TFCluster.train() requires InputMode.SPARK"
     assert qname in self.queues, "Unknown queue: {}".format(qname)
     assert num_epochs >= 0, "num_epochs cannot be negative"
@@ -107,7 +109,7 @@ def inference(self, dataRDD, feed_timeout=600, qname='input'):
     Returns:
       A Spark RDD representing the output of the TensorFlow inferencing
     """
-    logging.info("Feeding inference data")
+    logger.info("Feeding inference data")
     assert self.input_mode == InputMode.SPARK, "TFCluster.inference() requires InputMode.SPARK"
     assert qname in self.queues, "Unknown queue: {}".format(qname)
     return dataRDD.mapPartitions(TFSparkNode.inference(self.cluster_info, feed_timeout=feed_timeout, qname=qname))
@@ -123,7 +125,7 @@ def shutdown(self, ssc=None, grace_secs=0, timeout=259200):
       :grace_secs: Grace period to wait after all executors have completed their tasks before terminating the Spark application, e.g. to allow the chief worker to perform any final/cleanup duties like exporting or evaluating the model. Default is 0.
       :timeout: Time in seconds to wait for TF cluster to complete before terminating the Spark application. This can be useful if the TF code hangs for any reason. Default is 3 days. Use -1 to disable timeout.
     """
-    logging.info("Stopping TensorFlow nodes")
+    logger.info("Waiting for TensorFlow nodes to complete...")
     # identify ps/workers
     ps_list, worker_list, eval_list = [], [], []
@@ -133,7 +135,7 @@
     # setup execution timeout
     if timeout > 0:
       def timeout_handler(signum, frame):
-        logging.error("TensorFlow execution timed out, exiting Spark application with error status")
+        logger.error("TensorFlow execution timed out, exiting Spark application with error status")
         self.sc.cancelAllJobs()
         self.sc.stop()
         sys.exit(1)
@@ -146,7 +148,7 @@ def timeout_handler(signum, frame):
       # Spark Streaming
       while not ssc.awaitTerminationOrTimeout(1):
         if self.server.done:
-          logging.info("Server done, stopping StreamingContext")
+          logger.info("Server done, stopping StreamingContext")
           ssc.stop(stopSparkContext=False, stopGraceFully=True)
           break
     elif self.input_mode == InputMode.TENSORFLOW:
@@ -175,12 +177,12 @@
     # exit Spark application w/ err status if TF job had any errors
     if 'error' in tf_status:
-      logging.error("Exiting Spark application with error status.")
+      logger.error("Exiting Spark application with error status.")
       self.sc.cancelAllJobs()
       self.sc.stop()
       sys.exit(1)
-    logging.info("Shutting down cluster")
+    logger.info("Shutting down cluster")
     # shutdown queues and managers for "PS" executors.
     # note: we have to connect/shutdown from the spark driver, because these executors are "busy" and won't accept any other tasks.
     for node in ps_list + eval_list:
@@ -230,7 +232,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo
   Returns:
     A TFCluster object representing the started cluster.
   """
-  logging.info("Reserving TFSparkNodes {0}".format("w/ TensorBoard" if tensorboard else ""))
+  logger.info("Reserving TFSparkNodes {0}".format("w/ TensorBoard" if tensorboard else ""))
   if driver_ps_nodes and input_mode != InputMode.TENSORFLOW:
     raise Exception('running PS nodes on driver locally is only supported in InputMode.TENSORFLOW')
@@ -263,7 +265,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo
   if num_workers > 0:
     cluster_template['worker'] = executors[:num_workers]
-  logging.info("cluster_template: {}".format(cluster_template))
+  logger.info("cluster_template: {}".format(cluster_template))
   # get default filesystem from spark
   defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
@@ -279,7 +281,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo
   server_addr = server.start()
   # start TF nodes on all executors
-  logging.info("Starting TensorFlow on executors")
+  logger.info("Starting TensorFlow on executors")
   cluster_meta = {
     'id': random.getrandbits(64),
     'cluster_template': cluster_template,
@@ -295,7 +297,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo
   if driver_ps_nodes:
     def _start_ps(node_index):
-      logging.info("starting ps node locally %d" % node_index)
+      logger.info("starting ps node locally %d" % node_index)
       TFSparkNode.run(map_fun,
                       tf_args,
                       cluster_meta,
@@ -319,7 +321,7 @@ def _start(status):
                                               queues,
                                               background=(input_mode == InputMode.SPARK)))
     except Exception as e:
-      logging.error("Exception in TF background thread")
+      logger.error("Exception in TF background thread")
       status['error'] = str(e)
   t = threading.Thread(target=_start, args=(tf_status,))
@@ -329,23 +331,23 @@ def _start(status):
   t.start()
   # wait for executors to register and start TFNodes before continuing
-  logging.info("Waiting for TFSparkNodes to start")
+  logger.info("Waiting for TFSparkNodes to start")
   cluster_info = server.await_reservations(sc, tf_status, reservation_timeout)
-  logging.info("All TFSparkNodes started")
+  logger.info("All TFSparkNodes started")
   # print cluster_info and extract TensorBoard URL
   tb_url = None
   for node in cluster_info:
-    logging.info(node)
+    logger.info(node)
     if node['tb_port'] != 0:
       tb_url = "http://{0}:{1}".format(node['host'], node['tb_port'])
   if tb_url is not None:
-    logging.info("========================================================================================")
-    logging.info("")
-    logging.info("TensorBoard running at: {0}".format(tb_url))
-    logging.info("")
-    logging.info("========================================================================================")
+    logger.info("========================================================================================")
+    logger.info("")
+    logger.info("TensorBoard running at: {0}".format(tb_url))
+    logger.info("")
+    logger.info("========================================================================================")
   # since our "primary key" for each executor's TFManager is (host, executor_id), sanity check for duplicates
   # Note: this may occur if Spark retries failed Python tasks on the same executor.
diff --git a/tensorflowonspark/TFNode.py b/tensorflowonspark/TFNode.py
index 82754f7b..131c1260 100644
--- a/tensorflowonspark/TFNode.py
+++ b/tensorflowonspark/TFNode.py
@@ -19,6 +19,7 @@
 from six.moves.queue import Empty
 from . import marker
+logger = logging.getLogger(__name__)
 def hdfs_path(ctx, path):
   """Convenience function to create a Tensorflow-compatible absolute HDFS path from relative paths
@@ -54,7 +55,7 @@ def hdfs_path(ctx, path):
   elif ctx.defaultFS.startswith("file://"):
     return "{0}/{1}/{2}".format(ctx.defaultFS, ctx.working_dir[1:], path)
   else:
-    logging.warn("Unknown scheme {0} with relative path: {1}".format(ctx.defaultFS, path))
+    logger.warn("Unknown scheme {0} with relative path: {1}".format(ctx.defaultFS, path))
     return "{0}/{1}".format(ctx.defaultFS, path)
@@ -120,7 +121,7 @@ def next_batch(self, batch_size):
     Returns:
       A batch of items or a dictionary of tensors.
     """
-    logging.debug("next_batch() invoked")
+    logger.debug("next_batch() invoked")
     queue = self.mgr.get_queue(self.qname_in)
     tensors = [] if self.input_tensors is None else {tensor: [] for tensor in self.input_tensors}
     count = 0
@@ -128,13 +129,13 @@
       item = queue.get(block=True)
       if item is None:
         # End of Feed
-        logging.info("next_batch() got None")
+        logger.info("next_batch() got None")
         queue.task_done()
         self.done_feeding = True
         break
       elif type(item) is marker.EndPartition:
         # End of Partition
-        logging.info("next_batch() got EndPartition")
+        logger.info("next_batch() got EndPartition")
         queue.task_done()
         if not self.train_mode and count > 0:
           break
@@ -147,7 +148,7 @@ def next_batch(self, batch_size):
            tensors[self.input_tensors[i]].append(item[i])
        count += 1
        queue.task_done()
-    logging.debug("next_batch() returning {0} items".format(count))
+    logger.debug("next_batch() returning {0} items".format(count))
     return tensors
   def should_stop(self):
@@ -163,11 +164,11 @@ def batch_results(self, results):
     Args:
       :results: array of output data for the equivalent batch of input data.
     """
-    logging.debug("batch_results() invoked")
+    logger.debug("batch_results() invoked")
     queue = self.mgr.get_queue(self.qname_out)
     for item in results:
       queue.put(item, block=True)
-    logging.debug("batch_results() returning data")
+    logger.debug("batch_results() returning data")
   def terminate(self):
     """Terminate data feeding early.
@@ -177,7 +178,7 @@ def terminate(self):
    to terminate an RDD operation early, so the extra partitions will still be sent to the executors (but will be ignored).
    Because of this, you should size your input data accordingly to avoid excessive overhead.
    """
-    logging.info("terminate() invoked")
+    logger.info("terminate() invoked")
     self.mgr.set('state', 'terminating')
     # drop remaining items in the queue
@@ -190,5 +191,5 @@ def terminate(self):
         queue.task_done()
         count += 1
       except Empty:
-        logging.info("dropped {0} items from queue".format(count))
+        logger.info("dropped {0} items from queue".format(count))
         done = True
diff --git a/tensorflowonspark/TFSparkNode.py b/tensorflowonspark/TFSparkNode.py
index 948ad95e..e3ab3948 100755
--- a/tensorflowonspark/TFSparkNode.py
+++ b/tensorflowonspark/TFSparkNode.py
@@ -28,6 +28,8 @@
 from . import reservation
 from . import util
+logger = logging.getLogger(__name__)
+
 class TFNodeContext:
   """Encapsulates unique metadata for a TensorFlowOnSpark node/executor and provides methods to interact with Spark and HDFS.
@@ -114,7 +116,7 @@ def _get_manager(cluster_info, host, executor_id):
           "3. Spark dynamic allocation is disabled."
     raise Exception(msg)
-  logging.info("Connected to TFSparkNode.mgr on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get('state'))))
+  logger.info("Connected to TFSparkNode.mgr on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get('state'))))
   return TFSparkNode.mgr
@@ -169,7 +171,7 @@ def _mapfn(iter):
        raise Exception("TFManager already started on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get("state"))))
      else:
        # old state, just continue with creating new manager
-        logging.warn("Ignoring old TFManager with cluster_id {0}, requested cluster_id {1}".format(TFSparkNode.cluster_id, cluster_id))
+        logger.warn("Ignoring old TFManager with cluster_id {0}, requested cluster_id {1}".format(TFSparkNode.cluster_id, cluster_id))
     # start a TFManager and get a free port
     # use a random uuid as the authkey
@@ -193,7 +195,7 @@ def _mapfn(iter):
       classpath = os.environ['CLASSPATH']
       hadoop_path = os.path.join(os.environ['HADOOP_PREFIX'], 'bin', 'hadoop')
       hadoop_classpath = subprocess.check_output([hadoop_path, 'classpath', '--glob']).decode()
-      logging.debug("CLASSPATH: {0}".format(hadoop_classpath))
+      logger.debug("CLASSPATH: {0}".format(hadoop_classpath))
       os.environ['CLASSPATH'] = classpath + os.pathsep + hadoop_classpath
     # start TensorBoard if requested
@@ -255,7 +257,7 @@ def _mapfn(iter):
       'authkey': authkey
     }
     # register node metadata with server
-    logging.info("TFSparkNode.reserve: {0}".format(node_meta))
+    logger.info("TFSparkNode.reserve: {0}".format(node_meta))
     client.register(node_meta)
     # wait for other nodes to finish reservations
     cluster_info = client.await_reservations()
@@ -269,7 +271,7 @@ def _mapfn(iter):
       if (node['executor_id'] == last_executor_id):
         raise Exception("Duplicate worker/task in cluster_info")
       last_executor_id = node['executor_id']
-      logging.info("node: {0}".format(node))
+      logger.info("node: {0}".format(node))
       (njob, nhost, nport) = (node['job_name'], node['host'], node['port'])
       hosts = [] if njob not in cluster_spec else cluster_spec[njob]
       hosts.append("{0}:{1}".format(nhost, nport))
@@ -282,7 +284,7 @@ def _mapfn(iter):
       'task': {'type': job_name, 'index': task_index},
       'environment': 'cloud'
     })
-    logging.info("export TF_CONFIG: {}".format(tf_config))
+    logger.info("export TF_CONFIG: {}".format(tf_config))
     os.environ['TF_CONFIG'] = tf_config
     # reserve GPU(s) again, just before launching TF process (in case situation has changed)
@@ -297,7 +299,7 @@ def _mapfn(iter):
       num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
       gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
       gpu_str = "GPUs" if num_gpus > 1 else "GPU"
-      logging.debug("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
+      logger.debug("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
       os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
     # create a context object to hold metadata for TF
@@ -333,7 +335,7 @@ def wrapper_fn_background(args, context):
     if job_name in ('ps', 'evaluator') or background:
       # invoke the TensorFlow main function in a background thread
-      logging.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on background process".format(
+      logger.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on background process".format(
         job_name, task_index, job_name, executor_id))
       p = multiprocessing.Process(target=wrapper_fn_background, args=(tf_args, ctx))
@@ -353,17 +355,17 @@ def wrapper_fn_background(args, context):
           e_str = equeue.get()
           raise Exception("Exception in " + job_name + ":\n" + e_str)
         msg = queue.get(block=True)
-        logging.info("Got msg: {0}".format(msg))
+        logger.info("Got msg: {0}".format(msg))
         if msg is None:
-          logging.info("Terminating {}".format(job_name))
+          logger.info("Terminating {}".format(job_name))
           TFSparkNode.mgr.set('state', 'stopped')
           done = True
         queue.task_done()
     else:
       # otherwise, just run TF function in the main executor/worker thread
-      logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on foreground thread".format(job_name, task_index, executor_id))
+      logger.info("Starting TensorFlow {0}:{1} on cluster node {2} on foreground thread".format(job_name, task_index, executor_id))
       wrapper_fn(tf_args, ctx)
-      logging.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, executor_id))
+      logger.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, executor_id))
   return _mapfn
@@ -391,14 +393,14 @@ def _train(iter):
      raise Exception(msg)
     state = str(mgr.get('state'))
-    logging.info("mgr.state={0}".format(state))
+    logger.info("mgr.state={0}".format(state))
     terminating = state == "'terminating'"
     if terminating:
-      logging.info("mgr is terminating, skipping partition")
+      logger.info("mgr is terminating, skipping partition")
       count = sum(1 for item in iter)
-      logging.info("Skipped {0} items from partition".format(count))
+      logger.info("Skipped {0} items from partition".format(count))
     else:
-      logging.info("Feeding partition {0} into {1} queue {2}".format(iter, qname, queue))
+      logger.info("Feeding partition {0} into {1} queue {2}".format(iter, qname, queue))
       count = 0
       for item in iter:
         count += 1
@@ -417,7 +419,7 @@ def _train(iter):
           if timeout <= 0:
             raise Exception("Timeout while feeding partition")
-    logging.info("Processed {0} items in partition".format(count))
+    logger.info("Processed {0} items in partition".format(count))
     # check if TF is terminating feed after this partition
     if not terminating:
@@ -425,13 +427,13 @@ def _train(iter):
       terminating = state == "'terminating'"
       if terminating:
         try:
-          logging.info("TFSparkNode: requesting stop")
+          logger.info("TFSparkNode: requesting stop")
           client = reservation.Client(cluster_meta['server_addr'])
           client.request_stop()
           client.close()
         except Exception as e:
           # ignore any errors while requesting stop
-          logging.debug("Error while requesting stop: {0}".format(e))
+          logger.debug("Error while requesting stop: {0}".format(e))
     return [terminating]
@@ -459,7 +461,7 @@ def _inference(iter):
      msg = "Queue '{}' not found on this node, check for exceptions on other nodes.".format(qname)
      raise Exception(msg)
-    logging.info("Feeding partition {0} into {1} queue {2}".format(iter, qname, queue_in))
+    logger.info("Feeding partition {0} into {1} queue {2}".format(iter, qname, queue_in))
     count = 0
     for item in iter:
       count += 1
@@ -485,7 +487,7 @@ def _inference(iter):
           if timeout <= 0:
             raise Exception("Timeout while feeding partition")
-    logging.info("Processed {0} items in partition".format(count))
+    logger.info("Processed {0} items in partition".format(count))
     # read result queue
     results = []
@@ -496,7 +498,7 @@ def _inference(iter):
       count -= 1
       queue_out.task_done()
-    logging.info("Finished processing partition")
+    logger.info("Finished processing partition")
     return results
   return _inference
@@ -524,16 +526,16 @@ def _shutdown(iter):
       if node['host'] == host and node['executor_id'] == executor_id:
         tb_pid = node['tb_pid']
         if tb_pid != 0:
-          logging.info("Stopping tensorboard (pid={0})".format(tb_pid))
logger.info("Stopping tensorboard (pid={0})".format(tb_pid)) subprocess.Popen(["kill", str(tb_pid)]) # terminate any listening queues - logging.info("Stopping all queues") + logger.info("Stopping all queues") for q in queues: if q != 'error': try: queue = mgr.get_queue(q) - logging.info("Feeding None into {0} queue".format(q)) + logger.info("Feeding None into {0} queue".format(q)) queue.put(None, block=True) except (AttributeError, KeyError): msg = "Queue '{}' not found on this node, check for exceptions on other nodes.".format(q) @@ -541,7 +543,7 @@ def _shutdown(iter): # wait for grace period (after terminating feed queues) if grace_secs > 0: - logging.info("Waiting for {} second grace period".format(grace_secs)) + logger.info("Waiting for {} second grace period".format(grace_secs)) time.sleep(grace_secs) # then check for any late exceptions @@ -552,7 +554,7 @@ def _shutdown(iter): equeue.put(e_str) raise Exception("Exception in worker:\n" + e_str) - logging.info("Setting mgr.state to 'stopped'") + logger.info("Setting mgr.state to 'stopped'") mgr.set('state', 'stopped') return [True] diff --git a/tensorflowonspark/gpu_info.py b/tensorflowonspark/gpu_info.py index 9e776ab6..69ffdbc1 100644 --- a/tensorflowonspark/gpu_info.py +++ b/tensorflowonspark/gpu_info.py @@ -14,6 +14,8 @@ import subprocess import time +logger = logging.getLogger(__name__) + MAX_RETRIES = 3 #: Maximum retries to allocate GPUs @@ -54,7 +56,7 @@ def get_gpus(num_gpu=1, worker_index=-1): """ # get list of gpus (index, uuid) list_gpus = subprocess.check_output(["nvidia-smi", "--list-gpus"]).decode() - logging.debug("all GPUs:\n{0}".format(list_gpus)) + logger.debug("all GPUs:\n{0}".format(list_gpus)) # parse index and guid gpus = [x for x in list_gpus.split('\n') if len(x) > 0] @@ -68,24 +70,24 @@ def parse_gpu(gpu_str): retries = 0 while len(free_gpus) < num_gpu and retries < MAX_RETRIES: smi_output = subprocess.check_output(["nvidia-smi", "--format=csv,noheader,nounits", "--query-compute-apps=gpu_uuid"]).decode() - logging.debug("busy GPUs:\n{0}".format(smi_output)) + logger.debug("busy GPUs:\n{0}".format(smi_output)) busy_uuids = [x for x in smi_output.split('\n') if len(x) > 0] for uuid, index in gpu_list: if uuid not in busy_uuids: free_gpus.append(index) if len(free_gpus) < num_gpu: - logging.warn("Unable to find available GPUs: requested={0}, available={1}".format(num_gpu, len(free_gpus))) + logger.warn("Unable to find available GPUs: requested={0}, available={1}".format(num_gpu, len(free_gpus))) retries += 1 time.sleep(30 * retries) free_gpus = [] - logging.info("Available GPUs: {}".format(free_gpus)) + logger.info("Available GPUs: {}".format(free_gpus)) # if still can't find available GPUs, raise exception if len(free_gpus) < num_gpu: smi_output = subprocess.check_output(["nvidia-smi", "--format=csv", "--query-compute-apps=gpu_uuid,pid,process_name,used_gpu_memory"]).decode() - logging.info(": {0}".format(smi_output)) + logger.info(": {0}".format(smi_output)) raise Exception("Unable to find {} free GPU(s)\n{}".format(num_gpu, smi_output)) # Get logical placement @@ -99,7 +101,7 @@ def parse_gpu(gpu_str): if worker_index * num_gpu + num_gpu > num_available: worker_index = worker_index * num_gpu % num_available proposed_gpus = free_gpus[worker_index * num_gpu:(worker_index * num_gpu + num_gpu)] - logging.info("Proposed GPUs: {}".format(proposed_gpus)) + logger.info("Proposed GPUs: {}".format(proposed_gpus)) return ','.join(str(x) for x in proposed_gpus) diff --git a/tensorflowonspark/pipeline.py 
diff --git a/tensorflowonspark/pipeline.py b/tensorflowonspark/pipeline.py
index c06dfaea..bc7e37c6 100755
--- a/tensorflowonspark/pipeline.py
+++ b/tensorflowonspark/pipeline.py
@@ -28,6 +28,8 @@
 import logging
 import sys
+logger = logging.getLogger(__name__)
+
 # TensorFlowOnSpark Params
@@ -375,10 +377,10 @@ def _fit(self, dataset):
     """
     sc = SparkContext.getOrCreate()
-    logging.info("===== 1. train args: {0}".format(self.args))
-    logging.info("===== 2. train params: {0}".format(self._paramMap))
+    logger.info("===== 1. train args: {0}".format(self.args))
+    logger.info("===== 2. train params: {0}".format(self._paramMap))
     local_args = self.merge_args_params()
-    logging.info("===== 3. train args + params: {0}".format(local_args))
+    logger.info("===== 3. train args + params: {0}".format(local_args))
     tf_args = self.args.argv if self.args.argv else local_args
     cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps,
@@ -429,14 +431,14 @@ def _transform(self, dataset):
     output_cols = [col for tensor, col in sorted(self.getOutputMapping().items())]    # output tensor => output col
     # run single-node inferencing on each executor
-    logging.info("input_cols: {}".format(input_cols))
-    logging.info("output_cols: {}".format(output_cols))
+    logger.info("input_cols: {}".format(input_cols))
+    logger.info("output_cols: {}".format(output_cols))
     # merge args + params
-    logging.info("===== 1. inference args: {0}".format(self.args))
-    logging.info("===== 2. inference params: {0}".format(self._paramMap))
+    logger.info("===== 1. inference args: {0}".format(self.args))
+    logger.info("===== 2. inference params: {0}".format(self._paramMap))
     local_args = self.merge_args_params()
-    logging.info("===== 3. inference args + params: {0}".format(local_args))
+    logger.info("===== 3. inference args + params: {0}".format(local_args))
     tf_args = self.args.argv if self.args.argv else local_args
     rdd_out = dataset.select(input_cols).rdd.mapPartitions(lambda it: _run_model(it, local_args, tf_args))
@@ -464,8 +466,8 @@ def _run_model(iterator, args, tf_args):
   """
   single_node_env(tf_args)
-  logging.info("===== input_mapping: {}".format(args.input_mapping))
-  logging.info("===== output_mapping: {}".format(args.output_mapping))
+  logger.info("===== input_mapping: {}".format(args.input_mapping))
+  logger.info("===== output_mapping: {}".format(args.output_mapping))
   input_tensor_names = [tensor for col, tensor in sorted(args.input_mapping.items())]
   output_tensor_names = [tensor for tensor, col in sorted(args.output_mapping.items())]
@@ -474,16 +476,16 @@ def _run_model(iterator, args, tf_args):
   # cache saved_model pred_fn to avoid reloading the model for each partition
   if not pred_fn or args != pred_args:
     assert args.export_dir, "Inferencing requires --export_dir argument"
-    logging.info("===== loading saved_model from: {}".format(args.export_dir))
+    logger.info("===== loading saved_model from: {}".format(args.export_dir))
     saved_model = tf.saved_model.load(args.export_dir, tags=args.tag_set)
-    logging.info("===== signature_def_key: {}".format(args.signature_def_key))
+    logger.info("===== signature_def_key: {}".format(args.signature_def_key))
     pred_fn = saved_model.signatures[args.signature_def_key]
     pred_args = args
   inputs_tensor_info = {i.name: i for i in pred_fn.inputs}
-  logging.info("===== inputs_tensor_info: {0}".format(inputs_tensor_info))
+  logger.info("===== inputs_tensor_info: {0}".format(inputs_tensor_info))
   outputs_tensor_info = pred_fn.outputs
-  logging.info("===== outputs_tensor_info: {0}".format(outputs_tensor_info))
logger.info("===== outputs_tensor_info: {0}".format(outputs_tensor_info)) result = [] diff --git a/tensorflowonspark/reservation.py b/tensorflowonspark/reservation.py index e4e3463a..94a61835 100644 --- a/tensorflowonspark/reservation.py +++ b/tensorflowonspark/reservation.py @@ -20,6 +20,8 @@ from . import util +logger = logging.getLogger(__name__) + TFOS_SERVER_PORT = "TFOS_SERVER_PORT" TFOS_SERVER_HOST = "TFOS_SERVER_HOST" BUFSIZE = 1024 @@ -112,7 +114,7 @@ def await_reservations(self, sc, status={}, timeout=600): """Block until all reservations are received.""" timespent = 0 while not self.reservations.done(): - logging.info("waiting for {0} reservations".format(self.reservations.remaining())) + logger.info("waiting for {0} reservations".format(self.reservations.remaining())) # check status flags for any errors if 'error' in status: sc.cancelAllJobs() @@ -122,11 +124,11 @@ def await_reservations(self, sc, status={}, timeout=600): timespent += 1 if (timespent > timeout): raise Exception("timed out waiting for reservations to complete") - logging.info("all reservations completed") + logger.info("all reservations completed") return self.reservations.get() def _handle_message(self, sock, msg): - logging.debug("received: {0}".format(msg)) + logger.debug("received: {0}".format(msg)) msg_type = msg['type'] if msg_type == 'REG': self.reservations.add(msg['data']) @@ -137,7 +139,7 @@ def _handle_message(self, sock, msg): rinfo = self.reservations.get() MessageSocket.send(self, sock, rinfo) elif msg_type == 'STOP': - logging.info("setting server.done") + logger.info("setting server.done") MessageSocket.send(self, sock, 'OK') self.done = True else: @@ -155,7 +157,7 @@ def start(self): host = self.get_server_ip() port = server_sock.getsockname()[1] addr = (host, port) - logging.info("listening for reservations at {0}".format(addr)) + logger.info("listening for reservations at {0}".format(addr)) def _listen(self, sock): CONNECTIONS = [] @@ -167,13 +169,13 @@ def _listen(self, sock): if sock == server_sock: client_sock, client_addr = sock.accept() CONNECTIONS.append(client_sock) - logging.debug("client connected from {0}".format(client_addr)) + logger.debug("client connected from {0}".format(client_addr)) else: try: msg = self.receive(sock) self._handle_message(sock, msg) except Exception as e: - logging.debug(e) + logger.debug(e) sock.close() CONNECTIONS.remove(sock) @@ -215,7 +217,7 @@ def __init__(self, server_addr): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(server_addr) self.server_addr = server_addr - logging.info("connected to server at {0}".format(server_addr)) + logger.info("connected to server at {0}".format(server_addr)) def _request(self, msg_type, msg_data=None): """Helper function to wrap msg w/ msg_type.""" @@ -239,9 +241,9 @@ def _request(self, msg_type, msg_data=None): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(self.server_addr) - logging.debug("sent: {0}".format(msg)) + logger.debug("sent: {0}".format(msg)) resp = MessageSocket.receive(self, self.sock) - logging.debug("received: {0}".format(resp)) + logger.debug("received: {0}".format(resp)) return resp def close(self): diff --git a/tensorflowonspark/util.py b/tensorflowonspark/util.py index 5ebaa83b..38d1d117 100644 --- a/tensorflowonspark/util.py +++ b/tensorflowonspark/util.py @@ -15,6 +15,8 @@ from socket import error as socket_error from . 
 from . import gpu_info
+logger = logging.getLogger(__name__)
+
 def single_node_env(num_gpus=1):
   """Setup environment variables for Hadoop compatibility and GPU allocation"""
@@ -30,11 +32,11 @@ def single_node_env(num_gpus=1):
   # reserve GPU, if requested
   if tf.test.is_built_with_cuda():
     gpus_to_use = gpu_info.get_gpus(num_gpus)
-    logging.info("Using gpu(s): {0}".format(gpus_to_use))
+    logger.info("Using gpu(s): {0}".format(gpus_to_use))
     os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
   else:
     # CPU
-    logging.info("Using CPU")
+    logger.info("Using CPU")
     os.environ['CUDA_VISIBLE_DEVICES'] = ''
diff --git a/test/README.md b/test/README.md
index 7be39818..e6d3f4e6 100644
--- a/test/README.md
+++ b/test/README.md
@@ -27,7 +27,7 @@ cd ${TFoS_HOME}/tests
 ```
 # Start Spark Standalone cluster
 export MASTER=spark://$(hostname):7077
-export SPARK_WORKER_INSTANCES=3; export CORES_PER_WORKER=1
+export SPARK_WORKER_INSTANCES=2; export CORES_PER_WORKER=1
 export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
 ${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c ${CORES_PER_WORKER} -m 3G ${MASTER}
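
Usage note (not part of the patch; the configuration below is an illustrative sketch, not a project default): because every module above now logs through a module-level logging.getLogger(__name__), a driver program can tune TensorFlowOnSpark verbosity via the standard logging hierarchy instead of the root logger, for example:

    import logging

    # Illustrative only: show INFO-level output for the application as a whole...
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s: %(message)s")

    # ...while limiting the tensorflowonspark.* loggers introduced above to WARNING.
    # Child loggers such as tensorflowonspark.TFCluster propagate to this parent by default.
    logging.getLogger("tensorflowonspark").setLevel(logging.WARNING)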