Skip to content

Commit

Permalink
Merge ef5d22b into 0778981
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jan 9, 2017
2 parents 0778981 + ef5d22b commit 01ac151
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
13 changes: 9 additions & 4 deletions stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,16 @@ func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan
// Setup the timer for expiration.
sc.Lock()
a.t = time.AfterFunc(ackTimeout, func() {
sc.removeAck(peGUID)
if a.ah != nil {
ah(peGUID, ErrTimeout)
pubAck := sc.removeAck(peGUID)
// processAck could get here before and handle the ack.
// If that's the case, we would get nil here and simply return.
if pubAck == nil {
return
}
if pubAck.ah != nil {
pubAck.ah(peGUID, ErrTimeout)
} else if a.ch != nil {
a.ch <- ErrTimeout
pubAck.ch <- ErrTimeout
}
})
sc.Unlock()
Expand Down
65 changes: 65 additions & 0 deletions stan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2210,3 +2210,68 @@ func closeSubscriber(t *testing.T, channel, subType string) {
stackFatalf(t, "Timeout waiting for messages")
}
}

func TestDuplicateProcessingOfPubAck(t *testing.T) {
// We run our tests on Windows VM and this test would fail because
// server would be a slow consumer. So skipping for now.
if runtime.GOOS == "windows" {
t.SkipNow()
}
s := RunServer(clusterName)
defer s.Shutdown()

// Use a very small timeout purposely
sc, err := Connect(clusterName, clientName, PubAckWait(time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()

total := 10000
pubAcks := make(map[string]struct{}, total)
gotBug := false
errCh := make(chan error)
msg := []byte("msg")
count := 0
done := make(chan bool)
mu := &sync.Mutex{}

ackHandler := func(guid string, err error) {
mu.Lock()
if gotBug {
mu.Unlock()
return
}
if _, exist := pubAcks[guid]; exist {
gotBug = true
errCh <- fmt.Errorf("Duplicate processing of PubAck %d guid=%v", (count + 1), guid)
mu.Unlock()
return
}
pubAcks[guid] = struct{}{}
count++
if count == total {
done <- true
}
mu.Unlock()
}
for i := 0; i < total; i++ {
sc.PublishAsync("foo", msg, ackHandler)
}
select {
case <-done:
case e := <-errCh:
t.Fatal(e)
case <-time.After(10 * time.Second):
t.Fatal("Test took too long")
}
// If we are here is that we have published `total` messages.
// Since the bug is about processing duplicate PubAck,
// wait a bit more.
select {
case e := <-errCh:
t.Fatal(e)
case <-time.After(100 * time.Millisecond):
// This is more than the PubAckWait, so we should be good now.
}
}

0 comments on commit 01ac151

Please sign in to comment.