Skip to content

Commit

Permalink
Merge pull request #479 from nats-io/js_stream_seal
Browse files Browse the repository at this point in the history
[ADDED] JetStream: Seal/DenyPurge/DenyDelete/AllowRollup
  • Loading branch information
kozlovic committed Oct 14, 2021
2 parents ca5c553 + 303cae1 commit 8cbe93c
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
IFOK(s, nats_JSONGetBool(jcfg, "sealed", &(cfg->Sealed)));
IFOK(s, nats_JSONGetBool(jcfg, "deny_delete", &(cfg->DenyDelete)));
IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup)));

if (s == NATS_OK)
*new_cfg = cfg;
else
Expand Down Expand Up @@ -620,6 +625,15 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_AppendByte(buf, ']'));
}

if ((s == NATS_OK) && cfg->Sealed)
IFOK(s, natsBuf_Append(buf, ",\"sealed\":true", -1));
if ((s == NATS_OK) && cfg->DenyDelete)
IFOK(s, natsBuf_Append(buf, ",\"deny_delete\":true", -1));
if ((s == NATS_OK) && cfg->DenyPurge)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));

IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
Expand Down
42 changes: 41 additions & 1 deletion src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,39 @@ extern "C" {
*
* @see jsConsumerConfig
*/
#define JS_MSG_SIZE "Nats-Msg-Size"
#define JSMsgSize "Nats-Msg-Size"

/** \brief Message header for JetStream message for rollup
*
* If message is sent to a stream's subject with this header set, and the stream
* is configured with `AllowRollup` option, then the server will insert this
* message and delete all previous messages in the stream.
*
* If the header is set to #js_MsgRollupSubject, then only messages on the
* specific subject this message is sent to are deleted.
*
* If the header is set to #js_MsgRollupAll, then all messages on all subjects
* are deleted.
*/
#define JSMsgRollup "Nats-Rollup"

/** \brief Message header value causing rollup per subject
*
* This is a possible value for the #JSMsgRollup header indicating that only
* messages for the subject the rollup message is sent will be removed.
*
* @see JSMsgRollup
*/
#define JSMsgRollupSubject "sub"

/** \brief Message header value causing rollup for all subjects
*
* This is a possible value for the #JSMsgRollup header indicating that all
* messages for all subjects will be removed.
*
* @see JSMsgRollup
*/
#define JSMsgRollupAll "all"

//
// Types.
Expand Down Expand Up @@ -429,6 +461,14 @@ typedef struct jsStreamConfig {
jsStreamSource *Mirror;
jsStreamSource **Sources;
int SourcesLen;
bool Sealed; ///< Seal a stream so no messages can get our or in.
bool DenyDelete; ///< Restrict the ability to delete messages.
bool DenyPurge; ///< Restrict the ability to purge messages.
/**
* Allows messages to be placed into the system and purge
* all older messages using a special message header.
*/
bool AllowRollup;

} jsStreamConfig;

Expand Down
3 changes: 3 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ typedef enum {
JSConsumerReplacementWithDifferentNameErr = 10106, ///< Consumer replacement durable config not the same
JSConsumerDescriptionTooLongErr = 10107, ///< Consumer description is too long
JSConsumerWithFlowControlNeedsHeartbeatsErr = 10108,///< Consumer with flow control also needs heartbeats
JSStreamSealedErr = 10109, ///< Invalid operation on sealed stream
JSStreamPurgeFailedErr = 10110, ///< Generic stream purge failure
JSStreamRollupFailedErr = 10111, ///< Generic stream rollup failure

} jsErrCode;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ JetStreamSubscribeHeadersOnly
JetStreamOrderedCons
JetStreamOrderedConsWithErrors
JetStreamOrderedConsAutoUnsub
JetStreamStreamsSealAndRollup
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
208 changes: 206 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21029,6 +21029,12 @@ test_JetStreamMarshalStreamConfig(void)
sc.Sources = (jsStreamSource *[2]){&s1, &s2};
sc.SourcesLen = 2;

// Seal, deny purge, etc..
sc.Sealed = true;
sc.DenyDelete = true;
sc.DenyPurge = true;
sc.AllowRollup = true;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
testCond((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0));
Expand Down Expand Up @@ -21085,7 +21091,11 @@ test_JetStreamMarshalStreamConfig(void)
&& (strcmp(rsc->Sources[1]->FilterSubject, "stream.two") == 0)
&& (rsc->Sources[1]->External != NULL)
&& (strcmp(rsc->Sources[1]->External->APIPrefix, "source2.prefix") == 0)
&& (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0));
&& (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0)
&& rsc->Sealed
&& rsc->DenyDelete
&& rsc->DenyPurge
&& rsc->AllowRollup);
js_destroyStreamConfig(rsc);
rsc = NULL;
// Check that this does not crash
Expand Down Expand Up @@ -25622,7 +25632,7 @@ test_JetStreamSubscribeHeadersOnly(void)
IFOK(s, (natsMsg_GetDataLength(msg) == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, "User-Header", &val));
IFOK(s, (strcmp(val, "MyValue") == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, JS_MSG_SIZE, &val));
IFOK(s, natsMsgHeader_Get(msg, JSMsgSize, &val));
IFOK(s, (strcmp(val, "5") == 0 ? NATS_OK : NATS_ERR));
natsMsg_Destroy(msg);
msg = NULL;
Expand Down Expand Up @@ -26309,6 +26319,199 @@ test_JetStreamOrderedConsumerWithAutoUnsub(void)
rmtree(datastore);
}

static void
test_JetStreamStreamsSealAndRollup(void)
{
natsStatus s;
natsConnection *nc = NULL;
jsCtx *js = NULL;
natsPid pid = NATS_INVALID_PID;
jsStreamInfo *si = NULL;
jsStreamConfig cfg;
jsErrCode jerr = 0;
natsMsg *msg = NULL;
natsSubscription *sub = NULL;
char datastore[256] = {'\0'};
char cmdLine[1024] = {'\0'};
int i;

ENSURE_JS_VERSION(2, 6, 2);

_makeUniqueDir(datastore, sizeof(datastore), "datastore_");

test("Start JS server: ");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore);
pid = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Get context: ");
s = natsConnection_JetStream(&js, nc, NULL);
testCond(s == NATS_OK);

test("Create sealed stream fails: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL_FAIL";
cfg.Sealed = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_ERR) && (si == NULL) && (jerr == JSStreamInvalidConfig));
nats_clearLastError();

test("Create stream: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL";
s = js_AddStream(NULL, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Seal stream: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL";
cfg.Sealed = true;
s = js_UpdateStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->Sealed);
jsStreamInfo_Destroy(si);
si = NULL;

test("Can't send: ");
s = js_Publish(NULL, js, "SEAL", "a", 1, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamSealedErr));
nats_clearLastError();

test("Create stream with deny purge/delete: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "AUDIT";
cfg.Storage = js_MemoryStorage;
cfg.DenyPurge = true;
cfg.DenyDelete = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->DenyDelete
&& si->Config->DenyPurge);
jsStreamInfo_Destroy(si);
si = NULL;

test("Publish: ");
for (i=0; (s == NATS_OK) && (i < 10); i++)
s = js_Publish(NULL, js, "AUDIT", "ok", 2, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Can't delete: ");
s = js_DeleteMsg(js, "AUDIT", 1, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamMsgDeleteFailed)
&& (strstr(nats_GetLastError(NULL), "message delete not permitted") != NULL));
nats_clearLastError();

test("Can't purge: ");
s = js_PurgeStream(js, "AUDIT", NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamPurgeFailedErr)
&& (strstr(nats_GetLastError(NULL), "stream purge not permitted") != NULL));
nats_clearLastError();

test("Try to remove deny clauses: ");
cfg.DenyPurge = false;
cfg.DenyDelete = false;
s = js_UpdateStream(NULL, js, &cfg, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamInvalidConfig));
nats_clearLastError();

test("Create stream for rollup: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "ROLLUP";
cfg.Subjects = (const char*[1]){"rollup.*"};
cfg.SubjectsLen = 1;
cfg.AllowRollup = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->AllowRollup);
jsStreamInfo_Destroy(si);
si = NULL;

test("Populate: ");
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.a", "a", 1, NULL, &jerr);
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.b", "b", 1, NULL, &jerr);
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.c", "c", 1, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Check stream: ");
s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->State.Msgs == 30));
jsStreamInfo_Destroy(si);
si = NULL;

test("Rollup per subject: ");
s = natsMsg_Create(&msg, "rollup.b", NULL, "Rollup", 6);
IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupSubject));
IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr));
testCond((s == NATS_OK) && (jerr == 0));
natsMsg_Destroy(msg);
msg = NULL;

test("Create consumer: ");
s = js_SubscribeSync(&sub, js, "rollup.b", NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Check content: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
IFOK(s, (strcmp(natsMsg_GetData(msg), "Rollup") == 0 ? NATS_OK : NATS_ERR));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Make sure single msg: ");
s = natsSubscription_NextMsg(&msg, sub, 250);
testCond((s == NATS_TIMEOUT) && (msg == NULL));
nats_clearLastError();

natsSubscription_Destroy(sub);
sub = NULL;

test("Rollup for all: ");
s = natsMsg_Create(&msg, "rollup.c", NULL, "RollupAll", 9);
IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupAll));
IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr));
testCond((s == NATS_OK) && (jerr == 0));
natsMsg_Destroy(msg);
msg = NULL;

test("Create consumer: ");
s = js_SubscribeSync(&sub, js, "rollup.c", NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Check content: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
IFOK(s, (strcmp(natsMsg_GetData(msg), "RollupAll") == 0 ? NATS_OK : NATS_ERR));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Make sure single msg: ");
s = natsSubscription_NextMsg(&msg, sub, 250);
testCond((s == NATS_TIMEOUT) && (msg == NULL));
nats_clearLastError();

test("Check stream: ");
s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->State.Msgs == 1));
jsStreamInfo_Destroy(si);
si = NULL;

jsCtx_Destroy(js);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
_stopServer(pid);
rmtree(datastore);
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -28746,6 +28949,7 @@ static testInfo allTests[] =
{"JetStreamOrderedCons", test_JetStreamOrderedConsumer},
{"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors},
{"JetStreamOrderedConsAutoUnsub", test_JetStreamOrderedConsumerWithAutoUnsub},
{"JetStreamStreamsSealAndRollup", test_JetStreamStreamsSealAndRollup},

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down

0 comments on commit 8cbe93c

Please sign in to comment.