-
Notifications
You must be signed in to change notification settings - Fork 182
/
pipeline.py
325 lines (254 loc) · 10.1 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
from skmultiflow.core import BaseSKMObject
from sklearn.utils import tosequence
class Pipeline(BaseSKMObject):
    """ [Experimental] Holds a set of sequential operations (transforms),
    followed by a single estimator.

    It allows for easy manipulation of datasets that may require several
    transformation processes before being used by a learner. Also allows
    for the cross-validation of several steps.

    Each of the intermediate steps should be an extension of the
    BaseTransform class, or at least implement the transform and
    partial_fit functions or the partial_fit_transform.

    The last step should be an estimator (learner), so it should implement
    partial_fit, and predict at least.

    Since it has an estimator as the last step, the Pipeline will act like
    an estimator itself, in a way that it can be directly passed to
    evaluation objects, as if it was a learner.

    Parameters
    ----------
    steps: list of tuple
        Tuple list containing the set of transforms and the final estimator.
        It doesn't need to contain a transform type object, but the estimator
        is required. Each tuple should be of the format ('name', estimator).

    Raises
    ------
    TypeError: If the intermediate steps or the final estimator do not implement
    the necessary functions for the pipeline to work, a TypeError is raised.

    NotImplementedError: Some of the functions are yet to be implemented.

    Notes
    -----
    This code is an experimental feature. Use with caution.

    Examples
    --------
    >>> # Imports
    >>> from skmultiflow.lazy import KNNADWINClassifier
    >>> from skmultiflow.core import Pipeline
    >>> from skmultiflow.data import FileStream
    >>> from skmultiflow.evaluation import EvaluatePrequential
    >>> from skmultiflow.transform import OneHotToCategorical
    >>> # Setting up the stream
    >>> stream = FileStream("https://raw.githubusercontent.com/scikit-multiflow/"
    ...                     "streaming-datasets/master/covtype.csv")
    >>> transform = OneHotToCategorical([[10, 11, 12, 13],
    ... [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
    ... 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53]])
    >>> # Setting up the classifier
    >>> classifier = KNNADWINClassifier(n_neighbors=8, max_window_size=2000, leaf_size=40)
    >>> # Setup the pipeline
    >>> pipe = Pipeline([('transform', transform), ('passive_aggressive', classifier)])
    >>> # Setup the evaluator
    >>> evaluator = EvaluatePrequential(show_plot=True, pretrain_size=1000, max_samples=500000)
    >>> # Evaluate
    >>> evaluator.evaluate(stream=stream, model=pipe)
    """

    _estimator_type = 'pipeline'

    def __init__(self, steps):
        # Default values
        super().__init__()
        self.steps = tosequence(steps)
        # Set to True by _validate_steps when the steps form a usable pipeline.
        self.active = False
        self.__configure()

    def __configure(self):
        """ Initial Pipeline configuration. Validates the Pipeline's steps.
        """
        self._validate_steps()

    def predict(self, X):
        """ Sequentially applies all transforms and then predicts with the last step.

        Parameters
        ----------
        X: numpy.ndarray of shape (n_samples, n_features)
            All the samples we want to predict the label for.

        Returns
        -------
        list
            The predicted class label for all the samples in X.
        """
        Xt = X
        # None steps are placeholders and are simply skipped.
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        return self.steps[-1][-1].predict(Xt)

    def fit(self, X, y):
        """ Sequentially fit and transform data in all but last step, then fit
        the model in last step.

        Parameters
        ----------
        X: numpy.ndarray of shape (n_samples, n_features)
            The data upon which the transforms/estimator will create their
            model.

        y: An array_like object of length n_samples
            Contains the true class labels for all the samples in X.

        Returns
        -------
        Pipeline
            self
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is None:
                # BUG FIX: was `pass`, which fell through and invoked
                # methods on None, raising AttributeError.
                continue
            if hasattr(transform, "fit_transform"):
                Xt = transform.fit_transform(Xt, y)
            else:
                Xt = transform.fit(Xt, y).transform(Xt)
        if self._final_estimator is not None:
            self._final_estimator.fit(Xt, y)
        return self

    def partial_fit(self, X, y, classes=None):
        """ Sequentially partial fit and transform data in all but last step,
        then partial fit data in last step.

        Parameters
        ----------
        X : numpy.ndarray of shape (n_samples, n_features)
            The features to train the model.

        y: numpy.ndarray of shape (n_samples)
            An array-like with the class labels of all samples in X.

        classes: numpy.ndarray
            Array with all possible/known class labels. This is an optional parameter, except
            for the first partial_fit call where it is compulsory.

        Returns
        -------
        Pipeline
            self
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is None:
                # BUG FIX: was `pass`, which fell through and invoked
                # methods on None, raising AttributeError.
                continue
            # BUG FIX: the original tested hasattr(transform, 'fit_transform')
            # but then called partial_fit_transform, crashing on transforms
            # that only implement fit_transform. Test for the method we call.
            if hasattr(transform, 'partial_fit_transform'):
                Xt = transform.partial_fit_transform(Xt, y, classes=classes)
            else:
                Xt = transform.partial_fit(Xt, y, classes=classes).transform(Xt)
        if self._final_estimator is not None:
            # Only forward `classes` if the estimator's partial_fit accepts it.
            if "classes" in self._final_estimator.partial_fit.__code__.co_varnames:
                self._final_estimator.partial_fit(X=Xt, y=y, classes=classes)
            else:
                self._final_estimator.partial_fit(X=Xt, y=y)
        return self

    def partial_fit_predict(self, X, y):
        """ Partial fits and transforms data in all but last step, then partial
        fits and predicts in the last step.

        Parameters
        ----------
        X: numpy.ndarray of shape (n_samples, n_features)
            All the samples we want to predict the label for.

        y: An array_like object of length n_samples
            Contains the true class labels for all the samples in X

        Returns
        -------
        list
            The predicted class label for all the samples in X.
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is None:
                # BUG FIX: was `pass`, which fell through and invoked
                # methods on None, raising AttributeError.
                continue
            if hasattr(transform, "partial_fit_transform"):
                Xt = transform.partial_fit_transform(Xt, y)
            else:
                Xt = transform.partial_fit(Xt, y).transform(Xt)
        if hasattr(self._final_estimator, "partial_fit_predict"):
            return self._final_estimator.partial_fit_predict(Xt, y)
        else:
            return self._final_estimator.partial_fit(Xt, y).predict(Xt)

    def partial_fit_transform(self, X, y=None):
        """ Partial fits and transforms data in all but last step, then
        partial_fit in last step.

        Parameters
        ----------
        X: numpy.ndarray of shape (n_samples, n_features)
            The data upon which the transforms/estimator will create their
            model.

        y: An array_like object of length n_samples
            Contains the true class labels for all the samples in X

        Returns
        -------
        Pipeline
            self

        Raises
        ------
        NotImplementedError: This function is yet to be implemented.
        """
        raise NotImplementedError

    def _validate_steps(self):
        """ Validates all steps, guaranteeing that there's an estimator in its
        last step.

        Alters the value of self.active according to the validity of the steps.

        Raises
        ------
        TypeError: If the intermediate steps or the final estimator do not implement
        the necessary functions for the pipeline to work, a TypeError is raised.
        """
        names, estimators = zip(*self.steps)
        classifier = estimators[-1]
        transforms = estimators[:-1]
        self.active = True
        for t in transforms:
            if t is None:
                continue
            # Each intermediate step must be fittable AND able to transform.
            if not (hasattr(t, "fit") or hasattr(t, "fit_transform")) \
                    or not hasattr(t, "transform"):
                self.active = False
                raise TypeError("All intermediate steps, including an evaluator, "
                                "should implement fit and transform.")
        # The final step must support incremental learning; if not, the
        # pipeline is flagged inactive (no exception, to match original behavior).
        if classifier is not None and not hasattr(classifier, "partial_fit"):
            self.active = False

    def named_steps(self):
        """ Generates a dictionary to access all the steps' properties.

        Returns
        -------
        dictionary
            A steps dictionary, so that each step can be accessed by name.
        """
        return dict(self.steps)

    def get_info(self):
        """ Collects and concatenates the info strings of all steps.

        Returns
        -------
        string
            A description of the pipeline, listing each transform's and the
            final learner's info (or a placeholder when unavailable).
        """
        info = "Pipeline:\n["
        names, estimators = zip(*self.steps)
        learner = estimators[-1]
        transforms = estimators[:-1]
        for t in transforms:
            try:
                if t.get_info() is not None:
                    info += t.get_info()
                    info += "\n"
                else:
                    info += 'Transform: no info available'
            except NotImplementedError:
                info += 'Transform: no info available'
        if learner is not None:
            try:
                if hasattr(learner, 'get_info'):
                    info += learner.get_info()
                else:
                    info += 'Learner: no info available'
            except NotImplementedError:
                info += 'Learner: no info available'
        info += "]"
        return info

    @property
    def _final_estimator(self):
        """ Easy to access estimator.

        Returns
        -------
        Extension of BaseClassifier
            The Pipeline's classifier
        """
        return self.steps[-1][-1]