# CSP
**通信顺序进程，Communicating Sequential Process**

- Tony Hoare, 1977
- A concurrent programming language
- A process calculi/algebra

- 不关注发送消息的**process（进程/主体）**
- 关注发送消息时使用的**channel（通道）**

- process类似于Actor模型中的Actor，channel类似于Mailbox
- channel是一级对象，不与process紧耦合，而Mailbox是绑定至Actor的
- channel可以单独创建和读写，并在process间传递
- process可以没有实体，可以订阅任意个channel或向任意个channel发送消息

# Go语言中的Goroutine和Channel

CSP模型的一种**语言级**实现（不完全，变体）

- 消息通过channel在goroutine之间传递
- goroutine是process，是主体/执行体
- channel是通道

传统进程模型中上下文切换的开销：
- 存储和恢复所有寄存器的内容
- 刷新TLB cache（改变虚拟地址到物理地址的映射）
- OS调度选择下一个进程

传统线程模型：
- 类似于进程模型
- 但共享内存地址空间，上下文切换开销小了一点

# Goroutine

```Go
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
```

- 可动态增长的栈，初始很小（比如2KB，相比于线程栈典型大小2MB）
- M:N调度，M个goroutine复用N个OS线程
- GOMAXPROCS决定OS线程数
- goroutine没有标识（id）

## 调度

**协作式调度（cooperative scheduling）**，不同于进程或线程模型的时间分片式或抢占式调度

> 主动式调度，不同于被动式调度
>
> 知道的更多，因而更优地调度

确定性的调度点：
- channel的发送和接收操作，如果阻塞
- go语句，尽管新goroutine不一定立即被调度到
- 阻塞式的系统调用，如文件/网络IO
- 垃圾回收时

- M:1调度，上下文切换快，但不能有效利用多核
- 1:1调度，上下文切换代价太大
- M:N调度，上下文切换快，且能有效利用多核

M: 线程，P: 调度上下文/调度处理器，G: goroutine（蓝色正运行，灰色待调度）

- P的个数等于GOMAXPROCS
- 每个P都有一个runqueue，待调度的goroutine被队列到runqueue
- 调度实际上就是P从runqueue取出一个goroutine，切换上下文，并执行它


![](go-scheduler-in-motion.jpg)

当某个goroutine在某个线程上发生阻塞系统调用时：

- 调度处理器被卸载掉，并转移到另一个线程上（可以新建，也可以从线程池中取出）
- 当原goroutine从阻塞调用返回时，会试图从其它线程“偷”一个调度处理器
- 如果“偷”失败了，则将它放入**全局runqueue**，而原线程返回给线程池
- 调度处理器会在自己的runqueue为空时和周期检查时，从全局runqueue取出goroutine

![](go-scheduler-syscall.jpg)

- 如果全局runqueue也为空，则调度处理器会从其它调度处理器“偷”来**一半**数目的goroutine

> 达成一种“负载均衡”

![](go-scheduler-steal.jpg)

## 栈

- 进程虚拟地址空间中栈和堆，以及保护页

![](guard-page.png)

- 每个线程都有自己的栈和保护页
- 线程栈大小固定，不能太小，也不能太大
- 线程越多，则虚拟地址空间被浪费越多

![](threads.png)

- 每个goroutine的栈初始很小（比如2KB），从**heap**分配
- 不使用保护页，而是在每个函数调用时检查栈大小是否够用
- 如果栈小了，则分配更大的内存，拷贝到新栈，释放原来的栈
- 如果栈大了，则一部分会被垃圾回收

![](goroutine-stack-growth.png)

# Channel

```Go
ch := make(chan int) // 创建channel，收发int类型消息

ch <- x // 发送消息
x = <-ch // 接收并赋值消息
<-ch // 接收但丢弃消息

close(ch) // 关闭channel
```

```Go
ch = make(chan int) // 无缓冲channel
ch = make(chan int, 0) // 无缓冲channel
ch = make(chan int, 3) // 缓冲channel，容量为3
```

**无缓冲channel**，也称为**同步channel**
> 在无缓冲channel上的一次发送和接收操作
>
> 导致发送和接收goroutine做了一次同步操作
>
> 接收者收到数据发生在唤醒发送者goroutine之前（happens before）

**缓冲channel**，可以看作是**消息队列**

![](empty-buffered-channel.png)

In [20]:
import (
    "net/http"
    "time"
)

request := func(url string) *http.Response {
    rep, _ := http.Get(url)
    return rep
}

responses := make(chan *http.Response, 3)
go func() { responses <- request("https://www.baidu.com") }()
go func() { responses <- request("https://www.taobao.com") }()
go func() { responses <- request("http://www.qq.com") }()
go func() { time.Sleep(100*time.Millisecond); close(responses); }()

for r := range responses {
    fmt.Printf("%s: %s\n", r.Request.URL, r.Status)
}

http://www.qq.com: 200 OK
https://www.baidu.com: 200 OK
https://www.taobao.com: 200 OK


## 串联channel（pipeline）

![](three-stage-pipeline.png)

In [21]:
naturals := make(chan int)
squares := make(chan int)

// Counter
go func() {
    for x := 0; x < 10; x++ {
        naturals <- x
    }
    close(naturals)
}()

// Squarer
go func() {
    for x := range naturals {
        squares <- x * x
    }
    close(squares)
}()

// Printer (in main goroutine)
for x := range squares {
    fmt.Printf("%d ", x)
}

0 1 4 9 16 25 36 49 64 81 

## 单向channel
- 只能发送：`chan<- int`
- 只能接收：`<-chan int`

In [22]:
counter := func(out chan<- int) {
    for x := 0; x < 10; x++ {
        out <- x
    }
    close(out)
}

squarer := func(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

printer := func(in <-chan int) {
    for v := range in {
        fmt.Printf("%d ", v)
    }
}

naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)

0 1 4 9 16 25 36 49 64 81 

## 多路复用：select

```Go
select {
case <-ch1: // 接收并丢弃
// ...
case x := <-ch2: // 接收
// ...use x...
case ch3 <- y: // 发送
// ...
default: // 不阻塞，可以用于实现轮询channel
// ...
}
```

一发一收，依次执行

In [23]:
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
    select {
    case x := <-ch:
        fmt.Printf("%d ", x) // "0" "2" "4" "6" "8"
    case ch <- i:
    }
}

0 2 4 6 8 

当多个分支同时满足时，**随机**选择一个分支执行

In [24]:
ch := make(chan int, 2)
for i := 0; i < 10; i++ {
    select {
    case x := <-ch:
        fmt.Printf("%d ", x) // "0" "2" "4" "6" "8"
    case ch <- i:
    }
}

0 1 3 6 7 

非阻塞，轮询（从来不应该这么用）

In [1]:
import (
    "time"
)

func() {
    ch := make(chan uint64)

    go func() {
        ch <- 1
        ch <- 1
    }()

    go func() {
        ch <- 1
        ch <- 1
    }()

    var count uint64
    for {
        select {
        case x := <-ch:
            count += x
            fmt.Println(count)
            if count == 4 {
                fmt.Println("Bye")
                return
            }
        default:
            fmt.Println("poll")
            time.Sleep(300*time.Nanosecond)
        }
    }
}()

poll
poll
poll
1
2
poll
poll
3
4
Bye


实现请求的超时机制

In [26]:
import (
    "net/http"
    "time"
)

request := func(url string) *http.Response {
    rep, _ := http.Get(url)
    return rep
}

responses := make(chan *http.Response, 3)
go func() { responses <- request("https://www.baidu.com") }()
go func() { responses <- request("https://www.taobao.com") }()
go func() { responses <- request("http://www.qq.com") }()

go func() {
    select {
        case <-time.After(100*time.Millisecond): // 100毫秒超时
        close(responses)
    }
}()

func() {
    for {
        select {
        case r := <-responses:
            if r == nil {
                return
            }
            fmt.Printf("%s: %s\n", r.Request.URL, r.Status)
        }
    }
}()

http://www.qq.com: 200 OK
https://www.taobao.com: 200 OK
https://www.baidu.com: 200 OK


## 广播并发退出：关闭channel

In [27]:
import "time"

ch := make(chan int)

go func() {
    ch <- 1
    ch <- 2
}()

go func() {
    ch <- 3
    ch <- 4
}()

go func() {
    time.Sleep(10*time.Millisecond)
    close(ch)
}()

print := func() {
    FOR:
    for {
        select {
        case x := <-ch:
            if x == 0 {
                fmt.Printf("Bye\n")
                break FOR
            }
            fmt.Println(x)
        }
    }
}

go print()
go print()

time.Sleep(100*time.Millisecond)

1
3
4
2
Bye
Bye


# 总结

简单，只有goroutine和channel

强大，灵活运用goroutine和channel，搭配原子变量/互斥体/同步设施/条件变量，**才能**写出千变万化的并发程序

依然是那句老话，高级抽象减少心智负担，但是**从来没有银弹！**