Skip to content

Commit

Permalink
[FIXED] Avoid possible duplicate redeliveries after message is ack'ed
Browse files Browse the repository at this point in the history
If the streaming server is connecting to a standalone NATS Server,
and the streaming server is redelivering messages, if a disconnect
occurs, these redeliveries may be buffered by the NATS client
library used by the Streaming server. When the NATS Server is restarted
and the Streaming server reconnects, the pending buffer is flushed,
which, if the client has reconnected before the server, would cause
the client to possibly receive a series of the same redelivered
messages, even if the client is ack'ing them.
To prevent this, set the reconnect buffer to -1 to prevent any
buffering.
  • Loading branch information
kozlovic committed Apr 23, 2018
1 parent dfab8d0 commit 0347f1d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
10 changes: 4 additions & 6 deletions server/server.go
Expand Up @@ -1307,12 +1307,10 @@ func (s *StanServer) createNatsClientConn(name string) (*nats.Conn, error) {
ncOpts.ReconnectWait = 250 * time.Millisecond
// Make it try to reconnect for ever.
ncOpts.MaxReconnect = -1
// For FT make the reconnect buffer as small as possible since
// we don't really want FT HBs to be buffered while we are disconnected
// and be sent as a burst on reconnect.
if name == "ft" {
ncOpts.ReconnectBufSize = 128
}
// To avoid possible duplicate redeliveries, etc., set the reconnect
// buffer to -1 so that the NATS library does not buffer anything while
// disconnected and then flush it on reconnect.
ncOpts.ReconnectBufSize = -1

s.log.Tracef(" NATS conn opts: %v", ncOpts)

Expand Down
83 changes: 83 additions & 0 deletions server/server_redelivery_test.go
Expand Up @@ -15,12 +15,14 @@ package server

import (
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

natsdTest "github.com/nats-io/gnatsd/test"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming"
)
Expand Down Expand Up @@ -1240,3 +1242,84 @@ func TestQueueRedeliveryOnStartup(t *testing.T) {
case <-time.After(250 * time.Millisecond):
}
}

// delayReconnectDialer is a dialer that rejects its first connection
// attempts, so that a connection using it is slow to (re)connect.
type delayReconnectDialer struct {
	// fail counts the calls made to Dial so far.
	fail int
}

// Dial returns errOnPurpose for the first nine calls. From the tenth
// call on, it delegates to net.Dial with the given network and address.
func (d *delayReconnectDialer) Dial(network, address string) (net.Conn, error) {
	d.fail++
	if d.fail < 10 {
		// Still within the window of forced failures.
		return nil, errOnPurpose
	}
	return net.Dial(network, address)
}

// TestNoDuplicateRedeliveryDueToDisconnect checks that a message that has
// been redelivered and ack'ed is not redelivered again after the NATS
// server has been stopped and restarted.
//
// The subscription callback ignores redeliveries until the NATS server has
// been restarted, then acks each redelivery and counts it. The test fails
// if more than one redelivery is counted after the restart.
func TestNoDuplicateRedeliveryDueToDisconnect(t *testing.T) {
	ns := natsdTest.RunServer(nil)
	defer shutdownRestartedNATSServerOnTestExit(&ns)

	opts := GetDefaultOptions()
	opts.NATSServerURL = nats.DefaultURL
	s := runServerWithOpts(t, opts, nil)
	defer s.Shutdown()

	// If the server's NATS connections were to use a reconnect buffer,
	// in case of redeliveries, the server will Publish() them while
	// being disconnected. On reconnect, the redeliveries would be flushed,
	// which would result in client getting the same message redelivered
	// multiple times, even after being ack'ed.
	// Use a custom dialer for the server's send connection to ensure that
	// the streaming client lib has a chance to reconnect before the server's
	// connection reconnects, otherwise, even with a reconnect buffer, the
	// pending redeliveries may be flushed before the client's subs are
	// recreated on the NATS server and the issue would not be observed.
	s.mu.Lock()
	s.ncs.Opts.CustomDialer = &delayReconnectDialer{}
	s.mu.Unlock()

	sc, nc := createConnectionWithNatsOpts(t, clientName, nats.MaxReconnects(-1), nats.ReconnectWait(15*time.Millisecond))
	defer nc.Close()
	defer sc.Close()

	// ch is signaled once for the initial delivery and once for the first
	// ack'ed redelivery.
	ch := make(chan bool, 1)
	// rdc counts the redeliveries that have been ack'ed.
	rdc := int32(0)
	// ok is set to 1 once the NATS server has been restarted.
	ok := int32(0)
	if _, err := sc.Subscribe("foo", func(m *stan.Msg) {
		if !m.Redelivered {
			ch <- true
			return
		}
		// Do not ack until we know that the NATS server has not
		// been stopped and restarted.
		if atomic.LoadInt32(&ok) != 1 {
			return
		}
		m.Ack()
		// Signal only on the first ack'ed redelivery so that any extra
		// redelivery is counted without blocking on the channel.
		if atomic.AddInt32(&rdc, 1) == 1 {
			ch <- true
		}
	}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100))); err != nil {
		t.Fatalf("Error on subscribe: %v", err)
	}
	if err := sc.Publish("foo", []byte("hello")); err != nil {
		t.Fatalf("Error on publish: %v", err)
	}
	if err := Wait(ch); err != nil {
		t.Fatal("Did not get our message")
	}
	// Shutdown NATS Server and wait for several redelivery attempts
	ns.Shutdown()
	time.Sleep(500 * time.Millisecond)
	ns = natsdTest.RunServer(nil)
	atomic.StoreInt32(&ok, 1)
	// Now wait for the redelivered message to be ack'ed.
	if err := Wait(ch); err != nil {
		t.Fatal("Did not get redelivered")
	}
	// Wait a bit and check that the rdc count is 1.
	time.Sleep(400 * time.Millisecond)
	if c := atomic.LoadInt32(&rdc); c != 1 {
		t.Fatalf("Message redelivered after being ack'ed: %v", c)
	}
}

0 comments on commit 0347f1d

Please sign in to comment.