Skip to content

Commit

Permalink
Merge pull request #578 from nats-io/js_ephemeral_pull
Browse files Browse the repository at this point in the history
[ADDED] JetStream: support for ephemeral pull subscription.
  • Loading branch information
kozlovic committed Aug 30, 2022
2 parents df3c092 + 1eac401 commit aa827fd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 16 deletions.
14 changes: 10 additions & 4 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2347,7 +2347,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
else
{
if (isPullMode)
if (isPullMode && !nats_IsStringEmpty(consumer))
{
if (nats_asprintf(&(jsi->nxtMsgSubj), jsApiRequestNextT, jo.Prefix, stream, consumer) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);
Expand Down Expand Up @@ -2452,7 +2452,16 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
// time, the consumer has been recreated (jsResetOrderedConsumer). So
// set only if jsi->consumer is NULL!
if (jsi->consumer == NULL)
{
DUP_STRING(s, jsi->consumer, info->Name);
if (s == NATS_OK)
{
NATS_FREE(jsi->nxtMsgSubj);
jsi->nxtMsgSubj = NULL;
if (nats_asprintf(&(jsi->nxtMsgSubj), jsApiRequestNextT, jo.Prefix, stream, jsi->consumer) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);
}
}
natsSub_Unlock(sub);
}
}
Expand Down Expand Up @@ -2538,9 +2547,6 @@ js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const c
if (errCode != NULL)
*errCode = 0;

if (nats_IsStringEmpty(durable))
return nats_setError(NATS_INVALID_ARG, "%s", jsErrDurRequired);

// Check for invalid ack policy
if (opts != NULL)
{
Expand Down
9 changes: 5 additions & 4 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -5921,14 +5921,15 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
* A pull based consumer is a type of consumer that does not have a delivery subject,
* that is the library has to request for the messages to be delivered as needed from the server.
*
* \note All pull subscriptions must have a durable name.
*
* \note A durable name cannot contain the character ".".
* \note If no durable name is provided, the pull subscription will create an ephemeral
* JetStream consumer. The requirement for a durable name is lifted in NATS C client v3.4.0+
* and NATS Server v2.7.0+.
* \note If a durable name is specified, it cannot contain the character ".".
*
* @param sub the location where to store the pointer to the newly created #natsSubscription object.
* @param js the pointer to the #jsCtx object.
* @param subject the subject this subscription is created for.
* @param durable the durable name, which is required for pull subscriptions.
* @param durable the optional durable name.
* @param opts the pointer to the #jsOptions object, possibly `NULL`.
* @param subOpts the subscribe options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, or `NULL`
Expand Down
37 changes: 29 additions & 8 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -26499,14 +26499,6 @@ test_JetStreamSubscribePull(void)
testCond((s == NATS_INVALID_ARG) && (sub == NULL) && (jerr == 0));
nats_clearLastError();

test("Durable name required: ");
s = js_PullSubscribe(&sub, js, "foo", NULL, NULL, NULL, &jerr);
if (s == NATS_INVALID_ARG)
s = js_PullSubscribe(&sub, js, "foo", "", NULL, NULL, &jerr);
testCond((s == NATS_INVALID_ARG)
&& (strstr(nats_GetLastError(NULL), jsErrDurRequired) != NULL));
nats_clearLastError();

for (i=0; i<2; i++)
{
test("Create pull sub (invalid ack mode): ");
Expand Down Expand Up @@ -26807,6 +26799,35 @@ test_JetStreamSubscribePull(void)
nats_clearLastError();

natsSubscription_Destroy(sub);
sub = NULL;

test("Ephemeral pull allowed (NULL): ");
s = js_PullSubscribe(&sub, js, "bar", NULL, NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Send a message: ");
s = js_Publish(NULL, js, "bar", "hello", 5, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Msgs received: ");
s = natsSubscription_Fetch(&list, sub, 1, 1000, &jerr);
testCond((s == NATS_OK) && (list.Msgs != NULL) && (list.Count == 1) && (jerr == 0));

natsMsgList_Destroy(&list);
natsSubscription_Destroy(sub);
sub = NULL;

test("Ephemeral pull allowed (empty): ");
s = js_PullSubscribe(&sub, js, "bar", "", NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Msgs received: ");
s = natsSubscription_Fetch(&list, sub, 1, 1000, &jerr);
testCond((s == NATS_OK) && (list.Msgs != NULL) && (list.Count == 1) && (jerr == 0));

natsMsgList_Destroy(&list);
natsSubscription_Destroy(sub);

JS_TEARDOWN;
_destroyDefaultThreadArgs(&args);
}
Expand Down

0 comments on commit aa827fd

Please sign in to comment.