Skip to content

Commit

Permalink
Implemented ordered consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 29, 2021
1 parent 4fce32a commit ebc6ce0
Show file tree
Hide file tree
Showing 8 changed files with 881 additions and 84 deletions.
111 changes: 57 additions & 54 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,8 @@ _connectProto(natsConnection *nc, char **proto)
return s;
}

static natsStatus
_sendUnsubProto(natsConnection *nc, int64_t subId, int max)
natsStatus
natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max)
{
natsStatus s = NATS_OK;
char *proto = NULL;
Expand All @@ -1023,14 +1023,31 @@ _sendUnsubProto(natsConnection *nc, int64_t subId, int max)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid)
{
natsStatus s = NATS_OK;
char *proto = NULL;
int res = 0;

res = nats_asprintf(&proto, _SUB_PROTO_, subject, (queue == NULL ? "" : queue), sid);
if (res < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);
else
{
s = natsConn_bufferWriteString(nc, proto);
NATS_FREE(proto);
proto = NULL;
}
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_resendSubscriptions(natsConnection *nc)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
natsHashIter iter;
char *proto;
int res;
int adjustedMax;
natsSubscription **subs = NULL;
int i = 0;
Expand Down Expand Up @@ -1066,8 +1083,6 @@ _resendSubscriptions(natsConnection *nc)
{
sub = subs[i];

proto = NULL;

adjustedMax = 0;
natsSub_Lock(sub);
if (natsSub_drainStarted(sub))
Expand All @@ -1085,27 +1100,14 @@ _resendSubscriptions(natsConnection *nc)
if (adjustedMax == 0)
{
natsSub_Unlock(sub);
s = _sendUnsubProto(nc, sub->sid, 0);
s = natsConn_sendUnsubProto(nc, sub->sid, 0);
continue;
}
}

res = nats_asprintf(&proto, _SUB_PROTO_,
sub->subject,
(sub->queue == NULL ? "" : sub->queue),
(int) sub->sid);
if (res < 0)
s = NATS_NO_MEMORY;

if (s == NATS_OK)
{
s = natsConn_bufferWriteString(nc, proto);
NATS_FREE(proto);
proto = NULL;
}

s = natsConn_sendSubProto(nc, sub->subject, sub->queue, sub->sid);
if ((s == NATS_OK) && (adjustedMax > 0))
s = _sendUnsubProto(nc, sub->sid, adjustedMax);
s = natsConn_sendUnsubProto(nc, sub->sid, adjustedMax);

// Hold the lock up to that point so we are sure not to resend
// any SUB/UNSUB for a subscription that is in draining mode.
Expand Down Expand Up @@ -2521,6 +2523,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
const char *fcReply= NULL;
int jct = 0;
natsMsgFilter mf = NULL;
void *mfc = NULL;

natsMutex_Lock(nc->subsMu);

Expand All @@ -2529,7 +2532,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);
if (sub != NULL)
mf = nc->filter;
{
mf = nc->filter;
mfc = nc->filterClosure;
}
natsMutex_Unlock(nc->subsMu);

if (sub == NULL)
Expand All @@ -2548,7 +2554,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)

if (mf != NULL)
{
(*mf)(nc, &msg);
(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;
}
Expand Down Expand Up @@ -2590,6 +2596,18 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
// We will send it at the end of this function.
natsMsgHeader_Get(msg, jsConsumerStalledHdr, &fcReply);
}
else if (!ctrlMsg && jsi->ordered)
{
bool replaced = false;

s = jsSub_checkOrderedMsg(sub, mu, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsMutex_Unlock(mu);
natsMsg_Destroy(msg);
return s;
}
}
}

if (!ctrlMsg)
Expand Down Expand Up @@ -2647,7 +2665,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
else if ((jct == jsCtrlHeartbeat) && (msg->reply == NULL))
{
// Handle control heartbeat messages.
s = jsSub_processSequenceMismatch(sub, msg, &sm);
s = jsSub_processSequenceMismatch(sub, mu, msg, &sm);
}
else if ((jct == jsCtrlFlowControl) && (msg->reply != NULL))
{
Expand Down Expand Up @@ -2945,33 +2963,17 @@ natsConn_subscribeImpl(natsSubscription **newSub,
// so that we can suppress here.
if (!natsConn_isReconnecting(nc))
{
char *proto = NULL;
int res = 0;

res = nats_asprintf(&proto, _SUB_PROTO_,
subj,
(queue == NULL ? "" : queue),
(int) sub->sid);
if (res < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

SET_WRITE_DEADLINE(nc);
s = natsConn_sendSubProto(nc, subj, queue, sub->sid);
if (s == NATS_OK)
{
SET_WRITE_DEADLINE(nc);
s = natsConn_bufferWriteString(nc, proto);
if (s == NATS_OK)
s = natsConn_flushOrKickFlusher(nc);

// We should not return a failure if we get an issue
// with the buffer write (except if it is no memory).
// For IO errors (if we just got disconnected), the
// reconnect logic will resend the sub protocol.

if (s != NATS_NO_MEMORY)
s = NATS_OK;
}

NATS_FREE(proto);
s = natsConn_flushOrKickFlusher(nc);

// We should not return a failure if we get an issue
// with the buffer write (except if it is no memory).
// For IO errors (if we just got disconnected), the
// reconnect logic will resend the sub protocol.
if (s != NATS_NO_MEMORY)
s = NATS_OK;
}
}

Expand Down Expand Up @@ -3060,7 +3062,7 @@ natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max, bool dr
SET_WRITE_DEADLINE(nc);
// We will send these for all subs when we reconnect
// so that we can suppress here.
s = _sendUnsubProto(nc, sub->sid, max);
s = natsConn_sendUnsubProto(nc, sub->sid, max);
if (s == NATS_OK)
s = natsConn_flushOrKickFlusher(nc);

Expand Down Expand Up @@ -4282,9 +4284,10 @@ natsConnection_GetLocalIPAndPort(natsConnection *nc, char **ip, int *port)
}

void
natsConn_setFilter(natsConnection *nc, natsMsgFilter f)
natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure)
{
natsMutex_Lock(nc->subsMu);
nc->filter = f;
nc->filter = f;
nc->filterClosure = closure;
natsMutex_Unlock(nc->subsMu);
}
10 changes: 9 additions & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,16 @@ natsConn_userFromFile(char **userJWT, char **customErrTxt, void *closure);
natsStatus
natsConn_signatureHandler(char **customErrTxt, unsigned char **sig, int *sigLen, const char *nonce, void *closure);

natsStatus
natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid);

natsStatus
natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max);

#define natsConn_setFilter(c, f) natsConn_setFilterWithClosure((c), (f), NULL)

void
natsConn_setFilter(natsConnection *nc, natsMsgFilter f);
natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure);

void
natsConn_close(natsConnection *nc);
Expand Down

0 comments on commit ebc6ce0

Please sign in to comment.