Skip to content

Commit

Permalink
[FIXED] Stalled Queue member may stall the whole Queue
Browse files Browse the repository at this point in the history
This happens if a member is stalled and other members are not but
have a number of outstanding acks higher than the stalled member.

Resolves #375
  • Loading branch information
kozlovic committed Aug 15, 2017
1 parent 4104ef6 commit f381a59
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
11 changes: 5 additions & 6 deletions server/server.go
Expand Up @@ -2165,7 +2165,9 @@ func (s *StanServer) sendPublishErr(subj, guid string, err error) {

// FIXME(dlc) - place holder to pick sub that has least outstanding, should just sort,
// or use insertion sort, etc.
func findBestQueueSub(sl []*subState) (rsub *subState) {
func findBestQueueSub(sl []*subState) *subState {
var rsub *subState

for _, sub := range sl {

if rsub == nil {
Expand All @@ -2187,7 +2189,7 @@ func findBestQueueSub(sl []*subState) (rsub *subState) {

// Favor non stalled subscribers and clients that do not have
// failed heartbeats
if (!sStalled || rStalled) && (!sHasFailedHB || rHasFailedHB) && (sOut < rOut) {
if (!sStalled && !sHasFailedHB && (rStalled || rHasFailedHB)) || (sOut < rOut) {
rsub = sub
}
}
Expand All @@ -2198,15 +2200,12 @@ func findBestQueueSub(sl []*subState) (rsub *subState) {
sl[len-1] = rsub
}

return
return rsub
}

// Send a message to the queue group
// Assumes qs lock held for write
func (s *StanServer) sendMsgToQueueGroup(qs *queueState, m *pb.MsgProto, force bool) (*subState, bool, bool) {
if qs == nil {
return nil, false, false
}
sub := findBestQueueSub(qs.subs)
if sub == nil {
return nil, false, false
Expand Down
49 changes: 49 additions & 0 deletions server/server_queue_test.go
Expand Up @@ -680,3 +680,52 @@ func TestPersistentStoreMultipleShadowQSubs(t *testing.T) {
t.Fatalf("Recovered shadow queue sub should be ID 2, lastSent 2, got %v, %v", shadow.ID, lastSent)
}
}

func TestQueueWithOneStalledMemberDoesNotStallGroup(t *testing.T) {
s := runServer(t, clusterName)
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

ch := make(chan bool, 1)
// Create a member with low MaxInflight and Manual ack mode
if _, err := sc.QueueSubscribe("foo", "queue",
func(_ *stan.Msg) {
ch <- true
},
stan.MaxInflight(1),
stan.SetManualAckMode()); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
if err := sc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
// Check message is received
if err := Wait(ch); err != nil {
t.Fatal("Did not get our message")
}
// Create another member with higher MaxInFlight and manual ack mode too.
count := 0
total := 5
if _, err := sc.QueueSubscribe("foo", "queue",
func(_ *stan.Msg) {
count++
if count == total {
ch <- true
}
},
stan.MaxInflight(10),
stan.SetManualAckMode()); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
// Send messages and ensure they are received by 2nd member
for i := 0; i < total; i++ {
if err := sc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
}
if err := Wait(ch); err != nil {
t.Fatal("Did not get our messages")
}
}

0 comments on commit f381a59

Please sign in to comment.