Skip to content

Commit 6fe833a

Browse files
committed
Merge remote-tracking branch 'origin/develop' into complex-example-1
2 parents 001a9f6 + e2aac4e commit 6fe833a

File tree

11 files changed

+509
-77
lines changed

11 files changed

+509
-77
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ This project is under active development. Keep an eye on this page for our first
1313

1414
See the [design document](https://docs.google.com/document/d/1t1K8N07TcbBKBgrcI6jf9tPow00cOKE9whnEVxOd4-U/edit) for more information on our design goals.
1515

16+
This project uses ZenHub for tracking of issues and roadmap.
17+
1618
## Example notebooks
1719

1820
**TODO:** Add instructions for running the notebooks in the `notebooks` directory.

codeflare/pipelines/Datamodel.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ class Node(ABC):
200200
A node class that is an abstract one, this is capturing basic info re the Node.
201201
The hash code of this node is the name of the node and equality is defined if the
202202
node name and the type of the node match.
203+
204+
When doing a grid search, a node can be parameterized with new params for the estimator and updated. This
205+
is an internal method used by grid search.
203206
"""
204207

205208
def __init__(self, node_name, estimator: BaseEstimator, node_input_type: NodeInputType, node_firing_type: NodeFiringType, node_state_type: NodeStateType):
@@ -210,6 +213,11 @@ def __init__(self, node_name, estimator: BaseEstimator, node_input_type: NodeInp
210213
self.__node_state_type__ = node_state_type
211214

212215
def __str__(self):
216+
"""
217+
Returns a string representation of the node along with the parameters of the estimator of the node.
218+
219+
:return: String representation of the node
220+
"""
213221
estimator_params_str = str(self.get_estimator().get_params())
214222
retval = self.__node_name__ + estimator_params_str
215223
return retval
@@ -247,9 +255,22 @@ def get_node_state_type(self) -> NodeStateType:
247255
return self.__node_state_type__
248256

249257
def get_estimator(self):
258+
"""
259+
Return the estimator of the node
260+
261+
:return: The node's estimator
262+
"""
250263
return self.__estimator__
251264

252265
def get_parameterized_node(self, node_name, **params):
266+
"""
267+
Get a parameterized node, given kwargs **params, convert this node and update the estimator with the
268+
new set of parameters. It will clone the node and its underlying estimator.
269+
270+
:param node_name: New node name
271+
:param params: Updated parameters
272+
:return:
273+
"""
253274
cloned_node = self.clone()
254275
cloned_node.__node_name__ = node_name
255276
estimator = cloned_node.get_estimator()
@@ -311,7 +332,6 @@ def __init__(self, node_name: str, estimator: BaseEstimator):
311332
"""
312333
super().__init__(node_name, estimator, NodeInputType.OR, NodeFiringType.ANY, NodeStateType.IMMUTABLE)
313334

314-
315335
def clone(self):
316336
"""
317337
Clones the given node and the underlying estimator as well, if it was initialized with
@@ -323,6 +343,17 @@ def clone(self):
323343

324344

325345
class AndEstimator(BaseEstimator):
346+
"""
347+
An and estimator, is part of the AndNode, it is very similar to a standard estimator, however the key
348+
difference is that it takes a `xy_list` as input and outputs an `xy`, contrasting to the EstimatorNode,
349+
which takes an input as `xy` and outputs `xy_t`.
350+
351+
In the pipeline execution, we expect three modes: (a) FIT: A regressor or classifier will call the fit
352+
and then pass on the transform results downstream, a non-regressor/classifier will call the fit_transform
353+
method, (b) PREDICT: A regressor or classifier will call the predict method, whereas a non-regressor/classifier
354+
will call the transform method, and (c) SCORE: A regressor will call the score method, and a non-regressor/classifer
355+
will call the transform method.
356+
"""
326357
@abstractmethod
327358
def transform(self, xy_list: list) -> Xy:
328359
raise NotImplementedError("And estimator needs to implement a transform method")

codeflare/pipelines/tests/test_Datamodel.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,29 @@
77
from sklearn.pipeline import Pipeline
88
from sklearn.preprocessing import StandardScaler, MinMaxScaler
99
from sklearn.tree import DecisionTreeClassifier
10+
import sklearn.base as base
1011
import codeflare.pipelines.Datamodel as dm
1112
import codeflare.pipelines.Runtime as rt
1213
from codeflare.pipelines.Datamodel import Xy
1314
from codeflare.pipelines.Runtime import ExecutionType
1415

15-
1616
class FeatureUnion(dm.AndEstimator):
1717
def __init__(self):
1818
pass
19-
20-
def fit_transform(self, xy_list: list):
21-
return self.transform(xy_list)
22-
2319
def get_estimator_type(self):
2420
return 'transform'
25-
21+
def clone(self):
22+
return base.clone(self)
23+
def fit_transform(self, xy_list):
24+
return self.transform(xy_list)
2625
def transform(self, xy_list):
2726
X_list = []
28-
y_list = []
29-
27+
y_vec = None
3028
for xy in xy_list:
3129
X_list.append(xy.get_x())
32-
X_concat = np.concatenate(X_list, axis=0)
33-
34-
return Xy(X_concat, None)
35-
30+
y_vec = xy.get_y()
31+
X_concat = np.concatenate(X_list, axis=1)
32+
return Xy(X_concat, y_vec)
3633

3734
class MultibranchTestCase(unittest.TestCase):
3835

codeflare/pipelines/tests/test_and.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,30 @@
22
import ray
33
import pandas as pd
44
import numpy as np
5+
import sklearn.base as base
56
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler
67
import codeflare.pipelines.Datamodel as dm
78
import codeflare.pipelines.Runtime as rt
89
from codeflare.pipelines.Datamodel import Xy
910
from codeflare.pipelines.Datamodel import XYRef
1011
from codeflare.pipelines.Runtime import ExecutionType
1112

12-
class FeatureUnion(dm.AndTransform):
13+
class FeatureUnion(dm.AndEstimator):
1314
def __init__(self):
1415
pass
15-
16+
def get_estimator_type(self):
17+
return 'transform'
18+
def clone(self):
19+
return base.clone(self)
20+
def fit_transform(self, xy_list):
21+
return self.transform(xy_list)
1622
def transform(self, xy_list):
1723
X_list = []
1824
y_vec = None
19-
2025
for xy in xy_list:
2126
X_list.append(xy.get_x())
2227
y_vec = xy.get_y()
2328
X_concat = np.concatenate(X_list, axis=1)
24-
2529
return Xy(X_concat, y_vec)
2630

2731
def test_two_tier_and():

codeflare/pipelines/tests/test_multibranch.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,30 @@
88
from sklearn.preprocessing import StandardScaler, MinMaxScaler
99
from sklearn.tree import DecisionTreeClassifier
1010
from sklearn.linear_model import LogisticRegression
11+
import sklearn.base as base
1112
import codeflare.pipelines.Datamodel as dm
1213
import codeflare.pipelines.Runtime as rt
1314
from codeflare.pipelines.Datamodel import Xy
1415
from codeflare.pipelines.Datamodel import XYRef
1516
from codeflare.pipelines.Runtime import ExecutionType
1617

17-
class FeatureUnion(dm.AndTransform):
18+
class FeatureUnion(dm.AndEstimator):
1819
def __init__(self):
1920
pass
20-
21+
def get_estimator_type(self):
22+
return 'transform'
23+
def clone(self):
24+
return base.clone(self)
25+
def fit_transform(self, xy_list):
26+
return self.transform(xy_list)
2127
def transform(self, xy_list):
2228
X_list = []
2329
y_vec = None
24-
2530
for xy in xy_list:
2631
X_list.append(xy.get_x())
2732
y_vec = xy.get_y()
2833
X_concat = np.concatenate(X_list, axis=1)
29-
30-
return Xy(X_concat, y_vec.values.ravel())
34+
return Xy(X_concat, y_vec)
3135

3236
def test_multibranch_1():
3337

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import pytest
2+
import ray
3+
4+
# Taking an example from sklearn pipeline to assert that
5+
# the classification report from a rediction from sklearn pipeline is
6+
# the same as that from the converted codeflare pipeline
7+
8+
from sklearn import set_config
9+
set_config(display='diagram')
10+
from sklearn.datasets import make_classification
11+
from sklearn.model_selection import train_test_split
12+
from sklearn.feature_selection import SelectKBest, f_classif
13+
from sklearn.pipeline import make_pipeline
14+
from sklearn.svm import LinearSVC
15+
from sklearn.metrics import classification_report
16+
17+
import codeflare.pipelines.Datamodel as dm
18+
import codeflare.pipelines.Runtime as rt
19+
from codeflare.pipelines.Datamodel import Xy
20+
from codeflare.pipelines.Datamodel import XYRef
21+
from codeflare.pipelines.Runtime import ExecutionType
22+
23+
#
24+
# prediction from an sklearn pipeline
25+
#
26+
27+
def test_pipeline_predict():
28+
29+
ray.shutdown()
30+
ray.init()
31+
32+
#
33+
# prediction from an sklearn pipeline
34+
#
35+
X, y = make_classification(
36+
n_features=20, n_informative=3, n_redundant=0, n_classes=2,
37+
n_clusters_per_class=2, random_state=42)
38+
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
39+
40+
anova_filter = SelectKBest(f_classif, k=3)
41+
clf = LinearSVC()
42+
43+
anova_svm = make_pipeline(anova_filter, clf)
44+
anova_svm.fit(X_train, y_train)
45+
46+
y_pred = anova_svm.predict(X_test)
47+
48+
report_sklearn = classification_report(y_test, y_pred)
49+
print(report_sklearn)
50+
51+
#
52+
# constructing a codeflare pipeline
53+
#
54+
pipeline = dm.Pipeline()
55+
node_anova_filter = dm.EstimatorNode('anova_filter', anova_filter)
56+
node_clf = dm.EstimatorNode('clf', clf)
57+
pipeline.add_edge(node_anova_filter, node_clf)
58+
59+
pipeline_input = dm.PipelineInput()
60+
xy = dm.Xy(X_train, y_train)
61+
62+
pipeline_input.add_xy_arg(node_anova_filter, xy)
63+
64+
pipeline_output = rt.execute_pipeline(pipeline, ExecutionType.FIT, pipeline_input)
65+
66+
node_clf_output = pipeline_output.get_xyrefs(node_clf)
67+
68+
Xout = ray.get(node_clf_output[0].get_Xref())
69+
yout = ray.get(node_clf_output[0].get_yref())
70+
71+
selected_pipeline = rt.select_pipeline(pipeline_output, node_clf_output[0])
72+
73+
pipeline_input = dm.PipelineInput()
74+
pipeline_input.add_xy_arg(node_anova_filter, dm.Xy(X_test, y_test))
75+
76+
predict_output = rt.execute_pipeline(selected_pipeline, ExecutionType.PREDICT, pipeline_input)
77+
78+
predict_clf_output = predict_output.get_xyrefs(node_clf)
79+
80+
#y_pred = ray.get(predict_clf_output[0].get_yref())
81+
y_pred = ray.get(predict_clf_output[0].get_Xref())
82+
83+
84+
report_codeflare = classification_report(y_test, y_pred)
85+
86+
print(report_codeflare)
87+
88+
assert(report_sklearn == report_codeflare)
89+
90+
ray.shutdown()
91+
92+
93+
if __name__ == "__main__":
94+
sys.exit(pytest.main(["-v", __file__]))
95+

codeflare/pipelines/utils.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import graphviz
2+
import codeflare.pipelines.Datamodel as dm
3+
4+
5+
def pipeline_to_graph(pipeline: dm.Pipeline) -> graphviz.Digraph:
6+
"""
7+
Converts the given pipeline to a networkX graph for visualization.
8+
9+
:param pipeline: Pipeline to convert to networkX graph
10+
:return: A directed graph representing this pipeline
11+
"""
12+
graph = graphviz.Digraph()
13+
pipeline_nodes = pipeline.get_nodes()
14+
for pre_node in pipeline_nodes.values():
15+
post_nodes = pipeline.get_post_nodes(pre_node)
16+
graph.node(pre_node.get_node_name())
17+
for post_node in post_nodes:
18+
graph.node(post_node.get_node_name())
19+
graph.edge(pre_node.get_node_name(), post_node.get_node_name())
20+
return graph

codeflare_pipelines.egg-info/SOURCES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ codeflare/pipelines/Datamodel.py
55
codeflare/pipelines/Exceptions.py
66
codeflare/pipelines/Runtime.py
77
codeflare/pipelines/__init__.py
8+
codeflare/pipelines/utils.py
89
codeflare_pipelines.egg-info/PKG-INFO
910
codeflare_pipelines.egg-info/SOURCES.txt
1011
codeflare_pipelines.egg-info/dependency_links.txt

docs/.DS_Store

6 KB
Binary file not shown.

0 commit comments

Comments
 (0)