Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codeflare/pipelines/Datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,9 @@ def get_post_edges(self, node: Node):
:return: Outgoing edges for the node
"""
post_edges = []
post_nodes = self.__post_graph__[node]
post_nodes = []
if node in self.__post_graph__.keys():
post_nodes = self.__post_graph__[node]
# Empty post
if not post_nodes:
post_edges.append(Edge(node, None))
Expand Down
18 changes: 11 additions & 7 deletions codeflare/pipelines/Runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from queue import Queue
import pandas as pd


class ExecutionType(Enum):
"""
Pipelines can be executed in different modes, this is targeting the typical AI/ML parlance, with the supported
Expand All @@ -58,7 +57,8 @@ class ExecutionType(Enum):
"""
FIT = 0,
PREDICT = 1,
SCORE = 2
SCORE = 2,
TRANSFORM = 3


@ray.remote
Expand Down Expand Up @@ -107,8 +107,8 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
elif mode == ExecutionType.SCORE:
if base.is_classifier(estimator) or base.is_regressor(estimator):
estimator = node.get_estimator()
res_Xref = ray.put(estimator.score(X, y))
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
score_ref = ray.put(estimator.score(X, y))
result = dm.XYRef(score_ref, score_ref, prev_node_ptr, prev_node_ptr, [xy_ref])
return result
else:
res_Xref = ray.put(estimator.transform(X))
Expand All @@ -118,13 +118,17 @@ def execute_or_node_remote(node: dm.EstimatorNode, mode: ExecutionType, xy_ref:
elif mode == ExecutionType.PREDICT:
# Test mode does not clone as it is a simple predict or transform
if base.is_classifier(estimator) or base.is_regressor(estimator):
res_Xref = ray.put(estimator.predict(X))
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
predict_ref = ray.put(estimator.predict(X))
result = dm.XYRef(predict_ref, predict_ref, prev_node_ptr, prev_node_ptr, [xy_ref])
return result
else:
res_Xref = ray.put(estimator.transform(X))
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
return result
elif mode == ExecutionType.TRANSFORM:
res_Xref = ray.put(estimator.fit_transform(X))
result = dm.XYRef(res_Xref, xy_ref.get_yref(), prev_node_ptr, prev_node_ptr, [xy_ref])
return result


def execute_or_node(node, pre_edges, edge_args, post_edges, mode: ExecutionType):
Expand Down Expand Up @@ -662,4 +666,4 @@ def save(pipeline_output: dm.PipelineOutput, xy_ref: dm.XYRef, filehandle):
:return: None
"""
pipeline = select_pipeline(pipeline_output, xy_ref)
pipeline.save(filehandle)
pipeline.save(filehandle)
45 changes: 45 additions & 0 deletions codeflare/pipelines/tests/test_singleton.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pytest
import ray
import pandas as pd
import numpy as np
import sklearn.base as base
from sklearn.preprocessing import MinMaxScaler
import codeflare.pipelines.Datamodel as dm
import codeflare.pipelines.Runtime as rt
from codeflare.pipelines.Datamodel import Xy
from codeflare.pipelines.Datamodel import XYRef
from codeflare.pipelines.Runtime import ExecutionType

def test_singleton():

ray.shutdown()
ray.init()

## prepare the data
X = np.random.randint(0,100,size=(10000, 4))
y = np.random.randint(0,2,size=(10000, 1))

## initialize codeflare pipeline by first creating the nodes
pipeline = dm.Pipeline()
node_a = dm.EstimatorNode('a', MinMaxScaler())
pipeline.add_node(node_a)

pipeline_input = dm.PipelineInput()
xy = dm.Xy(X,y)
pipeline_input.add_xy_arg(node_a, xy)

## execute the codeflare pipeline
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.TRANSFORM, pipeline_input)

## retrieve node e
node_a_output = pipeline_output.get_xyrefs(node_a)
Xout = ray.get(node_a_output[0].get_Xref())
yout = ray.get(node_a_output[0].get_yref())

assert Xout.shape[0] == 10000
assert yout.shape[0] == 10000

ray.shutdown()

if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))