Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: Seal/DenyPurge/DenyDelete/AllowRollup #479

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
IFOK(s, nats_JSONGetBool(jcfg, "sealed", &(cfg->Sealed)));
IFOK(s, nats_JSONGetBool(jcfg, "deny_delete", &(cfg->DenyDelete)));
IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup)));

if (s == NATS_OK)
*new_cfg = cfg;
else
Expand Down Expand Up @@ -620,6 +625,15 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_AppendByte(buf, ']'));
}

if ((s == NATS_OK) && cfg->Sealed)
IFOK(s, natsBuf_Append(buf, ",\"sealed\":true", -1));
if ((s == NATS_OK) && cfg->DenyDelete)
IFOK(s, natsBuf_Append(buf, ",\"deny_delete\":true", -1));
if ((s == NATS_OK) && cfg->DenyPurge)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));

IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
Expand Down
42 changes: 41 additions & 1 deletion src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,39 @@ extern "C" {
*
* @see jsConsumerConfig
*/
#define JS_MSG_SIZE "Nats-Msg-Size"
#define JSMsgSize "Nats-Msg-Size"

/** \brief Message header for JetStream message for rollup
*
* If message is sent to a stream's subject with this header set, and the stream
* is configured with `AllowRollup` option, then the server will insert this
* message and delete all previous messages in the stream.
*
* If the header is set to #js_MsgRollupSubject, then only messages on the
* specific subject this message is sent to are deleted.
*
* If the header is set to #js_MsgRollupAll, then all messages on all subjects
* are deleted.
*/
#define JSMsgRollup "Nats-Rollup"

/** \brief Message header value causing rollup per subject
*
* This is a possible value for the #JSMsgRollup header indicating that only
* messages for the subject the rollup message is sent will be removed.
*
* @see JSMsgRollup
*/
#define JSMsgRollupSubject "sub"

/** \brief Message header value causing rollup for all subjects
*
* This is a possible value for the #JSMsgRollup header indicating that all
* messages for all subjects will be removed.
*
* @see JSMsgRollup
*/
#define JSMsgRollupAll "all"

//
// Types.
Expand Down Expand Up @@ -429,6 +461,14 @@ typedef struct jsStreamConfig {
jsStreamSource *Mirror;
jsStreamSource **Sources;
int SourcesLen;
bool Sealed; ///< Seal a stream so no messages can get our or in.
bool DenyDelete; ///< Restrict the ability to delete messages.
bool DenyPurge; ///< Restrict the ability to purge messages.
/**
* Allows messages to be placed into the system and purge
* all older messages using a special message header.
*/
bool AllowRollup;

} jsStreamConfig;

Expand Down
3 changes: 3 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ typedef enum {
JSConsumerReplacementWithDifferentNameErr = 10106, ///< Consumer replacement durable config not the same
JSConsumerDescriptionTooLongErr = 10107, ///< Consumer description is too long
JSConsumerWithFlowControlNeedsHeartbeatsErr = 10108,///< Consumer with flow control also needs heartbeats
JSStreamSealedErr = 10109, ///< Invalid operation on sealed stream
JSStreamPurgeFailedErr = 10110, ///< Generic stream purge failure
JSStreamRollupFailedErr = 10111, ///< Generic stream rollup failure

} jsErrCode;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ JetStreamSubscribeHeadersOnly
JetStreamOrderedCons
JetStreamOrderedConsWithErrors
JetStreamOrderedConsAutoUnsub
JetStreamStreamsSealAndRollup
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
208 changes: 206 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21029,6 +21029,12 @@ test_JetStreamMarshalStreamConfig(void)
sc.Sources = (jsStreamSource *[2]){&s1, &s2};
sc.SourcesLen = 2;

// Seal, deny purge, etc..
sc.Sealed = true;
sc.DenyDelete = true;
sc.DenyPurge = true;
sc.AllowRollup = true;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
testCond((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0));
Expand Down Expand Up @@ -21085,7 +21091,11 @@ test_JetStreamMarshalStreamConfig(void)
&& (strcmp(rsc->Sources[1]->FilterSubject, "stream.two") == 0)
&& (rsc->Sources[1]->External != NULL)
&& (strcmp(rsc->Sources[1]->External->APIPrefix, "source2.prefix") == 0)
&& (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0));
&& (strcmp(rsc->Sources[1]->External->DeliverPrefix, "source2.deliver.prefix") == 0)
&& rsc->Sealed
&& rsc->DenyDelete
&& rsc->DenyPurge
&& rsc->AllowRollup);
js_destroyStreamConfig(rsc);
rsc = NULL;
// Check that this does not crash
Expand Down Expand Up @@ -25622,7 +25632,7 @@ test_JetStreamSubscribeHeadersOnly(void)
IFOK(s, (natsMsg_GetDataLength(msg) == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, "User-Header", &val));
IFOK(s, (strcmp(val, "MyValue") == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, JS_MSG_SIZE, &val));
IFOK(s, natsMsgHeader_Get(msg, JSMsgSize, &val));
IFOK(s, (strcmp(val, "5") == 0 ? NATS_OK : NATS_ERR));
natsMsg_Destroy(msg);
msg = NULL;
Expand Down Expand Up @@ -26309,6 +26319,199 @@ test_JetStreamOrderedConsumerWithAutoUnsub(void)
rmtree(datastore);
}

static void
test_JetStreamStreamsSealAndRollup(void)
{
natsStatus s;
natsConnection *nc = NULL;
jsCtx *js = NULL;
natsPid pid = NATS_INVALID_PID;
jsStreamInfo *si = NULL;
jsStreamConfig cfg;
jsErrCode jerr = 0;
natsMsg *msg = NULL;
natsSubscription *sub = NULL;
char datastore[256] = {'\0'};
char cmdLine[1024] = {'\0'};
int i;

ENSURE_JS_VERSION(2, 6, 2);

_makeUniqueDir(datastore, sizeof(datastore), "datastore_");

test("Start JS server: ");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore);
pid = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Get context: ");
s = natsConnection_JetStream(&js, nc, NULL);
testCond(s == NATS_OK);

test("Create sealed stream fails: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL_FAIL";
cfg.Sealed = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_ERR) && (si == NULL) && (jerr == JSStreamInvalidConfig));
nats_clearLastError();

test("Create stream: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL";
s = js_AddStream(NULL, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Seal stream: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "SEAL";
cfg.Sealed = true;
s = js_UpdateStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->Sealed);
jsStreamInfo_Destroy(si);
si = NULL;

test("Can't send: ");
s = js_Publish(NULL, js, "SEAL", "a", 1, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamSealedErr));
nats_clearLastError();

test("Create stream with deny purge/delete: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "AUDIT";
cfg.Storage = js_MemoryStorage;
cfg.DenyPurge = true;
cfg.DenyDelete = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->DenyDelete
&& si->Config->DenyPurge);
jsStreamInfo_Destroy(si);
si = NULL;

test("Publish: ");
for (i=0; (s == NATS_OK) && (i < 10); i++)
s = js_Publish(NULL, js, "AUDIT", "ok", 2, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Can't delete: ");
s = js_DeleteMsg(js, "AUDIT", 1, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamMsgDeleteFailed)
&& (strstr(nats_GetLastError(NULL), "message delete not permitted") != NULL));
nats_clearLastError();

test("Can't purge: ");
s = js_PurgeStream(js, "AUDIT", NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamPurgeFailedErr)
&& (strstr(nats_GetLastError(NULL), "stream purge not permitted") != NULL));
nats_clearLastError();

test("Try to remove deny clauses: ");
cfg.DenyPurge = false;
cfg.DenyDelete = false;
s = js_UpdateStream(NULL, js, &cfg, NULL, &jerr);
testCond((s == NATS_ERR) && (jerr == JSStreamInvalidConfig));
nats_clearLastError();

test("Create stream for rollup: ");
jsStreamConfig_Init(&cfg);
cfg.Name = "ROLLUP";
cfg.Subjects = (const char*[1]){"rollup.*"};
cfg.SubjectsLen = 1;
cfg.AllowRollup = true;
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->Config != NULL) && si->Config->AllowRollup);
jsStreamInfo_Destroy(si);
si = NULL;

test("Populate: ");
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.a", "a", 1, NULL, &jerr);
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.b", "b", 1, NULL, &jerr);
for (i=0; (s == NATS_OK) && (i<10); i++)
s = js_Publish(NULL, js, "rollup.c", "c", 1, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Check stream: ");
s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->State.Msgs == 30));
jsStreamInfo_Destroy(si);
si = NULL;

test("Rollup per subject: ");
s = natsMsg_Create(&msg, "rollup.b", NULL, "Rollup", 6);
IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupSubject));
IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr));
testCond((s == NATS_OK) && (jerr == 0));
natsMsg_Destroy(msg);
msg = NULL;

test("Create consumer: ");
s = js_SubscribeSync(&sub, js, "rollup.b", NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Check content: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
IFOK(s, (strcmp(natsMsg_GetData(msg), "Rollup") == 0 ? NATS_OK : NATS_ERR));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Make sure single msg: ");
s = natsSubscription_NextMsg(&msg, sub, 250);
testCond((s == NATS_TIMEOUT) && (msg == NULL));
nats_clearLastError();

natsSubscription_Destroy(sub);
sub = NULL;

test("Rollup for all: ");
s = natsMsg_Create(&msg, "rollup.c", NULL, "RollupAll", 9);
IFOK(s, natsMsgHeader_Set(msg, JSMsgRollup, JSMsgRollupAll));
IFOK(s, js_PublishMsg(NULL, js, msg, NULL, &jerr));
testCond((s == NATS_OK) && (jerr == 0));
natsMsg_Destroy(msg);
msg = NULL;

test("Create consumer: ");
s = js_SubscribeSync(&sub, js, "rollup.c", NULL, NULL, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Check content: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
IFOK(s, (strcmp(natsMsg_GetData(msg), "RollupAll") == 0 ? NATS_OK : NATS_ERR));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Make sure single msg: ");
s = natsSubscription_NextMsg(&msg, sub, 250);
testCond((s == NATS_TIMEOUT) && (msg == NULL));
nats_clearLastError();

test("Check stream: ");
s = js_GetStreamInfo(&si, js, "ROLLUP", NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (si != NULL)
&& (si->State.Msgs == 1));
jsStreamInfo_Destroy(si);
si = NULL;

jsCtx_Destroy(js);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
_stopServer(pid);
rmtree(datastore);
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -28746,6 +28949,7 @@ static testInfo allTests[] =
{"JetStreamOrderedCons", test_JetStreamOrderedConsumer},
{"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors},
{"JetStreamOrderedConsAutoUnsub", test_JetStreamOrderedConsumerWithAutoUnsub},
{"JetStreamStreamsSealAndRollup", test_JetStreamStreamsSealAndRollup},

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down