Skip to content

Commit

Permalink
Simplification Tuning (#926)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Jun 14, 2023
1 parent bd1e460 commit dde7c17
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 57 deletions.
4 changes: 4 additions & 0 deletions src/examples/java/io/nats/examples/jetstream/NatsJsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ public static int count408s(List<Message> messages) {
return count;
}

public static void createCleanMemStream(Connection nc, String stream, String... subs) throws IOException, JetStreamApiException {
createCleanMemStream(nc.jetStreamManagement(), stream, subs);
}

public static void createCleanMemStream(JetStreamManagement jsm, String stream, String... subs) throws IOException, JetStreamApiException {
try {
jsm.deleteStream(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static void main(String[] args) {
System.out.println("S3. " + streamContext.getStreamInfo());

// when you create a consumer from the stream context you get a ConsumerContext in return
ConsumerContext consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
System.out.println("C1. " + consumerContext.getCachedConsumerInfo());

// get a ConsumerContext from the connection for a pre-existing consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -108,7 +108,8 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
// create the consumer then use it
int receivedMessages = 0;
long receivedBytes = 0;
try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions)){
try {
FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions);
Message msg = consumer.nextMessage();
while (msg != null) {
msg.ack();
Expand Down Expand Up @@ -136,9 +137,6 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
System.err.println("Exception should be handled properly, just exiting here.");
System.exit(-1);
}
catch (Exception e) {
// For FetchConsumer since it is AutoCloseable
}
long elapsed = System.currentTimeMillis() - start;

printSummary(receivedMessages, receivedBytes, elapsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(consumerName).build());
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -97,7 +97,8 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m

// create the consumer then use it
int receivedMessages = 0;
try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions)){
try {
FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions);
Message msg = consumer.nextMessage();
while (msg != null) {
msg.ack();
Expand All @@ -121,9 +122,6 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
System.err.println("Exception should be handled properly, just exiting here.");
System.exit(-1);
}
catch (Exception e) {
// For FetchConsumer since it is AutoCloseable
}
long elapsed = System.currentTimeMillis() - start;

printSummary(receivedMessages, elapsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) {
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -61,7 +61,8 @@ public static void main(String[] args) {
Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.nanoTime();
try (IterableConsumer consumer = consumerContext.consume()){
try {
IterableConsumer consumer = consumerContext.consume();
System.out.println("Starting main loop.");
while (count < STOP_COUNT) {
Message msg = consumer.nextMessage(1000);
Expand Down Expand Up @@ -95,9 +96,6 @@ public static void main(String[] args) {
// developer interrupted this thread?
return;
}
catch (Exception e) {
// For IterableConsumer since it is AutoCloseable
}
report("Done", System.nanoTime() - start, count);
});
consumeThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.nats.client.api.ConsumerConfiguration;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.examples.jetstream.simple.Utils.createOrReplaceStream;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void main(String[] args) {
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.consumerContext(CONSUMER_NAME);
}
catch (JetStreamApiException | IOException e) {
Expand All @@ -78,17 +78,12 @@ public static void main(String[] args) {
};

// create the consumer then use it
try (MessageConsumer consumer = consumerContext.consume(handler)){
try {
MessageConsumer consumer = consumerContext.consume(handler);
latch.await();
// once the consumer is stopped, the client will drain messages
System.out.println("Stop the consumer...");
CompletableFuture<Boolean> stopFuture = consumer.stop(1000);
try {
stopFuture.get(1, TimeUnit.SECONDS);
}
catch (ExecutionException | TimeoutException e) {
// from the future.get
}
consumer.stop(1000);
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand All @@ -98,9 +93,6 @@ public static void main(String[] args) {
// likely a connection problem
return;
}
catch (Exception e) {
// For IterableConsumer since it is AutoCloseable
}

report("Final", start, atomicCount.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static void main(String[] args) {
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/io/nats/client/MessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
import io.nats.client.api.ConsumerInfo;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* The MessageConsumer interface is the core interface replacing
* a subscription for a simplified consumer.
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public interface MessageConsumer extends AutoCloseable {
public interface MessageConsumer {
/**
* Gets information about the consumer behind this subscription.
* @return consumer information
Expand All @@ -35,12 +34,11 @@ public interface MessageConsumer extends AutoCloseable {

/**
* Stop the MessageConsumer from asking for any more messages from the server.
* Messages do not immediately stop
* There still may be messages available and coming across the wire.
* @param timeout The time to wait for the stop to succeed, pass 0 to wait
* forever. Stop involves moving messages to and from the server
* so a very short timeout is not recommended.
* @return A future so you could wait for the stop to know when there are no more messages.
* @throws InterruptedException if one is thrown, in order to propagate it up
*/
CompletableFuture<Boolean> stop(long timeout) throws InterruptedException;
void stop(long timeout) throws InterruptedException;
}
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ public interface StreamContext {
ConsumerContext consumerContext(String consumerName) throws IOException, JetStreamApiException;

/**
* Management function to creates a consumer on this stream.
* Management function to create or update a consumer on this stream.
* @param config the consumer configuration to use.
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerContext addConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException;
ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Management function to deletes a consumer.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements TrackPendin

@Override
public void track(int pendingMessages, long pendingBytes, boolean trackingBytes) {
if (drainFuture == null &&
if (!stopped &&
(pmm.pendingMessages <= thresholdMessages
|| (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)))
{
Expand Down
31 changes: 19 additions & 12 deletions src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

class NatsMessageConsumerBase implements MessageConsumer {
protected NatsJetStreamPullSubscription sub;
protected PullMessageManager pmm;
protected final Object subLock;
protected CompletableFuture<Boolean> drainFuture;
protected boolean stopped;

NatsMessageConsumerBase() {
subLock = new Object();
Expand All @@ -50,24 +49,28 @@ public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException
* {@inheritDoc}
*/
@Override
public CompletableFuture<Boolean> stop(long timeout) throws InterruptedException {
public void stop(long timeout) throws InterruptedException {
synchronized (subLock) {
if (drainFuture == null) {
if (sub.getNatsDispatcher() != null) {
drainFuture = sub.getDispatcher().drain(Duration.ofMillis(timeout));
if (!stopped) {
try {
if (sub.getNatsDispatcher() != null) {
sub.getDispatcher().drain(Duration.ofMillis(timeout));
}
else {
sub.drain(Duration.ofMillis(timeout));
}
}
else {
drainFuture = sub.drain(Duration.ofMillis(timeout));
finally {
stopped = true;
}
}
return drainFuture;
}
}

@Override
public void close() throws Exception {
synchronized (subLock) {
if (drainFuture == null && sub.isActive()) {
protected void finalize() throws Throwable {
try {
if (!stopped && sub.isActive()) {
if (sub.getNatsDispatcher() != null) {
sub.getDispatcher().unsubscribe(sub);
}
Expand All @@ -76,5 +79,9 @@ public void close() throws Exception {
}
}
}
catch (Throwable ignore) {
// nothing to do
}
super.finalize();
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsStreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ConsumerContext consumerContext(String consumerName) throws IOException,
* {@inheritDoc}
*/
@Override
public ConsumerContext addConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException {
public ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException {
return new NatsConsumerContext(this, jsm.addOrUpdateConsumer(streamName, config));
}

Expand Down
10 changes: 4 additions & 6 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.client.BaseConsumeOptions.*;
Expand Down Expand Up @@ -60,7 +59,7 @@ private static void _testStreamContext(JetStream js, StreamContext streamContext
assertThrows(JetStreamApiException.class, () -> streamContext.deleteConsumer(DURABLE));

ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(DURABLE).build();
ConsumerContext consumerContext = streamContext.addConsumer(cc);
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(cc);
ConsumerInfo ci = consumerContext.getConsumerInfo();
assertEquals(STREAM, ci.getStreamName());
assertEquals(DURABLE, ci.getName());
Expand Down Expand Up @@ -367,8 +366,8 @@ public void testCoverage() throws Exception {
ConsumerContext cctx2 = nc.consumerContext(STREAM, name(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
ConsumerContext cctx3 = js.consumerContext(STREAM, name(3));
ConsumerContext cctx4 = sctx1.consumerContext(name(4));
ConsumerContext cctx5 = sctx1.addConsumer(ConsumerConfiguration.builder().durable(name(5)).build());
ConsumerContext cctx6 = sctx1.addConsumer(ConsumerConfiguration.builder().durable(name(6)).build());
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(5)).build());
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(name(6)).build());

closeConsumer(cctx1.consume(), name(1), true);
closeConsumer(cctx2.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), name(2), true);
Expand All @@ -383,9 +382,8 @@ private void closeConsumer(MessageConsumer con, String name, boolean doStop) thr
ConsumerInfo ci = con.getConsumerInfo();
assertEquals(name, ci.getName());
if (doStop) {
assertTrue(con.stop(100).get(100, TimeUnit.MILLISECONDS));
con.stop(100);
}
con.close();
}

@Test
Expand Down

0 comments on commit dde7c17

Please sign in to comment.