Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StatisticsCollector to Options for custom statistics tracking #964

Merged
merged 7 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public class Options {
* {@link Builder#errorListener(ErrorListener) errorListener}.
*/
public static final String PROP_ERROR_LISTENER = PFX + "callback.error";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector}.
*/
public static final String PROP_STATISTICS_COLLECTOR = PFX + "statisticscollector";
/**
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#maxPingsOut(int) maxPingsOut}.
*/
Expand Down Expand Up @@ -556,6 +561,7 @@ public class Options {

private final ErrorListener errorListener;
private final ConnectionListener connectionListener;
private final StatisticsCollector statisticsCollector;
private final String dataPortType;

private final boolean trackAdvancedStats;
Expand Down Expand Up @@ -661,6 +667,7 @@ public static class Builder {

private ErrorListener errorListener = null;
private ConnectionListener connectionListener = null;
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
Expand Down Expand Up @@ -762,6 +769,7 @@ public Builder properties(Properties props) {

classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o);
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);

stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
stringProperty(props, PROP_INBOX_PREFIX, this::inboxPrefix);
Expand Down Expand Up @@ -1327,6 +1335,19 @@ public Builder connectionListener(ConnectionListener listener) {
return this;
}

/**
* Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics.
* <p>
* If not set, then a default implementation will be used.
*
* @param collector the new StatisticsCollector for this connection.
* @return the Builder for chaining
*/
public Builder statisticsCollector(StatisticsCollector collector) {
this.statisticsCollector = collector;
return this;
}

/**
* Set the {@link ExecutorService ExecutorService} used to run threaded tasks. The default is a
* cached thread pool that names threads after the connection name (or a default). This executor
Expand Down Expand Up @@ -1532,6 +1553,7 @@ else if (useDefaultTls) {
new SynchronousQueue<>(),
new DefaultThreadFactory(threadPrefix));
}

return new Options(this);
}

Expand Down Expand Up @@ -1580,6 +1602,7 @@ public Builder(Options o) {

this.errorListener = o.errorListener;
this.connectionListener = o.connectionListener;
this.statisticsCollector = o.statisticsCollector;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
Expand Down Expand Up @@ -1638,6 +1661,7 @@ private Options(Builder b) {

this.errorListener = b.errorListener == null ? new ErrorListenerLoggerImpl() : b.errorListener;
this.connectionListener = b.connectionListener;
this.statisticsCollector = b.statisticsCollector;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
Expand Down Expand Up @@ -1689,6 +1713,13 @@ public ConnectionListener getConnectionListener() {
return this.connectionListener;
}

/**
* @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc
*/
public StatisticsCollector getStatisticsCollector() {
return this.statisticsCollector;
}

/**
* @return the auth handler, or null, see {@link Builder#authHandler(AuthHandler) authHandler()} in the builder doc
*/
Expand Down
73 changes: 64 additions & 9 deletions src/main/java/io/nats/client/Statistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,88 @@
* <p>The Statistics toString() provides a summary of the statistics.
*/
public interface Statistics {

/**
* @return the total number of pings that have been sent from this connection.
*/
long getPings();

/**
* @return the total number of times this connection has tried to reconnect.
*/
long getReconnects();

/**
* @return the total number of messages dropped by this connection across all slow consumers.
*/
long getDroppedCount();

/**
* @return the total number of op +OKs received by this connection.
*/
long getOKs();

/**
* @return the total number of op -ERRs received by this connection.
*/
long getErrs();

/**
* @return the total number of exceptions seen by this connection.
*/
long getExceptions();

/**
* @return the total number of requests sent by this connection.
*/
long getRequestsSent();

/**
* @return the total number of replies received by this connection.
*/
long getRepliesReceived();

/**
* @return the total number of duplicate replies received by this connection.
*
* NOTE: This is only counted if advanced stats are enabled.
*/
long getDuplicateRepliesReceived();

/**
* @return the total number of orphan replies received by this connection.
*
* NOTE: This is only counted if advanced stats are enabled.
*/
long getOrphanRepliesReceived();

/**
* @return the total number of messages that have come in to this connection.
*/
public long getInMsgs();
long getInMsgs();

/**
* @return the total number of messages that have gone out of this connection.
*/
public long getOutMsgs();
long getOutMsgs();

/**
* @return the total number of message bytes that have come in to this connection.
*/
public long getInBytes();
long getInBytes();

/**
* @return the total number of message bytes that have gone out of to this connection.
* @return the total number of message bytes that have gone out of this connection.
*/
public long getOutBytes();
long getOutBytes();

/**
* @return the total number of times this connection has tried to reconnect.
* @return the total number of outgoing message flushes by this connection.
*/
public long getReconnects();
long getFlushCounter();

/**
* @return the total number of messages dropped by this connection across all slow consumers.
* @return the count of outstanding of requests from this connection.
*/
public long getDroppedCount();
long getOutstandingRequests();
}
131 changes: 131 additions & 0 deletions src/main/java/io/nats/client/StatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

/**
* A collector for connection metrics.
* <p>
* Information about key metrics is incremented on this collector by the connection.
* <p>
* See {@link Statistics} for accessing the collected metrics.
*/
public interface StatisticsCollector extends Statistics {
/**
* Sets whether advanced stats are/should be tracked.
*/
void setAdvancedTracking(boolean trackAdvanced);

/**
* Increments the total number of pings that have been sent from this connection.
*/
void incrementPingCount();

/**
* Increments the total number of times this connection has tried to reconnect.
*/
void incrementReconnects();

/**
* Increments the total number of messages dropped by this connection across all slow consumers.
*/
void incrementDroppedCount();

/**
* Increments the total number of op +OKs received by this connection.
*/
void incrementOkCount();

/**
* Increments the total number of op -ERRs received by this connection.
*/
void incrementErrCount();

/**
* Increments the total number of exceptions seen by this connection.
*/
void incrementExceptionCount();

/**
* Increments the total number of requests sent by this connection.
*/
void incrementRequestsSent();

/**
* Increments the total number of replies received by this connection.
*/
void incrementRepliesReceived();

/**
* Increments the total number of duplicate replies received by this connection.
* <p>
* NOTE: This is only counted if advanced stats are enabled.
*/
void incrementDuplicateRepliesReceived();

/**
* Increments the total number of orphan replies received by this connection.
* <p>
* NOTE: This is only counted if advanced stats are enabled.
*/
void incrementOrphanRepliesReceived();

/**
* Increments the total number of messages that have come in to this connection.
*/
void incrementInMsgs();

/**
* Increments the total number of messages that have gone out of this connection.
*/
void incrementOutMsgs();

/**
* Increment the total number of message bytes that have come in to this connection.
*/
void incrementInBytes(long bytes);

/**
* Increment the total number of message bytes that have gone out of this connection.
*/
void incrementOutBytes(long bytes);

/**
* Increment the total number of outgoing message flushes by this connection.
*/
void incrementFlushCounter();

/**
* Increments the count of outstanding of requests from this connection.
*/
void incrementOutstandingRequests();

/**
* Decrements the count of outstanding of requests from this connection.
*/
void decrementOutstandingRequests();

/**
* Registers a Socket read by this connection.
* <p>
* NOTE: Implementations should only count this if advanced stats are enabled.
*/
void registerRead(long bytes);

/**
* Registers a Socket write by this connection.
* <p>
* NOTE: Implementations should only count this if advanced stats are enabled.
*/
void registerWrite(long bytes);
}
18 changes: 10 additions & 8 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class NatsConnection implements Connection {

private final Options options;

private final NatsStatistics statistics;
private final StatisticsCollector statistics;

private boolean connecting; // you can only connect in one thread
private boolean disconnecting; // you can only disconnect in one thread
Expand Down Expand Up @@ -111,7 +111,8 @@ class NatsConnection implements Connection {
this.options = options;

advancedTracking = options.isTrackAdvancedStats();
this.statistics = new NatsStatistics(advancedTracking);
this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
this.statistics.setAdvancedTracking(advancedTracking);

this.statusLock = new ReentrantLock();
this.statusChanged = this.statusLock.newCondition();
Expand Down Expand Up @@ -1231,11 +1232,12 @@ void deliverReply(Message msg) {
statistics.incrementRepliesReceived();
}
else if (!oldStyle && !subject.startsWith(mainInbox)) {
if (advancedTracking && responsesRespondedTo.get(key) != null) {
statistics.incrementDuplicateRepliesReceived();
}
else {
statistics.incrementOrphanRepliesReceived();
if (advancedTracking) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, all orphan counter was being incremented for every duplicate or orphan when advancedTracking was not enabled. Does this change to first check the advancedTracking flag and only when enabled increment duplicate or orphan replies.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is some weird original code, and I'm to blame, sigh. This change is good.

if (responsesRespondedTo.get(key) != null) {
statistics.incrementDuplicateRepliesReceived();
} else {
statistics.incrementOrphanRepliesReceived();
}
}
}
}
Expand Down Expand Up @@ -1719,7 +1721,7 @@ public Statistics getStatistics() {
return this.statistics;
}

NatsStatistics getNatsStatistics() {
StatisticsCollector getNatsStatistics() {
return this.statistics;
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.StatisticsCollector;

import java.io.IOException;
import java.nio.BufferOverflowException;
Expand Down Expand Up @@ -108,7 +109,7 @@ Future<Boolean> stop() {
return this.stopped;
}

synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, NatsStatistics stats)
synchronized void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats)
throws IOException {

int sendPosition = 0;
Expand Down Expand Up @@ -167,7 +168,7 @@ public void run() {

try {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
NatsStatistics stats = this.connection.getNatsStatistics();
StatisticsCollector stats = this.connection.getNatsStatistics();
int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER;

while (this.running.get()) {
Expand Down
Loading