Skip to content

Commit

Permalink
Merge pull request #1177 from nats-io/fix_1176
Browse files Browse the repository at this point in the history
[FIXED] Clustering: memory growth
  • Loading branch information
kozlovic committed Apr 15, 2021
2 parents bdd4f16 + 43e3919 commit 7b507b3
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion server/raft_transport.go
Expand Up @@ -75,6 +75,7 @@ type natsConn struct {
pendingH *pendingBuf // head of pending buffers list
pendingT *pendingBuf // tail of pending buffers list
ch chan struct{} // to send notification that a buffer is available
tmr *time.Timer
outbox string
closed bool
parent *natsStreamLayer
Expand Down Expand Up @@ -134,14 +135,15 @@ func (n *natsConn) Read(b []byte) (int, error) {
if subTimeout == 0 {
subTimeout = time.Duration(0x7FFFFFFFFFFFFFFF)
}
n.tmr.Reset(subTimeout)
}
n.mu.RUnlock()

// There was no buffer, so we need to wait.
if pb == nil {
WAIT_FOR_BUFFER:
select {
case <-time.After(subTimeout):
case <-n.tmr.C:
return 0, nats.ErrTimeout
case _, ok := <-n.ch:
if !ok {
Expand All @@ -159,6 +161,7 @@ func (n *natsConn) Read(b []byte) (int, error) {
}
// We have been notified, so get the reference to the head of the list.
pb = n.pendingH
n.tmr.Stop()
n.mu.RUnlock()
}

Expand Down Expand Up @@ -258,6 +261,7 @@ func (n *natsConn) close(signalRemote bool) error {
n.closed = true
stream := n.parent
close(n.ch)
n.tmr.Stop()
n.mu.Unlock()

stream.mu.Lock()
Expand Down Expand Up @@ -342,6 +346,7 @@ func (n *natsStreamLayer) newNATSConn(address string) (*natsConn, error) {
remoteAddr: natsAddr(address),
parent: n,
ch: make(chan struct{}, 1),
tmr: time.NewTimer(time.Hour),
}
if n.makeConn == nil {
c.conn = n.conn
Expand Down

0 comments on commit 7b507b3

Please sign in to comment.