Skip to content

Commit

Permalink
Merge 60983fa into e04189d
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jan 23, 2023
2 parents e04189d + 60983fa commit e2a5f5b
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 62 deletions.
54 changes: 42 additions & 12 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2813,9 +2813,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
natsConn_Lock(nc);

nc->err = (sc ? NATS_SLOW_CONSUMER : NATS_MISMATCH);

if (nc->opts->asyncErrCb != NULL)
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);
natsAsyncCb_PostErrHandler(nc, sub, nc->err, NULL);

natsConn_Unlock(nc);
}
Expand All @@ -2837,8 +2835,7 @@ _processPermissionViolation(natsConnection *nc, char *error)
natsConn_Lock(nc);
nc->err = NATS_NOT_PERMITTED;
snprintf(nc->errStr, sizeof(nc->errStr), "%s", error);
if (nc->opts->asyncErrCb != NULL)
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));
natsConn_Unlock(nc);
}

Expand All @@ -2852,7 +2849,7 @@ _processAuthError(natsConnection *nc, int errCode, char *error)
nc->err = NATS_CONNECTION_AUTH_FAILED;
snprintf(nc->errStr, sizeof(nc->errStr), "%s", error);

if (!nc->initc && nc->opts->asyncErrCb != NULL)
if (!nc->initc)
natsAsyncCb_PostErrHandler(nc, NULL, nc->err, NATS_STRDUP(error));

if (nc->cur->lastAuthErrCode == errCode)
Expand Down Expand Up @@ -3572,13 +3569,10 @@ natsConnection_Flush(natsConnection *nc)
static void
_pushDrainErr(natsConnection *nc, natsStatus s, const char *errTxt)
{
char tmp[256];
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp), "Drain error: %s: %u (%s)", errTxt, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, NULL, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -4408,3 +4402,39 @@ natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure
nc->filterClosure = closure;
natsMutex_Unlock(nc->subsMu);
}

void
natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)
{
uint64_t cid = 0;
const char *lastErr = NULL;
const char *errTxt = NULL;

natsConn_Lock(nc);
cid = nc->info.CID;
natsConn_Unlock(nc);

// Get possibly more detailed error message. If empty, we will print the default
// error status text.
natsConnection_GetLastError(nc, &lastErr);
errTxt = (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(err) : lastErr);
// If there is a subscription, check if it is a JetStream one and if so, take
// the "public" subject (the one that was provided to the subscribe call).
if (sub != NULL)
{
char *subject = NULL;

natsSub_Lock(sub);
if ((sub->jsi != NULL) && (sub->jsi->psubj != NULL))
subject = sub->jsi->psubj;
else
subject = sub->subject;
fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "] on \"%s\"\n", err, errTxt, cid, subject);
natsSub_Unlock(sub);
}
else
{
fprintf(stderr, "Error %d - %s on connection [%" PRIu64 "]\n", err, errTxt, cid);
}
fflush(stderr);
}
3 changes: 3 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ natsConn_newInbox(natsConnection *nc, natsInbox **newInbox);
bool
natsConn_srvVersionAtLeast(natsConnection *nc, int major, int minor, int update);

void
natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure);

void
natsConn_close(natsConnection *nc);

Expand Down
71 changes: 25 additions & 46 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ jsSub_free(jsSub *jsi)
NATS_FREE(jsi->nxtMsgSubj);
NATS_FREE(jsi->cmeta);
NATS_FREE(jsi->fcReply);
NATS_FREE(jsi->psubj);
js_destroyConsumerConfig(jsi->ocCfg);
NATS_FREE(jsi);

Expand Down Expand Up @@ -1321,14 +1322,11 @@ jsSub_deleteConsumerAfterDrain(natsSubscription *sub)
s = jsSub_deleteConsumer(sub);
if (s != NATS_OK)
{
char tmp[256];
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)",
consumer, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp), "failed to delete consumer '%s': %u (%s)",
consumer, s, natsStatus_GetText(s));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -2070,15 +2068,12 @@ _hbTimerFired(natsTimer *timer, void* closure)
}

natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
// Even if we have called resetOrderedConsumer, we will post something
// to the async error callback, either "missed heartbeats", or the error
// that occurred trying to do the reset.
if (s == NATS_OK)
s = NATS_MISSED_HEARTBEAT;
natsAsyncCb_PostErrHandler(nc, sub, s, NULL);
}
// Even if we have called resetOrderedConsumer, we will post something
// to the async error callback, either "missed heartbeats", or the error
// that occurred trying to do the reset.
if (s == NATS_OK)
s = NATS_MISSED_HEARTBEAT;
natsAsyncCb_PostErrHandler(nc, sub, s, NULL);
natsConn_Unlock(nc);
}

Expand Down Expand Up @@ -2531,6 +2526,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && !nats_IsStringEmpty(subject))
DUP_STRING(s, jsi->psubj, subject);
if (s == NATS_OK)
{
jsi->js = js;
Expand Down Expand Up @@ -2572,26 +2569,12 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
opts->Queue, 0, cb, cbClosure, false, jsi));
if ((s == NATS_OK) && (hbi > 0) && !isPullMode)
{
// We will create a timer if we use create an ordered consumer, or
// if the async error callback is registered.
bool ct = opts->Ordered;

// Check to see if it is even worth creating a timer to check
// on missed heartbeats, since the way to notify the user will be
// through async callback.
natsConn_Lock(nc);
ct = ct || (nc->opts->asyncErrCb != NULL ? true : false);
natsConn_Unlock(nc);

if (ct)
{
natsSub_Lock(sub);
sub->refs++;
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub);
if (s != NATS_OK)
sub->refs--;
natsSub_Unlock(sub);
}
natsSub_Lock(sub);
sub->refs++;
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _hbTimerStopped, hbi*2, (void*) sub);
if (s != NATS_OK)
sub->refs--;
natsSub_Unlock(sub);
}
}
if ((s == NATS_OK) && create)
Expand Down Expand Up @@ -3061,17 +3044,13 @@ _recreateOrderedCons(void *closure)
}
if (s != NATS_OK)
{
char tmp[256];
const char *lastErr = nats_GetLastError(NULL);
natsConn_Lock(nc);
if (nc->opts->asyncErrCb != NULL)
{
char tmp[256];
const char *lastErr = nats_GetLastError(NULL);

snprintf(tmp, sizeof(tmp),
"error recreating ordered consumer, will try again: status=%u error=%s",
s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
}
snprintf(tmp, sizeof(tmp),
"error recreating ordered consumer, will try again: status=%u error=%s",
s, (nats_IsStringEmpty(lastErr) ? natsStatus_GetText(s) : lastErr));
natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp));
natsConn_Unlock(nc);
}

Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ typedef struct __jsSub
jsCtx *js;
char *stream;
char *consumer;
char *psubj;
char *nxtMsgSubj;
bool pull;
bool inFetch;
Expand Down
6 changes: 5 additions & 1 deletion src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler,
opts->asyncErrCb = errHandler;
opts->asyncErrCbClosure = closure;

if (opts->asyncErrCb == NULL)
opts->asyncErrCb = natsConn_defaultErrHandler;

UNLOCK_OPTS(opts);

return NATS_OK;
Expand Down Expand Up @@ -1507,6 +1510,7 @@ natsOptions_Create(natsOptions **newOpts)
opts->reconnectBufSize = NATS_OPTS_DEFAULT_RECONNECT_BUF_SIZE;
opts->reconnectJitter = NATS_OPTS_DEFAULT_RECONNECT_JITTER;
opts->reconnectJitterTLS = NATS_OPTS_DEFAULT_RECONNECT_JITTER_TLS;
opts->asyncErrCb = natsConn_defaultErrHandler;

*newOpts = opts;

Expand Down Expand Up @@ -1586,7 +1590,7 @@ natsOptions_clone(natsOptions *opts)
s = natsOptions_SetUserCredentialsFromMemory(cloned,
opts->userCreds->jwtAndSeedContent);
}
else
else
{
s = natsOptions_SetUserCredentialsFromFiles(cloned,
opts->userCreds->userOrChainedFile,
Expand Down
18 changes: 15 additions & 3 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,7 @@ test_natsOptions(void)

test("Remove Error Handler: ");
s = natsOptions_SetErrorHandler(opts, NULL, NULL);
testCond((s == NATS_OK) && (opts->asyncErrCb == NULL));
testCond((s == NATS_OK) && (opts->asyncErrCb == natsConn_defaultErrHandler));

test("Set ClosedCB: ");
s = natsOptions_SetClosedCB(opts, _dummyConnHandler, NULL);
Expand Down Expand Up @@ -3091,6 +3091,18 @@ test_natsOptions(void)
&& (strcmp(cloned->url, "url") == 0));

natsOptions_Destroy(cloned);
opts = NULL;
cloned = NULL;

test("If original has default err handler, cloned has it too: ");
s = natsOptions_Create(&opts);
if (s == NATS_OK)
cloned = natsOptions_clone(opts);
testCond((s == NATS_OK) && (cloned != NULL)
&& (cloned->asyncErrCb == natsConn_defaultErrHandler)
&& (cloned->asyncErrCbClosure == NULL));
natsOptions_Destroy(cloned);
natsOptions_Destroy(opts);
}

static void
Expand Down Expand Up @@ -19047,7 +19059,7 @@ test_UserCredsFromMemory(void)
s = natsOptions_SetUserCredentialsFromMemory(opts, "invalidCreds");
IFOK(s, natsConnection_Connect(&nc, opts));
testCond(s == NATS_NOT_FOUND);

// Use a file that contains no seed
test("jwtAndSeed string has no seed: ");
s = natsOptions_SetUserCredentialsFromMemory(opts, jwtWithoutSeed);
Expand Down Expand Up @@ -19110,7 +19122,7 @@ test_UserCredsFromMemory(void)
t = NULL;

_destroyDefaultThreadArgs(&arg);

natsOptions_Destroy(opts);
}

Expand Down

0 comments on commit e2a5f5b

Please sign in to comment.