Skip to content

Commit

Permalink
runtime: speed up receive on empty closed channel
Browse files Browse the repository at this point in the history
Currently, nonblocking receive on an open channel is about
700 times faster than nonblocking receive on a closed channel.
This change makes closed channels equally fast.

Fixes golang#32529

relevant benchstat output:
name                       old time/op    new time/op    delta
MakeChan/Byte-40            140ns ± 4%     137ns ± 7%   -2.38%  (p=0.023 n=17+19)
MakeChan/Int-40             174ns ± 5%     173ns ± 6%     ~     (p=0.437 n=18+19)
MakeChan/Ptr-40             315ns ±15%     301ns ±15%     ~     (p=0.051 n=20+20)
MakeChan/Struct/0-40        123ns ± 8%      99ns ±11%  -19.18%  (p=0.000 n=20+17)
MakeChan/Struct/32-40       297ns ± 8%     241ns ±18%  -19.13%  (p=0.000 n=20+20)
MakeChan/Struct/40-40       344ns ± 5%     273ns ±23%  -20.49%  (p=0.000 n=20+20)
ChanNonblocking-40         0.32ns ± 2%    0.32ns ± 2%   -1.25%  (p=0.000 n=19+18)
SelectUncontended-40       5.72ns ± 1%    5.71ns ± 2%     ~     (p=0.326 n=19+19)
SelectSyncContended-40     10.9µs ±10%    10.6µs ± 3%   -2.77%  (p=0.009 n=20+16)
SelectAsyncContended-40    1.00µs ± 0%    1.10µs ± 0%  +10.75%  (p=0.000 n=18+19)
SelectNonblock-40          1.22ns ± 2%    1.21ns ± 4%     ~     (p=0.141 n=18+19)
ChanUncontended-40          240ns ± 4%     233ns ± 4%   -2.82%  (p=0.000 n=20+20)
ChanContended-40           86.7µs ± 0%    82.7µs ± 0%   -4.64%  (p=0.000 n=20+19)
ChanSync-40                 294ns ± 7%     284ns ± 9%   -3.44%  (p=0.006 n=20+20)
ChanSyncWork-40            38.4µs ±19%    34.0µs ± 4%  -11.33%  (p=0.000 n=20+18)
ChanProdCons0-40           1.50µs ± 1%    1.63µs ± 0%   +8.53%  (p=0.000 n=19+19)
ChanProdCons10-40          1.17µs ± 0%    1.18µs ± 1%   +0.44%  (p=0.000 n=19+20)
ChanProdCons100-40          985ns ± 0%     959ns ± 1%   -2.64%  (p=0.000 n=20+20)
ChanProdConsWork0-40       1.50µs ± 0%    1.60µs ± 2%   +6.54%  (p=0.000 n=18+20)
ChanProdConsWork10-40      1.26µs ± 0%    1.26µs ± 2%   +0.40%  (p=0.015 n=20+19)
ChanProdConsWork100-40     1.27µs ± 0%    1.22µs ± 0%   -4.15%  (p=0.000 n=20+19)
SelectProdCons-40          1.50µs ± 1%    1.53µs ± 1%   +1.95%  (p=0.000 n=20+20)
ChanCreation-40            82.1ns ± 5%    81.6ns ± 7%     ~     (p=0.483 n=19+19)
ChanSem-40                  877ns ± 0%     719ns ± 0%  -17.98%  (p=0.000 n=18+19)
ChanPopular-40             1.75ms ± 2%    1.78ms ± 3%   +1.76%  (p=0.002 n=20+19)
ChanClosed-40               215ns ± 1%       0ns ± 6%  -99.82%  (p=0.000 n=20+18)

Change-Id: I6d5ca4f1530cc9e1a9f3ef553bbda3504a036448
Reviewed-on: https://go-review.googlesource.com/c/go/+/181543
Run-TryBot: Keith Randall <khr@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Keith Randall <khr@golang.org>
  • Loading branch information
Ben Schwartz authored and randall77 committed Oct 7, 2019
1 parent a0894ea commit e1446d9
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 19 deletions.
76 changes: 57 additions & 19 deletions src/runtime/chan.go
Expand Up @@ -121,6 +121,21 @@ func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
// c.dataqsiz is immutable (never written after the channel is created)
// so it is safe to read at any time during channel operation.
if c.dataqsiz == 0 {
// Assumes that a pointer read is relaxed-atomic.
return c.recvq.first == nil
}
// Assumes that a uint read is relaxed-atomic.
return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
Expand Down Expand Up @@ -160,7 +175,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
Expand All @@ -169,9 +184,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}

Expand Down Expand Up @@ -400,6 +416,16 @@ func closechan(c *hchan) {
}
}

// empty reports whether a read from c would block (that is, the channel is
// empty). It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
// c.dataqsiz is immutable.
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
Expand Down Expand Up @@ -435,21 +461,33 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}

var t0 int64
Expand Down
14 changes: 14 additions & 0 deletions src/runtime/chan_test.go
Expand Up @@ -1126,6 +1126,20 @@ func BenchmarkChanPopular(b *testing.B) {
wg.Wait()
}

func BenchmarkChanClosed(b *testing.B) {
c := make(chan struct{})
close(c)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
select {
case <-c:
default:
b.Error("Unreachable")
}
}
})
}

var (
alwaysFalse = false
workSink = 0
Expand Down

0 comments on commit e1446d9

Please sign in to comment.