Skip to content

Commit

Permalink
[FIXED] Close() release resources immediately
Browse files Browse the repository at this point in the history
When a connection is closed very quickly after being created, there
was a race condition that could lead to the connection object still
being referenced (and therefore holding resources) until the first
ping interval elapsed.
This was because the pinger was created in spinUpGoRoutines() which
was itself launched as a go routine. When the Close() call was made,
the ping timer may still be nil (therefore no way to stop it), but
then created in the go routine (which caused the reference to the
connection object). By default, it means that the connection object
would be held for 30 seconds.
Doing connect/close in a tight loop would show the process size
growing.

This PR refactors where we create the pinger and the use of the
wait group. There had been a change in the past to attempt to fix
a WaitGroup panic. I believe this approach is more stable. We now
create the pinger and bump the wait group in processConnectInit,
under connection lock. The doReconnect() go routine waits for
this wait group on entry and then on iterations where a failure
occurs after we know we have started the go routines.
In case of failures in the reconnect loop, some state has also been
properly reset (namely the connection's buffered writer).

Resolves #368

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jun 12, 2018
1 parent 93a833d commit 35b4f0c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 54 deletions.
110 changes: 56 additions & 54 deletions nats.go
Expand Up @@ -295,7 +295,7 @@ type Conn struct {
// Opts holds the configuration of the Conn.
// Modifying the configuration of a running Conn is a race.
Opts Options
wg *sync.WaitGroup
wg sync.WaitGroup
url *url.URL
conn net.Conn
srvPool []*srv
Expand Down Expand Up @@ -992,46 +992,15 @@ func (nc *Conn) makeTLSConn() {

// waitForExits will wait for all socket watcher Go routines to
// be shutdown before proceeding.
func (nc *Conn) waitForExits(wg *sync.WaitGroup) {
func (nc *Conn) waitForExits() {
// Kick old flusher forcefully.
select {
case nc.fch <- struct{}{}:
default:
}

// Wait for any previous go routines.
if wg != nil {
wg.Wait()
}
}

// spinUpGoRoutines will launch the Go routines responsible for
// reading and writing to the socket. This will be launched via a
// go routine itself to release any locks that may be held.
// We also use a WaitGroup to make sure we only start them on a
// reconnect when the previous ones have exited.
func (nc *Conn) spinUpGoRoutines() {
// Make sure everything has exited.
nc.waitForExits(nc.wg)

// Create a new waitGroup instance for this run.
nc.wg = &sync.WaitGroup{}
// We will wait on both.
nc.wg.Add(2)

// Spin up the readLoop and the socket flusher.
go nc.readLoop(nc.wg)
go nc.flusher(nc.wg)

nc.mu.Lock()
if nc.Opts.PingInterval > 0 {
if nc.ptmr == nil {
nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
} else {
nc.ptmr.Reset(nc.Opts.PingInterval)
}
}
nc.mu.Unlock()
nc.wg.Wait()
}

// Report the connected server's Url
Expand Down Expand Up @@ -1098,7 +1067,19 @@ func (nc *Conn) processConnectInit() error {
// Reset the number of PING sent out
nc.pout = 0

go nc.spinUpGoRoutines()
// Start or reset Timer
if nc.Opts.PingInterval > 0 {
if nc.ptmr == nil {
nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
} else {
nc.ptmr.Reset(nc.Opts.PingInterval)
}
}

// Start the readLoop and flusher go routines, we will wait on both on a reconnect event.
nc.wg.Add(2)
go nc.readLoop()
go nc.flusher()

return nil
}
Expand Down Expand Up @@ -1378,15 +1359,20 @@ func (nc *Conn) flushReconnectPendingItems() {
}
}

// stopPingTimer halts the connection's ping timer when one exists;
// it is a no-op if no timer has been created.
// The caller must hold the connection lock.
func (nc *Conn) stopPingTimer() {
	if nc.ptmr == nil {
		return
	}
	nc.ptmr.Stop()
}

// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
func (nc *Conn) doReconnect() {
// We want to make sure we have the other watchers shutdown properly
// here before we proceed past this point.
nc.mu.Lock()
wg := nc.wg
nc.mu.Unlock()
nc.waitForExits(wg)
nc.waitForExits()

// FIXME(dlc) - We have an issue here if we have
// outstanding flush points (pongs) and they were not
Expand All @@ -1406,6 +1392,10 @@ func (nc *Conn) doReconnect() {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
}

// This is used to wait on go routines exit if we start them in the loop
// but an error occurs after that.
waitForGoRoutines := false

for len(nc.srvPool) > 0 {
cur, err := nc.selectNextServer()
if err != nil {
Expand Down Expand Up @@ -1433,6 +1423,11 @@ func (nc *Conn) doReconnect() {
} else {
time.Sleep(time.Duration(sleepTime))
}
// If the readLoop, etc.. go routines were started, wait for them to complete.
if waitForGoRoutines {
nc.waitForExits()
waitForGoRoutines = false
}
nc.mu.Lock()

// Check if we have been closed first.
Expand All @@ -1459,6 +1454,9 @@ func (nc *Conn) doReconnect() {
// Process connect logic
if nc.err = nc.processConnectInit(); nc.err != nil {
nc.status = RECONNECTING
// Reset the buffered writer to the pending buffer
// (was set to a buffered writer on nc.conn in createConn)
nc.bw.Reset(nc.pending)
continue
}

Expand All @@ -1476,6 +1474,14 @@ func (nc *Conn) doReconnect() {
nc.err = nc.bw.Flush()
if nc.err != nil {
nc.status = RECONNECTING
// Reset the buffered writer to the pending buffer (bytes.Buffer).
nc.bw.Reset(nc.pending)
// Stop the ping timer (if set)
nc.stopPingTimer()
// Since processConnectInit() returned without error, the
// go routines were started, so wait for them to return
// on the next iteration (after releasing the lock).
waitForGoRoutines = true
continue
}

Expand Down Expand Up @@ -1518,20 +1524,16 @@ func (nc *Conn) processOpErr(err error) {
if nc.Opts.AllowReconnect && nc.status == CONNECTED {
// Set our new status
nc.status = RECONNECTING
if nc.ptmr != nil {
nc.ptmr.Stop()
}
// Stop ping timer if set
nc.stopPingTimer()
if nc.conn != nil {
nc.bw.Flush()
nc.conn.Close()
nc.conn = nil
}

// Reset pending buffers before reconnecting.
if nc.pending == nil {
nc.pending = new(bytes.Buffer)
}
nc.pending.Reset()
// Create pending buffer before reconnecting.
nc.pending = new(bytes.Buffer)
nc.bw.Reset(nc.pending)

go nc.doReconnect()
Expand Down Expand Up @@ -1609,9 +1611,9 @@ func (ac *asyncCallbacksHandler) pushOrClose(f func(), close bool) {
// readLoop() will sit on the socket reading and processing the
// protocol from the server. It will dispatch appropriately based
// on the op type.
func (nc *Conn) readLoop(wg *sync.WaitGroup) {
func (nc *Conn) readLoop() {
// Release the wait group on exit
defer wg.Done()
defer nc.wg.Done()

// Create a parseState if needed.
nc.mu.Lock()
Expand Down Expand Up @@ -1853,9 +1855,9 @@ func (nc *Conn) processAuthorizationViolation(err string) {

// flusher is a separate Go routine that will process flush requests for the write
// bufio. This allows coalescing of writes to the underlying socket.
func (nc *Conn) flusher(wg *sync.WaitGroup) {
func (nc *Conn) flusher() {
// Release the wait group
defer wg.Done()
defer nc.wg.Done()

// snapshot the bw and conn since they can change from underneath of us.
nc.mu.Lock()
Expand Down Expand Up @@ -2986,9 +2988,9 @@ func (nc *Conn) close(status Status, doCBs bool) {
// Clear any queued and blocking Requests.
nc.clearPendingRequestCalls()

if nc.ptmr != nil {
nc.ptmr.Stop()
}
// Stop ping timer if set.
nc.stopPingTimer()
nc.ptmr = nil

// Go ahead and make sure we have flushed the outbound
if nc.conn != nil {
Expand Down
25 changes: 25 additions & 0 deletions nats_test.go
Expand Up @@ -1139,3 +1139,28 @@ func TestConnAsyncCBDeadlock(t *testing.T) {
t.Fatal("Deadlock")
}
}

// TestPingTimerLeakedOnClose checks that a connection closed right after
// being created does not end up with a ping timer set.
func TestPingTimerLeakedOnClose(t *testing.T) {
	srv := RunServerOnPort(TEST_PORT)
	defer srv.Shutdown()

	addr := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)
	nc, err := Connect(addr)
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	nc.Close()

	// There was a bug (recorded here as issue #338 -- NOTE(review): confirm
	// the issue number) where, if the connection was created and closed
	// quickly, the pinger would be created from a go-routine and would
	// cause the connection object to be retained until the ping timer fired.
	// Give that go-routine a little time to run; with the defect present
	// the timer would be set by the time we look.
	time.Sleep(100 * time.Millisecond)

	// Read the timer field under the connection lock.
	nc.mu.Lock()
	leaked := nc.ptmr != nil
	nc.mu.Unlock()

	if leaked {
		t.Fatal("Pinger timer should not be set")
	}
}

0 comments on commit 35b4f0c

Please sign in to comment.