Skip to content

Commit

Permalink
Merge pull request #493 from nats-io/fix_492
Browse files Browse the repository at this point in the history
[FIXED] JetStream: auto-ack callback ack handling
  • Loading branch information
kozlovic committed Dec 7, 2021
2 parents 5e3929f + 2f0dd37 commit 4837e7e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/js-sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
if (++count == total)
elapsed = nats_Now() - start;

natsMsg_Ack(msg, NULL);
// Since this is auto-ack callback, we don't need to ack here.
natsMsg_Destroy(msg);
}

Expand Down
28 changes: 4 additions & 24 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1000,35 +1000,15 @@ static void
_autoAckCB(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
jsSub *jsi = (jsSub*) closure;
char _reply[256];
char *reply = NULL;
bool frply = false;

if (msg->reply != NULL)
{
if (strlen(msg->reply) < sizeof(_reply))
{
snprintf(_reply, sizeof(_reply), "%s", msg->reply);
reply = _reply;
}
else
{
reply = NATS_STRDUP(msg->reply);
frply = (reply != NULL ? true : false);
}
}
natsMsg_setNoDestroy(msg);

// Invoke user callback
(jsi->usrCb)(nc, sub, msg, jsi->usrCbClosure);

// Ack the message (unless we got a failure copying the reply subject)
if (reply == NULL)
return;

natsConnection_PublishString(nc, reply, jsAckAck);

if (frply)
NATS_FREE(reply);
natsMsg_Ack(msg, &(jsi->js->opts));
natsMsg_clearNoDestroy(msg);
natsMsg_Destroy(msg);
}

natsStatus
Expand Down
3 changes: 3 additions & 0 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,9 @@ natsMsg_Destroy(natsMsg *msg)
if (msg == NULL)
return;

if (natsMsg_isNoDestroy(msg))
return;

if (natsGC_collect((natsGCItem *) msg))
return;

Expand Down
4 changes: 4 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#define natsMsg_isAcked(m) (((m)->flags & (1 << 1)) != 0)
#define natsMsg_clearAcked(m) ((m)->flags &= ~(1 << 1))

#define natsMsg_setNoDestroy(m) ((m)->flags |= (1 << 2))
#define natsMsg_isNoDestroy(m) (((m)->flags & (1 << 2)) != 0)
#define natsMsg_clearNoDestroy(m) ((m)->flags &= ~(1 << 2))

struct __natsMsg
{
natsGCItem gc;
Expand Down
59 changes: 59 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -23402,6 +23402,17 @@ _jsMsgHandler(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *clo
args->sum++;
if (args->control == 1)
natsMsg_Ack(msg, NULL);
else if (args->control == 3)
{
if (args->sum == 1)
natsMsg_Nak(msg, NULL);
else
{
while (!args->done)
natsCondition_Wait(args->c, args->m);
args->msgReceived = true;
}
}
if ((args->control != 2) || (args->sum == args->results[0]))
natsCondition_Broadcast(args->c);
natsMutex_Unlock(args->m);
Expand Down Expand Up @@ -23628,6 +23639,54 @@ test_JetStreamSubscribe(void)
natsSubscription_Destroy(sub);
sub = NULL;

test("Create sub with auto-ack: ");
natsMutex_Lock(args.m);
args.control = 3;
args.sum = 0;
natsMutex_Unlock(args.m);
jsSubOptions_Init(&so);
so.Stream = "TEST";
so.Config.DeliverPolicy = js_DeliverLast;
s = js_Subscribe(&sub, js, "foo", _jsMsgHandler, (void*)&args, NULL, &so, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Check msgs received: ");
natsMutex_Lock(args.m);
while ((s != NATS_TIMEOUT) && (args.sum < 1))
s = natsCondition_TimedWait(args.c, args.m, 2000);
natsMutex_Unlock(args.m);
testCond(s == NATS_OK);

test("Check NAck sent: ");
s = natsSubscription_NextMsg(&ack, ackSub, 1000);
testCond(s == NATS_OK);
natsMsg_Destroy(ack);
ack = NULL;

test("Check no auto-ack: ");
s = natsSubscription_NextMsg(&ack, ackSub, 100);
testCond((s == NATS_TIMEOUT) && (ack == NULL));
nats_clearLastError();
s = NATS_OK;

natsMutex_Lock(args.m);
args.done = true;
natsCondition_Broadcast(args.c);
while ((s != NATS_TIMEOUT) && !args.msgReceived)
s = natsCondition_TimedWait(args.c, args.m, 1000);
args.done = false;
args.msgReceived = false;
natsMutex_Unlock(args.m);

test("Check auto-ack sent: ");
s = natsSubscription_NextMsg(&ack, ackSub, 1000);
testCond(s == NATS_OK);
natsMsg_Destroy(ack);
ack = NULL;

natsSubscription_Destroy(sub);
sub = NULL;

test("Create queue sub: ");
natsMutex_Lock(args.m);
args.control = 0;
Expand Down

0 comments on commit 4837e7e

Please sign in to comment.