diff --git a/server/consumer.go b/server/consumer.go
index aceaf5904bb..2e6295f999b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -66,6 +66,7 @@ type ConsumerConfig struct {
 	MaxDeliver      int             `json:"max_deliver,omitempty"`
 	BackOff         []time.Duration `json:"backoff,omitempty"`
 	FilterSubject   string          `json:"filter_subject,omitempty"`
+	FilterSubjects  []string        `json:"filter_subjects,omitempty"`
 	ReplayPolicy    ReplayPolicy    `json:"replay_policy"`
 	RateLimit       uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
 	SampleFrequency string          `json:"sample_freq,omitempty"`
@@ -227,22 +228,28 @@ type consumer struct {
 	// Atomic used to notify that we want to process an ack.
 	// This will be checked in checkPending to abort processing
 	// and let ack be processed in priority.
-	awl               int64
-	mu                sync.RWMutex
-	js                *jetStream
-	mset              *stream
-	acc               *Account
-	srv               *Server
-	client            *client
-	sysc              *client
-	sid               int
-	name              string
-	stream            string
-	sseq              uint64
-	dseq              uint64
-	adflr             uint64
-	asflr             uint64
-	npc               uint64
+	awl    int64
+	mu     sync.RWMutex
+	js     *jetStream
+	mset   *stream
+	acc    *Account
+	srv    *Server
+	client *client
+	sysc   *client
+	sid    int
+	name   string
+	stream string
+	sseq   uint64
+	msseq  []filteredSubject
+	// Delivery sequence.
+	dseq uint64
+	// Ack delivery floor.
+	adflr uint64
+	// Ack store floor.
+	asflr uint64
+	// Num pending count.
+	npc uint64
+	// Num pending calculation marker (last sequence seen).
 	npcm              uint64
 	dsubj             string
 	qgroup            string
@@ -307,8 +314,8 @@ type consumer struct {
 	// Ack queue
 	ackMsgs *ipQueue
 
-	// For stream signaling.
-	sigSub *subscription
+	// For stream signaling, one subscription per filter subject.
+	sigSubs []*subscription
 }
 
 type proposal struct {
@@ -457,6 +464,7 @@ func checkConsumerCfg(
 	}
 
 	// As best we can make sure the filtered subject is valid.
+	// TODO: this validation is not yet applied to multiple filter subjects.
 	if config.FilterSubject != _EMPTY_ {
 		subjects := copyStrings(cfg.Subjects)
 		// explicitly skip validFilteredSubject when recovering
@@ -765,6 +773,21 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		}
 		o.store = store
 	}
+	if len(o.cfg.FilterSubjects) != 0 {
+		msseq := make([]filteredSubject, 0, len(config.FilterSubjects))
+		for _, subject := range config.FilterSubjects {
+			sub := filteredSubject{
+				subject:     subject,
+				hasWildcard: false,
+				sequence:    1,
+			}
+			if subject != _EMPTY_ && subjectHasWildcard(subject) {
+				sub.hasWildcard = true
+			}
+			msseq = append(msseq, sub)
+		}
+		o.msseq = msseq
+	}
 
 	if o.store != nil && o.store.HasState() {
 		// Restore our saved state.
@@ -988,8 +1012,17 @@ func (o *consumer) setLeader(isLeader bool) {
 		}
 
 		// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
-		if o.dseq == 1 && o.sseq > 1 {
-			o.updateSkipped()
+		if len(o.cfg.FilterSubjects) == 0 {
+			if o.dseq == 1 && o.sseq > 1 {
+				o.updateSkipped()
+			}
+		} else {
+			sort.Slice(o.msseq, func(i, j int) bool {
+				return o.msseq[i].sequence < o.msseq[j].sequence
+			})
+			if o.dseq == 1 && o.msseq != nil && o.msseq[0].sequence > 1 {
+				o.updateSkipped()
+			}
 		}
 
 		// Do info sub.
@@ -2416,7 +2449,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
 // even if the stream only has a single non-wildcard subject designation.
 // Read lock should be held.
 func (o *consumer) isFiltered() bool {
-	if o.cfg.FilterSubject == _EMPTY_ {
+	if o.cfg.FilterSubject == _EMPTY_ && len(o.cfg.FilterSubjects) == 0 {
 		return false
 	}
 	// If we are here we want to check if the filtered subject is
@@ -2999,23 +3032,64 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
 		return nil, 0, errMaxAckPending
 	}
 
-	// Grab next message applicable to us.
-	pmsg := getJSPubMsgFromPool()
-	sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg)
+	if len(o.cfg.FilterSubjects) == 0 {
 
-	if sseq >= o.sseq {
-		o.sseq = sseq + 1
-		if err == ErrStoreEOF {
-			o.updateSkipped()
+		// Grab next message applicable to us.
+		pmsg := getJSPubMsgFromPool()
+		sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg)
+
+		if sseq >= o.sseq {
+			o.sseq = sseq + 1
+			if err == ErrStoreEOF {
+				o.updateSkipped()
+			}
+		}
+		if sm == nil {
+			pmsg.returnToPool()
+			return nil, 0, err
 		}
+
+		return pmsg, dc, err
 	}
-	if sm == nil {
-		pmsg.returnToPool()
-		return nil, 0, err
+	var lastErr error
+	for i, value := range o.msseq {
+		if value.pmsg == nil {
+			pmsg := getJSPubMsgFromPool()
+			sm, sseq, err := o.mset.store.LoadNextMsg(value.subject, value.hasWildcard, value.sequence, &pmsg.StoreMsg)
+			o.msseq[i].err = err
+
+			if sm != nil {
+				o.msseq[i].pmsg = pmsg
+			} else {
+				pmsg.returnToPool()
+			}
+			if sseq >= o.msseq[i].sequence {
+				o.msseq[i].sequence = sseq + 1
+				if err == ErrStoreEOF {
+					o.updateSkipped()
+				}
+			}
+		}
+
+	}
+	// Sort ascending by next sequence so the oldest available message is delivered first.
+	sort.Slice(o.msseq, func(i, j int) bool {
+		return o.msseq[i].sequence < o.msseq[j].sequence
+	})
+
+	for i, value := range o.msseq {
+		if value.pmsg != nil {
+			o.msseq[i].pmsg = nil
+			return value.pmsg, dc, value.err
+		}
+		if value.err != nil {
+			lastErr = value.err
+		}
 	}
-	return pmsg, dc, err
+	return nil, 0, lastErr
+
 }
 
 // Will check for expiration and lack of interest on waiting requests.
@@ -3163,7 +3237,7 @@ func (o *consumer) suppressDeletion() {
 }
 
 func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
-	// On startup check to see if we are in a a reply situation where replay policy is not instant.
+	// On startup check to see if we are in a replay situation where replay policy is not instant.
 	var (
 		lts  int64 // last time stamp seen, used for replay.
 		lseq uint64
@@ -3330,6 +3404,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 	waitForMsgs:
 		// If we were in a replay state check to see if we are caught up. If so clear.
+		// FIXME: track the proper per-filter sequence here when multiple filter subjects are set.
 		if o.replay && o.sseq > lseq {
 			o.replay = false
 		}

@@ -3419,21 +3494,52 @@ func (o *consumer) numPending() uint64 {
 // Depends on delivery policy, for last per subject we calculate differently.
 // Lock should be held.
func (o *consumer) streamNumPending() uint64 { + // single filter subject + if len(o.cfg.FilterSubjects) == 0 { + if o.mset == nil || o.mset.store == nil { + o.npc, o.npcm = 0, 0 + } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + o.npc, o.npcm = 0, 0 + for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) { + if o.sseq <= ss.Last { + o.npc++ + if ss.Last > o.npcm { + o.npcm = ss.Last + } + } + } + } else { + ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) + o.npc, o.npcm = ss.Msgs, ss.Last + } + return o.npc + } + + // else - multiple filter subjects if o.mset == nil || o.mset.store == nil { o.npc, o.npcm = 0, 0 } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { o.npc, o.npcm = 0, 0 - for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) { - if o.sseq <= ss.Last { - o.npc++ - if ss.Last > o.npcm { - o.npcm = ss.Last + for _, filter := range o.msseq { + for _, ss := range o.mset.store.SubjectsState(filter.subject) { + if filter.sequence <= ss.Last { + o.npc++ + if ss.Last > o.npcm { + o.npcm = ss.Last + } } } } } else { - ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) - o.npc, o.npcm = ss.Msgs, ss.Last + var npc uint64 = 0 + for _, filter := range o.msseq { + ss := o.mset.store.FilteredState(filter.sequence, filter.subject) + npc += ss.Msgs + if ss.Last > o.npcm { + o.npcm = ss.Last + } + } + o.npc = npc } return o.npc } @@ -3902,63 +4008,160 @@ func (o *consumer) hasSkipListPending() bool { // Will select the starting sequence. func (o *consumer) selectStartingSeqNo() { - if o.mset == nil || o.mset.store == nil { - o.sseq = 1 - } else { - var state StreamState - o.mset.store.FastState(&state) - if o.cfg.OptStartSeq == 0 { - if o.cfg.DeliverPolicy == DeliverAll { - o.sseq = state.FirstSeq - } else if o.cfg.DeliverPolicy == DeliverLast { - o.sseq = state.LastSeq - // If we are partitioned here this will be properly set when we become leader. - if o.cfg.FilterSubject != _EMPTY_ { - ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject) - o.sseq = ss.Last - } - } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { - if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { - o.lss = &lastSeqSkipList{ - resume: state.LastSeq, - seqs: createLastSeqSkipList(mss), + if len(o.cfg.FilterSubjects) == 0 { + if o.mset == nil || o.mset.store == nil { + o.sseq = 1 + } else { + var state StreamState + o.mset.store.FastState(&state) + if o.cfg.OptStartSeq == 0 { + if o.cfg.DeliverPolicy == DeliverAll { + o.sseq = state.FirstSeq + } else if o.cfg.DeliverPolicy == DeliverLast { + o.sseq = state.LastSeq + // If we are partitioned here this will be properly set when we become leader. + if o.cfg.FilterSubject != _EMPTY_ { + ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject) + o.sseq = ss.Last } - o.sseq = o.lss.seqs[0] + } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { + o.lss = &lastSeqSkipList{ + resume: state.LastSeq, + seqs: createLastSeqSkipList(mss), + } + o.sseq = o.lss.seqs[0] + } else { + // If no mapping info just set to last. + o.sseq = state.LastSeq + } + } else if o.cfg.OptStartTime != nil { + // If we are here we are time based. + // TODO(dlc) - Once clustered can't rely on this. + o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) } else { - // If no mapping info just set to last. 
- o.sseq = state.LastSeq + // DeliverNew + o.sseq = state.LastSeq + 1 } - } else if o.cfg.OptStartTime != nil { - // If we are here we are time based. - // TODO(dlc) - Once clustered can't rely on this. - o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) } else { - // DeliverNew + o.sseq = o.cfg.OptStartSeq + } + + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { o.sseq = state.LastSeq + 1 } - } else { - o.sseq = o.cfg.OptStartSeq } - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 + // Always set delivery sequence to 1. + o.dseq = 1 + // Set ack delivery floor to delivery-1 + o.adflr = o.dseq - 1 + // Set ack store floor to store-1 + o.asflr = o.sseq - 1 + + // Set our starting sequence state. + if o.store != nil && o.sseq > 0 { + o.store.SetStarting(o.sseq - 1) } - } + } else { - // Always set delivery sequence to 1. - o.dseq = 1 - // Set ack delivery floor to delivery-1 - o.adflr = o.dseq - 1 - // Set ack store floor to store-1 - o.asflr = o.sseq - 1 + if o.mset == nil || o.mset.store == nil { + o.sseq = 1 + } else { + var state StreamState + o.mset.store.FastState(&state) + if o.cfg.OptStartSeq == 0 { + if o.cfg.DeliverPolicy == DeliverAll { + o.sseq = state.FirstSeq + } else if o.cfg.DeliverPolicy == DeliverLast { + o.sseq = state.LastSeq + for i, subject := range o.msseq { + if subject.subject == _EMPTY_ { + o.msseq[i].sequence = state.LastSeq + } else { + ss := o.mset.store.FilteredState(1, subject.subject) + o.msseq[i].sequence = ss.Last + } + } + // If we are partitioned here this will be properly set when we become leader. + if o.cfg.FilterSubject != _EMPTY_ { + ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject) + o.sseq = ss.Last + } + } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { + o.lss = &lastSeqSkipList{ + resume: state.LastSeq, + seqs: createLastSeqSkipList(mss), + } + o.sseq = o.lss.seqs[0] + } else { + // If no mapping info just set to last. + o.sseq = state.LastSeq + } + } else if o.cfg.OptStartTime != nil { + // If we are here we are time based. + // TODO(dlc) - Once clustered can't rely on this. + o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime) + } else { + // DeliverNew + o.sseq = state.LastSeq + 1 + } + } else { + o.sseq = o.cfg.OptStartSeq + } + + if state.FirstSeq == 0 { + o.sseq = 1 + for i := range o.msseq { + o.msseq[i].sequence = 1 + } + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 + } + for i, filter := range o.msseq { + if state.FirstSeq == 0 { + + o.msseq[i].sequence = 1 + } + if filter.sequence < state.FirstSeq { + o.msseq[i].sequence = state.FirstSeq + } + if filter.sequence > state.LastSeq { + o.msseq[i].sequence = state.LastSeq + 1 + } + } + + } + sort.Slice(o.msseq, func(i, j int) bool { + return o.msseq[j].sequence > o.msseq[i].sequence + }) + + // Always set delivery sequence to 1. + o.dseq = 1 + // Set ack delivery floor to delivery-1 + o.adflr = o.dseq - 1 + // Set ack store floor to store-1 + if len(o.cfg.FilterSubjects) == 0 { + o.asflr = o.sseq - 1 + // Set our starting sequence state. 
+		if o.store != nil && o.sseq > 0 {
+			o.store.SetStarting(o.sseq - 1)
+		}
+	} else {
+		o.asflr = o.msseq[0].sequence - 1
+		// Set our starting sequence state.
+		if o.store != nil && o.sseq > 0 {
+			o.store.SetStarting(o.msseq[0].sequence - 1)
+		}
+	}
 
-	// Set our starting sequence state.
-	if o.store != nil && o.sseq > 0 {
-		o.store.SetStarting(o.sseq - 1)
 	}
 }
 
@@ -4335,19 +4538,25 @@ func (o *consumer) account() *Account {
 	return a
 }
 
-func (o *consumer) signalSub() *subscription {
+func (o *consumer) signalSubs() []*subscription {
 	o.mu.Lock()
 	defer o.mu.Unlock()
 
-	if o.sigSub != nil {
-		return o.sigSub
+	if o.sigSubs != nil {
+		return o.sigSubs
 	}
 
-	subject := o.cfg.FilterSubject
-	if subject == _EMPTY_ {
-		subject = fwcs
+	subs := []*subscription{}
+	if o.cfg.FilterSubject == _EMPTY_ && len(o.cfg.FilterSubjects) == 0 {
+		subs = append(subs, &subscription{subject: []byte(fwcs), icb: o.processStreamSignal})
+	}
+	if o.cfg.FilterSubject != _EMPTY_ {
+		subs = append(subs, &subscription{subject: []byte(o.cfg.FilterSubject), icb: o.processStreamSignal})
+	}
+	for _, sub := range o.cfg.FilterSubjects {
+		subs = append(subs, &subscription{subject: []byte(sub), icb: o.processStreamSignal})
 	}
 
-	return &subscription{subject: []byte(subject), icb: o.processStreamSignal}
+	return subs
 }
 
 // This is what will be called when our parent stream wants to kick us regarding a new message.
@@ -4373,3 +4582,11 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
 		o.signalNewMessages()
 	}
 }
+
+type filteredSubject struct {
+	subject     string
+	hasWildcard bool
+	sequence    uint64
+	pmsg        *jsPubMsg
+	err         error
+}
diff --git a/server/errors.json b/server/errors.json
index 2413531bb34..24755921a1c 100644
--- a/server/errors.json
+++ b/server/errors.json
@@ -1318,5 +1318,15 @@
     "help": "",
     "url": "",
     "deprecates": ""
+  },
+  {
+    "constant": "JsConsumerCantHaveBothFilterSubjects",
+    "code": 400,
+    "error_code": 10134,
+    "description": "Consumer cannot have both FilterSubject and FilterSubjects specified",
+    "comment": "",
+    "help": "",
+    "url": "",
+    "deprecates": ""
   }
 ]
\ No newline at end of file
diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go
index 8cf409c3e77..b234212d547 100644
--- a/server/jetstream_errors_generated.go
+++ b/server/jetstream_errors_generated.go
@@ -400,6 +400,9 @@ const (
 
 	// JSTemplateNameNotMatchSubjectErr template name in subject does not match request
 	JSTemplateNameNotMatchSubjectErr ErrorIdentifier = 10073
+
+	// JsConsumerCantHaveBothFilterSubjects Consumer cannot have both FilterSubject and FilterSubjects specified
+	JsConsumerCantHaveBothFilterSubjects ErrorIdentifier = 10134
 )
 
 var (
@@ -536,6 +539,7 @@ var (
 		JSStreamWrongLastSequenceErrF:        {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
 		JSTempStorageFailedErr:               {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"},
 		JSTemplateNameNotMatchSubjectErr:     {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"},
+		JsConsumerCantHaveBothFilterSubjects: {Code: 400, ErrCode: 10134, Description: "Consumer cannot have both FilterSubject and FilterSubjects specified"},
 	}
 	// ErrJetStreamNotClustered Deprecated by JSClusterNotActiveErr ApiError, use IsNatsError() for comparisons
 	ErrJetStreamNotClustered = ApiErrors[JSClusterNotActiveErr]
@@ -2090,3 +2094,13 @@ func NewJSTemplateNameNotMatchSubjectError(opts ...ErrorOption) *ApiError {
 	return ApiErrors[JSTemplateNameNotMatchSubjectErr]
 }
+
+// 
NewJsConsumerCantHaveBothFilterSubjectsError creates a new JsConsumerCantHaveBothFilterSubjects error: "Consumer cannot have both FilterSubject and FilterSubjects specified" +func NewJsConsumerCantHaveBothFilterSubjectsError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JsConsumerCantHaveBothFilterSubjects] +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 65381f1d948..215dacee0bf 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19104,6 +19104,368 @@ func TestJetStreamServerCrashOnPullConsumerDeleteWithInactiveThresholdAfterAck(t require_NoError(t, err) } +func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + for i := 0; i < 20; i++ { + js.Publish("events.created", []byte(fmt.Sprintf("%v", i))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", i+1))) + i++ + } + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverPolicy: DeliverLast, + DeliverSubject: "deliver", + FilterSubjects: []string{"events.created", "events.processed"}, + Durable: durable, + }) + + if err != nil { + t.Fatalf("error while creating consumer %v", err) + } + + sub, err := js.SubscribeSync("events.created,events.processed", nats.Bind("name", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + msg, err := sub.NextMsg(time.Millisecond * 500) + if err != nil { + t.Fatalf("eror while getting the next message: %v", err) + } + fmt.Printf("MESSAGE: %+v\n", string(msg.Data)) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != 19 { + t.Fatalf("wrong sequence, expected %v got %v", 19, j) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + fmt.Printf("INFO: %+v\n", info) + +} + +func TestJetStreamConsumerMultipleSubjectsTrue(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + for i := 0; i < 20; i++ { + js.Publish("events.created", []byte(fmt.Sprintf("created %v", i))) + js.Publish("events.processed", []byte(fmt.Sprintf("processed %v", i+1))) + i++ + } + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubjects: []string{"events.created", "events.processed"}, + Durable: durable, + }) + + if err != nil { + t.Fatalf("error while creating consumer %v", err) + } + + sub, err := js.SubscribeSync("events.created,events.processed", nats.Bind("name", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + for i := 0; i < 20; i++ { + + 
msg, err := sub.NextMsg(time.Millisecond * 500) + if err != nil { + t.Fatalf("eror while getting the next message: %v", err) + } + fmt.Printf("MESSAGE: %+v\n", string(msg.Data)) + // j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + // if j != i { + // t.Fatalf("wrong sequence, expected %v got %v", i, j) + // } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + } + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + fmt.Printf("INFO: %+v\n", info) + +} +func TestJetStreamConsumerMultipleSubjectsWithEmpty(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Subjects: []string{"events.>"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + for i := 0; i < 10; i++ { + js.Publish("events.created", []byte(fmt.Sprintf("%v", i))) + } + + // if they're not the same, expect error + _, err = js.AddConsumer("name", &nats.ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubject: "", + Durable: durable, + AckPolicy: nats.AckExplicitPolicy}) + + if err != nil { + t.Fatalf("error while creating consumer %v", err) + } + + sub, err := js.SubscribeSync("events.created,events.processed", nats.Bind("name", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + for i := 0; i < 10; i++ { + + msg, err := sub.NextMsg(time.Millisecond * 500) + if err != nil { + t.Fatalf("eror while getting the next message: %v", err) + } + fmt.Printf("MESSAGE: %+v\n", string(msg.Data)) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != i { + t.Fatalf("wrong sequence, expected %v got %v", i, j) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + } + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + fmt.Printf("INFO: %+v\n", info) + +} + +func SingleFilterConsumerCheck(t *testing.T) { + + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "deliver", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubject: "SINGLE", + Durable: durable, + }) + require_Error(t, err) +} + +func TestJetStreamConsumerMultipleSubjectsWithAddedMessages(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "deliver", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubjects: []string{"events.created", "events.processed"}, + Durable: durable, + }) + + if err != nil { + 
t.Fatalf("error while creating consumer %v", err) + } + + js.Publish("events.created", []byte(fmt.Sprintf("%v", 0))) + js.Publish("events.created", []byte(fmt.Sprintf("%v", 1))) + js.Publish("events.created", []byte(fmt.Sprintf("%v", 2))) + js.Publish("events.created", []byte(fmt.Sprintf("%v", 3))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 4))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 5))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 6))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 7))) + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 8))) + + sub, err := js.SubscribeSync("", nats.Bind("deliver", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + for i := 0; i < 10; i++ { + if i == 5 { + js.Publish("events.created", []byte(fmt.Sprintf("%v", 9))) + } + if i == 9 { + js.Publish("events.created", []byte(fmt.Sprintf("%v", 11))) + } + if i == 7 { + + js.Publish("events.processed", []byte(fmt.Sprintf("%v", 10))) + } + + msg, err := sub.NextMsg(time.Millisecond * 5000) + if err != nil { + t.Fatalf("eror while getting the next message: %v", err) + } + fmt.Printf("MESSAGE: %+v\n", string(msg.Data)) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != i { + t.Fatalf("wrong sequence, expected %v got %v", i, j) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + } + info, err := js.ConsumerInfo("deliver", durable) + require_NoError(t, err) + fmt.Printf("INFO: %+v\n", info) + +} +func TestJetStreamConsumerMultipleSubjectFifo(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Subjects: []string{"events.>"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + for i := 0; i < 10; i++ { + js.Publish("events.created", []byte(fmt.Sprintf("%v", i))) + } + + js.Publish("events.processed", []byte("1")) + js.Publish("events.created", []byte("11")) + + // if they're not the same, expect error + _, err = js.AddConsumer("name", &nats.ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubject: "", + Durable: durable, + AckPolicy: nats.AckExplicitPolicy}) + + if err != nil { + t.Fatalf("error while creating consumer %v", err) + } + + sub, err := js.SubscribeSync("events.created,events.processed", nats.Bind("name", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + for i := 0; i < 10; i++ { + + msg, err := sub.NextMsg(time.Millisecond * 500) + if err != nil { + t.Fatalf("eror while getting the next message: %v", err) + } + fmt.Printf("MESSAGE: %+v\n", string(msg.Data)) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != i { + t.Fatalf("wrong sequence, expected %v got %v", i, j) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + } + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + fmt.Printf("INFO: %+v\n", info) + +} + func TestJetStreamStreamUpdateSubjectsOverlapOthers(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index 2431e0ba827..44ed310f7e9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4609,6 +4609,7 @@ func (mset *stream) 
setConsumer(o *consumer) {
 	if o.cfg.FilterSubject != _EMPTY_ {
 		mset.numFilter++
 	}
+	mset.numFilter += len(o.cfg.FilterSubjects)
 	if o.cfg.Direct {
 		mset.directs++
 	}
@@ -4638,7 +4639,9 @@ func (mset *stream) removeConsumer(o *consumer) {
 		}
 		// Always remove from the leader sublist.
 		if mset.csl != nil {
-			mset.csl.Remove(o.signalSub())
+			for _, sub := range o.signalSubs() {
+				mset.csl.Remove(sub)
+			}
 		}
 		mset.clsMu.Unlock()
 	}
@@ -4652,7 +4655,9 @@ func (mset *stream) setConsumerAsLeader(o *consumer) {
 	if mset.csl == nil {
 		mset.csl = NewSublistWithCache()
 	}
-	mset.csl.Insert(o.signalSub())
+	for _, sub := range o.signalSubs() {
+		mset.csl.Insert(sub)
+	}
 }
 
 // Remove the consumer as a leader. This will update signaling sublist.
@@ -4660,7 +4665,9 @@ func (mset *stream) removeConsumerAsLeader(o *consumer) {
 	mset.clsMu.Lock()
 	defer mset.clsMu.Unlock()
 	if mset.csl != nil {
-		mset.csl.Remove(o.signalSub())
+		for _, sub := range o.signalSubs() {
+			mset.csl.Remove(sub)
+		}
 	}
 }
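
Note on the delivery order (illustrative, not part of the diff): the multi-filter path in getNextMsg keeps one cursor per filter subject and serves whichever filter has the lowest pending stream sequence, which is what keeps delivery in stream (FIFO) order across subjects. The standalone Go sketch below shows only that selection rule; filterCursor and nextFilter are hypothetical names used for illustration and do not exist in the server code.

package main

import (
	"fmt"
	"sort"
)

// filterCursor is a simplified stand-in for the consumer's per-filter state
// (the filteredSubject entries introduced in this change).
type filterCursor struct {
	subject  string
	sequence uint64 // next stream sequence this filter still has to deliver
}

// nextFilter picks the cursor with the lowest pending sequence, i.e. the
// filter whose next message is oldest in the stream and should go out first.
func nextFilter(cursors []filterCursor) *filterCursor {
	if len(cursors) == 0 {
		return nil
	}
	sort.Slice(cursors, func(i, j int) bool {
		return cursors[i].sequence < cursors[j].sequence
	})
	return &cursors[0]
}

func main() {
	cursors := []filterCursor{
		{subject: "events.processed", sequence: 12},
		{subject: "events.created", sequence: 7},
	}
	// events.created holds the older pending message, so it is selected first.
	fmt.Println(nextFilter(cursors).subject)
}

Sorting the cursors ascending by sequence is the ordering that TestJetStreamConsumerMultipleSubjectsWithAddedMessages above depends on to interleave events.created and events.processed in publish order.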