Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Stream alternates in jsStreamInfo #611

Merged
merged 1 commit into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ js_cleanStreamState(jsStreamState *state)
_destroyStreamStateSubjects(state->Subjects);
}

static void
_destroyStreamAlternate(jsStreamAlternate *sa)
{
if (sa == NULL)
return;

NATS_FREE((char*) sa->Name);
NATS_FREE((char*) sa->Domain);
NATS_FREE((char*) sa->Cluster);
NATS_FREE(sa);
}

void
jsStreamInfo_Destroy(jsStreamInfo *si)
{
Expand All @@ -226,6 +238,9 @@ jsStreamInfo_Destroy(jsStreamInfo *si)
for (i=0; i<si->SourcesLen; i++)
_destroyStreamSourceInfo(si->Sources[i]);
NATS_FREE(si->Sources);
for (i=0; i<si->AlternatesLen; i++)
_destroyStreamAlternate(si->Alternates[i]);
NATS_FREE(si->Alternates);
NATS_FREE(si);
}

Expand Down Expand Up @@ -955,12 +970,36 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamAlternate(nats_JSON *json, jsStreamAlternate **new_alt)
{
jsStreamAlternate *sa = NULL;
natsStatus s = NATS_OK;

sa = (jsStreamAlternate*) NATS_CALLOC(1, sizeof(jsStreamAlternate));
if (sa == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetStr(json, "name", (char**) &(sa->Name));
IFOK(s, nats_JSONGetStr(json, "domain", (char**) &(sa->Domain)));
IFOK(s, nats_JSONGetStr(json, "cluster", (char**) &(sa->Cluster)));

if (s == NATS_OK)
*new_alt = sa;
else
_destroyStreamAlternate(sa);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page)
{
jsStreamInfo *si = NULL;
nats_JSON **sources = NULL;
int sourcesLen = 0;
nats_JSON **alts = NULL;
int altsLen = 0;
natsStatus s;

si = (jsStreamInfo*) NATS_CALLOC(1, sizeof(jsStreamInfo));
Expand Down Expand Up @@ -991,6 +1030,24 @@ _unmarshalStreamInfoPaged(nats_JSON *json, jsStreamInfo **new_si, apiPaged *page
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(sources);
}
IFOK(s, nats_JSONGetArrayObject(json, "alternates", &alts, &altsLen));
if ((s == NATS_OK) && (alts != NULL))
{
int i;

si->Alternates = (jsStreamAlternate**) NATS_CALLOC(altsLen, sizeof(jsStreamAlternate*));
if (si->Alternates == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (i=0; (s == NATS_OK) && (i<altsLen); i++)
{
s = _unmarshalStreamAlternate(alts[i], &(si->Alternates[i]));
if (s == NATS_OK)
si->AlternatesLen++;
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(alts);
}
if ((s == NATS_OK) && (page != NULL))
{
IFOK(s, nats_JSONGetLong(json, "total", &page->total));
Expand Down
13 changes: 13 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,17 @@ typedef struct jsStreamSourceInfo

} jsStreamSourceInfo;

/**
* Information about an alternate stream represented by a mirror.
*/
typedef struct jsStreamAlternate
{
const char *Name;
const char *Domain;
const char *Cluster;

} jsStreamAlternate;

/**
* Configuration and current state for this stream.
*
Expand All @@ -650,6 +661,8 @@ typedef struct jsStreamInfo
jsStreamSourceInfo *Mirror;
jsStreamSourceInfo **Sources;
int SourcesLen;
jsStreamAlternate **Alternates;
int AlternatesLen;

} jsStreamInfo;

Expand Down
7 changes: 7 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,13 @@ typedef enum {
JSStreamMoveInProgressErr = 10124, ///< Stream move already in progress
JSConsumerMaxRequestBatchExceededErr = 10125, ///< Consumer max request batch exceeds server limit
JSConsumerReplicasExceedsStreamErr = 10126, ///< Consumer config replica count exceeds parent stream
JSConsumerNameContainsPathSeparatorsErr = 10127, ///< Consumer name can not contain path separators
JSStreamNameContainsPathSeparatorsErr = 10128, ///< Stream name can not contain path separators
JSStreamMoveNotInProgressErr = 10129, ///< Stream move not in progress
JSStreamNameExistRestoreFailedErr = 10130, ///< Stream name already in use, cannot restore
JSConsumerCreateFilterSubjectMismatchErr = 10131, ///< Consumer create request did not match filtered subject from create subject
JSConsumerCreateDurableAndNameMismatchErr = 10132, ///< Consumer Durable and Name have to be equal if both are provided
JSReplicasCountCannotBeNegativeErr = 10133, ///< Replicas count cannot be negative

} jsErrCode;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ JetStreamDirectGetMsg
JetStreamNakWithDelay
JetStreamBackOffRedeliveries
JetStreamInfoWithSubjects
JetStreamInfoAlternates
KeyValueManager
KeyValueBasics
KeyValueWatch
Expand Down
139 changes: 139 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21732,8 +21732,11 @@ test_JetStreamUnmarshalStreamInfo(void)
"{\"cluster\":{\"name\":\"S1\",\"leader\":\"S2\",\"replicas\":[{\"name\":\"S1\",\"current\":true,\"offline\":false,\"active\":123,\"lag\":456},{\"name\":\"S1\",\"current\":false,\"offline\":true,\"active\":123,\"lag\":456}]}}",
"{\"mirror\":{\"name\":\"M\",\"lag\":123,\"active\":456}}",
"{\"mirror\":{\"name\":\"M\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456}}",
"{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456}]}",
"{\"sources\":[{\"name\":\"S1\",\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":{\"api\":\"MyApi\",\"deliver\":\"deliver.prefix\"},\"lag\":123,\"active\":456},{\"name\":\"S2\",\"lag\":123,\"active\":456}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":\"abc\"},{\"name\":\"S2\",\"domain\":\"domain\",\"cluster\":\"abc\"}]}",
};
const char *bad[] = {
"{\"config\":123}",
Expand Down Expand Up @@ -21781,6 +21784,10 @@ test_JetStreamUnmarshalStreamInfo(void)
"{\"sources\":[{\"name\":123}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":123}]}",
"{\"sources\":[{\"name\":\"S1\",\"external\":{\"deliver\":123}}]}",
"{\"alternates\":123}",
"{\"alternates\":[{\"name\":123}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":123}]}",
"{\"alternates\":[{\"name\":\"S1\",\"domain\":\"domain\",\"cluster\":123}]}",
};
int i;
char tmp[64];
Expand Down Expand Up @@ -29738,6 +29745,137 @@ test_JetStreamInfoWithSubjects(void)
JS_TEARDOWN;
}

static natsStatus
_checkJSClusterReady(const char *url)
{
natsStatus s = NATS_OK;
natsConnection *nc = NULL;
jsCtx *js = NULL;
jsErrCode jerr= 0;
int i;
jsOptions jo;

jsOptions_Init(&jo);
jo.Wait = 1000;

s = natsConnection_ConnectTo(&nc, url);
IFOK(s, natsConnection_JetStream(&js, nc, &jo));
for (i=0; (s == NATS_OK) && (i<10); i++)
{
jsStreamInfo *si = NULL;

s = js_GetStreamInfo(&si, js, "CHECK_CLUSTER", &jo, &jerr);
if (jerr == JSStreamNotFoundErr)
{
nats_clearLastError();
s = NATS_OK;
break;
}
if ((s != NATS_OK) && (i < 9))
{
s = NATS_OK;
nats_Sleep(500);
}
}
jsCtx_Destroy(js);
natsConnection_Destroy(nc);
return s;
}

static void
test_JetStreamInfoAlternates(void)
{
char datastore1[256] = {'\0'};
char datastore2[256] = {'\0'};
char datastore3[256] = {'\0'};
char cmdLine[1024] = {'\0'};
natsPid pid1 = NATS_INVALID_PID;
natsPid pid2 = NATS_INVALID_PID;
natsPid pid3 = NATS_INVALID_PID;
natsConnection *nc = NULL;
jsCtx *js = NULL;
jsStreamInfo *si = NULL;
jsStreamConfig sc;
jsStreamSource ss;
natsStatus s;

ENSURE_JS_VERSION(2, 9, 0);

test("Start cluster: ");
_makeUniqueDir(datastore1, sizeof(datastore1), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name A -cluster nats://127.0.0.1:6222 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4222", datastore1);
pid1 = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid1);

_makeUniqueDir(datastore2, sizeof(datastore2), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name B -cluster nats://127.0.0.1:6223 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4223", datastore2);
pid2 = _startServer("nats://127.0.0.1:4223", cmdLine, true);
CHECK_SERVER_STARTED(pid1);

_makeUniqueDir(datastore3, sizeof(datastore3), "datastore_");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -cluster_name abc -server_name C -cluster nats://127.0.0.1:6224 -routes nats://127.0.0.1:6222,nats://127.0.0.1:6223,nats://127.0.0.1:6224 -p 4224", datastore3);
pid3 = _startServer("nats://127.0.0.1:4224", cmdLine, true);
CHECK_SERVER_STARTED(pid1);
testCond(true);

test("Check cluster: ");
s = _checkJSClusterReady("nats://127.0.0.1:4224");
testCond(s == NATS_OK);

test("Connect: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Get context: ");
s = natsConnection_JetStream(&js, nc, NULL);
testCond(s == NATS_OK);

test("Create stream: ");
jsStreamConfig_Init(&sc);
sc.Name = "TEST";
sc.Subjects = (const char*[1]){"foo"};
sc.SubjectsLen = 1;
s = js_AddStream(NULL, js, &sc, NULL, NULL);
testCond(s == NATS_OK);

test("Create mirror: ");
jsStreamConfig_Init(&sc);
sc.Name = "MIRROR";
jsStreamSource_Init(&ss);
ss.Name = "TEST";
sc.Mirror = &ss;
s = js_AddStream(NULL, js, &sc, NULL, NULL);
testCond(s == NATS_OK);

test("Check for alternate: ");
s = js_GetStreamInfo(&si, js, "TEST", NULL, NULL);
testCond((s == NATS_OK) && (si != NULL) && (si->AlternatesLen == 2));

test("Check alternate content: ");
if ((strcmp(si->Alternates[0]->Cluster, "abc") != 0)
|| (strcmp(si->Alternates[1]->Cluster, "abc") != 0))
{
s = NATS_ERR;
}
else if (((strcmp(si->Alternates[0]->Name, "TEST") == 0) && (strcmp(si->Alternates[1]->Name, "MIRROR") != 0))
|| ((strcmp(si->Alternates[0]->Name, "MIRROR") == 0) && (strcmp(si->Alternates[1]->Name, "TEST") != 0)))
{
s = NATS_ERR;
}
testCond(s == NATS_OK);
jsStreamInfo_Destroy(si);

jsCtx_Destroy(js);
natsConnection_Destroy(nc);

_stopServer(pid3);
_stopServer(pid2);
_stopServer(pid1);
rmtree(datastore1);
rmtree(datastore2);
rmtree(datastore3);
}

static void
test_KeyValueManager(void)
{
Expand Down Expand Up @@ -34077,6 +34215,7 @@ static testInfo allTests[] =
{"JetStreamNakWithDelay", test_JetStreamNakWithDelay},
{"JetStreamBackOffRedeliveries", test_JetStreamBackOffRedeliveries},
{"JetStreamInfoWithSubjects", test_JetStreamInfoWithSubjects},
{"JetStreamInfoAlternates", test_JetStreamInfoAlternates},

{"KeyValueManager", test_KeyValueManager},
{"KeyValueBasics", test_KeyValueBasics},
Expand Down