Skip to content

Commit

Permalink
Merge pull request #563 from nats-io/js_republish_changes
Browse files Browse the repository at this point in the history
Changed RePublish from jsSubjectMapping to jsRePublish + HeadersOnly
  • Loading branch information
kozlovic committed Jul 22, 2022
2 parents 418dd00 + 7f14956 commit 8f6ac65
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 50 deletions.
50 changes: 26 additions & 24 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ _destroyStreamSource(jsStreamSource *source)
}

static void
_destroySubjectMapping(jsSubjectMapping *sm)
_destroyRePublish(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return;

NATS_FREE((char*) sm->Source);
NATS_FREE((char*) sm->Destination);
NATS_FREE(sm);
NATS_FREE((char*) rp->Source);
NATS_FREE((char*) rp->Destination);
NATS_FREE(rp);
}

void
Expand All @@ -127,7 +127,7 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
for (i=0; i<cfg->SourcesLen; i++)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroySubjectMapping(cfg->RePublish);
_destroyRePublish(cfg->RePublish);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -510,27 +510,28 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsSubjectMapping **new_mapping)
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish)
{
jsSubjectMapping *sm = NULL;
jsRePublish *rp = NULL;
nats_JSON *jsm = NULL;
natsStatus s;

s = nats_JSONGetObject(json, fieldName, &jsm);
if (jsm == NULL)
return NATS_UPDATE_ERR_STACK(s);

sm = (jsSubjectMapping*) NATS_CALLOC(1, sizeof(jsSubjectMapping));
if (sm == NULL)
rp = (jsRePublish*) NATS_CALLOC(1, sizeof(jsRePublish));
if (rp == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(jsm, "src", (char**) &(sm->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(sm->Destination)));
s = nats_JSONGetStr(jsm, "src", (char**) &(rp->Source));
IFOK(s, nats_JSONGetStr(jsm, "dest", (char**) &(rp->Destination)));
IFOK(s, nats_JSONGetBool(jsm, "headers_only", &(rp->HeadersOnly)));

if (s == NATS_OK)
*new_mapping = sm;
*new_republish = rp;
else
_destroySubjectMapping(sm);
_destroyRePublish(rp);

return NATS_UPDATE_ERR_STACK(s);

Expand Down Expand Up @@ -702,20 +703,22 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
IFOK(s, natsBuf_Append(buf, ",\"deny_purge\":true", -1));
if ((s == NATS_OK) && cfg->AllowRollup)
IFOK(s, natsBuf_Append(buf, ",\"allow_rollup_hdrs\":true", -1));
if ((s == NATS_OK) && (cfg->RePublish != NULL))
if ((s == NATS_OK) && (cfg->RePublish != NULL) && !nats_IsStringEmpty(cfg->RePublish->Destination))
{
// "dest" is not omitempty, in that the field will always be present.
IFOK(s, natsBuf_Append(buf, ",\"republish\":{\"dest\":\"", -1));
// Still check that our value is not NULL
if (!nats_IsStringEmpty(cfg->RePublish->Destination))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Destination, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
// Now the source...
if (!nats_IsStringEmpty(cfg->RePublish->Source))
{
IFOK(s, natsBuf_Append(buf, "\",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, ",\"src\":\"", -1))
IFOK(s, natsBuf_Append(buf, cfg->RePublish->Source, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
if (cfg->RePublish->HeadersOnly)
IFOK(s, natsBuf_Append(buf, ",\"headers_only\":true", -1));
IFOK(s, natsBuf_AppendByte(buf, '}'));
}
if ((s == NATS_OK) && cfg->AllowDirect)
IFOK(s, natsBuf_Append(buf, ",\"allow_direct\":true", -1));
Expand Down Expand Up @@ -1881,13 +1884,12 @@ jsExternalStream_Init(jsExternalStream *external)
}

natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst)
jsRePublish_Init(jsRePublish *rp)
{
if (sm == NULL)
if (rp == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

sm->Source = src;
sm->Destination = dst;
memset(rp, 0, sizeof(jsRePublish));
return NATS_OK;
}

Expand Down
25 changes: 13 additions & 12 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,13 @@ typedef struct jsStreamSource
/**
* Allows a source subject to be mapped to a destination subject for republishing.
*/
typedef struct jsSubjectMapping
typedef struct jsRePublish
{
const char *Source;
const char *Destination;
bool HeadersOnly;

} jsSubjectMapping;
} jsRePublish;

/**
* Configuration of a JetStream stream.
Expand Down Expand Up @@ -414,7 +415,7 @@ typedef struct jsSubjectMapping
* const char *subjects[] = {"foo", "bar"};
* const char *tags[] = {"tag1", "tag2"};
* jsStreamSource *sources[] = {&s1, &s2};
* jsSubjectMapping sm;
* jsRePublish rp;
*
* jsStreamConfig_Init(&sc);
*
Expand Down Expand Up @@ -456,8 +457,10 @@ typedef struct jsSubjectMapping
* sc.SourcesLen = 2;
*
* // For RePublish subject:
* jsSubjectMapping_Init(&sm, ">", "RP.>")
* sc.RePublish = &sm;
* jsRePublish_Init(&rp);
* rp.Source = ">";
* rp.Destination = "RP.>";
* sc.RePublish = &rp;
*
* s = js_AddStream(&si, js, &sc, NULL, &jerr);
* \endcode
Expand Down Expand Up @@ -494,7 +497,7 @@ typedef struct jsStreamConfig {
bool AllowRollup;

// Allow republish of the message after being sequenced and stored.
jsSubjectMapping *RePublish;
jsRePublish *RePublish;

// Allow higher performance, direct access to get individual messages. E.g. KeyValue
bool AllowDirect;
Expand Down Expand Up @@ -5294,16 +5297,14 @@ jsStreamSource_Init(jsStreamSource *source);
NATS_EXTERN natsStatus
jsExternalStream_Init(jsExternalStream *external);

/** \brief Initializes a subject mapping structure.
/** \brief Initializes a republish structure.
*
* Use this to set the source and destination for a subject mapping.
* Use this to set the source, destination and/or headers only for a stream re-publish.
*
* @param sm the pointer to the #jsSubjectMapping to initialize.
* @param src the string for the source.
* @param dst the string for the destination.
* @param rp the pointer to the #jsRePublish to initialize.
*/
NATS_EXTERN natsStatus
jsSubjectMapping_Init(jsSubjectMapping *sm, const char *src, const char *dst);
jsRePublish_Init(jsRePublish *rp);

/** \brief Creates a stream.
*
Expand Down
25 changes: 11 additions & 14 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21603,7 +21603,7 @@ test_JetStreamMarshalStreamConfig(void)
nats_JSON *json = NULL;
jsStreamConfig *rsc = NULL;
int64_t optStartTime = 1624583232123456000;
jsSubjectMapping sm;
jsRePublish rp;

test("init bad args: ");
s = jsStreamConfig_Init(NULL);
Expand Down Expand Up @@ -21681,22 +21681,18 @@ test_JetStreamMarshalStreamConfig(void)
sc.AllowDirect = true;
sc.MirrorDirect = true;

test("Subject mapping init err: ");
s = jsSubjectMapping_Init(NULL, "a", "b");
test("RePublish init err: ");
s = jsRePublish_Init(NULL);
testCond(s == NATS_INVALID_ARG);
nats_clearLastError();
s = NATS_OK;

test("Subject mapping init, NULL or empty are ok: ");
s = jsSubjectMapping_Init(&sm, "A", NULL);
IFOK(s, jsSubjectMapping_Init(&sm, "A", ""));
IFOK(s, jsSubjectMapping_Init(&sm, NULL, "B"));
IFOK(s, jsSubjectMapping_Init(&sm, "", "B"));
testCond(s == NATS_OK);

// Republish
jsSubjectMapping_Init(&sm, ">", "RP.>");
sc.RePublish = &sm;
jsRePublish_Init(&rp);
rp.Source = ">";
rp.Destination = "RP.>";
rp.HeadersOnly = true;
sc.RePublish = &rp;

test("Marshal stream config: ");
s = js_marshalStreamConfig(&buf, &sc);
Expand Down Expand Up @@ -21764,6 +21760,7 @@ test_JetStreamMarshalStreamConfig(void)
&& (strcmp(rsc->RePublish->Source, ">") == 0)
&& (rsc->RePublish->Destination != NULL)
&& (strcmp(rsc->RePublish->Destination, "RP.>") == 0)
&& rsc->RePublish->HeadersOnly
&& rsc->AllowDirect
&& rsc->MirrorDirect);
js_destroyStreamConfig(rsc);
Expand Down Expand Up @@ -26562,9 +26559,9 @@ test_JetStreamSubscribePull(void)

test("Ack: ");
for (i=0; (s == NATS_OK) && (i<list.Count); i++)
s = natsMsg_Ack(list.Msgs[i], NULL);
s = natsMsg_AckSync(list.Msgs[i], NULL, &jerr);
natsMsgList_Destroy(&list);
testCond(s == NATS_OK);
testCond((s == NATS_OK) && (jerr == 0));

test("Send a message: ");
s = js_Publish(NULL, js, "foo", "hello", 5, NULL, &jerr);
Expand Down

0 comments on commit 8f6ac65

Please sign in to comment.