diff --git a/server/consumer.go b/server/consumer.go index fa6dd4a3aa..61ae84000d 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -69,6 +69,7 @@ type ConsumerConfig struct { MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` + FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` @@ -244,12 +245,13 @@ type consumer struct { sid int name string stream string - sseq uint64 - dseq uint64 - adflr uint64 - asflr uint64 - npc uint64 - npcm uint64 + sseq uint64 // next stream sequence + subjf subjectFilters // subject filters and their sequences + dseq uint64 // delivered consumer sequence + adflr uint64 // ack delivery floor + asflr uint64 // ack store floor + npc uint64 // Num Pending Count + npcm uint64 // Last Num Pending Min dsubj string qgroup string lss *lastSeqSkipList @@ -279,7 +281,6 @@ type consumer struct { store ConsumerStore active bool replay bool - filterWC bool dtmr *time.Timer gwdtmr *time.Timer dthresh time.Duration @@ -313,8 +314,30 @@ type consumer struct { // Ack queue ackMsgs *ipQueue - // For stream signaling. - sigSub *subscription + // For stream signaling when multiple filters are set. + sigSubs []*subscription +} + +// A single subject filter. +type subjectFilter struct { + subject string + nextSeq uint64 + currentSeq uint64 + pmsg *jsPubMsg + err error + hasWildcard bool +} + +type subjectFilters []*subjectFilter + +// subjects is a helper used when updating consumers. +// It is not optimized and should not be used in the hot path. +func (s subjectFilters) subjects() []string { + subjects := make([]string, 0, len(s)) + for _, filter := range s { + subjects = append(subjects, filter.subject) + } + return subjects } type proposal struct { @@ -375,7 +398,7 @@ func checkConsumerCfg( config *ConsumerConfig, srvLim *JSLimitOpts, cfg *StreamConfig, - acc *Account, + _ *Account, accLim *JetStreamAccountLimits, isRecovering bool, ) *ApiError { @@ -476,6 +499,26 @@ func checkConsumerCfg( return NewJSStreamInvalidConfigError(ErrBadSubject) } + // We treat FilterSubjects: []string{""} as a misconfig, so we validate against it. + for _, filter := range config.FilterSubjects { + if filter == _EMPTY_ { + return NewJSConsumerEmptyFilterError() + } + } + subjectFilters := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) + + // Check that subject filters do not overlap. + for outer, subject := range subjectFilters { + if !IsValidSubject(subject) { + return NewJSStreamInvalidConfigError(ErrBadSubject) + } + for inner, ssubject := range subjectFilters { + if inner != outer && subjectIsSubsetMatch(subject, ssubject) { + return NewJsConsumerOverlappingSubjectFiltersError() + } + } + } + // Helper function to formulate similar errors.
badStart := func(dp, start string) error { return fmt.Errorf("consumer delivery policy is deliver %s, but optional start %s is also set", dp, start) @@ -507,7 +550,7 @@ func checkConsumerCfg( if config.OptStartTime != nil { return NewJSConsumerInvalidPolicyError(badStart("last per subject", "time")) } - if config.FilterSubject == _EMPTY_ { + if config.FilterSubject == _EMPTY_ && len(config.FilterSubjects) == 0 { return NewJSConsumerInvalidPolicyError(notSet("last per subject", "filter subject")) } case DeliverNew: @@ -663,11 +706,12 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerWQRequiresExplicitAckError() } + subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects) if len(mset.consumers) > 0 { - if config.FilterSubject == _EMPTY_ { + if len(subjects) == 0 { mset.mu.Unlock() return nil, NewJSConsumerWQMultipleUnfilteredError() - } else if !mset.partitionUnique(config.FilterSubject) { + } else if !mset.partitionUnique(subjects) { // Prior to v2.9.7, on a stream with WorkQueue policy, the servers // were not catching the error of having multiple consumers with // overlapping filter subjects depending on the scope, for instance @@ -753,11 +797,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.waiting = newWaitQueue(config.MaxWaiting) } - // Check if we have filtered subject that is a wildcard. - if config.FilterSubject != _EMPTY_ && subjectHasWildcard(config.FilterSubject) { - o.filterWC = true - } - // already under lock, mset.Name() would deadlock o.stream = mset.cfg.Name o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name @@ -781,6 +820,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri o.store = store } + subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects) + for _, filter := range subjects { + sub := &subjectFilter{ + subject: filter, + hasWildcard: subjectHasWildcard(filter), + } + o.subjf = append(o.subjf, sub) + } + if o.store != nil && o.store.HasState() { // Restore our saved state. o.mu.Lock() @@ -1004,7 +1052,7 @@ func (o *consumer) setLeader(isLeader bool) { // Update the group on the our starting sequence if we are starting but we skipped some in the stream. if o.dseq == 1 && o.sseq > 1 { - o.updateSkipped() + o.updateSkipped(o.sseq) } // Do info sub. @@ -1606,16 +1654,37 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { } } - if o.cfg.FilterSubject != cfg.FilterSubject { - if cfg.FilterSubject != _EMPTY_ { - o.filterWC = subjectHasWildcard(cfg.FilterSubject) + // Check for Subject Filters update. + newSubjects := gatherSubjectFilters(cfg.FilterSubject, cfg.FilterSubjects) + if !subjectSliceEqual(newSubjects, o.subjf.subjects()) { + newSubjf := make(subjectFilters, 0, len(newSubjects)) + for _, newFilter := range newSubjects { + fs := &subjectFilter{ + subject: newFilter, + hasWildcard: subjectHasWildcard(newFilter), + } + // If a given subject was already present, we retain its field values + // so `getNextMsg` can take advantage of already buffered `pmsgs`. + for _, oldFilter := range o.subjf { + if oldFilter.subject == newFilter { + fs.currentSeq = oldFilter.currentSeq + fs.nextSeq = oldFilter.nextSeq + fs.pmsg = oldFilter.pmsg + break + } + } + newSubjf = append(newSubjf, fs) + } // Make sure we have correct signaling setup. // Consumer lock cannot be held.
mset := o.mset o.mu.Unlock() - mset.swapSigSubs(o, cfg.FilterSubject) + mset.swapSigSubs(o, newSubjf.subjects()) o.mu.Lock() + + // When we're done with signaling, we can replace the subjects. + o.subjf = newSubjf } // Record new config for others that do not need special handling. @@ -1754,7 +1823,7 @@ func (o *consumer) progressUpdate(seq uint64) { } // Lock should be held. -func (o *consumer) updateSkipped() { +func (o *consumer) updateSkipped(seq uint64) { // Clustered mode and R>1 only. if o.node == nil || !o.isLeader() { return @@ -1762,7 +1831,7 @@ func (o *consumer) updateSkipped() { var b [1 + 8]byte b[0] = byte(updateSkipOp) var le = binary.LittleEndian - le.PutUint64(b[1:], o.sseq) + le.PutUint64(b[1:], seq) o.propose(b[:]) } @@ -2443,7 +2512,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { // even if the stream only has a single non-wildcard subject designation. // Read lock should be held. func (o *consumer) isFiltered() bool { - if o.cfg.FilterSubject == _EMPTY_ { + if o.subjf == nil { return false } // If we are here we want to check if the filtered subject is @@ -2452,11 +2521,34 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { if mset == nil { return true } - if len(mset.cfg.Subjects) == 1 { - return o.cfg.FilterSubject != mset.cfg.Subjects[0] } - // All else return true. - return true + + // `isFiltered` needs to be performant, so we do + // as few checks as possible to avoid unnecessary work. + // Here we avoid iterating over slices if there is only one subject in the stream + // and a single filter on the consumer. + if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 { + return mset.cfg.Subjects[0] != o.subjf[0].subject + } + + // If the lists are not of equal length, we can return early, as this is filtered. + if len(mset.cfg.Subjects) != len(o.subjf) { + return true + } + + // In the rare case that the user passed all stream subjects as consumer filters, + // we need to do a more expensive check. + // reflect.DeepEqual would return false if the filters are the same but in a different order, + // so it can't be used here. + cfilters := make(map[string]struct{}, len(o.subjf)) + for _, val := range o.subjf { + cfilters[val.subject] = struct{}{} + } + for _, val := range mset.cfg.Subjects { + if _, ok := cfilters[val]; !ok { + return true + } + } + return false } // Check if we need an ack for this store seq. @@ -2482,7 +2574,6 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { return false } } - if o.isLeader() { asflr, osseq = o.asflr, o.sseq pending = o.pending @@ -2955,15 +3046,23 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) { // Lock should be held. func (o *consumer) isFilteredMatch(subj string) bool { // No filter is automatic match. - if o.cfg.FilterSubject == _EMPTY_ { + if o.subjf == nil { return true } - if !o.filterWC { - return subj == o.cfg.FilterSubject + for _, filter := range o.subjf { + if !filter.hasWildcard && subj == filter.subject { + return true + } } - // If we are here we have a wildcard filter subject. + // It's quicker to first check for non-wildcard filters, then + // iterate again to check for subset match. // TODO(dlc) at speed might be better to just do a sublist with L2 and/or possibly L1.
- return subjectIsSubsetMatch(subj, o.cfg.FilterSubject) + for _, filter := range o.subjf { + if subjectIsSubsetMatch(subj, filter.subject) { + return true + } + } + return false } var ( @@ -2979,9 +3078,9 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { if o.mset == nil || o.mset.store == nil { return nil, 0, errBadConsumer } - seq, dc := o.sseq, uint64(1) // Process redelivered messages before looking at possibly "skip list" (deliver last per subject) if o.hasRedeliveries() { + var seq, dc uint64 for seq = o.getNextToRedeliver(); seq > 0; seq = o.getNextToRedeliver() { dc = o.incDeliveryCount(seq) if o.maxdc > 0 && dc > o.maxdc { @@ -3003,46 +3102,116 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { return pmsg, dc, err } } - // Fallback if all redeliveries are gone. - seq, dc = o.sseq, 1 } - // Don't make it a "else" because it is possible that there were redeliveries - but we exhausted the redelivery count and are back to try deliver the next message. + + // Check if we have max pending. + if o.maxp > 0 && len(o.pending) >= o.maxp { + // maxp only set when ack policy != AckNone and user set MaxAckPending + // Stall if we have hit max pending. + return nil, 0, errMaxAckPending + } + if o.hasSkipListPending() { - seq = o.lss.seqs[0] + seq := o.lss.seqs[0] if len(o.lss.seqs) == 1 { o.sseq = o.lss.resume o.lss = nil - o.updateSkipped() + o.updateSkipped(o.sseq) } else { o.lss.seqs = o.lss.seqs[1:] } + pmsg := getJSPubMsgFromPool() + sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) + if sm == nil || err != nil { + pmsg.returnToPool() + pmsg = nil + } + o.sseq++ + return pmsg, 1, err + } - // Check if we have max pending. - if o.maxp > 0 && len(o.pending) >= o.maxp { - // maxp only set when ack policy != AckNone and user set MaxAckPending - // Stall if we have hit max pending. - return nil, 0, errMaxAckPending + // If no filters are specified, optimize by fetching the next message without filtering. + if o.subjf == nil { + // Grab next message applicable to us. + pmsg := getJSPubMsgFromPool() + sm, sseq, err := o.mset.store.LoadNextMsg("", false, o.sseq, &pmsg.StoreMsg) + + if sseq >= o.sseq { + o.sseq = sseq + 1 + if err == ErrStoreEOF { + o.updateSkipped(o.sseq) + } + } + if sm == nil { + pmsg.returnToPool() + return nil, 0, err + } + return pmsg, 1, err } - // Grab next message applicable to us. - pmsg := getJSPubMsgFromPool() - sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg) + var lastErr error + // If we have filters, iterate over them and optimize by buffering found messages. + for _, filter := range o.subjf { + if filter.nextSeq < o.sseq { + // o.subjf should always point to the right starting point for reading messages. + // If anything modified it, make sure our sequences do not start earlier.
+ filter.nextSeq = o.sseq + } + // If this subject didn't fetch any message before, do it now. + if filter.pmsg == nil { + pmsg := getJSPubMsgFromPool() + sm, sseq, err := o.mset.store.LoadNextMsg(filter.subject, filter.hasWildcard, filter.nextSeq, &pmsg.StoreMsg) + + filter.err = err - if sseq >= o.sseq { - o.sseq = sseq + 1 - if err == ErrStoreEOF { - o.updateSkipped() + if sm != nil { + filter.pmsg = pmsg + } else { + pmsg.returnToPool() + } + if sseq >= filter.nextSeq { + filter.nextSeq = sseq + 1 + if err == ErrStoreEOF { + o.updateSkipped(filter.currentSeq) + } + } } + } - if sm == nil { - pmsg.returnToPool() - return nil, 0, err + // Don't sort o.subjf if it has only one entry. + // Sort uses `reflect` and can noticeably slow down fetching, + // even if len == 0 or 1. + // TODO(tp): we should have a sort based on generics for the server + // to avoid reflection. + if len(o.subjf) > 1 { + sort.Slice(o.subjf, func(i, j int) bool { + return o.subjf[j].nextSeq > o.subjf[i].nextSeq + }) } - return pmsg, dc, err + // Grab next message applicable to us. + // Filters were sorted by sequence above, so the first filter with a buffered message wins. + for _, filter := range o.subjf { + // Set o.sseq to the first subject sequence. + if filter.nextSeq > o.sseq { + o.sseq = filter.nextSeq + } + // This means we got a message in this subject fetched. + if filter.pmsg != nil { + filter.currentSeq = filter.nextSeq + o.sseq = filter.currentSeq + returned := filter.pmsg + filter.pmsg = nil + return returned, 1, filter.err + } + if filter.err != nil { + lastErr = filter.err + } + } + + return nil, 0, lastErr + } // Will check for expiration and lack of interest on waiting requests. @@ -3190,7 +3359,7 @@ func (o *consumer) suppressDeletion() { } func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { - // On startup check to see if we are in a a reply situation where replay policy is not instant. + // On startup check to see if we are in a replay situation where replay policy is not instant. var ( lts int64 // last time stamp seen, used for replay. lseq uint64 @@ -3303,6 +3472,15 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { wr.hbt = time.Now().Add(wr.hb) } } else { + if o.subjf != nil { + for i, filter := range o.subjf { + if subjectIsSubsetMatch(pmsg.subj, filter.subject) { + o.subjf[i].currentSeq-- + o.subjf[i].nextSeq-- + break + } + } + } // We will redo this one. o.sseq-- if dc == 1 && o.npcm > 0 { @@ -3454,20 +3632,57 @@ func (o *consumer) numPending() uint64 { func (o *consumer) streamNumPending() uint64 { if o.mset == nil || o.mset.store == nil { o.npc, o.npcm = 0, 0 - } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { + return 0 + } + // Deliver Last Per Subject calculates num pending differently. + if o.cfg.DeliverPolicy == DeliverLastPerSubject { o.npc, o.npcm = 0, 0 - for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) { - if o.sseq <= ss.Last { - o.npc++ - if ss.Last > o.npcm { - o.npcm = ss.Last + // Consumer without filters. + if o.subjf == nil { + for _, ss := range o.mset.store.SubjectsState("") { + if o.sseq <= ss.Last { + o.npc++ + if ss.Last > o.npcm { + o.npcm = ss.Last + } + } + } + return o.npc + } + // Consumer with filters. + for _, filter := range o.subjf { + for _, ss := range o.mset.store.SubjectsState(filter.subject) { + if filter.nextSeq <= ss.Last { + o.npc++ + if ss.Last > o.npcm { + o.npcm = ss.Last + } } } } - } else { - ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject) + return o.npc + } + // Every other delivery policy is handled here.
+ // Consumer without filters. + if o.subjf == nil { + ss := o.mset.store.FilteredState(o.sseq, "") o.npc, o.npcm = ss.Msgs, ss.Last + return o.npc + } + // Consumer with filters. + var npc uint64 + for _, filter := range o.subjf { + // We might lose the state of o.subjf, so if we do, recover from o.sseq. + if filter.currentSeq < o.sseq { + filter.currentSeq = o.sseq + } + ss := o.mset.store.FilteredState(filter.currentSeq, filter.subject) + npc += ss.Msgs + if ss.Last > o.npcm { + o.npcm = ss.Last + } } + o.npc = npc return o.npc } @@ -3944,23 +4159,51 @@ func (o *consumer) selectStartingSeqNo() { if o.cfg.DeliverPolicy == DeliverAll { o.sseq = state.FirstSeq } else if o.cfg.DeliverPolicy == DeliverLast { - o.sseq = state.LastSeq + if o.subjf == nil { + o.sseq = state.LastSeq + return + } // If we are partitioned here this will be properly set when we become leader. - if o.cfg.FilterSubject != _EMPTY_ { - ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject) - o.sseq = ss.Last + for _, filter := range o.subjf { + ss := o.mset.store.FilteredState(1, filter.subject) + filter.nextSeq = ss.Last + if ss.Last > o.sseq { + o.sseq = ss.Last + } } } else if o.cfg.DeliverPolicy == DeliverLastPerSubject { - if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { - o.lss = &lastSeqSkipList{ - resume: state.LastSeq, - seqs: createLastSeqSkipList(mss), + if o.subjf == nil { + if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 { + o.lss = &lastSeqSkipList{ + resume: state.LastSeq, + seqs: createLastSeqSkipList(mss), + } + o.sseq = o.lss.seqs[0] + } else { + // If no mapping info just set to last. + o.sseq = state.LastSeq } - o.sseq = o.lss.seqs[0] - } else { - // If no mapping info just set to last. + return + } + lss := &lastSeqSkipList{ + resume: state.LastSeq, + } + for _, filter := range o.subjf { + if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 { + lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...) + } + } + if len(lss.seqs) == 0 { o.sseq = state.LastSeq } + // Sort the skip list + sort.Slice(lss.seqs, func(i, j int) bool { + return lss.seqs[j] > lss.seqs[i] + }) + o.lss = lss + if len(o.lss.seqs) != 0 { + o.sseq = o.lss.seqs[0] + } } else if o.cfg.OptStartTime != nil { // If we are here we are time based. // TODO(dlc) - Once clustered can't rely on this. @@ -3975,11 +4218,30 @@ func (o *consumer) selectStartingSeqNo() { if state.FirstSeq == 0 { o.sseq = 1 + for _, filter := range o.subjf { + filter.nextSeq = 1 + } } else if o.sseq < state.FirstSeq { o.sseq = state.FirstSeq } else if o.sseq > state.LastSeq { o.sseq = state.LastSeq + 1 } + for _, filter := range o.subjf { + if state.FirstSeq == 0 { + filter.nextSeq = 1 + } + if filter.nextSeq < state.FirstSeq { + filter.nextSeq = state.FirstSeq + } + if filter.nextSeq > state.LastSeq { + filter.nextSeq = state.LastSeq + 1 + } + } + } + if o.subjf != nil { + sort.Slice(o.subjf, func(i, j int) bool { + return o.subjf[j].nextSeq > o.subjf[i].nextSeq + }) } // Always set delivery sequence to 1. @@ -3988,7 +4250,6 @@ func (o *consumer) selectStartingSeqNo() { o.adflr = o.dseq - 1 // Set ack store floor to store-1 o.asflr = o.sseq - 1 - // Set our starting sequence state. if o.store != nil && o.sseq > 0 { o.store.SetStarting(o.sseq - 1) @@ -4348,21 +4609,28 @@ func (o *consumer) account() *Account { return a } -func (o *consumer) signalSub() *subscription { +// Creates the signaling subscriptions for the consumer. +// All subjects share the same callback.
+func (o *consumer) signalSubs() []*subscription { o.mu.Lock() defer o.mu.Unlock() - if o.sigSub != nil { - return o.sigSub + if o.sigSubs != nil { + return o.sigSubs + } + + subs := []*subscription{} + if o.subjf == nil { + subs = append(subs, &subscription{subject: []byte(fwcs), icb: o.processStreamSignal}) + o.sigSubs = subs + return subs } - subject := o.cfg.FilterSubject - if subject == _EMPTY_ { - subject = fwcs + for _, filter := range o.subjf { + subs = append(subs, &subscription{subject: []byte(filter.subject), icb: o.processStreamSignal}) } - sub := &subscription{subject: []byte(subject), icb: o.processStreamSignal} - o.sigSub = sub - return sub + o.sigSubs = subs + return subs } // This is what will be called when our parent stream wants to kick us regarding a new message. @@ -4388,3 +4656,31 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s o.signalNewMessages() } } + +// Used to check if two lists of subject filters are equal. +func subjectSliceEqual(slice1 []string, slice2 []string) bool { + if len(slice1) != len(slice2) { + return false + } + set2 := make(map[string]struct{}, len(slice2)) + for _, val := range slice2 { + set2[val] = struct{}{} + } + for _, val := range slice1 { + if _, ok := set2[val]; !ok { + return false + } + } + return true +} + +// Utility for simpler if conditions in consumer config checks. +// In a future iteration, we can immediately create `o.subjf` and +// use it for validation. +func gatherSubjectFilters(filter string, filters []string) []string { + if filter != _EMPTY_ { + filters = append(filters, filter) + } + // The returned list of filters never contains an empty filter. + return filters +} diff --git a/server/errors.json b/server/errors.json index 7b048856dd..fff36b277e 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1338,5 +1338,45 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JsConsumerDuplicateFilterSubjects", + "code": 400, + "error_code": 10136, + "description": "consumer cannot have both FilterSubject and FilterSubjects specified", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JsConsumerMultipleFiltersNotAllowed", + "code": 400, + "error_code": 10137, + "description": "consumer with multiple subject filters cannot use subject based API", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JsConsumerOverlappingSubjectFilters", + "code": 400, + "error_code": 10138, + "description": "consumer subject filters cannot overlap", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSConsumerEmptyFilter", + "code": 400, + "error_code": 10139, + "description": "consumer filter in FilterSubjects cannot be empty", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6129531cc2..2bc08f5851 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3818,6 +3818,13 @@ func (s *Server) jsConsumerCreateRequest(sub *subscription, c *client, a *Accoun return } + // In case multiple filters are provided, error if the subject-based create API is used. + if filteredSubject != _EMPTY_ && len(req.Config.FilterSubjects) != 0 { + resp.Error = NewJsConsumerMultipleFiltersNotAllowedError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Check for a filter subject.
if filteredSubject != _EMPTY_ && req.Config.FilterSubject != filteredSubject { resp.Error = NewJSConsumerCreateFilterSubjectMismatchError() diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 0a7a829cc6..c731266e44 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -83,6 +83,9 @@ const ( // JSConsumerDurableNameNotSetErr consumer expected to be durable but a durable name was not set JSConsumerDurableNameNotSetErr ErrorIdentifier = 10018 + // JSConsumerEmptyFilter consumer filter in FilterSubjects cannot be empty + JSConsumerEmptyFilter ErrorIdentifier = 10139 + // JSConsumerEphemeralWithDurableInSubjectErr consumer expected to be ephemeral but detected a durable name set in subject JSConsumerEphemeralWithDurableInSubjectErr ErrorIdentifier = 10019 @@ -406,6 +409,15 @@ const ( // JSTemplateNameNotMatchSubjectErr template name in subject does not match request JSTemplateNameNotMatchSubjectErr ErrorIdentifier = 10073 + + // JsConsumerDuplicateFilterSubjects consumer cannot have both FilterSubject and FilterSubjects specified + JsConsumerDuplicateFilterSubjects ErrorIdentifier = 10136 + + // JsConsumerMultipleFiltersNotAllowed consumer with multiple subject filters cannot use subject based API + JsConsumerMultipleFiltersNotAllowed ErrorIdentifier = 10137 + + // JsConsumerOverlappingSubjectFilters consumer subject filters cannot overlap + JsConsumerOverlappingSubjectFilters ErrorIdentifier = 10138 ) var ( @@ -436,6 +448,7 @@ var ( JSConsumerDurableNameNotInSubjectErr: {Code: 400, ErrCode: 10016, Description: "consumer expected to be durable but no durable name set in subject"}, JSConsumerDurableNameNotMatchSubjectErr: {Code: 400, ErrCode: 10017, Description: "consumer name in subject does not match durable name in request"}, JSConsumerDurableNameNotSetErr: {Code: 400, ErrCode: 10018, Description: "consumer expected to be durable but a durable name was not set"}, + JSConsumerEmptyFilter: {Code: 400, ErrCode: 10139, Description: "consumer filter in FilterSubjects cannot be empty"}, JSConsumerEphemeralWithDurableInSubjectErr: {Code: 400, ErrCode: 10019, Description: "consumer expected to be ephemeral but detected a durable name set in subject"}, JSConsumerEphemeralWithDurableNameErr: {Code: 400, ErrCode: 10020, Description: "consumer expected to be ephemeral but a durable name was set in request"}, JSConsumerExistingActiveErr: {Code: 400, ErrCode: 10105, Description: "consumer already exists and is still active"}, @@ -544,6 +557,9 @@ var ( JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"}, JSTempStorageFailedErr: {Code: 500, ErrCode: 10072, Description: "JetStream unable to open temp storage for restore"}, JSTemplateNameNotMatchSubjectErr: {Code: 400, ErrCode: 10073, Description: "template name in subject does not match request"}, + JsConsumerDuplicateFilterSubjects: {Code: 400, ErrCode: 10136, Description: "consumer cannot have both FilterSubject and FilterSubjects specified"}, + JsConsumerMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10137, Description: "consumer with multiple subject filters cannot use subject based API"}, + JsConsumerOverlappingSubjectFilters: {Code: 400, ErrCode: 10138, Description: "consumer subject filters cannot overlap"}, } // ErrJetStreamNotClustered Deprecated by JSClusterNotActiveErr ApiError, use IsNatsError() for comparisons ErrJetStreamNotClustered = ApiErrors[JSClusterNotActiveErr] @@ -847,6 +863,16 @@ func 
NewJSConsumerDurableNameNotSetError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerDurableNameNotSetErr] } +// NewJSConsumerEmptyFilterError creates a new JSConsumerEmptyFilter error: "consumer filter in FilterSubjects cannot be empty" +func NewJSConsumerEmptyFilterError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerEmptyFilter] +} + // NewJSConsumerEphemeralWithDurableInSubjectError creates a new JSConsumerEphemeralWithDurableInSubjectErr error: "consumer expected to be ephemeral but detected a durable name set in subject" func NewJSConsumerEphemeralWithDurableInSubjectError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -2124,3 +2150,33 @@ func NewJSTemplateNameNotMatchSubjectError(opts ...ErrorOption) *ApiError { return ApiErrors[JSTemplateNameNotMatchSubjectErr] } + +// NewJsConsumerDuplicateFilterSubjectsError creates a new JsConsumerDuplicateFilterSubjects error: "consumer cannot have both FilterSubject and FilterSubjects specified" +func NewJsConsumerDuplicateFilterSubjectsError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JsConsumerDuplicateFilterSubjects] +} + +// NewJsConsumerMultipleFiltersNotAllowedError creates a new JsConsumerMultipleFiltersNotAllowed error: "consumer with multiple subject filters cannot use subject based API" +func NewJsConsumerMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JsConsumerMultipleFiltersNotAllowed] +} + +// NewJsConsumerOverlappingSubjectFiltersError creates a new JsConsumerOverlappingSubjectFilters error: "consumer subject filters cannot overlap" +func NewJsConsumerOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JsConsumerOverlappingSubjectFilters] +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index df16c58e93..7dbd5fa1a4 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -9387,7 +9387,7 @@ func TestJetStreamPubWithSyncPerf(t *testing.T) { func TestJetStreamConsumerPerf(t *testing.T) { // Comment out to run, holding place for now. 
- t.SkipNow() + // t.SkipNow() s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -18904,6 +18904,829 @@ func TestJetStreamServerCrashOnPullConsumerDeleteWithInactiveThresholdAfterAck(t require_NoError(t, err) } +func TestJetStreamConsumerMultipleSubjectsLast(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events", "data", "other"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + sendStreamMsg(t, nc, "events", "1") + sendStreamMsg(t, nc, "data", "2") + sendStreamMsg(t, nc, "other", "3") + sendStreamMsg(t, nc, "events", "4") + sendStreamMsg(t, nc, "data", "5") + sendStreamMsg(t, nc, "data", "6") + sendStreamMsg(t, nc, "other", "7") + sendStreamMsg(t, nc, "other", "8") + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverPolicy: DeliverLast, + AckPolicy: AckExplicit, + DeliverSubject: "deliver", + FilterSubjects: []string{"events", "data"}, + Durable: durable, + }) + require_NoError(t, err) + + sub, err := js.SubscribeSync("", nats.Bind("name", durable)) + require_NoError(t, err) + + msg, err := sub.NextMsg(time.Millisecond * 500) + require_NoError(t, err) + + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + expectedStreamSeq := 6 + if j != expectedStreamSeq { + t.Fatalf("wrong sequence, expected %v got %v", expectedStreamSeq, j) + } + + require_NoError(t, msg.AckSync()) + + // check if we don't get more than we wanted + msg, err = sub.NextMsg(time.Millisecond * 500) + if msg != nil || err == nil { + t.Fatalf("should not get more messages") + } + + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + + require_True(t, info.NumAckPending == 0) + require_True(t, info.AckFloor.Stream == 8) + require_True(t, info.AckFloor.Consumer == 1) + require_True(t, info.NumPending == 0) +} + +func TestJetStreamConsumerMultipleSubjectsLastPerSubject(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.*", "data.>", "other"}, + Name: "name", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + sendStreamMsg(t, nc, "events.1", "bad") + sendStreamMsg(t, nc, "events.1", "events.1") + + sendStreamMsg(t, nc, "data.1", "bad") + sendStreamMsg(t, nc, "data.1", "bad") + sendStreamMsg(t, nc, "data.1", "bad") + sendStreamMsg(t, nc, "data.1", "bad") + sendStreamMsg(t, nc, "data.1", "data.1") + + sendStreamMsg(t, nc, "events.2", "bad") + sendStreamMsg(t, nc, "events.2", "bad") + // this is last proper sequence, + sendStreamMsg(t, nc, "events.2", "events.2") + + sendStreamMsg(t, nc, "other", "bad") + sendStreamMsg(t, nc, "other", "bad") + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverPolicy: DeliverLastPerSubject, + AckPolicy: AckExplicit, + DeliverSubject: "deliver", + FilterSubjects: []string{"events.*", "data.>"}, + Durable: durable, + }) + require_NoError(t, err) + + sub, err := js.SubscribeSync("", nats.Bind("name", durable)) + require_NoError(t, 
err) + + checkMessage := func(t *testing.T, subject string, payload string, ack bool) { + msg, err := sub.NextMsg(time.Millisecond * 500) + require_NoError(t, err) + + if string(msg.Data) != payload { + t.Fatalf("expected %v paylaod, got %v", payload, string(msg.Data)) + } + if subject != msg.Subject { + t.Fatalf("expected %v subject, got %v", subject, msg.Subject) + } + if ack { + msg.AckSync() + } + } + + checkMessage(t, "events.1", "events.1", true) + checkMessage(t, "data.1", "data.1", true) + checkMessage(t, "events.2", "events.2", false) + + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + + require_True(t, info.AckFloor.Consumer == 2) + require_True(t, info.AckFloor.Stream == 9) + require_True(t, info.Delivered.Stream == 12) + require_True(t, info.Delivered.Consumer == 3) + + require_NoError(t, err) + +} +func TestJetStreamConsumerMultipleSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + + mset, err := s.GlobalAccount().addStream(&StreamConfig{ + Subjects: []string{"events.>", "data.>"}, + Name: "name", + }) + require_NoError(t, err) + + for i := 0; i < 20; i += 2 { + sendStreamMsg(t, nc, "events.created", fmt.Sprintf("created %v", i)) + sendStreamMsg(t, nc, "data.processed", fmt.Sprintf("processed %v", i+1)) + } + + _, err = mset.addConsumer(&ConsumerConfig{ + Durable: durable, + DeliverSubject: "deliver", + FilterSubjects: []string{"events.created", "data.processed"}, + }) + require_NoError(t, err) + + sub, err := js.SubscribeSync("", nats.Bind("name", durable)) + require_NoError(t, err) + + for i := 0; i < 20; i++ { + msg, err := sub.NextMsg(time.Millisecond * 500) + require_NoError(t, err) + require_NoError(t, msg.AckSync()) + } + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + require_True(t, info.NumAckPending == 0) + require_True(t, info.NumPending == 0) + require_True(t, info.AckFloor.Consumer == 20) + require_True(t, info.AckFloor.Stream == 20) + +} + +func TestJetStreamConsumerMultipleSubjectsWithEmpty(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Subjects: []string{"events.>"}, + Name: "name", + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, "events.created", fmt.Sprintf("%v", i)) + } + + // if they're not the same, expect error + _, err = js.AddConsumer("name", &nats.ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubject: "", + Durable: durable, + AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + sub, err := js.SubscribeSync("", nats.Bind("name", durable)) + require_NoError(t, err) + + for i := 0; i < 9; i++ { + msg, err := sub.NextMsg(time.Millisecond * 500) + require_NoError(t, err) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != i { + t.Fatalf("wrong sequence, expected %v got %v", i, j) + } + require_NoError(t, msg.AckSync()) + } + + info, err := js.ConsumerInfo("name", durable) + require_NoError(t, err) + require_True(t, info.Delivered.Stream == 10) + require_True(t, info.Delivered.Consumer == 10) + require_True(t, info.AckFloor.Stream == 9) + require_True(t, info.AckFloor.Consumer == 9) + 
require_True(t, info.NumAckPending == 1) + + resp := createConsumer(t, nc, "name", ConsumerConfig{ + FilterSubjects: []string{""}, + DeliverSubject: "multiple", + Durable: "multiple", + AckPolicy: AckExplicit, + }) + require_True(t, resp.Error.ErrCode == 10139) +} + +func SingleFilterConsumerCheck(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "deliver", + }) + require_NoError(t, err) + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubject: "SINGLE", + Durable: durable, + }) + require_Error(t, err) +} + +// createConsumer is a temporary method until nats.go client supports multiple subjects. +// it is used where lowe level call on mset is not enough, as we want to test error validation. +func createConsumer(t *testing.T, nc *nats.Conn, stream string, config ConsumerConfig) JSApiConsumerCreateResponse { + req, err := json.Marshal(&CreateConsumerRequest{Stream: stream, Config: config}) + require_NoError(t, err) + + resp, err := nc.Request(fmt.Sprintf("$JS.API.CONSUMER.DURABLE.CREATE.%s.%s", stream, config.Durable), req, time.Second*10) + require_NoError(t, err) + + var apiResp JSApiConsumerCreateResponse + require_NoError(t, json.Unmarshal(resp.Data, &apiResp)) + + return apiResp +} + +func TestJetStreamConsumerOverlappingSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + _, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "deliver", + }) + require_NoError(t, err) + + resp := createConsumer(t, nc, "deliver", ConsumerConfig{ + FilterSubjects: []string{"events.one", "events.*"}, + Durable: "name", + }) + + if resp.Error.ErrCode != 10138 { + t.Fatalf("this should error as we have overlapping subjects, got %+v", resp.Error) + } +} + +func TestJetStreamMultipleSubjectsPushBasic(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + mset, err := s.GlobalAccount().addStream(&StreamConfig{ + Subjects: []string{"events", "data", "other"}, + Name: "deliver", + }) + require_NoError(t, err) + + _, err = mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data"}, + Durable: "name", + DeliverSubject: "push", + }) + require_NoError(t, err) + + sub, err := nc.SubscribeSync("push") + require_NoError(t, err) + + sendStreamMsg(t, nc, "other", "10") + sendStreamMsg(t, nc, "events", "0") + sendStreamMsg(t, nc, "data", "1") + sendStreamMsg(t, nc, "events", "2") + sendStreamMsg(t, nc, "events", "3") + sendStreamMsg(t, nc, "other", "10") + sendStreamMsg(t, nc, "data", "4") + sendStreamMsg(t, nc, "data", "5") + + for i := 0; i < 6; i++ { + msg, err := sub.NextMsg(time.Second * 1) + require_NoError(t, err) + if fmt.Sprintf("%v", i) != string(msg.Data) { + t.Fatalf("bad sequence. 
Expected %v, got %v", i, string(msg.Data)) + } + } + info, err := js.ConsumerInfo("deliver", "name") + require_NoError(t, err) + require_True(t, info.AckFloor.Consumer == 6) + require_True(t, info.AckFloor.Stream == 8) +} +func TestJetStreamMultipleSubjectsBasic(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events", "data", "other"}, + Name: "deliver", + }) + require_NoError(t, err) + + mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data"}, + Durable: "name", + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "other", "10") + sendStreamMsg(t, nc, "events", "0") + sendStreamMsg(t, nc, "data", "1") + sendStreamMsg(t, nc, "events", "2") + sendStreamMsg(t, nc, "events", "3") + sendStreamMsg(t, nc, "other", "10") + sendStreamMsg(t, nc, "data", "4") + sendStreamMsg(t, nc, "data", "5") + + consumer, err := js.PullSubscribe("", "name", nats.Bind("deliver", "name")) + require_NoError(t, err) + + msg, err := consumer.Fetch(6) + require_NoError(t, err) + + for i, msg := range msg { + if fmt.Sprintf("%v", i) != string(msg.Data) { + t.Fatalf("bad sequence. Expected %v, got %v", i, string(msg.Data)) + } + } + _, err = js.ConsumerInfo("deliver", "name") + require_NoError(t, err) +} + +func TestJetStreamKVDelete(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "deletion", + History: 10, + }) + require_NoError(t, err) + kv.Put("a", nil) + kv.Put("a.a", nil) + kv.Put("a.b", nil) + kv.Put("a.b.c", nil) + + keys, err := kv.Keys() + require_NoError(t, err) + require_True(t, len(keys) == 4) + + info, err := js.AddConsumer("KV_deletion", &nats.ConsumerConfig{ + Name: "keys", + FilterSubject: "$KV.deletion.a.*", + DeliverPolicy: nats.DeliverLastPerSubjectPolicy, + DeliverSubject: "keys", + MaxDeliver: 1, + AckPolicy: nats.AckNonePolicy, + MemoryStorage: true, + FlowControl: true, + Heartbeat: time.Second * 5, + }) + require_NoError(t, err) + require_True(t, info.NumPending == 2) + + sub, err := js.SubscribeSync("$KV.deletion.a.*", nats.Bind("KV_deletion", "keys")) + require_NoError(t, err) + + _, err = sub.NextMsg(time.Second * 1) + require_NoError(t, err) + _, err = sub.NextMsg(time.Second * 1) + require_NoError(t, err) + msg, err := sub.NextMsg(time.Second * 1) + require_True(t, msg == nil) + require_Error(t, err) + + require_NoError(t, kv.Delete("a.a")) + require_NoError(t, kv.Delete("a.b")) + + watcher, err := kv.WatchAll() + require_NoError(t, err) + + updates := watcher.Updates() + + keys = []string{} + for v := range updates { + if v == nil { + break + } + if v.Operation() == nats.KeyValueDelete { + keys = append(keys, v.Key()) + } + } + require_True(t, len(keys) == 2) +} + +func TestJetStreamDeliverLastPerSubjectWithKV(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + MaxMsgsPerSubject: 5, + Subjects: []string{"kv.>"}, + }) + require_NoError(t, err) 
+ + sendStreamMsg(t, nc, "kv.a", "bad") + sendStreamMsg(t, nc, "kv.a", "bad") + sendStreamMsg(t, nc, "kv.a", "bad") + sendStreamMsg(t, nc, "kv.a", "a") + sendStreamMsg(t, nc, "kv.a.b", "bad") + sendStreamMsg(t, nc, "kv.a.b", "bad") + sendStreamMsg(t, nc, "kv.a.b", "a.b") + sendStreamMsg(t, nc, "kv.a.b.c", "bad") + sendStreamMsg(t, nc, "kv.a.b.c", "bad") + sendStreamMsg(t, nc, "kv.a.b.c", "bad") + sendStreamMsg(t, nc, "kv.a.b.c", "a.b.c") + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "CONSUMER", + FilterSubject: "kv.>", + DeliverPolicy: nats.DeliverLastPerSubjectPolicy, + DeliverSubject: "deliver", + MaxDeliver: 1, + AckPolicy: nats.AckNonePolicy, + MemoryStorage: true, + FlowControl: true, + Heartbeat: time.Second * 5, + }) + require_NoError(t, err) + + sub, err := js.SubscribeSync("kv.>", nats.Bind("TEST", "CONSUMER")) + require_NoError(t, err) + + for i := 1; i <= 3; i++ { + _, err := sub.NextMsg(time.Second * 1) + require_NoError(t, err) + } + + msg, err := sub.NextMsg(time.Second * 1) + if err == nil || msg != nil { + t.Fatalf("should not get any more messages") + } +} + +func TestJetStreamConsumerMultipleSubjectsAck(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events", "data", "other"}, + Name: "deliver", + }) + require_NoError(t, err) + + _, err = mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data"}, + Durable: "name", + AckPolicy: AckExplicit, + Replicas: 1, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "events", "1") + sendStreamMsg(t, nc, "data", "2") + sendStreamMsg(t, nc, "data", "3") + sendStreamMsg(t, nc, "data", "4") + sendStreamMsg(t, nc, "events", "5") + sendStreamMsg(t, nc, "data", "6") + sendStreamMsg(t, nc, "data", "7") + + consumer, err := js.PullSubscribe("", "name", nats.Bind("deliver", "name")) + require_NoError(t, err) + + msg, err := consumer.Fetch(3) + require_NoError(t, err) + + require_True(t, len(msg) == 3) + + require_NoError(t, msg[0].AckSync()) + require_NoError(t, msg[1].AckSync()) + + info, err := js.ConsumerInfo("deliver", "name") + require_NoError(t, err) + + if info.AckFloor.Consumer != 2 { + t.Fatalf("bad consumer sequence. expected %v, got %v", 2, info.AckFloor.Consumer) + } + if info.AckFloor.Stream != 2 { + t.Fatalf("bad stream sequence. expected %v, got %v", 2, info.AckFloor.Stream) + } + if info.NumPending != 4 { + t.Fatalf("bad num pending. 
Expected %v, got %v", 2, info.NumPending) + } + +} + +func TestJetStreamConsumerMultipleSubjectAndNewAPI(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + _, err := acc.addStream(&StreamConfig{ + Subjects: []string{"data", "events"}, + Name: "deliver", + }) + if err != nil { + t.Fatalf("error while creating stream") + } + + req, err := json.Marshal(&CreateConsumerRequest{Stream: "deliver", Config: ConsumerConfig{ + FilterSubjects: []string{"events", "data"}, + Name: "name", + Durable: "name", + }}) + require_NoError(t, err) + + resp, err := nc.Request(fmt.Sprintf("$JS.API.CONSUMER.CREATE.%s.%s.%s", "deliver", "name", "data.>"), req, time.Second*10) + + var apiResp JSApiConsumerCreateResponse + json.Unmarshal(resp.Data, &apiResp) + require_NoError(t, err) + + if apiResp.Error.ErrCode != 10137 { + t.Fatal("this should error as multiple subject filters is incompatible with new API and didn't") + } + +} + +func TestJetStreamConsumerMultipleSubjectsWithAddedMessages(t *testing.T) { + s := RunBasicJetStreamServer(t) + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + defer s.Shutdown() + + durable := "durable" + nc, js := jsClientConnect(t, s) + defer nc.Close() + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Subjects: []string{"events.>"}, + Name: "deliver", + }) + require_NoError(t, err) + + // if they're not the same, expect error + _, err = mset.addConsumer(&ConsumerConfig{ + DeliverSubject: "deliver", + FilterSubjects: []string{"events.created", "events.processed"}, + Durable: durable, + }) + + require_NoError(t, err) + + sendStreamMsg(t, nc, "events.created", "0") + sendStreamMsg(t, nc, "events.created", "1") + sendStreamMsg(t, nc, "events.created", "2") + sendStreamMsg(t, nc, "events.created", "3") + sendStreamMsg(t, nc, "events.other", "BAD") + sendStreamMsg(t, nc, "events.processed", "4") + sendStreamMsg(t, nc, "events.processed", "5") + sendStreamMsg(t, nc, "events.processed", "6") + sendStreamMsg(t, nc, "events.other", "BAD") + sendStreamMsg(t, nc, "events.processed", "7") + sendStreamMsg(t, nc, "events.processed", "8") + + sub, err := js.SubscribeSync("", nats.Bind("deliver", durable)) + if err != nil { + t.Fatalf("error while subscribing to Consumer: %v", err) + } + + for i := 0; i < 10; i++ { + if i == 5 { + sendStreamMsg(t, nc, "events.created", "9") + } + if i == 9 { + sendStreamMsg(t, nc, "events.other", "BAD") + sendStreamMsg(t, nc, "events.created", "11") + } + if i == 7 { + sendStreamMsg(t, nc, "events.processed", "10") + } + + msg, err := sub.NextMsg(time.Second * 1) + require_NoError(t, err) + j, err := strconv.Atoi(string(msg.Data)) + require_NoError(t, err) + if j != i { + t.Fatalf("wrong sequence, expected %v got %v", i, j) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("error while acking the message :%v", err) + } + + } + info, err := js.ConsumerInfo("deliver", durable) + require_NoError(t, err) + + require_True(t, info.Delivered.Consumer == 12) + require_True(t, info.Delivered.Stream == 15) + require_True(t, info.AckFloor.Stream == 15) + require_True(t, info.AckFloor.Consumer == 12) + +} + +func TestJetStreamConsumerThreeFilters(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + mset, err := 
s.GlobalAccount().addStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"events", "data", "other", "ignored"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "ignored", "100") + sendStreamMsg(t, nc, "events", "0") + sendStreamMsg(t, nc, "events", "1") + + sendStreamMsg(t, nc, "data", "2") + sendStreamMsg(t, nc, "ignored", "100") + sendStreamMsg(t, nc, "data", "3") + + sendStreamMsg(t, nc, "other", "4") + sendStreamMsg(t, nc, "data", "5") + sendStreamMsg(t, nc, "other", "6") + sendStreamMsg(t, nc, "data", "7") + sendStreamMsg(t, nc, "ignored", "100") + + mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data", "other"}, + Durable: "multi", + }) + + consumer, err := js.PullSubscribe("", "multi", nats.Bind("TEST", "multi")) + require_NoError(t, err) + + msgs, err := consumer.Fetch(6) + require_NoError(t, err) + for i, msg := range msgs { + require_Equal(t, string(msg.Data), fmt.Sprintf("%d", i)) + require_NoError(t, msg.AckSync()) + } + + info, err := js.ConsumerInfo("TEST", "multi") + require_NoError(t, err) + require_True(t, info.Delivered.Stream == 8) + require_True(t, info.Delivered.Consumer == 6) + require_True(t, info.NumPending == 2) + require_True(t, info.NumAckPending == 0) + require_True(t, info.AckFloor.Consumer == 6) + require_True(t, info.AckFloor.Stream == 8) +} + +func TestJetStreamConsumerUpdateFilterSubjects(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + mset, err := s.GlobalAccount().addStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"events", "data", "other"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "other", "100") + sendStreamMsg(t, nc, "events", "0") + sendStreamMsg(t, nc, "events", "1") + sendStreamMsg(t, nc, "data", "2") + sendStreamMsg(t, nc, "data", "3") + sendStreamMsg(t, nc, "other", "4") + sendStreamMsg(t, nc, "data", "5") + + _, err = mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data"}, + Durable: "multi", + }) + require_NoError(t, err) + + consumer, err := js.PullSubscribe("", "multi", nats.Bind("TEST", "multi")) + require_NoError(t, err) + + msgs, err := consumer.Fetch(3) + require_NoError(t, err) + for i, msg := range msgs { + require_Equal(t, string(msg.Data), fmt.Sprintf("%d", i)) + require_NoError(t, msg.AckSync()) + } + + _, err = mset.addConsumer(&ConsumerConfig{ + FilterSubjects: []string{"events", "data", "other"}, + Durable: "multi", + }) + require_NoError(t, err) + + updatedConsumer, err := js.PullSubscribe("", "multi", nats.Bind("TEST", "multi")) + require_NoError(t, err) + + msgs, err = updatedConsumer.Fetch(3) + require_NoError(t, err) + for i, msg := range msgs { + require_Equal(t, string(msg.Data), fmt.Sprintf("%d", i+3)) + require_NoError(t, msg.AckSync()) + } +} func TestJetStreamStreamUpdateSubjectsOverlapOthers(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() @@ -18953,7 +19776,7 @@ func TestJetStreamMetaDataFailOnKernelFault(t *testing.T) { require_NoError(t, err) for i := 0; i < 10; i++ { - js.Publish("foo", []byte("OK")) + sendStreamMsg(t, nc, "foo", "OK") } sd := s.JetStreamConfig().StoreDir @@ -19017,7 +19840,6 @@ func TestJetStreamMetaDataFailOnKernelFault(t *testing.T) { } func TestJetstreamConsumerSingleTokenSubject(t *testing.T) { - s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index 1f768bb907..72b019fc18 100644 --- a/server/stream.go +++ b/server/stream.go @@ -213,7 +213,7 @@ type stream 
struct { lseq uint64 lmsgId string consumers map[string]*consumer - numFilter int + numFilter int // number of filtered consumers cfg StreamConfig created time.Time stype StorageType @@ -246,7 +246,7 @@ type stream struct { cList []*consumer sch chan struct{} sigq *ipQueue // of *cMsg - csl *Sublist + csl *Sublist // Consumer Sublist // TODO(dlc) - Hide everything below behind two pointers. // Clustered mode. @@ -1743,16 +1743,20 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err mset.clsMu.RLock() for _, o := range mset.cList { + o.mu.RLock() // we update consumer sequences if: // no subject was specified, we can purge all consumers sequences - if preq == nil || + doPurge := preq == nil || preq.Subject == _EMPTY_ || // or consumer filter subject is equal to purged subject preq.Subject == o.cfg.FilterSubject || // or consumer subject is subset of purged subject, // but not the other way around. - subjectIsSubsetMatch(o.cfg.FilterSubject, preq.Subject) { + subjectIsSubsetMatch(o.cfg.FilterSubject, preq.Subject) + o.mu.RUnlock() + if doPurge { o.purge(fseq, lseq) + } } mset.clsMu.RUnlock() @@ -4620,7 +4624,7 @@ func (mset *stream) numConsumers() int { // Lock should be held. func (mset *stream) setConsumer(o *consumer) { mset.consumers[o.name] = o - if o.cfg.FilterSubject != _EMPTY_ { + if len(o.subjf) > 0 { mset.numFilter++ } if o.cfg.Direct { @@ -4652,7 +4656,9 @@ func (mset *stream) removeConsumer(o *consumer) { } // Always remove from the leader sublist. if mset.csl != nil { - mset.csl.Remove(o.signalSub()) + for _, sub := range o.signalSubs() { + mset.csl.Remove(sub) + } } mset.clsMu.Unlock() } @@ -4666,7 +4672,9 @@ func (mset *stream) setConsumerAsLeader(o *consumer) { if mset.csl == nil { mset.csl = NewSublistWithCache() } - mset.csl.Insert(o.signalSub()) + for _, sub := range o.signalSubs() { + mset.csl.Insert(sub) + } } // Remove the consumer as a leader. This will update signaling sublist. @@ -4674,49 +4682,55 @@ func (mset *stream) removeConsumerAsLeader(o *consumer) { mset.clsMu.Lock() defer mset.clsMu.Unlock() if mset.csl != nil { - mset.csl.Remove(o.signalSub()) + for _, sub := range o.signalSubs() { + mset.csl.Remove(sub) + } } } // swapSigSubs will update signal Subs for a new subject filter. // consumer lock should not be held. -func (mset *stream) swapSigSubs(o *consumer, newFilter string) { +func (mset *stream) swapSigSubs(o *consumer, newFilters []string) { mset.clsMu.Lock() o.mu.Lock() - if o.sigSub != nil { + if o.sigSubs != nil { if mset.csl != nil { - mset.csl.Remove(o.sigSub) + for _, sub := range o.sigSubs { + mset.csl.Remove(sub) + } } - o.sigSub = nil + o.sigSubs = nil } if o.isLeader() { - subject := newFilter - if subject == _EMPTY_ { - subject = fwcs - } - o.sigSub = &subscription{subject: []byte(subject), icb: o.processStreamSignal} if mset.csl == nil { mset.csl = NewSublistWithCache() } - mset.csl.Insert(o.sigSub) + // If no filters are preset, add fwcs to sublist for that consumer. + if newFilters == nil { + sub := &subscription{subject: []byte(fwcs), icb: o.processStreamSignal} + o.mset.csl.Insert(sub) + o.sigSubs = append(o.sigSubs, sub) + // If there are filters, add their subjects to sublist. + } else { + for _, filter := range newFilters { + sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal} + o.mset.csl.Insert(sub) + o.sigSubs = append(o.sigSubs, sub) + } + } } - - oldFilter := o.cfg.FilterSubject - o.mu.Unlock() mset.clsMu.Unlock() - // Do any numFilter accounting needed. 
mset.mu.Lock() defer mset.mu.Unlock() - // Decrement numFilter if old filter was an actual filter. - if oldFilter != _EMPTY_ && mset.numFilter > 0 { + if mset.numFilter > 0 && len(o.subjf) > 0 { mset.numFilter-- } - if newFilter != _EMPTY_ { + if len(newFilters) > 0 { mset.numFilter++ } } @@ -4773,14 +4787,18 @@ func (mset *stream) Store() StreamStore { // Determines if the new proposed partition is unique amongst all consumers. // Lock should be held. -func (mset *stream) partitionUnique(partition string) bool { - for _, o := range mset.consumers { - if o.cfg.FilterSubject == _EMPTY_ { - return false - } - if subjectIsSubsetMatch(partition, o.cfg.FilterSubject) || - subjectIsSubsetMatch(o.cfg.FilterSubject, partition) { - return false +func (mset *stream) partitionUnique(partitions []string) bool { + for _, partition := range partitions { + for _, o := range mset.consumers { + if o.subjf == nil { + return false + } + for _, filter := range o.subjf { + if subjectIsSubsetMatch(partition, filter.subject) || + subjectIsSubsetMatch(filter.subject, partition) { + return false + } + } } } return true
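Usage sketch (illustrative only, not part of the patch above): since the nats.go client does not yet expose FilterSubjects (hence the createConsumer helper in the tests), the sketch below shows one way to exercise the new field over the raw durable create API and confirms that overlapping filters are rejected with error code 10138. It assumes the server test package's existing helpers and imports (RunBasicJetStreamServer, jsClientConnect, require_NoError, encoding/json, time, nats.go); the stream ORDERS, its subjects, and the durable names are hypothetical.

// Sketch only: stream and consumer names are made up for illustration.
func TestJetStreamConsumerFilterSubjectsSketch(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// Illustrative stream with three subjects; the consumer below filters on two of them.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.created", "orders.paid", "orders.audit"},
	})
	require_NoError(t, err)

	// Build the raw create request carrying the new FilterSubjects field.
	req, err := json.Marshal(&CreateConsumerRequest{Stream: "ORDERS", Config: ConsumerConfig{
		Durable:        "processor",
		AckPolicy:      AckExplicit,
		FilterSubjects: []string{"orders.created", "orders.paid"},
	}})
	require_NoError(t, err)

	resp, err := nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.ORDERS.processor", req, time.Second)
	require_NoError(t, err)

	var ccResp JSApiConsumerCreateResponse
	require_NoError(t, json.Unmarshal(resp.Data, &ccResp))
	if ccResp.Error != nil {
		t.Fatalf("unexpected error creating multi-filter consumer: %+v", ccResp.Error)
	}

	// Overlapping filters must be rejected with JsConsumerOverlappingSubjectFilters (10138).
	req, err = json.Marshal(&CreateConsumerRequest{Stream: "ORDERS", Config: ConsumerConfig{
		Durable:        "overlap",
		AckPolicy:      AckExplicit,
		FilterSubjects: []string{"orders.*", "orders.created"},
	}})
	require_NoError(t, err)

	resp, err = nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.ORDERS.overlap", req, time.Second)
	require_NoError(t, err)

	ccResp = JSApiConsumerCreateResponse{}
	require_NoError(t, json.Unmarshal(resp.Data, &ccResp))
	if ccResp.Error == nil || ccResp.Error.ErrCode != 10138 {
		t.Fatalf("expected overlapping subject filters error (10138), got %+v", ccResp.Error)
	}
}

Creating the same multi-filter consumer through the subject-based create API ($JS.API.CONSUMER.CREATE with a filter token in the subject) would instead fail with error code 10137, per the check added in jsConsumerCreateRequest.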