Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

optimisations to event bus

  • Loading branch information...
commit 3f4212d6701a585cb19b25c70c72e5b407f5f75e 1 parent dfe5fce
@purplefox purplefox authored
View
2  gradle.properties
@@ -14,7 +14,7 @@
# limitations under the License.
#
# gradle version
-gradleVersion=1.0
+gradleVersion=1.1
vertxBuildToolsVersion=0.1-SNAPSHOT
# vert.x version
View
2  gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=http\://services.gradle.org/distributions/gradle-1.0-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
View
50 vertx-core/src/main/java/org/vertx/java/core/eventbus/EventBus.java
@@ -352,7 +352,7 @@
* propagated to all nodes of the event bus, the handler will be called.
*/
void unregisterHandler(String address, Handler<? extends Message> handler,
- AsyncResultHandler<Void> resultHandler);
+ AsyncResultHandler<Void> resultHandler);
/**
* Unregisters a handler given the address and the handler
@@ -362,69 +362,29 @@ void unregisterHandler(String address, Handler<? extends Message> handler,
void unregisterHandler(String address, Handler<? extends Message> handler);
/**
- * Unregister a handler given the unique handler id
- * @param id The handler id
- */
- void unregisterHandler(String id);
-
- /**
- * Unregister a handler given the unique handler id
- * @param id The handler id
- * @param resultHandler Optional completion handler. If specified, when the unregister has been
- * propagated to all nodes of the event bus, the handler will be called.
- */
- void unregisterHandler(String id, AsyncResultHandler<Void> resultHandler);
-
- /**
- * Registers a handler against a uniquely generated address, the address is returned as the id
- * @param handler The handler
- * @return The unique id
- */
- String registerHandler(Handler<? extends Message> handler);
-
- /**
- * Registers a handler against a uniquely generated address, the address is returned as the id
- * @param handler
- * @param resultHandler Optional result handler. If specified, when the register has been
- * propagated to all nodes of the event bus, the handler will be called.
- * @return The unique id
- */
- String registerHandler(Handler<? extends Message> handler,
- AsyncResultHandler<Void> resultHandler);
-
- /**
* Registers a handler against the specified address
* @param address The address to register it at
* @param handler The handler
* @param resultHandler Optional completion handler. If specified, when the register has been
* propagated to all nodes of the event bus, the handler will be called.
- * @return The unique id
*/
- String registerHandler(String address, Handler<? extends Message> handler,
- AsyncResultHandler<Void> resultHandler);
+ void registerHandler(String address, Handler<? extends Message> handler,
+ AsyncResultHandler<Void> resultHandler);
/**
* Registers a handler against the specified address
* @param address The address to register it at
* @param handler The handler
- * @return The unique id
*/
- String registerHandler(String address, Handler<? extends Message> handler);
+ void registerHandler(String address, Handler<? extends Message> handler);
/**
* Registers a local handler against the specified address. The handler info won't
* be propagated across the cluster
* @param address The address to register it at
* @param handler The handler
- * @return The unqiue id
*/
- String registerLocalHandler(String address, Handler<? extends Message> handler);
+ void registerLocalHandler(String address, Handler<? extends Message> handler);
- /**
- * Registers a local handler against a uniquely generated address, the address is returned as the id
- * @param handler
- * @return The unique id
- */
- String registerLocalHandler(Handler<? extends Message> handler);
}
View
190 vertx-core/src/main/java/org/vertx/java/core/eventbus/impl/DefaultEventBus.java
@@ -36,16 +36,12 @@
import org.vertx.java.core.net.impl.ServerID;
import org.vertx.java.core.parsetools.RecordParser;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
*
@@ -65,7 +61,8 @@
private SubsMap subs;
private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();
- private final Map<String, HandlerInfo> handlersByID = new ConcurrentHashMap<>();
+ private final AtomicInteger seq = new AtomicInteger(0);
+ private final String prefix = UUID.randomUUID().toString();
public DefaultEventBus(VertxInternal vertx) {
// Just some dummy server ID
@@ -243,17 +240,36 @@ public void publish(String address, Byte message) {
sendOrPub(new ByteMessage(false, address, message), null);
}
+ public void registerHandler(String address, Handler<? extends Message> handler,
+ AsyncResultHandler<Void> completionHandler) {
+ registerHandler(address, handler, completionHandler, false, false);
+ }
+
+ public void registerHandler(String address, Handler<? extends Message> handler) {
+ registerHandler(address, handler, null);
+ }
+
+ public void registerLocalHandler(String address, Handler<? extends Message> handler) {
+ registerHandler(address, handler, null, false, true);
+ }
+
public void unregisterHandler(String address, Handler<? extends Message> handler,
AsyncResultHandler<Void> completionHandler) {
Context context = vertx.getOrAssignContext();
Handlers handlers = handlerMap.get(address);
if (handlers != null) {
- String handlerID = handlers.map.remove(new HandlerHolder(handler, false, context));
- if (handlerID != null) {
- handlersByID.remove(handlerID);
- getHandlerCloseHook(context).ids.remove(handlerID);
+ int size = handlers.list.size();
+ // Requires a list traversal. This is tricky to optimise since we can't use a set since
+ // we need fast ordered traversal for the round robin
+ for (int i = 0; i < size; i++) {
+ HandlerHolder holder = handlers.list.get(i);
+ if (holder.handler == handler) {
+ handlers.list.remove(i);
+ holder.removed = true;
+ break;
+ }
}
- if (handlers.map.isEmpty()) {
+ if (handlers.list.isEmpty()) {
handlerMap.remove(address);
if (subs != null) {
removeSub(address, serverID, completionHandler);
@@ -263,6 +279,7 @@ public void unregisterHandler(String address, Handler<? extends Message> handler
} else if (completionHandler != null) {
callCompletionHandler(completionHandler);
}
+ getHandlerCloseHook(context).entries.remove(new HandlerEntry(address, handler));
}
}
@@ -270,45 +287,6 @@ public void unregisterHandler(String address, Handler<? extends Message> handler
unregisterHandler(address, handler, null);
}
- public void unregisterHandler(String id) {
- unregisterHandler(id, (AsyncResultHandler<Void>) null);
- }
-
- public void unregisterHandler(String id, AsyncResultHandler<Void> completionHandler) {
- HandlerInfo info = handlersByID.get(id);
- if (info != null) {
- unregisterHandler(info.address, info.handler, completionHandler);
- } else if (completionHandler != null) {
- callCompletionHandler(completionHandler);
- }
- }
-
- public String registerHandler(Handler<? extends Message> handler) {
- return registerHandler(handler, null);
- }
-
- public String registerHandler(Handler<? extends Message> handler,
- AsyncResultHandler<Void> completionHandler) {
- return registerHandler(null, handler, completionHandler, false, false);
- }
-
- public String registerHandler(String address, Handler<? extends Message> handler,
- AsyncResultHandler<Void> completionHandler) {
- return registerHandler(address, handler, completionHandler, false, false);
- }
-
- public String registerHandler(String address, Handler<? extends Message> handler) {
- return registerHandler(address, handler, null);
- }
-
- public String registerLocalHandler(String address, Handler<? extends Message> handler) {
- return registerHandler(address, handler, null, false, true);
- }
-
- public String registerLocalHandler(Handler<? extends Message> handler) {
- return registerHandler(null, handler, null, false, true);
- }
-
public void close(Handler<Void> doneHandler) {
server.close(doneHandler);
}
@@ -376,7 +354,8 @@ private void sendOrPub(ServerID replyDest, final BaseMessage message, final Hand
try {
message.sender = serverID;
if (replyHandler != null) {
- message.replyAddress = UUID.randomUUID().toString();
+ //message.replyAddress = UUID.randomUUID().toString();
+ message.replyAddress = prefix + String.valueOf(seq.incrementAndGet());
registerHandler(message.replyAddress, replyHandler, null, true, false);
}
if (replyDest != null) {
@@ -416,15 +395,13 @@ public void handle(AsyncResult<ServerIDs> event) {
}
}
- private String registerHandler(String address, Handler<? extends Message> handler,
+ private void registerHandler(String address, Handler<? extends Message> handler,
AsyncResultHandler<Void> completionHandler,
boolean replyHandler, boolean localOnly) {
- Context context = vertx.getOrAssignContext();
- final String id = UUID.randomUUID().toString();
if (address == null) {
- address = id;
+ throw new NullPointerException("address");
}
- handlersByID.put(id, new HandlerInfo(address, handler));
+ Context context = vertx.getOrAssignContext();
Handlers handlers = handlerMap.get(address);
if (handlers == null) {
handlers = new Handlers();
@@ -441,8 +418,7 @@ public void handle(AsyncResult<Void> event) {
}
};
}
-
- handlers.map.put(new HandlerHolder(handler, replyHandler, context), id);
+ handlers.list.add(new HandlerHolder(handler, replyHandler, context));
if (subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.put(address, serverID, completionHandler);
@@ -450,14 +426,12 @@ public void handle(AsyncResult<Void> event) {
callCompletionHandler(completionHandler);
}
} else {
- handlers.map.put(new HandlerHolder(handler, replyHandler, context), id);
+ handlers.list.add(new HandlerHolder(handler, replyHandler, context));
if (completionHandler != null) {
callCompletionHandler(completionHandler);
}
}
- HandlerCloseHook hcl = getHandlerCloseHook(context);
- hcl.ids.add(id);
- return id;
+ getHandlerCloseHook(context).entries.add(new HandlerEntry(address, handler));
}
private HandlerCloseHook getHandlerCloseHook(Context context) {
@@ -579,35 +553,35 @@ private void receiveMessage(final BaseMessage msg) {
//Choose one
HandlerHolder holder = handlers.choose();
if (holder != null) {
- doReceive(handlers, msg, holder);
+ doReceive(msg, holder);
}
} else {
// Publish
- for (final HandlerHolder holder: handlers.map.keySet()) {
- doReceive(handlers, msg, holder);
+ for (final HandlerHolder holder: handlers.list) {
+ doReceive(msg, holder);
}
}
}
}
- private void doReceive(final Handlers handlers, final BaseMessage msg, final HandlerHolder holder) {
+ private void doReceive(final BaseMessage msg, final HandlerHolder holder) {
// Each handler gets a fresh copy
final Message copied = msg.copy();
holder.context.execute(new Runnable() {
public void run() {
- // Need to check handler is still there - the handler might have been removed after the message were sent but
- // before it was received
- if (handlers.map.containsKey(holder)) {
- try {
- holder.handler.handle(copied);
- } finally {
- if (holder.replyHandler) {
- unregisterHandler(msg.address, holder.handler);
- }
- }
+ // Need to check handler is still there - the handler might have been removed after the message were sent but
+ // before it was received
+ try {
+ if (!holder.removed) {
+ holder.handler.handle(copied);
+ }
+ } finally {
+ if (holder.replyHandler) {
+ unregisterHandler(msg.address, holder.handler);
}
}
+ }
});
}
@@ -615,6 +589,7 @@ public void run() {
final Context context;
final Handler handler;
final boolean replyHandler;
+ boolean removed;
HandlerHolder(Handler handler, boolean replyHandler, Context context) {
this.context = context;
@@ -706,43 +681,68 @@ public void handle(Exception e) {
}
}
- // TODO combine this with ServerIDs (make common class)
private static class Handlers {
- final Map<HandlerHolder, String> map = new ConcurrentHashMap<>();
- private Iterator<HandlerHolder> iter;
- synchronized HandlerHolder choose() {
- if (map.isEmpty()) {
- return null;
- } else {
- if (iter == null || !iter.hasNext()) {
- iter = map.keySet().iterator();
+
+ final List<HandlerHolder> list = new CopyOnWriteArrayList<>();
+ final AtomicInteger pos = new AtomicInteger(0);
+ HandlerHolder choose() {
+ while (true) {
+ int size = list.size();
+ if (size == 0) {
+ return null;
+ }
+ int p = pos.getAndIncrement();
+ if (p >= size - 1) {
+ pos.set(0);
}
try {
- return iter.next();
- } catch (NoSuchElementException e) {
- return null;
+ return list.get(p);
+ } catch (IndexOutOfBoundsException e) {
+ // Can happen
+ pos.set(0);
}
}
}
}
- private static class HandlerInfo {
+ private class HandlerEntry {
final String address;
final Handler<? extends Message> handler;
- private HandlerInfo(String address, Handler<? extends Message> handler) {
+ private HandlerEntry(String address, Handler<? extends Message> handler) {
this.address = address;
this.handler = handler;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (getClass() != o.getClass()) return false;
+ HandlerEntry entry = (HandlerEntry) o;
+ if (!address.equals(entry.address)) return false;
+ if (!handler.equals(entry.handler)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = address != null ? address.hashCode() : 0;
+ result = 31 * result + (handler != null ? handler.hashCode() : 0);
+ return result;
+ }
}
private class HandlerCloseHook implements Runnable {
- final List<String> ids = new ArrayList<>();
+
+ final Set<HandlerEntry> entries = new HashSet<>();
+ final Set<String> handlerIDs = new HashSet<>();
+
public void run() {
- for (String id: new ArrayList<>(ids)) {
- unregisterHandler(id);
+ for (HandlerEntry entry: new HashSet<>(entries)) {
+ unregisterHandler(entry.address, entry.handler);
}
}
+
}
}
View
20 vertx-lang/vertx-lang-jruby/src/main/ruby_scripts/core/event_bus.rb
@@ -98,11 +98,12 @@ def EventBus.register_handler(address, local_only = false, &message_hndlr)
raise "A message handler must be specified" if !message_hndlr
internal = InternalHandler.new(message_hndlr)
if local_only
- id = @@j_eventbus.registerLocalHandler(address, internal)
+ @@j_eventbus.registerLocalHandler(address, internal)
else
- id = @@j_eventbus.registerHandler(address, internal)
+ @@j_eventbus.registerHandler(address, internal)
end
- @@handler_map[id] = internal
+ id = java.util.UUID.randomUUID.toString
+ @@handler_map[id] = [address, internal]
id
end
@@ -114,12 +115,13 @@ def EventBus.register_handler(address, local_only = false, &message_hndlr)
def EventBus.register_simple_handler(local_only = false, &message_hndlr)
raise "A message handler must be specified" if !message_hndlr
internal = InternalHandler.new(message_hndlr)
+ id = java.util.UUID.randomUUID.toString
if local_only
- id = @@j_eventbus.registerLocalHandler(internal)
+ @@j_eventbus.registerLocalHandler(id, internal)
else
- id = @@j_eventbus.registerHandler(internal)
+ @@j_eventbus.registerHandler(id, internal)
end
- @@handler_map[id] = internal
+ @@handler_map[id] = [id, internal]
id
end
@@ -127,9 +129,9 @@ def EventBus.register_simple_handler(local_only = false, &message_hndlr)
# @param handler_id [FixNum] The id of the handler to unregister. Returned from {EventBus.register_handler}
def EventBus.unregister_handler(handler_id)
raise "A handler_id must be specified" if !handler_id
- handler = @@handler_map.delete(handler_id)
- raise "Cannot find handler for id #{handler_id}" if !handler
- @@j_eventbus.unregisterHandler(handler_id)
+ tuple = @@handler_map.delete(handler_id)
+ raise "Cannot find handler for id #{handler_id}" if !tuple
+ @@j_eventbus.unregisterHandler(tuple.first, tuple.last)
end
# @private
View
19 vertx-lang/vertx-lang-jython/src/main/python_scripts/core/event_bus.py
@@ -112,10 +112,11 @@ def register_handler(address, local_only=False, handler=None):
raise RuntimeError("handler is required")
internal = InternalHandler(handler)
if local_only:
- id = EventBus.java_eventbus().registerLocalHandler(address, internal)
+ EventBus.java_eventbus().registerLocalHandler(address, internal)
else:
- id = EventBus.java_eventbus().registerHandler(address, internal)
- EventBus.handler_dict[id] = internal
+ EventBus.java_eventbus().registerHandler(address, internal)
+ id = java.util.UUID.randomUUID().toString()
+ EventBus.handler_dict[id] = address, internal
return id
@staticmethod
@@ -133,11 +134,12 @@ def register_simple_handler(local_only=False, handler=None):
if handler is None:
raise RuntimeError("Handler is required")
internal = InternalHandler(handler)
+ id = java.util.UUID.randomUUID().toString()
if local_only:
- id = EventBus.java_eventbus().registerLocalHandler(internal)
+ EventBus.java_eventbus().registerLocalHandler(id, internal)
else:
- id = EventBus.java_eventbus().registerHandler(internal)
- EventBus.handler_dict[id] = internal
+ EventBus.java_eventbus().registerHandler(id, internal)
+ EventBus.handler_dict[id] = id, internal
return id
@staticmethod
@@ -147,8 +149,9 @@ def unregister_handler(handler_id):
Keyword arguments:
@param handler_id: the id of the handler to unregister. Returned from EventBus.register_handler
"""
- handler = EventBus.handler_dict.pop(handler_id)
- EventBus.java_eventbus().unregisterHandler(handler_id)
+ [address, handler] = EventBus.handler_dict.pop(handler_id)
+
+ EventBus.java_eventbus().unregisterHandler(address, handler)
@staticmethod
def convert_msg(message):
View
2  vertx-testsuite/src/test/java/org/vertx/java/tests/core/eventbus/PythonEventBusTest.java
@@ -49,7 +49,7 @@ public void test_echo_string() {
public void test_echo_fixnum() {
startTest(getMethodName());
}
-
+
public void test_echo_float() {
startTest(getMethodName());
}
View
8 vertx-testsuite/src/test/java/vertx/tests/core/eventbus/LocalClient.java
@@ -52,7 +52,8 @@ public void testPubSub() {
}
public void testPubSubMultipleHandlers() {
- Buffer buff = TestUtils.generateRandomBuffer(1000); eb.send("some-address", buff);
+ Buffer buff = TestUtils.generateRandomBuffer(1000);
+ eb.send("some-address", buff);
data.put("buffer", buff);
eb.publish("some-address", buff);
}
@@ -133,13 +134,14 @@ public void handle(Message<Buffer> msg) {
public void testRegisterNoAddress() {
final String msg = "foo";
final AtomicReference<String> idRef = new AtomicReference<>();
- String id = eb.registerHandler(new Handler<Message<String>>() {
+ String id = UUID.randomUUID().toString();
+ eb.registerHandler(id, new Handler<Message<String>>() {
boolean handled = false;
public void handle(Message<String> received) {
tu.azzert(!handled);
tu.azzert(msg.equals(received.body));
handled = true;
- eb.unregisterHandler(idRef.get());
+ eb.unregisterHandler(idRef.get(), this);
vertx.setTimer(100, new Handler<Long>() {
public void handle(Long timerID) {
tu.testComplete();
View
1  vertx-testsuite/src/test/java/vertx/tests/core/eventbus/LocalPeer.java
@@ -183,7 +183,6 @@ public void handle(AsyncResult<Void> event) {
public void handle(AsyncResult<Void> event) {
if (event.exception == null) {
tu.testComplete();
- System.out.println("Registered");
} else {
tu.azzert(false, "Failed to register");
}
View
2  vertx-testsuite/src/test/ruby_scripts/core/eventbus/test_client.rb
@@ -165,13 +165,11 @@ def echo(msg)
address = "some-address"
id = EventBus.register_handler(address) { |received|
- # puts "received: #{received.body}"
@tu.check_context
EventBus.unregister_handler(id)
received.reply(received.body)
}
EventBus.send(address, msg) { |reply|
- # puts "received reply #{reply.body}"
if reply.body.is_a? Hash
reply.body.each do |k, v|
@tu.azzert(msg[k] == v)
View
2  vertx-testsuite/src/test/ruby_scripts/core/isolation/test_client.rb
@@ -19,6 +19,7 @@
@tu = TestUtils.new
def test_isolation
+
# Make sure global variables aren't visible between applications
@tu.azzert($test_global == nil)
$test_global = 123
@@ -32,3 +33,4 @@ def vertx_stop
@tu.register_all(self)
@tu.app_ready
+

0 comments on commit 3f4212d

Please sign in to comment.
Something went wrong with that request. Please try again.