- 12 Concurrency: Channel
Channel 可以想像是一個資料的通道 (pipe),一頭是 write,另一頭是 read,資料順序是 FIFO (First In First Out)。通常用在 goroutine 間資料交換。channel 是 thread-safe,因此可以同時讀寫 channel。
channel 的注意事項:
- 用
make
與chan
關鍵字來產生一個 channel,不用時,要用close
關閉。 - 一個 channel 只能包含一種 data type
- channel 當作參數傳給 function 時,最好指定是要做 read or write。
@import "ex12_01/main.go" {class=line-numbers}
說明:
-
c := make(chan int)
: 產生一個 channel 且 data type 是int
。並defer close(c)
確保 channel 會被關閉。 -
go readChannel(c)
: goroutine 執行 readChannel。func readChannel(c <-chan int) { log.Println("reading from channel") defer waitGroup.Done() x := <-c log.Println("read: ", x) }
注意:
c <-chan
,是 read only channel -
go writeChannel(c, 10)
: goroutine 執行 writeChannel。func writeChannel(c chan<- int, x int) { defer waitGroup.Done() log.Println("writing ", x) c <- x log.Println("wrote ", x) }
注意:
c chan<- int
是 write only channel。
c := make(chan int)
宣告時,沒有指定 channel 的容量,因此在 read/write 時,會 block。在上例中,因為是用 goroutine 執行, 所以不會有問題。
@import "ex12_02/main.go" {class=line-numbers}
執行結果,發生 deadlock:
2020/01/16 13:54:02 writing...
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
main.go:13 +0xdb
exit status 2
此時,可以設定 channel 的容量,eg: c := make(chan int, 1)
。則結果如下:
@import "ex12_03/main.go" {class=line-numbers highlight="8"}
2020/01/16 13:57:29 writing...
2020/01/16 13:57:29 written
2020/01/16 13:57:29 reading
2020/01/16 13:57:29 read 10
2020/01/16 13:57:29 exit...
先執 write,資料放在 channel,供之後 read。
但如果程式的順序,改成先 read 再 write 時,一樣會發生 deadlock。因為還沒寫資料,根本沒資料供 read。
@import "ex12_04/main.go" {class="line-numbers"}
結果:
2020/01/16 13:58:46 reading
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
main.go:13 +0xe2
exit status 2
Producer/Consumer 是 channel 最常用的實作模型。概念是一端產出資料 (可能是從資料庫或大檔案讀取資料),另一端運算資料。
@import "ex12_05/main.go" {.line-numbers}
與先前的範例最大不同是,這次關閉 channel 是在 producer
執行,而非主程序,也就是說在產生完資料後,就關閉 channel,之後就不能再寫入。而 consumer
端,在 channel 資料讀完後,就會跳出 for-range 的迴圈而執行完畢。
如果不在 producer
關閉 channel,而是在主程序,則會發生 deadlock。
@import "ex12_06/main.go" {class="line-numbers"}
結果:
2020/01/16 14:02:31 start...
2020/01/16 14:02:31 comsumer 2 starting...
2020/01/16 14:02:31 comsumer 1 starting...
2020/01/16 14:02:31 producer start...
2020/01/16 14:02:31 1 got 2
...
2020/01/16 14:02:31 1 got 98
2020/01/16 14:02:31 producer end and close channel
2020/01/16 14:02:31 1 got 99
2020/01/16 14:02:31 2 got 97
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1199b68)
/usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0x1199b60)
/usr/local/go/src/sync/waitgroup.go:130 +0x64
main.main()
main.go:48 +0x198
goroutine 19 [chan receive]:
main.consumer(0x1, 0xc000060060)
main.go:27 +0x1fd
created by main.main
main.go:43 +0x144
goroutine 20 [chan receive]:
main.consumer(0x2, 0xc000060060)
main.go:27 +0x1fd
created by main.main
main.go:46 +0x188
exit status 2
Actor Pattern 與 Producer/Consumer Pattern 類似,概念是每一個 Actor 只負責固定的工作。Producer 必須將資料,傳到每個 Actor。以下的範例,是模擬訂單成立後,傳給兩個 Actor,一個負責計算每個分類的業績,另一個計算全站的業績。
@import "ex12_07/main.go" {class="line-numbers"}
說明:
Producer
: 負責模擬產生 100 筆訂單後,往後送給 consumer actor 處理。最後再關閉 consumer actor 的 channel,讓程式可以執行完畢。CategorySum
: 負責主要統計每個分類的業績。SiteSum
: 負責統計全站業績
可以透過 select
來偵測 channel 是否可以被寫入及是否有資料可以讀取。select
可以撘配 time.After
來實作 timeout 的機制。
@import "ex12_08/main.go" {class=line-numbers}
package main
import (
"log"
"math/rand"
"time"
)
func createNumber(max int, randomChannel chan<- int, finishChannel <-chan bool) {
for {
select {
case randomChannel <- rand.Intn(max):
time.Sleep(1 * time.Second)
case x := <-finishChannel:
log.Println("finish channel got ", x)
if x {
close(randomChannel)
log.Println("createNumber end")
return
}
}
}
}
func readNumber(randomChannel <-chan int) {
for {
select {
case x, ok := <-randomChannel:
if !ok {
log.Println("readNumber end")
return
}
log.Println("random channel got ", x)
case <-time.After(500 * time.Millisecond):
log.Println("time out")
}
}
}
func main() {
rand.Seed(time.Now().Unix())
randomChannel := make(chan int)
finishChannel := make(chan bool)
go createNumber(100, randomChannel, finishChannel)
go readNumber(randomChannel)
time.Sleep(2 * time.Second)
finishChannel <- false
time.Sleep(3 * time.Second)
finishChannel <- true
time.Sleep(1 * time.Second)
close(finishChannel)
log.Println("end")
}
for { }
: 無窮迴圈select - case
: 使用select
來偵測 channel 狀態。case randomChannel <- rand.Intn(max)
: 對randomChannel
寫入資料x := <-finishChannel
: 從finishChannel
讀取資料,如果為true
則關閉randomChannel
並結束select - case
迴圈。
for { }
: 無窮迴圈select - case
: 使用select
來偵測 channel 狀態。case x, ok := <-randomChannel
: 從randomChannel
讀取資料,這邊與先前從 channel 讀資料不同,多了一個ok
來判斷 channel 是否已經被關閉了。如果randomChannel
已被關閉,則跳出迴圈。case <-time.After(500 * time.Millisecond)
: Timeout 機制,如果 500 ms 內,randomChannel 一直沒有資料寫入的話,則會觸發。
- 初始化 channel 及 goroutine.
- 先停 2 sec. 後,先對
finishChannel
寫入false
,此時不會中止所有活動,但finishChannel
會得到一個false
值。 - 再停 3 sec. 後,再對
finishChannel
寫入true
,此時會中斷createNumber
的迴圈,且randomChannel
會被關閉。 randomChannel
被關閉後,readNumber
會偵測到randomChannel
被關閉,而中斷readNumber
迴圈。- 再停 1 sec. 關閉
finishChannel
。
2020/01/16 14:08:28 random channel got 98
2020/01/16 14:08:29 time out
2020/01/16 14:08:29 random channel got 65
2020/01/16 14:08:30 time out
2020/01/16 14:08:30 random channel got 33
2020/01/16 14:08:31 time out
2020/01/16 14:08:31 finish channel got false
2020/01/16 14:08:31 random channel got 92
2020/01/16 14:08:32 time out
2020/01/16 14:08:32 random channel got 46
2020/01/16 14:08:33 time out
2020/01/16 14:08:33 random channel got 57
2020/01/16 14:08:34 time out
2020/01/16 14:08:34 random channel got 57
2020/01/16 14:08:35 time out
2020/01/16 14:08:35 finish channel got true
2020/01/16 14:08:35 createNumber end
2020/01/16 14:08:35 readNumber end
2020/01/16 14:08:36 end