Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Stream RePublish and some ConsumerConfig new fields #546

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 72 additions & 3 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ _destroyStreamSource(jsStreamSource *source)
NATS_FREE(source);
}

static void
_destroySubjectMapping(jsSubjectMapping *sm)
{
if (sm == NULL)
return;

NATS_FREE((char*) sm->Source);
NATS_FREE((char*) sm->Destination);
NATS_FREE(sm);
}

void
js_destroyStreamConfig(jsStreamConfig *cfg)
{
Expand All @@ -116,6 +127,7 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
for (i=0; i<cfg->SourcesLen; i++)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroySubjectMapping(cfg->RePublish);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -497,6 +509,33 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsSubjectMapping **new_mapping)
{
jsSubjectMapping *sm = NULL;
nats_JSON *jsm = NULL;
natsStatus s;

s = nats_JSONGetObject(json, fieldName, &jsm);
if (jsm == NULL)
return NATS_UPDATE_ERR_STACK(s);

sm = (jsSubjectMapping*) NATS_CALLOC(1, sizeof(jsSubjectMapping));
if (sm == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(jsm, "src", (char**) &(sm->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(sm->Destination)));

if (s == NATS_OK)
*new_mapping = sm;
else
_destroySubjectMapping(sm);

return NATS_UPDATE_ERR_STACK(s);

}

natsStatus
js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig **new_cfg)
{
Expand Down Expand Up @@ -562,6 +601,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetBool(jcfg, "deny_delete", &(cfg->DenyDelete)));
IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup)));
IFOK(s, _unmarshalRePublish(jcfg, "republish", &(cfg->RePublish)));

if (s == NATS_OK)
*new_cfg = cfg;
Expand Down Expand Up @@ -660,6 +700,21 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));
if ((s == NATS_OK) && (cfg->RePublish != NULL))
{
// "dest" is not omitempty, in that the field will always be present.
IFOK(s, natsBuf_Append(buf, ",\"republish\":{\"dest\":\"", -1));
// Still check that our value is not NULL
if (!nats_IsStringEmpty(cfg->RePublish->Destination))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
// Now the source...
if (!nats_IsStringEmpty(cfg->RePublish->Source))
{
IFOK(s, natsBuf_Append(buf, "\",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Source, -1));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
}

IFOK(s, natsBuf_AppendByte(buf, '}'));

Expand Down Expand Up @@ -1653,6 +1708,17 @@ jsExternalStream_Init(jsExternalStream *external)
return NATS_OK;
}

natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst)
{
if (sm == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

sm->Source = src;
sm->Destination = dst;
return NATS_OK;
}

//
// Consumer related functions
//
Expand Down Expand Up @@ -1817,8 +1883,6 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
s = nats_marshalLong(buf, true, "max_batch", cfg->MaxRequestBatch);
if ((s == NATS_OK) && (cfg->MaxRequestExpires > 0))
s = nats_marshalLong(buf, true, "max_expires", cfg->MaxRequestExpires);
if ((s == NATS_OK) && (cfg->MaxRequestMaxBytes > 0))
s = nats_marshalLong(buf, true, "max_bytes", cfg->MaxRequestMaxBytes);
if ((s == NATS_OK) && (cfg->InactiveThreshold > 0))
s = nats_marshalLong(buf, true, "inactive_threshold", cfg->InactiveThreshold);
if ((s == NATS_OK) && (cfg->BackOff != NULL) && (cfg->BackOffLen > 0))
Expand All @@ -1836,6 +1900,10 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
}
IFOK(s, natsBuf_AppendByte(buf, ']'));
}
if ((s == NATS_OK) && (cfg->Replicas > 0))
s = nats_marshalLong(buf, true, "num_replicas", cfg->Replicas);
if ((s == NATS_OK) && cfg->MemoryStorage)
s = natsBuf_Append(buf, ",\"mem_storage\":true", -1);
IFOK(s, natsBuf_Append(buf, "}}", -1));

if (s == NATS_OK)
Expand Down Expand Up @@ -1973,9 +2041,10 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetBool(cjson, "headers_only", &(cc->HeadersOnly)));
IFOK(s, nats_JSONGetLong(cjson, "max_batch", &(cc->MaxRequestBatch)));
IFOK(s, nats_JSONGetLong(cjson, "max_expires", &(cc->MaxRequestExpires)));
IFOK(s, nats_JSONGetLong(cjson, "max_bytes", &(cc->MaxRequestMaxBytes)));
IFOK(s, nats_JSONGetLong(cjson, "inactive_threshold", &(cc->InactiveThreshold)));
IFOK(s, nats_JSONGetArrayLong(cjson, "backoff", &(cc->BackOff), &(cc->BackOffLen)));
IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas)));
IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage)));
}

if (s == NATS_OK)
Expand Down
46 changes: 43 additions & 3 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ extern "C" {
*/
#define JSMsgRollupAll "all"

// Headers for republished messages.
#define JSStream "Nats-Stream"
#define JSSequence "Nats-Sequence"
#define JSLastSequence "Nats-Last-Sequence"

//
// Types.
//
Expand Down Expand Up @@ -371,6 +376,16 @@ typedef struct jsStreamSource

} jsStreamSource;

/**
* Allows a source subject to be mapped to a destination subject for republishing.
*/
typedef struct jsSubjectMapping
{
const char *Source;
const char *Destination;

} jsSubjectMapping;

/**
* Configuration of a JetStream stream.
*
Expand All @@ -397,6 +412,7 @@ typedef struct jsStreamSource
* const char *subjects[] = {"foo", "bar"};
* const char *tags[] = {"tag1", "tag2"};
* jsStreamSource *sources[] = {&s1, &s2};
* jsSubjectMapping sm;
*
* jsStreamConfig_Init(&sc);
*
Expand Down Expand Up @@ -437,6 +453,10 @@ typedef struct jsStreamSource
* sc.Sources = sources;
* sc.SourcesLen = 2;
*
* // For RePublish subject:
* jsSubjectMapping_Init(&sm, ">", "RP.>")
* sc.RePublish = &sm;
*
* s = js_AddStream(&si, js, &sc, NULL, &jerr);
* \endcode
*/
Expand Down Expand Up @@ -471,6 +491,9 @@ typedef struct jsStreamConfig {
*/
bool AllowRollup;

// Allow republish of the message after being sequenced and stored.
jsSubjectMapping *RePublish;

} jsStreamConfig;

/**
Expand Down Expand Up @@ -653,8 +676,6 @@ typedef struct jsConsumerConfig
{
const char *Durable;
const char *Description;
const char *DeliverSubject;
const char *DeliverGroup;
jsDeliverPolicy DeliverPolicy;
uint64_t OptStartSeq;
int64_t OptStartTime; ///< UTC time expressed as number of nanoseconds since epoch.
Expand All @@ -676,11 +697,19 @@ typedef struct jsConsumerConfig
// Pull based options.
int64_t MaxRequestBatch; ///< Maximum Pull Consumer request batch size.
int64_t MaxRequestExpires; ///< Maximum Pull Consumer request expiration, expressed in number of nanoseconds.
int64_t MaxRequestMaxBytes; ///< Maximum number of bytes that the server will send for a given pull request.

// Push based options.
const char *DeliverSubject;
const char *DeliverGroup;

// Ephemeral inactivity threshold.
int64_t InactiveThreshold; ///< How long the server keeps an ephemeral after detecting loss of interest, expressed in number of nanoseconds.

// Generally inherited by parent stream and other markers, now can be configured directly.
int64_t Replicas;
// Force memory storage.
bool MemoryStorage;

} jsConsumerConfig;

/**
Expand Down Expand Up @@ -5167,6 +5196,17 @@ jsStreamSource_Init(jsStreamSource *source);
NATS_EXTERN natsStatus
jsExternalStream_Init(jsExternalStream *external);

/** \brief Initializes a subject mapping structure.
*
* Use this to set the source and destination for a subject mapping.
*
* @param sm the pointer to the #jsSubjectMapping to initialize.
* @param src the string for the source.
* @param dst the string for the destination.
*/
NATS_EXTERN natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst);

/** \brief Creates a stream.
*
* Creates a stream based on the provided configuration (that cannot be `NULL`).
Expand Down
42 changes: 34 additions & 8 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21471,6 +21471,7 @@ test_JetStreamMarshalStreamConfig(void)
nats_JSON *json = NULL;
jsStreamConfig *rsc = NULL;
int64_t optStartTime = 1624583232123456000;
jsSubjectMapping sm;

test("init bad args: ");
s = jsStreamConfig_Init(NULL);
Expand Down Expand Up @@ -21546,6 +21547,23 @@ test_JetStreamMarshalStreamConfig(void)
sc.DenyPurge = true;
sc.AllowRollup = true;

test("Subject mapping init err: ");
s = jsSubjectMapping_Init(NULL, "a", "b");
testCond(s == NATS_INVALID_ARG);
nats_clearLastError();
s = NATS_OK;

test("Subject mapping init, NULL or empty are ok: ");
s = jsSubjectMapping_Init(&sm, "A", NULL);
IFOK(s, jsSubjectMapping_Init(&sm, "A", ""));
IFOK(s, jsSubjectMapping_Init(&sm, NULL, "B"));
IFOK(s, jsSubjectMapping_Init(&sm, "", "B"));
testCond(s == NATS_OK);

// Republish
jsSubjectMapping_Init(&sm, ">", "RP.>");
sc.RePublish = &sm;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
testCond((s == NATS_OK) && (buf != NULL) && (natsBuf_Len(buf) > 0));
Expand Down Expand Up @@ -21606,7 +21624,12 @@ test_JetStreamMarshalStreamConfig(void)
&& rsc->Sealed
&& rsc->DenyDelete
&& rsc->DenyPurge
&& rsc->AllowRollup);
&& rsc->AllowRollup
&& (rsc->RePublish != NULL)
&& (rsc->RePublish->Source != NULL)
&& (strcmp(rsc->RePublish->Source, ">") == 0)
&& (rsc->RePublish->Destination != NULL)
&& (strcmp(rsc->RePublish->Destination, "RP.>") == 0));
js_destroyStreamConfig(rsc);
rsc = NULL;
// Check that this does not crash
Expand Down Expand Up @@ -21692,7 +21715,8 @@ test_JetStreamUnmarshalConsumerInfo(void)
"{\"config\":{\"backoff\":[50000000,250000000]}}",
"{\"config\":{\"max_batch\":100}}",
"{\"config\":{\"max_expires\":1000000000}}",
"{\"config\":{\"max_bytes\":1048576}}",
"{\"config\":{\"num_replicas\":1}}",
"{\"config\":{\"mem_storage\":true}}",
};
const char *bad[] = {
"{\"stream_name\":123}",
Expand Down Expand Up @@ -21724,7 +21748,7 @@ test_JetStreamUnmarshalConsumerInfo(void)
"{\"config\":{\"backoff\":true}}",
"{\"config\":{\"max_batch\":\"abc\"}}",
"{\"config\":{\"max_expires\":false}}",
"{\"config\":{\"max_bytes\":true}}",
"{\"config\":{\"mem_storage\":\"abc\"}}",
"{\"delivered\":123}",
"{\"delivered\":{\"consumer_seq\":\"abc\"}}",
"{\"delivered\":{\"stream_seq\":\"abc\"}}",
Expand Down Expand Up @@ -22854,6 +22878,8 @@ test_JetStreamMgtConsumers(void)
cfg.MaxAckPending = 600;
cfg.FlowControl = true;
cfg.Heartbeat = 700;
cfg.Replicas = 1;
cfg.MemoryStorage = true;
// We create a consumer with non existing stream, so we
// expect this to fail. We are just checking that the config
// is properly serialized.
Expand All @@ -22874,7 +22900,8 @@ test_JetStreamMgtConsumers(void)
"\"ack_wait\":200,\"max_deliver\":300,\"filter_subject\":\"bar\","\
"\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"\
"\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"\
"\"flow_control\":true,\"idle_heartbeat\":700}}",
"\"flow_control\":true,\"idle_heartbeat\":700,"\
"\"num_replicas\":1,\"mem_storage\":true}}",
natsMsg_GetDataLength(resp)) == 0));
natsMsg_Destroy(resp);
resp = NULL;
Expand Down Expand Up @@ -22904,7 +22931,8 @@ test_JetStreamMgtConsumers(void)
"\"ack_wait\":200,\"max_deliver\":300,\"filter_subject\":\"bar\","\
"\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"\
"\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"\
"\"flow_control\":true,\"idle_heartbeat\":700}}",
"\"flow_control\":true,\"idle_heartbeat\":700,"\
"\"num_replicas\":1,\"mem_storage\":true}}",
natsMsg_GetDataLength(resp)) == 0));
natsMsg_Destroy(resp);
resp = NULL;
Expand Down Expand Up @@ -23137,7 +23165,6 @@ test_JetStreamMgtConsumers(void)
cfg.HeadersOnly = true;
cfg.MaxRequestBatch = 10;
cfg.MaxRequestExpires = NATS_SECONDS_TO_NANOS(2);
cfg.MaxRequestMaxBytes = 1024*1024;

test("Update works ok: ");
s = js_UpdateConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
Expand All @@ -23150,8 +23177,7 @@ test_JetStreamMgtConsumers(void)
&& (ci->Config->MaxWaiting == 20)
&& (ci->Config->HeadersOnly)
&& (ci->Config->MaxRequestBatch == 10)
&& (ci->Config->MaxRequestExpires == NATS_SECONDS_TO_NANOS(2))
&& (ci->Config->MaxRequestMaxBytes == 1024*1024));
&& (ci->Config->MaxRequestExpires == NATS_SECONDS_TO_NANOS(2)));
jsConsumerInfo_Destroy(ci);
ci = NULL;

Expand Down