Skip to content

Commit

Permalink
[Added] More v2.10 related changes (#682)
Browse files Browse the repository at this point in the history
* Added Metadata to Stream, Consumer configs
Merged some other fixes from go PR

* leak

* Added jsStreamConfig.Compression

* Added jsStreamConfig.FirstSeq

* Added jsStreamConfig.SubjectTransform

* Added jsStreamSourceInfo.SubjectTransforms and .FilterSubjects

* Added jsStreamConfig.ConsumerLimits

* PR feedback: nits

* PR feedback: verify metadata values in test

* PR feedback: fix a cast

* Fixed the broken test (order of values)
  • Loading branch information
levb committed Oct 12, 2023
1 parent b8a27cf commit 204f256
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 152 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ install/
html/
!doc/html/
test/datastore_*/
test/conf_*

# Emacs
*~
Expand Down
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsStorageTypeFileStr "file"
#define jsStorageTypeMemStr "memory"

#define jsStorageCompressionNoneStr "none"
#define jsStorageCompressionS2Str "s2"

#define jsDeliverAllStr "all"
#define jsDeliverLastStr "last"
#define jsDeliverNewStr "new"
Expand Down
200 changes: 193 additions & 7 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ _destroyRePublish(jsRePublish *rp)
NATS_FREE(rp);
}

void
js_destroyStreamConfig(jsStreamConfig *cfg)
void js_destroyStreamConfig(jsStreamConfig *cfg)
{
int i;

Expand All @@ -136,6 +135,9 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroyRePublish(cfg->RePublish);
nats_freeMetadata(&(cfg->Metadata));
NATS_FREE((char *)cfg->SubjectTransform.Source);
NATS_FREE((char *)cfg->SubjectTransform.Destination);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -168,10 +170,19 @@ _destroyClusterInfo(jsClusterInfo *cluster)
static void
_destroyStreamSourceInfo(jsStreamSourceInfo *info)
{
int i;

if (info == NULL)
return;

NATS_FREE(info->Name);
NATS_FREE((char*)info->FilterSubject);
for (i=0; i < info->SubjectTransformsLen; i++)
{
NATS_FREE((char *)info->SubjectTransforms[i].Source);
NATS_FREE((char *)info->SubjectTransforms[i].Destination);
}
NATS_FREE(info->SubjectTransforms);
_destroyExternalStream(info->External);
NATS_FREE(info);
}
Expand Down Expand Up @@ -535,6 +546,113 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStorageCompression(nats_JSON *json, const char *fieldName, jsStorageCompression *compression)
{
natsStatus s = NATS_OK;
const char *str = NULL;

s = nats_JSONGetStrPtr(json, "compression", &str);
if (str == NULL)
return NATS_UPDATE_ERR_STACK(s);

if (strcmp(str, jsStorageCompressionNoneStr) == 0)
*compression = js_StorageCompressionNone;
else if (strcmp(str, jsStorageCompressionS2Str) == 0)
*compression = js_StorageCompressionS2;
else
s = nats_setError(NATS_ERR, "unable to unmarshal storage compression '%s'", str);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalStorageCompression(jsStorageCompression compression, natsBuffer *buf)
{
natsStatus s;
const char *st = NULL;

s = natsBuf_Append(buf, ",\"compression\":\"", -1);
switch (compression)
{
case js_StorageCompressionNone:
st = jsStorageCompressionNoneStr;
break;
case js_StorageCompressionS2:
st = jsStorageCompressionS2Str;
break;
default:
return nats_setError(NATS_INVALID_ARG, "invalid storage type %d", (int)compression);
}
IFOK(s, natsBuf_Append(buf, st, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalSubjectTransformConfig(nats_JSON *obj, jsSubjectTransformConfig *cfg)
{
natsStatus s = NATS_OK;

memset(cfg, 0, sizeof(jsSubjectTransformConfig));
if (obj == NULL)
{
return NATS_OK;
}

IFOK(s, nats_JSONGetStr(obj, "src", (char **)&(cfg->Source)));
IFOK(s, nats_JSONGetStr(obj, "dest", (char **)&(cfg->Destination)));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalSubjectTransformConfig(jsSubjectTransformConfig *cfg, natsBuffer *buf)
{
natsStatus s;
if (cfg == NULL || (nats_IsStringEmpty(cfg->Source) && nats_IsStringEmpty(cfg->Destination)))
return NATS_OK;

s = natsBuf_Append(buf, ",\"subject_transform\":{", -1);
IFOK(s, natsBuf_Append(buf, "\"src\":\"", -1));
if (cfg->Source != NULL)
IFOK(s, natsBuf_Append(buf, cfg->Source, -1));
IFOK(s, natsBuf_Append(buf, "\",\"dest\":\"", -1));
if (cfg->Destination != NULL)
IFOK(s, natsBuf_Append(buf, cfg->Destination, -1));
IFOK(s, natsBuf_Append(buf, "\"}", -1));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalStreamConsumerLimits(jsStreamConsumerLimits *limits, natsBuffer *buf)
{
natsStatus s;
if (limits == NULL || (limits->InactiveThreshold == 0 && limits->MaxAckPending == 0))
return NATS_OK;

s = natsBuf_Append(buf, ",\"consumer_limits\":{", -1);
IFOK(s, nats_marshalLong(buf, false, "inactive_threshold", limits->InactiveThreshold));
IFOK(s, nats_marshalLong(buf, true, "max_ack_pending", limits->MaxAckPending));
IFOK(s, natsBuf_AppendByte(buf, '}'));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamConsumerLimits(nats_JSON *obj, jsStreamConsumerLimits *limits)
{
natsStatus s = NATS_OK;

memset(limits, 0, sizeof(*limits));
if (obj == NULL)
{
return NATS_OK;
}

IFOK(s, nats_JSONGetLong(obj, "inactive_threshold", &limits->InactiveThreshold));
IFOK(s, nats_JSONGetInt(obj, "max_ack_pending", &limits->MaxAckPending));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish)
{
Expand Down Expand Up @@ -570,6 +688,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
jsStreamConfig *cfg = NULL;
nats_JSON **sources = NULL;
int sourcesLen = 0;
nats_JSON *obj = NULL;
natsStatus s;

if (fieldName != NULL)
Expand Down Expand Up @@ -633,6 +752,15 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetBool(jcfg, "mirror_direct", &(cfg->MirrorDirect)));
IFOK(s, nats_JSONGetBool(jcfg, "discard_new_per_subject", &(cfg->DiscardNewPerSubject)));

IFOK(s, nats_unmarshalMetadata(jcfg, "metadata", &(cfg->Metadata)));
IFOK(s, _unmarshalStorageCompression(jcfg, "storage", &(cfg->Compression)));
IFOK(s, nats_JSONGetULong(jcfg, "first_seq", &(cfg->FirstSeq)));
IFOK(s, nats_JSONGetObject(jcfg, "subject_transform", &obj));
IFOK(s, _unmarshalSubjectTransformConfig(obj, &(cfg->SubjectTransform)));
obj = NULL;
IFOK(s, nats_JSONGetObject(jcfg, "consumer_limits", &obj));
IFOK(s, _unmarshalStreamConsumerLimits(obj, &(cfg->ConsumerLimits)));

if (s == NATS_OK)
*new_cfg = cfg;
else
Expand Down Expand Up @@ -754,6 +882,12 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
if ((s == NATS_OK) && cfg->DiscardNewPerSubject)
IFOK(s, natsBuf_Append(buf, ",\"discard_new_per_subject\":true", -1));

IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
IFOK(s, _marshalStorageCompression(cfg->Compression, buf));
IFOK(s, nats_marshalULong(buf, true, "first_seq", cfg->FirstSeq));
IFOK(s, _marshalSubjectTransformConfig(&cfg->SubjectTransform, buf));
IFOK(s, _marshalStreamConsumerLimits(&cfg->ConsumerLimits, buf));

IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
Expand Down Expand Up @@ -941,6 +1075,8 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
nats_JSON *json = NULL;
jsStreamSourceInfo *ssi = NULL;
natsStatus s;
nats_JSON **subjectTransforms = NULL;
int subjectTransformsLen = 0;

if (fieldName != NULL)
{
Expand All @@ -961,6 +1097,27 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
IFOK(s, _unmarshalExternalStream(json, "external", &(ssi->External)));
IFOK(s, nats_JSONGetULong(json, "lag", &(ssi->Lag)));
IFOK(s, nats_JSONGetLong(json, "active", &(ssi->Active)));
IFOK(s, nats_JSONGetStr(json, "filter_subject", (char **)&(ssi->FilterSubject)));

// Get the sources and unmarshal if present
IFOK(s, nats_JSONGetArrayObject(json, "subject_transforms", &subjectTransforms, &subjectTransformsLen));
if ((s == NATS_OK) && (subjectTransforms != NULL))
{
int i;

ssi->SubjectTransforms = (jsSubjectTransformConfig *)NATS_CALLOC(subjectTransformsLen, sizeof(jsSubjectTransformConfig));
if (ssi->SubjectTransforms == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (i = 0; (s == NATS_OK) && (i < subjectTransformsLen); i++)
{
s = _unmarshalSubjectTransformConfig(subjectTransforms[i], &(ssi->SubjectTransforms[i]));
if (s == NATS_OK)
ssi->SubjectTransformsLen++;
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(subjectTransforms);
}

if (s == NATS_OK)
*new_src = ssi;
Expand Down Expand Up @@ -1118,6 +1275,7 @@ jsStreamConfig_Init(jsStreamConfig *cfg)
cfg->Storage = js_FileStorage;
cfg->Discard = js_DiscardOld;
cfg->Replicas = 1;
cfg->Compression = js_StorageCompressionNone;
return NATS_OK;
}

Expand Down Expand Up @@ -1269,6 +1427,22 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
if (msc)
_restoreMirrorAndSourcesExternal(cfg);

// Make sure the 2.10 config fields actually worked, in case the server is
// older.
if ((s == NATS_OK) && (new_si != NULL) && (*new_si != NULL)
&& (cfg->Compression != (*new_si)->Config->Compression)
&& (cfg->FirstSeq != (*new_si)->Config->FirstSeq)
&& (cfg->Metadata.Count != (*new_si)->Config->Metadata.Count)
&& nats_StringEquals(cfg->SubjectTransform.Source, (*new_si)->Config->SubjectTransform.Source)
&& nats_StringEquals(cfg->SubjectTransform.Destination, (*new_si)->Config->SubjectTransform.Destination)
&& (cfg->ConsumerLimits.InactiveThreshold != (*new_si)->Config->ConsumerLimits.InactiveThreshold)
&& (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending)
)
{
// <>/<> wrong error
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired);
}

natsBuf_Destroy(buf);
natsMsg_Destroy(resp);
NATS_FREE(subj);
Expand Down Expand Up @@ -2772,6 +2946,7 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo

IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -2844,6 +3019,7 @@ js_destroyConsumerConfig(jsConsumerConfig *cc)
NATS_FREE((char*) cc->FilterSubject);
for (i = 0; i < cc->FilterSubjectsLen; i++)
NATS_FREE((char *)cc->FilterSubjects[i]);
nats_freeMetadata(&(cc->Metadata));
NATS_FREE((char *)cc->FilterSubjects);
NATS_FREE((char *)cc->SampleFrequency);
NATS_FREE(cc->BackOff);
Expand Down Expand Up @@ -2968,6 +3144,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetArrayLong(cjson, "backoff", &(cc->BackOff), &(cc->BackOffLen)));
IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas)));
IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage)));
IFOK(s, nats_unmarshalMetadata(cjson, "metadata", &(cc->Metadata)));
}

if (s == NATS_OK)
Expand Down Expand Up @@ -3105,14 +3282,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
{
// No subject filter, use <stream>.<consumer name>
// otherwise, the filter subject goes at the end.
if (nats_IsStringEmpty(cfg->FilterSubject))
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
else
if (!nats_IsStringEmpty(cfg->FilterSubject) && (cfg->FilterSubjectsLen == 0))
res = nats_asprintf(&subj, jsApiConsumerCreateExWithFilterT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name, cfg->FilterSubject);
else
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
}
else if (nats_IsStringEmpty(cfg->Durable))
res = nats_asprintf(&subj, jsApiConsumerCreateT,
Expand All @@ -3136,6 +3313,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
// If we got a response, check for error or return the consumer info result.
IFOK(s, _unmarshalConsumerCreateOrGetResp(new_ci, resp, errCode));

if ((s == NATS_OK)
&& (new_ci != NULL)
&& (cfg->FilterSubjectsLen > 0)
&& ((*new_ci)->Config->FilterSubjectsLen == 0))
{
s = nats_setError(NATS_INVALID_ARG, "%s", "multiple consumer filter subjects not supported by the server");
}

NATS_FREE(subj);
natsMsg_Destroy(resp);
natsBuf_Destroy(buf);
Expand Down Expand Up @@ -3688,6 +3873,7 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
}
c->FilterSubjectsLen = org->FilterSubjectsLen;
}
IFOK(s, nats_cloneMetadata(&(c->Metadata), org->Metadata));
if (s == NATS_OK)
*clone = c;
else
Expand Down
Loading

0 comments on commit 204f256

Please sign in to comment.