Skip to content

Commit

Permalink
Merge pull request #570 from nats-io/kv_add_republish
Browse files Browse the repository at this point in the history
[ADDED] KeyValue: RePublish configuration
  • Loading branch information
kozlovic committed Aug 10, 2022
2 parents a9e86ca + 372016d commit d8092c4
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
sc.AllowRollup = true;
sc.DenyDelete = true;
sc.AllowDirect = true;
sc.RePublish = cfg->RePublish;

// If connecting to a v2.7.2+, create with discard new policy
if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2))
Expand Down
1 change: 1 addition & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ typedef struct kvConfig
int64_t MaxBytes;
jsStorageType StorageType;
int Replicas;
jsRePublish *RePublish;

} kvConfig;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ KeyValueDeleteTombstones
KeyValueDeleteMarkerThreshold
KeyValueCrossAccount
KeyValueDiscardOldToNew
KeyValueRePublish
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
75 changes: 73 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -23298,7 +23298,6 @@ test_JetStreamMgtConsumers(void)
cfg.MaxDeliver = 1;
cfg.SampleFrequency = "30";
cfg.MaxAckPending = 10;
cfg.MaxWaiting = 20;
cfg.HeadersOnly = true;
cfg.MaxRequestBatch = 10;
cfg.MaxRequestExpires = NATS_SECONDS_TO_NANOS(2);
Expand All @@ -23311,7 +23310,6 @@ test_JetStreamMgtConsumers(void)
&& (ci->Config->MaxDeliver == 1)
&& (strcmp(ci->Config->SampleFrequency, "30") == 0)
&& (ci->Config->MaxAckPending == 10)
&& (ci->Config->MaxWaiting == 20)
&& (ci->Config->HeadersOnly)
&& (ci->Config->MaxRequestBatch == 10)
&& (ci->Config->MaxRequestExpires == NATS_SECONDS_TO_NANOS(2)));
Expand Down Expand Up @@ -29675,6 +29673,78 @@ test_KeyValueDiscardOldToNew(void)
JS_TEARDOWN;
}

static void
test_KeyValueRePublish(void)
{
kvStore *kv = NULL;
jsStreamInfo *si = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
kvConfig kvc;
jsRePublish rp;
natsStatus s;

JS_SETUP(2, 9, 0);

test("Create KV: ");
kvConfig_Init(&kvc);
kvc.Bucket = "TEST_UPDATE";
s = js_CreateKeyValue(&kv, js, &kvc);
testCond(s == NATS_OK);

kvStore_Destroy(kv);
kv = NULL;

test("Set RePublish should fail: ");
jsRePublish_Init(&rp);
rp.Source = ">";
rp.Destination = "bar.>";
kvc.RePublish =&rp;
s = js_CreateKeyValue(&kv, js, &kvc);
testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "can not change RePublish") != NULL));
nats_clearLastError();

test("Create with repub: ");
kvc.Bucket = "TEST";
s = js_CreateKeyValue(&kv, js, &kvc);
testCond(s == NATS_OK);

test("Check set: ");
s = js_GetStreamInfo(&si, js, "KV_TEST", NULL, NULL);
testCond((s == NATS_OK) && (si->Config != NULL) && (si->Config->RePublish != NULL));
jsStreamInfo_Destroy(si);

test("Sub: ");
s = natsConnection_SubscribeSync(&sub, nc, "bar.>");
testCond(s == NATS_OK);

test("Put: ");
s = kvStore_PutString(NULL, kv, "foo", "value");
testCond(s == NATS_OK);

test("Get msg: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond(s == NATS_OK);

test("Check msg: ");
s = (strcmp(natsMsg_GetData(msg), "value") == 0 ? NATS_OK : NATS_ERR);
if (s == NATS_OK)
{
const char *subj = NULL;

s = natsMsgHeader_Get(msg, JSSubject, &subj);
if (s == NATS_OK)
s = (strcmp(subj, "$KV.TEST.foo") == 0 ? NATS_OK : NATS_ERR);
}
testCond(s == NATS_OK);

natsMsg_Destroy(msg);
natsSubscription_Destroy(sub);
kvStore_Destroy(kv);

JS_TEARDOWN;
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -32136,6 +32206,7 @@ static testInfo allTests[] =
{"KeyValueDeleteMarkerThreshold", test_KeyValuePurgeDeletesMarkerThreshold},
{"KeyValueCrossAccount", test_KeyValueCrossAccount},
{"KeyValueDiscardOldToNew", test_KeyValueDiscardOldToNew},
{"KeyValueRePublish", test_KeyValueRePublish},

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down

0 comments on commit d8092c4

Please sign in to comment.