Skip to content

Commit

Permalink
[CHANGED] Default connection error handler.
Browse files Browse the repository at this point in the history
If no error handler is specified by the user, the library will
use a default error handler that prints some error information
to stderr.

If the user did not specify an error handler but does not wish
to have anything printed to stderr, the user should set a dummy
error handler that does not do anything.

Resolves #622

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 23, 2023
1 parent e04189d commit 60983fa
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 62 deletions.
54 changes: 42 additions & 12 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2813,9 +2813,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
natsConn_Lock(nc);

nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH);

if (nc->opts->asyncErrCb != NULL)
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);

natsConn_Unlock(nc);
}
Expand All @@ -2837,8 +2835,7 @@ _processPermissionViolation(natsConnection *nc, char *error)
natsConn_Lock(nc);
nc->err = NATS_NOT_PERMITTED;
snprintf(nc->errStr, sizeof(nc->errStr), "%s", error);
if (nc->opts->asyncErrCb != NULL)
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));
natsConn_Unlock(nc);
}

Expand All @@ -2852,7 +2849,7 @@ _processAuthError(natsConnection *nc, int errCode, char *error)
nc->err = NATS_CONNECTION_AUTH_FAILED;
snprintf(nc->errStr, sizeof(nc->errStr), "%s", error);

if (!nc->initc && nc->opts->asyncErrCb != NULL)
if (!nc->initc)
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));

if (nc->cur->lastAuthErrCode == errCode)
Expand Down Expand Up @@ -3572,13 +3569,10 @@ natsConnection_Flush(natsConnection *nc)
static void
_pushDrainErr(natsConnection *nc, natsStatus s, const char *errTxt)
{
char tmp[256];
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -4408,3 +4402,39 @@ natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure
nc->filterClosure = closure;
natsMutex_Unlock(nc->subsMu);
}

void
natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
uint64_t cid = 0;
const char *lastErr = NULL;
const char *errTxt = NULL;

natsConn_Lock(nc);
cid = nc->info.CID;
natsConn_Unlock(nc);

// Get possibly more detailed error message. If empty, we will print the default
// error status text.
natsConnection_GetLastError(nc, &lastErr);
errTxt = (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(err) : lastErr);
// If there is a subscription, check if it is a JetStream one and if so, take
// the "public" subject (the one that was provided to the subscribe call).
if (sub != NULL)
{
char *subject = NULL;

natsSub_Lock(sub);
if ((sub->jsi != NULL) && (sub->jsi->psubj != NULL))
subject = sub->jsi->psubj;
else
subject = sub->subject;
fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "] on \"%s\"\n", err, errTxt, cid, subject);
natsSub_Unlock(sub);
}
else
{
fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "]\n", err, errTxt, cid);
}
fflush(stderr);
}
3 changes: 3 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ natsConn_newInbox(natsConnection *nc, natsInbox **newInbox);
bool
natsConn_srvVersionAtLeast(natsConnection *nc, int major, int minor, int update);

void
natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure);

void
natsConn_close(natsConnection *nc);

Expand Down
71 changes: 25 additions & 46 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ jsSub_free(jsSub *jsi)
NATS_FREE(jsi->nxtMsgSubj);
NATS_FREE(jsi->cmeta);
NATS_FREE(jsi->fcReply);
NATS_FREE(jsi->psubj);
js_destroyConsumerConfig(jsi->ocCfg);
NATS_FREE(jsi);

Expand Down Expand Up @@ -1321,14 +1322,11 @@ jsSub_deleteConsumerAfterDrain(natsSubscription *sub)
s = jsSub_deleteConsumer(sub);
if (s != NATS_OK)
{
char tmp[256];
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)",
consumer, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)",
consumer, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -2070,15 +2068,12 @@ _hbTimerFired(natsTimer *timer, void* closure)
}

natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
// Even if we have called resetOrderedConsumer, we will post something
// to the async error callback, either "missed heartbeats", or the error
// that occurred trying to do the reset.
if (s == NATS_OK)
s = NATS_MISSED_HEARTBEAT;
natsAsyncCb_PostErrHandler(nc, sub, s, NULL);
}
// Even if we have called resetOrderedConsumer, we will post something
// to the async error callback, either "missed heartbeats", or the error
// that occurred trying to do the reset.
if (s == NATS_OK)
s = NATS_MISSED_HEARTBEAT;
natsAsyncCb_PostErrHandler(nc, sub, s, NULL);
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -2531,6 +2526,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && !nats_IsStringEmpty(subject))
DUP_STRING(s, jsi->psubj, subject);
if (s == NATS_OK)
{
jsi->js = js;
Expand Down Expand Up @@ -2572,26 +2569,12 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
opts->Queue, 0, cb, cbClosure, false, jsi));
if ((s == NATS_OK) && (hbi > 0) && !isPullMode)
{
// We will create a timer if we use create an ordered consumer, or
// if the async error callback is registered.
bool ct = opts->Ordered;

// Check to see if it is even worth creating a timer to check
// on missed heartbeats, since the way to notify the user will be
// through async callback.
natsConn_Lock(nc);
ct = ct || (nc->opts->asyncErrCb != NULL ? true : false);
natsConn_Unlock(nc);

if (ct)
{
natsSub_Lock(sub);
sub->refs++;
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub);
if (s != NATS_OK)
sub->refs--;
natsSub_Unlock(sub);
}
natsSub_Lock(sub);
sub->refs++;
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub);
if (s != NATS_OK)
sub->refs--;
natsSub_Unlock(sub);
}
}
if ((s == NATS_OK) && create)
Expand Down Expand Up @@ -3061,17 +3044,13 @@ _recreateOrderedCons(void *closure)
}
if (s != NATS_OK)
{
char tmp[256];
const char *lastErr = nats_GetLastError(NULL);
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
const char *lastErr = nats_GetLastError(NULL);

snprintf(tmp, sizeof(tmp),
"error recreating ordered consumer, will try again: status=%u error=%s",
s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp),
"error recreating ordered consumer, will try again: status=%u error=%s",
s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ typedef struct __jsSub
jsCtx *js;
char *stream;
char *consumer;
char *psubj;
char *nxtMsgSubj;
bool pull;
bool inFetch;
Expand Down
6 changes: 5 additions & 1 deletion src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
opts->asyncErrCb = errHandler;
opts->asyncErrCbClosure = closure;

if (opts->asyncErrCb == NULL)
opts->asyncErrCb = natsConn_defaultErrHandler;

UNLOCK_OPTS(opts);

return NATS_OK;
Expand Down Expand Up @@ -1507,6 +1510,7 @@ natsOptions_Create(natsOptions **newOpts)
opts->reconnectBufSize = NATS_OPTS_DEFAULT_RECONNECT_BUF_SIZE;
opts->reconnectJitter = NATS_OPTS_DEFAULT_RECONNECT_JITTER;
opts->reconnectJitterTLS = NATS_OPTS_DEFAULT_RECONNECT_JITTER_TLS;
opts->asyncErrCb = natsConn_defaultErrHandler;

*newOpts = opts;

Expand Down Expand Up @@ -1586,7 +1590,7 @@ natsOptions_clone(natsOptions *opts)
s = natsOptions_SetUserCredentialsFromMemory(cloned,
opts->userCreds->jwtAndSeedContent);
}
else
else
{
s = natsOptions_SetUserCredentialsFromFiles(cloned,
opts->userCreds->userOrChainedFile,
Expand Down
18 changes: 15 additions & 3 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,7 @@ test_natsOptions(void)

test("Remove Error Handler: ");
s = natsOptions_SetErrorHandler(opts, NULL, NULL);
testCond((s == NATS_OK) && (opts->asyncErrCb == NULL));
testCond((s == NATS_OK) && (opts->asyncErrCb == natsConn_defaultErrHandler));

test("Set ClosedCB: ");
s = natsOptions_SetClosedCB(opts, _dummyConnHandler, NULL);
Expand Down Expand Up @@ -3091,6 +3091,18 @@ test_natsOptions(void)
&& (strcmp(cloned->url, "url") == 0));

natsOptions_Destroy(cloned);
opts = NULL;
cloned = NULL;

test("If original has default err handler, cloned has it too: ");
s = natsOptions_Create(&opts);
if (s == NATS_OK)
cloned = natsOptions_clone(opts);
testCond((s == NATS_OK) && (cloned != NULL)
&& (cloned->asyncErrCb == natsConn_defaultErrHandler)
&& (cloned->asyncErrCbClosure == NULL));
natsOptions_Destroy(cloned);
natsOptions_Destroy(opts);
}

static void
Expand Down Expand Up @@ -19047,7 +19059,7 @@ test_UserCredsFromMemory(void)
s = natsOptions_SetUserCredentialsFromMemory(opts, "invalidCreds");
IFOK(s, natsConnection_Connect(&nc, opts));
testCond(s == NATS_NOT_FOUND);

// Use a file that contains no seed
test("jwtAndSeed string has no seed: ");
s = natsOptions_SetUserCredentialsFromMemory(opts, jwtWithoutSeed);
Expand Down Expand Up @@ -19110,7 +19122,7 @@ test_UserCredsFromMemory(void)
t = NULL;

_destroyDefaultThreadArgs(&arg);

natsOptions_Destroy(opts);
}

Expand Down

0 comments on commit 60983fa

Please sign in to comment.