Skip to content

Commit

Permalink
Merge pull request #611 from nats-io/js_add_stream_alternates
Browse files Browse the repository at this point in the history
[ADDED] Stream alternates in `jsStreamInfo`
  • Loading branch information
kozlovic committed Nov 4, 2022
2 parents 435bac3 + 1e8a57c commit f32b86d
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 0 deletions.
57 changes: 57 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ js_cleanStreamState(jsStreamState *state)
_destroyStreamStateSubjects(state->Subjects);
}

static void
_destroyStreamAlternate(jsStreamAlternate *sa)
{
if (sa == NULL)
return;

NATS_FREE((char*) sa->Name);
NATS_FREE((char*) sa->Domain);
NATS_FREE((char*) sa->Cluster);
NATS_FREE(sa);
}

void
jsStreamInfo_Destroy(jsStreamInfo *si)
{
Expand All @@ -226,6 +238,9 @@ jsStreamInfo_Destroy(jsStreamInfo *si)
for (i=0; i<si->SourcesLen; i++)
_destroyStreamSourceInfo(si->Sources[i]);
NATS_FREE(si->Sources);
for (i=0; i<si->AlternatesLen; i++)
_destroyStreamAlternate(si->Alternates[i]);
NATS_FREE(si->Alternates);
NATS_FREE(si);
}

Expand Down Expand Up @@ -955,12 +970,36 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamAlternate(nats_JSON *json, jsStreamAlternate **new_alt)
{
jsStreamAlternate *sa = NULL;
natsStatus s = NATS_OK;

sa = (jsStreamAlternate*) NATS_CALLOC(1, sizeof(jsStreamAlternate));
if (sa == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(json, "name", (char**) &(sa->Name));
IFOK(s, nats_JSONGetStr(json, "domain", (char**) &(sa->Domain)));
IFOK(s, nats_JSONGetStr(json, "cluster", (char**) &(sa->Cluster)));

if (s == NATS_OK)
*new_alt = sa;
else
_destroyStreamAlternate(sa);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page)
{
jsStreamInfo *si = NULL;
nats_JSON **sources = NULL;
int sourcesLen = 0;
nats_JSON **alts = NULL;
int altsLen = 0;
natsStatus s;

si = (jsStreamInfo*) NATS_CALLOC(1, sizeof(jsStreamInfo));
Expand Down Expand Up @@ -991,6 +1030,24 @@ _unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
IFOK(s, nats_JSONGetArrayObject(json, "alternates", &alts, &altsLen));
if ((s == NATS_OK) && (alts != NULL))
{
int i;

si->Alternates = (jsStreamAlternate**) NATS_CALLOC(altsLen, sizeof(jsStreamAlternate*));
if (si->Alternates == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (i=0; (s == NATS_OK) && (i<altsLen); i++)
{
s = _unmarshalStreamAlternate(alts[i], &(si->Alternates[i]));
if (s == NATS_OK)
si->AlternatesLen++;
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(alts);
}
if ((s == NATS_OK) && (page != NULL))
{
IFOK(s, nats_JSONGetLong(json, "total", &page->total));
Expand Down
13 changes: 13 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,17 @@ typedef struct jsStreamSourceInfo

} jsStreamSourceInfo;

/**
* Information about an alternate stream represented by a mirror.
*/
typedef struct jsStreamAlternate
{
const char *Name;
const char *Domain;
const char *Cluster;

} jsStreamAlternate;

/**
* Configuration and current state for this stream.
*
Expand All @@ -650,6 +661,8 @@ typedef struct jsStreamInfo
jsStreamSourceInfo *Mirror;
jsStreamSourceInfo **Sources;
int SourcesLen;
jsStreamAlternate **Alternates;
int AlternatesLen;

} jsStreamInfo;

Expand Down
7 changes: 7 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ typedef enum {
JSStreamMoveInProgressErr = 10124, ///< Stream move already in progress
JSConsumerMaxRequestBatchExceededErr = 10125, ///< Consumer max request batch exceeds server limit
JSConsumerReplicasExceedsStreamErr = 10126, ///< Consumer config replica count exceeds parent stream
JSConsumerNameContainsPathSeparatorsErr = 10127, ///< Consumer name can not contain path separators
JSStreamNameContainsPathSeparatorsErr = 10128, ///< Stream name can not contain path separators
JSStreamMoveNotInProgressErr = 10129, ///< Stream move not in progress
JSStreamNameExistRestoreFailedErr = 10130, ///< Stream name already in use, cannot restore
JSConsumerCreateFilterSubjectMismatchErr = 10131, ///< Consumer create request did not match filtered subject from create subject
JSConsumerCreateDurableAndNameMismatchErr = 10132, ///< Consumer Durable and Name have to be equal if both are provided
JSReplicasCountCannotBeNegativeErr = 10133, ///< Replicas count cannot be negative

} jsErrCode;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ JetStreamDirectGetMsg
JetStreamNakWithDelay
JetStreamBackOffRedeliveries
JetStreamInfoWithSubjects
JetStreamInfoAlternates
KeyValueManager
KeyValueBasics
KeyValueWatch
Expand Down
139 changes: 139 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21732,8 +21732,11 @@ test_JetStreamUnmarshalStreamInfo(void)
"{\"cluster\":{\"name\":\"S1\",\"leader\":\"S2\",\"replicas\":[{\"name\":\"S1\",\"current\":true,\"offline\":false,\"active\":123,\"lag\":456},{\"name\":\"S1\",\"current\":false,\"offline\":true,\"active\":123,\"lag\":456}]}}",
"{\"mirror\":{\"name\":\"M\",\"lag\":123,\"active\":456}}",
"{\"mirror\":{\"name\":\"M\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456}}",
"{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456}]}",
"{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"},{\"name\":\"S2\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}",
};
const char *bad[] = {
"{\"config\":123}",
Expand Down Expand Up @@ -21781,6 +21784,10 @@ test_JetStreamUnmarshalStreamInfo(void)
"{\"sources\":[{\"name\":123}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":123}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":{\"deliver\":123}}]}",
"{\"alternates\":123}",
"{\"alternates\":[{\"name\":123}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":123}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":123}]}",
};
int i;
char tmp[64];
Expand Down Expand Up @@ -29738,6 +29745,137 @@ test_JetStreamInfoWithSubjects(void)
JS_TEARDOWN;
}

static natsStatus
_checkJSClusterReady(const char *url)
{
natsStatus s = NATS_OK;
natsConnection *nc = NULL;
jsCtx *js = NULL;
jsErrCode jerr= 0;
int i;
jsOptions jo;

jsOptions_Init(&jo);
jo.Wait = 1000;

s = natsConnection_ConnectTo(&nc, url);
IFOK(s, natsConnection_JetStream(&js, nc, &jo));
for (i=0; (s == NATS_OK) && (i<10); i++)
{
jsStreamInfo *si = NULL;

s = js_GetStreamInfo(&si, js, "CHECK_CLUSTER", &jo, &jerr);
if (jerr == JSStreamNotFoundErr)
{
nats_clearLastError();
s = NATS_OK;
break;
}
if ((s != NATS_OK) && (i < 9))
{
s = NATS_OK;
nats_Sleep(500);
}
}
jsCtx_Destroy(js);
natsConnection_Destroy(nc);
return s;
}

static void
test_JetStreamInfoAlternates(void)
{
char datastore1[256] = {'\0'};
char datastore2[256] = {'\0'};
char datastore3[256] = {'\0'};
char cmdLine[1024] = {'\0'};
natsPid pid1 = NATS_INVALID_PID;
natsPid pid2 = NATS_INVALID_PID;
natsPid pid3 = NATS_INVALID_PID;
natsConnection *nc = NULL;
jsCtx *js = NULL;
jsStreamInfo *si = NULL;
jsStreamConfig sc;
jsStreamSource ss;
natsStatus s;

ENSURE_JS_VERSION(2, 9, 0);

test("Start cluster: ");
_makeUniqueDir(datastore1, sizeof(datastore1), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name A -cluster nats://127.0.0.1:6222 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4222", datastore1);
pid1 = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid1);

_makeUniqueDir(datastore2, sizeof(datastore2), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name B -cluster nats://127.0.0.1:6223 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4223", datastore2);
pid2 = _startServer("nats://127.0.0.1:4223", cmdLine, true);
CHECK_SERVER_STARTED(pid1);

_makeUniqueDir(datastore3, sizeof(datastore3), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name C -cluster nats://127.0.0.1:6224 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4224", datastore3);
pid3 = _startServer("nats://127.0.0.1:4224", cmdLine, true);
CHECK_SERVER_STARTED(pid1);
testCond(true);

test("Check cluster: ");
s = _checkJSClusterReady("nats://127.0.0.1:4224");
testCond(s == NATS_OK);

test("Connect: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Get context: ");
s = natsConnection_JetStream(&js, nc, NULL);
testCond(s == NATS_OK);

test("Create stream: ");
jsStreamConfig_Init(&sc);
sc.Name = "TEST";
sc.Subjects = (const char*[1]){"foo"};
sc.SubjectsLen = 1;
s = js_AddStream(NULL, js, &sc, NULL, NULL);
testCond(s == NATS_OK);

test("Create mirror: ");
jsStreamConfig_Init(&sc);
sc.Name = "MIRROR";
jsStreamSource_Init(&ss);
ss.Name = "TEST";
sc.Mirror = &ss;
s = js_AddStream(NULL, js, &sc, NULL, NULL);
testCond(s == NATS_OK);

test("Check for alternate: ");
s = js_GetStreamInfo(&si, js, "TEST", NULL, NULL);
testCond((s == NATS_OK) && (si != NULL) && (si->AlternatesLen == 2));

test("Check alternate content: ");
if ((strcmp(si->Alternates[0]->Cluster, "abc") != 0)
|| (strcmp(si->Alternates[1]->Cluster, "abc") != 0))
{
s = NATS_ERR;
}
else if (((strcmp(si->Alternates[0]->Name, "TEST") == 0) && (strcmp(si->Alternates[1]->Name, "MIRROR") != 0))
|| ((strcmp(si->Alternates[0]->Name, "MIRROR") == 0) && (strcmp(si->Alternates[1]->Name, "TEST") != 0)))
{
s = NATS_ERR;
}
testCond(s == NATS_OK);
jsStreamInfo_Destroy(si);

jsCtx_Destroy(js);
natsConnection_Destroy(nc);

_stopServer(pid3);
_stopServer(pid2);
_stopServer(pid1);
rmtree(datastore1);
rmtree(datastore2);
rmtree(datastore3);
}

static void
test_KeyValueManager(void)
{
Expand Down Expand Up @@ -34077,6 +34215,7 @@ static testInfo allTests[] =
{"JetStreamNakWithDelay", test_JetStreamNakWithDelay},
{"JetStreamBackOffRedeliveries", test_JetStreamBackOffRedeliveries},
{"JetStreamInfoWithSubjects", test_JetStreamInfoWithSubjects},
{"JetStreamInfoAlternates", test_JetStreamInfoAlternates},

{"KeyValueManager", test_KeyValueManager},
{"KeyValueBasics", test_KeyValueBasics},
Expand Down

0 comments on commit f32b86d

Please sign in to comment.