[Added] natsConnection_Reconnect #757

Merged: 14 commits, May 6, 2024
29 changes: 27 additions & 2 deletions src/conn.c
@@ -2139,9 +2139,14 @@ _evStopPolling(natsConnection *nc)
static bool
_processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
{
bool forceReconnect;

natsConn_Lock(nc);

if (!initialConnect)
forceReconnect = initialConnect || nc->forceReconnect;
Member

Are you sure we need a change here? That is, if you were to just close the socket in the new reconnect call (not set the fd to invalid, just call natsSock_Close()), this should trigger the normal reconnect process. I think there should be no need to change anything else. What made you need to change the code here?

Collaborator Author

The Go client disregards/overrides its equivalent of nc->opts->allowReconnect when Disconnect() is invoked; I posed a question about it in nats-io/nats.go#1624 (comment).

If we were to error out when nc->opts->allowReconnect is false, no changes would be needed; otherwise I need to override it (the present implementation).

Member

In my view, we should return an error in natsConnection_Reconnect() if the user explicitly disallowed reconnect with natsOptions_SetAllowReconnect(opts, false). It does not make sense otherwise.

Collaborator Author

OK, but that also means the user would not be able to have a connection that reconnects only "manually": the connection will also auto-reconnect on all relevant errors.

I'll make the change now.

Member

Sorry, I don't understand. The original check here looks at the current connection status so that we don't attempt to reconnect if we are already in the middle of one. The check for !initialConnect was likely introduced when adding the ability to connect asynchronously (that is, calling natsConnection_Create() (or Connect) while a server is not running does not fail, but attempts the reconnect right away).

Collaborator Author

I just meant that one won't be able to create a connection with allowReconnect==false, if so desired, and then forcefully Reconnect() on it (say, if they want to re-auth?). That was my understanding of the use case.

Member

That's why it may be best not to make the changes until this is agreed with the rest of the team. I did not mean for you to make all the changes only to have to revert them if the team wants the feature to work the current way.

If the team agrees that it should override, then simply add to the doc that this function ignores the "allow reconnect" option (currently it says that it respects them all).

Collaborator Author

(will not merge until verified)

Collaborator Author

See the team feedback comment: ignore allowReconnect altogether, just close the socket, and let the client do what it does.
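
To make the behavior debated in this thread easier to follow, here is a minimal, self-contained sketch of the decision logic as it appears in the diff at this point. It is a model under stated assumptions, not the library's code: `modelConn`, `modelStatus`, `model_shouldReconnect` and `model_demo` are hypothetical names, the connecting check is reduced to a status comparison, and the early exit hidden in the collapsed part of the hunk is assumed to simply report "no reconnect".

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-ins for the client's internal state. */
typedef enum
{
    MODEL_DISCONNECTED,
    MODEL_CONNECTING,
    MODEL_CONNECTED,
    MODEL_CLOSED
} modelStatus;

typedef struct
{
    modelStatus status;
    bool        allowReconnect; /* models nc->opts->allowReconnect */
    bool        forceReconnect; /* models nc->forceReconnect */
    int         inReconnect;    /* models nc->inReconnect */
} modelConn;

/* Returns true if this model would start the reconnect process. */
static bool
model_shouldReconnect(modelConn *nc, bool initialConnect)
{
    bool force = initialConnect || nc->forceReconnect;

    /* The flag is one-shot: it is consumed by the first error processed. */
    nc->forceReconnect = false;

    if (!force)
    {
        /* Assumption: the collapsed lines of the hunk bail out here. */
        if ((nc->status == MODEL_CONNECTING)
            || (nc->status == MODEL_CLOSED)
            || (nc->inReconnect > 0))
        {
            return false;
        }
    }

    return force || (nc->allowReconnect && (nc->status == MODEL_CONNECTED));
}

/* Small walk-through of the case discussed in the review. */
static void
model_demo(void)
{
    /* allowReconnect is false, but a forced reconnect was requested. */
    modelConn nc     = { MODEL_CONNECTED, false, true, 0 };
    bool      first  = model_shouldReconnect(&nc, false);
    bool      second = model_shouldReconnect(&nc, false);

    assert(first);   /* the forced request overrides allowReconnect */
    assert(!second); /* the flag was cleared, so the option applies again */
}
```

In this model a forced request bypasses both the "allow reconnect" option and the in-progress guard, which is the override behavior the thread is weighing against returning an error.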

nc->forceReconnect = false;

if (!forceReconnect)
levb marked this conversation as resolved.
{
if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0))
{
@@ -2153,7 +2158,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)

// Do reconnect only if allowed and we were actually connected
// or if we are retrying on initial failed connect.
if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED)))
if (forceReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED)))
levb marked this conversation as resolved.
{
natsStatus ls = NATS_OK;

@@ -3422,6 +3427,26 @@ natsConnection_ConnectTo(natsConnection **newConn, const char *url)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConnection_Reconnect(natsConnection *nc)
{
if (nc == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

natsConn_Lock(nc);
if (natsConn_isClosed(nc))
{
natsConn_Unlock(nc);
return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

nc->forceReconnect = true;
levb marked this conversation as resolved.
natsSock_Close(nc->sockCtx.fd);

natsConn_Unlock(nc);
return NATS_OK;
}
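
The error contract of the new function (NULL argument, closed connection, success) can be illustrated in isolation. The sketch below is a stand-alone model, not the library: `demoConn`, `demoStatus`, `demo_reconnect` and `demo_reconnect_check` are hypothetical names, a pthread mutex stands in for `natsConn_Lock()`/`natsConn_Unlock()`, and a boolean stands in for the socket that `natsSock_Close()` would close.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical status codes mirroring the three outcomes in the diff. */
typedef enum
{
    DEMO_OK = 0,
    DEMO_INVALID_ARG,
    DEMO_CONNECTION_CLOSED
} demoStatus;

/* Hypothetical, reduced connection object. */
typedef struct
{
    pthread_mutex_t mu;
    bool            closed;
    bool            forceReconnect;
    bool            sockOpen; /* stands in for the socket file descriptor */
} demoConn;

static demoStatus
demo_reconnect(demoConn *nc)
{
    /* Validate the argument before touching the lock. */
    if (nc == NULL)
        return DEMO_INVALID_ARG;

    pthread_mutex_lock(&nc->mu);
    if (nc->closed)
    {
        /* Every early return must release the lock. */
        pthread_mutex_unlock(&nc->mu);
        return DEMO_CONNECTION_CLOSED;
    }

    /* Request a reconnect and drop the socket while holding the lock. */
    nc->forceReconnect = true;
    nc->sockOpen       = false;

    pthread_mutex_unlock(&nc->mu);
    return DEMO_OK;
}

/* Exercises the success path followed by the closed-connection path. */
static void
demo_reconnect_check(void)
{
    demoConn   nc = { PTHREAD_MUTEX_INITIALIZER, false, false, true };
    demoStatus s  = demo_reconnect(&nc);

    assert(s == DEMO_OK);
    assert(nc.forceReconnect && !nc.sockOpen);

    nc.closed = true;
    s = demo_reconnect(&nc);
    assert(s == DEMO_CONNECTION_CLOSED);
}
```

Note that success here only means the request was accepted; as in the diff, the reconnect itself happens asynchronously once the closed socket is noticed.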

// Test if connection has been closed.
bool
natsConnection_IsClosed(natsConnection *nc)
12 changes: 12 additions & 0 deletions src/nats.h
@@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg);
NATS_EXTERN natsStatus
natsConnection_Connect(natsConnection **nc, natsOptions *options);

/** \brief Causes the client to drop the connection to the current server and
* perform standard reconnection process.
*
* This means that all subscriptions and consumers should be resubscribed and
* their work resumed after successful reconnect where all reconnect options are
Member

Well, actually not all reconnect options, if we consider "allow reconnect" to be a reconnect option ;-). So I would argue that either the Go client should be changed, or the spec modified to specifically cover libraries that have an option to disable reconnection (maybe not all do).

* respected.
*
* @param nc the pointer to the #natsConnection object.
*/
natsStatus
natsConnection_Reconnect(natsConnection *nc);

/** \brief Process a read event when using external event loop.
*
* When using an external event loop, and the callback indicating that
1 change: 1 addition & 0 deletions src/natsp.h
@@ -700,6 +700,7 @@ struct __natsConnection
natsThread *reconnectThread;
int inReconnect;
natsCondition *reconnectCond;
bool forceReconnect;

natsStatistics stats;

1 change: 1 addition & 0 deletions test/list.txt
@@ -86,6 +86,7 @@ ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ForcedReconnect
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
78 changes: 78 additions & 0 deletions test/test.c
@@ -20202,6 +20202,83 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
test_ForcedReconnect(void)
{
natsStatus s;
struct threadArg arg;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid pid = NATS_INVALID_PID;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("unable to setup test");

test("Start server, connect, subscribe: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

test("Forced reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Waiting for reconnect: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 5000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

natsConnection_Close(nc);
test("Reconnect on a closed connection errors: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_CONNECTION_CLOSED);

test("Reconnect on a NULL connection errors: ");
s = natsConnection_Reconnect(NULL);
testCond(s == NATS_INVALID_ARG);

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&arg);
}
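
The "Waiting for reconnect" step in the test above relies on a flag set by the reconnected callback and a timed wait on a condition variable. Below is a stand-alone sketch of that pattern using plain POSIX threads rather than the library's wrappers; `waitArg`, `wait_reconnected`, `signal_reconnected` and `wait_demo` are hypothetical names, and the second thread only plays the role of the callback. Unlike the loop in the test, this version computes a single absolute deadline, so a spurious wake-up does not restart the timeout.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical equivalent of the fields used from struct threadArg. */
typedef struct
{
    pthread_mutex_t m;
    pthread_cond_t  c;
    bool            reconnected;
} waitArg;

/* Waits until arg->reconnected is set or timeoutMs elapses.
 * Returns true if the flag was observed, and clears it either way. */
static bool
wait_reconnected(waitArg *arg, long timeoutMs)
{
    struct timespec deadline;
    int             rc = 0;
    bool            seen;

    /* Default condition variables measure against CLOCK_REALTIME. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec  += timeoutMs / 1000;
    deadline.tv_nsec += (timeoutMs % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L)
    {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&arg->m);
    /* Re-check the predicate after every wake-up. */
    while (!arg->reconnected && (rc != ETIMEDOUT))
        rc = pthread_cond_timedwait(&arg->c, &arg->m, &deadline);
    seen = arg->reconnected;
    arg->reconnected = false;
    pthread_mutex_unlock(&arg->m);

    return seen;
}

/* Plays the role of the reconnected callback. */
static void *
signal_reconnected(void *closure)
{
    waitArg *arg = closure;

    pthread_mutex_lock(&arg->m);
    arg->reconnected = true;
    pthread_cond_signal(&arg->c);
    pthread_mutex_unlock(&arg->m);
    return NULL;
}

/* Returns 0 if both the timeout path and the signaled path behave. */
static int
wait_demo(void)
{
    waitArg   arg = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false };
    pthread_t t;
    bool      ok;

    /* Nobody signals yet, so a short wait should time out. */
    if (wait_reconnected(&arg, 50))
        return 1;

    if (pthread_create(&t, NULL, signal_reconnected, &arg) != 0)
        return 2;
    ok = wait_reconnected(&arg, 5000);
    pthread_join(t, NULL);

    return ok ? 0 : 3;
}
```

Because the flag is set under the mutex, the waiter sees it even if the signal fires before the wait begins, which is why the predicate is checked before the first call to `pthread_cond_timedwait`.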

static void
_stopServerInThread(void *closure)
{
@@ -36210,6 +36287,7 @@ static testInfo allTests[] =
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},
{"ForcedReconnect", test_ForcedReconnect},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},