Skip to content

Commit

Permalink
Fix data races in tcp.Mux and tcp.listener
Browse files Browse the repository at this point in the history
This fixes two data races around concurrent calls to (*tcp.Mux).Close
and (*tcp.Mux).handleConn, discovered in Enterprise test suites.
  • Loading branch information
mark-rushakoff committed Jan 19, 2018
1 parent f46342e commit 4b1a35f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
- [#9255](https://github.com/influxdata/influxdb/issues/9255): Fix missing sorting of blocks by time when compacting.
- [#9327](https://github.com/influxdata/influxdb/pull/9327): wal: update lastWriteTime behavior
- [#9290](https://github.com/influxdata/influxdb/issues/9290): Fix regression to allow binary operations on literals.
- [#9342](https://github.com/influxdata/influxdb/pull/9342): Fix data races in tcp.Mux and tcp.listener

## v1.4.3 [unreleased]

Expand Down
105 changes: 77 additions & 28 deletions tcp/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,25 @@ func (mux *Mux) Serve(ln net.Listener) error {
// Wait for all connections to be demux
mux.wg.Wait()

mux.mu.Lock()
// Concurrently close all registered listeners.
// Because mux.m is keyed by byte, in the worst case we would spawn 256 goroutines here.
var wg sync.WaitGroup
mux.mu.RLock()
for _, ln := range mux.m {
close(ln.c)
wg.Add(1)
go func(ln *listener) {
defer wg.Done()
ln.Close()
}(ln)
}
mux.m = nil
mux.mu.Unlock()

if mux.defaultListener != nil {
close(mux.defaultListener.c)
mux.mu.RUnlock()
wg.Wait()

mux.mu.RLock()
dl := mux.defaultListener
mux.mu.RUnlock()
if dl != nil {
dl.Close()
}

return err
Expand Down Expand Up @@ -127,7 +137,10 @@ func (mux *Mux) handleConn(conn net.Conn) {
}

// Retrieve handler based on first byte.
mux.mu.RLock()
handler := mux.m[typ[0]]
mux.mu.RUnlock()

if handler == nil {
if mux.defaultListener == nil {
conn.Close()
Expand All @@ -142,31 +155,25 @@ func (mux *Mux) handleConn(conn net.Conn) {
handler = mux.defaultListener
}

// Send connection to handler. The handler is responsible for closing the connection.
timer := time.NewTimer(mux.Timeout)
defer timer.Stop()

select {
case handler.c <- conn:
case <-timer.C:
conn.Close()
mux.Logger.Printf("tcp.Mux: handler not ready: %d. Connection from %s closed", typ[0], conn.RemoteAddr())
return
}
handler.HandleConn(conn, typ[0])
}

// Listen returns a listener identified by header.
// Any connection accepted by mux is multiplexed based on the initial header byte.
func (mux *Mux) Listen(header byte) net.Listener {
mux.mu.Lock()
defer mux.mu.Unlock()

// Ensure two listeners are not created for the same header byte.
if _, ok := mux.m[header]; ok {
panic(fmt.Sprintf("listener already registered under header byte: %d", header))
}

// Create a new listener and assign it.
ln := &listener{
c: make(chan net.Conn),
mux: mux,
c: make(chan net.Conn),
done: make(chan struct{}),
mux: mux,
}
mux.m[header] = ln

Expand Down Expand Up @@ -196,10 +203,13 @@ func (mux *Mux) release(ln *listener) bool {
// with registered listener bytes and the first character of the HTTP request:
// 71 ('G') for GET, etc.
func (mux *Mux) DefaultListener() net.Listener {
mux.mu.Lock()
defer mux.mu.Unlock()
if mux.defaultListener == nil {
mux.defaultListener = &listener{
c: make(chan net.Conn),
mux: mux,
c: make(chan net.Conn),
done: make(chan struct{}),
mux: mux,
}
}

Expand All @@ -208,27 +218,66 @@ func (mux *Mux) DefaultListener() net.Listener {

// listener is a receiver for connections received by Mux.
type listener struct {
c chan net.Conn
mux *Mux

// The done channel is closed before taking a lock on mu to close c.
// That way, anyone holding an RLock can release the lock by receiving from done.
done chan struct{}

mu sync.RWMutex
c chan net.Conn
}

// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (c net.Conn, err error) {
conn, ok := <-ln.c
if !ok {
func (ln *listener) Accept() (net.Conn, error) {
ln.mu.RLock()
defer ln.mu.RUnlock()

select {
case <-ln.done:
return nil, errors.New("network connection closed")
case conn := <-ln.c:
return conn, nil
}
return conn, nil
}

// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {
if ok := ln.mux.release(ln); ok {
close(ln.c)
// Close done to signal to any RLock holders to release their lock.
close(ln.done)

// Hold a lock while reassigning ln.c to nil
// so that attempted sends or receives will block forever.
ln.mu.Lock()
ln.c = nil
ln.mu.Unlock()
}
return nil
}

// HandleConn handles the connection, if the listener has not been closed.
func (ln *listener) HandleConn(conn net.Conn, handlerID byte) {
ln.mu.RLock()
defer ln.mu.RUnlock()

// Send connection to handler. The handler is responsible for closing the connection.
timer := time.NewTimer(ln.mux.Timeout)
defer timer.Stop()

select {
case <-ln.done:
// Receive will return immediately if ln.Close has been called.
conn.Close()
case ln.c <- conn:
// Send will block forever if ln.Close has been called.
case <-timer.C:
conn.Close()
ln.mux.Logger.Printf("tcp.Mux: handler not ready: %d. Connection from %s closed", handlerID, conn.RemoteAddr())
return
}
}

// Addr returns the Addr of the listener
func (ln *listener) Addr() net.Addr {
if ln.mux == nil {
Expand Down

0 comments on commit 4b1a35f

Please sign in to comment.