From 60983faa4faec17359e985a4bcad7d5f5731ae87 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 23 Jan 2023 10:26:16 -0700 Subject: [PATCH] [CHANGED] Default connection error handler. If no error handler is specified by the user, the library will use a default error handler that prints some error information to stderr. If the user did not specify an error handler but does not wish to have anything printed to stderr, the user should set a dummy error handler that does not do anything. Resolves #622 Signed-off-by: Ivan Kozlovic --- src/conn.c | 54 +++++++++++++++++++++++++++++++--------- src/conn.h | 3 +++ src/js.c | 71 +++++++++++++++++++---------------------------------- src/natsp.h | 1 + src/opts.c | 6 ++++- test/test.c | 18 +++++++++++--- 6 files changed, 91 insertions(+), 62 deletions(-) diff --git a/src/conn.c b/src/conn.c index ae43c302f..94ef3aa66 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2813,9 +2813,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) natsConn_Lock(nc); nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH); - - if (nc->opts->asyncErrCb != NULL) - natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL); + natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL); natsConn_Unlock(nc); } @@ -2837,8 +2835,7 @@ _processPermissionViolation(natsConnection *nc, char *error) natsConn_Lock(nc); nc->err = NATS_NOT_PERMITTED; snprintf(nc->errStr, sizeof(nc->errStr), "%s", error); - if (nc->opts->asyncErrCb != NULL) - natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error)); + natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error)); natsConn_Unlock(nc); } @@ -2852,7 +2849,7 @@ _processAuthError(natsConnection *nc, int errCode, char *error) nc->err = NATS_CONNECTION_AUTH_FAILED; snprintf(nc->errStr, sizeof(nc->errStr), "%s", error); - if (!nc->initc && nc->opts->asyncErrCb != NULL) + if (!nc->initc) natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error)); if (nc->cur->lastAuthErrCode == errCode) @@ -3572,13 +3569,10 @@ natsConnection_Flush(natsConnection *nc) static void _pushDrainErr(natsConnection *nc, natsStatus s, const char *errTxt) { + char tmp[256]; natsConn_Lock(nc); - if (nc->opts->asyncErrCb != NULL) - { - char tmp[256]; - snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s)); - natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp)); - } + snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s)); + natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp)); natsConn_Unlock(nc); } @@ -4408,3 +4402,39 @@ natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure nc->filterClosure = closure; natsMutex_Unlock(nc->subsMu); } + +void +natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure) +{ + uint64_t cid = 0; + const char *lastErr = NULL; + const char *errTxt = NULL; + + natsConn_Lock(nc); + cid = nc->info.CID; + natsConn_Unlock(nc); + + // Get possibly more detailed error message. If empty, we will print the default + // error status text. + natsConnection_GetLastError(nc, &lastErr); + errTxt = (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(err) : lastErr); + // If there is a subscription, check if it is a JetStream one and if so, take + // the "public" subject (the one that was provided to the subscribe call). + if (sub != NULL) + { + char *subject = NULL; + + natsSub_Lock(sub); + if ((sub->jsi != NULL) && (sub->jsi->psubj != NULL)) + subject = sub->jsi->psubj; + else + subject = sub->subject; + fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "] on \"%s\"\n", err, errTxt, cid, subject); + natsSub_Unlock(sub); + } + else + { + fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "]\n", err, errTxt, cid); + } + fflush(stderr); +} diff --git a/src/conn.h b/src/conn.h index 001b0f4b0..41e93f402 100644 --- a/src/conn.h +++ b/src/conn.h @@ -151,6 +151,9 @@ natsConn_newInbox(natsConnection *nc, natsInbox **newInbox); bool natsConn_srvVersionAtLeast(natsConnection *nc, int major, int minor, int update); +void +natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure); + void natsConn_close(natsConnection *nc); diff --git a/src/js.c b/src/js.c index aa70768f9..0017d3d8a 100644 --- a/src/js.c +++ b/src/js.c @@ -1246,6 +1246,7 @@ jsSub_free(jsSub *jsi) NATS_FREE(jsi->nxtMsgSubj); NATS_FREE(jsi->cmeta); NATS_FREE(jsi->fcReply); + NATS_FREE(jsi->psubj); js_destroyConsumerConfig(jsi->ocCfg); NATS_FREE(jsi); @@ -1321,14 +1322,11 @@ jsSub_deleteConsumerAfterDrain(natsSubscription *sub) s = jsSub_deleteConsumer(sub); if (s != NATS_OK) { + char tmp[256]; natsConn_Lock(nc); - if (nc->opts->asyncErrCb != NULL) - { - char tmp[256]; - snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)", - consumer, s, natsStatus_GetText(s)); - natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); - } + snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)", + consumer, s, natsStatus_GetText(s)); + natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); natsConn_Unlock(nc); } @@ -2070,15 +2068,12 @@ _hbTimerFired(natsTimer *timer, void* closure) } natsConn_Lock(nc); - if (nc->opts->asyncErrCb != NULL) - { - // Even if we have called resetOrderedConsumer, we will post something - // to the async error callback, either "missed heartbeats", or the error - // that occurred trying to do the reset. - if (s == NATS_OK) - s = NATS_MISSED_HEARTBEAT; - natsAsyncCb_PostErrHandler(nc, sub, s, NULL); - } + // Even if we have called resetOrderedConsumer, we will post something + // to the async error callback, either "missed heartbeats", or the error + // that occurred trying to do the reset. + if (s == NATS_OK) + s = NATS_MISSED_HEARTBEAT; + natsAsyncCb_PostErrHandler(nc, sub, s, NULL); natsConn_Unlock(nc); } @@ -2531,6 +2526,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setDefaultError(NATS_NO_MEMORY); } IF_OK_DUP_STRING(s, jsi->stream, stream); + if ((s == NATS_OK) && !nats_IsStringEmpty(subject)) + DUP_STRING(s, jsi->psubj, subject); if (s == NATS_OK) { jsi->js = js; @@ -2572,26 +2569,12 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha opts->Queue, 0, cb, cbClosure, false, jsi)); if ((s == NATS_OK) && (hbi > 0) && !isPullMode) { - // We will create a timer if we use create an ordered consumer, or - // if the async error callback is registered. - bool ct = opts->Ordered; - - // Check to see if it is even worth creating a timer to check - // on missed heartbeats, since the way to notify the user will be - // through async callback. - natsConn_Lock(nc); - ct = ct || (nc->opts->asyncErrCb != NULL ? true : false); - natsConn_Unlock(nc); - - if (ct) - { - natsSub_Lock(sub); - sub->refs++; - s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub); - if (s != NATS_OK) - sub->refs--; - natsSub_Unlock(sub); - } + natsSub_Lock(sub); + sub->refs++; + s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub); + if (s != NATS_OK) + sub->refs--; + natsSub_Unlock(sub); } } if ((s == NATS_OK) && create) @@ -3061,17 +3044,13 @@ _recreateOrderedCons(void *closure) } if (s != NATS_OK) { + char tmp[256]; + const char *lastErr = nats_GetLastError(NULL); natsConn_Lock(nc); - if (nc->opts->asyncErrCb != NULL) - { - char tmp[256]; - const char *lastErr = nats_GetLastError(NULL); - - snprintf(tmp, sizeof(tmp), - "error recreating ordered consumer, will try again: status=%u error=%s", - s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr)); - natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); - } + snprintf(tmp, sizeof(tmp), + "error recreating ordered consumer, will try again: status=%u error=%s", + s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr)); + natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); natsConn_Unlock(nc); } diff --git a/src/natsp.h b/src/natsp.h index b524a6624..226e18d08 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -370,6 +370,7 @@ typedef struct __jsSub jsCtx *js; char *stream; char *consumer; + char *psubj; char *nxtMsgSubj; bool pull; bool inFetch; diff --git a/src/opts.c b/src/opts.c index af2d66d93..ec730fbb0 100644 --- a/src/opts.c +++ b/src/opts.c @@ -892,6 +892,9 @@ natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, opts->asyncErrCb = errHandler; opts->asyncErrCbClosure = closure; + if (opts->asyncErrCb == NULL) + opts->asyncErrCb = natsConn_defaultErrHandler; + UNLOCK_OPTS(opts); return NATS_OK; @@ -1507,6 +1510,7 @@ natsOptions_Create(natsOptions **newOpts) opts->reconnectBufSize = NATS_OPTS_DEFAULT_RECONNECT_BUF_SIZE; opts->reconnectJitter = NATS_OPTS_DEFAULT_RECONNECT_JITTER; opts->reconnectJitterTLS = NATS_OPTS_DEFAULT_RECONNECT_JITTER_TLS; + opts->asyncErrCb = natsConn_defaultErrHandler; *newOpts = opts; @@ -1586,7 +1590,7 @@ natsOptions_clone(natsOptions *opts) s = natsOptions_SetUserCredentialsFromMemory(cloned, opts->userCreds->jwtAndSeedContent); } - else + else { s = natsOptions_SetUserCredentialsFromFiles(cloned, opts->userCreds->userOrChainedFile, diff --git a/test/test.c b/test/test.c index b5aab9606..7b1dc274e 100644 --- a/test/test.c +++ b/test/test.c @@ -2814,7 +2814,7 @@ test_natsOptions(void) test("Remove Error Handler: "); s = natsOptions_SetErrorHandler(opts, NULL, NULL); - testCond((s == NATS_OK) && (opts->asyncErrCb == NULL)); + testCond((s == NATS_OK) && (opts->asyncErrCb == natsConn_defaultErrHandler)); test("Set ClosedCB: "); s = natsOptions_SetClosedCB(opts, _dummyConnHandler, NULL); @@ -3091,6 +3091,18 @@ test_natsOptions(void) && (strcmp(cloned->url, "url") == 0)); natsOptions_Destroy(cloned); + opts = NULL; + cloned = NULL; + + test("If original has default err handler, cloned has it too: "); + s = natsOptions_Create(&opts); + if (s == NATS_OK) + cloned = natsOptions_clone(opts); + testCond((s == NATS_OK) && (cloned != NULL) + && (cloned->asyncErrCb == natsConn_defaultErrHandler) + && (cloned->asyncErrCbClosure == NULL)); + natsOptions_Destroy(cloned); + natsOptions_Destroy(opts); } static void @@ -19047,7 +19059,7 @@ test_UserCredsFromMemory(void) s = natsOptions_SetUserCredentialsFromMemory(opts, "invalidCreds"); IFOK(s, natsConnection_Connect(&nc, opts)); testCond(s == NATS_NOT_FOUND); - + // Use a file that contains no seed test("jwtAndSeed string has no seed: "); s = natsOptions_SetUserCredentialsFromMemory(opts, jwtWithoutSeed); @@ -19110,7 +19122,7 @@ test_UserCredsFromMemory(void) t = NULL; _destroyDefaultThreadArgs(&arg); - + natsOptions_Destroy(opts); }