diff --git a/docs/.buildinfo b/docs/.buildinfo
index 36fb09ab..54c36710 100644
--- a/docs/.buildinfo
+++ b/docs/.buildinfo
@@ -1,4 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: abbb35398bf3c41c0f421213a6263bf9
+config: c31f3f4c132de601af2c7dd3d80ea76f
tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/_modules/index.html b/docs/_modules/index.html
index 10c1b7d0..821b0c21 100644
--- a/docs/_modules/index.html
+++ b/docs/_modules/index.html
@@ -6,7 +6,7 @@
def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=InputMode.TENSORFLOW,
-        log_dir=None, driver_ps_nodes=False, queues=['input', 'output']):
+        log_dir=None, driver_ps_nodes=False, master_node=None, reservation_timeout=600, queues=['input', 'output', 'error']):
  """Starts the TensorFlowOnSpark cluster and runs the TensorFlow "main" function on the Spark executors

  Args:
@@ -247,6 +252,8 @@
Source code for tensorflowonspark.TFCluster
  :input_mode: TFCluster.InputMode
  :log_dir: directory to save tensorboard event logs. If None, defaults to a fixed path on local filesystem.
  :driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this helps maximize computing resources (esp. GPU). You will need to set cluster_size = num_executors + num_ps
+  :master_node: name of the "master" or "chief" node in the cluster_template, used for `tf.estimator` applications.
+  :reservation_timeout: number of seconds after which cluster reservation times out (600 sec default)
  :queues: *INTERNAL_USE*

  Returns:
@@ -261,8 +268,13 @@
Source code for tensorflowonspark.TFCluster
  # build a cluster_spec template using worker_nums
  cluster_template = {}
  cluster_template['ps'] = range(num_ps)
-  cluster_template['worker'] = range(num_ps, num_executors)
-  logging.info("worker node range %s, ps node range %s" % (cluster_template['worker'], cluster_template['ps']))
+  if master_node is None:
+    cluster_template['worker'] = range(num_ps, num_executors)
+  else:
+    cluster_template[master_node] = range(num_ps, num_ps + 1)
+    if num_executors > num_ps + 1:
+      cluster_template['worker'] = range(num_ps + 1, num_executors)
+  logging.info("cluster_template: {}".format(cluster_template))

  # get default filesystem from spark
  defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
@@ -308,20 +320,28 @@
Source code for tensorflowonspark.TFCluster
  ps_thread.start()

  # start TF on a background thread (on Spark driver) to allow for feeding job
-  def _start():
-    nodeRDD.foreachPartition(TFSparkNode.run(map_fun,
-                                             tf_args,
-                                             cluster_meta,
-                                             tensorboard,
-                                             log_dir,
-                                             queues,
-                                             background=(input_mode == InputMode.SPARK)))
-  t = threading.Thread(target=_start)
+  def _start(status):
+    try:
+      nodeRDD.foreachPartition(TFSparkNode.run(map_fun,
+                                               tf_args,
+                                               cluster_meta,
+                                               tensorboard,
+                                               log_dir,
+                                               queues,
+                                               background=(input_mode == InputMode.SPARK)))
+    except Exception as e:
+      logging.error("Exception in TF background thread")
+      status['error'] = str(e)
+
+  t = threading.Thread(target=_start, args=(tf_status,))
+  # run as daemon thread so that in spark mode main thread can exit
+  # if feeder spark stage fails and main thread can't do explicit shutdown
+  t.daemon = True
  t.start()

  # wait for executors to register and start TFNodes before continuing
  logging.info("Waiting for TFSparkNodes to start")
-  cluster_info = server.await_reservations()
+  cluster_info = server.await_reservations(sc, tf_status, reservation_timeout)
  logging.info("All TFSparkNodes started")

  # print cluster_info and extract TensorBoard URL
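The daemon-thread plus shared-status-dict pattern introduced here is easy to miss in diff form; a minimal standalone sketch of the same idea (names are illustrative, not the TFCluster API):

```python
# Minimal sketch of the error-propagation pattern above: a daemon thread
# records any failure in a shared dict so the main thread can detect it
# without joining the (possibly blocked) thread.
import threading
import time

tf_status = {}

def _start(status):
  try:
    raise RuntimeError("simulated feeder failure")
  except Exception as e:
    status['error'] = str(e)

t = threading.Thread(target=_start, args=(tf_status,))
t.daemon = True   # don't prevent main-thread exit on failure
t.start()

time.sleep(0.1)
if 'error' in tf_status:
  print("background thread failed:", tf_status['error'])
```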
@@ -338,13 +358,17 @@
Source code for tensorflowonspark.TFCluster
logging.info("")logging.info("========================================================================================")
- # since our "primary key" for each executor's TFManager is (host, ppid), sanity check for duplicates
+ # since our "primary key" for each executor's TFManager is (host, executor_id), sanity check for duplicates# Note: this may occur if Spark retries failed Python tasks on the same executor.tb_nodes=set()fornodeincluster_info:
- node_id=(node['host'],node['ppid'])
+ node_id=(node['host'],node['executor_id'])ifnode_idintb_nodes:
- raiseException("Duplicate cluster node id detected (host={0}, ppid={1}). Please ensure that (1) the number of executors >= number of TensorFlow nodes, (2) the number of tasks per executors == 1, and (3) TFCluster.shutdown() is successfully invoked when done.".format(node_id[0],node_id[1]))
+ raiseException("Duplicate cluster node id detected (host={0}, executor_id={1})".format(node_id[0],node_id[1])+
+ "Please ensure that:\n"+
+ "1. Number of executors >= number of TensorFlow nodes\n"+
+ "2. Number of tasks per executors is 1\n"+
+ "3, TFCluster.shutdown() is successfully invoked when done.")else:tb_nodes.add(node_id)
@@ -358,8 +382,8 @@
def hdfs_path(ctx, path):
  """Convenience function to create a Tensorflow-compatible absolute HDFS path from relative paths
@@ -85,6 +86,7 @@
Source code for tensorflowonspark.TFNode
  logging.warn("Unknown scheme {0} with relative path: {1}".format(ctx.defaultFS, path))
  return "{0}/{1}".format(ctx.defaultFS, path)
+
def start_cluster_server(ctx, num_gpus=1, rdma=False):
  """Function that wraps the creation of TensorFlow ``tf.train.Server`` for a node in a distributed TensorFlow cluster.
@@ -109,7 +111,8 @@
Source code for tensorflowonspark.TFNode
  if tf.test.is_built_with_cuda():
    # GPU
    gpu_initialized = False
-    while not gpu_initialized:
+    retries = 3
+    while not gpu_initialized and retries > 0:
      try:
        # override PS jobs to only reserve one GPU
        if ctx.job_name == 'ps':
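For context, a hedged sketch of how start_cluster_server() is typically used inside a user map_fun, per the TFNode docs (the worker branch is left as a stub):

```python
# Typical start_cluster_server() usage inside a user map_fun (illustrative).
def map_fun(args, ctx):
  from tensorflowonspark import TFNode
  cluster, server = TFNode.start_cluster_server(ctx, num_gpus=1, rdma=False)
  if ctx.job_name == 'ps':
    server.join()      # parameter servers block here
  else:
    pass               # workers build their graph against `server`
```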
@@ -135,7 +138,10 @@
def next_batch(mgr, batch_size, qname='input'):
  """*DEPRECATED*. Use TFNode.DataFeed class instead."""
  raise Exception("DEPRECATED: Use TFNode.DataFeed class instead")
+
def export_saved_model(sess, export_dir, tag_set, signatures):
  """Convenience function to export a saved_model using provided arguments
@@ -186,25 +194,29 @@
def batch_results(mgr, results, qname='output'):
  """*DEPRECATED*. Use TFNode.DataFeed class instead."""
  raise Exception("DEPRECATED: Use TFNode.DataFeed class instead")
+
def terminate(mgr, qname='input'):
  """*DEPRECATED*. Use TFNode.DataFeed class instead."""
  raise Exception("DEPRECATED: Use TFNode.DataFeed class instead")
+
class DataFeed(object):
  """This class manages the *InputMode.SPARK* data feeding process from the perspective of the TensorFlow application.
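Since several deprecated free functions above now point at DataFeed, a brief hedged sketch of how a map_fun typically drives it for InputMode.SPARK (the batch size and loop body are illustrative):

```python
# Hedged sketch of DataFeed usage inside a user map_fun (InputMode.SPARK).
from tensorflowonspark import TFNode

def map_fun(args, ctx):
  tf_feed = TFNode.DataFeed(ctx.mgr, train_mode=True)
  while not tf_feed.should_stop():
    batch = tf_feed.next_batch(64)    # rows fed from Spark partitions
    if len(batch) == 0:
      break
    # ... run one training step on `batch` ...
```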
@@ -222,7 +234,7 @@
from __future__ import nested_scopes
from __future__ import print_function

+import json
import logging
import multiprocessing
import os
@@ -60,10 +61,12 @@
Source code for tensorflowonspark.TFSparkNode
from . import TFManager
from . import TFNode
+from . import gpu_info
from . import marker
from . import reservation
from . import util
+
class TFNodeContext:
  """Encapsulates unique metadata for a TensorFlowOnSpark node/executor and provides methods to interact with Spark and HDFS.
@@ -71,7 +74,7 @@
Source code for tensorflowonspark.TFSparkNode
  To simplify the end-user API, this class now mirrors the functions of the TFNode module.

  Args:
-    :worker_num: integer identifier for this executor, per ``nodeRDD = sc.parallelize(range(num_executors), num_executors).``
+    :executor_id: integer identifier for this executor, per ``nodeRDD = sc.parallelize(range(num_executors), num_executors).``
    :job_name: TensorFlow job name (e.g. 'ps' or 'worker') of this TF node, per cluster_spec.
    :task_index: integer rank per job_name, e.g. "worker:0", "worker:1", "ps:0".
    :cluster_spec: dictionary for constructing a tf.train.ClusterSpec.
@@ -79,8 +82,9 @@
Source code for tensorflowonspark.TFSparkNode
    :working_dir: the current working directory for local filesystems, or YARN containers.
    :mgr: TFManager instance for this Python worker.
  """
-  def __init__(self, worker_num, job_name, task_index, cluster_spec, defaultFS, working_dir, mgr):
-    self.worker_num = worker_num
+  def __init__(self, executor_id, job_name, task_index, cluster_spec, defaultFS, working_dir, mgr):
+    self.worker_num = executor_id  # for backwards-compatibility
+    self.executor_id = executor_id
    self.job_name = job_name
    self.task_index = task_index
    self.cluster_spec = cluster_spec
@@ -121,22 +125,23 @@
Source code for tensorflowonspark.TFSparkNode
  mgr = None  #: TFManager instance
  cluster_id = None  #: Unique ID for a given TensorFlowOnSpark cluster, used for invalidating state for new clusters.

-def _get_manager(cluster_info, host, ppid):
+
+def _get_manager(cluster_info, host, executor_id):
  """Returns this executor's "singleton" instance of the multiprocessing.Manager, reconnecting per python-worker if needed.

  Args:
    :cluster_info: cluster node reservations
-    :host: host IP
-    :ppid: parent (executor JVM) PID
+    :host: host IP address
+    :executor_id: unique id per executor (created during initial call to run())

  Returns:
    TFManager instance for this executor/python-worker
  """
  for node in cluster_info:
-    if node['host'] == host and node['ppid'] == ppid:
+    if node['host'] == host and node['executor_id'] == executor_id:
      addr = node['addr']
      authkey = node['authkey']
-      TFSparkNode.mgr = TFManager.connect(addr, authkey)
+      TFSparkNode.mgr = TFManager.connect(addr, authkey)
+      break

  if TFSparkNode.mgr is None:
@@ -146,9 +151,10 @@
Source code for tensorflowonspark.TFSparkNode
"3. Spark dynamic allocation is disabled."
raiseException(msg)
- logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host,ppid,str(TFSparkNode.mgr.get('state'))))
+ logging.info("Connected to TFSparkNode.mgr on {0}, executor={1}, state={2}".format(host,executor_id,str(TFSparkNode.mgr.get('state'))))returnTFSparkNode.mgr
+
def run(fn, tf_args, cluster_meta, tensorboard, log_dir, queues, background):
  """Wraps the user-provided TensorFlow main function in a Spark mapPartitions function.
@@ -165,9 +171,15 @@
Source code for tensorflowonspark.TFSparkNode
    A nodeRDD.mapPartitions() function.
  """
  def _mapfn(iter):
+    import tensorflow as tf
+
    # Note: consuming the input iterator helps Pyspark re-use this worker,
    for i in iter:
-      worker_num = i
+      executor_id = i
+
+    # run quick check of GPU infrastructure if using tensorflow-gpu
+    if tf.test.is_built_with_cuda():
+      gpus_to_use = gpu_info.get_gpus(1)

    # assign TF job/task based on provided cluster_spec template (or use default/null values)
    job_name = 'default'
@@ -176,21 +188,21 @@
Source code for tensorflowonspark.TFSparkNode
    cluster_template = cluster_meta['cluster_template']
    for jobtype in cluster_template:
      nodes = cluster_template[jobtype]
-      if worker_num in nodes:
+      if executor_id in nodes:
        job_name = jobtype
-        task_index = nodes.index(worker_num)
+        task_index = nodes.index(executor_id)
        break

-    # get unique id (hostname, ppid) for this executor's JVM
+    # get unique key (hostname, executor_id) for this executor
    host = util.get_ip_address()
-    ppid = os.getppid()
+    util.write_executor_id(executor_id)
    port = 0

    # check for existing TFManagers
    if TFSparkNode.mgr is not None and str(TFSparkNode.mgr.get('state')) != "'stopped'":
      if TFSparkNode.cluster_id == cluster_id:
        # raise an exception to force Spark to retry this "reservation" task on another executor
-        raise Exception("TFManager already started on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get("state"))))
+        raise Exception("TFManager already started on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get("state"))))
      else:
        # old state, just continue with creating new manager
        logging.warn("Ignoring old TFManager with cluster_id {0}, requested cluster_id {1}".format(TFSparkNode.cluster_id, cluster_id))
@@ -225,10 +237,10 @@
Source code for tensorflowonspark.TFSparkNode
    tb_port = 0
    if tensorboard and job_name == 'worker' and task_index == 0:
      tb_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      tb_sock.bind(('', 0))
      tb_port = tb_sock.getsockname()[1]
      tb_sock.close()
-      logdir = log_dir if log_dir else "tensorboard_%d" % worker_num
+      logdir = log_dir if log_dir else "tensorboard_%d" % executor_id

      # search for tensorboard in python/bin, PATH, and PYTHONPATH
      pypath = sys.executable
@@ -252,8 +264,8 @@
    # first, find a free port for TF
    tmp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tmp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    tmp_sock.bind(('', port))
    port = tmp_sock.getsockname()[1]

    node_meta = {
-      'worker_num': worker_num,
+      'executor_id': executor_id,
      'host': host,
-      'ppid': ppid,
      'job_name': job_name,
      'task_index': task_index,
      'port': port,
@@ -285,21 +296,36 @@
Source code for tensorflowonspark.TFSparkNode
    client.close()

    # construct a TensorFlow clusterspec from cluster_info
-    sorted_cluster_info = sorted(cluster_info, key=lambda k: k['worker_num'])
+    sorted_cluster_info = sorted(cluster_info, key=lambda k: k['executor_id'])
    spec = {}
-    last_worker_num = -1
+    last_executor_id = -1
    for node in sorted_cluster_info:
-      if (node['worker_num'] == last_worker_num):
+      if (node['executor_id'] == last_executor_id):
        raise Exception("Duplicate worker/task in cluster_info")
-      last_worker_num = node['worker_num']
+      last_executor_id = node['executor_id']
      logging.info("node: {0}".format(node))
      (njob, nhost, nport) = (node['job_name'], node['host'], node['port'])
      hosts = [] if njob not in spec else spec[njob]
      hosts.append("{0}:{1}".format(nhost, nport))
      spec[njob] = hosts

+    # update TF_CONFIG and reserve GPU for tf.estimator based code
+    # Note: this will execute but be ignored by non-tf.estimator code
+    tf_config = json.dumps({
+      'cluster': spec,
+      'task': {'type': job_name, 'index': task_index},
+      'environment': 'cloud'
+    })
+    os.environ['TF_CONFIG'] = tf_config
+    if tf.test.is_built_with_cuda():
+      num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
+      gpus_to_use = gpu_info.get_gpus(num_gpus)
+      gpu_str = "GPUs" if num_gpus > 1 else "GPU"
+      logging.debug("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
+      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+
    # create a context object to hold metadata for TF
-    ctx = TFNodeContext(worker_num, job_name, task_index, spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
+    ctx = TFNodeContext(executor_id, job_name, task_index, spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)

    # release port reserved for TF as late as possible
    if tmp_sock is not None:
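The TF_CONFIG environment variable built here follows the standard tf.estimator layout; for reference, a hedged example of what it might contain for a small cluster (all hosts and ports are invented):

```python
# Illustration of the TF_CONFIG payload constructed above, for a
# hypothetical 4-node cluster (addresses invented for this example).
import json
import os

spec = {
  'ps': ['10.0.0.1:41000'],
  'master': ['10.0.0.2:42000'],
  'worker': ['10.0.0.3:43000', '10.0.0.4:44000'],
}
os.environ['TF_CONFIG'] = json.dumps({
  'cluster': spec,
  'task': {'type': 'master', 'index': 0},
  'environment': 'cloud',
})
print(os.environ['TF_CONFIG'])
```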
@@ -333,7 +359,7 @@
Source code for tensorflowonspark.TFSparkNode
    if job_name == 'ps' or background:
      # invoke the TensorFlow main function in a background thread
      logging.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on background process".format(
-        job_name, task_index, job_name, worker_num))
+        job_name, task_index, job_name, executor_id))
      p = multiprocessing.Process(target=wrapper_fn_background, args=(tf_args, ctx))
      if job_name == 'ps':
@@ -361,17 +387,18 @@
Source code for tensorflowonspark.TFSparkNode
        queue.task_done()
    else:
      # otherwise, just run TF function in the main executor/worker thread
-      logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on foreground thread".format(job_name, task_index, worker_num))
+      logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on foreground thread".format(job_name, task_index, executor_id))
      wrapper_fn(tf_args, ctx)
-      logging.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, worker_num))
+      logging.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, executor_id))

  return _mapfn
+
def train(cluster_info, cluster_meta, qname='input'):
  """Feeds Spark partitions into the shared multiprocessing.Queue.

  Args:
-    :cluster_info: node reservation information for the cluster (e.g. host, ppid, pid, ports, etc)
+    :cluster_info: node reservation information for the cluster (e.g. host, executor_id, pid, ports, etc)
    :cluster_meta: dictionary of cluster metadata (e.g. cluster_id, reservation.Server address, etc)
    :qname: *INTERNAL_USE*
@@ -380,7 +407,7 @@
Source code for tensorflowonspark.TFSparkNode
  """
  def _train(iter):
    # get shared queue, reconnecting if necessary
-    mgr = _get_manager(cluster_info, util.get_ip_address(), os.getppid())
+    mgr = _get_manager(cluster_info, util.get_ip_address(), util.read_executor_id())
    try:
      queue = mgr.get_queue(qname)
      equeue = mgr.get_queue('error')
@@ -432,11 +459,12 @@
Source code for tensorflowonspark.TFSparkNode
  return _train

+
def inference(cluster_info, qname='input'):
  """Feeds Spark partitions into the shared multiprocessing.Queue and returns inference results.

  Args:
-    :cluster_info: node reservation information for the cluster (e.g. host, ppid, pid, ports, etc)
+    :cluster_info: node reservation information for the cluster (e.g. host, executor_id, pid, ports, etc)
    :qname: *INTERNAL_USE*

  Returns:
@@ -444,7 +472,7 @@
Source code for tensorflowonspark.TFSparkNode
  """
  def _inference(iter):
    # get shared queue, reconnecting if necessary
-    mgr = _get_manager(cluster_info, util.get_ip_address(), os.getppid())
+    mgr = _get_manager(cluster_info, util.get_ip_address(), util.read_executor_id())
    try:
      queue_in = mgr.get_queue(qname)
      equeue = mgr.get_queue('error')
@@ -491,11 +519,12 @@
Source code for tensorflowonspark.TFSparkNode
  return _inference

+
def shutdown(cluster_info, queues=['input']):
  """Stops all TensorFlow nodes by feeding ``None`` into the multiprocessing.Queues.

  Args:
-    :cluster_info: node reservation information for the cluster (e.g. host, ppid, pid, ports, etc).
+    :cluster_info: node reservation information for the cluster (e.g. host, executor_id, pid, ports, etc).
    :queues: *INTERNAL_USE*

  Returns:
@@ -503,14 +532,14 @@
      return ArrayType(sql_type)
    else:
      # represent everything else as base types (and empty tensors as StringType())
      return sql_type
-
-  return StructType([StructField(k, _infer_sql_type(k, v), True) for k, v in sorted(example.features.feature.items())])

MAX_RETRIES = 3  #: Maximum retries to allocate GPUs
+
def _get_gpu():
  """*DEPRECATED*. Allocates first available GPU using cudaSetDevice(), or returns 0 otherwise."""
  # Note: this code executes, but Tensorflow subsequently complains that the "current context was not created by the StreamExecutor cuda_driver API"
@@ -76,6 +77,7 @@
Source code for tensorflowonspark.gpu_info
      break
  return gpu

+
def get_gpus(num_gpu=1):
  """Get list of free GPUs according to nvidia-smi.
@@ -87,49 +89,46 @@
Source code for tensorflowonspark.gpu_info
  Returns:
    Comma-delimited string of GPU ids, or raises an Exception if the requested number of GPUs could not be found.
  """
+  # get list of gpus (index, uuid)
+  list_gpus = subprocess.check_output(["nvidia-smi", "--list-gpus"]).decode()
+  logging.debug("all GPUs:\n{0}".format(list_gpus))
+
+  # parse index and guid
+  gpus = [x for x in list_gpus.split('\n') if len(x) > 0]
+
+  def parse_gpu(gpu_str):
+    cols = gpu_str.split(' ')
+    return cols[5].split(')')[0], cols[1].split(':')[0]
+
+  gpu_list = [parse_gpu(gpu) for gpu in gpus]
+
+  # randomize the search order to get a better distribution of GPUs
+  random.shuffle(gpu_list)
+
+  free_gpus = []
+  retries = 0
+  while len(free_gpus) < num_gpu and retries < MAX_RETRIES:
+    smi_output = subprocess.check_output(["nvidia-smi", "--format=csv,noheader,nounits", "--query-compute-apps=gpu_uuid"]).decode()
+    logging.debug("busy GPUs:\n{0}".format(smi_output))
+    busy_uuids = [x for x in smi_output.split('\n') if len(x) > 0]
+    for uuid, index in gpu_list:
+      if uuid not in busy_uuids:
+        free_gpus.append(index)
-  try:
-    # get list of gpus (index, uuid)
-    list_gpus = subprocess.check_output(["nvidia-smi", "--list-gpus"]).decode()
-    logging.debug("all GPUs:\n{0}".format(list_gpus))
-
-    # parse index and guid
-    gpus = [x for x in list_gpus.split('\n') if len(x) > 0]
-
-    def parse_gpu(gpu_str):
-      cols = gpu_str.split(' ')
-      return cols[5].split(')')[0], cols[1].split(':')[0]
-
-    gpu_list = [parse_gpu(gpu) for gpu in gpus]
-
-    # randomize the search order to get a better distribution of GPUs
-    random.shuffle(gpu_list)
-
-    free_gpus = []
-    retries = 0
-    while len(free_gpus) < num_gpu and retries < MAX_RETRIES:
-      smi_output = subprocess.check_output(["nvidia-smi", "--format=csv,noheader,nounits", "--query-compute-apps=gpu_uuid"]).decode()
-      logging.debug("busy GPUs:\n{0}".format(smi_output))
-      busy_uuids = [x for x in smi_output.split('\n') if len(x) > 0]
-      for uuid, index in gpu_list:
-        if uuid not in busy_uuids:
-          free_gpus.append(index)
-
-      if len(free_gpus) < num_gpu:
-        # keep trying indefinitely
-        logging.warn("Unable to find available GPUs: requested={0}, available={1}".format(num_gpu, len(free_gpus)))
-        retries += 1
-        time.sleep(30 * retries)
-        free_gpus = []
-
-    # if still can't find GPUs, raise exception
-    if len(free_gpus) < num_gpu:
-      smi_output = subprocess.check_output(["nvidia-smi", "--format=csv", "--query-compute-apps=gpu_uuid,pid,process_name,used_gpu_memory"]).decode()
-      logging.info(": {0}".format(smi_output))
-      raise Exception("Unable to find free GPU:\n{0}".format(smi_output))
+    if len(free_gpus) < num_gpu:
+      # not enough free GPUs yet, so wait and retry (up to MAX_RETRIES)
+      logging.warn("Unable to find available GPUs: requested={0}, available={1}".format(num_gpu, len(free_gpus)))
+      retries += 1
+      time.sleep(30 * retries)
+      free_gpus = []
+
+  # if still can't find GPUs, raise exception
+  if len(free_gpus) < num_gpu:
+    smi_output = subprocess.check_output(["nvidia-smi", "--format=csv", "--query-compute-apps=gpu_uuid,pid,process_name,used_gpu_memory"]).decode()
+    logging.info(": {0}".format(smi_output))
+    raise Exception("Unable to find free GPU:\n{0}".format(smi_output))
+
-    print("nvidia-smi error", e.output)

# Function to get the gpu information
def _get_free_gpu(max_gpu_utilization=40, min_free_memory=0.5, num_gpu=1):
@@ -164,7 +163,7 @@
Source code for tensorflowonspark.gpu_info
  # Read the gpu information multiple times
  num_times_to_average = 5
  current_array = []
-  for ind in xrange(num_times_to_average):
+  for ind in range(num_times_to_average):
    current_array.append(get_gpu_info())
    time.sleep(1)
@@ -172,12 +171,12 @@
Source code for tensorflowonspark.gpu_info
  num_gpus = len(current_array[0])

  # Average the gpu information
-  avg_array = [(0, 0, str(x)) for x in xrange(num_gpus)]
-  for ind in xrange(num_times_to_average):
-    for gpu_ind in xrange(num_gpus):
+  avg_array = [(0, 0, str(x)) for x in range(num_gpus)]
+  for ind in range(num_times_to_average):
+    for gpu_ind in range(num_gpus):
      avg_array[gpu_ind] = (avg_array[gpu_ind][0] + current_array[ind][gpu_ind][0],
                            avg_array[gpu_ind][1] + current_array[ind][gpu_ind][1],
                            avg_array[gpu_ind][2])
-  for gpu_ind in xrange(num_gpus):
+  for gpu_ind in range(num_gpus):
    avg_array[gpu_ind] = (float(avg_array[gpu_ind][0]) / num_times_to_average,
                          float(avg_array[gpu_ind][1]) / num_times_to_average,
                          avg_array[gpu_ind][2])
  avg_array.sort()
@@ -202,7 +201,6 @@
class TFTypeConverters(object):
  """Custom DataFrame TypeConverter for dictionary types (since this is not provided by Spark core)."""
@@ -89,160 +81,227 @@
Source code for tensorflowonspark.pipeline
  def toDict(value):
    if type(value) == dict:
      return value
    else:
      raise TypeError("Could not convert %s to OrderedDict" % value)
+
class HasBatchSize(Params):
  batch_size = Param(Params._dummy(), "batch_size", "Number of records per batch", typeConverter=TypeConverters.toInt)
+
  def __init__(self):
    super(HasBatchSize, self).__init__()

class HasClusterSize(Params):
  cluster_size = Param(Params._dummy(), "cluster_size", "Number of nodes in the cluster", typeConverter=TypeConverters.toInt)
+
  def __init__(self):
    super(HasClusterSize, self).__init__()

class HasEpochs(Params):
  epochs = Param(Params._dummy(), "epochs", "Number of epochs to train", typeConverter=TypeConverters.toInt)
+
  def __init__(self):
    super(HasEpochs, self).__init__()

class HasModelDir(Params):
  model_dir = Param(Params._dummy(), "model_dir", "Path to save/load model checkpoints", typeConverter=TypeConverters.toString)
+
  def __init__(self):
    super(HasModelDir, self).__init__()

class HasNumPS(Params):
  num_ps = Param(Params._dummy(), "num_ps", "Number of PS nodes in cluster", typeConverter=TypeConverters.toInt)
  driver_ps_nodes = Param(Params._dummy(), "driver_ps_nodes", "Run PS nodes on driver locally", typeConverter=TypeConverters.toBoolean)
+
  def __init__(self):
    super(HasNumPS, self).__init__()

class HasSteps(Params):
  steps = Param(Params._dummy(), "steps", "Maximum number of steps to train", typeConverter=TypeConverters.toInt)
+
  def __init__(self):
    super(HasSteps, self).__init__()

class HasTFRecordDir(Params):
  tfrecord_dir = Param(Params._dummy(), "tfrecord_dir", "Path to temporarily export a DataFrame as TFRecords (for InputMode.TENSORFLOW apps)", typeConverter=TypeConverters.toString)
+
  def __init__(self):
    super(HasTFRecordDir, self).__init__()

class HasSignatureDefKey(Params):
  signature_def_key = Param(Params._dummy(), "signature_def_key", "Identifier for a specific saved_model signature", typeConverter=TypeConverters.toString)
+
  def __init__(self):
    super(HasSignatureDefKey, self).__init__()
    self._setDefault(signature_def_key=None)

class HasTagSet(Params):
  tag_set = Param(Params._dummy(), "tag_set", "Comma-delimited list of tags identifying a saved_model metagraph", typeConverter=TypeConverters.toString)
+
  def __init__(self):
    super(HasTagSet, self).__init__()

  Based on https://docs.python.org/dev/library/types.html#types.SimpleNamespace
  """
  argv = None
+
  def __init__(self, d):
    if isinstance(d, list):
      self.argv = d
@@ -282,18 +342,21 @@
class TFParams(Params):
  """Mix-in class to store namespace-style args and merge w/ SparkML-style params."""
  args = None
+
  def merge_args_params(self):
    local_args = copy.copy(self.args)   # make a local copy of args
    args_dict = vars(local_args)        # get dictionary view
    for p in self.params:
      args_dict[p.name] = self.getOrDefault(p.name)   # update with params
    return local_args

  """
  spark = SparkSession.builder.getOrCreate()

-  logging.info("===== 1. inference args: {0}".format(self.args))
-  logging.info("===== 2. inference params: {0}".format(self._paramMap))
-  local_args = self.merge_args_params()
-  logging.info("===== 3. inference args + params: {0}".format(local_args))
-
  # set a deterministic order for input/output columns (lexicographic by key)
  input_cols = [col for col, tensor in sorted(self.getInputMapping().items())]     # input col => input tensor
  output_cols = [col for tensor, col in sorted(self.getOutputMapping().items())]   # output tensor => output col

  # run single-node inferencing on each executor
  logging.info("input_cols: {}".format(input_cols))
  logging.info("output_cols: {}".format(output_cols))

+  # merge args + params
+  logging.info("===== 1. inference args: {0}".format(self.args))
+  logging.info("===== 2. inference params: {0}".format(self._paramMap))
+  local_args = self.merge_args_params()
+  logging.info("===== 3. inference args + params: {0}".format(local_args))
+
  tf_args = self.args.argv if self.args.argv else local_args
-  rdd_out = dataset.select(input_cols).rdd.mapPartitions(lambda it: _run_model(it, tf_args))
+  rdd_out = dataset.select(input_cols).rdd.mapPartitions(lambda it: _run_model(it, local_args, tf_args))

  # convert to a DataFrame-friendly format
  rows_out = rdd_out.map(lambda x: Row(*x))
  return spark.createDataFrame(rows_out, output_cols)

# global to each python worker process on the executors
global_sess = None   # tf.Session cache
global_args = None   # args provided to the _run_model() method. Any change will invalidate the global_sess cache.
-def _run_model(iterator, args):
+
+def _run_model(iterator, args, tf_args):
  """mapPartitions function to run single-node inferencing from a checkpoint/saved_model, using the model's input/output mappings.

  Args:
    :iterator: input RDD partition iterator.
-    :args: a merged view of command-line args and ML Params.
+    :args: arguments for TFModel, in argparse format
+    :tf_args: arguments for TensorFlow inferencing code, in argparse or ARGV format.

  Returns:
    An iterator of result data.
  """
-  single_node_env(args)
+  single_node_env(tf_args)

  logging.info("===== input_mapping: {}".format(args.input_mapping))
  logging.info("===== output_mapping: {}".format(args.output_mapping))
  input_tensor_names = [tensor for col, tensor in sorted(args.input_mapping.items())]
  output_tensor_names = [tensor for tensor, col in sorted(args.output_mapping.items())]

  # if using a signature_def_key, get input/output tensor info from the requested signature
  if args.signature_def_key:
@@ -473,11 +542,11 @@
      inputs_feed_dict[input_tensors[i]] = tensors[i]

    outputs = sess.run(output_tensors, feed_dict=inputs_feed_dict)
    lengths = [len(output) for output in outputs]
    input_size = len(tensors[0])
-    assert all([l == input_size for l in lengths]), "Output array sizes {} must match input size: {}".format(lengths, input_size)
+    assert all([length == input_size for length in lengths]), "Output array sizes {} must match input size: {}".format(lengths, input_size)
    python_outputs = [output.tolist() for output in outputs]   # convert from numpy to standard python types
    result.extend(zip(*python_outputs))                        # convert to an array of tuples of "output columns"
  return result
+
def single_node_env(args):
  """Sets up environment for a single-node TF session.

  Args:
-    :args: command line arguments as argparse args.
-    :argv: command line arguments as ARGV (array of string).
+    :args: command line arguments as either argparse args or argv list
  """
-  if args.argv:
+  if isinstance(args, list):
+    sys.argv = args
+  elif args.argv:
    sys.argv = args.argv

  # ensure expanded CLASSPATH w/o glob characters (required for Spark 2.1 + JNI)
@@ -561,8 +632,9 @@
Source code for tensorflowonspark.pipeline
    # Note: if there is a GPU conflict (CUDA_ERROR_INVALID_DEVICE), the entire task will fail and retry.
  else:
    # CPU
    logging.info("Using CPU")
    os.environ['CUDA_VISIBLE_DEVICES'] = ''
+
def get_meta_graph_def(saved_model_dir, tag_set):
  """Utility function to read a meta_graph_def from disk.
@@ -580,8 +652,9 @@
Source code for tensorflowonspark.pipeline
  set_of_tags = set(tag_set.split(','))
  for meta_graph_def in saved_model.meta_graphs:
    if set(meta_graph_def.meta_info_def.tags) == set_of_tags:
      return meta_graph_def
  raise RuntimeError("MetaGraphDef associated with tag-set {0} could not be found in SavedModel".format(tag_set))
+
def yield_batch(iterable, batch_size, num_tensors=1):
  """Generator that yields batches of a DataFrame iterator.
@@ -594,7 +667,7 @@
Source code for tensorflowonspark.pipeline
  Returns:
    An array of ``num_tensors`` arrays, each of length ``batch_size``
  """
  tensors = [[] for i in range(num_tensors)]
  for item in iterable:
    if item is None:
      break
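To make the batching behavior concrete, a small illustrative run (input values invented; the final short batch reflects how the generator flushes any remainder):

```python
# Illustrative only: yield_batch() groups a flat iterator of rows into
# per-tensor arrays of up to batch_size elements each.
from tensorflowonspark.pipeline import yield_batch

rows = iter([(1, 'a'), (2, 'b'), (3, 'c')])
for batch in yield_batch(rows, batch_size=2, num_tensors=2):
  print(batch)
# [[1, 2], ['a', 'b']]
# [[3], ['c']]
```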
@@ -603,9 +676,9 @@
"""
server_sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
- server_sock.bind(('',0))
+ server_sock.bind(('',0))server_sock.listen(10)# hostname may not be resolvable but IP address probably will behost=util.get_ip_address()port=server_sock.getsockname()[1]
- addr=(host,port)
+ addr=(host,port)logging.info("listening for reservations at {0}".format(addr))def_listen(self,sock):
@@ -225,6 +227,7 @@
Source code for tensorflowonspark.reservation
"""Stop the Server's socket listener."""
self.done=True
+
[docs]classClient(MessageSocket):"""Client to register and await node reservations.
@@ -325,7 +328,7 @@
+def write_executor_id(num):
+  """Write executor_id into a local file in the executor's current working directory"""
+  with open("executor_id", "w") as f:
+    f.write(str(num))
+
+
+def read_executor_id():
+  """Read executor_id from a local file in the executor's current working directory"""
+  with open("executor_id", "r") as f:
+    return int(f.read())
This module provides a high-level API to manage the TensorFlowOnSpark cluster.

There are three main phases of operation:

1. Reservation/Startup - reserves a port for the TensorFlow process on each executor, starts a multiprocessing.Manager to
listen for data/control messages, and then launches the TensorFlow main function on the executors.

2. Data feeding - For InputMode.SPARK only. Sends RDD data to the TensorFlow nodes via each executor's multiprocessing.Manager. PS
nodes will tie up their executors, so they won't receive any subsequent data-feeding tasks.

3. Shutdown - sends a shutdown control message to the multiprocessing.Managers of the PS nodes and pushes end-of-feed markers into the data
queues of the worker nodes.
For InputMode.SPARK only: Feeds Spark RDD partitions into the TensorFlow worker nodes and returns an RDD of results.

It is the responsibility of the TensorFlow "main" function to interpret the rows of the RDD and provide valid data for the output RDD.

This will use the distributed TensorFlow cluster for inferencing, so the TensorFlow "main" function should be capable of inferencing.
Per Spark design, the output RDD will be lazily-executed only when a Spark action is invoked on the RDD.

Args:
  dataRDD: input data as a Spark RDD
  qname: INTERNAL_USE

Returns:
  A Spark RDD representing the output of the TensorFlow inferencing
For InputMode.SPARK only. Feeds Spark RDD partitions into the TensorFlow worker nodes.

It is the responsibility of the TensorFlow "main" function to interpret the rows of the RDD.

Since epochs are implemented via RDD.union() and the entire RDD must generally be processed in full, it is recommended
to set num_epochs to closely match your training termination condition (e.g. steps or accuracy). See TFNode.DataFeed
for more details.

Args:
  dataRDD: input data as a Spark RDD.
  num_epochs: number of times to repeat the dataset during training.
Starts the TensorFlowOnSpark cluster and runs the TensorFlow "main" function on the Spark executors.

Args:
  sc: SparkContext
  map_fun: user-supplied TensorFlow "main" function
  tf_args: argparse args, or command-line ARGV. These will be passed to the map_fun.
  num_executors: number of Spark executors. This should match your Spark job's --num_executors.
  num_ps: number of Spark executors which are reserved for TensorFlow PS nodes. All other executors will be used as TensorFlow worker nodes.
  tensorboard: boolean indicating if the chief worker should spawn a Tensorboard server.
  input_mode: TFCluster.InputMode
  log_dir: directory to save tensorboard event logs. If None, defaults to a fixed path on local filesystem.
  driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this helps maximize computing resources (esp. GPU). You will need to set cluster_size = num_executors + num_ps.
  master_node: name of the "master" or "chief" node in the cluster_template, used for tf.estimator applications.
  reservation_timeout: number of seconds after which cluster reservation times out (600 sec default)
  queues: INTERNAL_USE

Returns:
  A TFCluster object representing the started cluster.
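For orientation, a hedged end-to-end sketch of this API; map_fun, tf_args, and dataRDD are user-supplied, and the cluster sizes are illustrative:

```python
# Hedged end-to-end sketch of TFCluster.run() with InputMode.SPARK.
from tensorflowonspark import TFCluster

cluster = TFCluster.run(sc, map_fun, tf_args, num_executors=4, num_ps=1,
                        tensorboard=False, input_mode=TFCluster.InputMode.SPARK,
                        master_node='master', reservation_timeout=600)
labelRDD = cluster.inference(dataRDD)   # or: cluster.train(dataRDD, num_epochs=1)
cluster.shutdown()
```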
mapPartition function to convert an RDD of serialized tf.train.Example bytestrings into an RDD of Rows.

Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to
disambiguate these types for Spark DataFrame DTypes (StringType and BinaryType), so we require a "hint"
from the caller in the binary_features argument.

Args:
  iter: the RDD partition iterator
  binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.

Returns:
  An array/iterator of DataFrame Rows with features converted into columns.
Given a tf.train.Example, infer the Spark DataFrame schema (StructFields).

Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to
disambiguate these types for Spark DataFrame DTypes (StringType and BinaryType), so we require a "hint"
from the caller in the binary_features argument.

Args:
  example: a tf.train.Example
  binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.
This will attempt to automatically convert the tf.train.Example features into Spark DataFrame columns of equivalent types.

Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to
disambiguate these types for Spark DataFrame DTypes (StringType and BinaryType), so we require a "hint"
from the caller in the binary_features argument.

Args:
  sc: SparkContext
  input_dir: location of TFRecords on disk.
  binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.

Returns:
  A Spark DataFrame mirroring the tf.train.Example schema.
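A hedged sketch of loadTFRecords() in use; the path and the binary feature name are invented for illustration:

```python
# Hedged sketch of dfutil.loadTFRecords(); path and feature name invented.
from tensorflowonspark import dfutil

df = dfutil.loadTFRecords(sc, "hdfs:///data/train_tfrecords",
                          binary_features=['image_raw'])
df.printSchema()   # schema mirrors the tf.train.Example features
```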
mapPartition function to convert a Spark RDD of Rows into an RDD of serialized tf.train.Example bytestrings.

Note that tf.train.Example is a fairly flat structure with limited datatypes, e.g. tf.train.FloatList,
tf.train.Int64List, and tf.train.BytesList, so most DataFrame types will be coerced into one of these types.

Args:
  dtypes: the DataFrame.dtypes of the source DataFrame.

Returns:
  A mapPartition function which converts the source DataFrame into tf.train.Example bytestrings.
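A minimal hedged sketch of applying this converter to a DataFrame's underlying RDD (df is assumed to be an existing DataFrame):

```python
# Hedged sketch: serialize a DataFrame's rows as tf.train.Example bytestrings.
from tensorflowonspark import dfutil

exampleRDD = df.rdd.mapPartitions(dfutil.toTFExample(df.dtypes))
# each element is now a serialized tf.train.Example, ready to be written
# out as TFRecords (assumption: via a helper such as dfutil.saveAsTFRecords).
```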
This module extends the TensorFlowOnSpark API to support Spark ML Pipelines.

It provides a TFEstimator class to fit a TFModel using TensorFlow. The TFEstimator will actually spawn a TensorFlowOnSpark cluster
to conduct distributed training, but due to architectural limitations, the TFModel will only run single-node TensorFlow instances
when inferencing on the executors. The executors will run in parallel, but the TensorFlow model must fit in the memory
of each executor.

There is also an option to provide a separate "export" function, which allows users to export a different graph for inferencing vs. training.
This is useful when the training graph uses InputMode.TENSORFLOW with queue_runners, but the inferencing graph needs placeholders.
It is also especially useful for exporting saved_models for TensorFlow Serving.

tfrecord_dir = Param(parent='undefined', name='tfrecord_dir', doc='Path to temporarily export a DataFrame as TFRecords (for InputMode.TENSORFLOW apps)')
Spark ML Estimator which launches a TensorFlowOnSpark cluster for distributed training.

The columns of the DataFrame passed to the fit() method will be mapped to TensorFlow tensors according to the setInputMapping() method.

If an export_fn was provided to the constructor, it will be run on a single executor immediately after the distributed training has completed.
This allows users to export a TensorFlow saved_model with a different execution graph for inferencing, e.g. replacing an input graph of
TFReaders and QueueRunners with Placeholders.

For InputMode.TENSORFLOW, the input DataFrame will be exported as TFRecords to a temporary location specified by the tfrecord_dir.
The TensorFlow application will then be expected to read directly from this location during training. However, if the input DataFrame was
produced by the dfutil.loadTFRecords() method, i.e. originated from TFRecords on disk, then the tfrecord_dir will be set to the
original source location of the TFRecords without the additional export step.

Args:
  train_fn: TensorFlow "main" function for training.
  tf_args: Arguments specific to the TensorFlow "main" function.
Spark ML Model backed by a TensorFlow model checkpoint/saved_model on disk.

During transform(), each executor will run an independent, single-node instance of TensorFlow in parallel, so the model must fit in memory.
The model/session will be loaded/initialized just once for each Spark Python worker, and the session will be cached for
subsequent tasks/partitions to avoid re-loading the model for each partition.

Args:
  tf_args: Dictionary of arguments specific to TensorFlow "main" function.