[FIXED] Clustering: possible slow consumer errors #1169

Merged 1 commit on Mar 5, 2021
154 changes: 111 additions & 43 deletions server/raft_transport.go
@@ -72,64 +72,131 @@ type natsConn struct {
 	remoteAddr natsAddr
 	sub        *nats.Subscription
 	subTimeout time.Duration
-	pending    []byte
+	pendingH   *pendingBuf   // head of pending buffers list
+	pendingT   *pendingBuf   // tail of pending buffers list
+	ch         chan struct{} // to send notification that a buffer is available
 	outbox     string
 	closed     bool
 	parent     *natsStreamLayer
 }

+type pendingBuf struct {
+	buf  []byte
+	next *pendingBuf
+}
+
+var pendingBufPool = &sync.Pool{
+	New: func() interface{} {
+		return &pendingBuf{}
+	},
+}

Review thread on pendingBufPool:

Member: I see you use these, they help?

Member (Author): Honestly, with tests I have done, not really. But I have been using some in SQL code, etc.
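For context on that exchange, here is a minimal, self-contained sketch of the Get/Put recycling pattern that pendingBufPool follows; the node type, payload, and main function below are illustrative assumptions, not part of the patch:

package main

import (
	"fmt"
	"sync"
)

// node plays the role of pendingBuf: a small wrapper that is recycled
// through a sync.Pool instead of being reallocated for every message.
type node struct {
	buf  []byte
	next *node
}

var nodePool = &sync.Pool{
	New: func() interface{} { return &node{} },
}

func main() {
	// Get returns a recycled *node when one is available,
	// otherwise it calls New to allocate a fresh one.
	n := nodePool.Get().(*node)
	n.buf = []byte("payload")

	fmt.Printf("using %q\n", n.buf)

	// Clear references so the pool does not retain the payload,
	// then return the wrapper for reuse.
	n.buf, n.next = nil, nil
	nodePool.Put(n)
}

Whether the pool pays off depends on allocation pressure, which matches the author's observation above that it made little measurable difference in these tests.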

+func (n *natsConn) onMsg(msg *nats.Msg) {
+	pb := pendingBufPool.Get().(*pendingBuf)
+	n.mu.Lock()
+	if n.closed {
+		n.mu.Unlock()
+		return
+	}
+	pb.buf = msg.Data
+	var notify bool
+	if n.pendingT != nil {
+		n.pendingT.next = pb
+	} else {
+		n.pendingH = pb
+		notify = true
+	}
+	n.pendingT = pb
+	n.mu.Unlock()
+
+	if notify {
+		select {
+		case n.ch <- struct{}{}:
+		default:
+		}
+	}
+}
+
 func (n *natsConn) Read(b []byte) (int, error) {
 	var subTimeout time.Duration
 
 	n.mu.RLock()
-	closed := n.closed
-	buf := n.pending
-	pendingSize := len(buf)
-	// We need the timeout only if we are going to call NextMsg, and if we
-	// have a pending, we won't.
-	if pendingSize == 0 {
+	if n.closed {
+		n.mu.RUnlock()
+		return 0, io.EOF
+	}
+	// Reference, but do not remove the pending buffer in case we
+	// cannot copy the whole buffer, we will update the buffer slice.
+	pb := n.pendingH
+	// We will wait only if there is no pending buffer.
+	if pb == nil {
 		subTimeout = n.subTimeout
 		if subTimeout == 0 {
 			subTimeout = time.Duration(0x7FFFFFFFFFFFFFFF)
 		}
 	}
 	n.mu.RUnlock()
-	if closed {
-		return 0, io.EOF
-	}
-	// If we have a pending, process that first.
-	if pendingSize > 0 {
-		// We will copy all data that we have if it can fit, or up to the
-		// caller's buffer size.
-		limit := pendingSize
-		if limit > len(b) {
-			limit = len(b)
-		}
-		nb := copy(b, buf[:limit])
-		// If we did not copy everything, reduce size by what we copied.
-		if nb != pendingSize {
-			buf = buf[nb:]
-		} else {
-			buf = nil
-		}
-		n.pending = buf
-		return nb, nil
-	}
-	msg, err := n.sub.NextMsg(subTimeout)
-	if err != nil {
-		return 0, err
-	}
-	if len(msg.Data) == 0 {
+
+	// There was no buffer, so we need to wait.
+	if pb == nil {
+	WAIT_FOR_BUFFER:
+		select {
+		case <-time.After(subTimeout):
+			return 0, nats.ErrTimeout
+		case _, ok := <-n.ch:
+			if !ok {
+				return 0, io.EOF
+			}
+		}
+		n.mu.RLock()
+		// We notify when adding the first pending buffer, but if Read() is called
+		// after, we will detect that there is a pending and skip the whole select.
+		// So after consuming the pending, the next Read() would get the past
+		// notification. If that is the case, go back to the select.
+		if n.pendingH == nil {
+			n.mu.RUnlock()
+			goto WAIT_FOR_BUFFER
+		}
+		// We have been notified, so get the reference to the head of the list.
+		pb = n.pendingH
+		n.mu.RUnlock()
+	}
+
+	buf := pb.buf
+	bufSize := len(buf)
+	// A buf of size 0 means that the remote closed
+	if bufSize == 0 {
 		n.close(false)
 		return 0, io.EOF
 	}
-	buf = msg.Data
-	if nb := len(buf); nb <= len(b) {
-		copy(b, buf)
-		return nb, nil
+	limit := bufSize
+	if limit > len(b) {
+		limit = len(b)
 	}
-	nb := copy(b, buf[:len(b)])
-	n.pending = buf[nb:]
+	nb := copy(b, buf[:limit])
+	// If we did not copy everything, reduce size by what we copied.
+	if nb != bufSize {
+		buf = buf[nb:]
+	} else {
+		buf = nil
+	}
+	var release bool
+	n.mu.Lock()
+	if buf != nil {
+		pb.buf = buf
+	} else {
+		// We are done with this pending buffer, remove from the pending list.
+		n.pendingH = n.pendingH.next
+		if n.pendingH == nil {
+			n.pendingT = nil
+		}
+		release = true
+	}
+	n.mu.Unlock()
+	if release {
+		pb.buf, pb.next = nil, nil
+		pendingBufPool.Put(pb)
+	}
 	return nb, nil
 }

@@ -190,6 +257,7 @@ func (n *natsConn) close(signalRemote bool) error {
 
 	n.closed = true
 	stream := n.parent
+	close(n.ch)
 	n.mu.Unlock()
 
 	stream.mu.Lock()
@@ -257,7 +325,6 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout
 	if err != nil {
 		return nil, err
 	}
-	sub.SetPendingLimits(-1, -1)
 	if err := conn.FlushTimeout(n.dfTimeout); err != nil {
 		sub.Unsubscribe()
 		return nil, err
@@ -274,6 +341,7 @@ func (n *natsStreamLayer) newNATSConn(address string) (*natsConn, error) {
 		localAddr:  n.localAddr,
 		remoteAddr: natsAddr(address),
 		parent:     n,
+		ch:         make(chan struct{}, 1),
 	}
 	if n.makeConn == nil {
 		c.conn = n.conn
@@ -328,14 +396,14 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration
 	}
 
 	// Setup inbox.
-	sub, err := peerConn.conn.SubscribeSync(connect.Inbox)
+	peerConn.mu.Lock()
+	sub, err := peerConn.conn.Subscribe(connect.Inbox, peerConn.onMsg)
 	if err != nil {
+		peerConn.mu.Unlock()
 		peerConn.Close()
 		return nil, err
 	}
-	sub.SetPendingLimits(-1, -1)
 
-	peerConn.mu.Lock()
 	peerConn.sub = sub
 	peerConn.outbox = resp.Inbox
 	peerConn.mu.Unlock()
@@ -377,15 +445,15 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) {
 
 		// Setup inbox for peer.
 		inbox := fmt.Sprintf(natsRequestInbox, n.localAddr.String(), nats.NewInbox())
-		sub, err := peerConn.conn.SubscribeSync(inbox)
+		peerConn.mu.Lock()
+		sub, err := peerConn.conn.Subscribe(inbox, peerConn.onMsg)
 		if err != nil {
+			peerConn.mu.Unlock()
 			n.logger.Error("Failed to create inbox for remote peer", "error", err)
 			peerConn.Close()
 			continue
 		}
-		sub.SetPendingLimits(-1, -1)
 
-		peerConn.mu.Lock()
 		peerConn.outbox = connect.Inbox
 		peerConn.sub = sub
 		shouldFlush := !peerConn.streamConn
18 changes: 18 additions & 0 deletions server/raft_transport_test.go
@@ -840,6 +840,24 @@ func TestRAFTTransportConnReader(t *testing.T) {
 		t.Fatal("Stream connection should not have been closed")
 	}
 
+	// Again, create a temp connection and make sure that we break out of
+	// a Read() if we close the connection.
+	tmp, err = stream1.newNATSConn("c")
+	if err != nil {
+		t.Fatalf("Error on create: %v", err)
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		time.Sleep(150 * time.Millisecond)
+		tmp.Close()
+		wg.Done()
+	}()
+	if n, err := tmp.Read(buf[:]); err != io.EOF || n > 0 {
+		t.Fatalf("Expected n=0 err=io.EOF, got n=%v err=%v", n, err)
+	}
+	wg.Wait()
+
 	// Now create a stream that will create a new NATS connection.
 	nc3 := newNatsConnection(t)
 	defer nc3.Close()