[WIP] Streamz as input #113
base: master
Conversation
…ake generators interact with streamz
Travis fails due to the latest version of sklearn. I will add a temporary workaround as part of the other open PR, and I will upload the actual fix later today.
from streamz import Stream

@Stream.register_api()
class predict(Stream):
This is a method reserved for learning methods.
This makes predict appear as a method on streamz.Stream instances. We might want a more explicit name, or, alternatively, not register the method at all, but instead use Stream.connect().
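For context, here is a minimal sketch of the register_api pattern, showing why the decorated class ends up as a method on every Stream instance. This is a simplified stand-in for illustration only, not streamz's actual implementation (the real one handles async, backpressure, etc.):

```python
class Stream:
    """Tiny illustrative stand-in for streamz.Stream."""

    def __init__(self, upstream=None):
        self.downstreams = []
        if upstream is not None:
            upstream.downstreams.append(self)

    @classmethod
    def register_api(cls):
        """Attach the decorated node class as a method on Stream itself."""
        def decorator(node_cls):
            def make_node(self, *args, **kwargs):
                return node_cls(self, *args, **kwargs)
            setattr(cls, node_cls.__name__, make_node)
            return node_cls
        return decorator

    def _emit(self, x):
        # Push the value to all downstream nodes.
        return [down.update(x) for down in self.downstreams]

    def update(self, x, who=None):
        return self._emit(x)


@Stream.register_api()
class predict(Stream):
    """Node that calls model.predict on each element and emits the result."""

    def __init__(self, upstream, model):
        super().__init__(upstream)
        self.model = model

    def update(self, x, who=None):
        return self._emit(self.model.predict(x))
```

After registration, any Stream instance gains a `.predict(model)` method, which is exactly the namespace-pollution concern raised above: every node class registered this way shows up on every stream.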
        return self._emit(self.model.predict(X))

@Stream.register_api()
class partial_fit(Stream):
This is a method reserved for learning methods.
@@ -2,26 +2,26 @@
 from skmultiflow.core.base_object import BaseObject


-class Stream(BaseObject, metaclass=ABCMeta):
-    """ The abstract class setting up the minimum requirements of a stream,
+class BaseGenerator(BaseObject, metaclass=ABCMeta):
This renaming could cause some confusion, given that not all of the extended classes are generators. Suggestions: SMKStream, BaseStream, StreamMixin, ...
This is not only about the available methods; it also matters when considering future sources such as Kafka, TCP, HTTP, etc.
@jacobmontiel do you have an example? This was to avoid confusion with streamz.Stream. IMO the extended classes I have seen literally are generators, but I may be confused...
Currently, there are two types of data sources supported: generators and data batches (files and raw data). In this case, FileStream and DataStream are not generators. Future sources (via streamz) that come to mind include Kafka and TCP, which also do not fall into the generator category.
During the first stage of development, we focused on generators because they are cheap (memory-wise). However, the next step is to allow multiple data sources. Generators are nice for reproducibility, but in order to make skmultiflow useful for real-world applications, we need to provide the user with more options.
# TODO: inheriting from pandas.DataFrame should make things easier
""" DataGenerator

A generator constructed from the entries of a static dataset (numpy array or pandas
I like the idea of having a single method for both file and raw data sources. However, it would be nice to make clear in the example section how both functionalities remain, because a lot of users rely on these methods. Optionally, we could keep DataStream and FileStream as wrappers with a deprecation warning.
Unless the users are well-involved in the development process, I would second deprecation or clear documentation on how to switch.
File sources are already tackled by streamz via Source.from_textfile. I think I can add a deprecation warning.
@jacobmontiel in this case I don't see how a deprecation warning could help, taking the renaming into account. If users had code importing FileStream, they could not import it anyway, because after the applied renaming it would be renamed to FileGenerator...
I mean to have a wrapper class FileStream (same name as before) that internally calls the new method. If the user calls FileStream, then the warning is raised.
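A hypothetical wrapper along these lines could look like the following sketch. The class names and constructor signature here are illustrative assumptions, not the PR's actual API:

```python
import warnings


class DataGenerator:
    """Stand-in for the new unified source class (illustrative name)."""

    def __init__(self, data, batch_size=1):
        self.data = data
        self.batch_size = batch_size


class FileStream(DataGenerator):
    """Old name kept for backward compatibility; warns on instantiation."""

    def __init__(self, filepath, batch_size=1):
        warnings.warn(
            "FileStream is deprecated; use DataGenerator instead",
            DeprecationWarning,
            stacklevel=2,
        )
        # The real code would load the file here; this sketch just stores
        # the path and delegates to the new class.
        super().__init__(data=filepath, batch_size=batch_size)
```

Existing user code that does `FileStream("data.csv")` keeps working but sees a DeprecationWarning pointing at the replacement.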
On the whole, I am impressed by how relatively easy it was to plug streamz into your workflow.
I have left some comments for discussion.
    def update(self, x, who=None):
        X, y = x
        self.model.partial_fit(X, y)
        return self._emit(self.model)
Here is what I was talking about. I guess it's OK to emit the whole model (in-process, anyway), and then downstream streamz nodes can decide what to do with it to get out any metrics of interest. On the other hand, the model is mutable, so downstreams had better not be async, else they may process on incorrect state.
Is there any obvious "output" to the fitting and current state that we could consider as a result of the partial fit?
Following the sklearn way of doing things, the output of partial_fit should be self.model. But here we are in a different context: calls in streamz may be asynchronous, which complicates things.
Option 1: return self.model, and make it very explicit to users that this part of the code is not responsible for ensuring processing on a correct state.
Option 2 (requires more work): internally manage access to self.model (via tornado conditions?).
After some consideration, Option 1 is OK for the moment - with appropriate caveats - and I don't expect we actually expect any async down-streams here for the moment. In examples, it would be good to show what you might do with the model to get some basic metrics out for plotting the performance of the model versus time.
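As an illustration of the kind of downstream metric collection suggested here, the following sketch turns the stream of emitted models into a time series of scores. The names `record_score`, `holdout_X`, and `holdout_y` are assumptions for illustration, not part of this PR:

```python
# History of scores, one entry per emitted model state.
scores = []


def record_score(model, holdout_X, holdout_y):
    """Score the current model state on a holdout batch and keep history."""
    scores.append(model.score(holdout_X, holdout_y))
    return scores[-1]

# With the nodes from this PR, one might attach this downstream of
# partial_fit, e.g. (hypothetical wiring):
#   model_stream.map(lambda m: record_score(m, holdout_X, holdout_y)).sink(print)
```

Because the `.map` call runs immediately when the model is emitted, each recorded score reflects the model state at that moment, which is what a performance-versus-time plot needs.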
@martindurant to answer your question more deeply: most of the information we could get from a model actually depends on the "state" of the model.
Even something like a call to get_infos() would lead to incorrect results if run asynchronously. So I don't see what useful information could be returned instead of the model itself...
.map(lambda x: x.get_infos()) would run immediately, generate a string, and be correct at the moment it is called. If there are downstream nodes that want to do something else with the model object, they may be slightly out-of-date, but not by much. If the objective is to plot a time series of the model's performance, I don't anticipate much of a problem.
I think I get your point. I would actually be OK with returning get_infos() for a V1.
get_info is just a method to provide the initial configuration of the estimator, so it does not change as the model is trained. I prefer returning self, because the user could then call something like model.score(X, y_true) to get an estimate of performance. score() is now available in all classifiers and regressors.
""" | ||
return self.name + " - {} target(s), {} classes, {} features".format(self.n_targets, | ||
self.n_classes, self.n_features) | ||
|
||
def get_class_type(self): | ||
return 'stream' | ||
return type(self) |
Always a multiflow-specific type here? Streamz comes in many classes, hope it's not possible to end up here with one of those.
get_class_type has been deprecated. However, we could take the chance to define a method in the base class that returns the stream (streamz) type.
        while self.mf_gen.has_more_samples():
            sample = self.mf_gen.next_sample(self.batch_size)
            yield self._emit(sample)
            yield gen.sleep(self.poll_interval)
if self.stopped: break
?
A common pattern in other async sources, allowing stream.stop().
Is there a practical consequence to having this async? It doesn't need to be, to get closer to the functionality and performance overhead of the previous version. I note that the downstream fit/predict functionality is not async.
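A simplified, synchronous analogue of the suggested stop pattern is sketched below. The real source is a tornado coroutine; `PollingSource` and its attributes are illustrative names, not code from this PR:

```python
class PollingSource:
    """Pulls samples from a skmultiflow-style generator until exhausted
    or explicitly stopped."""

    def __init__(self, mf_gen, batch_size=1):
        self.mf_gen = mf_gen
        self.batch_size = batch_size
        self.stopped = True
        self.emitted = []

    def _emit(self, sample):
        # Stand-in for pushing the sample downstream.
        self.emitted.append(sample)

    def start(self):
        self.stopped = False
        while self.mf_gen.has_more_samples():
            if self.stopped:  # lets stop() interrupt the polling loop
                break
            self._emit(self.mf_gen.next_sample(self.batch_size))

    def stop(self):
        self.stopped = True
```

The `if self.stopped: break` check inside the loop is the whole point: without it, a long (or infinite) generator could never be interrupted via `stop()`.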
No, I don't see any consequence of making this async.
To be checked: I think you will get exactly one event emitted per event loop tick, which may not be significant overhead compared to the time spent training for each sample.
updated
sliding_array = stream.sliding_window(10).map(pd.np.concatenate)  # get last ten elements
data_from_stream = sliding_array.map(SimpleImputer(missing_values=-47, strategy='median').fit_transform).sink_to_list()
stream.start()
time.sleep(1)  # wait long enough to let all batches pass through the stream
I would recommend against this in tests, but maybe OK in a demo. There are helpers like wait_for that could be useful.
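For reference, a standalone wait_for-style helper might look like this. streamz ships a similar helper in streamz.utils_test; this version is only an illustrative sketch:

```python
import time


def wait_for(predicate, timeout, period=0.01):
    """Poll `predicate` until it is truthy or `timeout` seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(period)
    raise TimeoutError("predicate not satisfied within %r seconds" % timeout)
```

In the demo above, `time.sleep(1)` could then become something like `wait_for(lambda: len(data_from_stream) == expected_batches, timeout=1)` (with `expected_batches` a hypothetical count), which finishes as soon as the data has flowed through instead of always waiting the full second.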
I did not know there was a function for this. Thanks!
Actually @martindurant, this is not a test, it is a demo.
In the future, if demos are replaced by notebooks (I raised an issue for this), then calls to sleep or wait_for would not make sense; the cells of the notebook would just have to be executed in order, that's all.
…to stop this Source
Changes proposed in this pull request:
Connect skmultiflow.data and streamz.Source so that skmultiflow data generators can be used as inputs to streamz.Stream(s).
Note that this PR also deprecates some existing classes, in order to encourage usage of streamz as the default way of handling streaming.
Checklist
This PR introduces new syntax.
TODO: show the syntax for partial_fit/predict.