Skip to content

Commit

Permalink
Merge pull request #2284 from nats-io/per_subject
Browse files Browse the repository at this point in the history
Adds in per subject tracking and limits for streams.
  • Loading branch information
derekcollison committed Jun 15, 2021
2 parents f4e3409 + 6219f03 commit be1be81
Show file tree
Hide file tree
Showing 10 changed files with 916 additions and 187 deletions.
69 changes: 37 additions & 32 deletions server/consumer.go
Expand Up @@ -345,7 +345,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
config.MaxAckPending = JsDefaultMaxAckPending
}

// Make sure any partition subject is also a literal.
// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects, hasExt := mset.allSubjects()
if !validFilteredSubject(config.FilterSubject, subjects) && !hasExt {
Expand Down Expand Up @@ -393,7 +393,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

sampleFreq := 0
if config.SampleFrequency != "" {
if config.SampleFrequency != _EMPTY_ {
s := strings.TrimSuffix(config.SampleFrequency, "%")
sampleFreq, err = strconv.Atoi(s)
if err != nil {
Expand Down Expand Up @@ -724,8 +724,8 @@ func (o *consumer) setLeader(isLeader bool) {
}
}

// Setup initial pending.
o.setInitialPending()
// Setup initial pending and proper start sequence.
o.setInitialPendingAndStart()

// If push mode, register for notifications on interest.
if o.isPushMode() {
Expand Down Expand Up @@ -1195,6 +1195,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
}
}

// In case we have anything pending on entry.
forwardProposals()

for {
select {
case <-qch:
Expand Down Expand Up @@ -2560,28 +2563,6 @@ func (o *consumer) nextSeq() uint64 {
return dseq
}

// This will select the store seq to start with based on the
// partition subject.
func (o *consumer) selectSubjectLast() {
stats := o.mset.store.State()
if stats.LastSeq == 0 {
o.sseq = stats.LastSeq
return
}
// FIXME(dlc) - this is linear and can be optimized by store layer.
for seq := stats.LastSeq; seq >= stats.FirstSeq; seq-- {
subj, _, _, _, err := o.mset.store.LoadMsg(seq)
if err == ErrStoreMsgNotFound {
continue
}
if o.isFilteredMatch(subj) {
o.sseq = seq
o.updateSkipped()
return
}
}
}

// Will select the starting sequence.
func (o *consumer) selectStartingSeqNo() {
if o.mset == nil || o.mset.store == nil {
Expand All @@ -2593,16 +2574,16 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = stats.FirstSeq
} else if o.cfg.DeliverPolicy == DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
// If we are partitioned here this will be properly set when we become leader.
if o.cfg.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
ss := o.mset.store.FilteredState(1, o.cfg.FilterSubject)
o.sseq = ss.Last
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
} else {
// Default is deliver new only.
o.sseq = stats.LastSeq + 1
}
} else {
Expand Down Expand Up @@ -2868,11 +2849,22 @@ func (mset *stream) deliveryFormsCycle(deliverySubject string) bool {
return false
}

// Check that the filtered subject is valid given a set of stream subjects.
func validFilteredSubject(filteredSubject string, subjects []string) bool {
if !IsValidSubject(filteredSubject) {
return false
}
hasWC := subjectHasWildcard(filteredSubject)

for _, subject := range subjects {
if subjectIsSubsetMatch(filteredSubject, subject) {
return true
}
// If we have a wildcard as the filtered subject check to see if we are
// a wider scope but do match a subject.
if hasWC && subjectIsSubsetMatch(subject, filteredSubject) {
return true
}
}
return false
}
Expand Down Expand Up @@ -2920,9 +2912,9 @@ func (o *consumer) requestNextMsgSubject() string {
return o.nextMsgSubj
}

// Will set the initial pending.
// Will set the initial pending and start sequence.
// mset lock should be held.
func (o *consumer) setInitialPending() {
func (o *consumer) setInitialPendingAndStart() {
mset := o.mset
if mset == nil || mset.store == nil {
return
Expand All @@ -2945,7 +2937,20 @@ func (o *consumer) setInitialPending() {
}
} else {
// Here we are filtered.
o.sgap = o.mset.store.NumFilteredPending(o.sseq, o.cfg.FilterSubject)
ss := mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
if ss.Msgs > 0 {
o.sgap = ss.Msgs
// See if we should update our starting sequence.
if dp := o.cfg.DeliverPolicy; dp == DeliverLast {
o.sseq = ss.Last
} else if dp == DeliverNew {
o.sseq = ss.Last + 1
} else {
// DeliverAll, DeliverByStartSequence, DeliverByStartTime
o.sseq = ss.First
}
}
o.updateSkipped()
}
}

Expand Down

0 comments on commit be1be81

Please sign in to comment.