diff --git a/src/js.c b/src/js.c index 07cd998f..e31dd2f0 100644 --- a/src/js.c +++ b/src/js.c @@ -2197,38 +2197,68 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u) static natsStatus _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig *userCfg, - bool isPullMode, const char *subj, const char *queue) + bool isPullMode, const char **subjects, int numSubjects, const char *queue) { - bool dlvSubjEmpty = false; - jsConsumerConfig *ccfg = info->Config; - const char *dg = NULL; - natsStatus s = NATS_OK; - bool matches = false; - int i; + bool dlvSubjEmpty = false; + jsConsumerConfig *ccfg = info->Config; + const char *dg = NULL; + natsStatus s = NATS_OK; + const char *stackFilterSubject[] = {ccfg->FilterSubject}; + const char **filterSubjects = stackFilterSubject; + int filterSubjectsLen = 1; + int incoming, existing; *dlvSubject = NULL; + // Always represent the consumer's filter subjects as a list, to match + // uniformly against the incoming subject list. Consider lists of 1 empty + // subject empty lists. + if (ccfg->FilterSubjectsLen > 0) + { + filterSubjects = ccfg->FilterSubjects; + filterSubjectsLen = ccfg->FilterSubjectsLen; + } + if ((filterSubjectsLen == 1) && nats_IsStringEmpty(filterSubjects[0])) + { + filterSubjects = NULL; + filterSubjectsLen = 0; + } + if ((numSubjects == 1) && nats_IsStringEmpty(subjects[0])) + { + subjects = NULL; + numSubjects = 0; + } - // Make sure this new subject matches or is a subset. - if (!nats_IsStringEmpty(subj)) + // Match the subjects against the consumer's filter subjects. + if (numSubjects > 0 && filterSubjectsLen > 0) { - if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0)) - { - matches = true; - } - else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject)) - { - matches = true; - } - else if (ccfg->FilterSubjectsLen > 0) + // If the consumer has filter subject(s), then the subject(s) must match. + bool matches = true; + + // TODO - This is N**2, but we don't expect a large number of subjects. + for (incoming = 0; incoming < numSubjects; incoming++) { - for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++) + bool found = false; + for (existing = 0; existing < filterSubjectsLen; existing++) { - matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]); + if (strcmp(subjects[incoming], filterSubjects[existing]) == 0) + { + found = true; + break; + } + } + if (!found) + { + matches = false; + break; } } + if (!matches) { - return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj); + if (numSubjects == 1 && filterSubjectsLen == 1) + return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'.", subjects[0], filterSubjects[0]); + else + return nats_setError(NATS_ERR, "%d subjects do not match any consumer filter subjects.", numSubjects); } } // Check that if user wants to create a queue sub, @@ -2330,7 +2360,7 @@ js_checkConsName(const char *cons, bool isDurable) } static natsStatus -_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable, +_subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, int numSubjects, const char *pullDurable, natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) { @@ -2393,7 +2423,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha consumer = opts->Consumer; consBound= (!nats_IsStringEmpty(stream) && !nats_IsStringEmpty(consumer)); - if (nats_IsStringEmpty(subject) && !consBound) + if (((numSubjects <= 0) || nats_IsStringEmpty(subjects[0])) && !consBound) return nats_setDefaultError(NATS_INVALID_ARG); // Do some quick checks here for ordered consumers. @@ -2450,9 +2480,10 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha // Find the stream mapped to the subject if not bound to a stream already, // that is, if user did not provide a `Stream` name through options). - if (nats_IsStringEmpty(stream)) + if (nats_IsStringEmpty(stream) && numSubjects > 0) { - s = _lookupStreamBySubject(&stream, nc, subject, &jo, errCode); + // Use the first subject to find the stream. + s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode); if (s != NATS_OK) goto END; @@ -2475,7 +2506,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setError(NATS_ERR, "%s", "no configuration in consumer info"); goto END; } - s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subject, opts->Queue); + s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subjects, numSubjects, opts->Queue); if (s != NATS_OK) goto END; @@ -2509,7 +2540,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha } // Do filtering always, server will clear as needed. - cfg->FilterSubject = subject; + if (numSubjects == 1) + { + cfg->FilterSubject = subjects[0]; + } + else + { + cfg->FilterSubjects = subjects; + cfg->FilterSubjectsLen = numSubjects; + } if (opts->Ordered) { @@ -2559,8 +2598,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setDefaultError(NATS_NO_MEMORY); } IF_OK_DUP_STRING(s, jsi->stream, stream); - if ((s == NATS_OK) && !nats_IsStringEmpty(subject)) - DUP_STRING(s, jsi->psubj, subject); + IFOK(s, nats_formatStringArray(&jsi->psubj, subjects, numSubjects)); if (s == NATS_OK) { jsi->js = js; @@ -2713,6 +2751,26 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha return NATS_UPDATE_ERR_STACK(s); } +static natsStatus +_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable, + natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + natsStatus s = NATS_OK; + const char *singleSubject[] = {subject}; + int numSubjects = 1; + const char **subjects = singleSubject; + + if (nats_IsStringEmpty(subject)) + { + numSubjects = 0; + subjects = NULL; + } + + s = _subscribeMulti(new_sub, js, subjects, numSubjects, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); +} + natsStatus js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, natsMsgHandler cb, void *cbClosure, @@ -2730,6 +2788,23 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, return NATS_UPDATE_ERR_STACK(s); } +natsStatus +js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + natsMsgHandler cb, void *cbClosure, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + natsStatus s; + + if (errCode != NULL) + *errCode = 0; + + if (cb == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, cb, cbClosure, false, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); +} + natsStatus js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) @@ -2743,6 +2818,19 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, return NATS_UPDATE_ERR_STACK(s); } +natsStatus +js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + natsStatus s; + + if (errCode != NULL) + *errCode = 0; + + s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, NULL, NULL, false, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); +} + natsStatus js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const char *durable, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) diff --git a/src/jsm.c b/src/jsm.c index 7d8587ce..3c832149 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -3199,7 +3199,6 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci) IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound))); IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused))); IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining))); - if (s == NATS_OK) *new_ci = ci; else diff --git a/src/kv.c b/src/kv.c index cfb08ca9..81de4ce5 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1057,14 +1057,30 @@ kvWatcher_Destroy(kvWatcher *w) natsStatus kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOptions *opts) +{ + const char *subjects = { key }; + return kvStore_WatchMulti(new_watcher, kv, &subjects, 1, opts); +} + +natsStatus +kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts) { natsStatus s; kvWatcher *w = NULL; jsSubOptions so; + char *singleSubject[1]; + char **multipleSubjects = NULL; // allocate if numKeys > 1 + char **subscribeSubjects = singleSubject; + int i; DEFINE_BUF_FOR_SUBJECT; - if ((new_watcher == NULL) || (kv == NULL) || nats_IsStringEmpty(key)) + if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0) return nats_setDefaultError(NATS_INVALID_ARG); + for (i=0; ikv = kv; w->refs = 1; - BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); + if (numKeys == 1) + { + // special case for single key to avoid a calloc. + subscribeSubjects[0] = (char *)keys[0]; + + } + else + { + multipleSubjects = (char **)NATS_CALLOC(numKeys, sizeof(const char *)); + if (multipleSubjects == NULL) + { + _freeWatcher(w); + return nats_setDefaultError(NATS_NO_MEMORY); + } + subscribeSubjects = multipleSubjects; + } + for (i = 0; i < numKeys; i++) + { + const char *key = keys[i]; + BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); // into buf, '\0'-terminated. + subscribeSubjects[i] = NATS_STRDUP(natsBuf_Data(&buf)); + if (subscribeSubjects[i] == NULL) + { + s = nats_setDefaultError(NATS_NO_MEMORY); + NATS_FREE_STRINGS(subscribeSubjects, i); + NATS_FREE(multipleSubjects); + _freeWatcher(w); + return nats_setDefaultError(NATS_NO_MEMORY); + } + } IFOK(s, natsMutex_Create(&(w->mu))); if (s == NATS_OK) { @@ -1096,7 +1141,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti // Need to explicitly bind to the stream here because the subject // we construct may not help find the stream when using mirrors. so.Stream = kv->stream; - s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL); + s = js_SubscribeSyncMulti(&(w->sub), kv->js, (const char **)subscribeSubjects, numKeys, NULL, &so, NULL); IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1)); if (s == NATS_OK) { @@ -1113,6 +1158,8 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti } natsBuf_Cleanup(&buf); + NATS_FREE_STRINGS(subscribeSubjects, numKeys); + NATS_FREE(multipleSubjects); if (s == NATS_OK) *new_watcher = w; diff --git a/src/mem.h b/src/mem.h index a73f4307..513998a2 100644 --- a/src/mem.h +++ b/src/mem.h @@ -27,6 +27,15 @@ #endif #define NATS_FREE(p) free((p)) +// **Note** does not free the array itself. +static void NATS_FREE_STRINGS(char **strings, int count) +{ + if (strings == NULL) + return; + for (int i = 0; i < count; i++) + NATS_FREE((char *)strings[i]); +} + // GNU C Library version 2.25 or later. #if defined(__GLIBC__) && \ (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25)) diff --git a/src/nats.h b/src/nats.h index 4eff8210..9dd2c9b0 100644 --- a/src/nats.h +++ b/src/nats.h @@ -6338,14 +6338,16 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, natsMsgHandler cb, void* cbClosure, jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); -/** \brief Create a synchronous subscription. +/** \brief Create an asynchronous subscription to multiple subjects. * - * See important notes in #js_Subscribe. + * Like #js_Subscribe, but accepts multiple subjects, each can be a wildcard. * * @param sub the location where to store the pointer to the newly created * #natsSubscription object. * @param js the pointer to the #jsCtx object. * @param subject the subject this subscription is created for. + * @param cb the #natsMsgHandler callback. + * @param cbClosure a pointer to an user defined object (can be `NULL`). See * the #natsMsgHandler prototype. * @param opts the pointer to the #jsOptions object, possibly `NULL`. * @param subOpts the subscribe options, possibly `NULL`. @@ -6353,9 +6355,46 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, * if not needed. */ NATS_EXTERN natsStatus +js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + natsMsgHandler cb, void *cbClosure, + jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); + +/** \brief Create a synchronous subscription. + * + * See important notes in #js_Subscribe. + * + * @param sub the location where to store the pointer to the newly created + * #natsSubscription object. + * @param js the pointer to the #jsCtx object. + * @param subject the subject this subscription is created for (consumer's FilterSubject). + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param subOpts the subscribe options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. + */ +NATS_EXTERN natsStatus js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); +/** \brief Create an asynchronous subscription to multiple subjects. + * + * Like #js_SubscribeSync, but accepts multiple subjects, each can be a + * wildcard. + * + * @param sub the location where to store the pointer to the newly created + * #natsSubscription object. + * @param js the pointer to the #jsCtx object. + * @param subjects the subjects this subscription is created for (consumer's FilterSubjects). + * @param numSubjects the number of subjects for the subscription (consumer's FilterSubjectsLen). + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param subOpts the subscribe options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. + */ +NATS_EXTERN natsStatus +js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode); + /** \brief Create a pull subscriber. * * A pull based consumer is a type of consumer that does not have a delivery subject, @@ -7003,6 +7042,30 @@ kvStore_PurgeDeletes(kvStore *kv, kvPurgeOptions *opts); NATS_EXTERN natsStatus kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts); +/** \brief Returns a watcher for any updates to keys that match one of the + * `keys` argument. + * + * Returns a watcher for any updates to keys that match the one of `keys` + * argument, which could include wildcards. + * + * A `NULL` entry will be posted when the watcher has received all initial + * values. + * + * Call #kvWatcher_Next to get the next #kvEntry. + * + * \note The watcher should be destroyed to release memory using + * #kvWatcher_Destroy. + * + * @param new_watcher the location where to store the pointer to the new + * #kvWatcher object. + * @param kv the pointer to the #kvStore object. + * @param keys the keys (wildcard possible) to create the watcher for. + * @param numKeys the number of keys in the `keys` array. + * @param opts the watcher options, possibly `NULL`. + */ +NATS_EXTERN natsStatus +kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts); + /** \brief Returns a watcher for any updates to any keys of the KeyValue store bucket. * * Returns a watcher for any updates to any keys of the KeyValue store bucket. diff --git a/src/util.c b/src/util.c index f91e04eb..4510b4bf 100644 --- a/src/util.c +++ b/src/util.c @@ -2555,3 +2555,62 @@ nats_freeMetadata(natsMetadata *md) md->List = NULL; md->Count = 0; } + + +// allocates a sufficiently large buffer and formats the strings into it, as a +// ["unencoded-string-0","unencoded-string-1",...]. For an empty array of +// strings returns "[]". +natsStatus nats_formatStringArray(char **out, const char **strings, int count) +{ + natsStatus s = NATS_OK; + natsBuffer buf; + int len = 0; + int i; + + len++; // For the '[' + for (i = 0; i < count; i++) + { + len += 2; // For the quotes + if (i > 0) + len++; // For the ',' + if (strings[i] == NULL) + len += strlen("(null)"); + else + len += strlen(strings[i]); + } + len++; // For the ']' + len++; // For the '\0' + + s = natsBuf_Init(&buf, len); + + natsBuf_AppendByte(&buf, '['); + for (i = 0; (s == NATS_OK) && (i < count); i++) + { + if (i > 0) + { + IFOK(s, natsBuf_AppendByte(&buf, ',')); + } + IFOK(s, natsBuf_AppendByte(&buf, '"')); + if (strings[i] == NULL) + { + IFOK(s, natsBuf_Append(&buf, "(null)", -1)); + } + else + { + IFOK(s, natsBuf_Append(&buf, strings[i], -1)); + } + IFOK(s, natsBuf_AppendByte(&buf, '"')); + } + + IFOK(s, natsBuf_AppendByte(&buf, ']')); + IFOK(s, natsBuf_AppendByte(&buf, '\0')); + + if (s != NATS_OK) + { + natsBuf_Cleanup(&buf); + return s; + } + + *out = natsBuf_Data(&buf); + return NATS_OK; +} diff --git a/src/util.h b/src/util.h index 127bce0b..17e2d7bd 100644 --- a/src/util.h +++ b/src/util.h @@ -261,4 +261,7 @@ nats_IsSubjectValid(const char *subject, bool wcAllowed); natsStatus nats_parseTime(char *str, int64_t *timeUTC); +natsStatus +nats_formatStringArray(char **out, const char **strings, int count); + #endif /* UTIL_H_ */ diff --git a/test/list.txt b/test/list.txt index 18f5b898..60cee3e7 100644 --- a/test/list.txt +++ b/test/list.txt @@ -42,6 +42,7 @@ HeadersLift HeadersAPIs MsgIsJSControl SrvVersionAtLeast +FormatStringArray ReconnectServerStats ParseStateReconnectFunctionality ServersRandomize @@ -250,6 +251,7 @@ JetStreamInfoAlternates KeyValueManager KeyValueBasics KeyValueWatch +KeyValueWatchMulti KeyValueHistory KeyValueKeys KeyValueDeleteVsPurge diff --git a/test/test.c b/test/test.c index 7a3e1d97..07025563 100644 --- a/test/test.c +++ b/test/test.c @@ -5466,6 +5466,41 @@ test_natsSrvVersionAtLeast(void) natsConnection_Destroy(nc); } +static void +test_natsFormatStringArray(void) +{ + natsStatus s; + size_t i, N; + + typedef struct + { + const char *name; + const char **in; + int inLen; + const char *out; + } TC; + + TC tcs[] = { + {"Empty", NULL, 0, "[]"}, + {"One", (const char *[]){"one"}, 1, "[\"one\"]"}, + {"Three", (const char *[]){"one", "two", "three"}, 3, "[\"one\",\"two\",\"three\"]"}, + {"NULL", (const char *[]){NULL}, 1, "[\"(null)\"]"}, + }; + + char *out[sizeof(tcs) / sizeof(TC)]; + memset(out, 0, sizeof(out)); + + N = sizeof(tcs) / sizeof(TC); + for (i = 0; i < N; i++) + { + test(tcs[i].name); + s = nats_formatStringArray(&out[i], tcs[i].in, tcs[i].inLen); + testCond((s == NATS_OK) && (out[i] != NULL) && (strcmp(out[i], tcs[i].out) == 0)); + } + + NATS_FREE_STRINGS(out, N); +} + static natsStatus _checkStart(const char *url, int orderIP, int maxAttempts) { @@ -26167,6 +26202,9 @@ test_JetStreamSubscribe(void) char longsn[256]; #endif natsThread *threads[10] = {NULL}; + const char *subjectsStack[] = {"foo", "bar"}; + const char **subjects = subjectsStack; + const int numSubjects = sizeof(subjectsStack)/sizeof(char*); ENSURE_JS_VERSION(2, 3, 5); @@ -26222,8 +26260,8 @@ test_JetStreamSubscribe(void) test("Create stream: "); jsStreamConfig_Init(&sc); sc.Name = "TEST"; - sc.Subjects = (const char*[1]){"foo"}; - sc.SubjectsLen = 1; + sc.Subjects = subjects; + sc.SubjectsLen = numSubjects; s = js_AddStream(NULL, js, &sc, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); @@ -26231,6 +26269,8 @@ test_JetStreamSubscribe(void) s = js_Publish(NULL, js, "foo", "msg1", 4, NULL, &jerr); IFOK(s, js_Publish(NULL, js, "foo", "msg2", 4, NULL, &jerr)); IFOK(s, js_Publish(NULL, js, "foo", "msg3", 4, NULL, &jerr)); + IFOK(s, js_Publish(NULL, js, "bar", "msg1", 4, NULL, &jerr)); + IFOK(s, js_Publish(NULL, js, "bar", "msg2", 4, NULL, &jerr)); testCond(s == NATS_OK); test("Create sub to check lib sends ACKs: "); @@ -26664,6 +26704,31 @@ test_JetStreamSubscribe(void) natsSubscription_Destroy(sub); sub = NULL; + if (serverVersionAtLeast(2, 10, 14)) + { + test("Create consumer (multiple subjects): "); + jsSubOptions_Init(&so); + so.Config.Durable = "delcons3sync"; + s = js_SubscribeSyncMulti(&sub, js, subjects, numSubjects, NULL, &so, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Drain deletes consumer: "); + s = natsSubscription_Drain(sub); + for (i = 0; i < 5; i++) + { + natsMsg *msg = NULL; + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + IFOK(s, natsMsg_Ack(msg, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + } + IFOK(s, natsSubscription_WaitForDrainCompletion(sub, 1000)); + IFOK(s, js_GetConsumerInfo(&ci, js, "TEST", "delcons3sync", NULL, &jerr)); + testCond((s == NATS_NOT_FOUND) && (ci == NULL) && (jerr == JSConsumerNotFoundErr) && (nats_GetLastError(NULL) == NULL)); + natsSubscription_Destroy(sub); + sub = NULL; + } + test("Create consumer: "); jsSubOptions_Init(&so); so.Config.Durable = "delcons3"; @@ -31366,6 +31431,54 @@ test_KeyValueWatch(void) JS_TEARDOWN; } +static void +test_KeyValueWatchMulti(void) +{ + natsStatus s; + kvStore *kv = NULL; + kvWatcher *w = NULL; + kvConfig kvc; + + JS_SETUP(2, 10, 14); + + test("Create KV: "); + kvConfig_Init(&kvc); + kvc.Bucket = "WATCH"; + s = js_CreateKeyValue(&kv, js, &kvc); + testCond(s == NATS_OK); + + // Now try wildcard matching and make sure we only get last value when starting. + test("Put values in different keys: "); + s = kvStore_PutString(NULL, kv, "a.name", "A"); + IFOK(s, kvStore_PutString(NULL, kv, "b.name", "B")); + IFOK(s, kvStore_PutString(NULL, kv, "a.name", "AA")); + IFOK(s, kvStore_PutString(NULL, kv, "b.name", "BB")); + IFOK(s, kvStore_PutString(NULL, kv, "a.age", "22")); + IFOK(s, kvStore_PutString(NULL, kv, "a.age", "99")); + testCond(s == NATS_OK); + + test("Create watcher: "); + const char **subjects = (const char *[]){"a.*", "b.*", "c.*"}; + s = kvStore_WatchMulti(&w, kv, subjects, 3, NULL); + testCond(s == NATS_OK); + + testCond(_expectUpdate(w, "a.name", "AA", 3)); + testCond(_expectUpdate(w, "b.name", "BB", 4)); + testCond(_expectUpdate(w, "a.age", "99", 6)); + testCond(_expectInitDone(w)); + + IFOK(s, kvStore_PutString(NULL, kv, "c.occupation", "gardener")); + IFOK(s, kvStore_PutString(NULL, kv, "XXX.occupation", "no, plumber")); + IFOK(s, kvStore_PutString(NULL, kv, "c.occupation", "a digital plumber for a digital garden")); + testCond(_expectUpdate(w, "c.occupation", "gardener", 7)); + testCond(_expectUpdate(w, "c.occupation", "a digital plumber for a digital garden", 9)); + + kvWatcher_Destroy(w); + kvStore_Destroy(kv); + + JS_TEARDOWN; +} + static void test_KeyValueHistory(void) { @@ -36047,6 +36160,7 @@ static testInfo allTests[] = {"HeadersAPIs", test_natsMsgHeaderAPIs}, {"MsgIsJSControl", test_natsMsgIsJSCtrl}, {"SrvVersionAtLeast", test_natsSrvVersionAtLeast}, + {"FormatStringArray", test_natsFormatStringArray}, // Package Level Tests @@ -36268,6 +36382,7 @@ static testInfo allTests[] = {"KeyValueManager", test_KeyValueManager}, {"KeyValueBasics", test_KeyValueBasics}, {"KeyValueWatch", test_KeyValueWatch}, + {"KeyValueWatchMulti", test_KeyValueWatchMulti}, {"KeyValueHistory", test_KeyValueHistory}, {"KeyValueKeys", test_KeyValueKeys}, {"KeyValueDeleteVsPurge", test_KeyValueDeleteVsPurge},