From d7399e7b5aeb747fd076fdf5fabd487bd3ca62a2 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 22 Jul 2022 16:51:20 -0600 Subject: [PATCH] [UPDATED] JetStream: consumer configuration check New fields have been added in the consumer configuration, but the function that checks that the existing consumer configuration matches the configuration of a consumer that is looked-up duing a "subscribe" call was not up-to-date. Signed-off-by: Ivan Kozlovic --- src/js.c | 23 +++++++++++++++-- test/test.c | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/src/js.c b/src/js.c index ae80059d..7432a0ec 100644 --- a/src/js.c +++ b/src/js.c @@ -1981,6 +1981,9 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u) if (u->MaxDeliver > 0 && u->MaxDeliver != s->MaxDeliver) return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "max deliver", u->MaxDeliver, s->MaxDeliver); + if (u->BackOffLen > 0 && u->BackOffLen != s->BackOffLen) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%d" CFG_CHECK_ERR_END "%d", "backoff list length", u->BackOffLen, s->BackOffLen); + if ((int) u->ReplayPolicy >= 0 && u->ReplayPolicy != s->ReplayPolicy) return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%d" CFG_CHECK_ERR_END "%d", "replay policy", u->ReplayPolicy, s->ReplayPolicy); @@ -1994,9 +1997,7 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u) return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "max waiting", u->MaxWaiting, s->MaxWaiting); if (u->MaxAckPending > 0 && u->MaxAckPending != s->MaxAckPending) - { return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "max ack pending", u->MaxAckPending, s->MaxAckPending); - } // For flow control, we want to fail if the user explicit wanted it, but // it is not set in the existing consumer. If it is not asked by the user, @@ -2007,6 +2008,24 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u) if (u->Heartbeat > 0 && u->Heartbeat != s->Heartbeat) return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "heartbeat", u->Heartbeat, s->Heartbeat); + if (u->HeadersOnly != s->HeadersOnly) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%d" CFG_CHECK_ERR_END "%d", "headers only", u->HeadersOnly, s->HeadersOnly); + + if (u->MaxRequestBatch > 0 && u->MaxRequestBatch != s->MaxRequestBatch) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "max request batch", u->Heartbeat, s->Heartbeat); + + if (u->MaxRequestExpires > 0 && u->MaxRequestExpires != s->MaxRequestExpires) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "max request expires", u->MaxRequestExpires, s->MaxRequestExpires); + + if (u->InactiveThreshold > 0 && u->InactiveThreshold != s->InactiveThreshold) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "inactive threshold", u->InactiveThreshold, s->InactiveThreshold); + + if (u->Replicas > 0 && u->Replicas != s->Replicas) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%" PRId64 CFG_CHECK_ERR_END "%" PRId64, "replicas", u->Replicas, s->Replicas); + + if (u->MemoryStorage != s->MemoryStorage) + return nats_setError(NATS_ERR, CFG_CHECK_ERR_START "%d" CFG_CHECK_ERR_END "%d", "memory storage", u->MemoryStorage, s->MemoryStorage); + return NATS_OK; } diff --git a/test/test.c b/test/test.c index 2fa81b57..6a306749 100644 --- a/test/test.c +++ b/test/test.c @@ -25573,8 +25573,10 @@ test_JetStreamSubscribeConfigCheck(void) jsStreamConfig sc; jsConsumerConfig cc; int i; + int64_t backOffListOf3[3] = {1, 2, 3}; + int64_t backOffListOf2[2] = {1, 2}; - JS_SETUP(2, 2, 0); + JS_SETUP(2, 9, 0); test("Create stream: "); jsStreamConfig_Init(&sc); @@ -25584,7 +25586,7 @@ test_JetStreamSubscribeConfigCheck(void) s = js_AddStream(NULL, js, &sc, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); - for (i=0; i<10; i++) + for (i=0; i<17; i++) { jsSubOptions so1; jsSubOptions so2; @@ -25665,6 +25667,59 @@ test_JetStreamSubscribeConfigCheck(void) so2.Config.SampleFrequency = "50%"; break; } + case 10: + { + name = "backoff"; + so1.Config.BackOff = backOffListOf3; + so1.Config.BackOffLen = 3; + so1.Config.MaxDeliver = 5; + so2.Config.BackOff = backOffListOf2; + so2.Config.BackOffLen = 2; + so2.Config.MaxDeliver = 5; + break; + } + case 11: + { + name = "headers only"; + so1.Config.HeadersOnly = true; + // Not setting it for the 2nd subscribe call should fail. + break; + } + case 12: + { + name = "max request batch"; + so1.Config.MaxRequestBatch = 100; + so2.Config.MaxRequestBatch = 200; + break; + } + case 13: + { + name = "max request expires"; + so1.Config.MaxRequestExpires = NATS_SECONDS_TO_NANOS(1); + so2.Config.MaxRequestExpires = NATS_SECONDS_TO_NANOS(2); + break; + } + case 14: + { + name = "inactive threshold"; + so1.Config.InactiveThreshold = NATS_SECONDS_TO_NANOS(1); + so2.Config.InactiveThreshold = NATS_SECONDS_TO_NANOS(2); + break; + } + case 15: + { + name = "replicas"; + so1.Config.Replicas = 1; + so2.Config.Replicas = 3; + break; + } + case 16: + { + name = "memory storage"; + so1.Config.MemoryStorage = true; + // Not setting it for the 2nd subscribe call should fail. + break; + } } snprintf(testName, sizeof(testName), "Check %s: ", name); test(testName); @@ -25736,7 +25791,7 @@ test_JetStreamSubscribeConfigCheck(void) } // Verify that we don't fail if user did not set it. - for (i=0; i<9; i++) + for (i=0; i<14; i++) { natsSubscription *nsub = NULL; jsSubOptions so; @@ -25753,6 +25808,18 @@ test_JetStreamSubscribeConfigCheck(void) case 6: name = "replay policy"; so.Config.ReplayPolicy = js_ReplayInstant; break; case 7: name = "max waiting"; so.Config.MaxWaiting = 10; break; case 8: name = "max ack pending"; so.Config.MaxAckPending = 10; break; + case 9: + { + name = "backoff"; + so.Config.BackOff = backOffListOf3; + so.Config.BackOffLen = 3; + so.Config.MaxDeliver = 4; + break; + } + case 10: name = "max request batch"; so.Config.MaxRequestBatch = 100; break; + case 11: name = "max request expires"; so.Config.MaxRequestExpires = NATS_SECONDS_TO_NANOS(2); break; + case 12: name = "inactive threshold"; so.Config.InactiveThreshold = NATS_SECONDS_TO_NANOS(2); break; + case 13: name = "replicas"; so.Config.Replicas = 1; break; } natsNUID_Next(durName, sizeof(durName));