From 303cae1448d07ee3843644301a33a42390ec3f8d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 14 Oct 2021 17:29:13 -0600 Subject: [PATCH] [ADDED] JetStream: Seal/DenyPurge/DenyDelete/AllowRollup Signed-off-by: Ivan Kozlovic --- src/jsm.c | 14 ++++ src/nats.h | 42 +++++++++- src/status.h | 3 + test/list.txt | 1 + test/test.c | 208 +++++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 265 insertions(+), 3 deletions(-) diff --git a/src/jsm.c b/src/jsm.c index 7dd2455ed..fdd29d0ae 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -531,6 +531,11 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig // Free the array of JSON objects that was allocated by nats_JSONGetArrayObject. NATS_FREE(sources); } + IFOK(s, nats_JSONGetBool(jcfg, "sealed", &(cfg->Sealed))); + IFOK(s, nats_JSONGetBool(jcfg, "deny_delete", &(cfg->DenyDelete))); + IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge))); + IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup))); + if (s == NATS_OK) *new_cfg = cfg; else @@ -620,6 +625,15 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg) IFOK(s, natsBuf_AppendByte(buf, ']')); } + if ((s == NATS_OK) && cfg->Sealed) + IFOK(s, natsBuf_Append(buf, ",\"sealed\":true", -1)); + if ((s == NATS_OK) && cfg->DenyDelete) + IFOK(s, natsBuf_Append(buf, ",\"deny_delete\":true", -1)); + if ((s == NATS_OK) && cfg->DenyPurge) + IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1)); + if ((s == NATS_OK) && cfg->AllowRollup) + IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1)); + IFOK(s, natsBuf_AppendByte(buf, '}')); if (s == NATS_OK) diff --git a/src/nats.h b/src/nats.h index 40936d21d..360898589 100644 --- a/src/nats.h +++ b/src/nats.h @@ -94,7 +94,39 @@ extern "C" { * * @see jsConsumerConfig */ - #define JS_MSG_SIZE "Nats-Msg-Size" + #define JSMsgSize "Nats-Msg-Size" + +/** \brief Message header for JetStream message for rollup + * + * If message is sent to a stream's subject with this header set, and the stream + * is configured with `AllowRollup` option, then the server will insert this + * message and delete all previous messages in the stream. + * + * If the header is set to #js_MsgRollupSubject, then only messages on the + * specific subject this message is sent to are deleted. + * + * If the header is set to #js_MsgRollupAll, then all messages on all subjects + * are deleted. + */ + #define JSMsgRollup "Nats-Rollup" + +/** \brief Message header value causing rollup per subject + * + * This is a possible value for the #JSMsgRollup header indicating that only + * messages for the subject the rollup message is sent will be removed. + * + * @see JSMsgRollup + */ + #define JSMsgRollupSubject "sub" + + /** \brief Message header value causing rollup for all subjects + * + * This is a possible value for the #JSMsgRollup header indicating that all + * messages for all subjects will be removed. + * + * @see JSMsgRollup + */ + #define JSMsgRollupAll "all" // // Types. @@ -429,6 +461,14 @@ typedef struct jsStreamConfig { jsStreamSource *Mirror; jsStreamSource **Sources; int SourcesLen; + bool Sealed; ///< Seal a stream so no messages can get our or in. + bool DenyDelete; ///< Restrict the ability to delete messages. + bool DenyPurge; ///< Restrict the ability to purge messages. + /** + * Allows messages to be placed into the system and purge + * all older messages using a special message header. + */ + bool AllowRollup; } jsStreamConfig; diff --git a/src/status.h b/src/status.h index fe4b54c81..5626610b7 100644 --- a/src/status.h +++ b/src/status.h @@ -240,6 +240,9 @@ typedef enum { JSConsumerReplacementWithDifferentNameErr = 10106, ///< Consumer replacement durable config not the same JSConsumerDescriptionTooLongErr = 10107, ///< Consumer description is too long JSConsumerWithFlowControlNeedsHeartbeatsErr = 10108,///< Consumer with flow control also needs heartbeats + JSStreamSealedErr = 10109, ///< Invalid operation on sealed stream + JSStreamPurgeFailedErr = 10110, ///< Generic stream purge failure + JSStreamRollupFailedErr = 10111, ///< Generic stream rollup failure } jsErrCode; diff --git a/test/list.txt b/test/list.txt index 46096d9e9..2231b517d 100644 --- a/test/list.txt +++ b/test/list.txt @@ -226,6 +226,7 @@ JetStreamSubscribeHeadersOnly JetStreamOrderedCons JetStreamOrderedConsWithErrors JetStreamOrderedConsAutoUnsub +JetStreamStreamsSealAndRollup StanPBufAllocator StanConnOptions StanSubOptions diff --git a/test/test.c b/test/test.c index 20e214ff7..c91813ec0 100644 --- a/test/test.c +++ b/test/test.c @@ -21029,6 +21029,12 @@ test_JetStreamMarshalStreamConfig(void) sc.Sources = (jsStreamSource *[2]){&s1, &s2}; sc.SourcesLen = 2; + // Seal, deny purge, etc.. + sc.Sealed = true; + sc.DenyDelete = true; + sc.DenyPurge = true; + sc.AllowRollup = true; + test("Marshal stream config: "); s = js_marshalStreamConfig(&buf, &sc); testCond((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0)); @@ -21085,7 +21091,11 @@ test_JetStreamMarshalStreamConfig(void) && (strcmp(rsc->Sources[1]->FilterSubject, "stream.two") == 0) && (rsc->Sources[1]->External != NULL) && (strcmp(rsc->Sources[1]->External->APIPrefix, "source2.prefix") == 0) - && (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0)); + && (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0) + && rsc->Sealed + && rsc->DenyDelete + && rsc->DenyPurge + && rsc->AllowRollup); js_destroyStreamConfig(rsc); rsc = NULL; // Check that this does not crash @@ -25622,7 +25632,7 @@ test_JetStreamSubscribeHeadersOnly(void) IFOK(s, (natsMsg_GetDataLength(msg) == 0 ? NATS_OK : NATS_ERR)); IFOK(s, natsMsgHeader_Get(msg, "User-Header", &val)); IFOK(s, (strcmp(val, "MyValue") == 0 ? NATS_OK : NATS_ERR)); - IFOK(s, natsMsgHeader_Get(msg, JS_MSG_SIZE, &val)); + IFOK(s, natsMsgHeader_Get(msg, JSMsgSize, &val)); IFOK(s, (strcmp(val, "5") == 0 ? NATS_OK : NATS_ERR)); natsMsg_Destroy(msg); msg = NULL; @@ -26309,6 +26319,199 @@ test_JetStreamOrderedConsumerWithAutoUnsub(void) rmtree(datastore); } +static void +test_JetStreamStreamsSealAndRollup(void) +{ + natsStatus s; + natsConnection *nc = NULL; + jsCtx *js = NULL; + natsPid pid = NATS_INVALID_PID; + jsStreamInfo *si = NULL; + jsStreamConfig cfg; + jsErrCode jerr = 0; + natsMsg *msg = NULL; + natsSubscription *sub = NULL; + char datastore[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + int i; + + ENSURE_JS_VERSION(2, 6, 2); + + _makeUniqueDir(datastore, sizeof(datastore), "datastore_"); + + test("Start JS server: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect: "); + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + test("Create sealed stream fails: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "SEAL_FAIL"; + cfg.Sealed = true; + s = js_AddStream(&si, js, &cfg, NULL, &jerr); + testCond((s == NATS_ERR) && (si == NULL) && (jerr == JSStreamInvalidConfig)); + nats_clearLastError(); + + test("Create stream: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "SEAL"; + s = js_AddStream(NULL, js, &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Seal stream: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "SEAL"; + cfg.Sealed = true; + s = js_UpdateStream(&si, js, &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (si != NULL) + && (si->Config != NULL) && si->Config->Sealed); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Can't send: "); + s = js_Publish(NULL, js, "SEAL", "a", 1, NULL, &jerr); + testCond((s == NATS_ERR) && (jerr == JSStreamSealedErr)); + nats_clearLastError(); + + test("Create stream with deny purge/delete: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "AUDIT"; + cfg.Storage = js_MemoryStorage; + cfg.DenyPurge = true; + cfg.DenyDelete = true; + s = js_AddStream(&si, js, &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (si != NULL) + && (si->Config != NULL) && si->Config->DenyDelete + && si->Config->DenyPurge); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Publish: "); + for (i=0; (s == NATS_OK) && (i < 10); i++) + s = js_Publish(NULL, js, "AUDIT", "ok", 2, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Can't delete: "); + s = js_DeleteMsg(js, "AUDIT", 1, NULL, &jerr); + testCond((s == NATS_ERR) && (jerr == JSStreamMsgDeleteFailed) + && (strstr(nats_GetLastError(NULL), "message delete not permitted") != NULL)); + nats_clearLastError(); + + test("Can't purge: "); + s = js_PurgeStream(js, "AUDIT", NULL, &jerr); + testCond((s == NATS_ERR) && (jerr == JSStreamPurgeFailedErr) + && (strstr(nats_GetLastError(NULL), "stream purge not permitted") != NULL)); + nats_clearLastError(); + + test("Try to remove deny clauses: "); + cfg.DenyPurge = false; + cfg.DenyDelete = false; + s = js_UpdateStream(NULL, js, &cfg, NULL, &jerr); + testCond((s == NATS_ERR) && (jerr == JSStreamInvalidConfig)); + nats_clearLastError(); + + test("Create stream for rollup: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "ROLLUP"; + cfg.Subjects = (const char*[1]){"rollup.*"}; + cfg.SubjectsLen = 1; + cfg.AllowRollup = true; + s = js_AddStream(&si, js, &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (si != NULL) + && (si->Config != NULL) && si->Config->AllowRollup); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Populate: "); + for (i=0; (s == NATS_OK) && (i<10); i++) + s = js_Publish(NULL, js, "rollup.a", "a", 1, NULL, &jerr); + for (i=0; (s == NATS_OK) && (i<10); i++) + s = js_Publish(NULL, js, "rollup.b", "b", 1, NULL, &jerr); + for (i=0; (s == NATS_OK) && (i<10); i++) + s = js_Publish(NULL, js, "rollup.c", "c", 1, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Check stream: "); + s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (si != NULL) + && (si->State.Msgs == 30)); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Rollup per subject: "); + s = natsMsg_Create(&msg, "rollup.b", NULL, "Rollup", 6); + IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupSubject)); + IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr)); + testCond((s == NATS_OK) && (jerr == 0)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Create consumer: "); + s = js_SubscribeSync(&sub, js, "rollup.b", NULL, NULL, &jerr); + testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); + + test("Check content: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + IFOK(s, (strcmp(natsMsg_GetData(msg), "Rollup") == 0 ? NATS_OK : NATS_ERR)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Make sure single msg: "); + s = natsSubscription_NextMsg(&msg, sub, 250); + testCond((s == NATS_TIMEOUT) && (msg == NULL)); + nats_clearLastError(); + + natsSubscription_Destroy(sub); + sub = NULL; + + test("Rollup for all: "); + s = natsMsg_Create(&msg, "rollup.c", NULL, "RollupAll", 9); + IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupAll)); + IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr)); + testCond((s == NATS_OK) && (jerr == 0)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Create consumer: "); + s = js_SubscribeSync(&sub, js, "rollup.c", NULL, NULL, &jerr); + testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); + + test("Check content: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + IFOK(s, (strcmp(natsMsg_GetData(msg), "RollupAll") == 0 ? NATS_OK : NATS_ERR)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Make sure single msg: "); + s = natsSubscription_NextMsg(&msg, sub, 250); + testCond((s == NATS_TIMEOUT) && (msg == NULL)); + nats_clearLastError(); + + test("Check stream: "); + s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) && (si != NULL) + && (si->State.Msgs == 1)); + jsStreamInfo_Destroy(si); + si = NULL; + + jsCtx_Destroy(js); + natsSubscription_Destroy(sub); + natsConnection_Destroy(nc); + _stopServer(pid); + rmtree(datastore); +} + #if defined(NATS_HAS_STREAMING) static int @@ -28746,6 +28949,7 @@ static testInfo allTests[] = {"JetStreamOrderedCons", test_JetStreamOrderedConsumer}, {"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors}, {"JetStreamOrderedConsAutoUnsub", test_JetStreamOrderedConsumerWithAutoUnsub}, + {"JetStreamStreamsSealAndRollup", test_JetStreamStreamsSealAndRollup}, #if defined(NATS_HAS_STREAMING) {"StanPBufAllocator", test_StanPBufAllocator},