Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] #2213 #2214

Merged
merged 1 commit into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 29 additions & 24 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,35 +2584,40 @@ func (o *consumer) selectSubjectLast() {

// Will select the starting sequence.
func (o *consumer) selectStartingSeqNo() {
stats := o.mset.store.State()
if o.cfg.OptStartSeq == 0 {
if o.cfg.DeliverPolicy == DeliverAll {
o.sseq = stats.FirstSeq
} else if o.cfg.DeliverPolicy == DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
if o.cfg.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
if o.mset == nil || o.mset.store == nil {
o.sseq = 1
} else {
stats := o.mset.store.State()
if o.cfg.OptStartSeq == 0 {
if o.cfg.DeliverPolicy == DeliverAll {
o.sseq = stats.FirstSeq
} else if o.cfg.DeliverPolicy == DeliverLast {
o.sseq = stats.LastSeq
// If we are partitioned here we may need to walk backwards.
if o.cfg.FilterSubject != _EMPTY_ {
o.selectSubjectLast()
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
} else {
// Default is deliver new only.
o.sseq = stats.LastSeq + 1
}
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
} else {
// Default is deliver new only.
o.sseq = o.cfg.OptStartSeq
}

if stats.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < stats.FirstSeq {
o.sseq = stats.FirstSeq
} else if o.sseq > stats.LastSeq {
o.sseq = stats.LastSeq + 1
}
} else {
o.sseq = o.cfg.OptStartSeq
}

if stats.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < stats.FirstSeq {
o.sseq = stats.FirstSeq
} else if o.sseq > stats.LastSeq {
o.sseq = stats.LastSeq + 1
}
// Always set delivery sequence to 1.
o.dseq = 1
// Set ack delivery floor to delivery-1
Expand Down Expand Up @@ -2919,7 +2924,7 @@ func (o *consumer) requestNextMsgSubject() string {
// mset lock should be held.
func (o *consumer) setInitialPending() {
mset := o.mset
if mset == nil {
if mset == nil || mset.store == nil {
return
}
// notFiltered means we want all messages.
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3243,7 +3243,7 @@ func (s *Server) jsConsumerNamesRequest(sub *subscription, c *client, subject, r
return
}

obs := mset.getConsumers()
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
Expand Down Expand Up @@ -3337,7 +3337,7 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, subject, re
return
}

obs := mset.getConsumers()
obs := mset.getPublicConsumers()
sort.Slice(obs, func(i, j int) bool {
return strings.Compare(obs[i].name, obs[j].name) < 0
})
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,8 +1446,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {

// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.getConsumers(); len(consumers) > 0 {
for _, o := range mset.getConsumers() {
if consumers := mset.getPublicConsumers(); len(consumers) > 0 {
for _, o := range consumers {
rg := cc.createGroupForConsumer(sa)
// Pick a preferred leader.
rg.setPreferred()
Expand Down Expand Up @@ -2245,7 +2245,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme

// Check to see if we have restored consumers here.
// These are not currently assigned so we will need to do so here.
if consumers := mset.getConsumers(); len(consumers) > 0 {
if consumers := mset.getPublicConsumers(); len(consumers) > 0 {
js.mu.RLock()
cc := js.cluster
js.mu.RUnlock()
Expand Down
74 changes: 74 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11265,6 +11265,80 @@ func TestJetStreamServerDomainConfigButDisabled(t *testing.T) {
}
}

// Issue #2213
func TestJetStreamDirectConsumersBeingReported(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "S",
Sources: []*nats.StreamSource{{
Name: "TEST",
}},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err = js.Publish("foo", nil); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
return fmt.Errorf("Could not get stream info: %v", err)
}
if si.State.Msgs != 1 {
return fmt.Errorf("Expected 1 msg, got state: %+v", si.State)
}
return nil
})

si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Direct consumers should not be reported
if si.State.Consumers != 0 {
t.Fatalf("Did not expect any consumers, got %d", si.State.Consumers)
}

// Now check for consumer in consumer names list.
var names []string
for name := range js.ConsumerNames("TEST") {
names = append(names, name)
}
if len(names) != 0 {
t.Fatalf("Expected no consumers but got %+v", names)
}

// Now check detailed list.
var cis []*nats.ConsumerInfo
for ci := range js.ConsumersInfo("TEST") {
cis = append(cis, ci)
}
if len(cis) != 0 {
t.Fatalf("Expected no consumers but got %+v", cis)
}
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg
Config: cfg,
}
if optConsumers {
for _, consumer := range stream.getConsumers() {
for _, consumer := range stream.getPublicConsumers() {
cInfo := consumer.info()
if !optCfg {
cInfo.Config = nil
Expand Down
16 changes: 15 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,7 +3050,7 @@ func (mset *stream) getMsg(seq uint64) (*StoredMsg, error) {
return sm, nil
}

// Consunmers will return all the current consumers for this stream.
// getConsumers will return all the current consumers for this stream.
func (mset *stream) getConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()
Expand All @@ -3062,6 +3062,20 @@ func (mset *stream) getConsumers() []*consumer {
return obs
}

// This returns all consumers that are not DIRECT.
func (mset *stream) getPublicConsumers() []*consumer {
mset.mu.Lock()
defer mset.mu.Unlock()

var obs []*consumer
for _, o := range mset.consumers {
if !o.cfg.Direct {
obs = append(obs, o)
}
}
return obs
}

// NumConsumers reports on number of active consumers for this stream.
func (mset *stream) numConsumers() int {
mset.mu.Lock()
Expand Down