Updates to preAcks when multiple consumers are mutually exclusive #4007

Merged (1 commit, Mar 31, 2023)

server/consumer.go: 7 changes (4 additions, 3 deletions)
@@ -4357,14 +4357,15 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 		}
 
 		var rmseqs []uint64
-		mset.mu.RLock()
+		mset.mu.Lock()
 		for seq := start; seq <= stop; seq++ {
-			if !mset.checkInterest(seq, o) {
+			if mset.noInterest(seq, o) {
 				rmseqs = append(rmseqs, seq)
 			}
 		}
-		mset.mu.RUnlock()
+		mset.mu.Unlock()
+
 		// These can be removed.
 		for _, seq := range rmseqs {
 			mset.store.RemoveMsg(seq)
 		}
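
A note on the locking change above: noInterest can mutate the stream's preAck bookkeeping (it may call clearAllPreAcks or registerPreAck, as seen in the stream.go diff below), so a read lock is no longer sufficient and the loop now takes the write lock. Below is a minimal, self-contained sketch of that pattern, using a hypothetical registry type rather than the server's stream struct:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the stream's preAck bookkeeping;
// it is not the server's stream type.
type registry struct {
	mu      sync.RWMutex
	preAcks map[uint64]map[string]struct{}
}

// noInterest reports whether nothing cares about seq anymore (modeled here
// simply as: a preAck entry exists for it) and clears that entry as a side
// effect. Because it mutates preAcks, callers must hold the write lock.
func (r *registry) noInterest(seq uint64) bool {
	if _, ok := r.preAcks[seq]; ok {
		delete(r.preAcks, seq) // mutation: this is why RLock is not enough
		return true
	}
	return false
}

func main() {
	r := &registry{preAcks: map[uint64]map[string]struct{}{
		7: {"C": {}},
	}}

	var rmseqs []uint64
	r.mu.Lock() // write lock, mirroring the mset.mu.RLock() -> mset.mu.Lock() change
	for seq := uint64(1); seq <= 10; seq++ {
		if r.noInterest(seq) {
			rmseqs = append(rmseqs, seq)
		}
	}
	r.mu.Unlock()

	fmt.Println("sequences safe to remove:", rmseqs)
}
```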

server/jetstream_cluster.go: 2 changes (1 addition, 1 deletion)
@@ -7609,7 +7609,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
 	ddloaded := mset.ddloaded
 	tierName := mset.tier
 
-	if mset.hasAllPreAcks(seq) {
+	if mset.hasAllPreAcks(seq, subj) {
 		mset.clearAllPreAcks(seq)
 		// Mark this to be skipped
 		subj, ts = _EMPTY_, 0

server/norace_test.go: 108 changes (108 additions, 0 deletions)
@@ -7575,3 +7575,111 @@ func TestNoRaceJetStreamClusterUnbalancedInterestMultipleConsumers(t *testing.T)
 		require_True(t, numPreAcks == 0)
 	}
 }
+
+func TestNoRaceJetStreamClusterUnbalancedInterestMultipleFilteredConsumers(t *testing.T) {
+	c, np := createStretchUnbalancedCluster(t)
+	defer c.shutdown()
+	defer np.stop()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Now create the stream.
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "EVENTS",
+		Subjects:  []string{"EV.>"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+	})
+	require_NoError(t, err)
+
+	// Make sure it's leader is on S2.
+	sl := c.servers[1]
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		c.waitOnStreamLeader(globalAccountName, "EVENTS")
+		if s := c.streamLeader(globalAccountName, "EVENTS"); s != sl {
+			s.JetStreamStepdownStream(globalAccountName, "EVENTS")
+			return fmt.Errorf("Server %s is not stream leader yet", sl)
+		}
+		return nil
+	})
+
+	// Create a fast ack consumer.
+	_, err = js.Subscribe("EV.NEW", func(m *nats.Msg) {
+		m.Ack()
+	}, nats.Durable("C"), nats.ManualAck())
+	require_NoError(t, err)
+
+	// Make sure the consumer leader is on S3.
+	cl := c.servers[2]
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "C")
+		if s := c.consumerLeader(globalAccountName, "EVENTS", "C"); s != cl {
+			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "C")
+			return fmt.Errorf("Server %s is not consumer leader yet", cl)
+		}
+		return nil
+	})
+
+	// Connect a client directly to the stream leader.
+	nc, js = jsClientConnect(t, sl)
+	defer nc.Close()
+
+	// Now create another fast ack consumer.
+	_, err = js.Subscribe("EV.UPDATED", func(m *nats.Msg) {
+		m.Ack()
+	}, nats.Durable("D"), nats.ManualAck())
+	require_NoError(t, err)
+
+	// Make sure this consumer leader is on S1.
+	cl = c.servers[0]
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		c.waitOnConsumerLeader(globalAccountName, "EVENTS", "D")
+		if s := c.consumerLeader(globalAccountName, "EVENTS", "D"); s != cl {
+			s.JetStreamStepdownConsumer(globalAccountName, "EVENTS", "D")
+			return fmt.Errorf("Server %s is not consumer leader yet", cl)
+		}
+		return nil
+	})
+
+	numToSend := 500
+	for i := 0; i < numToSend; i++ {
+		_, err := js.PublishAsync("EV.NEW", nil)
+		require_NoError(t, err)
+		_, err = js.PublishAsync("EV.UPDATED", nil)
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(20 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Let acks propagate.
+	time.Sleep(250 * time.Millisecond)
+
+	ci, err := js.ConsumerInfo("EVENTS", "D")
+	require_NoError(t, err)
+	require_True(t, ci.NumPending == 0)
+	require_True(t, ci.NumAckPending == 0)
+	require_True(t, ci.Delivered.Consumer == 500)
+	require_True(t, ci.Delivered.Stream == 1000)
+	require_True(t, ci.AckFloor.Consumer == 500)
+	require_True(t, ci.AckFloor.Stream == 1000)
+
+	// Check final stream state on all servers.
+	for _, s := range c.servers {
+		mset, err := s.GlobalAccount().lookupStream("EVENTS")
+		require_NoError(t, err)
+		state := mset.state()
+		require_True(t, state.Msgs == 0)
+		require_True(t, state.FirstSeq == 1001)
+		require_True(t, state.LastSeq == 1000)
+		require_True(t, state.Consumers == 2)
+		// Now check preAcks
+		mset.mu.RLock()
+		numPreAcks := len(mset.preAcks)
+		mset.mu.RUnlock()
+		require_True(t, numPreAcks == 0)
+	}
+}

server/stream.go: 57 changes (31 additions, 26 deletions)
@@ -4004,7 +4004,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	// Make sure to take into account any message assignments that we had to skip (clfs).
 	seq = lseq + 1 - clfs
 	// Check for preAcks and the need to skip vs store.
-	if mset.hasAllPreAcks(seq) {
+	if mset.hasAllPreAcks(seq, subject) {
 		mset.clearAllPreAcks(seq)
 		store.SkipMsg()
 	} else {
@@ -4826,8 +4826,22 @@ func (mset *stream) potentialFilteredConsumers() bool {
 	return false
 }
 
+// Check if there is no interest in this sequence number across our consumers.
+// The consumer passed is optional if we are processing the ack for that consumer.
+// Write lock should be held.
+func (mset *stream) noInterest(seq uint64, obs *consumer) bool {
+	return !mset.checkForInterest(seq, obs)
+}
+
+// Check if there is no interest in this sequence number and subject across our consumers.
+// The consumer passed is optional if we are processing the ack for that consumer.
+// Write lock should be held.
+func (mset *stream) noInterestWithSubject(seq uint64, subj string, obs *consumer) bool {
+	return !mset.checkForInterestWithSubject(seq, subj, obs)
+}
+
 // Write lock should be held here for the stream to avoid race conditions on state.
-func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
+func (mset *stream) checkForInterest(seq uint64, obs *consumer) bool {
 	var subj string
 	if mset.potentialFilteredConsumers() {
 		pmsg := getJSPubMsgFromPool()
@@ -4839,10 +4853,16 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
 				mset.registerPreAck(obs, seq)
 				return true
 			}
+			mset.clearAllPreAcks(seq)
 			return false
 		}
 		subj = sm.subj
 	}
+	return mset.checkForInterestWithSubject(seq, subj, obs)
+}
+
+// Checks for interest given a sequence and subject.
+func (mset *stream) checkForInterestWithSubject(seq uint64, subj string, obs *consumer) bool {
 	for _, o := range mset.consumers {
 		// If this is us or we have a registered preAck for this consumer continue inspecting.
 		if o == obs || mset.hasPreAck(o, seq) {
@@ -4853,6 +4873,7 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
 			return true
 		}
 	}
+	mset.clearAllPreAcks(seq)
 	return false
 }
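
To make the subject-aware interest check concrete, here is a small self-contained model (hypothetical types and helpers, not the server's) of the idea behind checkForInterestWithSubject: a consumer whose filter can never match the message's subject does not count as remaining interest, so mutually exclusive filters no longer pin a sequence forever:

```go
package main

import (
	"fmt"
	"strings"
)

// consumer is a hypothetical, simplified stand-in for a filtered consumer.
type consumer struct {
	name   string
	filter string // e.g. "EV.NEW" or "EV.>"
	acked  map[uint64]bool
}

// subjectMatches is a minimal NATS-style wildcard match ('*' = one token, '>' = tail).
func subjectMatches(filter, subject string) bool {
	ft, st := strings.Split(filter, "."), strings.Split(subject, ".")
	for i, t := range ft {
		if t == ">" {
			return i < len(st)
		}
		if i >= len(st) {
			return false
		}
		if t != "*" && t != st[i] {
			return false
		}
	}
	return len(ft) == len(st)
}

// noInterestWithSubject reports whether no consumer still needs seq for this subject.
// Consumers whose filter does not match the subject are skipped entirely, which is
// the key point when filters are mutually exclusive.
func noInterestWithSubject(consumers []*consumer, seq uint64, subj string) bool {
	for _, o := range consumers {
		if !subjectMatches(o.filter, subj) {
			continue // this consumer can never see seq, so it holds no interest
		}
		if !o.acked[seq] {
			return false // a matching consumer has not acked yet
		}
	}
	return true
}

func main() {
	cNew := &consumer{name: "C", filter: "EV.NEW", acked: map[uint64]bool{1: true}}
	cUpd := &consumer{name: "D", filter: "EV.UPDATED", acked: map[uint64]bool{}}

	// Sequence 1 was published on EV.NEW and acked by C. D's filter is mutually
	// exclusive with EV.NEW, so it should not block removal.
	fmt.Println(noInterestWithSubject([]*consumer{cNew, cUpd}, 1, "EV.NEW")) // true
}
```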

@@ -4870,20 +4891,15 @@ func (mset *stream) hasPreAck(o *consumer, seq uint64) bool {
 	return found
 }
 
-// Check if we have all consumers pre-acked.
+// Check if we have all consumers pre-acked for this sequence and subject.
 // Write lock should be held.
-func (mset *stream) hasAllPreAcks(seq uint64) bool {
-	if len(mset.preAcks) == 0 {
+func (mset *stream) hasAllPreAcks(seq uint64, subj string) bool {
+	if len(mset.preAcks) == 0 || len(mset.preAcks[seq]) == 0 {
 		return false
 	}
-	return len(mset.preAcks[seq]) >= len(mset.consumers)
-}
-
-// Check if we have all consumers pre-acked.
-func (mset *stream) clearAllPreAcksLock(seq uint64) {
-	mset.mu.Lock()
-	defer mset.mu.Unlock()
-	mset.clearAllPreAcks(seq)
+	// Since these can be filtered and mutually exclusive,
+	// if we have some preAcks we need to check all interest here.
+	return mset.noInterestWithSubject(seq, subj, nil)
 }
 
 // Check if we have all consumers pre-acked.
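
The hasAllPreAcks rewrite above drops the count-based check because, with mutually exclusive filters, a sequence can only ever be pre-acked by the consumers whose filter matches its subject, so the preAck count never reaches len(mset.consumers). A tiny illustrative comparison with hypothetical values (not server code):

```go
package main

import "fmt"

// Hypothetical illustration of why counting preAcks against the total number of
// consumers breaks once filters are mutually exclusive.
func main() {
	const totalConsumers = 2 // "C" filters EV.NEW, "D" filters EV.UPDATED

	// Sequence 42 was published on EV.NEW. Only C will ever deliver and ack it,
	// so at most one preAck can ever be registered for it.
	preAcksForSeq := map[string]struct{}{"C": {}}

	// Old logic: never true for this sequence, so it would never be treated as
	// fully pre-acked even though nobody else can ever ack it.
	oldHasAll := len(preAcksForSeq) >= totalConsumers
	fmt.Println("count-based check:", oldHasAll) // false

	// New logic (sketch): a sequence is fully pre-acked if some preAcks exist and
	// no consumer whose filter matches the subject still has interest.
	matchingConsumersWithoutPreAck := 0 // D's filter doesn't match EV.NEW; C already pre-acked
	newHasAll := len(preAcksForSeq) > 0 && matchingConsumersWithoutPreAck == 0
	fmt.Println("subject-aware check:", newHasAll) // true
}
```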
@@ -4925,13 +4941,6 @@ func (mset *stream) registerPreAck(o *consumer, seq uint64) {
 	mset.preAcks[seq][o] = struct{}{}
 }
 
-// This will clear an ack for a consumer.
-func (mset *stream) clearPreAckLock(o *consumer, seq uint64) {
-	mset.mu.Lock()
-	defer mset.mu.Unlock()
-	mset.clearPreAck(o, seq)
-}
-
 // This will clear an ack for a consumer.
 // Write lock should be held.
 func (mset *stream) clearPreAck(o *consumer, seq uint64) {
@@ -4981,25 +4990,21 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
 	case WorkQueuePolicy:
 		// Normally we just remove a message when its ack'd here but if we have direct consumers
 		// from sources and/or mirrors we need to make sure they have delivered the msg.
-		shouldRemove = mset.directs <= 0 || !mset.checkInterest(seq, o)
+		shouldRemove = mset.directs <= 0 || mset.noInterest(seq, o)
 	case InterestPolicy:
-		shouldRemove = !mset.checkInterest(seq, o)
+		shouldRemove = mset.noInterest(seq, o)
 	}
 	mset.mu.Unlock()
 
 	// If nothing else to do.
 	if !shouldRemove {
-		// Clear any pending preAcks for this consumer.
-		mset.clearPreAckLock(o, seq)
 		return
 	}
 
 	// If we are here we should attempt to remove.
 	if _, err := mset.store.RemoveMsg(seq); err == ErrStoreEOF {
 		// This should not happen, but being pedantic.
 		mset.registerPreAckLock(o, seq)
-	} else {
-		mset.clearAllPreAcksLock(seq)
 	}
 }

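Since the interest checks now clear stale preAcks themselves (note the clearAllPreAcks calls added inside checkForInterest and checkForInterestWithSubject), ackMsg no longer needs the explicit clearPreAckLock / clearAllPreAcksLock calls, and those helpers are removed above. A rough, self-contained sketch of the resulting decision path, with illustrative names only (the real logic is (*stream).ackMsg in server/stream.go):

```go
package main

import "fmt"

// ackMsg is a hypothetical sketch of the simplified ack path after this change.
func ackMsg(policy string, directs int, seq uint64, noInterest func(uint64) bool, remove func(uint64)) {
	var shouldRemove bool
	switch policy {
	case "workqueue":
		// Only remove once any direct (source/mirror) consumers have delivered it too.
		shouldRemove = directs <= 0 || noInterest(seq)
	case "interest":
		shouldRemove = noInterest(seq)
	}
	if !shouldRemove {
		// No explicit per-consumer preAck cleanup here anymore: the interest
		// check itself clears stale preAcks when it finds no remaining interest.
		return
	}
	remove(seq)
}

func main() {
	noInterest := func(seq uint64) bool { return seq%2 == 0 } // pretend even seqs have lost all interest
	remove := func(seq uint64) { fmt.Println("removing", seq) }

	ackMsg("interest", 0, 3, noInterest, remove) // kept, interest remains
	ackMsg("interest", 0, 4, noInterest, remove) // removed
}
```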