-
Notifications
You must be signed in to change notification settings - Fork 150
A simple plugin system #380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b7a3692
b52b92c
dda82ff
e4975ad
0149237
eb93e56
319b9e2
6c7f1f2
6e39c65
fe0f9d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| Plugins | ||
| ======= | ||
|
|
||
| In addition to using ``@Stream.register_api()`` decorator, custom stream nodes can | ||
| be added to Streamz by installing 3rd-party Python packages. | ||
|
|
||
|
|
||
| Known plugins | ||
| ------------- | ||
|
|
||
| Extras | ||
| ++++++ | ||
|
|
||
| These plugins are supported by the Streamz community and can be installed as extras, | ||
| e.g. ``pip install streamz[kafka]``. | ||
|
|
||
| There are no plugins here yet, but hopefully soon there will be. | ||
|
|
||
| .. only:: comment | ||
| ================= ====================================================== | ||
| Extra name Description | ||
| ================= ====================================================== | ||
| ``files`` Advanced filesystem operations: listening for new | ||
| files in a directory, writing to multiple files etc. | ||
| ``kafka`` Reading from and writing to Kafka topics. | ||
| ================= ====================================================== | ||
|
|
||
|
|
||
| Entry points | ||
| ------------ | ||
|
|
||
| Plugins register themselves with Streamz by using ``entry_points`` argument | ||
| in ``setup.py``: | ||
|
|
||
| .. code-block:: Python | ||
|
|
||
| # setup.py | ||
|
|
||
| from setuptools import setup | ||
|
|
||
| setup( | ||
| name="streamz_example_plugin", | ||
| version="0.0.1", | ||
| entry_points={ | ||
| "streamz.nodes": [ | ||
| "repeat = streamz_example_plugin:RepeatNode" | ||
| ] | ||
| } | ||
| ) | ||
|
|
||
| In this example, ``RepeatNode`` class will be imported from | ||
| ``streamz_example_plugin`` package and will be available as ``Stream.repeat``. | ||
| In practice, class name and entry point name (the part before ``=`` in entry point | ||
| definition) are usually the same, but they `can` be different. | ||
|
|
||
| Different kinds of add-ons go into different entry point groups: | ||
|
|
||
| =========== ======================= ===================== | ||
| Node type Required parent class Entry point group | ||
| =========== ======================= ===================== | ||
| Source ``streamz.Source`` ``streamz.sources`` | ||
| Node ``streamz.Stream`` ``streamz.nodes`` | ||
| Sink ``streamz.Stream`` ``streamz.sinks`` | ||
| =========== ======================= ===================== | ||
|
|
||
|
|
||
| Lazy loading | ||
| ++++++++++++ | ||
|
|
||
| Streamz will attach methods from existing plugins to the ``Stream`` class when it's | ||
| imported, but actual classes will be loaded only when the corresponding ``Stream`` | ||
| method is first called. Streamz will also validate the loaded class before attaching it | ||
| and will raise an appropriate exception if validation fails. | ||
|
|
||
|
|
||
| Reference implementation | ||
| ------------------------ | ||
|
|
||
| Let's look at how stream nodes can be implemented. | ||
|
|
||
| .. code-block:: Python | ||
|
|
||
| # __init__.py | ||
|
|
||
| from tornado import gen | ||
| from streamz import Stream | ||
|
|
||
|
|
||
| class RepeatNode(Stream): | ||
|
|
||
| def __init__(self, upstream, n, **kwargs): | ||
| super().__init__(upstream, ensure_io_loop=True, **kwargs) | ||
| self._n = n | ||
|
|
||
| @gen.coroutine | ||
| def update(self, x, who=None, metadata=None): | ||
| for _ in range(self._n): | ||
| yield self._emit(x, metadata=metadata) | ||
|
|
||
| As you can see, implementation is the same as usual, but there's no | ||
| ``@Stream.register_api()`` — Streamz will take care of that when loading the plugin. | ||
| It will still work if you add the decorator, but you don't have to. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,3 +2,4 @@ tornado | |
| toolz | ||
| zict | ||
| six | ||
| setuptools | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -260,7 +260,7 @@ def _remove_upstream(self, upstream): | |
| self.upstreams.remove(upstream) | ||
|
|
||
| @classmethod | ||
| def register_api(cls, modifier=identity): | ||
| def register_api(cls, modifier=identity, attribute_name=None): | ||
| """ Add callable to Stream API | ||
|
|
||
| This allows you to register a new method onto this class. You can use | ||
|
|
@@ -273,7 +273,7 @@ def register_api(cls, modifier=identity): | |
| >>> Stream().foo(...) # this works now | ||
|
|
||
| It attaches the callable as a normal attribute to the class object. In | ||
| doing so it respsects inheritance (all subclasses of Stream will also | ||
| doing so it respects inheritance (all subclasses of Stream will also | ||
| get the foo attribute). | ||
|
|
||
| By default callables are assumed to be instance methods. If you like | ||
|
|
@@ -285,15 +285,49 @@ def register_api(cls, modifier=identity): | |
| ... ... | ||
|
|
||
| >>> Stream.foo(...) # Foo operates as a static method | ||
|
|
||
| You can also provide an optional ``attribute_name`` argument to control | ||
| the name of the attribute your callable will be attached as. | ||
|
|
||
| >>> @Stream.register_api(attribute_name="bar") | ||
| ... class foo(Stream): | ||
| ... ... | ||
|
|
||
| >> Stream().bar(...) # foo was actually attached as bar | ||
| """ | ||
| def _(func): | ||
| @functools.wraps(func) | ||
| def wrapped(*args, **kwargs): | ||
| return func(*args, **kwargs) | ||
| setattr(cls, func.__name__, modifier(wrapped)) | ||
| name = attribute_name if attribute_name else func.__name__ | ||
| setattr(cls, name, modifier(wrapped)) | ||
| return func | ||
| return _ | ||
|
|
||
| @classmethod | ||
| def register_plugin_entry_point(cls, entry_point, modifier=identity): | ||
| if hasattr(cls, entry_point.name): | ||
| raise ValueError( | ||
| f"Can't add {entry_point.name} from {entry_point.module_name} " | ||
| f"to {cls.__name__}: duplicate method name." | ||
| ) | ||
|
|
||
| def stub(*args, **kwargs): | ||
| """ Entrypoints-based streamz plugin. Will be loaded on first call. """ | ||
| node = entry_point.load() | ||
| if not issubclass(node, Stream): | ||
| raise TypeError( | ||
| f"Error loading {entry_point.name} " | ||
| f"from module {entry_point.module_name}: " | ||
| f"{node.__class__.__name__} must be a subclass of Stream" | ||
| ) | ||
| if getattr(cls, entry_point.name).__name__ == "stub": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this can be False, but it doesn't hurt to check.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is false when the plugin class returned from |
||
| cls.register_api( | ||
| modifier=modifier, attribute_name=entry_point.name | ||
| )(node) | ||
| return node(*args, **kwargs) | ||
| cls.register_api(modifier=modifier, attribute_name=entry_point.name)(stub) | ||
|
|
||
| def start(self): | ||
| """ Start any upstream sources """ | ||
| for upstream in self.upstreams: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| import warnings | ||
|
|
||
| import pkg_resources | ||
|
|
||
|
|
||
| def try_register(cls, entry_point, *modifier): | ||
| try: | ||
| cls.register_plugin_entry_point(entry_point, *modifier) | ||
| except ValueError: | ||
| warnings.warn( | ||
| f"Can't add {entry_point.name} from {entry_point.module_name}: " | ||
| "name collision with existing stream node." | ||
| ) | ||
|
|
||
|
|
||
| def load_plugins(cls): | ||
| for entry_point in pkg_resources.iter_entry_points("streamz.sources"): | ||
| try_register(cls, entry_point, staticmethod) | ||
| for entry_point in pkg_resources.iter_entry_points("streamz.nodes"): | ||
| try_register(cls, entry_point) | ||
| for entry_point in pkg_resources.iter_entry_points("streamz.sinks"): | ||
| try_register(cls, entry_point) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| import inspect | ||
|
|
||
| import pytest | ||
| from streamz import Source, Stream | ||
|
|
||
|
|
||
| class MockEntryPoint: | ||
|
|
||
| def __init__(self, name, cls, module_name=None): | ||
| self.name = name | ||
| self.cls = cls | ||
| self.module_name = module_name | ||
|
|
||
| def load(self): | ||
| return self.cls | ||
|
|
||
|
|
||
| def test_register_plugin_entry_point(): | ||
| class test_stream(Stream): | ||
| pass | ||
|
|
||
| entry_point = MockEntryPoint("test_node", test_stream) | ||
| Stream.register_plugin_entry_point(entry_point) | ||
|
|
||
| assert Stream.test_node.__name__ == "stub" | ||
|
|
||
| Stream().test_node() | ||
|
|
||
| assert Stream.test_node.__name__ == "test_stream" | ||
|
|
||
|
|
||
| def test_register_plugin_entry_point_modifier(): | ||
| class test_source(Source): | ||
| pass | ||
|
|
||
| entry_point = MockEntryPoint("from_test", test_source) | ||
| Stream.register_plugin_entry_point(entry_point, staticmethod) | ||
|
|
||
| Stream.from_test() | ||
|
|
||
| assert inspect.isfunction(Stream().from_test) | ||
|
|
||
|
roveo marked this conversation as resolved.
|
||
|
|
||
| def test_register_plugin_entry_point_raises_type(): | ||
| class invalid_node: | ||
| pass | ||
|
|
||
| entry_point = MockEntryPoint("test", invalid_node, "test_module.test") | ||
|
|
||
| Stream.register_plugin_entry_point(entry_point) | ||
|
|
||
| with pytest.raises(TypeError): | ||
| Stream.test() | ||
|
|
||
|
|
||
| def test_register_plugin_entry_point_raises_duplicate_name(): | ||
| entry_point = MockEntryPoint("map", None) | ||
|
|
||
| with pytest.raises(ValueError): | ||
| Stream.register_plugin_entry_point(entry_point) | ||
Uh oh!
There was an error while loading. Please reload this page.