Skip to content

Commit

Permalink
Merge 59c3c4e into d6f3747
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed May 22, 2023
2 parents d6f3747 + 59c3c4e commit 4b2293c
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 256 deletions.
136 changes: 41 additions & 95 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,17 @@ _processInfo(natsConnection *nc, char *info, int len)
{
natsStatus s = NATS_OK;
nats_JSON *json = NULL;
bool postDiscoveredServersCb = false;
bool postLameDuckCb = false;

if (info == NULL)
return NATS_OK;

natsOptions_lock(nc->opts);
postDiscoveredServersCb = (nc->opts->discoveredServersCb != NULL);
postLameDuckCb = (nc->opts->lameDuckCb != NULL);
natsOptions_unlock(nc->opts);

_clearServerInfo(&(nc->info));

s = nats_JSONParse(&json, info, len);
Expand Down Expand Up @@ -574,13 +581,13 @@ _processInfo(natsConnection *nc, char *info, int len)
nc->info.connectURLsCount,
tlsName,
&added);
if ((s == NATS_OK) && added && !nc->initc && (nc->opts->discoveredServersCb != NULL))
if ((s == NATS_OK) && added && !nc->initc && postDiscoveredServersCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_DISCOVERED_SERVERS);
}
// Process the LDM callback after the above. It will cover cases where
// we have connect URLs and invoke discovered server callback, and case
// where we don't.
if ((s == NATS_OK) && nc->info.lameDuckMode && (nc->opts->lameDuckCb != NULL))
if ((s == NATS_OK) && nc->info.lameDuckMode && postLameDuckCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_LAME_DUCK_MODE);

if (s != NATS_OK)
Expand Down Expand Up @@ -1523,6 +1530,15 @@ _doReconnect(void *arg)
int i = 0;
natsCustomReconnectDelayHandler crd = NULL;
void *crdClosure = NULL;
bool postDisconnectedCb = false;
bool postReconnectedCb = false;
bool postConnectedCb = false;

natsOptions_lock(nc->opts);
postDisconnectedCb = (nc->opts->disconnectedCb != NULL);
postReconnectedCb = (nc->opts->reconnectedCb != NULL);
postConnectedCb = (nc->opts->connectedCb != NULL);
natsOptions_unlock(nc->opts);

natsConn_Lock(nc);

Expand All @@ -1542,7 +1558,7 @@ _doReconnect(void *arg)

// Perform appropriate callback if needed for a disconnect.
// (do not do this if we are here on initial connect failure)
if (!nc->initc && (nc->opts->disconnectedCb != NULL))
if (!nc->initc && postDisconnectedCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_DISCONNECTED);

crd = nc->opts->customReconnectDelayCB;
Expand Down Expand Up @@ -1706,15 +1722,15 @@ _doReconnect(void *arg)
// This was the initial connect. Set this to false.
nc->initc = false;
// Invoke the callback.
if (nc->opts->connectedCb != NULL)
if (postConnectedCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_CONNECTED);
}
else
{
// Call reconnectedCB if appropriate. Since we are in a separate
// thread, we could invoke the callback directly, however, we
// still post it so all callbacks from a connection are serialized.
if (nc->opts->reconnectedCb != NULL)
if (postReconnectedCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_RECONNECTED);
}

Expand Down Expand Up @@ -1998,13 +2014,20 @@ _connect(natsConnection *nc)
int max = 0;
int64_t wtime = 0;
bool retry = false;
bool retryOnFailedConnect = false;
bool hasConnectedCb = false;

natsOptions_lock(nc->opts);
hasConnectedCb = (nc->opts->connectedCb != NULL);
retryOnFailedConnect = nc->opts->retryOnFailedConnect;
natsOptions_unlock(nc->opts);

natsConn_Lock(nc);
nc->initc = true;

pool = nc->srvPool;

if ((nc->opts->retryOnFailedConnect) && (nc->opts->connectedCb == NULL))
if ((retryOnFailedConnect) && !hasConnectedCb)
{
retry = true;
max = nc->opts->maxReconnect;
Expand Down Expand Up @@ -2057,6 +2080,7 @@ _connect(natsConnection *nc)
retSts = NATS_OK;
}
}

if (!retry)
break;

Expand All @@ -2070,8 +2094,8 @@ _connect(natsConnection *nc)

// If not connected and retry asynchronously on failed connect
if ((nc->status != NATS_CONN_STATUS_CONNECTED)
&& nc->opts->retryOnFailedConnect
&& (nc->opts->connectedCb != NULL))
&& retryOnFailedConnect
&& hasConnectedCb)
{
natsConn_Unlock(nc);

Expand Down Expand Up @@ -2472,8 +2496,15 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
struct threadsToJoin ttj;
bool sockWasActive = false;
bool detach = false;
bool postClosedCb = false;
bool postDisconnectedCb = false;
natsSubscription *sub = NULL;

natsOptions_lock(nc->opts);
postClosedCb = (nc->opts->closedCb != NULL);
postDisconnectedCb = (nc->opts->disconnectedCb != NULL);
natsOptions_unlock(nc->opts);

natsConn_lockAndRetain(nc);

// If invoked from the public Close() call, attempt to flush
Expand Down Expand Up @@ -2547,7 +2578,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
// Perform appropriate callback if needed for a disconnect.
// Do not invoke if we were disconnected and failed to reconnect (since
// it has already been invoked in doReconnect).
if (doCBs && !nc->rle && (nc->opts->disconnectedCb != NULL) && sockWasActive)
if (doCBs && !nc->rle && postDisconnectedCb && sockWasActive)
natsAsyncCb_PostConnHandler(nc, ASYNC_DISCONNECTED);

sub = nc->respMux;
Expand All @@ -2563,7 +2594,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
natsConn_Lock(nc);

// Perform appropriate callback if needed for a connection closed.
if (doCBs && (nc->opts->closedCb != NULL))
if (doCBs && postClosedCb)
natsAsyncCb_PostConnHandler(nc, ASYNC_CLOSED);

nc->status = status;
Expand Down Expand Up @@ -4432,88 +4463,3 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus
}
fflush(stderr);
}

natsStatus
natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc)
{
if ((nc == NULL) || (nc->opts == NULL) || (nc->opts->mu == NULL) || (cb == NULL) || (closure == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
*cb = nc->opts->asyncErrCb;
*closure = nc->opts->asyncErrCbClosure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}

natsStatus
natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure)
{
// The error callback must not be NULL, other code may rely on it.
if ((nc == NULL) || (nc->opts == NULL) || (nc->opts->mu == NULL) || (cb == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
nc->opts->asyncErrCb = cb;
nc->opts->asyncErrCbClosure = closure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}

natsStatus
natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc)
{
if ((nc == NULL) || (nc->opts == NULL) || (nc->opts->mu == NULL) || (cb == NULL) || (closure == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
*cb = nc->opts->closedCb;
*closure = nc->opts->closedCbClosure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}

natsStatus
natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure)
{
if (nc == NULL || (nc->opts == NULL) || (nc->opts->mu == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
nc->opts->closedCb = cb;
nc->opts->closedCbClosure = closure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}

natsStatus
natsConn_getDisconnectedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc)
{
if ((nc == NULL) || (nc->opts == NULL) || (nc->opts->mu == NULL) || (cb == NULL) || (closure == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
*cb = nc->opts->disconnectedCb;
*closure = nc->opts->disconnectedCbClosure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}

natsStatus
natsConn_setDisconnectedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure)
{
if (nc == NULL || (nc->opts == NULL) || (nc->opts->mu == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

natsMutex_Lock(nc->opts->mu);
nc->opts->disconnectedCb = cb;
nc->opts->disconnectedCbClosure = closure;
natsMutex_Unlock(nc->opts->mu);

return NATS_OK;
}
19 changes: 0 additions & 19 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,4 @@ natsConn_close(natsConnection *nc);
void
natsConn_destroy(natsConnection *nc, bool fromPublicDestroy);

natsStatus
natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure);

natsStatus
natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc);

natsStatus
natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure);

natsStatus
natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc);

natsStatus
natsConn_setDisconnectedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure);

natsStatus
natsConn_getDisconnectedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc);


#endif /* CONN_H_ */
40 changes: 11 additions & 29 deletions src/micro.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

#include "microp.h"
#include "conn.h"
#include "mem.h"
#include "opts.h"

static microError *new_service(microService **ptr, natsConnection *nc);
static void free_service(microService *m);
static microError *wrap_connection_event_callbacks(microService *m);
static microError *unwrap_connection_event_callbacks(microService *m);
static void unwrap_connection_event_callbacks(microService *m);

microError *
micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg)
Expand Down Expand Up @@ -178,7 +178,7 @@ microService_Stop(microService *m)
if (!is_running)
return NULL;

err = unwrap_connection_event_callbacks(m);
unwrap_connection_event_callbacks(m);

for (ep = first_ep; (err == NULL) && (ep != NULL); ep = first_ep)
{
Expand Down Expand Up @@ -379,11 +379,6 @@ on_connection_closed(natsConnection *nc, void *closure)
return;

microService_Stop(m);

if (m->prev_on_connection_closed != NULL)
{
(*m->prev_on_connection_closed)(nc, m->prev_on_connection_closed_closure);
}
}

static void
Expand Down Expand Up @@ -424,43 +419,30 @@ on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *closure)

// <>/<> TODO: Should we stop the service? The Go client does.
microService_Stop(m);

if (m->prev_on_error != NULL)
{
(*m->prev_on_error)(nc, sub, s, m->prev_on_error_closure);
}
}

static microError *
wrap_connection_event_callbacks(microService *m)
{
natsStatus s = NATS_OK;

if (m == NULL)
if ((m == NULL) || (m->nc == NULL) || (m->nc->opts == NULL))
return micro_ErrorInvalidArg;

IFOK(s, natsConn_getClosedCallback(&m->prev_on_connection_closed, &m->prev_on_connection_closed_closure, m->nc));
IFOK(s, natsConn_setClosedCallback(m->nc, on_connection_closed, m));

IFOK(s, natsConn_getErrorCallback(&m->prev_on_error, &m->prev_on_error_closure, m->nc));
IFOK(s, natsConn_setErrorCallback(m->nc, on_error, m));
IFOK(s, natsOptions_addConnectionClosedCallback(m->nc->opts,on_connection_closed, m));
IFOK(s, natsOptions_addErrorCallback(m->nc->opts, on_error, m));

return microError_Wrapf(micro_ErrorFromStatus(s), "failed to wrap connection event callbacks");
}

static microError *
static void
unwrap_connection_event_callbacks(microService *m)
{
natsStatus s = NATS_OK;

if (m == NULL)
return micro_ErrorInvalidArg;

IFOK(s, natsConn_setClosedCallback(m->nc, m->prev_on_connection_closed, m->prev_on_connection_closed_closure));
IFOK(s, natsConn_setDisconnectedCallback(m->nc, m->prev_on_connection_disconnected, m->prev_on_connection_disconnected_closure));
IFOK(s, natsConn_setErrorCallback(m->nc, m->prev_on_error, m->prev_on_error_closure));
if ((m == NULL) || (m->nc == NULL) || (m->nc->opts == NULL))
return;

return microError_Wrapf(micro_ErrorFromStatus(s), "failed to unwrap connection event callbacks");
natsOptions_removeConnectionClosedCallback(m->nc->opts, on_connection_closed, m);
natsOptions_removeErrorCallback(m->nc->opts, on_error, m);
}

microError *
Expand Down
7 changes: 0 additions & 7 deletions src/microp.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,6 @@ struct micro_service_s
struct micro_endpoint_s *first_ep;
int num_eps;

natsConnectionHandler prev_on_connection_closed;
void *prev_on_connection_closed_closure;
natsConnectionHandler prev_on_connection_disconnected;
void *prev_on_connection_disconnected_closure;
natsErrHandler prev_on_error;
void *prev_on_error_closure;

int64_t started; // UTC time expressed as number of nanoseconds since epoch.
bool is_running;
};
Expand Down

0 comments on commit 4b2293c

Please sign in to comment.