Skip to content

Commit

Permalink
Merge 87baf2a into b2ac624
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Apr 7, 2023
2 parents b2ac624 + 87baf2a commit a5e8bc4
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 20 deletions.
56 changes: 56 additions & 0 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -4432,3 +4432,59 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus
}
fflush(stderr);
}

natsStatus
natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc)
{
if ((nc == NULL) || (cb == NULL) || (closure == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
*cb = nc->opts->asyncErrCb;
*closure = nc->opts->asyncErrCbClosure;
natsConn_Unlock(nc);

return NATS_OK;
}

natsStatus
natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure)
{
if ((nc == NULL) || (cb == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
nc->opts->asyncErrCb = cb;
nc->opts->asyncErrCbClosure = closure;
natsConn_Unlock(nc);

return NATS_OK;
}

natsStatus
natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc)
{
if ((nc == NULL) || (cb == NULL) || (closure == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
*cb = nc->opts->closedCb;
*closure = nc->opts->closedCbClosure;
natsConn_Unlock(nc);

return NATS_OK;
}

natsStatus
natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure)
{
if ((nc == NULL) || (cb == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
nc->opts->closedCb = cb;
nc->opts->closedCbClosure = closure;
natsConn_Unlock(nc);

return NATS_OK;
}
13 changes: 13 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,17 @@ natsConn_close(natsConnection *nc);
void
natsConn_destroy(natsConnection *nc, bool fromPublicDestroy);

natsStatus
natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure);

natsStatus
natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc);

natsStatus
natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure);

natsStatus
natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc);


#endif /* CONN_H_ */
66 changes: 46 additions & 20 deletions src/nats.c
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,14 @@ _timerThread(void *arg)
static void
_asyncCbsThread(void *arg)
{
natsLibAsyncCbs *asyncCbs = &(gLib.asyncCbs);
natsAsyncCbInfo *cb = NULL;
natsConnection *nc = NULL;
natsLibAsyncCbs *asyncCbs = &(gLib.asyncCbs);
natsAsyncCbInfo *cb = NULL;
natsConnection *nc = NULL;
natsConnectionHandler cbHandler = NULL;
natsErrHandler errHandler = NULL;
void *cbClosure = NULL;
#if defined(NATS_HAS_STREAMING)
stanConnection *sc = NULL;
stanConnection *sc = NULL;
#endif

WAIT_LIB_INITIALIZED;
Expand Down Expand Up @@ -760,41 +763,64 @@ _asyncCbsThread(void *arg)
sc = cb->sc;
#endif

// callback handlers can be updated on a live connection, so we need to
// lock.
cbHandler = NULL;
errHandler = NULL;
cbClosure = NULL;
natsMutex_Lock(nc->mu);
switch (cb->type)
{
case ASYNC_CLOSED:
(*(nc->opts->closedCb))(nc, nc->opts->closedCbClosure);
cbHandler = nc->opts->closedCb;
cbClosure = nc->opts->closedCbClosure;
break;
case ASYNC_DISCONNECTED:
(*(nc->opts->disconnectedCb))(nc, nc->opts->disconnectedCbClosure);
cbHandler = nc->opts->disconnectedCb;
cbClosure = nc->opts->disconnectedCbClosure;
break;
case ASYNC_RECONNECTED:
(*(nc->opts->reconnectedCb))(nc, nc->opts->reconnectedCbClosure);
cbHandler = nc->opts->reconnectedCb;
cbClosure = nc->opts->reconnectedCbClosure;
break;
case ASYNC_CONNECTED:
(*(nc->opts->connectedCb))(nc, nc->opts->connectedCbClosure);
cbHandler = nc->opts->connectedCb;
cbClosure = nc->opts->connectedCbClosure;
break;
case ASYNC_DISCOVERED_SERVERS:
(*(nc->opts->discoveredServersCb))(nc, nc->opts->discoveredServersClosure);
cbHandler = nc->opts->discoveredServersCb;
cbClosure = nc->opts->discoveredServersClosure;
break;
case ASYNC_LAME_DUCK_MODE:
(*(nc->opts->lameDuckCb))(nc, nc->opts->lameDuckClosure);
cbHandler = nc->opts->lameDuckCb;
cbClosure = nc->opts->lameDuckClosure;
break;
case ASYNC_ERROR:
{
if (cb->errTxt != NULL)
nats_setErrStatusAndTxt(cb->err, cb->errTxt);
(*(nc->opts->asyncErrCb))(nc, cb->sub, cb->err, nc->opts->asyncErrCbClosure);
break;
}
#if defined(NATS_HAS_STREAMING)
case ASYNC_STAN_CONN_LOST:
(*(sc->opts->connectionLostCB))(sc, sc->connLostErrTxt, sc->opts->connectionLostCBClosure);
errHandler = nc->opts->asyncErrCb;
cbClosure = nc->opts->asyncErrCbClosure;
break;
#endif
default:
break;
}
natsMutex_Unlock(nc->mu);

// Invoke the callback
if (cbHandler != NULL)
{
(*(cbHandler))(nc, cbClosure);
}
else if (errHandler != NULL)
{
if (cb->errTxt != NULL)
nats_setErrStatusAndTxt(cb->err, cb->errTxt);
(*(errHandler))(nc, cb->sub, cb->err, cbClosure);
}
#if defined(NATS_HAS_STREAMING)
else if (cb->type == ASYNC_STAN_CONN_LOST)
{
(*(sc->opts->connectionLostCB))(sc, sc->connLostErrTxt, sc->opts->connectionLostCBClosure);
}
#endif

natsAsyncCb_Destroy(cb);
nats_clearLastError();
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ AsyncSubscriptionPendingDrain
SyncSubscriptionPending
SyncSubscriptionPendingDrain
AsyncErrHandler
AsyncSetErrHandler
AsyncSubscriberStarvation
AsyncSubscriberOnClose
NextMsgCallOnAsyncSub
Expand Down
74 changes: 74 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -14373,6 +14373,79 @@ test_AsyncErrHandler(void)
_stopServer(serverPid);
}

static void
test_AsyncSetErrHandler(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsPid serverPid = NATS_INVALID_PID;
struct threadArg arg;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test!");

arg.status = NATS_OK;
arg.control= 7;

s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 10));

if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

serverPid = _startServer("nats://127.0.0.1:4222", NULL, true);
CHECK_SERVER_STARTED(serverPid);

s = natsConnection_Connect(&nc, opts);
IFOK(s, natsConnection_Subscribe(&sub, nc, "async_test", _recvTestString, (void*) &arg));

natsMutex_Lock(arg.m);
arg.sub = sub;
natsMutex_Unlock(arg.m);

// Start sending messages
for (int i=0;
(s == NATS_OK) && (i < (opts->maxPendingMsgs)); i++)
{
s = natsConnection_PublishString(nc, "async_test", "hello");
}

// Set the error handler in-flight
IFOK(s, natsConn_setErrorCallback(nc, _asyncErrCb, (void*) &arg));

for (int i=0;
(s == NATS_OK) && (i < 100); i++)
{
s = natsConnection_PublishString(nc, "async_test", "hello");
}
IFOK(s, natsConnection_Flush(nc));

// Wait for async err callback
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.done)
s = natsCondition_TimedWait(arg.c, arg.m, 2000);
natsMutex_Unlock(arg.m);

test("Aync fired properly, and all checks are good: ");
testCond((s == NATS_OK)
&& arg.done
&& arg.closed
&& (arg.status == NATS_OK));

natsOptions_Destroy(opts);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);

_destroyDefaultThreadArgs(&arg);

_stopServer(serverPid);
}


static void
_responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
Expand Down Expand Up @@ -34440,6 +34513,7 @@ static testInfo allTests[] =
{"SyncSubscriptionPending", test_SyncSubscriptionPending},
{"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain},
{"AsyncErrHandler", test_AsyncErrHandler},
{"AsyncSetErrHandler", test_AsyncSetErrHandler},
{"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation},
{"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose},
{"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},
Expand Down

0 comments on commit a5e8bc4

Please sign in to comment.