Skip to content

Commit fa8e90f

Browse files
committed
testing pipeline with more than one branch
1 parent 170086d commit fa8e90f

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import pytest
2+
import ray
3+
import pandas as pd
4+
import numpy as np
5+
from sklearn.compose import ColumnTransformer
6+
from sklearn.model_selection import train_test_split
7+
from sklearn.pipeline import Pipeline
8+
from sklearn.preprocessing import StandardScaler, MinMaxScaler
9+
from sklearn.tree import DecisionTreeClassifier
10+
import codeflare.pipelines.Datamodel as dm
11+
import codeflare.pipelines.Runtime as rt
12+
from codeflare.pipelines.Datamodel import Xy
13+
from codeflare.pipelines.Datamodel import XYRef
14+
from codeflare.pipelines.Runtime import ExecutionType
15+
16+
class FeatureUnion(dm.AndTransform):
17+
def __init__(self):
18+
pass
19+
20+
def transform(self, xy_list):
21+
X_list = []
22+
y_list = []
23+
24+
for xy in xy_list:
25+
X_list.append(xy.get_x())
26+
X_concat = np.concatenate(X_list, axis=0)
27+
28+
return Xy(X_concat, None)
29+
30+
def test_multibranch():
31+
32+
ray.shutdown()
33+
ray.init()
34+
35+
## prepare the data
36+
X = pd.DataFrame(np.random.randint(0,100,size=(10000, 4)), columns=list('ABCD'))
37+
y = pd.DataFrame(np.random.randint(0,2,size=(10000, 1)), columns=['Label'])
38+
39+
numeric_features = X.select_dtypes(include=['int64']).columns
40+
numeric_transformer = Pipeline(steps=[
41+
('scaler', StandardScaler())])
42+
43+
## set up preprocessor as StandardScaler
44+
preprocessor = ColumnTransformer(
45+
transformers=[
46+
('num', numeric_transformer, numeric_features),
47+
])
48+
49+
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
50+
51+
X_ref = ray.put(X_train)
52+
y_ref = ray.put(y_train)
53+
54+
Xy_ref = XYRef(X_ref, y_ref)
55+
Xy_ref_ptr = ray.put(Xy_ref)
56+
Xy_ref_ptrs = [Xy_ref_ptr]
57+
58+
## create two decision tree classifiers with different depth limit
59+
c_a = DecisionTreeClassifier(max_depth=3)
60+
c_b = DecisionTreeClassifier(max_depth=5)
61+
62+
## initialize codeflare pipeline by first creating the nodes
63+
pipeline = dm.Pipeline()
64+
node_a = dm.EstimatorNode('preprocess', preprocessor)
65+
node_b = dm.EstimatorNode('c_a', c_a)
66+
node_c = dm.EstimatorNode('c_b', c_b)
67+
68+
node_d = dm.EstimatorNode('d', MinMaxScaler())
69+
node_e = dm.EstimatorNode('e', StandardScaler())
70+
node_f = dm.AndNode('f', FeatureUnion())
71+
72+
## codeflare nodes are then connected by edges
73+
pipeline.add_edge(node_a, node_b)
74+
pipeline.add_edge(node_a, node_c)
75+
76+
pipeline.add_edge(node_a, node_d)
77+
pipeline.add_edge(node_d, node_e)
78+
pipeline.add_edge(node_d, node_f)
79+
80+
in_args={node_a: Xy_ref_ptrs}
81+
## execute the codeflare pipeline
82+
out_args = rt.execute_pipeline(pipeline, ExecutionType.FIT, in_args)
83+
assert out_args
84+
85+
## retrieve node b
86+
node_b_out_args = ray.get(out_args[node_b])
87+
b_out_xyref = node_b_out_args[0]
88+
ray.get(b_out_xyref.get_Xref())
89+
b_out_node = ray.get(b_out_xyref.get_currnoderef())
90+
sct_b = b_out_node.get_estimator()
91+
assert sct_b
92+
print(sct_b.feature_importances_)
93+
94+
## retrieve node f
95+
out_Xyrefs_f = ray.get(out_args[node_f])
96+
assert out_Xyrefs_f
97+
98+
ray.shutdown()
99+
100+
101+
if __name__ == "__main__":
102+
sys.exit(pytest.main(["-v", __file__]))

0 commit comments

Comments
 (0)