[FIXED] Clustering: possible slow consumer errors
This was introduced in v0.21.0 when I moved to sync subscriptions
and missed the fact that the pending limits do not apply to that
type of subscription.
Switching to an async subscription and adding buffers to a pending list.
These are consumed in the Read() call made by the RAFT library.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Mar 4, 2021
1 parent 0b54427 commit a5739e4
Showing 2 changed files with 129 additions and 43 deletions.
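
For context, here is a minimal, self-contained Go sketch of the pattern this fix adopts: an async subscription callback appends each payload to an unbounded in-process list and signals a channel, while a blocking reader drains that list (the role Read() plays for the RAFT library). The names pendingList, push, and pop are illustrative only and do not appear in the diff below.

package main

import (
	"fmt"
	"sync"
)

// node holds one received payload in a singly-linked pending list.
type node struct {
	buf  []byte
	next *node
}

// pendingList mirrors, in miniature, the commit's pendingH/pendingT/ch fields.
type pendingList struct {
	mu   sync.Mutex
	head *node
	tail *node
	ch   chan struct{} // capacity 1: "the list just became non-empty"
}

func newPendingList() *pendingList {
	return &pendingList{ch: make(chan struct{}, 1)}
}

// push is what an async subscription callback would do: it never blocks and
// never drops, so the client's slow-consumer pending limits are not involved.
func (p *pendingList) push(buf []byte) {
	p.mu.Lock()
	n := &node{buf: buf}
	notify := p.head == nil
	if p.tail != nil {
		p.tail.next = n
	} else {
		p.head = n
	}
	p.tail = n
	p.mu.Unlock()
	// Only the transition from empty to non-empty needs a notification.
	if notify {
		select {
		case p.ch <- struct{}{}:
		default:
		}
	}
}

// pop blocks until a buffer is available, the way Read() blocks for the RAFT
// library. A stale notification simply causes another pass of the loop.
func (p *pendingList) pop() []byte {
	for {
		p.mu.Lock()
		if n := p.head; n != nil {
			p.head = n.next
			if p.head == nil {
				p.tail = nil
			}
			p.mu.Unlock()
			return n.buf
		}
		p.mu.Unlock()
		<-p.ch // wait for a push notification, then re-check the list
	}
}

func main() {
	pl := newPendingList()
	go func() {
		for i := 0; i < 3; i++ {
			pl.push([]byte(fmt.Sprintf("msg-%d", i)))
		}
	}()
	for i := 0; i < 3; i++ {
		fmt.Println(string(pl.pop()))
	}
}

The real Read() in the diff additionally handles partial copies (the remainder stays at the head of the list), a subscription timeout via time.After, and unblocking a pending Read() when close() closes n.ch.
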
154 changes: 111 additions & 43 deletions server/raft_transport.go
@@ -72,64 +72,131 @@ type natsConn struct {
 	remoteAddr natsAddr
 	sub        *nats.Subscription
 	subTimeout time.Duration
-	pending    []byte
+	pendingH   *pendingBuf   // head of pending buffers list
+	pendingT   *pendingBuf   // tail of pending buffers list
+	ch         chan struct{} // to send notification that a buffer is available
 	outbox     string
 	closed     bool
 	parent     *natsStreamLayer
 }

+type pendingBuf struct {
+	buf  []byte
+	next *pendingBuf
+}
+
+var pendingBufPool = &sync.Pool{
+	New: func() interface{} {
+		return &pendingBuf{}
+	},
+}
+
+func (n *natsConn) onMsg(msg *nats.Msg) {
+	pb := pendingBufPool.Get().(*pendingBuf)
+	n.mu.Lock()
+	if n.closed {
+		n.mu.Unlock()
+		return
+	}
+	pb.buf = msg.Data
+	var notify bool
+	if n.pendingT != nil {
+		n.pendingT.next = pb
+	} else {
+		n.pendingH = pb
+		notify = true
+	}
+	n.pendingT = pb
+	n.mu.Unlock()
+
+	if notify {
+		select {
+		case n.ch <- struct{}{}:
+		default:
+		}
+	}
+}
+
 func (n *natsConn) Read(b []byte) (int, error) {
 	var subTimeout time.Duration

 	n.mu.RLock()
-	closed := n.closed
-	buf := n.pending
-	pendingSize := len(buf)
-	// We need the timeout only if we are going to call NextMsg, and if we
-	// have a pending, we won't.
-	if pendingSize == 0 {
+	if n.closed {
+		n.mu.RUnlock()
+		return 0, io.EOF
+	}
+	// Reference, but do not remove the pending buffer in case we
+	// cannot copy the whole buffer, we will update the buffer slice.
+	pb := n.pendingH
+	// We will wait only if there is no pending buffer.
+	if pb == nil {
 		subTimeout = n.subTimeout
 		if subTimeout == 0 {
 			subTimeout = time.Duration(0x7FFFFFFFFFFFFFFF)
 		}
 	}
 	n.mu.RUnlock()
-	if closed {
-		return 0, io.EOF
-	}
-	// If we have a pending, process that first.
-	if pendingSize > 0 {
-		// We will copy all data that we have if it can fit, or up to the
-		// caller's buffer size.
-		limit := pendingSize
-		if limit > len(b) {
-			limit = len(b)
-		}
-		nb := copy(b, buf[:limit])
-		// If we did not copy everything, reduce size by what we copied.
-		if nb != pendingSize {
-			buf = buf[nb:]
-		} else {
-			buf = nil
-		}
-		n.pending = buf
-		return nb, nil
-	}
-	msg, err := n.sub.NextMsg(subTimeout)
-	if err != nil {
-		return 0, err
-	}
-	if len(msg.Data) == 0 {
+
+	// There was no buffer, so we need to wait.
+	if pb == nil {
+	WAIT_FOR_BUFFER:
+		select {
+		case <-time.After(subTimeout):
+			return 0, nats.ErrTimeout
+		case _, ok := <-n.ch:
+			if !ok {
+				return 0, io.EOF
+			}
+		}
+		n.mu.RLock()
+		// We notify when adding the first pending buffer, but if Read() is called
+		// after, we will detect that there is a pending and skip the whole select.
+		// So after consuming the pending, the next Read() would get the past
+		// notification. If that is the case, go back to the select.
+		if n.pendingH == nil {
+			n.mu.RUnlock()
+			goto WAIT_FOR_BUFFER
+		}
+		// We have been notified, so get the reference to the head of the list.
+		pb = n.pendingH
+		n.mu.RUnlock()
+	}
+
+	buf := pb.buf
+	bufSize := len(buf)
+	// A buf of size 0 means that the remote closed
+	if bufSize == 0 {
 		n.close(false)
 		return 0, io.EOF
 	}
-	buf = msg.Data
-	if nb := len(buf); nb <= len(b) {
-		copy(b, buf)
-		return nb, nil
-	}
-	nb := copy(b, buf[:len(b)])
-	n.pending = buf[nb:]
+	limit := bufSize
+	if limit > len(b) {
+		limit = len(b)
+	}
+	nb := copy(b, buf[:limit])
+	// If we did not copy everything, reduce size by what we copied.
+	if nb != bufSize {
+		buf = buf[nb:]
+	} else {
+		buf = nil
+	}
+	var release bool
+	n.mu.Lock()
+	if buf != nil {
+		pb.buf = buf
+	} else {
+		// We are done with this pending buffer, remove from the pending list.
+		n.pendingH = n.pendingH.next
+		if n.pendingH == nil {
+			n.pendingT = nil
+		}
+		release = true
+	}
+	n.mu.Unlock()
+	if release {
+		pb.buf, pb.next = nil, nil
+		pendingBufPool.Put(pb)
+	}
 	return nb, nil
 }

@@ -190,6 +257,7 @@ func (n *natsConn) close(signalRemote bool) error {

 	n.closed = true
 	stream := n.parent
+	close(n.ch)
 	n.mu.Unlock()

 	stream.mu.Lock()
@@ -257,7 +325,6 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout
 	if err != nil {
 		return nil, err
 	}
-	sub.SetPendingLimits(-1, -1)
 	if err := conn.FlushTimeout(n.dfTimeout); err != nil {
 		sub.Unsubscribe()
 		return nil, err
@@ -274,6 +341,7 @@ func (n *natsStreamLayer) newNATSConn(address string) (*natsConn, error) {
 		localAddr:  n.localAddr,
 		remoteAddr: natsAddr(address),
 		parent:     n,
+		ch:         make(chan struct{}, 1),
 	}
 	if n.makeConn == nil {
 		c.conn = n.conn
@@ -328,14 +396,14 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration
 	}

 	// Setup inbox.
-	sub, err := peerConn.conn.SubscribeSync(connect.Inbox)
+	peerConn.mu.Lock()
+	sub, err := peerConn.conn.Subscribe(connect.Inbox, peerConn.onMsg)
 	if err != nil {
+		peerConn.mu.Unlock()
 		peerConn.Close()
 		return nil, err
 	}
-	sub.SetPendingLimits(-1, -1)

-	peerConn.mu.Lock()
 	peerConn.sub = sub
 	peerConn.outbox = resp.Inbox
 	peerConn.mu.Unlock()
@@ -377,15 +445,15 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) {

 		// Setup inbox for peer.
 		inbox := fmt.Sprintf(natsRequestInbox, n.localAddr.String(), nats.NewInbox())
-		sub, err := peerConn.conn.SubscribeSync(inbox)
+		peerConn.mu.Lock()
+		sub, err := peerConn.conn.Subscribe(inbox, peerConn.onMsg)
 		if err != nil {
+			peerConn.mu.Unlock()
 			n.logger.Error("Failed to create inbox for remote peer", "error", err)
 			peerConn.Close()
 			continue
 		}
-		sub.SetPendingLimits(-1, -1)

-		peerConn.mu.Lock()
 		peerConn.outbox = connect.Inbox
 		peerConn.sub = sub
 		shouldFlush := !peerConn.streamConn
18 changes: 18 additions & 0 deletions server/raft_transport_test.go
@@ -840,6 +840,24 @@ func TestRAFTTransportConnReader(t *testing.T) {
 		t.Fatal("Stream connection should not have been closed")
 	}

+	// Again, create a temp connection and make sure that we break out of
+	// a Read() if we close the connection.
+	tmp, err = stream1.newNATSConn("c")
+	if err != nil {
+		t.Fatalf("Error on create: %v", err)
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		time.Sleep(150 * time.Millisecond)
+		tmp.Close()
+		wg.Done()
+	}()
+	if n, err := tmp.Read(buf[:]); err != io.EOF || n > 0 {
+		t.Fatalf("Expected n=0 err=io.EOF, got n=%v err=%v", n, err)
+	}
+	wg.Wait()
+
 	// Now create a stream that will create a new NATS connection.
 	nc3 := newNatsConnection(t)
 	defer nc3.Close()
