diff --git a/src/conn.c b/src/conn.c index bbe5c2933..1b405c852 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3422,6 +3422,25 @@ natsConnection_ConnectTo(natsConnection **newConn, const char *url) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsConnection_Reconnect(natsConnection *nc) +{ + if (nc == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + if (natsConn_isClosed(nc)) + { + natsConn_Unlock(nc); + return nats_setDefaultError(NATS_CONNECTION_CLOSED); + } + + natsSock_Close(nc->sockCtx.fd); + + natsConn_Unlock(nc); + return NATS_OK; +} + // Test if connection has been closed. bool natsConnection_IsClosed(natsConnection *nc) diff --git a/src/nats.h b/src/nats.h index 9dd2c9b0e..dc331e08f 100644 --- a/src/nats.h +++ b/src/nats.h @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg); NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options); +/** \brief Causes the client to drop the connection to the current server and + * perform standard reconnection process. + * + * This means that all subscriptions and consumers should be resubscribed and + * their work resumed after successful reconnect where all reconnect options are + * respected. + * + * @param nc the pointer to the #natsConnection object. + */ +natsStatus +natsConnection_Reconnect(natsConnection *nc); + /** \brief Process a read event when using external event loop. * * When using an external event loop, and the callback indicating that diff --git a/test/list.txt b/test/list.txt index 60cee3e79..60be59288 100644 --- a/test/list.txt +++ b/test/list.txt @@ -86,6 +86,7 @@ ReconnectBufSize RetryOnFailedConnect NoPartialOnReconnect ReconnectFailsPendingRequests +ForcedReconnect ErrOnConnectAndDeadlock ErrOnMaxPayloadLimit Auth diff --git a/test/test.c b/test/test.c index 070255638..ec2d2e888 100644 --- a/test/test.c +++ b/test/test.c @@ -20202,6 +20202,90 @@ test_NoPartialOnReconnect(void) _stopServer(pid); } +static void +test_ForcedReconnect(void) +{ + natsStatus s; + struct threadArg arg; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsPid pid = NATS_INVALID_PID; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("unable to setup test"); + + test("Start server, connect, subscribe: "); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + CHECK_SERVER_STARTED(pid); + IFOK(s, natsOptions_Create(&opts)); + IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg)); + IFOK(s, natsConnection_Connect(&nc, opts)); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo")); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + IFOK(s, natsConnection_PublishString(nc, "foo", "bar")); + testCond(s == NATS_OK); + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Forced reconnect: "); + s = natsConnection_Reconnect(nc); + testCond(s == NATS_OK); + + test("Waiting for reconnect: "); + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.reconnected) + s = natsCondition_TimedWait(arg.c, arg.m, 5000); + arg.reconnected = false; + natsMutex_Unlock(arg.m); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + IFOK(s, natsConnection_PublishString(nc, "foo", "bar")); + testCond(s == NATS_OK); + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Reconnect again with allowReconnect false, the call succeeds: "); + natsMutex_Lock(nc->mu); + nc->opts->allowReconnect = false; + natsMutex_Unlock(nc->mu); + s = natsConnection_Reconnect(nc); + testCond(s == NATS_OK); + + // On MacOS this returns NATS_CONNECTION_CLOSED, on Ubuntu we get a + // NATS_TIMEOUT. + test("But the connection is closed: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond(((s == NATS_CONNECTION_CLOSED) || (s = NATS_TIMEOUT)) && (msg == NULL)); + + natsConnection_Close(nc); + test("Reconect on a close connection errors: "); + s = natsConnection_Reconnect(nc); + testCond(s == NATS_CONNECTION_CLOSED); + + test("Reconect on a NULL connection errors: "); + s = natsConnection_Reconnect(NULL); + testCond(s == NATS_INVALID_ARG); + + natsSubscription_Destroy(sub); + natsConnection_Destroy(nc); + natsOptions_Destroy(opts); + _destroyDefaultThreadArgs(&arg); +} + static void _stopServerInThread(void *closure) { @@ -36210,6 +36294,7 @@ static testInfo allTests[] = {"RetryOnFailedConnect", test_RetryOnFailedConnect}, {"NoPartialOnReconnect", test_NoPartialOnReconnect}, {"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest}, + {"ForcedReconnect", test_ForcedReconnect}, {"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock}, {"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},