diff --git a/src/util.c b/src/util.c index 8fb114be5..c56ca5da5 100644 --- a/src/util.c +++ b/src/util.c @@ -455,63 +455,101 @@ _jsonTrimSpace(char *ptr) return ptr; } +static natsStatus +_decodeUni(char **iPtr, char *val) +{ + int res = 0; + char *i = *iPtr; + int j; + + if (strlen(i) < 5) + return NATS_ERR; + + i++; + for (j=0; j<4; j++) + { + char c = i[j]; + if ((c >= '0') && (c <= '9')) + c = c - '0'; + else if ((c >= 'a') && (c <= 'f')) + c = c - 'a' + 10; + else if ((c >= 'A') && (c <= 'F')) + c = c - 'A' + 10; + else + return NATS_ERR; + + res = (res << 4) + c; + } + *val = (char) res; + *iPtr += 5; + + return NATS_OK; +} + static natsStatus _jsonGetStr(char **ptr, char **value) { char *p = *ptr; + char *o = *ptr; while ((*p != '\0') && (*p != '"')) { - if ((*p == '\\') && (*(p + 1) != '\0')) + if (*p != '\\') { + if (o != p) + *o = *p; + o++; p++; - // based on what http://www.json.org/ says a string should be - switch (*p) + continue; + } + p++; + // Escaped character here... + if (*p == '\0') + { + *o = '\0'; + return nats_setError(NATS_ERR, + "error parsing string '%s': invalid control character at the end", + o); + } + // based on what http://www.json.org/ says a string should be + switch (*p) + { + case 'b': *o++ = '\b'; break; + case 'f': *o++ = '\f'; break; + case 'n': *o++ = '\n'; break; + case 'r': *o++ = '\r'; break; + case 't': *o++ = '\t'; break; + case '"': + case '\\': + case '/': + *o++ = *p; + break; + case 'u': { - case '"': - case '\\': - case '/': - case 'b': - case 'n': - case 'r': - case 't': - break; - case 'u': + char val = 0; + natsStatus s = _decodeUni(&p, &val); + if (s != NATS_OK) { - int i; - - // Needs to be 4 hex. A hex is a digit or AF, af - p++; - for (i=0; i<4; i++) - { - // digit range - if (isxdigit(*p)) - { - p++; - } - else - { - return nats_setError(NATS_ERR, - "error parsing string '%s': invalid unicode character", - p); - } - } - p--; - break; - } - default: return nats_setError(NATS_ERR, - "error parsing string '%s': invalid control character", + "error parsing string '%s': invalid unicode character", p); + } + *o++ = val; + p--; + break; } + default: + return nats_setError(NATS_ERR, + "error parsing string '%s': invalid control character", + p); } p++; } if (*p != '\0') { + *o = '\0'; *value = *ptr; - *p = '\0'; *ptr = (char*) (p + 1); return NATS_OK; } diff --git a/test/list.txt b/test/list.txt index 55748860a..95ee09f73 100644 --- a/test/list.txt +++ b/test/list.txt @@ -226,6 +226,7 @@ JetStreamSubscribeHeadersOnly JetStreamOrderedCons JetStreamOrderedConsWithErrors JetStreamOrderedConsAutoUnsub +JetStreamSubscribeWithFWC JetStreamStreamsSealAndRollup JetStreamGetMsgAndLastMsg KeyValueManager diff --git a/test/test.c b/test/test.c index b01df0648..46bd8034f 100644 --- a/test/test.c +++ b/test/test.c @@ -3153,13 +3153,26 @@ test_natsJSON(void) strVal = NULL; test("Single field, string with escape chars: "); - s = nats_JSONParse(&json, "{\"test\":\"escapes: \\\" \\\\ \\/ \\b \\n \\r \\t\"}", -1); + s = nats_JSONParse(&json, "{\"test\":\"\\\"\\\\\\/\\b\\f\\n\\r\\t\"}", -1); IFOK(s, nats_JSONGetStr(json, "test", &strVal)); testCond((s == NATS_OK) && (json != NULL) && (json->fields != NULL) && (json->fields->used == 1) - && (strcmp(strVal, "escapes: \\\" \\\\ \\/ \\b \\n \\r \\t") == 0)); + && (strcmp(strVal, "\"\\/\b\f\n\r\t") == 0)); + nats_JSONDestroy(json); + json = NULL; + free(strVal); + strVal = NULL; + + test("Single field, string with unicode: "); + s = nats_JSONParse(&json, "{\"test\":\"\\u0026\\u003c\\u003e\"}", -1); + IFOK(s, nats_JSONGetStr(json, "test", &strVal)); + testCond((s == NATS_OK) + && (json != NULL) + && (json->fields != NULL) + && (json->fields->used == 1) + && (strcmp(strVal, "&<>") == 0)); nats_JSONDestroy(json); json = NULL; free(strVal); @@ -5906,7 +5919,7 @@ _recvTestString(natsConnection *nc, natsSubscription *sub, natsMsg *msg, if (doSignal) { arg->msgReceived = true; - natsCondition_Signal(arg->c); + natsCondition_Broadcast(arg->c); } natsMutex_Unlock(arg->m); } @@ -9248,7 +9261,7 @@ test_RetryOnFailedConnect(void) test("Retried: ") #ifdef _WIN32 - testCond((((end-start) >= 1000) && ((end-start) <= 2600))); + testCond((((end-start) >= 1000) && ((end-start) <= 2800))); #else testCond((((end-start) >= 300) && ((end-start) <= 1500))); #endif @@ -9315,7 +9328,7 @@ test_RetryOnFailedConnect(void) test("Message received: "); natsMutex_Lock(arg.m); while ((s != NATS_TIMEOUT) && !arg.msgReceived) - s = natsCondition_TimedWait(arg.c, arg.m, 2000); + s = natsCondition_TimedWait(arg.c, arg.m, 5000); natsMutex_Unlock(arg.m); testCond(s == NATS_OK); @@ -15831,7 +15844,7 @@ test_CustomReconnectDelay(void) #if _WIN32 testCond((s == NATS_OK) && (dur <= 1000)); #else - testCond((s == NATS_OK) && (dur <= 500)); + testCond((s == NATS_OK) && (dur <= 600)); #endif natsConnection_Destroy(nc); @@ -22222,6 +22235,18 @@ test_JetStreamMgtStreams(void) free(desc); jsCtx_Destroy(js2); natsSubscription_Destroy(sub); + + test("Create stream with wilcards: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "STREAM_WITH_WC"; + cfg.Subjects = (const char*[2]){"foo.>", "bar.*"}; + cfg.SubjectsLen = 2; + s = js_AddStream(&si, js, &cfg, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0) & (si != NULL) && (si->Config != NULL) + && (si->Config->SubjectsLen == 2) + && (strcmp(si->Config->Subjects[0], "foo.>") == 0) + && (strcmp(si->Config->Subjects[1], "bar.*") == 0)); + jsStreamInfo_Destroy(si); JS_TEARDOWN; } @@ -22449,7 +22474,7 @@ test_JetStreamMgtConsumers(void) test("Create stream: "); jsStreamConfig_Init(&scfg); scfg.Name = "MY_STREAM"; - scfg.Subjects = (const char*[1]){"bar"}; + scfg.Subjects = (const char*[1]){"bar.>"}; scfg.SubjectsLen = 1; s = js_AddStream(NULL, js, &scfg, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); @@ -22458,7 +22483,7 @@ test_JetStreamMgtConsumers(void) jsConsumerConfig_Init(&cfg); cfg.Durable = "dur"; cfg.DeliverSubject = "foo"; - cfg.FilterSubject = "bar"; + cfg.FilterSubject = "bar.>"; s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL) && (strcmp(ci->Stream, "MY_STREAM") == 0) @@ -22467,7 +22492,7 @@ test_JetStreamMgtConsumers(void) ci = NULL; test("Publish: "); - s = natsConnection_Publish(nc, "bar", "hello", 5); + s = natsConnection_Publish(nc, "bar.baz", "hello", 5); testCond(s == NATS_OK); test("Get consumer info (bad args): "); @@ -22497,7 +22522,10 @@ test_JetStreamMgtConsumers(void) s = js_GetConsumerInfo(&ci, js, "MY_STREAM", "dur", NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL) && (strcmp(ci->Stream, "MY_STREAM") == 0) - && (strcmp(ci->Name, "dur") == 0)); + && (strcmp(ci->Name, "dur") == 0) + && (ci->Config != NULL) + && (ci->Config->FilterSubject != NULL) + && (strcmp(ci->Config->FilterSubject, "bar.>") == 0)); jsConsumerInfo_Destroy(ci); ci = NULL; @@ -25551,12 +25579,6 @@ test_JetStreamSubscribePull(void) natsMsgList_Destroy(&list); testCond(s == NATS_OK); - test("Fetch with batch > MaxAckPending: "); - s = natsSubscription_Fetch(&list, sub, ((int)so.Config.MaxAckPending)+10, 1000, &jerr); - testCond((s == NATS_ERR) && (list.Msgs == NULL) && (list.Count == 0) && (jerr == 0) - && (strstr(nats_GetLastError(NULL), "MaxAckPending") != NULL)); - nats_clearLastError(); - test("Receive msg with header no data: "); s = natsMsg_create(&msg, sub->subject, (int) strlen(sub->subject), NULL, 0, "NATS/1.0\r\nk:v\r\n\r\n", 17, 17); @@ -26348,6 +26370,44 @@ test_JetStreamOrderedConsumerWithAutoUnsub(void) JS_TEARDOWN; } +static void +test_JetStreamSubscribeWithFWC(void) +{ + natsStatus s; + natsSubscription *sub= NULL; + jsErrCode jerr= 0; + jsStreamConfig sc; + jsConsumerConfig cc; + jsSubOptions so; + + JS_SETUP(2, 3, 5); + + test("Create stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TEST_WC"; + sc.Subjects = (const char*[1]){"fwc.>"}; + sc.SubjectsLen = 1; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Create consumer: "); + jsConsumerConfig_Init(&cc); + cc.Durable = "dur"; + cc.DeliverSubject = "bar"; + cc.FilterSubject = "fwc.>"; + s = js_AddConsumer(NULL, js, "TEST_WC", &cc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Subscribe: "); + jsSubOptions_Init(&so); + so.Config.Durable = "dur"; + s = js_SubscribeSync(&sub, js, "fwc.>", NULL, &so, &jerr); + testCond((s == NATS_OK) && (sub != NULL)); + natsSubscription_Destroy(sub); + + JS_TEARDOWN; +} + static void test_JetStreamStreamsSealAndRollup(void) { @@ -29979,6 +30039,7 @@ static testInfo allTests[] = {"JetStreamOrderedCons", test_JetStreamOrderedConsumer}, {"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors}, {"JetStreamOrderedConsAutoUnsub", test_JetStreamOrderedConsumerWithAutoUnsub}, + {"JetStreamSubscribeWithFWC", test_JetStreamSubscribeWithFWC}, {"JetStreamStreamsSealAndRollup", test_JetStreamStreamsSealAndRollup}, {"JetStreamGetMsgAndLastMsg", test_JetStreamGetMsgAndLastMsg},