
Commit f01b0d7

Runtime docs are done
1 parent 0af02a8 commit f01b0d7

File tree: 2 files changed (+150 -5 lines)


codeflare/pipelines/Exceptions.py

Lines changed: 23 additions & 0 deletions
@@ -1,17 +1,40 @@
+"""codeflare.pipelines.Exceptions
+
+The exceptions that pipeline creation and execution throw are defined here.
+"""
+# Authors: Mudhakar Srivatsa <msrivats@us.ibm.com>
+#          Raghu Ganti <rganti@us.ibm.com>
+#          Carlos Costa <chcost@us.ibm.com>
+#
+# License: Apache v2.0
+
 class BasePipelineException(Exception):
+    """
+    Base pipeline exception
+    """
     pass


 class PipelineSaveException(BasePipelineException):
+    """
+    Exception thrown when a pipeline save fails
+    """
     def __init__(self, message):
         self.message = message


 class PipelineNodeNotFoundException(BasePipelineException):
+    """
+    Exception thrown when a node is not found in a pipeline; this can typically happen when pipelines
+    are not properly constructed.
+    """
     def __init__(self, message):
         self.message = message


 class PipelineException(BasePipelineException):
+    """
+    Generic pipeline exception
+    """
     def __init__(self, message):
         self.message = message
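
For reference, a minimal sketch (not part of this commit) of how these exceptions surface to calling code. All three concrete exceptions carry a message attribute and derive from BasePipelineException, so a single handler can catch any of them; Runtime.py imports the module under the alias pe, which is assumed here as well:

.. code-block:: python

    import codeflare.pipelines.Exceptions as pe

    try:
        # hypothetical failure path: the runtime raises this when a node is missing from the graph
        raise pe.PipelineNodeNotFoundException("node 'node_rf' is not part of this pipeline")
    except pe.BasePipelineException as e:
        # every concrete pipeline exception stores the human-readable reason in .message
        print(e.message)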

codeflare/pipelines/Runtime.py

Lines changed: 127 additions & 5 deletions
@@ -285,6 +285,15 @@ def execute_pipeline(pipeline: dm.Pipeline, mode: ExecutionType, pipeline_input:
     A selected pipeline can be executed in SCORE and PREDICT modes for evaluating the results or saving them for future
     reuse.
 
+    Examples
+    --------
+    Executing a pipeline and retrieving its output is straightforward:
+
+    .. code-block:: python
+
+        pipeline_output = rt.execute_pipeline(pipeline, rt.ExecutionType.FIT, pipeline_input)
+        node_rf_xyrefs = pipeline_output.get_xyrefs(node_rf)
+
     :param pipeline: Abstract DAG representation of the pipeline
     :param mode: Execution mode
     :param pipeline_input: The input to this pipeline

@@ -328,6 +337,16 @@ def select_pipeline(pipeline_output: dm.PipelineOutput, chosen_xyref: dm.XYRef)
     Internally, the runtime has generated "trackers" to keep a lineage for every input and output and which node
     generated it. These are then selected to create the appropriate pipeline that can be scored, predicted, and saved.
 
+    Examples
+    --------
+    A pipeline is selected by identifying an output object of interest. The selection does not have to start from an
+    output node; objects produced by internal nodes can be examined and used as well.
+
+    .. code-block:: python
+
+        # one can examine the output in more detail and select a pipeline of interest
+        selected_pipeline = rt.select_pipeline(pipeline_output, node_rf_xyrefs[0])
+
     :param pipeline_output: Pipeline output from execute pipeline
     :param chosen_xyref: The XYref for which the pipeline needs to be selected
     :return: Selected pipeline

@@ -356,11 +375,17 @@ def select_pipeline(pipeline_output: dm.PipelineOutput, chosen_xyref: dm.XYRef)
 
 def get_pipeline_input(pipeline: dm.Pipeline, pipeline_output: dm.PipelineOutput, chosen_xyref: dm.XYRef) -> dm.PipelineInput:
     """
-
-    :param pipeline:
-    :param pipeline_output:
-    :param chosen_xyref:
-    :return:
+    Given the output from a pipeline and a chosen output object, this method gets the inputs that were used to
+    generate that output. Combining the input and the selected pipeline, one can then recreate the full
+    provenance -- both the graph and the data needed to execute the selected pipeline.
+
+    Note that once the objects are no longer persisted in memory or another persistent store, the data can no
+    longer be retrieved.
+
+    :param pipeline: Executed pipeline
+    :param pipeline_output: Output from the executed pipeline
+    :param chosen_xyref: Chosen object from the output
+    :return: The pipeline input (for the given chosen object)
     """
     pipeline_input = dm.PipelineInput()
 
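
To make the provenance workflow concrete, here is an illustrative sketch (not part of this commit) that combines select_pipeline, get_pipeline_input, and execute_pipeline to re-run a selected pipeline. It reuses the pipeline, pipeline_output, and node_rf_xyrefs variables from the execute_pipeline example above:

.. code-block:: python

    # select the pipeline that produced the chosen output object
    selected_pipeline = rt.select_pipeline(pipeline_output, node_rf_xyrefs[0])

    # recover the input that was used along that lineage
    selected_input = rt.get_pipeline_input(pipeline, pipeline_output, node_rf_xyrefs[0])

    # graph plus data can then be re-executed, e.g. in PREDICT mode
    predict_output = rt.execute_pipeline(selected_pipeline, rt.ExecutionType.PREDICT, selected_input)
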
@@ -389,6 +414,14 @@ def get_pipeline_input(pipeline: dm.Pipeline, pipeline_output: dm.PipelineOutput
 
 @ray.remote(num_returns=2)
 def split(cross_validator: BaseCrossValidator, xy_ref):
+    """
+    A remote function that splits the data based on the provided cross validator. This allows remote
+    data to be split without having to "collect" the data back to a driver.
+
+    :param cross_validator: Cross validator
+    :param xy_ref: XYRef that needs to be split
+    :return: Lists of train and test XYRefs, their number determined by the cross validator's get_n_splits
+    """
     x = ray.get(xy_ref.get_Xref())
     y = ray.get(xy_ref.get_yref())
 
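
split is an internal building block that cross_validate drives, but because it is declared with @ray.remote(num_returns=2) it can also be invoked directly. The sketch below is illustrative only; it assumes the Runtime module context (ray initialized, split in scope) and an existing XYRef named xy_ref whose X/y objects are already in the Ray object store:

.. code-block:: python

    from sklearn.model_selection import KFold

    kf = KFold(n_splits=5)

    # a remote task declared with num_returns=2 hands back two object refs
    train_refs, test_refs = split.remote(kf, xy_ref)

    train_xyrefs = ray.get(train_refs)  # one train XYRef per fold
    test_xyrefs = ray.get(test_refs)    # one test XYRef per fold
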
@@ -420,6 +453,25 @@ def split(cross_validator: BaseCrossValidator, xy_ref):
 
 
 def cross_validate(cross_validator: BaseCrossValidator, pipeline: dm.Pipeline, pipeline_input: dm.PipelineInput):
+    """
+    Similar to sklearn's cross validate, but a parallelized version on Ray with zero-copy sharing of data. This
+    method allows the user to evaluate a pipeline with a single input object using cross validation. The output
+    is a list of scores that correspond to the SCORE mode of the pipeline execution.
+
+    Examples
+    --------
+    Cross validation is quite simple:
+
+    .. code-block:: python
+
+        kf = StratifiedKFold(n_splits=10)
+        scores = rt.cross_validate(kf, pipeline, pipeline_input)
+
+    :param cross_validator: Cross validator to use
+    :param pipeline: Pipeline to execute
+    :param pipeline_input: Input to the pipeline
+    :return: Scored outputs from the pipeline
+    """
     has_single_estimator = pipeline.has_single_estimator()
     if not has_single_estimator:
         raise pe.PipelineException("Cross validation can only be done on pipelines with single estimator, "
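
As a small follow-up to the docstring example (reusing the scores variable from it), the per-fold scores returned in SCORE mode can be aggregated in the usual way:

.. code-block:: python

    import statistics

    mean_score = statistics.mean(scores)
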
@@ -436,12 +488,61 @@ def cross_validate(cross_validator: BaseCrossValidator, pipeline: dm.Pipeline, p
 
 
 def grid_search_cv(cross_validator: BaseCrossValidator, pipeline: dm.Pipeline, pipeline_input: dm.PipelineInput, pipeline_params: dm.PipelineParam):
+    """
+    A top-level method that does a grid search with cross validation. It takes a pipeline, the input to it,
+    a set of parameters for the pipeline, and a cross validator, similar to sklearn's traditional GridSearchCV,
+    and executes the resulting pipelines and cross validations in parallel.
+
+    This method first transforms the input pipeline, expanding it to cover the parameter grid, and then runs
+    the cross validator in parallel. The goal is to execute the cross validation for every parameter combination
+    in parallel and collect the results.
+
+    The results are captured in a dict that maps each pipeline to its corresponding cross validation scores.
+
+    Examples
+    --------
+    An example of grid search using a parameter grid similar to what sklearn does:
+
+    .. code-block:: python
+
+        k = 2
+        kf = KFold(k)
+        result = rt.grid_search_cv(kf, pipeline, pipeline_input, pipeline_params)
+
+        # results can be examined by iterating over them, for example to pick the best pipeline based
+        # on mean scores
+        best_pipeline = None
+        best_mean_scores = 0.0
+
+        for cv_pipeline, scores in result.items():
+            mean = statistics.mean(scores)
+            if mean > best_mean_scores:
+                best_pipeline = cv_pipeline
+                best_mean_scores = mean
+
+    :param cross_validator: Cross validator for grid search
+    :param pipeline: Pipeline graph
+    :param pipeline_input: Input to the pipeline
+    :param pipeline_params: Parameter space to explore using a grid search approach
+    :return: Dict from pipeline to the cross validation scores
+    """
     parameterized_pipeline = pipeline.get_parameterized_pipeline(pipeline_params)
     parameterized_pipeline_input = pipeline_input.get_parameterized_input(pipeline, parameterized_pipeline)
     return _grid_search_cv(cross_validator, parameterized_pipeline, parameterized_pipeline_input)
 
 
 def _grid_search_cv(cross_validator: BaseCrossValidator, pipeline: dm.Pipeline, pipeline_input: dm.PipelineInput):
+    """
+    Internal helper method to do a grid search CV on the "expanded" pipeline. This method does not expand the
+    input parameters; it simply executes a grid search with a cross validator. The key is to explore the
+    various pipelines in parallel and then provide the lineage from the output for each pipeline that was
+    explored.
+
+    :param cross_validator: Cross validator
+    :param pipeline: Pipeline graph
+    :param pipeline_input: Pipeline input
+    :return: Dict from pipeline to the resulting cross validation scores
+    """
     pipeline_input_train = dm.PipelineInput()
 
     pipeline_input_test = []

@@ -517,5 +618,26 @@ def _grid_search_cv(cross_validator: BaseCrossValidator, pipeline: dm.Pipeline,
 
 
 def save(pipeline_output: dm.PipelineOutput, xy_ref: dm.XYRef, filehandle):
+    """
+    Saves a selected pipeline, i.e. it persists the state of the estimators so that the end user can load and
+    execute the pipeline in SCORE/PREDICT modes in the future.
+
+    Examples
+    --------
+    Saving a selected pipeline can be done as follows:
+
+    .. code-block:: python
+
+        # this pipeline can also be saved
+        fname = 'random_forest.cfp'
+        w_fh = open(fname, 'wb')
+        rt.save(pipeline_output, node_rf_xyrefs[0], w_fh)
+        w_fh.close()
+
+    :param pipeline_output: Pipeline output from an executed pipeline
+    :param xy_ref: The chosen XYRef that will be used to materialize a selected pipeline
+    :param filehandle: The file handle to save this pipeline to
+    :return: None
+    """
     pipeline = select_pipeline(pipeline_output, xy_ref)
     pipeline.save(filehandle)
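
A natural counterpart to save is loading the pipeline back for later SCORE/PREDICT runs. The sketch below is illustrative only: it assumes the data model module is imported as dm (as in Runtime.py) and that the Pipeline class exposes a load(filehandle) counterpart to the pipeline.save(filehandle) call used above; such a method is not part of this diff.

.. code-block:: python

    import codeflare.pipelines.Datamodel as dm

    r_fh = open('random_forest.cfp', 'rb')
    saved_pipeline = dm.Pipeline.load(r_fh)  # assumed counterpart to pipeline.save(filehandle)
    r_fh.close()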
