Skip to content

Commit

Permalink
Merge branch 'main' into issue-502-b
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 8, 2021
2 parents b686256 + c9f26a0 commit 6d76c3d
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,56 +364,74 @@ public void testAddDeleteConsumer() throws Exception {
}

@Test
public void testInvalidConsumerUpdates() throws Exception {
public void testValidConsumerUpdates() throws Exception {
runInJsServer(nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
createMemoryStream(jsm, STREAM, SUBJECT_GT);

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(durable(1))
.ackPolicy(AckPolicy.Explicit)
.deliverSubject(deliver(1))
.maxDeliver(3)
.filterSubject(SUBJECT_GT)
.build();
assertValidAddOrUpdate(jsm, cc);

ConsumerConfiguration cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).deliverSubject(deliver(2)).build();
assertValidAddOrUpdate(jsm, cc);

cc = ConsumerConfiguration.builder(cc).deliverPolicy(DeliverPolicy.New).build();
assertInvalidConsumerUpdate(jsm, cc);

cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).ackWait(Duration.ofSeconds(5)).build();
assertInvalidConsumerUpdate(jsm, cc);

cc = ConsumerConfiguration.builder(cc).filterSubject(SUBJECT_STAR).build();
assertInvalidConsumerUpdate(jsm, cc);
assertValidAddOrUpdate(jsm, cc);

cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).rateLimit(100).build();
assertInvalidConsumerUpdate(jsm, cc);
assertValidAddOrUpdate(jsm, cc);

cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).maxAckPending(100).build();
assertValidAddOrUpdate(jsm, cc);

cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).maxDeliver(4).build();
assertValidAddOrUpdate(jsm, cc);
});
}

@Test
public void testInvalidConsumerUpdates() throws Exception {
runInJsServer(nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
createMemoryStream(jsm, STREAM, SUBJECT_GT);

ConsumerConfiguration cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).deliverPolicy(DeliverPolicy.New).build();
assertInvalidConsumerUpdate(jsm, cc);

cc = ConsumerConfiguration.builder(cc).idleHeartbeat(Duration.ofMillis(111)).build();
cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).filterSubject(SUBJECT_STAR).build();
assertInvalidConsumerUpdate(jsm, cc);

cc = ConsumerConfiguration.builder(cc).maxDeliver(4).build();
cc = prepForUpdateTest(jsm);
cc = ConsumerConfiguration.builder(cc).idleHeartbeat(Duration.ofMillis(111)).build();
assertInvalidConsumerUpdate(jsm, cc);
});
}

private ConsumerConfiguration prepForUpdateTest(JetStreamManagement jsm) throws IOException, JetStreamApiException {
try {
jsm.deleteConsumer(STREAM, durable(1));
}
catch (Exception e) { /* ignore */ }

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(durable(1))
.ackPolicy(AckPolicy.Explicit)
.deliverSubject(deliver(1))
.maxDeliver(3)
.filterSubject(SUBJECT_GT)
.build();
assertValidAddOrUpdate(jsm, cc);
return cc;
}

private void assertInvalidConsumerUpdate(JetStreamManagement jsm, ConsumerConfiguration cc) {
JetStreamApiException e = assertThrows(JetStreamApiException.class, () -> jsm.addOrUpdateConsumer(STREAM, cc));
// 10013 consumer name already in use
// 10105 consumer already exists and is still active
assertTrue(e.getApiErrorCode() == 10013 || e.getApiErrorCode() == 10105);
assertEquals(400, e.getErrorCode());
assertEquals("consumer name already in use", e.getErrorDescription());
// \/ This is just a reminder that the server changed \/
// assertEquals(500, e.getErrorCode());
// assertEquals("consumer already exists", e.getErrorDescription());
assertEquals(10012, e.getApiErrorCode());
assertEquals(500, e.getErrorCode());
}

private void assertValidAddOrUpdate(JetStreamManagement jsm, ConsumerConfiguration cc) throws IOException, JetStreamApiException {
Expand Down

0 comments on commit 6d76c3d

Please sign in to comment.