Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed RePublish from jsSubjectMapping to jsRePublish + HeadersOnly #563

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ _destroyStreamSource(jsStreamSource *source)
}

static void
_destroySubjectMapping(jsSubjectMapping *sm)
_destroyRePublish(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return;

NATS_FREE((char*) sm->Source);
NATS_FREE((char*) sm->Destination);
NATS_FREE(sm);
NATS_FREE((char*) rp->Source);
NATS_FREE((char*) rp->Destination);
NATS_FREE(rp);
}

void
Expand All @@ -127,7 +127,7 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
for (i=0; i<cfg->SourcesLen; i++)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroySubjectMapping(cfg->RePublish);
_destroyRePublish(cfg->RePublish);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -510,27 +510,28 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsSubjectMapping **new_mapping)
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish)
{
jsSubjectMapping *sm = NULL;
jsRePublish *rp = NULL;
nats_JSON *jsm = NULL;
natsStatus s;

s = nats_JSONGetObject(json, fieldName, &jsm);
if (jsm == NULL)
return NATS_UPDATE_ERR_STACK(s);

sm = (jsSubjectMapping*) NATS_CALLOC(1, sizeof(jsSubjectMapping));
if (sm == NULL)
rp = (jsRePublish*) NATS_CALLOC(1, sizeof(jsRePublish));
if (rp == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(jsm, "src", (char**) &(sm->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(sm->Destination)));
s = nats_JSONGetStr(jsm, "src", (char**) &(rp->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(rp->Destination)));
IFOK(s, nats_JSONGetBool(jsm, "headers_only", &(rp->HeadersOnly)));

if (s == NATS_OK)
*new_mapping = sm;
*new_republish = rp;
else
_destroySubjectMapping(sm);
_destroyRePublish(rp);

return NATS_UPDATE_ERR_STACK(s);

Expand Down Expand Up @@ -702,20 +703,22 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));
if ((s == NATS_OK) && (cfg->RePublish != NULL))
if ((s == NATS_OK) && (cfg->RePublish != NULL) && !nats_IsStringEmpty(cfg->RePublish->Destination))
{
// "dest" is not omitempty, in that the field will always be present.
IFOK(s, natsBuf_Append(buf, ",\"republish\":{\"dest\":\"", -1));
// Still check that our value is not NULL
if (!nats_IsStringEmpty(cfg->RePublish->Destination))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
// Now the source...
if (!nats_IsStringEmpty(cfg->RePublish->Source))
{
IFOK(s, natsBuf_Append(buf, "\",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, ",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Source, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
if (cfg->RePublish->HeadersOnly)
IFOK(s, natsBuf_Append(buf, ",\"headers_only\":true", -1));
IFOK(s, natsBuf_AppendByte(buf, '}'));
}
if ((s == NATS_OK) && cfg->AllowDirect)
IFOK(s, natsBuf_Append(buf, ",\"allow_direct\":true", -1));
Expand Down Expand Up @@ -1881,13 +1884,12 @@ jsExternalStream_Init(jsExternalStream *external)
}

natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst)
jsRePublish_Init(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

sm->Source = src;
sm->Destination = dst;
memset(rp, 0, sizeof(jsRePublish));
return NATS_OK;
}

Expand Down
25 changes: 13 additions & 12 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,13 @@ typedef struct jsStreamSource
/**
* Allows a source subject to be mapped to a destination subject for republishing.
*/
typedef struct jsSubjectMapping
typedef struct jsRePublish
{
const char *Source;
const char *Destination;
bool HeadersOnly;

} jsSubjectMapping;
} jsRePublish;

/**
* Configuration of a JetStream stream.
Expand Down Expand Up @@ -414,7 +415,7 @@ typedef struct jsSubjectMapping
* const char *subjects[] = {"foo", "bar"};
* const char *tags[] = {"tag1", "tag2"};
* jsStreamSource *sources[] = {&s1, &s2};
* jsSubjectMapping sm;
* jsRePublish rp;
*
* jsStreamConfig_Init(&sc);
*
Expand Down Expand Up @@ -456,8 +457,10 @@ typedef struct jsSubjectMapping
* sc.SourcesLen = 2;
*
* // For RePublish subject:
* jsSubjectMapping_Init(&sm, ">", "RP.>")
* sc.RePublish = &sm;
* jsRePublish_Init(&rp);
* rp.Source = ">";
* rp.Destination = "RP.>";
* sc.RePublish = &rp;
*
* s = js_AddStream(&si, js, &sc, NULL, &jerr);
* \endcode
Expand Down Expand Up @@ -494,7 +497,7 @@ typedef struct jsStreamConfig {
bool AllowRollup;

// Allow republish of the message after being sequenced and stored.
jsSubjectMapping *RePublish;
jsRePublish *RePublish;

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
bool AllowDirect;
Expand Down Expand Up @@ -5294,16 +5297,14 @@ jsStreamSource_Init(jsStreamSource *source);
NATS_EXTERN natsStatus
jsExternalStream_Init(jsExternalStream *external);

/** \brief Initializes a subject mapping structure.
/** \brief Initializes a republish structure.
*
* Use this to set the source and destination for a subject mapping.
* Use this to set the source, destination and/or headers only for a stream re-publish.
*
* @param sm the pointer to the #jsSubjectMapping to initialize.
* @param src the string for the source.
* @param dst the string for the destination.
* @param rp the pointer to the #jsRePublish to initialize.
*/
NATS_EXTERN natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst);
jsRePublish_Init(jsRePublish *rp);

/** \brief Creates a stream.
*
Expand Down
25 changes: 11 additions & 14 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21603,7 +21603,7 @@ test_JetStreamMarshalStreamConfig(void)
nats_JSON *json = NULL;
jsStreamConfig *rsc = NULL;
int64_t optStartTime = 1624583232123456000;
jsSubjectMapping sm;
jsRePublish rp;

test("init bad args: ");
s = jsStreamConfig_Init(NULL);
Expand Down Expand Up @@ -21681,22 +21681,18 @@ test_JetStreamMarshalStreamConfig(void)
sc.AllowDirect = true;
sc.MirrorDirect = true;

test("Subject mapping init err: ");
s = jsSubjectMapping_Init(NULL, "a", "b");
test("RePublish init err: ");
s = jsRePublish_Init(NULL);
testCond(s == NATS_INVALID_ARG);
nats_clearLastError();
s = NATS_OK;

test("Subject mapping init, NULL or empty are ok: ");
s = jsSubjectMapping_Init(&sm, "A", NULL);
IFOK(s, jsSubjectMapping_Init(&sm, "A", ""));
IFOK(s, jsSubjectMapping_Init(&sm, NULL, "B"));
IFOK(s, jsSubjectMapping_Init(&sm, "", "B"));
testCond(s == NATS_OK);

// Republish
jsSubjectMapping_Init(&sm, ">", "RP.>");
sc.RePublish = &sm;
jsRePublish_Init(&rp);
rp.Source = ">";
rp.Destination = "RP.>";
rp.HeadersOnly = true;
sc.RePublish = &rp;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
Expand Down Expand Up @@ -21764,6 +21760,7 @@ test_JetStreamMarshalStreamConfig(void)
&& (strcmp(rsc->RePublish->Source, ">") == 0)
&& (rsc->RePublish->Destination != NULL)
&& (strcmp(rsc->RePublish->Destination, "RP.>") == 0)
&& rsc->RePublish->HeadersOnly
&& rsc->AllowDirect
&& rsc->MirrorDirect);
js_destroyStreamConfig(rsc);
Expand Down Expand Up @@ -26562,9 +26559,9 @@ test_JetStreamSubscribePull(void)

test("Ack: ");
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], NULL);
s = natsMsg_AckSync(list.Msgs[i], NULL, &jerr);
natsMsgList_Destroy(&list);
testCond(s == NATS_OK);
testCond((s == NATS_OK) && (jerr == 0));

test("Send a message: ");
s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr);
Expand Down