Skip to content

Commit

Permalink
Handle event bus discarded messages - closes #96 - fixes #97
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 18, 2020
1 parent 5dcce2d commit fb372bd
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ Base name: `vertx.eventbus`
* `messages.pending` - A <<counter>> of the number of messages received but not yet processed by an handler
* `messages.pending-local` - A <<counter>> of the number of messages locally received but not yet processed by an handler
* `messages.pending-remote` - A <<counter>> of the number of messages remotely received but not yet processed by an handler
* `messages.discarded` - A <<counter>> of the number of messages discarded by an handler
* `messages.discarded-local` - A <<counter>> of the number of messages locally discarded by an handler
* `messages.discarded-remote` - A <<counter>> of the number of messages remotely discarded by an handler
* `messages.received` - A <<throughput_meter>> representing the rate of which messages are being received
* `messages.received-local` - A <<throughput_meter>> representing the rate of which local messages are being received
* `messages.received-remote` - A <<throughput_meter>> representing the rate of which remote messages are being received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
Expand All @@ -41,6 +42,9 @@ class EventBusMetricsImpl extends AbstractMetrics implements EventBusMetrics<Eve
private final Counter pending;
private final Counter pendingLocal;
private final Counter pendingRemote;
private final Counter discarded;
private final Counter discardedLocal;
private final Counter discardedRemote;
private final Meter bytesRead;
private final Meter bytesWritten;
private final ThroughputMeter receivedMessages;
Expand All @@ -64,6 +68,9 @@ class EventBusMetricsImpl extends AbstractMetrics implements EventBusMetrics<Eve
pending = counter("messages", "pending");
pendingLocal = counter("messages", "pending-local");
pendingRemote = counter("messages", "pending-remote");
discarded = counter("messages", "discarded");
discardedLocal = counter("messages", "discarded-local");
discardedRemote = counter("messages", "discarded-remote");
receivedMessages = throughputMeter("messages", "received");
receivedLocalMessages = throughputMeter("messages", "received-local");
receivedRemoteMessages = throughputMeter("messages", "received-remote");
Expand Down Expand Up @@ -147,6 +154,28 @@ public void scheduleMessage(HandlerMetric handler, boolean local) {
}
}

@Override
public void discardMessage(HandlerMetric handler, boolean local, Message<?> msg) {
if (handler.ignored) {
return;
}
pending.dec();
discarded.inc();
if (local) {
discardedLocal.inc();
pendingLocal.dec();
if (!handler.noMatch) {
handler.pendingLocalCount--;
}
} else {
discardedRemote.inc();
pendingRemote.dec();
if (!handler.noMatch) {
handler.pendingRemoteCount--;
}
}
}

@Override
public void beginHandleMessage(HandlerMetric handler, boolean local) {
if (handler.ignored) {
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/io/vertx/ext/dropwizard/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,33 @@ public void testPendingCount() {
await();
}

@Test
public void testDiscardMessage() {
int num = 10;
EventBus eb = vertx.eventBus();
MessageConsumer<Object> consumer = eb.consumer("foo");
consumer.setMaxBufferedMessages(num);
consumer.pause();
consumer.handler(msg -> {
fail("should not be called");
});
for (int i = 0;i < num;i++) {
eb.send("foo", "the_message-" + i);
}
eb.send("foo", "last");
waitUntil(() -> {
JsonObject metrics = metricsService.getMetricsSnapshot(vertx.eventBus());
return getCount(metrics.getJsonObject("messages.discarded")) == 1L;
});
assertCount(metricsService.getMetricsSnapshot(vertx.eventBus()).getJsonObject("messages.pending"), 10L);
consumer.unregister();
waitUntil(() -> {
JsonObject metrics = metricsService.getMetricsSnapshot(vertx.eventBus());
return getCount(metrics.getJsonObject("messages.discarded")) == 11L;
});
assertCount(metricsService.getMetricsSnapshot(vertx.eventBus()).getJsonObject("messages.pending"), 0L);
}

@Test
public void testVertxMetrics() throws Exception {
JsonObject metrics = metricsService.getMetricsSnapshot(vertx);
Expand Down

0 comments on commit fb372bd

Please sign in to comment.