Skip to content

Commit

Permalink
Test: reproduce drain waiting indefinitely after request timeout on f…
Browse files Browse the repository at this point in the history
…etch
  • Loading branch information
MauriceVanVeen committed Oct 11, 2023
1 parent 92da51e commit c1fc788
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/test/java/io/nats/client/impl/DrainTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.nats.client.*;
import io.nats.client.ConnectionListener.Events;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
import org.junit.jupiter.api.Test;

import java.time.Duration;
Expand Down Expand Up @@ -747,4 +749,40 @@ public void testThrowIfClosing() {
}
});
}

@Test
public void testDrainWithFetchRequestTimeoutStatusMessage() throws Exception {
try (NatsTestServer ts = new NatsTestServer(false)) {
final Connection nc = standardConnection(new Options.Builder().server(ts.getURI()).maxReconnects(0).build());

String stream = "stream";
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
.name(stream)
.build()
);

StreamContext sc = nc.getStreamContext(stream);
ConsumerContext cc = sc.createOrUpdateConsumer(ConsumerConfiguration.builder()
.durable("consumer")
.build()
);

// fetch messages, let it time out, and don't close it manually
cc.fetch(FetchConsumeOptions.builder()
.maxMessages(10)
.expiresIn(Duration.ofSeconds(1).toMillis())
.build());

nc.flush(Duration.ofSeconds(1)); // Get the sub to the server, so drain has things to do


try {
Thread.sleep(2000); // go slow so fetch times out
} catch (Exception e) {
}

CompletableFuture<Boolean> tracker = nc.drain(Duration.ZERO);
assertTrue(tracker.get(1, TimeUnit.SECONDS));
}
}
}

0 comments on commit c1fc788

Please sign in to comment.