Skip to content

Commit

Permalink
Merge pull request #2214 from nats-io/issue_2213
Browse files Browse the repository at this point in the history
[FIXED] #2213
  • Loading branch information
derekcollison committed May 12, 2021
2 parents bc9ac88 + 6e17b7a commit 30191ad
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 31 deletions.
53 changes: 29 additions & 24 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,35 +2584,40 @@ func (o *consumer) selectSubjectLast() {

// Will select the starting sequence.
func (o *consumer) selectStartingSeqNo() {
stats := o.mset.store.State()
if o.cfg.OptStartSeq == 0 {
if o.cfg.DeliverPolicy == DeliverAll {
o.sseq = stats.FirstSeq
} else if o.cfg.DeliverPolicy == DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
if o.cfg.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
if o.mset == nil || o.mset.store == nil {
o.sseq = 1
} else {
stats := o.mset.store.State()
if o.cfg.OptStartSeq == 0 {
if o.cfg.DeliverPolicy == DeliverAll {
o.sseq = stats.FirstSeq
} else if o.cfg.DeliverPolicy == DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
if o.cfg.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
} else {
// Default is deliver new only.
o.sseq = stats.LastSeq + 1
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
} else {
// Default is deliver new only.
o.sseq = o.cfg.OptStartSeq
}

if stats.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < stats.FirstSeq {
o.sseq = stats.FirstSeq
} else if o.sseq > stats.LastSeq {
o.sseq = stats.LastSeq + 1
}
} else {
o.sseq = o.cfg.OptStartSeq
}

if stats.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < stats.FirstSeq {
o.sseq = stats.FirstSeq
} else if o.sseq > stats.LastSeq {
o.sseq = stats.LastSeq + 1
}
// Always set delivery sequence to 1.
o.dseq = 1
// Set ack delivery floor to delivery-1
Expand Down Expand Up @@ -2919,7 +2924,7 @@ func (o *consumer) requestNextMsgSubject() string {
// mset lock should be held.
func (o *consumer) setInitialPending() {
mset := o.mset
if mset == nil {
if mset == nil || mset.store == nil {
return
}
// notFiltered means we want all messages.
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3243,7 +3243,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
return
}

obs := mset.getConsumers()
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
Expand Down Expand Up @@ -3337,7 +3337,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
return
}

obs := mset.getConsumers()
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,8 +1446,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.getConsumers(); len(consumers) > 0 {
for _, o := range mset.getConsumers() {
if consumers := mset.getPublicConsumers(); len(consumers) > 0 {
for _, o := range consumers {
rg := cc.createGroupForConsumer(sa)
// Pick a preferred leader.
rg.setPreferred()
Expand Down Expand Up @@ -2245,7 +2245,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme

// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.getConsumers(); len(consumers) > 0 {
if consumers := mset.getPublicConsumers(); len(consumers) > 0 {
js.mu.RLock()
cc := js.cluster
js.mu.RUnlock()
Expand Down
74 changes: 74 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11265,6 +11265,80 @@ func TestJetStreamServerDomainConfigButDisabled(t *testing.T) {
}
}

// Issue #2213
func TestJetStreamDirectConsumersBeingReported(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{
Name: "TEST",
}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err = js.Publish("foo", nil); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Direct consumers should not be reported
if si.State.Consumers != 0 {
t.Fatalf("Did not expect any consumers, got %d", si.State.Consumers)
}

// Now check for consumer in consumer names list.
var names []string
for name := range js.ConsumerNames("TEST") {
names = append(names, name)
}
if len(names) != 0 {
t.Fatalf("Expected no consumers but got %+v", names)
}

// Now check detailed list.
var cis []*nats.ConsumerInfo
for ci := range js.ConsumersInfo("TEST") {
cis = append(cis, ci)
}
if len(cis) != 0 {
t.Fatalf("Expected no consumers but got %+v", cis)
}
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg
Config: cfg,
}
if optConsumers {
for _, consumer := range stream.getConsumers() {
for _, consumer := range stream.getPublicConsumers() {
cInfo := consumer.info()
if !optCfg {
cInfo.Config = nil
Expand Down
16 changes: 15 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,7 +3050,7 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
return sm, nil
}

// Consunmers will return all the current consumers for this stream.
// getConsumers will return all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
Expand All @@ -3062,6 +3062,20 @@ func (mset *stream) getConsumers() []*consumer {
return obs
}

// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()

var obs []*consumer
for _, o := range mset.consumers {
if !o.cfg.Direct {
obs = append(obs, o)
}
}
return obs
}

// NumConsumers reports on number of active consumers for this stream.
func (mset *stream) numConsumers() int {
mset.mu.Lock()
Expand Down

0 comments on commit 30191ad

Please sign in to comment.