# Introduction to Parallel Computing (Part 2)

Date: 10.21

Author: 陈久宁

Outline:

- Process model
- Inter-process communication

Julia's multiprocessing is built mainly on the [Distributed](https://docs.julialang.org/en/v1/manual/distributed-computing/) standard library.

In [1]:
using Distributed, SharedArrays
using BenchmarkTools

## Multiprocessing

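Each process is an independent computing unit with its own compute resources and its own memory. Unlike multithreading, where threads share memory, sharing data between processes has to be done through explicit communication.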

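The topology of the communication network can be simple or complex. The two simplest models are:

- all-to-all: any two nodes can communicate with each other directly
- master-worker: there is a central master node, and all other nodes communicate only with the master. MapReduce is a typical example of the master-worker model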
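As a minimal sketch of the master-worker model in Julia (assuming two local worker processes can be started), `pmap` from Distributed lets the master (process 1) hand out the items of a collection while each worker computes a result and sends it back. The tuple layout `(item, item^2, worker id)` is only an illustrative choice.

```julia
using Distributed

# start two local workers if only the master is running
nprocs() == 1 && addprocs(2)

# the master distributes the items 1:6; each worker returns (item, item^2, its own id)
results = pmap(i -> (i, i^2, myid()), 1:6)

# pmap keeps the results in the order of the input collection
for (i, sq, id) in results
    println("item $i -> $sq computed on worker $id")
end
```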

![](master_worker_model.png)

Some functions for querying process state:

In [3]:
@show Distributed.nprocs()
@show Distributed.procs()
@show Distributed.workers()
@show Distributed.myid()

Distributed.nprocs() = 1
Distributed.procs() = [1]
Distributed.workers() = [1]
Distributed.myid() = 1


1

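Processes can be added and removed with `addprocs` and `rmprocs`. The number of worker processes can also be set when starting Julia with the `-p` flag, e.g. `julia -p 4`, or `julia -p auto` to start one worker per logical CPU core.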
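The same bookkeeping can be scripted. A minimal sketch, assuming local workers can be started: `addprocs` returns the ids of the new workers, and `rmprocs` (which by default waits for the workers to exit) removes exactly those workers again, so `nprocs()` goes back to its starting value.

```julia
using Distributed

before = nprocs()

# addprocs returns the ids of the newly started workers
new_ids = addprocs(2)
println("added workers: ", new_ids, ", nprocs() = ", nprocs())

# remove only the workers we just added; by default rmprocs waits for them to exit
rmprocs(new_ids)
println("after removal, nprocs() = ", nprocs())
```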

In [4]:
Distributed.addprocs(5)

5-element Vector{Int64}:
 2
 3
 4
 5
 6

In [5]:
@show Distributed.nprocs()
@show Distributed.procs()
@show Distributed.workers()
@show Distributed.myid()

Distributed.nprocs() = 6
Distributed.procs() = [1, 2, 3, 4, 5, 6]
Distributed.workers() = [2, 3, 4, 5, 6]
Distributed.myid() = 1


1

In [6]:
Distributed.rmprocs(2)

Task (done) @0x000000017b9171f0

In [7]:
@show Distributed.nprocs()
@show Distributed.procs()
@show Distributed.workers()
@show Distributed.myid()

Distributed.nprocs() = 5
Distributed.procs() = [1, 3, 4, 5, 6]
Distributed.workers() = [3, 4, 5, 6]
Distributed.myid() = 1


1

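## `@distributed`, `SharedArray`, and `@everywhere`

Similar to `@threads`, the `@distributed` macro provides basic multiprocess parallelism for `for` loops.

Because memory is not shared, an operation like the one below does not work as expected: every process keeps its own copy of the array `a`, so the workers only modify their copies and `a` on the master stays all zeros.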

In [102]:
a = zeros(10)
@distributed for i = 1:10
    a[i] = Distributed.myid()
end
a



10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

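`SharedArray` can be used to share the data; this array type hides the actual communication details. Note that without a reduction operator `@distributed` returns immediately without waiting for the loop to finish, which is why the example below calls `wait` before reading `a` (prefixing the loop with `@sync` also works).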

In [104]:
a = SharedArray{Float64}(10)
t = @distributed for i = 1:10
    a[i] = Distributed.myid()
end
wait(t)
a

10-element SharedVector{Float64}:
 3.0
 3.0
 3.0
 4.0
 4.0
 4.0
 5.0
 5.0
 6.0
 6.0

Note:

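- `SharedArray`: all processes share one global memory region, and data synchronization happens implicitly behind the scenes. It only works for processes on the same machine.
- `DistributedArray` (the `DArray` type from the DistributedArrays.jl package): each process has write access only to its own chunk of the data, and read access to all the rest.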
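`SharedArray` also accepts an `init` function that runs on each participating worker, and `localindices` tells each worker which chunk of the array it is responsible for. A minimal sketch, assuming two local workers; `sdata` returns the underlying plain `Array` on the calling process.

```julia
using Distributed
nprocs() == 1 && addprocs(2)
@everywhere using SharedArrays   # make sure SharedArrays is loaded on every process

# init runs on every participating worker; each one writes its id into its own chunk
S = SharedArray{Int}((10,); init = A -> A[localindices(A)] .= myid())

println("owner of each index: ", S)
println("plain Array on the master: ", sdata(S))
```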

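For typical `mapreduce`-style computations, `@distributed` also provides a simple interface: put the reduction operator in parentheses before the `for` loop.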

In [32]:
rst = @distributed (+) for i = 1:10
    i*i
end

rst == mapreduce(i->i*i, +, 1:10)

true
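The reduction operator does not have to be `+`. As a sketch, using `vcat` as the reducer collects one value per iteration back on the master (here the id of the worker that ran the iteration), so the results come back without a `SharedArray`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# each iteration returns the id of the worker that ran it;
# vcat concatenates the per-iteration values into one vector on the master
owners = @distributed (vcat) for i in 1:10
    myid()
end

println(owners)
```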

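Unlike multithreading, memory is not shared between processes, so a function must also be defined on every process before it can be used there. The `@everywhere` macro executes the given expression on every process.

`@everywhere` is commonly used with function definitions, `include`, and `using`: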

```julia
@everywhere function foo()
    ...
end

@everywhere include(...)
@everywhere using SomePackage
```
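Global variables are not shared either. A minimal sketch, assuming two local workers: `@everywhere` can copy a value from the master into every process by interpolating it with `$`, after which a function defined with `@everywhere` can use it on any worker. The names `scale`, `SCALE`, and `scaled` are illustrative.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

scale = 3                          # a global that exists only on the master
@everywhere const SCALE = $scale   # `$` interpolates the master's value into every process
@everywhere scaled(x) = SCALE * x  # define the function on every process

# call `scaled` on each worker, passing the worker id as the argument
results = Dict(p => remotecall_fetch(scaled, p, p) for p in workers())
println(results)
```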

In [106]:
f1(x) = x*x # f1 is defined only on the master node
rst = @distributed (+) for i = 1:10
    f1(i)
end

LoadError: TaskFailedException

    nested task error: On worker 3:
    UndefVarError: #f1 not defined
    Stacktrace:
      [1] deserialize_datatype
        @ .../stdlib/v1.7/Serialization/src/Serialization.jl:1332
      [2] handle_deserialize
        @ .../stdlib/v1.7/Serialization/src/Serialization.jl:854
      ⋮
     [19] message_handler_loop
        @ .../stdlib/v1.7/Distributed/src/process_messages.jl:169
     [20] process_tcp_streams
        @ .../stdlib/v1.7/Distributed/src/process_messages.jl:126
     [21] #99
        @ ./task.jl:411
    Stacktrace:
     [1] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed .../stdlib/v1.7/Distributed/src/remotecall.jl:449
      ⋮
     [5] (::Distributed.var"#169#170"{typeof(+), var"#167#168", UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
       @ Distributed .../stdlib/v1.7/Distributed/src/macros.jl:270

In [113]:
@everywhere f2(x) = x*x # define f2 on every process
rst = @distributed (+) for i = 1:10
    f2(i)
end



385

## Multiprocess task model

[MPI (Message Passing Interface)](https://en.wikipedia.org/wiki/Message_Passing_Interface) is a typical message-passing communication standard and is widely used for multiprocess computing in other languages.

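Julia's multiprocess computing, by contrast, is built on remote references (`Future`) and remote calls (`remotecall`/`@spawnat`):

- `remotecall`/`@spawnat` creates a task, dispatches it to a process, and returns a `Future` object
- `wait`/`fetch` waits for the task behind a `Future` object to complete

`remotecall` and `@spawnat` return a `Future` immediately. The `Future` does not contain the actual value yet; it only indicates that an asynchronous task has been dispatched.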
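Besides `Future`, Distributed also provides `RemoteChannel`, which supports an explicit message-passing style closer to MPI's send/receive. A minimal sketch, assuming at least one local worker: the worker sends messages with `put!` into a channel that lives on the master, and the master receives them with `take!`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# a channel stored on process 1 (the master) that can buffer up to 10 Ints
ch = RemoteChannel(() -> Channel{Int}(10), 1)

w = first(workers())
# the worker "sends" three messages; @spawnat returns a Future right away
f = @spawnat w begin
    for k in 1:3
        put!(ch, k * myid())
    end
end
wait(f)   # make sure the worker has finished sending

# the master "receives" the messages in the order they were sent
msgs = [take!(ch) for _ in 1:3]
println(msgs)
```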

In [114]:
# run rand(2, 2) on process 3
r = remotecall(rand, 3, 2, 2)

Future(3, 1, 466, nothing)
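Because the `Future` is returned before the remote task has finished, the master can keep working and check on the task later. A minimal sketch, assuming a local worker, uses `isready` to poll the `Future` without blocking; the one-second `sleep` only simulates a slow task.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

w = first(workers())
# the remote task sleeps for one second, then returns the id of the process it ran on
r = remotecall(() -> (sleep(1); myid()), w)   # returns a Future immediately

println("ready right after the call? ", isready(r))   # most likely false
println("value: ", fetch(r))                         # blocks until the task finishes
println("ready after fetch? ", isready(r))
```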

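`@spawnat` is a macro wrapper around `remotecall` and is essentially equivalent to it. Passing `:any` instead of a process id lets Distributed pick the worker automatically; this replaces the deprecated `@spawn` macro.

In [143]:
r = @spawnat :any Distributed.myid()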
fetch(r)

4

In [121]:
# run rand(2, 2) on process 3
r = @spawnat 3 rand(2, 2)

Future(3, 1, 479, nothing)

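Use `wait` or `fetch` to wait for the task behind a `Future` object to finish. The only difference is that `fetch` also returns the task's value, while `wait` returns the `Future` itself.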

In [119]:
r = @time remotecall(rand, 3, 1000, 1000)
@time wait(r)

  0.000108 seconds (27 allocations: 752 bytes)
  0.000342 seconds (40 allocations: 1.547 KiB)


Future(3, 1, 475, nothing)

In [145]:
r = remotecall(rand, 3, 2, 2)
@time fetch(r)

  0.000394 seconds (66 allocations: 3.141 KiB)


2×2 Matrix{Float64}:
 0.628516  0.147178
 0.283362  0.960999

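Because communication in distributed computing is usually expensive, `fetch` caches its result: fetching the same `Future` multiple times triggers communication only once.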

In [64]:
@time fetch(r) # same result as before, at a much lower time cost

  0.000003 seconds


2×2 Matrix{Float64}:
 0.628516  0.147178
 0.283362  0.960999
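The caching can be checked directly. In this sketch (assuming a local worker), the second `fetch` returns the very same object as the first instead of copying the matrix from the worker again.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

r = remotecall(rand, first(workers()), 2, 2)

a = fetch(r)   # first fetch: the matrix is copied from the worker to the master
b = fetch(r)   # second fetch: served from the Future's local cache

println(a === b)   # expected: true, both calls return the same object
```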

Question: what happens behind the scenes in the following code?

In [153]:
r = @time @spawnat 3 rand(10000,10000)
s = @time @spawnat 3 1 .+ fetch(r)
@time fetch(s);

  0.000603 seconds (281 allocations: 16.781 KiB)
  0.000520 seconds (292 allocations: 17.548 KiB)
  1.305822 seconds (974 allocations: 762.962 MiB, 8.93% gc time)


In [169]:
r = @time @spawnat 3 rand(10000,10000)
s = @time @spawnat 4 1 .+ fetch(r)
@time fetch(s);

  0.002683 seconds (1.32 k allocations: 80.766 KiB)
  0.013692 seconds (38.17 k allocations: 2.184 MiB, 93.42% compilation time)
  2.340958 seconds (1.54 k allocations: 762.995 MiB, 10.83% gc time)


In [170]:
r = @time @spawnat 3 rand(10000,10000)
wait(r)
s = @time @spawnat 4 1 .+ fetch(r)
@time fetch(s);

  0.000593 seconds (320 allocations: 19.016 KiB)
  0.000633 seconds (354 allocations: 20.610 KiB)
  1.472900 seconds (1.01 k allocations: 762.962 MiB)

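One way to avoid shipping large data between processes, sketched below under the assumption that the master only needs a summary of the data: run the follow-up computation on the worker that already owns the data and fetch only the small result.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

w = first(workers())
r = @spawnat w rand(1000, 1000)   # the matrix is created and stays on worker w
s = @spawnat w sum(fetch(r))      # fetch(r) runs on w itself, so no copy between processes
total = fetch(s)                  # only a single Float64 is sent back to the master
println(total)
```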

In [159]:
A = SharedArray{Float64}(1000, 1000)
A .= rand(1000, 1000)

rst = @distributed (+) for i in 1:1000
    sum(A, dims=1)
end

1×1000 Matrix{Float64}:
 500375.0  5.00346e5  4.99814e5  …  5.00228e5  5.00602e5  5.00038e5

## Actor model

https://github.com/JuliaActors/Actors.jl

## LoopVectorization

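LoopVectorization rewrites `for` loops into more efficient versions that make better use of hardware resources such as the CPU cache and SIMD/AVX instructions, thereby achieving higher computational efficiency.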
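A minimal sketch of `@turbo` on a simple reduction loop (a dot product), assuming the LoopVectorization package is installed: marking the plain `for` loop with `@turbo` lets the package unroll and vectorize it.

```julia
using LoopVectorization

# dot product; @turbo lets LoopVectorization unroll and SIMD-vectorize the loop
function dot_turbo(a, b)
    s = zero(eltype(a))
    @turbo for i in eachindex(a, b)
        s += a[i] * b[i]
    end
    return s
end

a = rand(1000)
b = rand(1000)
println(dot_turbo(a, b))
```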

In [2]:
using LoopVectorization

In [9]:
function A_mul_B!(C, A, B)
    # C = A * B
    @inbounds for n ∈ indices((C,B), 2), m ∈ indices((C,A), 1)
        Cmn = zero(eltype(C))
        @simd for k ∈ indices((A,B), (2,1))
            Cmn += A[m,k] * B[k,n]
        end
        C[m,n] = Cmn
    end
end

A_mul_B! (generic function with 1 method)

In [10]:
A = rand(40, 40)
B = rand(40, 40)
C = similar(A)
@btime A_mul_B!($C, $A, $B);

  38.646 μs (0 allocations: 0 bytes)


In [11]:
function A_mul_B_t!(C, A, B)
    # C = A * B
    @turbo for n ∈ indices((C,B), 2), m ∈ indices((C,A), 1)
        Cmn = zero(eltype(C))
        for k ∈ indices((A,B), (2,1))
            Cmn += A[m,k] * B[k,n]
        end
        C[m,n] = Cmn
    end
end

A_mul_B_t! (generic function with 1 method)

In [12]:
A = rand(40, 40)
B = rand(40, 40)
C = similar(A)
@btime A_mul_B_t!($C, $A, $B);

  2.099 μs (0 allocations: 0 bytes)


In [13]:
@btime similar(A)
@btime $A * $B;

  129.903 ns (1 allocation: 12.62 KiB)
  4.228 μs (1 allocation: 12.62 KiB)
