Plugin mechanism for messaging layer (Issue #372) #518
merge changes from nathanmarz/storm
We now have two messaging plugins: local.TransportPlugin and zmq.TransportPlugin. The newly created test program illustrates how topologies should work with such plugins.
(bootstrap)
(deftest test-messaging-plugins
This test doesn't test the messaging as there's only one worker.
A few additional notes besides the inline comments I made:
Nathan: I have made the following changes with the latest commit: I could not remove ITransport due to a limitation of Clojure's deftype. For both the zmq and local transports, we use Clojure to define a context class, which needs to have some data fields. deftype requires all of those fields to be declared and passed in via constructor parameters, and Clojure does not allow us to create a default constructor in such cases. Any suggestions?
Nathan: I have addressed all the issues you have raised so far. The zmq plugin is now implemented in Java, not Clojure.
I created a simple implementation of this plugin API based on Java NIO.2 (https://github.com/anfeng/storm-messaging-nio). Once you accept this pull request, I will work on an NIO.2 and/or Netty implementation. NIO.2 is simpler than Netty, but requires a dependency on Java 7.
@@ -245,7 +244,7 @@
(dofor [endpoint-str new-connections
:let [[node port] (string->endpoint endpoint-str)]]
[endpoint-str
(msg/connect
(.connect
(:mq-context worker)
This needs a type hint
Let's actually keep the zmq stuff implemented in Clojure and do the zmq -> Java stuff in a separate pull request. Then I'm comfortable merging this in.
Please take a look at this revision. zmq has been moved back into Clojure. TransportFactory has slightly modified logic.
(recv-with-flags [this flags]
IConnection
(^TaskMessage recv [this ^int flags]
(log-debug "ZMQConnection recv()")
Let's get rid of these log-debugs in the critical path. They actually show up in the profiler.
Made a few more comments. As soon as those are addressed I can merge this in.
Code revised per your suggestion. Hopefully we are ready to merge.
Merged.
This pull request proposes a plugin mechanism for the messaging layer.
We are introducing a new Storm config parameter, storm.messaging.transport, to specify the desired plugin. For now, defaults.yaml specifies zmq as that transport.
A messaging plugin needs to implement a simple interface, ITransport. It should have a default constructor and implement newContext() to return an IContext.
public interface ITransport {
    public IContext newContext();
}
IContext is defined to have the following methods, where prepare(storm_conf) is invoked immediately after IContext is constructed by Storm core (TransportFactory).
public interface IContext {
    public void prepare(Map storm_conf);
}
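The loading sequence described above (read storm.messaging.transport, instantiate the plugin through its default constructor, call newContext(), then prepare(storm_conf)) could be sketched roughly as follows. This is not the TransportFactory from this pull request, which is not shown here. TransportLoader, DemoTransport, and DemoContext are hypothetical names, and the two interfaces are re-declared locally with only the methods listed above so that the snippet compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Local copies of the interfaces from the description, limited to the methods shown there.
interface IContext {
    void prepare(Map storm_conf);
}

interface ITransport {
    IContext newContext();
}

// Hypothetical context that remembers the config so callers can see that prepare() ran.
class DemoContext implements IContext {
    Map conf;

    public void prepare(Map storm_conf) {
        this.conf = storm_conf;
    }
}

// Hypothetical plugin used only for illustration.
class DemoTransport implements ITransport {
    public DemoTransport() {}  // default constructor, as the plugin contract requires

    public IContext newContext() {
        return new DemoContext();
    }
}

// Hypothetical loader (an assumption; the real TransportFactory may differ).
class TransportLoader {
    static IContext makeContext(Map storm_conf) {
        String className = (String) storm_conf.get("storm.messaging.transport");
        try {
            // Instantiate the plugin reflectively through its default constructor.
            Class<?> klass = Class.forName(className);
            ITransport transport = (ITransport) klass.getDeclaredConstructor().newInstance();
            IContext context = transport.newContext();
            // prepare() is invoked immediately after the context is constructed.
            context.prepare(storm_conf);
            return context;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot load transport plugin: " + className, e);
        }
    }
}

class TransportLoaderDemo {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.messaging.transport", DemoTransport.class.getName());
        IContext context = TransportLoader.makeContext(conf);
        System.out.println("Loaded context: " + context.getClass().getName());
    }
}
```

Loading by class name is what makes the default constructor a requirement: the loader knows nothing about the plugin beyond the ITransport interface, so it cannot supply constructor arguments.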
IConnection is defined as below, where TaskMessage is a simple Java class containing a task ID and a message.
public interface IConnection {
    public TaskMessage recv();
    public TaskMessage recv_with_flags(int flags);
    public void send(int task, byte[] message);
    public void close();
}
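To make the IConnection contract concrete, here is a minimal in-process sketch backed by a queue. It is not the local.clj or zmq.clj implementation from this pull request. LoopbackConnection, the TaskMessage accessor names, and the NON_BLOCKING flag value are all assumptions made for illustration; the description does not define what the flags argument means.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simple holder for a task ID and its payload; accessor names are assumptions.
class TaskMessage {
    private final int task;
    private final byte[] message;

    TaskMessage(int task, byte[] message) {
        this.task = task;
        this.message = message;
    }

    int task() { return task; }
    byte[] message() { return message; }
}

// Local copy of the interface from the description.
interface IConnection {
    TaskMessage recv();
    TaskMessage recv_with_flags(int flags);
    void send(int task, byte[] message);
    void close();
}

// Hypothetical loopback connection: send() enqueues, recv() dequeues.
class LoopbackConnection implements IConnection {
    // Assumed flag meaning "return null instead of blocking"; not defined by the source.
    static final int NON_BLOCKING = 1;

    private final BlockingQueue<TaskMessage> queue = new LinkedBlockingQueue<>();

    public TaskMessage recv() {
        return recv_with_flags(0);
    }

    public TaskMessage recv_with_flags(int flags) {
        if ((flags & NON_BLOCKING) != 0) {
            return queue.poll();  // null when nothing is queued
        }
        try {
            return queue.take();  // blocks until a message arrives
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return null;
        }
    }

    public void send(int task, byte[] message) {
        queue.add(new TaskMessage(task, message));
    }

    public void close() {
        queue.clear();  // drop anything still pending
    }
}

class LoopbackConnectionDemo {
    public static void main(String[] args) {
        IConnection conn = new LoopbackConnection();
        conn.send(7, new byte[] {1, 2, 3});
        TaskMessage received = conn.recv();
        System.out.println("task=" + received.task()
                + " message=" + Arrays.toString(received.message()));
        conn.close();
    }
}
```

Because callers only see IConnection, a queue-backed connection like this one and a socket-backed one can be swapped without touching the worker code, which is the point of the plugin mechanism.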
I have modified zmq.clj to implement all these interfaces. local.clj has been revised to implement IContext and IConnection.
worker.clj has been revised slightly to leverage the plugin mechanism. It does not know about ZMQ anymore :-)