Skip to content

Commit

Permalink
Ordered Consumr reset changes
Browse files Browse the repository at this point in the history
- Was not setting to MemoryStorage and Replicas of 1 on recreate
- Added test that checks for MemoryStorage after a reset
- Improved error reporting when failing to recreate the consumer

Related to #596

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Oct 4, 2022
1 parent 95a2316 commit 5728b3a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -3028,6 +3028,8 @@ _recreateOrderedCons(void *closure)
cc.DeliverSubject = sub->subject;
cc.DeliverPolicy = js_DeliverByStartSequence;
cc.OptStartSeq = oci->sseq;
cc.MemoryStorage = true;
cc.Replicas = 1;
natsSub_Unlock(sub);

s = js_AddConsumer(&ci, jsi->js, jsi->stream, &cc, NULL, NULL);
Expand All @@ -3048,8 +3050,11 @@ _recreateOrderedCons(void *closure)
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed recreating ordered consumer: %u (%s), will try again",
s, natsStatus_GetText(s));
const char *lastErr = nats_GetLastError(NULL);

snprintf(tmp, sizeof(tmp),
"error recreating ordered consumer, will try again: status=%u error=%s",
s, (lastErr == NULL ? natsStatus_GetText(s) : lastErr));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
natsConn_Unlock(nc);
Expand Down
11 changes: 11 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -28483,6 +28483,8 @@ test_JetStreamOrderedConsSrvRestart(void)
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsOptions *opts = NULL;
const char *cons = NULL;
jsConsumerInfo *ci = NULL;
jsErrCode jerr= 0;
jsStreamConfig sc;
jsSubOptions so;
Expand Down Expand Up @@ -28563,6 +28565,15 @@ test_JetStreamOrderedConsSrvRestart(void)
natsMsg_Destroy(msg);
msg = NULL;

test("Check still memory storage: ");
natsSub_Lock(sub);
if (sub->jsi != NULL)
cons = sub->jsi->consumer;
natsSub_Unlock(sub);
s = js_GetConsumerInfo(&ci, js, "OCRESTART", cons, NULL, NULL);
testCond((s == NATS_OK) && (ci->Config->MemoryStorage) && (ci->Config->Replicas == 1))
jsConsumerInfo_Destroy(ci);

natsSubscription_Destroy(sub);
natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&args);
Expand Down

0 comments on commit 5728b3a

Please sign in to comment.