Skip to content

Commit

Permalink
Avoid mutating the channel map while ranging over it in Connection.Sh…
Browse files Browse the repository at this point in the history
…utdown().
  • Loading branch information
domodwyer committed May 15, 2017
1 parent 6063341 commit a79bd14
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
30 changes: 17 additions & 13 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,38 +381,42 @@ func (c *Connection) shutdown(err *Error) {

c.destructor.Do(func() {
c.m.Lock()
closes := make([]chan *Error, len(c.closes))
copy(closes, c.closes)
c.m.Unlock()
defer c.m.Unlock()

if err != nil {
for _, c := range closes {
for _, c := range c.closes {
c <- err
}
}

for _, ch := range c.channels {
c.closeChannel(ch, err)
}

if err != nil {
c.errors <- err
}
// Shutdown handler goroutine can still receive the result.
close(c.errors)

c.conn.Close()

for _, c := range closes {
for _, c := range c.closes {
close(c)
}

for _, c := range c.blocks {
close(c)
}

c.m.Lock()
// Shutdown the channel, but do not use closeChannel() as it calls
// releaseChannel() which requires the connection lock.
//
// Ranging over c.channels and calling releaseChannel() that mutates
// c.channels is racy - see commit 6063341 for an example.
for _, ch := range c.channels {
ch.shutdown(err)
}

c.conn.Close()

c.channels = map[uint16]*Channel{}
c.allocator = newAllocator(1, c.Config.ChannelMax)
c.noNotify = true
c.m.Unlock()
})
}

Expand Down
5 changes: 5 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"sync"
"testing"
"time"
)

func TestRequiredServerLocale(t *testing.T) {
Expand Down Expand Up @@ -68,6 +69,8 @@ func TestChannelOpenOnAClosedConnectionFails_ReleasesAllocatedChannel(t *testing
// See https://github.com/streadway/amqp/issues/251 - thanks to jmalloc for the
// test case.
func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

conn := integrationConnection(t, "allocation/shutdown race")

go conn.Close()
Expand All @@ -88,6 +91,8 @@ func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
// See https://github.com/streadway/amqp/pull/253#issuecomment-292464811 for
// more details - thanks to jmalloc again.
func TestRaceBetweenChannelShutdownAndSend(t *testing.T) {
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

conn := integrationConnection(t, "channel close/send race")
defer conn.Close()

Expand Down

0 comments on commit a79bd14

Please sign in to comment.