1+ """codeflare.pipelines.Runtime
2+ This class is the core runtime for CodeFlare pipelines. It provides the entry point for execution of the
3+ pipeline that was constructed from codeflare.pipelines.Datamodel. The key entry point is the basic
4+ execute_pipeline, with other enhanced entry points such as cross_validate and grid_search_cv.
5+
6+ The other methods provide supporting functions for execution of pipeline primitives. In addition to this,
7+ methods for selecting a pipeline are provided as well as saving a specific pipeline instance along with
8+ that pipeline's state.
9+
10+ Details on the execution and parallelism exposed are provided in the design documentation.
11+ """
12+ # Authors: Mudhakar Srivatsa <msrivats@us.ibm.com>
13+ # Raghu Ganti <rganti@us.ibm.com>
14+ # Carlos Costa <chcost@us.ibm.com>
15+ #
16+ # License: Apache v2.0
17+
118import ray
219
320import codeflare .pipelines .Datamodel as dm
1229
1330
1431class ExecutionType (Enum ):
32+ """
33+ Pipelines can be executed in different modes, this is targeting the typical AI/ML parlance, with the supported
34+ types being FIT for training a pipeline, PREDICT for predicting/transforming on the steps of a pipeline, and finally
35+ SCORE, which scores against a given input.
36+ """
1537 FIT = 0 ,
1638 PREDICT = 1 ,
1739 SCORE = 2
1840
1941
2042@ray .remote
2143def execute_or_node_remote (node : dm .EstimatorNode , mode : ExecutionType , xy_ref : dm .XYRef ):
44+ """
45+ Helper remote function that executes an OR node. As such, this is a remote task that runs the estimator
46+ in the provided mode with the data pointed to by XYRef. The key aspect to note here is the choice of input
47+ to be a pointer to data and not the data itself. This enables the access to the data to be delayed until
48+ it is absolutely necessary. The remote method further returns a pointer to XYref, which in itself is a pointer
49+ to the data. This again enables the execution to proceed in an asynchronous manner.
50+
51+ In the FIT mode, the node is always cloned along with its estimator, hence the pipeline state is always
52+ kept in the "cloned" node.
53+
54+ :param node: Estimator node whose estimator needs to be executed
55+ :param mode: The mode of execution
56+ :param xy_ref: Pointer to the data
57+ :return: A list of pointers to XYRefs
58+ """
2259 estimator = node .get_estimator ()
2360 # Blocking operation -- not avoidable
2461 X = ray .get (xy_ref .get_Xref ())
@@ -69,6 +106,20 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
69106
70107
71108def execute_or_node (node , pre_edges , edge_args , post_edges , mode : ExecutionType ):
109+ """
110+ Inner method that executes the estimator node parallelizing at the level of input objects. This defines the
111+ strategy of execution of the node, in this case, parallel for each object that is input. The function takes
112+ in the edges coming to this node (pre_edges) and the associated arguments (edge_args) and fires off remote
113+ tasks for each of the objects (this is defined by the ANY firing semantics). The resulting pointer(s) are then
114+ captured and passed to the post_edges.
115+
116+ :param node: Node to execute
117+ :param pre_edges: Input edges to the given node
118+ :param edge_args: Data arguments for the edges
119+ :param post_edges: Data arguments for downstream processing
120+ :param mode: Execution mode
121+ :return: None
122+ """
72123 for pre_edge in pre_edges :
73124 Xyref_ptrs = edge_args [pre_edge ]
74125 exec_xyrefs = []
@@ -85,6 +136,18 @@ def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType)
85136
86137@ray .remote
87138def execute_and_node_remote (node : dm .AndNode , mode : ExecutionType , Xyref_list ):
139+ """
140+ Similar to the estimator node (OR node), this is the remote function that executes the AND node. The key to
141+ note here is that the input to execute on is a list of XYRefs as opposed to a single XYRef, which differentiates
142+ the type of nodes. Similar to the OR node, the output is again a pointer to a list of XYRefs.
143+
144+ The execution mode is FIT for training, PREDICT for predicting/transforming, and SCORE for scoring.
145+
146+ :param node: Node to execute
147+ :param mode: Mode of execution
148+ :param Xyref_list: Input list of XYrefs
149+ :return: Output as list of XYrefs
150+ """
88151 xy_list = []
89152 prev_node_ptr = ray .put (node )
90153 for Xyref in Xyref_list :
@@ -153,6 +216,17 @@ def execute_and_node_remote(node: dm.AndNode, mode: ExecutionType, Xyref_list):
153216
154217
155218def execute_and_node_inner (node : dm .AndNode , mode : ExecutionType , Xyref_ptrs ):
219+ """
220+ This is a helper method for executing and nodes, which fires off remote tasks. Unlike the helper
221+ for OR nodes, which can fire off on single objects, this method retrieves the list of inputs,
222+ unmarshals the pointers to XYrefs to materialize XYRef and then passes it along to the and node
223+ remote executor.
224+
225+ :param node: Node to execute on
226+ :param mode: Mode of execution
227+ :param Xyref_ptrs: Object ref pointers for data input
228+ :return:
229+ """
156230 result = []
157231
158232 Xyref_list = []
@@ -166,6 +240,22 @@ def execute_and_node_inner(node: dm.AndNode, mode: ExecutionType, Xyref_ptrs):
166240
167241
168242def execute_and_node (node , pre_edges , edge_args , post_edges , mode : ExecutionType ):
243+ """
244+ Inner method that executes an and node by combining the inputs coming from multiple edges. Unlike the OR
245+ node, which only executes a remote task per input object, the and node combines input from across all the
246+ edges. For example, if there are two edges incoming to this node with two objects each, the combiner will
247+ create four input combinations. Each of these input combinations is then evaluated by the AND node in
248+ parallel.
249+
250+ The result is then sent to the edges outgoing from this node.
251+
252+ :param node: Node to execute on
253+ :param pre_edges: Incoming edges to this node
254+ :param edge_args: Data arguments for each of this edge
255+ :param post_edges: Outgoing edges
256+ :param mode: Execution mode
257+ :return: None
258+ """
169259 edge_args_lists = list ()
170260 for pre_edge in pre_edges :
171261 edge_args_lists .append (edge_args [pre_edge ])
@@ -183,6 +273,23 @@ def execute_and_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType
183273
184274
185275def execute_pipeline (pipeline : dm .Pipeline , mode : ExecutionType , pipeline_input : dm .PipelineInput ) -> dm .PipelineOutput :
276+ """
277+ The entry point for a basic pipeline execution. This method takes a pipeline, the input to it and the execution
278+ mode and runs the pipeline. Based on the parallelism defined by the DAG structure and the input data, the execution
279+ of the pipeline will happen in parallel.
280+
281+ In the FIT mode of execution, the pipeline can materialize into several pipelines which can be examined in further
282+ detail based on metrics of interest. The method select_pipeline enables selecting a specific pipeline to examine
283+ further.
284+
285+ A selected pipeline can be executed in SCORE and PREDICT modes for evaluating the results or saving them for future
286+ reuse.
287+
288+ :param pipeline: Abstract DAG representation of the pipeline
289+ :param mode: Execution mode
290+ :param pipeline_input: The input to this pipeline
291+ :return: Pipeline output
292+ """
186293 nodes_by_level = pipeline .get_nodes_by_level ()
187294
188295 # track args per edge
@@ -212,7 +319,19 @@ def execute_pipeline(pipeline: dm.Pipeline, mode: ExecutionType, pipeline_input:
212319 return dm .PipelineOutput (out_args , edge_args )
213320
214321
215- def select_pipeline (pipeline_output : dm .PipelineOutput , chosen_xyref : dm .XYRef ):
322+ def select_pipeline (pipeline_output : dm .PipelineOutput , chosen_xyref : dm .XYRef ) -> dm .Pipeline :
323+ """
324+ Pipeline execution results in a materialization of several pipelines, this entry point method enables the end
325+ user to select a specific pipeline to examine in further detail. Typical way of examining a pipeline is to select
326+ a specific output and then "request" which pipeline generated it.
327+
328+ Internally, the runtime has generated "trackers" to keep a lineage for every input and output and which node
329+ generated it. These are then selected to create the appropriate pipeline that can be scored, predicted, and saved.
330+
331+ :param pipeline_output: Pipeline output from execute pipeline
332+ :param chosen_xyref: The XYref for which the pipeline needs to be selected
333+ :return: Selected pipeline
334+ """
216335 pipeline = dm .Pipeline ()
217336 xyref_queue = SimpleQueue ()
218337
@@ -235,7 +354,14 @@ def select_pipeline(pipeline_output: dm.PipelineOutput, chosen_xyref: dm.XYRef):
235354 return pipeline
236355
237356
238- def get_pipeline_input (pipeline : dm .Pipeline , pipeline_output : dm .PipelineOutput , chosen_xyref : dm .XYRef ):
357+ def get_pipeline_input (pipeline : dm .Pipeline , pipeline_output : dm .PipelineOutput , chosen_xyref : dm .XYRef ) -> dm .PipelineInput :
358+ """
359+
360+ :param pipeline:
361+ :param pipeline_output:
362+ :param chosen_xyref:
363+ :return:
364+ """
239365 pipeline_input = dm .PipelineInput ()
240366
241367 xyref_queue = SimpleQueue ()
0 commit comments