From a5739e4d548a278dbeb82508dafa480d83cb0c23 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Thu, 4 Mar 2021 16:57:00 -0700
Subject: [PATCH] [FIXED] Clustering: possible slow consumer errors

This was introduced in v0.21.0 when I moved to sync subscriptions and
missed the fact that the pending limits do not apply to that type of
subscription. This change switches to an async subscription and adds the
incoming buffers to a pending list, which is consumed by the Read() calls
made by the RAFT library.

Signed-off-by: Ivan Kozlovic
---
 server/raft_transport.go      | 154 ++++++++++++++++++++++++----------
 server/raft_transport_test.go |  18 ++++
 2 files changed, 129 insertions(+), 43 deletions(-)

diff --git a/server/raft_transport.go b/server/raft_transport.go
index 9265f22f..54316a71 100644
--- a/server/raft_transport.go
+++ b/server/raft_transport.go
@@ -72,64 +72,131 @@ type natsConn struct {
 	remoteAddr natsAddr
 	sub        *nats.Subscription
 	subTimeout time.Duration
-	pending    []byte
+	pendingH   *pendingBuf   // head of pending buffers list
+	pendingT   *pendingBuf   // tail of pending buffers list
+	ch         chan struct{} // to send notification that a buffer is available
 	outbox     string
 	closed     bool
 	parent     *natsStreamLayer
 }
 
+type pendingBuf struct {
+	buf  []byte
+	next *pendingBuf
+}
+
+var pendingBufPool = &sync.Pool{
+	New: func() interface{} {
+		return &pendingBuf{}
+	},
+}
+
+func (n *natsConn) onMsg(msg *nats.Msg) {
+	pb := pendingBufPool.Get().(*pendingBuf)
+	n.mu.Lock()
+	if n.closed {
+		n.mu.Unlock()
+		return
+	}
+	pb.buf = msg.Data
+	var notify bool
+	if n.pendingT != nil {
+		n.pendingT.next = pb
+	} else {
+		n.pendingH = pb
+		notify = true
+	}
+	n.pendingT = pb
+	n.mu.Unlock()
+
+	if notify {
+		select {
+		case n.ch <- struct{}{}:
+		default:
+		}
+	}
+}
+
 func (n *natsConn) Read(b []byte) (int, error) {
 	var subTimeout time.Duration
 
 	n.mu.RLock()
-	closed := n.closed
-	buf := n.pending
-	pendingSize := len(buf)
-	// We need the timeout only if we are going to call NextMsg, and if we
-	// have a pending, we won't.
-	if pendingSize == 0 {
+	if n.closed {
+		n.mu.RUnlock()
+		return 0, io.EOF
+	}
+	// Reference, but do not remove, the pending buffer: if we cannot copy
+	// the whole buffer, we will simply update its slice.
+	pb := n.pendingH
+	// We will wait only if there is no pending buffer.
+	if pb == nil {
 		subTimeout = n.subTimeout
 		if subTimeout == 0 {
 			subTimeout = time.Duration(0x7FFFFFFFFFFFFFFF)
 		}
 	}
 	n.mu.RUnlock()
-	if closed {
-		return 0, io.EOF
-	}
-	// If we have a pending, process that first.
-	if pendingSize > 0 {
-		// We will copy all data that we have if it can fit, or up to the
-		// caller's buffer size.
-		limit := pendingSize
-		if limit > len(b) {
-			limit = len(b)
+
+	// There was no buffer, so we need to wait.
+	if pb == nil {
+	WAIT_FOR_BUFFER:
+		select {
+		case <-time.After(subTimeout):
+			return 0, nats.ErrTimeout
+		case _, ok := <-n.ch:
+			if !ok {
+				return 0, io.EOF
+			}
 		}
-		nb := copy(b, buf[:limit])
-		// If we did not copy everything, reduce size by what we copied.
-		if nb != pendingSize {
-			buf = buf[nb:]
-		} else {
-			buf = nil
+		n.mu.RLock()
+		// We notify when adding the first pending buffer, but if Read() is
+		// called after that, it will see a pending buffer and skip the select.
+		// So after consuming that buffer, the next Read() could get the stale
+		// notification. If that is the case, go back to the select.
+		if n.pendingH == nil {
+			n.mu.RUnlock()
+			goto WAIT_FOR_BUFFER
 		}
-		n.pending = buf
-		return nb, nil
+		// We have been notified, so get the reference to the head of the list.
+		pb = n.pendingH
+		n.mu.RUnlock()
 	}
-	msg, err := n.sub.NextMsg(subTimeout)
-	if err != nil {
-		return 0, err
-	}
-	if len(msg.Data) == 0 {
+
+	buf := pb.buf
+	bufSize := len(buf)
+	// A buf of size 0 means that the remote closed the connection.
+	if bufSize == 0 {
 		n.close(false)
 		return 0, io.EOF
 	}
-	buf = msg.Data
-	if nb := len(buf); nb <= len(b) {
-		copy(b, buf)
-		return nb, nil
+	limit := bufSize
+	if limit > len(b) {
+		limit = len(b)
+	}
+	nb := copy(b, buf[:limit])
+	// If we did not copy everything, reduce size by what we copied.
+	if nb != bufSize {
+		buf = buf[nb:]
+	} else {
+		buf = nil
+	}
+	var release bool
+	n.mu.Lock()
+	if buf != nil {
+		pb.buf = buf
+	} else {
+		// We are done with this pending buffer, remove it from the pending list.
+		n.pendingH = n.pendingH.next
+		if n.pendingH == nil {
+			n.pendingT = nil
+		}
+		release = true
+	}
+	n.mu.Unlock()
+	if release {
+		pb.buf, pb.next = nil, nil
+		pendingBufPool.Put(pb)
 	}
-	nb := copy(b, buf[:len(b)])
-	n.pending = buf[nb:]
 	return nb, nil
 }
 
@@ -190,6 +257,7 @@ func (n *natsConn) close(signalRemote bool) error {
 
 	n.closed = true
 	stream := n.parent
+	close(n.ch)
 	n.mu.Unlock()
 
 	stream.mu.Lock()
@@ -257,7 +325,6 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout
 	if err != nil {
 		return nil, err
 	}
-	sub.SetPendingLimits(-1, -1)
 	if err := conn.FlushTimeout(n.dfTimeout); err != nil {
 		sub.Unsubscribe()
 		return nil, err
@@ -274,6 +341,7 @@ func (n *natsStreamLayer) newNATSConn(address string) (*natsConn, error) {
 		localAddr:  n.localAddr,
 		remoteAddr: natsAddr(address),
 		parent:     n,
+		ch:         make(chan struct{}, 1),
 	}
 	if n.makeConn == nil {
 		c.conn = n.conn
@@ -328,14 +396,14 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration
 	}
 
 	// Setup inbox.
-	sub, err := peerConn.conn.SubscribeSync(connect.Inbox)
+	peerConn.mu.Lock()
+	sub, err := peerConn.conn.Subscribe(connect.Inbox, peerConn.onMsg)
 	if err != nil {
+		peerConn.mu.Unlock()
 		peerConn.Close()
 		return nil, err
 	}
 	sub.SetPendingLimits(-1, -1)
-
-	peerConn.mu.Lock()
 	peerConn.sub = sub
 	peerConn.outbox = resp.Inbox
 	peerConn.mu.Unlock()
@@ -377,15 +445,15 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) {
 
 		// Setup inbox for peer.
 		inbox := fmt.Sprintf(natsRequestInbox, n.localAddr.String(), nats.NewInbox())
-		sub, err := peerConn.conn.SubscribeSync(inbox)
+		peerConn.mu.Lock()
+		sub, err := peerConn.conn.Subscribe(inbox, peerConn.onMsg)
 		if err != nil {
+			peerConn.mu.Unlock()
			n.logger.Error("Failed to create inbox for remote peer", "error", err)
 			peerConn.Close()
 			continue
 		}
 		sub.SetPendingLimits(-1, -1)
-
-		peerConn.mu.Lock()
 		peerConn.outbox = connect.Inbox
 		peerConn.sub = sub
 		shouldFlush := !peerConn.streamConn
diff --git a/server/raft_transport_test.go b/server/raft_transport_test.go
index 6bfd0ac1..b5cb1a48 100644
--- a/server/raft_transport_test.go
+++ b/server/raft_transport_test.go
@@ -840,6 +840,24 @@ func TestRAFTTransportConnReader(t *testing.T) {
 		t.Fatal("Stream connection should not have been closed")
 	}
 
+	// Again, create a temp connection and make sure that we break out of
+	// a Read() if we close the connection.
+	tmp, err = stream1.newNATSConn("c")
+	if err != nil {
+		t.Fatalf("Error on create: %v", err)
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		time.Sleep(150 * time.Millisecond)
+		tmp.Close()
+		wg.Done()
+	}()
+	if n, err := tmp.Read(buf[:]); err != io.EOF || n > 0 {
+		t.Fatalf("Expected n=0 err=io.EOF, got n=%v err=%v", n, err)
+	}
+	wg.Wait()
+
 	// Now create a stream that will create a new NATS connection.
 	nc3 := newNatsConnection(t)
 	defer nc3.Close()
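
The core of the fix above is a small producer/consumer queue: the async subscription callback (onMsg) links each message's payload into a pendingBuf list recycled through a sync.Pool and signals a possibly blocked Read() through a one-slot channel. The following standalone Go sketch shows that pattern outside of the transport; the names pendingQueue, push and pop are illustrative only, and the partial-read bookkeeping that Read() performs on pb.buf is omitted for brevity.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// node mirrors pendingBuf: a payload plus an intrusive "next" pointer.
type node struct {
	buf  []byte
	next *node
}

var nodePool = sync.Pool{New: func() interface{} { return &node{} }}

// pendingQueue mirrors the head/tail/ch trio added to natsConn.
type pendingQueue struct {
	mu   sync.Mutex
	head *node
	tail *node
	ch   chan struct{} // capacity 1: coalesces "data available" notifications
}

func newPendingQueue() *pendingQueue {
	return &pendingQueue{ch: make(chan struct{}, 1)}
}

// push plays the role of onMsg: it never blocks the delivery goroutine,
// it only links the buffer and notifies if the list was empty.
func (q *pendingQueue) push(buf []byte) {
	n := nodePool.Get().(*node)
	n.buf = buf
	q.mu.Lock()
	notify := q.head == nil
	if q.tail != nil {
		q.tail.next = n
	} else {
		q.head = n
	}
	q.tail = n
	q.mu.Unlock()
	if notify {
		select {
		case q.ch <- struct{}{}:
		default:
		}
	}
}

// pop plays the role of Read(): wait (bounded) for a buffer, then detach it.
// Unlike Read(), it always hands back the whole buffer instead of filling a
// caller-provided slice.
func (q *pendingQueue) pop(timeout time.Duration) ([]byte, error) {
	for {
		q.mu.Lock()
		if n := q.head; n != nil {
			q.head = n.next
			if q.head == nil {
				q.tail = nil
			}
			buf := n.buf
			n.buf, n.next = nil, nil
			nodePool.Put(n)
			q.mu.Unlock()
			return buf, nil
		}
		q.mu.Unlock()
		select {
		case <-q.ch:
			// A buffer was queued (or a stale notification); re-check the list.
		case <-time.After(timeout):
			return nil, errors.New("timeout waiting for buffer")
		}
	}
}

func main() {
	q := newPendingQueue()
	go func() {
		for i := 0; i < 3; i++ {
			q.push([]byte(fmt.Sprintf("msg-%d", i)))
		}
	}()
	for i := 0; i < 3; i++ {
		buf, err := q.pop(time.Second)
		if err != nil {
			fmt.Println("pop error:", err)
			return
		}
		fmt.Println(string(buf))
	}
}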
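
For context on the subscription change itself: the old code used SubscribeSync()/NextMsg(), for which (per the commit message) the pending limits set with SetPendingLimits(-1, -1) did not apply, while the new code uses an async Subscribe() callback with unlimited pending limits, so bursts of RAFT traffic are buffered by the client instead of being dropped as a slow consumer. Below is a minimal sketch of the two styles with the nats.go client; the subject name and the use of the default local server URL are placeholders, not the transport's actual wiring.

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder connection to a local demo server.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		fmt.Println("connect:", err)
		return
	}
	defer nc.Close()

	// Old style (v0.21.0): a synchronous subscription drained with NextMsg().
	syncSub, _ := nc.SubscribeSync("raft.demo.inbox")
	if _, err := syncSub.NextMsg(100 * time.Millisecond); err != nil {
		fmt.Println("sync read:", err) // nats.ErrTimeout when nothing was published
	}
	syncSub.Unsubscribe()

	// New style: an async subscription whose handler only queues the payload
	// (natsConn.onMsg in the patch), with unlimited pending limits so the
	// client never drops messages for this subscription.
	asyncSub, _ := nc.Subscribe("raft.demo.inbox", func(m *nats.Msg) {
		fmt.Println("received", len(m.Data), "bytes")
	})
	asyncSub.SetPendingLimits(-1, -1)

	nc.Publish("raft.demo.inbox", []byte("hello"))
	nc.Flush()
	time.Sleep(100 * time.Millisecond)
}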