Skip to content

Commit

Permalink
Merge pull request #468 from nats-io/js_ordered_consumers
Browse files Browse the repository at this point in the history
[ADDED] JetStream Ordered Consumers
  • Loading branch information
kozlovic committed Oct 5, 2021
2 parents 835260b + 74c5514 commit 7281f61
Show file tree
Hide file tree
Showing 10 changed files with 1,023 additions and 96 deletions.
122 changes: 71 additions & 51 deletions src/conn.c
Expand Up @@ -1000,8 +1000,8 @@ _connectProto(natsConnection *nc, char **proto)
return s;
}

static natsStatus
_sendUnsubProto(natsConnection *nc, int64_t subId, int max)
natsStatus
natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max)
{
natsStatus s = NATS_OK;
char *proto = NULL;
Expand All @@ -1023,14 +1023,31 @@ _sendUnsubProto(natsConnection *nc, int64_t subId, int max)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid)
{
natsStatus s = NATS_OK;
char *proto = NULL;
int res = 0;

res = nats_asprintf(&proto, _SUB_PROTO_, subject, (queue == NULL ? "" : queue), sid);
if (res < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);
else
{
s = natsConn_bufferWriteString(nc, proto);
NATS_FREE(proto);
proto = NULL;
}
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_resendSubscriptions(natsConnection *nc)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
natsHashIter iter;
char *proto;
int res;
int adjustedMax;
natsSubscription **subs = NULL;
int i = 0;
Expand Down Expand Up @@ -1066,8 +1083,6 @@ _resendSubscriptions(natsConnection *nc)
{
sub = subs[i];

proto = NULL;

adjustedMax = 0;
natsSub_Lock(sub);
if (natsSub_drainStarted(sub))
Expand All @@ -1085,27 +1100,14 @@ _resendSubscriptions(natsConnection *nc)
if (adjustedMax == 0)
{
natsSub_Unlock(sub);
s = _sendUnsubProto(nc, sub->sid, 0);
s = natsConn_sendUnsubProto(nc, sub->sid, 0);
continue;
}
}

res = nats_asprintf(&proto, _SUB_PROTO_,
sub->subject,
(sub->queue == NULL ? "" : sub->queue),
(int) sub->sid);
if (res < 0)
s = NATS_NO_MEMORY;

if (s == NATS_OK)
{
s = natsConn_bufferWriteString(nc, proto);
NATS_FREE(proto);
proto = NULL;
}

s = natsConn_sendSubProto(nc, sub->subject, sub->queue, sub->sid);
if ((s == NATS_OK) && (adjustedMax > 0))
s = _sendUnsubProto(nc, sub->sid, adjustedMax);
s = natsConn_sendUnsubProto(nc, sub->sid, adjustedMax);

// Hold the lock up to that point so we are sure not to resend
// any SUB/UNSUB for a subscription that is in draining mode.
Expand Down Expand Up @@ -2520,14 +2522,20 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
bool ctrlMsg = false;
const char *fcReply= NULL;
int jct = 0;
natsMsgFilter mf = NULL;
void *mfc = NULL;

natsMutex_Lock(nc->subsMu);

nc->stats.inMsgs += 1;
nc->stats.inBytes += (uint64_t) bufLen;

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);

if (sub != NULL)
{
mf = nc->filter;
mfc = nc->filterClosure;
}
natsMutex_Unlock(nc->subsMu);

if (sub == NULL)
Expand All @@ -2544,6 +2552,13 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
// computed as the bufLen - header size.
dl = msg->dataLen;

if (mf != NULL)
{
(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;
}

// Pick mutex, condition variable and list based on if the sub is
// part of a global delivery thread pool or not.
// Note about `list`: this is used only to link messages, but
Expand Down Expand Up @@ -2581,6 +2596,18 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
// We will send it at the end of this function.
natsMsgHeader_Get(msg, jsConsumerStalledHdr, &fcReply);
}
else if (!ctrlMsg && jsi->ordered)
{
bool replaced = false;

s = jsSub_checkOrderedMsg(sub, mu, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsMutex_Unlock(mu);
natsMsg_Destroy(msg);
return s;
}
}
}

if (!ctrlMsg)
Expand Down Expand Up @@ -2638,7 +2665,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
else if ((jct == jsCtrlHeartbeat) && (msg->reply == NULL))
{
// Handle control heartbeat messages.
s = jsSub_processSequenceMismatch(sub, msg, &sm);
s = jsSub_processSequenceMismatch(sub, mu, msg, &sm);
}
else if ((jct == jsCtrlFlowControl) && (msg->reply != NULL))
{
Expand Down Expand Up @@ -2936,33 +2963,17 @@ natsConn_subscribeImpl(natsSubscription **newSub,
// so that we can suppress here.
if (!natsConn_isReconnecting(nc))
{
char *proto = NULL;
int res = 0;

res = nats_asprintf(&proto, _SUB_PROTO_,
subj,
(queue == NULL ? "" : queue),
(int) sub->sid);
if (res < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

SET_WRITE_DEADLINE(nc);
s = natsConn_sendSubProto(nc, subj, queue, sub->sid);
if (s == NATS_OK)
{
SET_WRITE_DEADLINE(nc);
s = natsConn_bufferWriteString(nc, proto);
if (s == NATS_OK)
s = natsConn_flushOrKickFlusher(nc);

// We should not return a failure if we get an issue
// with the buffer write (except if it is no memory).
// For IO errors (if we just got disconnected), the
// reconnect logic will resend the sub protocol.

if (s != NATS_NO_MEMORY)
s = NATS_OK;
}

NATS_FREE(proto);
s = natsConn_flushOrKickFlusher(nc);

// We should not return a failure if we get an issue
// with the buffer write (except if it is no memory).
// For IO errors (if we just got disconnected), the
// reconnect logic will resend the sub protocol.
if (s != NATS_NO_MEMORY)
s = NATS_OK;
}
}

Expand Down Expand Up @@ -3051,7 +3062,7 @@ natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max, bool dr
SET_WRITE_DEADLINE(nc);
// We will send these for all subs when we reconnect
// so that we can suppress here.
s = _sendUnsubProto(nc, sub->sid, max);
s = natsConn_sendUnsubProto(nc, sub->sid, max);
if (s == NATS_OK)
s = natsConn_flushOrKickFlusher(nc);

Expand Down Expand Up @@ -4271,3 +4282,12 @@ natsConnection_GetLocalIPAndPort(natsConnection *nc, char **ip, int *port)

return NATS_UPDATE_ERR_STACK(s);
}

void
natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure)
{
natsMutex_Lock(nc->subsMu);
nc->filter = f;
nc->filterClosure = closure;
natsMutex_Unlock(nc->subsMu);
}
11 changes: 11 additions & 0 deletions src/conn.h
Expand Up @@ -131,6 +131,17 @@ natsConn_userFromFile(char **userJWT, char **customErrTxt, void *closure);
natsStatus
natsConn_signatureHandler(char **customErrTxt, unsigned char **sig, int *sigLen, const char *nonce, void *closure);

natsStatus
natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid);

natsStatus
natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max);

#define natsConn_setFilter(c, f) natsConn_setFilterWithClosure((c), (f), NULL)

void
natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure);

void
natsConn_close(natsConnection *nc);

Expand Down
6 changes: 3 additions & 3 deletions src/hash.c
Expand Up @@ -619,16 +619,16 @@ natsStrHash_SetEx(natsStrHash *hash, char *key, bool copyKey, bool freeKey,
}

void*
natsStrHash_Get(natsStrHash *hash, char *key)
natsStrHash_GetEx(natsStrHash *hash, char *key, int keyLen)
{
natsStrHashEntry *e;
uint32_t hk = natsStrHash_Hash(key, (int) strlen(key));
uint32_t hk = natsStrHash_Hash(key, keyLen);

e = hash->bkts[hk & hash->mask];
while (e != NULL)
{
if ((e->hk == hk)
&& (strcmp(e->key, key) == 0))
&& (strncmp(e->key, key, keyLen) == 0))
{
return e->data;
}
Expand Down
4 changes: 3 additions & 1 deletion src/hash.h
Expand Up @@ -129,8 +129,10 @@ natsStatus
natsStrHash_SetEx(natsStrHash *hash, char *key, bool copyKey, bool freeKey,
void *data, void **oldData);

#define natsStrHash_Get(h, k) natsStrHash_GetEx((h), (k), (int) strlen(k))

void*
natsStrHash_Get(natsStrHash *hash, char *key);
natsStrHash_GetEx(natsStrHash *hash, char *key, int keyLen);

void*
natsStrHash_Remove(natsStrHash *hash, char *key);
Expand Down

0 comments on commit 7281f61

Please sign in to comment.