From cf7e60073ecb150186b4ad3099bb33b4b120df54 Mon Sep 17 00:00:00 2001 From: Yuan-Chi Chang Date: Wed, 21 Jul 2021 10:41:01 -0400 Subject: [PATCH] fix singleton bug plus unit test --- codeflare/pipelines/Datamodel.py | 4 +- codeflare/pipelines/Runtime.py | 18 +++++---- codeflare/pipelines/tests/test_singleton.py | 45 +++++++++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 codeflare/pipelines/tests/test_singleton.py diff --git a/codeflare/pipelines/Datamodel.py b/codeflare/pipelines/Datamodel.py index 0148fa5..96a21ba 100644 --- a/codeflare/pipelines/Datamodel.py +++ b/codeflare/pipelines/Datamodel.py @@ -837,7 +837,9 @@ def get_post_edges(self, node: Node): :return: Outgoing edges for the node """ post_edges = [] - post_nodes = self.__post_graph__[node] + post_nodes = [] + if node in self.__post_graph__.keys(): + post_nodes = self.__post_graph__[node] # Empty post if not post_nodes: post_edges.append(Edge(node, None)) diff --git a/codeflare/pipelines/Runtime.py b/codeflare/pipelines/Runtime.py index 9dc60a7..96818ee 100644 --- a/codeflare/pipelines/Runtime.py +++ b/codeflare/pipelines/Runtime.py @@ -49,7 +49,6 @@ from queue import Queue import pandas as pd - class ExecutionType(Enum): """ Pipelines can be executed in different modes, this is targeting the typical AI/ML parlance, with the supported @@ -58,7 +57,8 @@ class ExecutionType(Enum): """ FIT = 0, PREDICT = 1, - SCORE = 2 + SCORE = 2, + TRANSFORM = 3 @ray.remote @@ -107,8 +107,8 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref: elif mode == ExecutionType.SCORE: if base.is_classifier(estimator) or base.is_regressor(estimator): estimator = node.get_estimator() - res_Xref = ray.put(estimator.score(X, y)) - result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref]) + score_ref = ray.put(estimator.score(X, y)) + result = dm.XYRef(score_ref, score_ref, prev_node_ptr, prev_node_ptr, [xy_ref]) return result else: res_Xref = ray.put(estimator.transform(X)) @@ -118,13 +118,17 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref: elif mode == ExecutionType.PREDICT: # Test mode does not clone as it is a simple predict or transform if base.is_classifier(estimator) or base.is_regressor(estimator): - res_Xref = ray.put(estimator.predict(X)) - result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref]) + predict_ref = ray.put(estimator.predict(X)) + result = dm.XYRef(predict_ref, predict_ref, prev_node_ptr, prev_node_ptr, [xy_ref]) return result else: res_Xref = ray.put(estimator.transform(X)) result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref]) return result + elif mode == ExecutionType.TRANSFORM: + res_Xref = ray.put(estimator.fit_transform(X)) + result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref]) + return result def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType): @@ -662,4 +666,4 @@ def save(pipeline_output: dm.PipelineOutput, xy_ref: dm.XYRef, filehandle): :return: None """ pipeline = select_pipeline(pipeline_output, xy_ref) - pipeline.save(filehandle) \ No newline at end of file + pipeline.save(filehandle) diff --git a/codeflare/pipelines/tests/test_singleton.py b/codeflare/pipelines/tests/test_singleton.py new file mode 100644 index 0000000..70f4411 --- /dev/null +++ b/codeflare/pipelines/tests/test_singleton.py @@ -0,0 +1,45 @@ +import pytest +import ray +import pandas as pd +import numpy as np +import sklearn.base as base +from sklearn.preprocessing import MinMaxScaler +import codeflare.pipelines.Datamodel as dm +import codeflare.pipelines.Runtime as rt +from codeflare.pipelines.Datamodel import Xy +from codeflare.pipelines.Datamodel import XYRef +from codeflare.pipelines.Runtime import ExecutionType + +def test_singleton(): + + ray.shutdown() + ray.init() + + ## prepare the data + X = np.random.randint(0,100,size=(10000, 4)) + y = np.random.randint(0,2,size=(10000, 1)) + + ## initialize codeflare pipeline by first creating the nodes + pipeline = dm.Pipeline() + node_a = dm.EstimatorNode('a', MinMaxScaler()) + pipeline.add_node(node_a) + + pipeline_input = dm.PipelineInput() + xy = dm.Xy(X,y) + pipeline_input.add_xy_arg(node_a, xy) + + ## execute the codeflare pipeline + pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.TRANSFORM, pipeline_input) + + ## retrieve node e + node_a_output = pipeline_output.get_xyrefs(node_a) + Xout = ray.get(node_a_output[0].get_Xref()) + yout = ray.get(node_a_output[0].get_yref()) + + assert Xout.shape[0] == 10000 + assert yout.shape[0] == 10000 + + ray.shutdown() + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__]))