From d58353badd8e985e663573c780959db8ae3585bd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 05:55:52 -0700 Subject: [PATCH 01/14] wip --- src/conn.c | 90 +++++++++++++++++++++++++++++++++++------------------ src/nats.h | 12 +++++++ test/test.c | 5 +++ 3 files changed, 77 insertions(+), 30 deletions(-) diff --git a/src/conn.c b/src/conn.c index bbe5c2933..2387a4fb0 100644 --- a/src/conn.c +++ b/src/conn.c @@ -81,8 +81,14 @@ _processConnInit(natsConnection *nc); static void _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doCBs); -static bool -_processOpError(natsConnection *nc, natsStatus s, bool initialConnect); +static natsStatus +_tryReconnect(natsConnection *nc, natsStatus s, bool forcedReconnect, bool *started); + +static void +_maybeReconnect(natsConnection *nc, natsStatus s) { _tryReconnect(nc, s, false, NULL); } + +static natsStatus +_forceReconnect(natsConnection *nc, natsStatus s, bool *started) { return _tryReconnect(nc, s, true, started); } static natsStatus _flushTimeout(natsConnection *nc, int64_t timeout); @@ -2096,13 +2102,16 @@ _connect(natsConnection *nc) } // If not connected and retry asynchronously on failed connect + printf("<>/<> nc->status: %d retry: %d, has CB:%d\n", nc->status, retryOnFailedConnect, hasConnectedCb); if ((nc->status != NATS_CONN_STATUS_CONNECTED) && retryOnFailedConnect && hasConnectedCb) { natsConn_Unlock(nc); - if (_processOpError(nc, retSts, true)) + bool reconnectStarted = false; + s = _forceReconnect(nc, retSts, &reconnectStarted); + if ((s == NATS_OK) && reconnectStarted) { nats_clearLastError(); return NATS_NOT_YET_CONNECTED; @@ -2134,29 +2143,33 @@ _evStopPolling(natsConnection *nc) return s; } -// _processOpError handles errors from reading or parsing the protocol. +// _tryReconnect handles errors from reading or parsing the protocol, or forced +// reconnection. It will fire off a doReconnect thread if needed. // The lock should not be held entering this function. 
-static bool -_processOpError(natsConnection *nc, natsStatus s, bool initialConnect) +static natsStatus +_tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool *started) { + natsStatus s = NATS_OK; + natsConn_Lock(nc); - if (!initialConnect) + printf("<>/<> tryReconnect 1: forced:%d\n", forcedReconnect); + if (!forcedReconnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { + printf("<>/<> tryReconnect 2: nothing to do\n"); natsConn_Unlock(nc); - return false; + return NATS_OK; } } // Do reconnect only if allowed and we were actually connected // or if we are retrying on initial failed connect. - if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) + if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { - natsStatus ls = NATS_OK; - + printf("<>/<> tryReconnect 3: try: %d %d\n", nc->opts->allowReconnect, nc->status); // Set our new status nc->status = NATS_CONN_STATUS_RECONNECTING; @@ -2176,7 +2189,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) // on the socket since we are going to reconnect. if (nc->el.attached) { - ls = _evStopPolling(nc); + s = _evStopPolling(nc); natsSock_Close(nc->sockCtx.fd); nc->sockCtx.fd = NATS_SOCK_INVALID; @@ -2185,25 +2198,24 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) } // Fail pending flush requests. - if (ls == NATS_OK) + if (s == NATS_OK) _clearPendingFlushRequests(nc); // If option set, also fail pending requests. - if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) + if ((s == NATS_OK) && nc->opts->failRequestsOnDisconnect) _clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); // Create the pending buffer to hold all write requests while we try // to reconnect. 
- if (ls == NATS_OK) - ls = natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize); - if (ls == NATS_OK) + IFOK (s, natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize)); + if (s == NATS_OK) { nc->usePending = true; // Start the reconnect thread - ls = natsThread_Create(&(nc->reconnectThread), + s = natsThread_Create(&(nc->reconnectThread), _doReconnect, (void*) nc); } - if (ls == NATS_OK) + if (s == NATS_OK) { // We created the reconnect thread successfully, so retain // the connection. @@ -2211,20 +2223,26 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) nc->inReconnect++; natsConn_Unlock(nc); - return true; + if (started != NULL) + *started = true; + + printf("<>/<> tryReconnect 4: OK\n"); + return NATS_OK; } } // reconnect not allowed or we failed to setup the reconnect code. + if (started != NULL) + *started = false; nc->status = NATS_CONN_STATUS_DISCONNECTED; - nc->err = s; + nc->err = newErr; natsConn_Unlock(nc); _close(nc, NATS_CONN_STATUS_CLOSED, false, true); - return false; + return NATS_UPDATE_ERR_STACK(s); } static void @@ -2267,7 +2285,7 @@ _readLoop(void *arg) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); natsConn_Lock(nc); } @@ -2396,7 +2414,7 @@ _processPingTimer(natsTimer *timer, void *arg) if (++(nc->pout) > nc->opts->maxPingsOut) { natsConn_Unlock(nc); - _processOpError(nc, NATS_STALE_CONNECTION, false); + _maybeReconnect(nc, NATS_STALE_CONNECTION); return; } @@ -2921,7 +2939,7 @@ natsConn_processErr(natsConnection *nc, char *buf, int bufLen) if (strcasecmp(error, STALE_CONNECTION) == 0) { - _processOpError(nc, NATS_STALE_CONNECTION, false); + _maybeReconnect(nc, NATS_STALE_CONNECTION); } else if (nats_strcasestr(error, PERMISSIONS_ERR) != NULL) { @@ -3306,9 +3324,9 @@ natsConn_create(natsConnection **newConn, natsOptions *options) natsStatus natsConnection_Connect(natsConnection **newConn, natsOptions *options) { - natsStatus s = NATS_OK; - 
natsConnection *nc = NULL; - natsOptions *opts = NULL; + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsOptions *opts = NULL; if (options == NULL) { @@ -3333,6 +3351,18 @@ natsConnection_Connect(natsConnection **newConn, natsOptions *options) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsConnection_Reconnect(natsConnection *nc) +{ + natsStatus s = NATS_OK; + + if (natsConnection_IsClosed(nc)) + return nats_setDefaultError(NATS_INVALID_ARG); + + IFOK(s, _forceReconnect(nc, NATS_OK, NULL)); + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _processUrlString(natsOptions *opts, const char *urls) { @@ -4117,7 +4147,7 @@ natsConnection_ProcessReadEvent(natsConnection *nc) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); natsConn_release(nc); } @@ -4166,7 +4196,7 @@ natsConnection_ProcessWriteEvent(natsConnection *nc) natsConn_Unlock(nc); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); (void) NATS_UPDATE_ERR_STACK(s); } diff --git a/src/nats.h b/src/nats.h index 9dd2c9b0e..dc331e08f 100644 --- a/src/nats.h +++ b/src/nats.h @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg); NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options); +/** \brief Causes the client to drop the connection to the current server and + * perform the standard reconnection process. + * + * This means that all subscriptions and consumers should be resubscribed and + * their work resumed after a successful reconnect where all reconnect options are + * respected. + * + * @param nc the pointer to the #natsConnection object. + */ +NATS_EXTERN natsStatus +natsConnection_Reconnect(natsConnection *nc); + /** \brief Process a read event when using external event loop. 
* * When using an external event loop, and the callback indicating that diff --git a/test/test.c b/test/test.c index 070255638..71373709c 100644 --- a/test/test.c +++ b/test/test.c @@ -12047,12 +12047,17 @@ test_RequestTimeout(void) natsMsg *msg = NULL; natsPid serverPid = NATS_INVALID_PID; + printf("<>/<> 1\n"); serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); CHECK_SERVER_STARTED(serverPid); test("Test Request should timeout: ") s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + printf("<>/<> 2: %d\n", s); + IFOK(s, natsConnection_RequestString(&msg, nc, "foo", "bar", 500)); + printf("<>/<> 3: %d\n", s); + testCond(serverVersionAtLeast(2, 2, 0) ? (s == NATS_NO_RESPONDERS) : (s == NATS_TIMEOUT)); natsConnection_Destroy(nc); From 46236d818939eb3678ff701fad17be46f647f5dd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:18:50 -0700 Subject: [PATCH 02/14] removed debug log --- src/conn.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/conn.c b/src/conn.c index 2387a4fb0..1f861a6ff 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2102,7 +2102,6 @@ _connect(natsConnection *nc) } // If not connected and retry asynchronously on failed connect - printf("<>/<> nc->status: %d retry: %d, has CB:%d\n", nc->status, retryOnFailedConnect, hasConnectedCb); if ((nc->status != NATS_CONN_STATUS_CONNECTED) && retryOnFailedConnect && hasConnectedCb) @@ -2153,12 +2152,10 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool natsConn_Lock(nc); - printf("<>/<> tryReconnect 1: forced:%d\n", forcedReconnect); if (!forcedReconnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { - printf("<>/<> tryReconnect 2: nothing to do\n"); natsConn_Unlock(nc); return NATS_OK; @@ -2169,7 +2166,6 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool // or if we are retrying on initial failed connect. 
if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { - printf("<>/<> tryReconnect 3: try: %d %d\n", nc->opts->allowReconnect, nc->status); // Set our new status nc->status = NATS_CONN_STATUS_RECONNECTING; @@ -2226,7 +2222,6 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool if (started != NULL) *started = true; - printf("<>/<> tryReconnect 4: OK\n"); return NATS_OK; } } From a5e99d3892db9cbc86135901f85cb880fd1555dc Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:22:58 -0700 Subject: [PATCH 03/14] removed whitespace --- src/conn.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/conn.c b/src/conn.c index 1f861a6ff..4a31d8921 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3319,9 +3319,9 @@ natsConn_create(natsConnection **newConn, natsOptions *options) natsStatus natsConnection_Connect(natsConnection **newConn, natsOptions *options) { - natsStatus s = NATS_OK; - natsConnection *nc = NULL; - natsOptions *opts = NULL; + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsOptions *opts = NULL; if (options == NULL) { From e5a31f3b30aa3704b9fc3c82216d7a7be033ee66 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:24:23 -0700 Subject: [PATCH 04/14] removed debug log --- test/test.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/test.c b/test/test.c index 71373709c..070255638 100644 --- a/test/test.c +++ b/test/test.c @@ -12047,17 +12047,12 @@ test_RequestTimeout(void) natsMsg *msg = NULL; natsPid serverPid = NATS_INVALID_PID; - printf("<>/<> 1\n"); serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); CHECK_SERVER_STARTED(serverPid); test("Test Request should timeout: ") s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); - printf("<>/<> 2: %d\n", s); - IFOK(s, natsConnection_RequestString(&msg, nc, "foo", "bar", 500)); - printf("<>/<> 3: %d\n", s); - testCond(serverVersionAtLeast(2, 2, 0) ? 
(s == NATS_NO_RESPONDERS) : (s == NATS_TIMEOUT)); natsConnection_Destroy(nc); From 8227d5fe082c3e62efcd6c8934300f623e853a6b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 08:56:02 -0700 Subject: [PATCH 05/14] Added a test --- test/list.txt | 1 + test/test.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/test/list.txt b/test/list.txt index 60cee3e79..60be59288 100644 --- a/test/list.txt +++ b/test/list.txt @@ -86,6 +86,7 @@ ReconnectBufSize RetryOnFailedConnect NoPartialOnReconnect ReconnectFailsPendingRequests +ForcedReconnect ErrOnConnectAndDeadlock ErrOnMaxPayloadLimit Auth diff --git a/test/test.c b/test/test.c index 070255638..823350186 100644 --- a/test/test.c +++ b/test/test.c @@ -20202,6 +20202,74 @@ test_NoPartialOnReconnect(void) _stopServer(pid); } +static void +test_ForcedReconnect(void) +{ + natsStatus s; + struct threadArg arg; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsPid pid = NATS_INVALID_PID; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("unable to setup test"); + + test("Start server, connect, subscribe: "); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + CHECK_SERVER_STARTED(pid); + IFOK(s, natsOptions_Create(&opts)); + IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg)); + IFOK(s, natsConnection_Connect(&nc, opts)); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo")); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); + IFOK(s, natsConnection_PublishMsg(nc, msg)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Forced reconnect: "); + s = natsConnection_Reconnect(nc); + testCond(s 
== NATS_OK); + + test("Waiting for reconnect: "); + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.reconnected) + s = natsCondition_TimedWait(arg.c, arg.m, 5000); + arg.reconnected = false; + natsMutex_Unlock(arg.m); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); + IFOK(s, natsConnection_PublishMsg(nc, msg)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + natsSubscription_Destroy(sub); + sub = NULL; + natsConnection_Destroy(nc); + nc = NULL; +} + static void _stopServerInThread(void *closure) { @@ -36210,6 +36278,7 @@ static testInfo allTests[] = {"RetryOnFailedConnect", test_RetryOnFailedConnect}, {"NoPartialOnReconnect", test_NoPartialOnReconnect}, {"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest}, + {"ForcedReconnect", test_ForcedReconnect}, {"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock}, {"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit}, From 27fd5dfbcc81b0f7faf0a1b97ee3237bc92ce425 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 09:10:27 -0700 Subject: [PATCH 06/14] Oops forgot to free opts/threadargs from last edit --- test/test.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test.c b/test/test.c index 823350186..c22f0fc87 100644 --- a/test/test.c +++ b/test/test.c @@ -20265,9 +20265,9 @@ test_ForcedReconnect(void) msg = NULL; natsSubscription_Destroy(sub); - sub = NULL; natsConnection_Destroy(nc); - nc = NULL; + natsOptions_Destroy(opts); + _destroyDefaultThreadArgs(&arg); } static void From 6ac8e67ea2c2206b240b82421ad75d1696f6d2b1 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 10:35:49 -0700 Subject: [PATCH 07/14] Simplifed --- src/conn.c | 104 ++++++++++++++++++++++++++-------------------------- 
src/natsp.h | 1 + 2 files changed, 54 insertions(+), 51 deletions(-) diff --git a/src/conn.c b/src/conn.c index 4a31d8921..52133aad2 100644 --- a/src/conn.c +++ b/src/conn.c @@ -81,14 +81,8 @@ _processConnInit(natsConnection *nc); static void _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doCBs); -static natsStatus -_tryReconnect(natsConnection *nc, natsStatus s, bool forcedReconnect, bool *started); - -static void -_maybeReconnect(natsConnection *nc, natsStatus s) { _tryReconnect(nc, s, false, NULL); } - -static natsStatus -_forceReconnect(natsConnection *nc, natsStatus s, bool *started) { return _tryReconnect(nc, s, true, started); } +static bool +_processOpError(natsConnection *nc, natsStatus s, bool initialConnect); static natsStatus _flushTimeout(natsConnection *nc, int64_t timeout); @@ -2108,9 +2102,7 @@ _connect(natsConnection *nc) { natsConn_Unlock(nc); - bool reconnectStarted = false; - s = _forceReconnect(nc, retSts, &reconnectStarted); - if ((s == NATS_OK) && reconnectStarted) + if (_processOpError(nc, retSts, true)) { nats_clearLastError(); return NATS_NOT_YET_CONNECTED; @@ -2142,30 +2134,34 @@ _evStopPolling(natsConnection *nc) return s; } -// _tryReconnect handles errors from reading or parsing the protocol, or forced -// reconnection. It will fire off a doReconnect thread if needed. +// _processOpError handles errors from reading or parsing the protocol. // The lock should not be held entering this function. 
-static natsStatus -_tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool *started) +static bool +_processOpError(natsConnection *nc, natsStatus s, bool initialConnect) { - natsStatus s = NATS_OK; + bool forceReconnect; natsConn_Lock(nc); - if (!forcedReconnect) + forceReconnect = initialConnect || nc->forceReconnect; + nc->forceReconnect = false; + + if (!forceReconnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { natsConn_Unlock(nc); - return NATS_OK; + return false; } } // Do reconnect only if allowed and we were actually connected // or if we are retrying on initial failed connect. - if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) + if (forceReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { + natsStatus ls = NATS_OK; + // Set our new status nc->status = NATS_CONN_STATUS_RECONNECTING; @@ -2185,7 +2181,7 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool // on the socket since we are going to reconnect. if (nc->el.attached) { - s = _evStopPolling(nc); + ls = _evStopPolling(nc); natsSock_Close(nc->sockCtx.fd); nc->sockCtx.fd = NATS_SOCK_INVALID; @@ -2194,24 +2190,25 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool } // Fail pending flush requests. - if (s == NATS_OK) + if (ls == NATS_OK) _clearPendingFlushRequests(nc); // If option set, also fail pending requests. - if ((s == NATS_OK) && nc->opts->failRequestsOnDisconnect) + if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) _clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); // Create the pending buffer to hold all write requests while we try // to reconnect. 
- IFOK (s, natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize)); - if (s == NATS_OK) + if (ls == NATS_OK) + ls = natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize); + if (ls == NATS_OK) { nc->usePending = true; // Start the reconnect thread - s = natsThread_Create(&(nc->reconnectThread), + ls = natsThread_Create(&(nc->reconnectThread), _doReconnect, (void*) nc); } - if (s == NATS_OK) + if (ls == NATS_OK) { // We created the reconnect thread successfully, so retain // the connection. @@ -2219,25 +2216,20 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool nc->inReconnect++; natsConn_Unlock(nc); - if (started != NULL) - *started = true; - - return NATS_OK; + return true; } } // reconnect not allowed or we failed to setup the reconnect code. - if (started != NULL) - *started = false; nc->status = NATS_CONN_STATUS_DISCONNECTED; - nc->err = newErr; + nc->err = s; natsConn_Unlock(nc); _close(nc, NATS_CONN_STATUS_CLOSED, false, true); - return NATS_UPDATE_ERR_STACK(s); + return false; } static void @@ -2280,7 +2272,7 @@ _readLoop(void *arg) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _maybeReconnect(nc, s); + _processOpError(nc, s, false); natsConn_Lock(nc); } @@ -2409,7 +2401,7 @@ _processPingTimer(natsTimer *timer, void *arg) if (++(nc->pout) > nc->opts->maxPingsOut) { natsConn_Unlock(nc); - _maybeReconnect(nc, NATS_STALE_CONNECTION); + _processOpError(nc, NATS_STALE_CONNECTION, false); return; } @@ -2934,7 +2926,7 @@ natsConn_processErr(natsConnection *nc, char *buf, int bufLen) if (strcasecmp(error, STALE_CONNECTION) == 0) { - _maybeReconnect(nc, NATS_STALE_CONNECTION); + _processOpError(nc, NATS_STALE_CONNECTION, false); } else if (nats_strcasestr(error, PERMISSIONS_ERR) != NULL) { @@ -3346,18 +3338,6 @@ natsConnection_Connect(natsConnection **newConn, natsOptions *options) return NATS_UPDATE_ERR_STACK(s); } -natsStatus -natsConnection_Reconnect(natsConnection *nc) -{ - natsStatus s = NATS_OK; - - if 
(natsConnection_IsClosed(nc)) - return nats_setDefaultError(NATS_INVALID_ARG); - - IFOK(s, _forceReconnect(nc, NATS_OK, NULL)); - return NATS_UPDATE_ERR_STACK(s); -} - static natsStatus _processUrlString(natsOptions *opts, const char *urls) { @@ -3447,6 +3427,28 @@ natsConnection_ConnectTo(natsConnection **newConn, const char *url) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsConnection_Reconnect(natsConnection *nc) +{ + if (nc == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + if (natsConn_isClosed(nc)) + { + natsConn_Unlock(nc); + return nats_setDefaultError(NATS_CONNECTION_CLOSED); + } + + nc->forceReconnect = true; + natsSock_Close(nc->sockCtx.fd); + nc->sockCtx.fd = NATS_SOCK_INVALID; + nc->sockCtx.fdActive = false; + + natsConn_Unlock(nc); + return NATS_OK; +} + // Test if connection has been closed. bool natsConnection_IsClosed(natsConnection *nc) @@ -4142,7 +4144,7 @@ natsConnection_ProcessReadEvent(natsConnection *nc) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _maybeReconnect(nc, s); + _processOpError(nc, s, false); natsConn_release(nc); } @@ -4191,7 +4193,7 @@ natsConnection_ProcessWriteEvent(natsConnection *nc) natsConn_Unlock(nc); if (s != NATS_OK) - _maybeReconnect(nc, s); + _processOpError(nc, s, false); (void) NATS_UPDATE_ERR_STACK(s); } diff --git a/src/natsp.h b/src/natsp.h index c749b84bb..fac06c15b 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -700,6 +700,7 @@ struct __natsConnection natsThread *reconnectThread; int inReconnect; natsCondition *reconnectCond; + bool forceReconnect; natsStatistics stats; From 5d2c27cf0dc0e95a4b6f386c8e3396163daab831 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 10:51:05 -0700 Subject: [PATCH 08/14] Added trivial cerror hecks to the test, for coverage --- test/test.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/test.c b/test/test.c index c22f0fc87..fe756b8f1 100644 --- a/test/test.c +++ b/test/test.c @@ -20264,6 +20264,15 @@ 
test_ForcedReconnect(void) natsMsg_Destroy(msg); msg = NULL; + natsConnection_Close(nc); + test("Reconect on a close connection errors: "); + s = natsConnection_Reconnect(nc); + testCond(s == NATS_CONNECTION_CLOSED); + + test("Reconect on a NULL connection errors: "); + s = natsConnection_Reconnect(NULL); + testCond(s == NATS_INVALID_ARG); + natsSubscription_Destroy(sub); natsConnection_Destroy(nc); natsOptions_Destroy(opts); From e9ee20dea9d3761bc9b403528857cee1bf92efd5 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 11:48:25 -0700 Subject: [PATCH 09/14] PR feedback --- src/conn.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/conn.c b/src/conn.c index 52133aad2..d7fdace54 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3442,8 +3442,6 @@ natsConnection_Reconnect(natsConnection *nc) nc->forceReconnect = true; natsSock_Close(nc->sockCtx.fd); - nc->sockCtx.fd = NATS_SOCK_INVALID; - nc->sockCtx.fdActive = false; natsConn_Unlock(nc); return NATS_OK; From e567e2e6fc6052083dff617e2d3c0589b4fbe192 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 12:20:36 -0700 Subject: [PATCH 10/14] PR feedback: natsConnection_Reconnect to error unless allowReconnect is set --- src/conn.c | 13 +++++-------- src/natsp.h | 1 - test/test.c | 7 +++++++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/conn.c b/src/conn.c index d7fdace54..9d13c7c21 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2139,14 +2139,9 @@ _evStopPolling(natsConnection *nc) static bool _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) { - bool forceReconnect; - natsConn_Lock(nc); - forceReconnect = initialConnect || nc->forceReconnect; - nc->forceReconnect = false; - - if (!forceReconnect) + if (!initialConnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { @@ -2158,7 +2153,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) // Do reconnect only if allowed and we were actually connected // or if 
we are retrying on initial failed connect. - if (forceReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) + if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { natsStatus ls = NATS_OK; @@ -3440,7 +3435,9 @@ natsConnection_Reconnect(natsConnection *nc) return nats_setDefaultError(NATS_CONNECTION_CLOSED); } - nc->forceReconnect = true; + if (!nc->opts->allowReconnect) + return nats_setDefaultError(NATS_INVALID_ARG); + natsSock_Close(nc->sockCtx.fd); natsConn_Unlock(nc); diff --git a/src/natsp.h b/src/natsp.h index fac06c15b..c749b84bb 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -700,7 +700,6 @@ struct __natsConnection natsThread *reconnectThread; int inReconnect; natsCondition *reconnectCond; - bool forceReconnect; natsStatistics stats; diff --git a/test/test.c b/test/test.c index fe756b8f1..cde1f4a78 100644 --- a/test/test.c +++ b/test/test.c @@ -20273,6 +20273,13 @@ test_ForcedReconnect(void) s = natsConnection_Reconnect(NULL); testCond(s == NATS_INVALID_ARG); + test("Reconect on errors if allowReconnect is not set: "); + natsMutex_Lock(nc->mu); + nc->opts->allowReconnect = false; + natsMutex_Unlock(nc->mu); + s = natsConnection_Reconnect(NULL); + testCond(s == NATS_INVALID_ARG); + natsSubscription_Destroy(sub); natsConnection_Destroy(nc); natsOptions_Destroy(opts); From 5884592d58f2695b7432f02da35c4488ce57050e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 12:29:19 -0700 Subject: [PATCH 11/14] PR feedback: oops, too fast --- src/conn.c | 5 ++++- test/test.c | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/conn.c b/src/conn.c index 9d13c7c21..780eacdd5 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3436,7 +3436,10 @@ natsConnection_Reconnect(natsConnection *nc) } if (!nc->opts->allowReconnect) - return nats_setDefaultError(NATS_INVALID_ARG); + { + natsConn_Unlock(nc); + return nats_setDefaultError(NATS_ILLEGAL_STATE); + } 
natsSock_Close(nc->sockCtx.fd); diff --git a/test/test.c b/test/test.c index cde1f4a78..9234ae2b9 100644 --- a/test/test.c +++ b/test/test.c @@ -20264,6 +20264,13 @@ test_ForcedReconnect(void) natsMsg_Destroy(msg); msg = NULL; + test("Reconect on errors if allowReconnect is not set: "); + natsMutex_Lock(nc->mu); + nc->opts->allowReconnect = false; + natsMutex_Unlock(nc->mu); + s = natsConnection_Reconnect(nc); + testCond(s == NATS_ILLEGAL_STATE); + natsConnection_Close(nc); test("Reconect on a close connection errors: "); s = natsConnection_Reconnect(nc); @@ -20273,13 +20280,6 @@ test_ForcedReconnect(void) s = natsConnection_Reconnect(NULL); testCond(s == NATS_INVALID_ARG); - test("Reconect on errors if allowReconnect is not set: "); - natsMutex_Lock(nc->mu); - nc->opts->allowReconnect = false; - natsMutex_Unlock(nc->mu); - s = natsConnection_Reconnect(NULL); - testCond(s == NATS_INVALID_ARG); - natsSubscription_Destroy(sub); natsConnection_Destroy(nc); natsOptions_Destroy(opts); From c1fcc61261945b039522bca792b0b39b2a91e7fb Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 6 May 2024 10:11:22 -0700 Subject: [PATCH 12/14] Team feedback: ignore allowReconnect --- src/conn.c | 6 ------ test/test.c | 18 ++++++++---------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/conn.c b/src/conn.c index 780eacdd5..1b405c852 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3435,12 +3435,6 @@ natsConnection_Reconnect(natsConnection *nc) return nats_setDefaultError(NATS_CONNECTION_CLOSED); } - if (!nc->opts->allowReconnect) - { - natsConn_Unlock(nc); - return nats_setDefaultError(NATS_ILLEGAL_STATE); - } - natsSock_Close(nc->sockCtx.fd); natsConn_Unlock(nc); diff --git a/test/test.c b/test/test.c index 9234ae2b9..014dc09a7 100644 --- a/test/test.c +++ b/test/test.c @@ -20227,11 +20227,8 @@ test_ForcedReconnect(void) testCond(s == NATS_OK); test("Send a message to foo: "); - s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); - IFOK(s, natsConnection_PublishMsg(nc, 
msg)); + IFOK(s, natsConnection_PublishString(nc, "foo", "bar")); testCond(s == NATS_OK); - natsMsg_Destroy(msg); - msg = NULL; test("Receive the message: "); s = natsSubscription_NextMsg(&msg, sub, 1000); @@ -20252,11 +20249,8 @@ test_ForcedReconnect(void) testCond(s == NATS_OK); test("Send a message to foo: "); - s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); - IFOK(s, natsConnection_PublishMsg(nc, msg)); + IFOK(s, natsConnection_PublishString(nc, "foo", "bar")); testCond(s == NATS_OK); - natsMsg_Destroy(msg); - msg = NULL; test("Receive the message: "); s = natsSubscription_NextMsg(&msg, sub, 1000); @@ -20264,12 +20258,16 @@ test_ForcedReconnect(void) natsMsg_Destroy(msg); msg = NULL; - test("Reconect on errors if allowReconnect is not set: "); + test("Reconnect again with allowReconnect false, the call succeeds: "); natsMutex_Lock(nc->mu); nc->opts->allowReconnect = false; natsMutex_Unlock(nc->mu); s = natsConnection_Reconnect(nc); - testCond(s == NATS_ILLEGAL_STATE); + testCond(s == NATS_OK); + + test("But the connection is closed: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_CONNECTION_CLOSED) && (msg == NULL)); natsConnection_Close(nc); test("Reconect on a close connection errors: "); From d3ef836015f54da76681f185091b7bb359cc0a8e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 6 May 2024 10:19:08 -0700 Subject: [PATCH 13/14] different status on Ubuntu vs MacOS --- test/test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.c b/test/test.c index 014dc09a7..69ff9c68d 100644 --- a/test/test.c +++ b/test/test.c @@ -20267,7 +20267,7 @@ test_ForcedReconnect(void) test("But the connection is closed: "); s = natsSubscription_NextMsg(&msg, sub, 1000); - testCond((s == NATS_CONNECTION_CLOSED) && (msg == NULL)); + testCond(((s == NATS_CONNECTION_CLOSED) || (s == NATS_TIMEOUT)) && (msg == NULL)); natsConnection_Close(nc); test("Reconect on a close connection errors: "); From 
e85d0b756a83edd3234ff644edddbd460824150e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Mon, 6 May 2024 10:20:10 -0700 Subject: [PATCH 14/14] Comment for clarity --- test/test.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test.c b/test/test.c index 69ff9c68d..ec2d2e888 100644 --- a/test/test.c +++ b/test/test.c @@ -20265,6 +20265,8 @@ test_ForcedReconnect(void) s = natsConnection_Reconnect(nc); testCond(s == NATS_OK); + // On MacOS this returns NATS_CONNECTION_CLOSED, on Ubuntu we get a + // NATS_TIMEOUT. test("But the connection is closed: "); s = natsSubscription_NextMsg(&msg, sub, 1000); testCond(((s == NATS_CONNECTION_CLOSED) || (s == NATS_TIMEOUT)) && (msg == NULL));