Skip to content

Commit

Permalink
When a filtered consumer starts by time, there could be a large gap b…
Browse files Browse the repository at this point in the history
…etween the time to sequence and the real start.

This uses similar logic to starting from the beginning of a stream for a filtered consumer.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed Feb 15, 2024
1 parent cf1b8e4 commit 7887557
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4783,6 +4783,23 @@ func (o *consumer) selectStartingSeqNo() {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
// Here we want to see if we are filtered, and if so possibly close the gap
// to the nearest first given our starting sequence from time. This is so we do
// not force the system to do a linear walk between o.sseq and the real first.
if len(o.subjf) > 0 {
nseq := state.LastSeq
for _, filter := range o.subjf {
// Use first sequence since this is more optimized atm.
ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
if ss.First > o.sseq && ss.First < nseq {
nseq = ss.First
}
}
// Skip ahead if possible.
if nseq > o.sseq && nseq < state.LastSeq {
o.sseq = nseq
}
}
} else {
// DeliverNew
o.sseq = state.LastSeq + 1
Expand Down

0 comments on commit 7887557

Please sign in to comment.