From 36f1f4ee14d3b6cc3dd6279d3979ffdc0b24a9c8 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 25 Apr 2024 05:39:57 -0700 Subject: [PATCH 01/14] WIP: passing existing tests --- src/js.c | 130 +++++++++++++++++++++++++++++++++++++++++------------ src/jsm.c | 1 - src/kv.c | 33 ++++++++++++-- src/mem.h | 10 +++++ src/nats.h | 48 +++++++++++++++++++- src/util.c | 53 ++++++++++++++++++++++ src/util.h | 3 ++ 7 files changed, 243 insertions(+), 35 deletions(-) diff --git a/src/js.c b/src/js.c index 2dc52d517..1a0b8b796 100644 --- a/src/js.c +++ b/src/js.c @@ -2195,38 +2195,68 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u) static natsStatus _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig *userCfg, - bool isPullMode, const char *subj, const char *queue) + bool isPullMode, const char **subjects, int numSubjects, const char *queue) { - bool dlvSubjEmpty = false; - jsConsumerConfig *ccfg = info->Config; - const char *dg = NULL; - natsStatus s = NATS_OK; - bool matches = false; - int i; + bool dlvSubjEmpty = false; + jsConsumerConfig *ccfg = info->Config; + const char *dg = NULL; + natsStatus s = NATS_OK; + const char *stackFilterSubject[] = {ccfg->FilterSubject}; + const char **filterSubjects = stackFilterSubject; + int filterSubjectsLen = 1; + int incoming, existing; *dlvSubject = NULL; + // Always represent the consumer's filter subjects as a list, to match + // uniformly against the incoming subject list. Consider lists of 1 empty + // subject empty lists. + if (ccfg->FilterSubjectsLen > 0) + { + filterSubjects = ccfg->FilterSubjects; + filterSubjectsLen = ccfg->FilterSubjectsLen; + } + if ((filterSubjectsLen == 1) && nats_IsStringEmpty(filterSubjects[0])) + { + filterSubjects = NULL; + filterSubjectsLen = 0; + } + if ((numSubjects == 1) && nats_IsStringEmpty(subjects[0])) + { + subjects = NULL; + numSubjects = 0; + } - // Make sure this new subject matches or is a subset. - if (!nats_IsStringEmpty(subj)) + // Match the subjects against the consumer's filter subjects. + if (numSubjects > 0 && filterSubjectsLen > 0) { - if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0)) - { - matches = true; - } - else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject)) - { - matches = true; - } - else if (ccfg->FilterSubjectsLen > 0) + // If the consumer has filter subject(s), then the subject(s) must match. + bool matches = true; + + // TODO - This is N**2, but we don't expect a large number of subjects. + for (incoming = 0; incoming < numSubjects; incoming++) { - for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++) + bool found = false; + for (existing = 0; existing < filterSubjectsLen; existing++) { - matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]); + if (strcmp(subjects[incoming], filterSubjects[existing]) == 0) + { + found = true; + break; + } + } + if (!found) + { + matches = false; + break; } } + if (!matches) { - return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj); + if (numSubjects == 1 && filterSubjectsLen == 1) + return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'.", subjects[0], filterSubjects[0]); + else + return nats_setError(NATS_ERR, "%d subjects do not match any consumer filter subjects.", numSubjects); } } // Check that if user wants to create a queue sub, @@ -2328,7 +2358,7 @@ js_checkConsName(const char *cons, bool isDurable) } static natsStatus -_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable, +_subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, int numSubjects, const char *pullDurable, natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) { @@ -2391,7 +2421,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha consumer = opts->Consumer; consBound= (!nats_IsStringEmpty(stream) && !nats_IsStringEmpty(consumer)); - if (nats_IsStringEmpty(subject) && !consBound) + if (((numSubjects <= 0) || nats_IsStringEmpty(subjects[0])) && !consBound) return nats_setDefaultError(NATS_INVALID_ARG); // Do some quick checks here for ordered consumers. @@ -2448,9 +2478,10 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha // Find the stream mapped to the subject if not bound to a stream already, // that is, if user did not provide a `Stream` name through options). - if (nats_IsStringEmpty(stream)) + if (nats_IsStringEmpty(stream) && numSubjects > 0) { - s = _lookupStreamBySubject(&stream, nc, subject, &jo, errCode); + // Ise the first subject to find the stream. + s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode); if (s != NATS_OK) goto END; @@ -2473,7 +2504,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setError(NATS_ERR, "%s", "no configuration in consumer info"); goto END; } - s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subject, opts->Queue); + s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subjects, numSubjects, opts->Queue); if (s != NATS_OK) goto END; @@ -2507,7 +2538,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha } // Do filtering always, server will clear as needed. - cfg->FilterSubject = subject; + if (numSubjects == 1) + { + cfg->FilterSubject = subjects[0]; + } + else + { + cfg->FilterSubjects = subjects; + cfg->FilterSubjectsLen = numSubjects; + } if (opts->Ordered) { @@ -2557,8 +2596,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setDefaultError(NATS_NO_MEMORY); } IF_OK_DUP_STRING(s, jsi->stream, stream); - if ((s == NATS_OK) && !nats_IsStringEmpty(subject)) - DUP_STRING(s, jsi->psubj, subject); + if ((s == NATS_OK) && numSubjects > 0) + { + int l = nats_printStringArray(NULL, 0, subjects, numSubjects); + jsi->psubj = NATS_CALLOC(l, 1); // '\0' included + if (jsi->psubj == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + if (s == NATS_OK) + nats_printStringArray(jsi->psubj, l, subjects, numSubjects); + } if (s == NATS_OK) { jsi->js = js; @@ -2711,6 +2757,19 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha return NATS_UPDATE_ERR_STACK(s); } +static natsStatus +_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable, + natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + const char *subjects[] = {subject}; + + if (nats_IsStringEmpty(subject)) + return _subscribeMulti(new_sub, js, NULL, 0, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); + + return _subscribeMulti(new_sub, js, subjects, 1, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); +} + natsStatus js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, natsMsgHandler cb, void *cbClosure, @@ -2741,6 +2800,19 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, return NATS_UPDATE_ERR_STACK(s); } +natsStatus +js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + natsStatus s; + + if (errCode != NULL) + *errCode = 0; + + s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, NULL, NULL, false, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); +} + natsStatus js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const char *durable, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) diff --git a/src/jsm.c b/src/jsm.c index 7d8587ce0..3c8321491 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -3199,7 +3199,6 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci) IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound))); IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused))); IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining))); - if (s == NATS_OK) *new_ci = ci; else diff --git a/src/kv.c b/src/kv.c index cfb08ca90..8a4d9dde6 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1057,13 +1057,22 @@ kvWatcher_Destroy(kvWatcher *w) natsStatus kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOptions *opts) +{ + const char *subjects = { key }; + return kvStore_WatchMulti(new_watcher, kv, &subjects, 1, opts); +} + +natsStatus +kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts) { natsStatus s; kvWatcher *w = NULL; jsSubOptions so; + char **subscribeSubjects = NULL; + int i; DEFINE_BUF_FOR_SUBJECT; - if ((new_watcher == NULL) || (kv == NULL) || nats_IsStringEmpty(key)) + if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0) return nats_setDefaultError(NATS_INVALID_ARG); *new_watcher = NULL; @@ -1076,7 +1085,25 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti w->kv = kv; w->refs = 1; - BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); + subscribeSubjects = (char**) NATS_CALLOC(numKeys, sizeof(const char*)); + if (subscribeSubjects == NULL) + { + _freeWatcher(w); + return nats_setDefaultError(NATS_NO_MEMORY); + } + for (i=0; imu))); if (s == NATS_OK) { @@ -1096,7 +1123,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti // Need to explicitly bind to the stream here because the subject // we construct may not help find the stream when using mirrors. so.Stream = kv->stream; - s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL); + s = js_SubscribeSyncMulti(&(w->sub), kv->js, (const char **)subscribeSubjects, numKeys, NULL, &so, NULL); IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1)); if (s == NATS_OK) { diff --git a/src/mem.h b/src/mem.h index a73f43070..f3c88803c 100644 --- a/src/mem.h +++ b/src/mem.h @@ -27,6 +27,16 @@ #endif #define NATS_FREE(p) free((p)) +static void NATS_FREE_STRINGS(char **strings, int count) +{ + if (strings == NULL) + return; + + for (int i = 0; i < count; i++) + NATS_FREE((char *)strings[i]); + NATS_FREE(strings); +} + // GNU C Library version 2.25 or later. #if defined(__GLIBC__) && \ (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25)) diff --git a/src/nats.h b/src/nats.h index 4eff82106..31d7aa2f3 100644 --- a/src/nats.h +++ b/src/nats.h @@ -6345,8 +6345,9 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, * @param sub the location where to store the pointer to the newly created * #natsSubscription object. * @param js the pointer to the #jsCtx object. - * @param subject the subject this subscription is created for. - * the #natsMsgHandler prototype. + * @param subject the subject this subscription is created for (consumer's FilterSubject). + * @param cb the #natsMsgHandler prototype. + * @param cbClosure a pointer to an user defined object (can be `NULL`). * @param opts the pointer to the #jsOptions object, possibly `NULL`. * @param subOpts the subscribe options, possibly `NULL`. * @param errCode the location where to store the JetStream specific error code, or `NULL` @@ -6356,6 +6357,25 @@ NATS_EXTERN natsStatus js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); +/** \brief Create a synchronous subscription to multiple subjects. + * + * See important notes in #js_Subscribe. + * + * @param sub the location where to store the pointer to the newly created + * #natsSubscription object. + * @param js the pointer to the #jsCtx object. + * @param subjects the subjects this subscription is created for (consumer's FilterSubjects). + * @param cb the #natsMsgHandler prototype. + * @param cbClosure a pointer to an user defined object (can be `NULL`). + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param subOpts the subscribe options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. + */ +NATS_EXTERN natsStatus +js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode); + /** \brief Create a pull subscriber. * * A pull based consumer is a type of consumer that does not have a delivery subject, @@ -7003,6 +7023,30 @@ kvStore_PurgeDeletes(kvStore *kv, kvPurgeOptions *opts); NATS_EXTERN natsStatus kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts); +/** \brief Returns a watcher for any updates to keys that match one of the + * `keys` argument. + * + * Returns a watcher for any updates to keys that match the one of `keys` + * argument, which could include wildcards. + * + * A `NULL` entry will be posted when the watcher has received all initial + * values. + * + * Call #kvWatcher_Next to get the next #kvEntry. + * + * \note The watcher should be destroyed to release memory using + * #kvWatcher_Destroy. + * + * @param new_watcher the location where to store the pointer to the new + * #kvWatcher object. + * @param kv the pointer to the #kvStore object. + * @param keys the keys (wildcard possible) to create the watcher for. + * @param numKeys the number of keys in the `keys` array. + * @param opts the watcher options, possibly `NULL`. + */ +NATS_EXTERN natsStatus +kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts); + /** \brief Returns a watcher for any updates to any keys of the KeyValue store bucket. * * Returns a watcher for any updates to any keys of the KeyValue store bucket. diff --git a/src/util.c b/src/util.c index f91e04eb4..33f2df8ad 100644 --- a/src/util.c +++ b/src/util.c @@ -2555,3 +2555,56 @@ nats_freeMetadata(natsMetadata *md) md->List = NULL; md->Count = 0; } + + +int nats_printStringArray(char *out, int bufLen, const char **strings, int count) +{ + bool copy = (bufLen > 0); + int len = 0; + int i; + + if (copy && (len + 1 < bufLen)) + out[len] = '['; + len++; + + for (i=0; i Date: Thu, 25 Apr 2024 05:53:33 -0700 Subject: [PATCH 02/14] fix KVWatch --- src/kv.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/kv.c b/src/kv.c index 8a4d9dde6..9394cd7eb 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1074,6 +1074,11 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0) return nats_setDefaultError(NATS_INVALID_ARG); + for (i=0; i Date: Thu, 25 Apr 2024 06:02:35 -0700 Subject: [PATCH 03/14] free subjects --- src/kv.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/kv.c b/src/kv.c index 9394cd7eb..f759bec84 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1145,6 +1145,7 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int } natsBuf_Cleanup(&buf); + NATS_FREE_STRINGS(subscribeSubjects, numKeys); if (s == NATS_OK) *new_watcher = w; From f86bb7d492b7964dd4bb61269014853475d15271 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 26 Apr 2024 05:31:31 -0700 Subject: [PATCH 04/14] Added tests --- src/js.c | 2 +- test/list.txt | 1 + test/test.c | 83 +++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/src/js.c b/src/js.c index 1a0b8b796..facab82d3 100644 --- a/src/js.c +++ b/src/js.c @@ -2480,7 +2480,7 @@ _subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, in // that is, if user did not provide a `Stream` name through options). if (nats_IsStringEmpty(stream) && numSubjects > 0) { - // Ise the first subject to find the stream. + // Use the first subject to find the stream. s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode); if (s != NATS_OK) goto END; diff --git a/test/list.txt b/test/list.txt index 18f5b898d..6bd9b9f8d 100644 --- a/test/list.txt +++ b/test/list.txt @@ -250,6 +250,7 @@ JetStreamInfoAlternates KeyValueManager KeyValueBasics KeyValueWatch +KeyValueWatchMulti KeyValueHistory KeyValueKeys KeyValueDeleteVsPurge diff --git a/test/test.c b/test/test.c index 8e4c66d1a..ef591c285 100644 --- a/test/test.c +++ b/test/test.c @@ -26166,6 +26166,9 @@ test_JetStreamSubscribe(void) char longsn[256]; #endif natsThread *threads[10] = {NULL}; + const char *subjectsStack[] = {"foo", "bar"}; + const char **subjects = subjectsStack; + const int numSubjects = sizeof(subjectsStack)/sizeof(char*); ENSURE_JS_VERSION(2, 3, 5); @@ -26221,8 +26224,8 @@ test_JetStreamSubscribe(void) test("Create stream: "); jsStreamConfig_Init(&sc); sc.Name = "TEST"; - sc.Subjects = (const char*[1]){"foo"}; - sc.SubjectsLen = 1; + sc.Subjects = subjects; + sc.SubjectsLen = numSubjects; s = js_AddStream(NULL, js, &sc, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); @@ -26230,6 +26233,8 @@ test_JetStreamSubscribe(void) s = js_Publish(NULL, js, "foo", "msg1", 4, NULL, &jerr); IFOK(s, js_Publish(NULL, js, "foo", "msg2", 4, NULL, &jerr)); IFOK(s, js_Publish(NULL, js, "foo", "msg3", 4, NULL, &jerr)); + IFOK(s, js_Publish(NULL, js, "bar", "msg1", 4, NULL, &jerr)); + IFOK(s, js_Publish(NULL, js, "bar", "msg2", 4, NULL, &jerr)); testCond(s == NATS_OK); test("Create sub to check lib sends ACKs: "); @@ -26663,6 +26668,31 @@ test_JetStreamSubscribe(void) natsSubscription_Destroy(sub); sub = NULL; + if (serverVersionAtLeast(2, 10, 14)) + { + test("Create consumer (multiple subjects): "); + jsSubOptions_Init(&so); + so.Config.Durable = "delcons3sync"; + s = js_SubscribeSyncMulti(&sub, js, subjects, numSubjects, NULL, &so, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Drain deletes consumer: "); + s = natsSubscription_Drain(sub); + for (i = 0; i < 5; i++) + { + natsMsg *msg = NULL; + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + IFOK(s, natsMsg_Ack(msg, NULL)); + natsMsg_Destroy(msg); + msg = NULL; + } + IFOK(s, natsSubscription_WaitForDrainCompletion(sub, 1000)); + IFOK(s, js_GetConsumerInfo(&ci, js, "TEST", "delcons3sync", NULL, &jerr)); + testCond((s == NATS_NOT_FOUND) && (ci == NULL) && (jerr == JSConsumerNotFoundErr) && (nats_GetLastError(NULL) == NULL)); + natsSubscription_Destroy(sub); + sub = NULL; + } + test("Create consumer: "); jsSubOptions_Init(&so); so.Config.Durable = "delcons3"; @@ -31365,6 +31395,54 @@ test_KeyValueWatch(void) JS_TEARDOWN; } +static void +test_KeyValueWatchMulti(void) +{ + natsStatus s; + kvStore *kv = NULL; + kvWatcher *w = NULL; + kvConfig kvc; + + JS_SETUP(2, 10, 14); + + test("Create KV: "); + kvConfig_Init(&kvc); + kvc.Bucket = "WATCH"; + s = js_CreateKeyValue(&kv, js, &kvc); + testCond(s == NATS_OK); + + // Now try wildcard matching and make sure we only get last value when starting. + test("Put values in different keys: "); + s = kvStore_PutString(NULL, kv, "a.name", "A"); + IFOK(s, kvStore_PutString(NULL, kv, "b.name", "B")); + IFOK(s, kvStore_PutString(NULL, kv, "a.name", "AA")); + IFOK(s, kvStore_PutString(NULL, kv, "b.name", "BB")); + IFOK(s, kvStore_PutString(NULL, kv, "a.age", "22")); + IFOK(s, kvStore_PutString(NULL, kv, "a.age", "99")); + testCond(s == NATS_OK); + + test("Create watcher: "); + const char **subjects = (const char *[]){"a.*", "b.*", "c.*"}; + s = kvStore_WatchMulti(&w, kv, subjects, 3, NULL); + testCond(s == NATS_OK); + + testCond(_expectUpdate(w, "a.name", "AA", 3)); + testCond(_expectUpdate(w, "b.name", "BB", 4)); + testCond(_expectUpdate(w, "a.age", "99", 6)); + testCond(_expectInitDone(w)); + + IFOK(s, kvStore_PutString(NULL, kv, "c.occupation", "gardener")); + IFOK(s, kvStore_PutString(NULL, kv, "XXX.occupation", "no, plumber")); + IFOK(s, kvStore_PutString(NULL, kv, "c.occupation", "a digital plumber for a digital garden")); + testCond(_expectUpdate(w, "c.occupation", "gardener", 7)); + testCond(_expectUpdate(w, "c.occupation", "a digital plumber for a digital garden", 9)); + + kvWatcher_Destroy(w); + kvStore_Destroy(kv); + + JS_TEARDOWN; +} + static void test_KeyValueHistory(void) { @@ -36267,6 +36345,7 @@ static testInfo allTests[] = {"KeyValueManager", test_KeyValueManager}, {"KeyValueBasics", test_KeyValueBasics}, {"KeyValueWatch", test_KeyValueWatch}, + {"KeyValueWatchMulti", test_KeyValueWatchMulti}, {"KeyValueHistory", test_KeyValueHistory}, {"KeyValueKeys", test_KeyValueKeys}, {"KeyValueDeleteVsPurge", test_KeyValueDeleteVsPurge}, From 313a0a29f3820a339df37c14bb14d7962e68ea60 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 26 Apr 2024 05:44:43 -0700 Subject: [PATCH 05/14] Added js_SubscribeMulti --- src/js.c | 17 +++++++++++++++++ src/nats.h | 25 ++++++++++++------------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/js.c b/src/js.c index 61f5c34a4..80dce2365 100644 --- a/src/js.c +++ b/src/js.c @@ -2789,6 +2789,23 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, return NATS_UPDATE_ERR_STACK(s); } +natsStatus +js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + natsMsgHandler cb, void *cbClosure, + jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) +{ + natsStatus s; + + if (errCode != NULL) + *errCode = 0; + + if (cb == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, cb, cbClosure, false, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); +} + natsStatus js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) diff --git a/src/nats.h b/src/nats.h index 31d7aa2f3..3b2aac355 100644 --- a/src/nats.h +++ b/src/nats.h @@ -6338,6 +6338,15 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, natsMsgHandler cb, void* cbClosure, jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); +/** \brief Create an asynchronous subscription to multiple subjects. + * + * Like #js_Subscribe, but accepts multiple subjects, each can be a wildcard. + */ +NATS_EXTERN natsStatus +js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, + natsMsgHandler cb, void *cbClosure, + jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); + /** \brief Create a synchronous subscription. * * See important notes in #js_Subscribe. @@ -6357,20 +6366,10 @@ NATS_EXTERN natsStatus js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, jsOptions *opts, jsSubOptions *subOpts, jsErrCode *errCode); -/** \brief Create a synchronous subscription to multiple subjects. - * - * See important notes in #js_Subscribe. +/** \brief Create an asynchronous subscription to multiple subjects. * - * @param sub the location where to store the pointer to the newly created - * #natsSubscription object. - * @param js the pointer to the #jsCtx object. - * @param subjects the subjects this subscription is created for (consumer's FilterSubjects). - * @param cb the #natsMsgHandler prototype. - * @param cbClosure a pointer to an user defined object (can be `NULL`). - * @param opts the pointer to the #jsOptions object, possibly `NULL`. - * @param subOpts the subscribe options, possibly `NULL`. - * @param errCode the location where to store the JetStream specific error code, or `NULL` - * if not needed. + * Like #js_SubscribeSync, but accepts multiple subjects, each can be a + * wildcard. */ NATS_EXTERN natsStatus js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, From b57df9e3eed998fb433dee379f3bab7113a206e6 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 06:53:01 -0700 Subject: [PATCH 06/14] PR feedback: avoid unneeded calloc for KVWatch --- src/kv.c | 26 ++++++++++++++++++++------ src/mem.h | 3 +-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/kv.c b/src/kv.c index f759bec84..81de4ce56 100644 --- a/src/kv.c +++ b/src/kv.c @@ -1068,7 +1068,9 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int natsStatus s; kvWatcher *w = NULL; jsSubOptions so; - char **subscribeSubjects = NULL; + char *singleSubject[1]; + char **multipleSubjects = NULL; // allocate if numKeys > 1 + char **subscribeSubjects = singleSubject; int i; DEFINE_BUF_FOR_SUBJECT; @@ -1090,13 +1092,23 @@ kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int w->kv = kv; w->refs = 1; - subscribeSubjects = (char**) NATS_CALLOC(numKeys, sizeof(const char*)); - if (subscribeSubjects == NULL) + if (numKeys == 1) { - _freeWatcher(w); - return nats_setDefaultError(NATS_NO_MEMORY); + // special case for single key to avoid a calloc. + subscribeSubjects[0] = (char *)keys[0]; + } - for (i=0; i Date: Mon, 29 Apr 2024 07:07:20 -0700 Subject: [PATCH 07/14] PR feedback: changes to _subscribe wrapper --- src/js.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/js.c b/src/js.c index 80dce2365..e6ffcf5fd 100644 --- a/src/js.c +++ b/src/js.c @@ -2764,12 +2764,12 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode, jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) { + natsStatus s = NATS_OK; const char *subjects[] = {subject}; + int numSubjects = nats_IsStringEmpty(subject) ? 0 : 1; - if (nats_IsStringEmpty(subject)) - return _subscribeMulti(new_sub, js, NULL, 0, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); - - return _subscribeMulti(new_sub, js, subjects, 1, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); + s = _subscribeMulti(new_sub, js, subjects, numSubjects, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); + return NATS_UPDATE_ERR_STACK(s); } natsStatus From dfa153ecfbc728595d183826c68c9e8b652af629 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 10:16:13 -0700 Subject: [PATCH 08/14] nats_formatStringArray refactor+test --- src/js.c | 10 +------ src/util.c | 75 +++++++++++++++++++++++++++------------------------ src/util.h | 4 +-- test/list.txt | 1 + test/test.c | 26 ++++++++++++++++++ 5 files changed, 70 insertions(+), 46 deletions(-) diff --git a/src/js.c b/src/js.c index e6ffcf5fd..748d9ec5f 100644 --- a/src/js.c +++ b/src/js.c @@ -2598,15 +2598,7 @@ _subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, in s = nats_setDefaultError(NATS_NO_MEMORY); } IF_OK_DUP_STRING(s, jsi->stream, stream); - if ((s == NATS_OK) && numSubjects > 0) - { - int l = nats_printStringArray(NULL, 0, subjects, numSubjects); - jsi->psubj = NATS_CALLOC(l, 1); // '\0' included - if (jsi->psubj == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - if (s == NATS_OK) - nats_printStringArray(jsi->psubj, l, subjects, numSubjects); - } + IFOK(s, nats_formatStringArray(&jsi->psubj, subjects, numSubjects)); if (s == NATS_OK) { jsi->js = js; diff --git a/src/util.c b/src/util.c index 33f2df8ad..497b6368a 100644 --- a/src/util.c +++ b/src/util.c @@ -2557,54 +2557,59 @@ nats_freeMetadata(natsMetadata *md) } -int nats_printStringArray(char *out, int bufLen, const char **strings, int count) +// allocates a sufficiently large buffer and formats the strings into it, as a +// ["unencoded-string-0","unencoded-string-1",...]. For an empty array of +// strings returns "[]". +natsStatus nats_formatStringArray(char **out, const char **strings, int count) { - bool copy = (bufLen > 0); - int len = 0; + natsStatus s = NATS_OK; + natsBuffer buf; + int len = 0; int i; - if (copy && (len + 1 < bufLen)) - out[len] = '['; - len++; - - for (i=0; i 0) + len++; // For the ',' + if (strings[i] == NULL) + len += strlen("(null)"); + else + len += strlen(strings[i]); + } + len++; // For the ']' + len++; // For the '\0' + + s = natsBuf_Init(&buf, len); - if (copy) + natsBuf_AppendByte(&buf, '['); + for (i = 0; (s == NATS_OK) && (i < count); i++) + { + if (i > 0) { - if (len + strLen + 1 < bufLen) - { - memcpy(out+len, str, strLen); - len += strLen; - out[len++] = ','; - } - else - { - return len; - } + IFOK(s, natsBuf_AppendByte(&buf, ',')); + } + IFOK(s, natsBuf_AppendByte(&buf, '"')); + if (strings[i] == NULL) + { + IFOK(s, natsBuf_Append(&buf, "(null)", -1)); } else { - len += strLen + 1; + IFOK(s, natsBuf_Append(&buf, strings[i], -1)); } + IFOK(s, natsBuf_AppendByte(&buf, '"')); } - if (copy && (len + 1 < bufLen)) - out[len] = ']'; - len++; - - if (!copy) - return len + 1; // +1 for the '\0' - - if (len < bufLen) - out[len++] = '\0'; - else + IFOK(s, natsBuf_AppendByte(&buf, ']')); + IFOK(s, natsBuf_AppendByte(&buf, '\0')); + + if (s != NATS_OK) { - out[bufLen-1] = '\0'; - len = bufLen; + return s; } - return len; + *out = natsBuf_Data(&buf); + return NATS_OK; } diff --git a/src/util.h b/src/util.h index 5395daba3..17e2d7bd2 100644 --- a/src/util.h +++ b/src/util.h @@ -261,7 +261,7 @@ nats_IsSubjectValid(const char *subject, bool wcAllowed); natsStatus nats_parseTime(char *str, int64_t *timeUTC); -int -nats_printStringArray(char *out, int bufLen, const char **strings, int count); +natsStatus +nats_formatStringArray(char **out, const char **strings, int count); #endif /* UTIL_H_ */ diff --git a/test/list.txt b/test/list.txt index 6bd9b9f8d..60cee3e79 100644 --- a/test/list.txt +++ b/test/list.txt @@ -42,6 +42,7 @@ HeadersLift HeadersAPIs MsgIsJSControl SrvVersionAtLeast +FormatStringArray ReconnectServerStats ParseStateReconnectFunctionality ServersRandomize diff --git a/test/test.c b/test/test.c index c890ff33d..85c543a76 100644 --- a/test/test.c +++ b/test/test.c @@ -5466,6 +5466,31 @@ test_natsSrvVersionAtLeast(void) natsConnection_Destroy(nc); } +static void +test_natsFormatStringArray(void) +{ + natsStatus s; + char *str = NULL; + + test("Check empty: "); + s = nats_formatStringArray(&str, NULL, 0); + testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[]") == 0)); + NATS_FREE(str); + str = NULL; + + test("Check one: "); + const char *oneArray[] = {"one"}; + s = nats_formatStringArray(&str, oneArray, 1); + testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[\"one\"]") == 0)); + NATS_FREE(str); + + test("Check multiple: "); + const char *threeArray[] = {"one","two","three"}; + s = nats_formatStringArray(&str, threeArray, 3); + testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[\"one\",\"two\",\"three\"]") == 0)); + NATS_FREE(str); +} + static natsStatus _checkStart(const char *url, int orderIP, int maxAttempts) { @@ -36125,6 +36150,7 @@ static testInfo allTests[] = {"HeadersAPIs", test_natsMsgHeaderAPIs}, {"MsgIsJSControl", test_natsMsgIsJSCtrl}, {"SrvVersionAtLeast", test_natsSrvVersionAtLeast}, + {"FormatStringArray", test_natsFormatStringArray}, // Package Level Tests From faa7477dc5f329b2ddc7bf9616138bf8c3e730be Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 10:22:39 -0700 Subject: [PATCH 09/14] PR feedback: safer _subscribe empty subject handling --- src/js.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/js.c b/src/js.c index 748d9ec5f..e31dd2f0a 100644 --- a/src/js.c +++ b/src/js.c @@ -2757,8 +2757,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode) { natsStatus s = NATS_OK; - const char *subjects[] = {subject}; - int numSubjects = nats_IsStringEmpty(subject) ? 0 : 1; + const char *singleSubject[] = {subject}; + int numSubjects = 1; + const char **subjects = singleSubject; + + if (nats_IsStringEmpty(subject)) + { + numSubjects = 0; + subjects = NULL; + } s = _subscribeMulti(new_sub, js, subjects, numSubjects, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode); return NATS_UPDATE_ERR_STACK(s); From 37860ed196e565f24daae64e5a51d62cb62b5958 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 11:05:42 -0700 Subject: [PATCH 10/14] PR feedback: @params in doc --- src/nats.h | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/nats.h b/src/nats.h index 3b2aac355..9dd2c9b0e 100644 --- a/src/nats.h +++ b/src/nats.h @@ -6341,6 +6341,18 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject, /** \brief Create an asynchronous subscription to multiple subjects. * * Like #js_Subscribe, but accepts multiple subjects, each can be a wildcard. + * + * @param sub the location where to store the pointer to the newly created + * #natsSubscription object. + * @param js the pointer to the #jsCtx object. + * @param subject the subject this subscription is created for. + * @param cb the #natsMsgHandler callback. + * @param cbClosure a pointer to an user defined object (can be `NULL`). See + * the #natsMsgHandler prototype. + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param subOpts the subscribe options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. */ NATS_EXTERN natsStatus js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, @@ -6355,8 +6367,6 @@ js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int * #natsSubscription object. * @param js the pointer to the #jsCtx object. * @param subject the subject this subscription is created for (consumer's FilterSubject). - * @param cb the #natsMsgHandler prototype. - * @param cbClosure a pointer to an user defined object (can be `NULL`). * @param opts the pointer to the #jsOptions object, possibly `NULL`. * @param subOpts the subscribe options, possibly `NULL`. * @param errCode the location where to store the JetStream specific error code, or `NULL` @@ -6370,6 +6380,16 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject, * * Like #js_SubscribeSync, but accepts multiple subjects, each can be a * wildcard. + * + * @param sub the location where to store the pointer to the newly created + * #natsSubscription object. + * @param js the pointer to the #jsCtx object. + * @param subjects the subjects this subscription is created for (consumer's FilterSubjects). + * @param numSubjects the number of subjects for the subscription (consumer's FilterSubjectsLen). + * @param opts the pointer to the #jsOptions object, possibly `NULL`. + * @param subOpts the subscribe options, possibly `NULL`. + * @param errCode the location where to store the JetStream specific error code, or `NULL` + * if not needed. */ NATS_EXTERN natsStatus js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects, From 7bf2ecb6049a4def75ba6481dd7efc5a332c18d9 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 11:20:29 -0700 Subject: [PATCH 11/14] more coverage --- test/test.c | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/test/test.c b/test/test.c index 85c543a76..c68b149b3 100644 --- a/test/test.c +++ b/test/test.c @@ -5470,25 +5470,29 @@ static void test_natsFormatStringArray(void) { natsStatus s; - char *str = NULL; + char *array[4]; + memset(array, 0, sizeof(array)); test("Check empty: "); - s = nats_formatStringArray(&str, NULL, 0); - testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[]") == 0)); - NATS_FREE(str); - str = NULL; + s = nats_formatStringArray(&array[0], NULL, 0); + testCond((s == NATS_OK) && (array[0] != NULL) && (strcmp(array[0], "[]") == 0)); test("Check one: "); const char *oneArray[] = {"one"}; - s = nats_formatStringArray(&str, oneArray, 1); - testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[\"one\"]") == 0)); - NATS_FREE(str); + s = nats_formatStringArray(&array[1], oneArray, 1); + testCond((s == NATS_OK) && (array[1] != NULL) && (strcmp(array[1], "[\"one\"]") == 0)); test("Check multiple: "); - const char *threeArray[] = {"one","two","three"}; - s = nats_formatStringArray(&str, threeArray, 3); - testCond((s == NATS_OK) && (str != NULL) && (strcmp(str, "[\"one\",\"two\",\"three\"]") == 0)); - NATS_FREE(str); + const char *threeArray[] = {"one", "two", "three"}; + s = nats_formatStringArray(&array[2], threeArray, 3); + testCond((s == NATS_OK) && (array[2] != NULL) && (strcmp(array[2], "[\"one\",\"two\",\"three\"]") == 0)); + + test("Check NULL: "); + const char *nullArray[] = {NULL}; + s = nats_formatStringArray(&array[3], nullArray, 1); + testCond((s == NATS_OK) && (array[3] != NULL) && (strcmp(array[3], "[\"(null)\"]") == 0)); + + NATS_FREE_STRINGS(array, 3); } static natsStatus From 58a41642646122d335458f3cdc76c68eda935b85 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 11:25:30 -0700 Subject: [PATCH 12/14] PR feedback: cleanup the existing buffer on failure to realloc --- src/util.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util.c b/src/util.c index 497b6368a..4510b4bfe 100644 --- a/src/util.c +++ b/src/util.c @@ -2607,6 +2607,7 @@ natsStatus nats_formatStringArray(char **out, const char **strings, int count) if (s != NATS_OK) { + natsBuf_Cleanup(&buf); return s; } From fb62ffe9b6a59d372b9029e964c8e056bb603360 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 11:48:32 -0700 Subject: [PATCH 13/14] refactored the Format test --- test/test.c | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/test/test.c b/test/test.c index c68b149b3..98ec22213 100644 --- a/test/test.c +++ b/test/test.c @@ -5470,29 +5470,35 @@ static void test_natsFormatStringArray(void) { natsStatus s; - char *array[4]; - memset(array, 0, sizeof(array)); + size_t i, N; - test("Check empty: "); - s = nats_formatStringArray(&array[0], NULL, 0); - testCond((s == NATS_OK) && (array[0] != NULL) && (strcmp(array[0], "[]") == 0)); + typedef struct + { + const char *name; + const char **in; + int inLen; + const char *out; + } TC; - test("Check one: "); - const char *oneArray[] = {"one"}; - s = nats_formatStringArray(&array[1], oneArray, 1); - testCond((s == NATS_OK) && (array[1] != NULL) && (strcmp(array[1], "[\"one\"]") == 0)); + TC tcs[] = { + {"Empty", NULL, 0, "[]"}, + {"One", (const char *[]){"one"}, 1, "[\"one\"]"}, + {"Three", (const char *[]){"one", "two", "three"}, 3, "[\"one\",\"two\",\"three\"]"}, + {"NULL", (const char *[]){NULL}, 1, "[\"(null)\"]"}, + }; - test("Check multiple: "); - const char *threeArray[] = {"one", "two", "three"}; - s = nats_formatStringArray(&array[2], threeArray, 3); - testCond((s == NATS_OK) && (array[2] != NULL) && (strcmp(array[2], "[\"one\",\"two\",\"three\"]") == 0)); + char *out[sizeof(tcs) / sizeof(TC)]; + memset(out, 0, sizeof(out)); - test("Check NULL: "); - const char *nullArray[] = {NULL}; - s = nats_formatStringArray(&array[3], nullArray, 1); - testCond((s == NATS_OK) && (array[3] != NULL) && (strcmp(array[3], "[\"(null)\"]") == 0)); + N = sizeof(tcs) / sizeof(TC); + for (i = 0; i < N; i++) + { + test(tcs[i].name); + s = nats_formatStringArray(&out[i], tcs[i].in, tcs[i].inLen); + testCond((s == NATS_OK) && (out[i] != NULL) && (strcmp(out[i], tcs[i].out) == 0)); + } - NATS_FREE_STRINGS(array, 3); + NATS_FREE_STRINGS(out, N); } static natsStatus From 518adedc29c53d520210e808e4cd694cac90cbdd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 29 Apr 2024 12:08:18 -0700 Subject: [PATCH 14/14] adjusting codecov --- codecov.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codecov.yml b/codecov.yml index a849fd986..e46c75ba6 100644 --- a/codecov.yml +++ b/codecov.yml @@ -11,8 +11,8 @@ coverage: status: project: default: - target: 65% - threshold: 1% + target: auto + threshold: 10% github_checks: annotations: true comment: