[ADDED] Support for redelivery count #997

Merged
merged 1 commit into from Dec 20, 2019
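For context: stan.go v0.6.0 (bumped below) exposes a new RedeliveryCount field on stan.Msg, and this PR adds the server-side tracking that populates it. A minimal client-side sketch of how the field could be observed — not part of this PR; the cluster ID, client ID, subject, and timings are placeholders:

package main

import (
	"log"
	"time"

	"github.com/nats-io/stan.go"
)

func main() {
	// Connect to a local NATS Streaming server (cluster and client IDs are placeholders).
	sc, err := stan.Connect("test-cluster", "redelivery-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// Manual acks plus a short AckWait force redeliveries, so RedeliveryCount
	// should read 0 on the first delivery and then 1, 2, 3, ... on each redelivery.
	if _, err := sc.Subscribe("foo", func(m *stan.Msg) {
		log.Printf("seq=%d redelivered=%v count=%d", m.Sequence, m.Redelivered, m.RedeliveryCount)
		if m.RedeliveryCount >= 3 {
			m.Ack() // stop redeliveries after a few rounds
		}
	}, stan.SetManualAckMode(), stan.AckWait(time.Second)); err != nil {
		log.Fatal(err)
	}

	sc.Publish("foo", []byte("msg"))
	time.Sleep(5 * time.Second)
}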
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
github.com/nats-io/nats-server/v2 v2.1.2
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/nuid v1.0.1
github.com/nats-io/stan.go v0.5.2
github.com/nats-io/stan.go v0.6.0
github.com/prometheus/procfs v0.0.3
go.etcd.io/bbolt v1.3.3
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
4 changes: 2 additions & 2 deletions go.sum
@@ -49,8 +49,8 @@ github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.5.2 h1:/DfflNAztFQVjssQ7hW8d9gWl3hU+SJ3mWjokaQEsog=
github.com/nats-io/stan.go v0.5.2/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
github.com/nats-io/stan.go v0.6.0 h1:26IJPeykh88d8KVLT4jJCIxCyUBOC5/IQup8oWD/QYY=
github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
109 changes: 109 additions & 0 deletions server/clustering_test.go
@@ -7144,3 +7144,112 @@ func TestClusteringSubStateProperlyResetOnLeadershipAcquired(t *testing.T) {
t.Fatal("Did not get message 6")
}
}

func TestClusteringRedeliveryCount(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

getLeader(t, 10*time.Second, s1, s2, s3)

sc := NewDefaultConnection(t)
defer sc.Close()

restarted := int32(0)
rdlv := uint32(0)
errCh := make(chan error, 1)
ch := make(chan bool, 1)
if _, err := sc.Subscribe("foo",
func(m *stan.Msg) {
if !m.Redelivered && m.RedeliveryCount != 0 {
m.Sub.Close()
errCh <- fmt.Errorf("redelivery count is set although redelivered flag is not: %v", m)
return
}
if !m.Redelivered {
return
}
rd := atomic.AddUint32(&rdlv, 1)
if rd != m.RedeliveryCount {
m.Sub.Close()
errCh <- fmt.Errorf("expected redelivery count to be %v, got %v", rd, m.RedeliveryCount)
return
}
if m.RedeliveryCount == 3 {
if atomic.LoadInt32(&restarted) == 1 {
m.Ack()
}
ch <- true
}
},
stan.SetManualAckMode(),
stan.AckWait(ackWaitInMs(100))); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

sc.Publish("foo", []byte("msg"))

select {
case e := <-errCh:
t.Fatal(e.Error())
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Timedout")
}

s1.Shutdown()
atomic.StoreUint32(&rdlv, 0)
atomic.StoreInt32(&restarted, 1)
s1 = runServerWithOpts(t, s1sOpts, nil)
getLeader(t, 10*time.Second, s1, s2, s3)

select {
case e := <-errCh:
t.Fatal(e.Error())
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Timedout")
}

// Now start a new subscription and make sure that redelivery count is not set
// for message 1 on initial delivery.
if _, err := sc.Subscribe("foo",
func(m *stan.Msg) {
if m.RedeliveryCount != 0 {
m.Sub.Close()
errCh <- fmt.Errorf("redelivery count is set although redelivered flag is not: %v", m)
return
}
ch <- true
},
stan.DeliverAllAvailable()); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
select {
case e := <-errCh:
t.Fatal(e.Error())
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Timedout")
}
}
71 changes: 62 additions & 9 deletions server/server.go
@@ -765,6 +765,7 @@ type queueState struct {
sync.RWMutex
lastSent uint64
subs []*subState
rdlvCount map[uint64]uint32
shadow *subState // For durable case, when last member leaves and group is not closed.
stalledSubCount int // number of stalled members
newOnHold bool
@@ -795,6 +796,8 @@ type subState struct {
replicate *subSentAndAck // Used in Clustering mode
norepl bool // When a sub is being closed, prevents collectSentOrAck to recreate `replicate`.

rdlvCount map[uint64]uint32 // Used only when not a queue sub, otherwise queueState's rdlvCount is used.

// So far, compacting these booleans into a byte flag would not save space.
// May change if we need to add more.
initialized bool // false until the subscription response has been sent to prevent data to be sent too early.
@@ -1135,7 +1138,7 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
// Need to update if this member was the one with the last
// message of the group.
storageUpdate = sub.LastSent == qs.lastSent
sortedPendingMsgs := makeSortedPendingMsgs(sub.acksPending)
sortedPendingMsgs := sub.makeSortedPendingMsgs()
for _, pm := range sortedPendingMsgs {
// Get one of the remaining queue subscribers.
qsub := qs.subs[idx]
@@ -2580,6 +2583,13 @@ func (s *StanServer) performRedeliveryOnStartup(recoveredSubs []*subState) {
for qs, c := range queues {
qs.Lock()
qs.newOnHold = false
// Reset the redelivery count map so that, if a node in cluster mode
// regains leadership, the count restarts at 1. Otherwise, you could
// get something like this:
// node A is leader: 1, 2, 3 - then loses leadership (but does not exit)
// node B is leader: 1, 2, 3, 4, 5, 6 - then loses leadership
// node A is leader: 4, 5, 6, ...
qs.rdlvCount = nil
// This is required in cluster mode if a node was leader,
// lost it and then becomes leader again, all that without
// restoring from snapshot.
@@ -3384,9 +3394,9 @@ func (a bySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a bySeq) Less(i, j int) bool { return a[i] < a[j] }

// Returns an array of message sequence numbers ordered by sequence.
func makeSortedSequences(sequences map[uint64]int64) []uint64 {
results := make([]uint64, 0, len(sequences))
for seq := range sequences {
func (sub *subState) makeSortedSequences() []uint64 {
results := make([]uint64, 0, len(sub.acksPending))
for seq := range sub.acksPending {
results = append(results, seq)
}
sort.Sort(bySeq(results))
@@ -3413,23 +3423,36 @@ func (a byExpire) Less(i, j int) bool {
// the expiration date in the pendingMsgs map is not set (0), which
// happens after a server restart. In this case, the array is ordered
// by message sequence numbers.
func makeSortedPendingMsgs(pendingMsgs map[uint64]int64) []*pendingMsg {
results := make([]*pendingMsg, 0, len(pendingMsgs))
for seq, expire := range pendingMsgs {
func (sub *subState) makeSortedPendingMsgs() []*pendingMsg {
results := make([]*pendingMsg, 0, len(sub.acksPending))
for seq, expire := range sub.acksPending {
results = append(results, &pendingMsg{seq: seq, expire: expire})
}
sort.Sort(byExpire(results))
return results
}

func qsLock(qs *queueState) {
if qs != nil {
qs.Lock()
}
}

func qsUnlock(qs *queueState) {
if qs != nil {
qs.Unlock()
}
}

// Redeliver all outstanding messages to a durable subscriber, used on resubscribe.
func (s *StanServer) performDurableRedelivery(c *channel, sub *subState) {
// Sort our messages outstanding from acksPending, grab some state and unlock.
sub.RLock()
sortedSeqs := makeSortedSequences(sub.acksPending)
sortedSeqs := sub.makeSortedSequences()
clientID := sub.ClientID
newOnHold := sub.newOnHold
subID := sub.ID
qs := sub.qstate
sub.RUnlock()

if s.debug && len(sortedSeqs) > 0 {
@@ -3458,10 +3481,12 @@ func (s *StanServer) performDurableRedelivery(c *channel, sub *subState) {
// Flag as redelivered.
m.Redelivered = true

qsLock(qs)
sub.Lock()
// Force delivery
s.sendMsgToSub(sub, m, forceDelivery)
sub.Unlock()
qsUnlock(qs)
}
}
// Release newOnHold if needed.
@@ -3476,7 +3501,7 @@ func (s *StanServer) performDurableRedelivery(c *channel, sub *subState) {
func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup bool) {
// Sort our messages outstanding from acksPending, grab some state and unlock.
sub.Lock()
sortedPendingMsgs := makeSortedPendingMsgs(sub.acksPending)
sortedPendingMsgs := sub.makeSortedPendingMsgs()
if len(sortedPendingMsgs) == 0 {
sub.clearAckTimer()
sub.Unlock()
@@ -3582,9 +3607,11 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
s.processAck(c, sub, m.Sequence, false)
}
} else {
qsLock(qs)
sub.Lock()
s.sendMsgToSub(sub, m, forceDelivery)
sub.Unlock()
qsUnlock(qs)
}
}
if foundWithZero {
@@ -3634,6 +3661,7 @@ func (s *StanServer) subChangesOnLeadershipAcquired(sub *subState) {

func (s *StanServer) subChangesOnLeadershipLost(sub *subState) {
sub.Lock()
sub.rdlvCount = nil
sub.stopAckSub()
sub.clearAckTimer()
s.clearSentAndAck(sub)
@@ -3904,6 +3932,9 @@ func (s *StanServer) sendMsgToSub(sub *subState, m *pb.MsgProto, force bool) (bo
sub.ClientID, action, sub.ID, m.Subject, m.Sequence)
}

if m.Redelivered {
sub.updateRedeliveryCount(m)
}
// Marshal of a pb.MsgProto cannot fail
b, _ := m.Marshal()
// but protect against a store implementation that may incorrectly
@@ -3975,6 +4006,28 @@
return true, true
}

// Will set the redelivery count for this message that is ready to be redelivered.
// sub's lock is held on entry, and if it is a queue sub, qstate's lock is held too.
func (sub *subState) updateRedeliveryCount(m *pb.MsgProto) {
var rdlvCountMap *map[uint64]uint32
if sub.qstate != nil {
rdlvCountMap = &sub.qstate.rdlvCount
} else {
rdlvCountMap = &sub.rdlvCount
}
if *rdlvCountMap == nil {
*rdlvCountMap = make(map[uint64]uint32)
}
for {
(*rdlvCountMap)[m.Sequence]++
m.RedeliveryCount = (*rdlvCountMap)[m.Sequence]
// Just in case we rolled over...
if m.RedeliveryCount != 0 {
break
}
}
}

// Sets up the ackTimer to fire at the given duration.
// sub's lock held on entry.
func (s *StanServer) setupAckTimer(sub *subState, d time.Duration) {
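The counting logic above comes down to one in-memory map per plain subscription (or one shared per queue group), keyed by message sequence and bumped each time a message is sent with Redelivered set; the map is dropped on leadership changes so counts restart at 1 rather than continuing from a stale view. A standalone sketch of that pattern, with illustrative names that are not from the PR — note the loop that skips 0 on uint32 wraparound, so a redelivered message never reports a count of 0:

package main

import "fmt"

// redeliveryCounts mirrors the role of sub.rdlvCount / qs.rdlvCount:
// one counter per message sequence, allocated lazily.
type redeliveryCounts struct {
	m map[uint64]uint32
}

// bump returns the next count for seq. On uint32 wraparound it skips 0,
// keeping 0 reserved for "first delivery".
func (r *redeliveryCounts) bump(seq uint64) uint32 {
	if r.m == nil {
		r.m = make(map[uint64]uint32)
	}
	for {
		r.m[seq]++
		if c := r.m[seq]; c != 0 {
			return c
		}
	}
}

func main() {
	var r redeliveryCounts
	for i := 0; i < 3; i++ {
		fmt.Println(r.bump(42)) // prints 1, 2, 3
	}
}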
40 changes: 40 additions & 0 deletions server/server_queue_test.go
@@ -1269,3 +1269,43 @@ func TestPersistentStoreDurableQueueSubRaceBetweenCreateAndClose(t *testing.T) {
t.Fatalf("Duplicate shadow subscription found!")
}
}

func TestQueueRedeliveryCount(t *testing.T) {
s := runServer(t, clusterName)
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

errCh := make(chan error, 2)
var mu sync.Mutex
var prev uint32
cb := func(m *stan.Msg) {
if m.Redelivered {
mu.Lock()
if m.RedeliveryCount != prev+1 {
m.Sub.Close()
errCh <- fmt.Errorf("previous was %v, current %v", prev, m.RedeliveryCount)
mu.Unlock()
return
}
prev = m.RedeliveryCount
mu.Unlock()
}
}

for i := 0; i < 2; i++ {
if _, err := sc.QueueSubscribe("foo", "bar", cb, stan.AckWait(ackWaitInMs(50)), stan.SetManualAckMode()); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
}

sc.Publish("foo", []byte("msg"))

select {
case e := <-errCh:
t.Fatal(e.Error())
case <-time.After(500 * time.Millisecond):
// ok!
}
}