diff --git a/src/conn.c b/src/conn.c index de1a7e28..18661d1c 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1000,8 +1000,8 @@ _connectProto(natsConnection *nc, char **proto) return s; } -static natsStatus -_sendUnsubProto(natsConnection *nc, int64_t subId, int max) +natsStatus +natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max) { natsStatus s = NATS_OK; char *proto = NULL; @@ -1023,14 +1023,31 @@ _sendUnsubProto(natsConnection *nc, int64_t subId, int max) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid) +{ + natsStatus s = NATS_OK; + char *proto = NULL; + int res = 0; + + res = nats_asprintf(&proto, _SUB_PROTO_, subject, (queue == NULL ? "" : queue), sid); + if (res < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + s = natsConn_bufferWriteString(nc, proto); + NATS_FREE(proto); + proto = NULL; + } + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _resendSubscriptions(natsConnection *nc) { natsStatus s = NATS_OK; natsSubscription *sub = NULL; natsHashIter iter; - char *proto; - int res; int adjustedMax; natsSubscription **subs = NULL; int i = 0; @@ -1066,8 +1083,6 @@ _resendSubscriptions(natsConnection *nc) { sub = subs[i]; - proto = NULL; - adjustedMax = 0; natsSub_Lock(sub); if (natsSub_drainStarted(sub)) @@ -1085,27 +1100,14 @@ _resendSubscriptions(natsConnection *nc) if (adjustedMax == 0) { natsSub_Unlock(sub); - s = _sendUnsubProto(nc, sub->sid, 0); + s = natsConn_sendUnsubProto(nc, sub->sid, 0); continue; } } - res = nats_asprintf(&proto, _SUB_PROTO_, - sub->subject, - (sub->queue == NULL ? "" : sub->queue), - (int) sub->sid); - if (res < 0) - s = NATS_NO_MEMORY; - - if (s == NATS_OK) - { - s = natsConn_bufferWriteString(nc, proto); - NATS_FREE(proto); - proto = NULL; - } - + s = natsConn_sendSubProto(nc, sub->subject, sub->queue, sub->sid); if ((s == NATS_OK) && (adjustedMax > 0)) - s = _sendUnsubProto(nc, sub->sid, adjustedMax); + s = natsConn_sendUnsubProto(nc, sub->sid, adjustedMax); // Hold the lock up to that point so we are sure not to resend // any SUB/UNSUB for a subscription that is in draining mode. @@ -2520,6 +2522,8 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) bool ctrlMsg = false; const char *fcReply= NULL; int jct = 0; + natsMsgFilter mf = NULL; + void *mfc = NULL; natsMutex_Lock(nc->subsMu); @@ -2527,7 +2531,11 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) nc->stats.inBytes += (uint64_t) bufLen; sub = natsHash_Get(nc->subs, nc->ps->ma.sid); - + if (sub != NULL) + { + mf = nc->filter; + mfc = nc->filterClosure; + } natsMutex_Unlock(nc->subsMu); if (sub == NULL) @@ -2544,6 +2552,13 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) // computed as the bufLen - header size. dl = msg->dataLen; + if (mf != NULL) + { + (*mf)(nc, &msg, mfc); + if (msg == NULL) + return NATS_OK; + } + // Pick mutex, condition variable and list based on if the sub is // part of a global delivery thread pool or not. // Note about `list`: this is used only to link messages, but @@ -2581,6 +2596,18 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) // We will send it at the end of this function. natsMsgHeader_Get(msg, jsConsumerStalledHdr, &fcReply); } + else if (!ctrlMsg && jsi->ordered) + { + bool replaced = false; + + s = jsSub_checkOrderedMsg(sub, mu, msg, &replaced); + if ((s != NATS_OK) || replaced) + { + natsMutex_Unlock(mu); + natsMsg_Destroy(msg); + return s; + } + } } if (!ctrlMsg) @@ -2638,7 +2665,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) else if ((jct == jsCtrlHeartbeat) && (msg->reply == NULL)) { // Handle control heartbeat messages. - s = jsSub_processSequenceMismatch(sub, msg, &sm); + s = jsSub_processSequenceMismatch(sub, mu, msg, &sm); } else if ((jct == jsCtrlFlowControl) && (msg->reply != NULL)) { @@ -2936,33 +2963,17 @@ natsConn_subscribeImpl(natsSubscription **newSub, // so that we can suppress here. if (!natsConn_isReconnecting(nc)) { - char *proto = NULL; - int res = 0; - - res = nats_asprintf(&proto, _SUB_PROTO_, - subj, - (queue == NULL ? "" : queue), - (int) sub->sid); - if (res < 0) - s = nats_setDefaultError(NATS_NO_MEMORY); - + SET_WRITE_DEADLINE(nc); + s = natsConn_sendSubProto(nc, subj, queue, sub->sid); if (s == NATS_OK) - { - SET_WRITE_DEADLINE(nc); - s = natsConn_bufferWriteString(nc, proto); - if (s == NATS_OK) - s = natsConn_flushOrKickFlusher(nc); - - // We should not return a failure if we get an issue - // with the buffer write (except if it is no memory). - // For IO errors (if we just got disconnected), the - // reconnect logic will resend the sub protocol. - - if (s != NATS_NO_MEMORY) - s = NATS_OK; - } - - NATS_FREE(proto); + s = natsConn_flushOrKickFlusher(nc); + + // We should not return a failure if we get an issue + // with the buffer write (except if it is no memory). + // For IO errors (if we just got disconnected), the + // reconnect logic will resend the sub protocol. + if (s != NATS_NO_MEMORY) + s = NATS_OK; } } @@ -3051,7 +3062,7 @@ natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max, bool dr SET_WRITE_DEADLINE(nc); // We will send these for all subs when we reconnect // so that we can suppress here. - s = _sendUnsubProto(nc, sub->sid, max); + s = natsConn_sendUnsubProto(nc, sub->sid, max); if (s == NATS_OK) s = natsConn_flushOrKickFlusher(nc); @@ -4271,3 +4282,12 @@ natsConnection_GetLocalIPAndPort(natsConnection *nc, char **ip, int *port) return NATS_UPDATE_ERR_STACK(s); } + +void +natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure) +{ + natsMutex_Lock(nc->subsMu); + nc->filter = f; + nc->filterClosure = closure; + natsMutex_Unlock(nc->subsMu); +} diff --git a/src/conn.h b/src/conn.h index 21a297d2..a969e766 100644 --- a/src/conn.h +++ b/src/conn.h @@ -131,6 +131,17 @@ natsConn_userFromFile(char **userJWT, char **customErrTxt, void *closure); natsStatus natsConn_signatureHandler(char **customErrTxt, unsigned char **sig, int *sigLen, const char *nonce, void *closure); +natsStatus +natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid); + +natsStatus +natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max); + +#define natsConn_setFilter(c, f) natsConn_setFilterWithClosure((c), (f), NULL) + +void +natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure); + void natsConn_close(natsConnection *nc); diff --git a/src/hash.c b/src/hash.c index 27314faa..a7b2ccbc 100644 --- a/src/hash.c +++ b/src/hash.c @@ -619,16 +619,16 @@ natsStrHash_SetEx(natsStrHash *hash, char *key, bool copyKey, bool freeKey, } void* -natsStrHash_Get(natsStrHash *hash, char *key) +natsStrHash_GetEx(natsStrHash *hash, char *key, int keyLen) { natsStrHashEntry *e; - uint32_t hk = natsStrHash_Hash(key, (int) strlen(key)); + uint32_t hk = natsStrHash_Hash(key, keyLen); e = hash->bkts[hk & hash->mask]; while (e != NULL) { if ((e->hk == hk) - && (strcmp(e->key, key) == 0)) + && (strncmp(e->key, key, keyLen) == 0)) { return e->data; } diff --git a/src/hash.h b/src/hash.h index 12f914fb..d1b513ff 100644 --- a/src/hash.h +++ b/src/hash.h @@ -129,8 +129,10 @@ natsStatus natsStrHash_SetEx(natsStrHash *hash, char *key, bool copyKey, bool freeKey, void *data, void **oldData); +#define natsStrHash_Get(h, k) natsStrHash_GetEx((h), (k), (int) strlen(k)) + void* -natsStrHash_Get(natsStrHash *hash, char *key); +natsStrHash_GetEx(natsStrHash *hash, char *key, int keyLen); void* natsStrHash_Remove(natsStrHash *hash, char *key); diff --git a/src/js.c b/src/js.c index 2887661c..29281b92 100644 --- a/src/js.c +++ b/src/js.c @@ -42,6 +42,7 @@ const int64_t jsDefaultRequestWait = 5000; const int64_t jsDefaultStallWait = 200; const char *jsDigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; const int jsBase = 62; +const int64_t jsOrderedHBInterval = 5*(int64_t)1E9; #define jsReplyTokenSize (8) #define jsReplyPrefixLen (NATS_INBOX_PRE_LEN + (jsReplyTokenSize) + 1) @@ -51,6 +52,18 @@ const int jsBase = 62; #define jsAckPrefixLen (8) #define jsLastConsumerSeqHdr "Nats-Last-Consumer" +typedef struct __jsOrderedConsInfo +{ + int64_t osid; + int64_t nsid; + uint64_t sseq; + natsConnection *nc; + natsSubscription *sub; + char *ndlv; + natsThread *thread; + +} jsOrderedConsInfo; + static void _destroyOptions(jsOptions *o) { @@ -1271,7 +1284,7 @@ jsSub_trackSequences(jsSub *jsi, const char *reply) } natsStatus -jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm) +jsSub_processSequenceMismatch(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *sm) { jsSub *jsi = sub->jsi; const char *str = NULL; @@ -1319,18 +1332,25 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm) // Clear the suppression flag. jsi->ssmn = false; } - else if (!jsi->ssmn) + else { - // Record the sequence mismatch. - jsi->sm = true; - // Prevent following mismatch report until mismatch is resolved. - jsi->ssmn = true; - // Only for async subscriptions, indicate that the connection should - // push a NATS_MISMATCH to the async callback. - if (sub->msgCb != NULL) - *sm = true; + if (jsi->ordered) + { + s = jsSub_resetOrderedConsumer(sub, mu, jsi->sseq+1); + } + else if (!jsi->ssmn) + { + // Record the sequence mismatch. + jsi->sm = true; + // Prevent following mismatch report until mismatch is resolved. + jsi->ssmn = true; + // Only for async subscriptions, indicate that the connection should + // push a NATS_MISMATCH to the async callback. + if (sub->msgCb != NULL) + *sm = true; + } } - return NATS_OK; + return NATS_UPDATE_ERR_STACK(s); } natsStatus @@ -1862,6 +1882,32 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha return nats_setError(NATS_INVALID_ARG, "%s", jsErrNoFlowControlForQueueSub); } + // Do some quick checks here for ordered consumers. + if (opts->Ordered) + { + // Check for pull mode. + if (isPullMode) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoPullMode); + // Make sure we are not durable. + if (!nats_IsStringEmpty(durable)) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoDurable); + // Check ack policy. + if ((int) opts->Config.AckPolicy != -1) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoAckPolicy); + // Check max deliver. If set, it has to be 1. + if ((opts->Config.MaxDeliver > 0) && (opts->Config.MaxDeliver != 1)) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoMaxDeliver); + // No deliver subject, we pick our own. + if (!nats_IsStringEmpty(opts->Config.DeliverSubject)) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoDeliverSubject); + // Queue groups not allowed. + if (isQueue) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoQueue); + // Check for bound consumers. + if (!nats_IsStringEmpty(consumer)) + return nats_setError(NATS_INVALID_ARG, "%s", jsErrOrderedConsNoBind); + } + // In case a consumer has not been set explicitly, then the durable name // will be used as the consumer name (after that, `consumer` will still be // possibly NULL). @@ -1934,20 +1980,32 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha cfg->DeliverSubject = deliver; } - // Set config durable with "durable" variable, which will - // possibly be NULL. - cfg->Durable = durable; - - // Set DeliverGroup to queue name, possibly NULL - cfg->DeliverGroup = opts->Queue; - // Do filtering always, server will clear as needed. cfg->FilterSubject = subject; - // If we have acks at all and the MaxAckPending is not set go ahead - // and set to the internal max. - if ((cfg->MaxAckPending == 0) && (cfg->AckPolicy != js_AckNone)) - cfg->MaxAckPending = NATS_OPTS_DEFAULT_MAX_PENDING_MSGS; + if (opts->Ordered) + { + cfg->FlowControl = true; + cfg->AckPolicy = js_AckNone; + cfg->MaxDeliver = 1; + cfg->AckWait = (24*60*60)*(int64_t)1E9; // Just set to something known, not utilized. + if (opts->Config.Heartbeat <= 0) + cfg->Heartbeat = jsOrderedHBInterval; + } + else + { + // Set config durable with "durable" variable, which will + // possibly be NULL. + cfg->Durable = durable; + + // Set DeliverGroup to queue name, possibly NULL + cfg->DeliverGroup = opts->Queue; + + // If we have acks at all and the MaxAckPending is not set go ahead + // and set to the internal max. + if ((cfg->MaxAckPending == 0) && (cfg->AckPolicy != js_AckNone)) + cfg->MaxAckPending = NATS_OPTS_DEFAULT_MAX_PENDING_MSGS; + } // Capture the HB interval (convert in millsecond since Go duration is in nanos) hbi = cfg->Heartbeat / 1000000; @@ -1972,6 +2030,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha jsi->js = js; jsi->hbi = hbi; jsi->pull = isPullMode; + jsi->ordered= opts->Ordered; js_retain(js); if ((usrCB != NULL) && !opts->ManualAck && (opts->Config.AckPolicy != js_AckNone)) @@ -2055,7 +2114,11 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha { natsSub_Lock(sub); jsi->dc = true; - DUP_STRING(s, jsi->consumer, info->Name); + // There may be a race in the case of an ordered consumer where by this + // time, the consumer has been recreated (jsResetOrderedConsumer). So + // set only if jsi->consumer is NULL! + if (jsi->consumer == NULL) + DUP_STRING(s, jsi->consumer, info->Name); natsSub_Unlock(sub); } } @@ -2325,3 +2388,199 @@ natsMsg_isJSCtrl(natsMsg *msg, int *ctrlType) return true; } + +// Update and replace sid. +// Lock should be held on entry but will be unlocked to prevent lock inversion. +int64_t +applyNewSID(natsSubscription *sub, natsMutex *mu) +{ + int64_t osid = 0; + int64_t nsid = 0; + natsConnection *nc = sub->conn; + + natsMutex_Unlock(mu); + + natsMutex_Lock(nc->subsMu); + osid = sub->sid; + natsHash_Remove(nc->subs, osid); + // Place new one. + nc->ssid++; + nsid = nc->ssid; + natsHash_Set(nc->subs, nsid, sub, NULL); + natsMutex_Unlock(nc->subsMu); + + natsMutex_Lock(mu); + sub->sid = nsid; + return osid; +} + +static void +_recreateOrderedCons(void *closure) +{ + jsOrderedConsInfo *oci = (jsOrderedConsInfo*) closure; + natsConnection *nc = oci->nc; + natsSubscription *sub = oci->sub; + natsThread *t = NULL; + jsSub *jsi = NULL; + jsConsumerInfo *ci = NULL; + jsConsumerConfig cc; + natsStatus s; + + // Unsubscribe and subscribe with new inbox and sid. + // Remap a new low level sub into this sub since its client accessible. + // This is done here in this thread to prevent lock inversion. + + natsConn_Lock(nc); + SET_WRITE_DEADLINE(nc); + s = natsConn_sendUnsubProto(nc, oci->osid, 0); + IFOK(s, natsConn_sendSubProto(nc, oci->ndlv, NULL, oci->nsid)); + IFOK(s, natsConn_flushOrKickFlusher(nc)); + natsConn_Unlock(nc); + + if (s == NATS_OK) + { + natsSubAndLdw_Lock(sub); + t = oci->thread; + jsi = sub->jsi; + // Reset some items in jsi. + jsi->dseq = 1; + NATS_FREE(jsi->cmeta); + jsi->cmeta = NULL; + NATS_FREE(jsi->fcReply); + jsi->fcReply = NULL; + jsi->fcDelivered = 0; + // Create consumer request for starting policy. + jsConsumerConfig_Init(&cc); + cc.FlowControl = true; + cc.AckPolicy = js_AckNone; + cc.MaxDeliver = 1; + cc.AckWait = (24*60*60)*(int64_t)1E9; // Just set to something known, not utilized. + cc.Heartbeat = jsi->hbi * 1000000; + cc.DeliverSubject = sub->subject; + cc.DeliverPolicy = js_DeliverByStartSequence; + cc.OptStartSeq = oci->sseq; + natsSubAndLdw_Unlock(sub); + + s = js_AddConsumer(&ci, jsi->js, jsi->stream, &cc, NULL, NULL); + if (s == NATS_OK) + { + natsSub_Lock(sub); + NATS_FREE(jsi->consumer); + jsi->consumer = NULL; + DUP_STRING(s, jsi->consumer, ci->Name); + natsSub_Unlock(sub); + + jsConsumerInfo_Destroy(ci); + } + } + if (s != NATS_OK) + { + natsConn_Lock(nc); + if (nc->opts->asyncErrCb != NULL) + { + char tmp[256]; + snprintf(tmp, sizeof(tmp), "failed recreating ordered consumer: %d (%s)", + s, natsStatus_GetText(s)); + natsAsyncCb_PostErrHandler(nc, sub, s, NATS_STRDUP(tmp)); + } + natsConn_Unlock(nc); + + natsConn_unsubscribe(nc, sub, 0, true, 0); + } + + NATS_FREE(oci); + natsThread_Detach(t); + natsThread_Destroy(t); + natsSub_release(sub); +} + +// We are here if we have detected a gap with an ordered consumer. +// We will create a new consumer and rewire the low level subscription. +// Lock should be held. +natsStatus +jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq) +{ + natsStatus s = NATS_OK; + natsConnection *nc = sub->conn; + int64_t osid = 0; + natsInbox *newDeliver = NULL; + jsOrderedConsInfo *oci = NULL; + + if ((sub->jsi == NULL) || (nc == NULL) || sub->closed) + return NATS_OK; + + // Grab new inbox. + s = natsInbox_Create(&newDeliver); + if (s != NATS_OK) + return NATS_UPDATE_ERR_STACK(s); + + // Quick unsubscribe. Since we know this is a simple push subscriber we do in place. + osid = applyNewSID(sub, mu); + + NATS_FREE(sub->subject); + sub->subject = (char*) newDeliver; + + // We are still in the low level readloop for the connection so we need + // to spin a thread to try to create the new consumer. + // Create object that will hold some state to pass to the thread. + oci = NATS_CALLOC(1, sizeof(jsOrderedConsInfo)); + if (oci == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (s == NATS_OK) + { + oci->osid = osid; + oci->nsid = sub->sid; + oci->sseq = sseq; + oci->nc = nc; + oci->sub = sub; + oci->ndlv = (char*) newDeliver; + natsSub_retain(sub); + + s = natsThread_Create(&oci->thread, _recreateOrderedCons, (void*) oci); + if (s != NATS_OK) + { + NATS_FREE(oci); + natsSub_release(sub); + } + } + return s; +} + +// Check to make sure messages are arriving in order. +// Returns true if the sub had to be replaced. Will cause upper layers to return. +// The caller has verified that sub.jsi != nil and that this is not a control message. +// Lock should be held. +natsStatus +jsSub_checkOrderedMsg(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *reset) +{ + natsStatus s = NATS_OK; + jsSub *jsi = NULL; + uint64_t sseq = 0; + uint64_t dseq = 0; + + *reset = false; + + // Ignore msgs with no reply like HBs and flowcontrol, they are handled elsewhere. + if (natsMsg_GetReply(msg) == NULL) + return NATS_OK; + + // Normal message here. + s = _getMetaData(natsMsg_GetReply(msg), NULL, NULL, NULL, NULL, &sseq, &dseq, NULL, NULL, 2); + if (s == NATS_OK) + { + jsi = sub->jsi; + if (dseq != jsi->dseq) + { + *reset = true; + s = jsSub_resetOrderedConsumer(sub, mu, jsi->sseq+1); + } + else + { + // Update our tracking here. + jsi->dseq = dseq+1; + jsi->sseq = sseq; + } + } + return NATS_UPDATE_ERR_STACK(s); +} diff --git a/src/js.h b/src/js.h index 807ddc8d..6ec22d5f 100644 --- a/src/js.h +++ b/src/js.h @@ -53,6 +53,13 @@ extern const int64_t jsDefaultRequestWait; #define jsErrNoHeartbeatForQueueSub "a queue subscription cannot be created for a consumer with heartbeat" #define jsErrNoFlowControlForQueueSub "a queue subscription cannot be created for a consumer with flow control" #define jsErrConsumerSeqMismatch "consumer sequence mismatch" +#define jsErrOrderedConsNoDurable "durable can not be set for an ordered consumer" +#define jsErrOrderedConsNoAckPolicy "ack policy can not be set for an ordered consume" +#define jsErrOrderedConsNoMaxDeliver "max deliver can not be set for an ordered consumer" +#define jsErrOrderedConsNoDeliverSubject "deliver subject can not be set for an ordered consumer" +#define jsErrOrderedConsNoQueue "queue can not be set for an ordered consumer" +#define jsErrOrderedConsNoBind "can not bind existing consumer for an ordered consumer" +#define jsErrOrderedConsNoPullMode "can not use pull mode for an ordered consumer" #define jsCtrlHeartbeat (1) #define jsCtrlFlowControl (2) diff --git a/src/nats.h b/src/nats.h index 1d6fbb4c..4a7d8f5a 100644 --- a/src/nats.h +++ b/src/nats.h @@ -652,6 +652,13 @@ typedef struct jsSubOptions * consumer. */ jsConsumerConfig Config; ///< Consumer configuration. + /** + * This will create a fifo ephemeral consumer for in order delivery of + * messages. There are no redeliveries and no acks. + * Flow control and heartbeats are required and set by default, but + * the heartbeats value can be overridden in the consumer configuration. + */ + bool Ordered; ///< If true, this will be an ordered consumer. } jsSubOptions; diff --git a/src/natsp.h b/src/natsp.h index 1aef3e53..e1734279 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -70,7 +70,7 @@ #define _PING_PROTO_ "PING\r\n" #define _PONG_PROTO_ "PONG\r\n" -#define _SUB_PROTO_ "SUB %s %s %d\r\n" +#define _SUB_PROTO_ "SUB %s %s %" PRId64 "\r\n" #define _UNSUB_PROTO_ "UNSUB %" PRId64 " %d\r\n" #define _UNSUB_NO_MAX_PROTO_ "UNSUB %" PRId64 " \r\n" @@ -349,6 +349,7 @@ typedef struct __jsSub char *consumer; char *nxtMsgSubj; bool pull; + bool ordered; bool dc; // delete JS consumer in Unsub()/Drain() int64_t hbi; @@ -538,6 +539,9 @@ typedef struct __respInfo } respInfo; +// Used internally for testing and allow to alter/suppress an incoming message +typedef void (*natsMsgFilter)(natsConnection *nc, natsMsg **msg, void* closure); + struct __natsConnection { natsMutex *mu; @@ -615,6 +619,11 @@ struct __natsConnection void *buffer; void *data; } el; + + // Msg filters for testing. + // Protected by subsMu + natsMsgFilter filter; + void *filterClosure; }; // @@ -779,11 +788,17 @@ natsStatus jsSub_trackSequences(jsSub *jsi, const char *reply); natsStatus -jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm); +jsSub_processSequenceMismatch(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *sm); natsStatus jsSub_scheduleFlowControlResponse(jsSub *jsi, natsSubscription *sub, const char *reply); +natsStatus +jsSub_checkOrderedMsg(natsSubscription *sub, natsMutex *mu, natsMsg *msg, bool *reset); + +natsStatus +jsSub_resetOrderedConsumer(natsSubscription *sub, natsMutex *mu, uint64_t sseq); + bool natsMsg_isJSCtrl(natsMsg *msg, int *ctrlType); diff --git a/test/list.txt b/test/list.txt index 7e11c9ce..62b77cfb 100644 --- a/test/list.txt +++ b/test/list.txt @@ -170,6 +170,7 @@ ConnSign WriteDeadline HeadersNotSupported HeadersBasic +MsgsFilter EventLoop EventLoopRetryOnFailedConnect EventLoopTLS @@ -221,6 +222,8 @@ JetStreamSubscribeConfigCheck JetStreamSubscribeIdleHeartbeat JetStreamSubscribeFlowControl JetStreamSubscribePull +JetStreamOrderedCons +JetStreamOrderedConsWithErrors StanPBufAllocator StanConnOptions StanSubOptions diff --git a/test/test.c b/test/test.c index b6c4e900..177419d8 100644 --- a/test/test.c +++ b/test/test.c @@ -128,6 +128,7 @@ struct threadArg natsOptions *opts; natsConnection *nc; jsCtx *js; + natsBuffer *buf; #if defined(NATS_HAS_STREAMING) stanConnection *sc; @@ -5035,7 +5036,6 @@ test_natsMsgIsJSCtrl(void) } } - static natsStatus _checkStart(const char *url, int orderIP, int maxAttempts) { @@ -15573,7 +15573,7 @@ test_ReconnectJitter(void) // the number of reconnect attempts. start = nats_Now(); nats_Sleep(400); - pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); CHECK_SERVER_STARTED(pid); test("Check jitter: "); @@ -15684,14 +15684,14 @@ test_CustomReconnectDelay(void) s = natsConnection_Connect(&nc, opts); testCond(s == NATS_OK); - // Cause disconnect + // Cause disconnect _stopServer(pid); pid = NATS_INVALID_PID; - // We should be trying to reconnect 4 times - start = nats_Now(); + // We should be trying to reconnect 4 times + start = nats_Now(); - // Wait on error or completion of test. + // Wait on error or completion of test. test("Check custom delay cb: "); natsMutex_Lock(arg.m); while ((s != NATS_TIMEOUT) && !arg.closed && (arg.status == NATS_OK)) @@ -17775,16 +17775,18 @@ test_GetClientIP(void) CHECK_SERVER_STARTED(serverPid); test("Connect: "); - s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + s = natsConnection_ConnectTo(&nc, "nats://127.0.0.1:4222"); testCond(s == NATS_OK); test("Get client IP - no conn: "); s = natsConnection_GetClientIP(NULL, &ip); testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); test("Get client IP - no ip loc: "); s = natsConnection_GetClientIP(nc, NULL); testCond(s == NATS_INVALID_ARG); + nats_clearLastError(); test("Get client IP: "); s = natsConnection_GetClientIP(nc, &ip); @@ -17796,6 +17798,7 @@ test_GetClientIP(void) test("Get client IP after conn closed: "); s = natsConnection_GetClientIP(nc, &ip); testCond((s == NATS_CONNECTION_CLOSED) && (ip == NULL)); + nats_clearLastError(); natsConnection_Destroy(nc); nc = NULL; @@ -17837,6 +17840,7 @@ test_GetClientIP(void) test("Get client IP with old server: "); s = natsConnection_GetClientIP(nc, &ip); testCond ((s == NATS_NO_SERVER_SUPPORT) && (ip == NULL)); + nats_clearLastError(); // Notify mock server we are done natsMutex_Lock(arg.m); @@ -19232,6 +19236,89 @@ test_HeadersBasic(void) _stopServer(pid); } +static void +_msgFilterNoOp(natsConnection *nc, natsMsg **msg, void *closure) { } + +static void +_msgFilterAlterMsg(natsConnection *nc, natsMsg **msg, void *closure) +{ + natsStatus s; + natsMsg *nm = NULL; + + s = natsMsg_Create(&nm, natsMsg_GetSubject(*msg), NULL, "replaced", 8); + if (s != NATS_OK) + nats_PrintLastErrorStack(stderr); + + natsMsg_Destroy(*msg); + *msg = nm; +} + +static void +_msgFilterDropMsg(natsConnection *nc, natsMsg **msg, void *closure) +{ + natsMsg_Destroy(*msg); + *msg = NULL; + natsConn_setFilter(nc, NULL); +} + +static void +test_natsMsgsFilter(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsPid pid = NATS_INVALID_PID; + natsMsg *msg = NULL; + natsSubscription *sub = NULL; + + pid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(pid); + + test("Connect ok: "); + s = natsConnection_ConnectTo(&nc, "nats://127.0.0.1:4222"); + testCond(s == NATS_OK); + + test("Create sub: "); + s = natsConnection_SubscribeSync(&sub, nc, "foo"); + testCond(s == NATS_OK); + + test("Add no-op filter: "); + natsConn_setFilter(nc, _msgFilterNoOp); + s = natsConnection_PublishString(nc, "foo", "original"); + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + testCond((s == NATS_OK) && (msg != NULL) + && (strcmp(natsMsg_GetData(msg), "original") == 0)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Add alter-msg filter: "); + natsConn_setFilter(nc, _msgFilterAlterMsg); + s = natsConnection_PublishString(nc, "foo", "original"); + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + testCond((s == NATS_OK) && (msg != NULL) + && (strcmp(natsMsg_GetData(msg), "replaced") == 0)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Add drop-msg filter: "); + natsConn_setFilter(nc, _msgFilterDropMsg); + s = natsConnection_PublishString(nc, "foo", "will be dropped"); + IFOK(s, natsSubscription_NextMsg(&msg, sub, 100)); + testCond((s == NATS_TIMEOUT) && (msg == NULL)); + nats_clearLastError(); + + test("Filter is removed from previous filter: "); + s = natsConnection_PublishString(nc, "foo", "got it"); + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + testCond((s == NATS_OK) && (msg != NULL) + && (strcmp(natsMsg_GetData(msg), "got it") == 0)); + natsMsg_Destroy(msg); + msg = NULL; + + natsSubscription_Destroy(sub); + natsConnection_Destroy(nc); + _stopServer(pid); +} + static natsStatus _evLoopAttach(void **userData, void *loop, natsConnection *nc, natsSock socket) { @@ -24348,17 +24435,17 @@ test_JetStreamSubscribeConfigCheck(void) if (s == NATS_OK) { // If not explicitly asked by the user, we are ok - s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, NULL, &jerr); + s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, NULL, &jerr); natsSubscription_Unsubscribe(nsub); - natsSubscription_Destroy(nsub); + natsSubscription_Destroy(nsub); nsub = NULL; } testCond(s == NATS_OK); natsSubscription_Unsubscribe(sub); natsSubscription_Destroy(sub); sub = NULL; - } + } // If the option is the same as the server default, it is not an error either. for (i=0; i<5; i++) @@ -24383,16 +24470,16 @@ test_JetStreamSubscribeConfigCheck(void) s = js_PullSubscribe(&sub, js, "foo", durName, NULL, NULL, &jerr); if (s == NATS_OK) { - s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, &so, &jerr); + s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, &so, &jerr); natsSubscription_Unsubscribe(nsub); - natsSubscription_Destroy(nsub); + natsSubscription_Destroy(nsub); nsub = NULL; } testCond(s == NATS_OK); natsSubscription_Unsubscribe(sub); natsSubscription_Destroy(sub); sub = NULL; - } + } for (i=0; i<5; i++) { @@ -24417,9 +24504,9 @@ test_JetStreamSubscribeConfigCheck(void) { natsSubscription *nsub = NULL; - // First time it was created with defaults and the - // second time a change is attempted, so it is an error. - s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, &so, &jerr); + // First time it was created with defaults and the + // second time a change is attempted, so it is an error. + s = js_PullSubscribe(&nsub, js, "foo", durName, NULL, &so, &jerr); if ((s != NATS_OK) && (nsub == NULL) && (strstr(nats_GetLastError(NULL), name) != NULL)) { s = NATS_OK; @@ -25336,6 +25423,519 @@ test_JetStreamSubscribePull(void) rmtree(datastore); } +static void +_orderedConsCB(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) +{ + struct threadArg *args = (struct threadArg*) closure; + + natsMutex_Lock(args->m); + if (natsMsg_GetDataLength(msg) == 0) + { + args->done = true; + natsCondition_Signal(args->c); + } + else + { + natsBuf_Append(args->buf, natsMsg_GetData(msg), natsMsg_GetDataLength(msg)); + args->sum++; + } + natsMutex_Unlock(args->m); + natsMsg_Destroy(msg); +} + +static natsStatus +_testOrderedCons(jsCtx *js, jsStreamInfo *si, char *asset, int assetLen, struct threadArg *args, bool async) +{ + natsStatus s = NATS_OK; + natsSubscription *sub = NULL; + int received = 0; + jsSubOptions so; + + jsSubOptions_Init(&so); + so.Ordered = true; + so.Config.Heartbeat = 250*1000000; + + s = natsBuf_Create(&args->buf, assetLen); + if ((s == NATS_OK) && async) + { + s = js_Subscribe(&sub, js, "a", _orderedConsCB, (void*) args, NULL, &so, NULL); + if (s == NATS_OK) + { + natsMutex_Lock(args->m); + while ((s != NATS_TIMEOUT) && !args->done) + s = natsCondition_TimedWait(args->c, args->m, 5000); + received = args->sum; + natsMutex_Unlock(args->m); + } + } + else if (s == NATS_OK) + { + s = js_SubscribeSync(&sub, js, "a", NULL, &so, NULL); + if (s == NATS_OK) + { + bool done = false; + int64_t start = 0; + natsMsg *msg = NULL; + + start = nats_Now(); + while ((s == NATS_OK) && !done) + { + s = natsSubscription_NextMsg(&msg, sub, 1000); + if (s == NATS_OK) + { + done = (natsMsg_GetDataLength(msg) == 0 ? true : false); + if (!done) + { + s = natsBuf_Append(args->buf, natsMsg_GetData(msg), natsMsg_GetDataLength(msg)); + received++; + } + natsMsg_Destroy(msg); + msg = NULL; + } + if ((s == NATS_OK) && (nats_Now() - start > 5000)) + s = NATS_TIMEOUT; + } + } + } + if ((s == NATS_OK) && (natsBuf_Len(args->buf) != assetLen)) + { + fprintf(stderr, "\nAsset length (%d) does not match received data length (%d)\n", + assetLen, natsBuf_Len(args->buf)); + s = NATS_MISMATCH; + } + else if (s == NATS_OK) + { + int i; + char *data = natsBuf_Data(args->buf); + + for (i=0; iState.Msgs-1)); + + fflush(stderr); + natsSubscription_Destroy(sub); + natsMutex_Lock(args->m); + natsBuf_Destroy(args->buf); + args->buf = NULL; + args->sum = 0; + args->done = false; + natsMutex_Unlock(args->m); + + return s; +} + +static void +_singleLoss(natsConnection *nc, natsMsg **msg, void* closure) +{ + const char *val = NULL; + int res = rand() % 100; + + if ((res <= 10) && (natsMsgHeader_Get(*msg, "data", &val) == NATS_OK)) + { + natsMsg_Destroy(*msg); + *msg = NULL; + natsConn_setFilter(nc, NULL); + } +} + +static void +_multiLoss(natsConnection *nc, natsMsg **msg, void* closure) +{ + const char *val = NULL; + int res = rand() % 100; + + if ((res <= 10) && (natsMsgHeader_Get(*msg, "data", &val) == NATS_OK)) + { + natsMsg_Destroy(*msg); + *msg = NULL; + } +} + +static void +_firstOnlyLoss(natsConnection *nc, natsMsg **msg, void* closure) +{ + jsMsgMetaData *meta = NULL; + + if (natsMsg_GetMetaData(&meta, *msg) == NATS_OK) + { + if (meta->Sequence.Consumer == 1) + { + natsMsg_Destroy(*msg); + *msg = NULL; + natsConn_setFilter(nc, NULL); + } + jsMsgMetaData_Destroy(meta); + } +} + +static void +_lastOnlyLoss(natsConnection *nc, natsMsg **msg, void* closure) +{ + jsMsgMetaData *meta = NULL; + jsStreamInfo *si = (jsStreamInfo*) closure; + + if (natsMsg_GetMetaData(&meta, *msg) == NATS_OK) + { + if (meta->Sequence.Stream == si->State.LastSeq-1) + { + natsMsg_Destroy(*msg); + *msg = NULL; + natsConn_setFilter(nc, NULL); + } + jsMsgMetaData_Destroy(meta); + } +} + +static void +test_JetStreamOrderedConsumer(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsSubscription *sub= NULL; + jsCtx *js = NULL; + natsPid pid = NATS_INVALID_PID; + jsErrCode jerr= 0; + char datastore[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + jsStreamConfig sc; + jsSubOptions so; + struct threadArg args; + int i; + char *asset = NULL; + int assetLen = 1024*1024; + const int chunkSize = 1024; + jsStreamInfo *si = NULL; + + ENSURE_JS_VERSION(2, 3, 3); + + s = _createDefaultThreadArgsForCbTests(&args); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + _makeUniqueDir(datastore, sizeof(datastore), "datastore_"); + + test("Start JS server: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect: "); + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + test("Create stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "OBJECT"; + sc.Subjects = (const char*[1]){"a"}; + sc.SubjectsLen = 1; + sc.Storage = js_MemoryStorage; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + // Create a sample asset. + asset = malloc(assetLen); + for (i=0; im); + args->status = err; + natsCondition_Signal(args->c); + natsMutex_Unlock(args->m); +} + +static void +test_JetStreamOrderedConsumerWithErrors(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsSubscription *sub= NULL; + jsCtx *js = NULL; + natsPid pid = NATS_INVALID_PID; + jsErrCode jerr= 0; + char datastore[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + jsStreamConfig sc; + jsSubOptions so; + int i, iter; + char *asset = NULL; + int assetLen = 128*1024; + const int chunkSize = 256; + struct threadArg args; + + ENSURE_JS_VERSION(2, 3, 3); + + s = _createDefaultThreadArgsForCbTests(&args); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + _makeUniqueDir(datastore, sizeof(datastore), "datastore_"); + + test("Start JS server: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect: "); + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetErrorHandler(opts, _jsOrderedErrHandler, (void*) &args)); + IFOK(s, natsConnection_Connect(&nc, opts)); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + // Create a sample asset. + asset = malloc(assetLen); + for (i=0; ijsi->consumer; + natsSub_Unlock(sub); + s = js_DeleteConsumer(js, "OBJECT", cons, NULL, &jerr); + } + testCond((s == NATS_OK) && (jerr == 0)); + + test("Check we get error: "); + natsMutex_Lock(args.m); + while ((s != NATS_TIMEOUT) && (args.status != NATS_MISSED_HEARTBEAT)) + s = natsCondition_TimedWait(args.c, args.m, 1000); + args.status = NATS_OK; + natsMutex_Unlock(args.m); + testCond(s == NATS_OK); + + natsSubscription_Destroy(sub); + sub = NULL; + } + + free(asset); + jsCtx_Destroy(js); + natsOptions_Destroy(opts); + natsConnection_Destroy(nc); + _destroyDefaultThreadArgs(&args); + _stopServer(pid); + rmtree(datastore); +} + #if defined(NATS_HAS_STREAMING) static int @@ -27713,6 +28313,7 @@ static testInfo allTests[] = {"WriteDeadline", test_WriteDeadline}, {"HeadersNotSupported", test_HeadersNotSupported}, {"HeadersBasic", test_HeadersBasic}, + {"MsgsFilter", test_natsMsgsFilter}, {"EventLoop", test_EventLoop}, {"EventLoopRetryOnFailedConnect", test_EventLoopRetryOnFailedConnect}, {"EventLoopTLS", test_EventLoopTLS}, @@ -27768,6 +28369,8 @@ static testInfo allTests[] = {"JetStreamSubscribeIdleHeartbeat", test_JetStreamSubscribeIdleHearbeat}, {"JetStreamSubscribeFlowControl", test_JetStreamSubscribeFlowControl}, {"JetStreamSubscribePull", test_JetStreamSubscribePull}, + {"JetStreamOrderedCons", test_JetStreamOrderedConsumer}, + {"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors}, #if defined(NATS_HAS_STREAMING) {"StanPBufAllocator", test_StanPBufAllocator},