Skip to content

Commit

Permalink
Add number of messages per subject in stream config
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jun 18, 2021
1 parent 349c2c9 commit 2867903
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ natsJS_unmarshalStreamConfig(natsJSStreamConfig **new_cfg, nats_JSON *jcfg)
IFOK(s, nats_JSONGetLong(jcfg, "max_bytes", &(cfg->MaxBytes)));
IFOK(s, nats_JSONGetLong(jcfg, "max_age", &(cfg->MaxAge)));
IFOK_INF(s, nats_JSONGetInt32(jcfg, "max_msg_size", &(cfg->MaxMsgSize)));
IFOK(s, nats_JSONGetLong(jcfg, "max_msgs_per_subject", &(cfg->MaxMsgsPerSubject)));
IFOK(s, nats_JSONGetStr(jcfg, "discard", &tmpStr));
IFOK(s, _unmarshalDiscardPolicy(&(cfg->Discard), &tmpStr));
IFOK(s, nats_JSONGetStr(jcfg, "storage", &tmpStr));
Expand Down Expand Up @@ -540,6 +541,10 @@ natsJS_marshalStreamConfig(natsBuffer **new_buf, natsJSStreamConfig *cfg)
snprintf(temp, sizeof(temp), "%d", (int) cfg->MaxMsgSize);
IFOK(s, natsBuf_Append(buf, temp, -1));

IFOK(s, natsBuf_Append(buf, ",\"max_msgs_per_subject\":", -1));
snprintf(temp, sizeof(temp), "%d", (int) cfg->MaxMsgsPerSubject);
IFOK(s, natsBuf_Append(buf, temp, -1));

IFOK(s, _marshalDiscardPolicy(cfg->Discard, buf));

IFOK(s, _marshalStorageType(cfg->Storage, buf));
Expand Down
1 change: 1 addition & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ typedef struct natsJSStreamConfig {
int64_t MaxBytes; //`json:"max_bytes"`
int64_t MaxAge; //`json:"max_age"`
int32_t MaxMsgSize; //`json:"max_msg_size,omitempty"`
int64_t MaxMsgsPerSubject; //`json:"max_msgs_per_subject"`
natsJSDiscardPolicy Discard; //`json:"discard"`
natsJSStorageType Storage; //`json:"storage"`
int Replicas; //`json:"num_replicas"`
Expand Down
12 changes: 10 additions & 2 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20022,7 +20022,7 @@ test_JetStreamUnmarshalStreamConfig(void)

test("Stream config with all required: ");
snprintf(tmp, sizeof(tmp), "{\"name\":\"TEST\",\"retention\":\"workqueue\","\
"\"max_consumers\":5,\"max_msgs\":10,\"max_bytes\":1000,\"max_age\":20000000,"\
"\"max_consumers\":5,\"max_msgs\":10,\"max_bytes\":1000,\"max_msgs_per_subject\":1,\"max_age\":20000000,"\
"\"discard\":\"new\",\"storage\":\"memory\",\"num_replicas\":3}");
s = nats_JSONParse(&json, tmp, (int) strlen(tmp));
IFOK(s, natsJS_unmarshalStreamConfig(&sc, json));
Expand All @@ -20033,6 +20033,7 @@ test_JetStreamUnmarshalStreamConfig(void)
&& (sc->MaxMsgs == 10)
&& (sc->MaxBytes == 1000)
&& (sc->MaxAge == 20000000)
&& (sc->MaxMsgsPerSubject == 1)
&& (sc->Discard == natsJS_DiscardNew)
&& (sc->Storage == natsJS_MemoryStorage)
&& (sc->Replicas == 3));
Expand All @@ -20044,7 +20045,7 @@ test_JetStreamUnmarshalStreamConfig(void)
test("Stream config with all: ");
if (snprintf(tmp, sizeof(tmp), "{\"name\":\"TEST\",\"subjects\":[\"foo\",\"bar\"],"\
"\"retention\":\"workqueue\",\"max_consumers\":5,\"max_msgs\":10,\"max_bytes\":1000,"\
"\"max_age\":20000000,\"max_msg_size\":1024,\"discard\":\"new\",\"storage\":\"memory\","\
"\"max_age\":20000000,\"max_msg_size\":1024,\"max_msgs_per_subject\":1,\"discard\":\"new\",\"storage\":\"memory\","\
"\"num_replicas\":3,\"no_ack\":true,\"template_owner\":\"owner\","\
"\"duplicate_window\":100000000000,\"placement\":{\"cluster\":\"cluster\",\"tags\":[\"tag1\",\"tag2\"]},"\
"\"mirror\":{\"name\":\"TEST2\",\"opt_start_seq\":10,\"filter_subject\":\"foo\",\"external\":{\"api\":\"my_prefix\",\"deliver\":\"deliver_prefix\"}},"\
Expand All @@ -20059,6 +20060,7 @@ test_JetStreamUnmarshalStreamConfig(void)
&& (sc->SubjectsLen == 2)
&& (strcmp(sc->Subjects[0], "foo") == 0)
&& (strcmp(sc->Subjects[1], "bar") == 0)
&& (sc->MaxMsgsPerSubject == 1)
&& (sc->MaxMsgSize == 1024)
&& (sc->NoAck)
&& (strcmp(sc->Template, "owner") == 0)
Expand Down Expand Up @@ -20118,6 +20120,7 @@ test_JetStreamMarshalStreamConfig(void)
sc.MaxAge = 4;
sc.MaxMsgSize = 5;
sc.Duplicates = 6;
sc.MaxMsgsPerSubject = 1;
sc.Discard = natsJS_DiscardNew;
sc.Storage = natsJS_MemoryStorage;
sc.Replicas = 3;
Expand Down Expand Up @@ -20180,6 +20183,7 @@ test_JetStreamMarshalStreamConfig(void)
&& (rsc->MaxBytes == 3)
&& (rsc->MaxAge == 4)
&& (rsc->MaxMsgSize == 5)
&& (rsc->MaxMsgsPerSubject == 1)
&& (rsc->Duplicates == 6)
&& (rsc->Discard == natsJS_DiscardNew)
&& (rsc->Storage == natsJS_MemoryStorage)
Expand Down Expand Up @@ -20401,6 +20405,7 @@ test_JetStreamContextDomain(void)
testCond((s == NATS_OK)
&& (jerr == 0)
&& (ai != NULL)
&& (strcmp(ai->Domain, "ABC") == 0)
&& (ai->Limits.MaxMemory == -1)
&& (ai->Limits.MaxStore == -1));
natsJSAccountInfo_Destroy(ai);
Expand Down Expand Up @@ -20428,6 +20433,7 @@ test_JetStreamContextDomain(void)
testCond((s == NATS_OK)
&& (jerr == 0)
&& (ai != NULL)
&& (strcmp(ai->Domain, "ABC") == 0)
&& (ai->Limits.MaxMemory == -1)
&& (ai->Limits.MaxStore == -1));
natsJSAccountInfo_Destroy(ai);
Expand All @@ -20447,6 +20453,7 @@ test_JetStreamContextDomain(void)
testCond((s == NATS_OK)
&& (jerr == 0)
&& (ai != NULL)
&& (strcmp(ai->Domain, "ABC") == 0)
&& (ai->Limits.MaxMemory == -1)
&& (ai->Limits.MaxStore == -1));
natsJSAccountInfo_Destroy(ai);
Expand All @@ -20459,6 +20466,7 @@ test_JetStreamContextDomain(void)
testCond((s == NATS_OK)
&& (jerr == 0)
&& (ai != NULL)
&& (strcmp(ai->Domain, "ABC") == 0)
&& (ai->Limits.MaxMemory == -1)
&& (ai->Limits.MaxStore == -1));
natsJSAccountInfo_Destroy(ai);
Expand Down

0 comments on commit 2867903

Please sign in to comment.