# 并行计算

> cocurrency(异步) Vs. multi-threading（多线程）
> + https://www.rugu.dev/en/blog/concurrency-and-parallelism/

并行计算(适用于计算顺序不影响最终结果的for-loop中)
- `@simd` 单指令多数据, `simd`只能用于简单的数学运算，例如`+-`，而且Julia的编译器会自动帮你查询，你的for-loop能不能使用simd，可以的话它会自动帮你加上
- `@turbo` 进阶版的`@simd`， 可以替代`@inbounds @simd`，比这俩要快很多
    - 貌似对自动微分的支持比较差，不建议使用
- `@threads` 单进程多线程，共享一块内存区域
    + 不推荐使用`@threads`宏，https://julialang.org/blog/2023/07/PSA-dont-use-threadid/
    + 建议使用`@sync`和`Threads.@spawn`，`using Base.Threads`来使用多线程
- 多进程与分布式数据（或者copy数据到所有其它julia进程），每个julia程序单独享用一块内存区域

参考资料
+ https://johnnychen94.github.io/Julia_and_its_applications/6_2_parallel_intro.jl.html

## 异步

异步适用于IO密集型任务，不适用于CPU密集型任务，一般用不上
协程不能调用CPU的多核资源，因为这些协程实际上共享同一个系统线程
异步还是没有搞明白 :sad:

## 多进程

### 底层接口

消息传递接口（Message Passing Interface， MPI）

In [1]:
using Distributed

In [2]:
nprocs()

1

In [3]:
addprocs(3)

3-element Vector{Int64}:
 2
 3
 4

+ 远程调用 remote reference
+ 远程引用 remote calls

In [4]:
# 远程调用：通过本地进程在远程Worker中启动某一处理过程
# 当前进程PID = 1，在当前进程中调用PID = 2的进程，在这个2进程中生成随机数矩阵
r = remotecall(rand, 2, 3, 3)

Future(2, 1, 5, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 105553119440896, 4294967297)), nothing)

In [5]:
# whence为本地进程，where 为被调用的进程, v为返回结果
# remotecall只会建立本地进程，被调用进程和处理过程之间的关系，不会返回结果
dump(r)

Future
  where: Int64 2
  whence: Int64 1
  id: Int64 5
  lock: ReentrantLock
    locked_by: Nothing nothing
    reentrancy_cnt: UInt32 0x00000000
    havelock: UInt8 0x00
    cond_wait: Base.GenericCondition{Base.Threads.SpinLock}
      waitq: Base.IntrusiveLinkedList{Task}
        head: Nothing nothing
        tail: Nothing nothing
      lock: Base.Threads.SpinLock
        owned: Int64 0
    _: Tuple{Int64, Int64, Int64}
      1: Int64 8
      2: Int64 105553119440896
      3: Int64 4294967297
  v: Nothing nothing


In [6]:
# fetch第一次被执行，远程的计算结果就会被传输到本地进程中的Future对象中缓存起来，远程的数据则会被删除
# 多次执行返回的结果都是一样的，看起来fetch之前就已经执行完毕了
fetch(r)

3×3 Matrix{Float64}:
 0.528922  0.680318  0.544822
 0.904478  0.145737  0.249805
 0.123782  0.948467  0.832162

`spawn`在英文中有生产、造成的意思

In [7]:
# 使用进程4生成4X4矩阵，不返回计算结果
# @spawn和remotecall的功能是一样的，区别在于remotecall只能执行函数，而spawn可以执行表达式
m1 = @spawnat 4 rand(4, 4)

Future(4, 1, 7, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (140714828219088, 140714828219088, 0)), nothing)

In [8]:
# 系统自动选择一个进程生成4X4矩阵, 不返回计算结果
m2 = @spawn rand(4, 4)

Future(2, 1, 8, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 140714822336512, 0)), nothing)

In [9]:
fetch(m1), fetch(m2)

([0.228160 0.024405 0.528314 0.711916; 0.874635 0.687846 0.559633 0.452102; 0.190403 0.282483 0.846801 0.813613; 0.561852 0.309151 0.017492 0.855308], [0.953928 0.170724 0.482075 0.121593; 0.789284 0.271510 0.950596 0.774774; 0.145357 0.368087 0.057804 0.747346; 0.014133 0.330092 0.148950 0.063762])

In [10]:
# @fetch = fetch(@spawn function)
# @fetchfrom  PID = fetch(@spawnat PID function)

@fetch rand(3, 3), @fetchfrom 1 rand(3, 3)

([0.595564 0.338191 0.963176; 0.685822 0.711244 0.273929; 0.024006 0.962724 0.138202], [0.340016 0.548147 0.628379; 0.586101 0.521727 0.448607; 0.665175 0.245007 0.236799])

In [11]:
# 不同的进程有不同的作用域，@everywhere可以把你定义的一些东西放到所有的进程中
@everywhere function rand2(ndim)
    return 2 * rand(ndim, ndim)
end

In [12]:
# @fetch和fetch不是一回事，别弄混了
@fetch rand2(2), @fetchfrom 2 rand2(2)

([0.504044 1.927709; 0.795914 0.183168], [0.354061 0.747744; 0.198152 0.503928])

In [13]:
# 自动将for-loop分配到所有的进程中，但是结果的顺序是乱的
@distributed (+) for i = 1:200000000
                    Int64(rand(Bool))
end

99998073

### 共享数组

分布式数据和共享数据不一样哈😄，前者用在集群，后者是单机

In [14]:
using SharedArrays, LogExpFunctions

In [15]:
a = SharedArray{Float64}(1, 10)
@distributed for i in 1:10
    a[i] = i / 10
end
logsumexp(a)

2.302585

### 上层接口

TODO

## 多线程

### `@sync + Threads.@spawn / Distributed.@spawn`

在for-loop中
+ 使用`Threads.@spawn`会将要执行的函数分别让多个线程一起执行，但是不管他们执行是否完毕，就显示程序运行完成，返回结果；
+ 使用`@sync`可以等待所有线程执行完毕以后，再返回结果，显示程序运行完成；

由此可以推广到`@sync + Distributed.@spawn`的使用，也是类似的原理

In [16]:
function _f(i)
    println("Threadid: ",Threads.threadid(), " ", i, " direct")
    sleep(1)
    println("Threadid: ",Threads.threadid(), " ", i, " sleep 1")
    sleep(1)
    println("Threadid: ",Threads.threadid(), " ", i, " sleep 2")
end
function f1(x)
    for i in x
        Threads.@spawn _f(i)
    end
end
function f2(x)
    @sync for i in x
        Threads.@spawn _f(i)
    end
end

f2 (generic function with 1 method)

In [17]:
# 发现了一个有趣的现象
# 整个for-loop的多线程任务分配（不是执行）是一次完成的
f1(1:2); @time f1(1:2)

Threadid: 2 2 direct
Threadid: 3 1 direct
Threadid: 3 1 direct
Threadid: 4 2 direct
  0.000022 seconds (10 allocations: 1.156 KiB)


In [18]:
f2(1:2); @time f2(1:2)

Threadid: 2 1 direct
Threadid: 3 2 direct
Threadid: 2 2 sleep 1
Threadid: 6 1 sleep 1
Threadid: 3 2 sleep 1
Threadid: 4 1 sleep 1
Threadid: 2 1 sleep 1
Threadid: 5 2 sleep 1
Threadid: 4 1 sleep 2
Threadid: 2 2 sleep 2
Threadid: 6 1 sleep 2
Threadid: 3 2 sleep 2
Threadid: 5 2 sleep 2
Threadid: 2 1 sleep 2
Threadid: 4 2 direct
Threadid: 2 1 direct
Threadid: 4 2 sleep 1
Threadid: 2 1 sleep 1
Threadid: 4 2 sleep 2
Threadid: 2 1 sleep 2
  2.032554 seconds (744 allocations: 45.203 KiB)


### 数据竞争与锁

在多线程的场景下，同时写入会存在数据竞争，导致结果出现错误，下图是[JohnnyChen](https://johnnychen94.github.io/Julia_and_its_applications/6_2_parallel_intro.jl.html)画的，如果两个线程同时写入，就会导致结果与真实结果不一致。

解决方法有：（1）加锁🔒；（2）使用线程上下文避免数据竞争；（3）原子操作；

+ 加锁：当一个变量正在被写入时，它会被锁定，下一个线程必须等待该线程写入完成后才能写入
+ 线程上下文：每一个线程的计算结果单独用一个变量来累加，最后所有线程的结果再相加就是最后的计算结果，这样就不会出现数据竞争
+ 原子操作：对`Threads.Atomic{T}(0)`通过`Threads.atomic_add!()`等操作，可以自动解决数据竞争问题，原子操作就是为了解决数据竞争而存在的
    + 原子操作貌似只能对处理原子类型(Primitive type: Int, Float, ...)，抽象类型(Abstract type: Real, Integer, etc.)无法使用，所以使用Optim.jl中的自动微分时无法使用
    + > "Julia 支持访问和修改值的原子操作，即以一种线程安全的方式来避免竞态条件。一个值（必须是基本类型的，primitive type）可以通过 Threads.Atomic 来包装起来从而支持原子操作"
      
*Notes.* 加锁可能会有性能损耗，使用线程上下文比较方便

<img src="https://raw.githubusercontent.com/xuestrange/picGoUploader/main/img/20231010171913.png" width=800 height=250>

In [5]:
using BenchmarkTools

In [7]:
# 加锁
function test_lock()
    a = 1:10
    l = ReentrantLock()
    rst = 0.0
    Threads.@threads for i in a
        lock(l) do 
            rst += i
        end
    end
    return rst
end
@btime test_lock()

  4.083 μs (50 allocations: 4.26 KiB)


55.000000

In [10]:
# 原子操作
function test_atomic()
    a = 1:10
    rst = Threads.Atomic{Int64}(0)
    Threads.@threads for i in a
        Threads.atomic_add!(rst, i)
    end
    return rst[]
end
@btime test_atomic()

  3.337 μs (37 allocations: 3.98 KiB)


55

### Overhead

The cost of creating and scheduling the tasks, ~20微妙 / per task
+ https://discourse.julialang.org/t/how-to-use-multithreading-appropriately/105154

(TODO)

### 案例：Logit模型

In [1]:
using LogExpFunctions, Optim
using Random, Distributions
using LinearAlgebra:dot
using BenchmarkTools
const seed = Random.seed!(2022 - 1 - 3)
BenchmarkTools.DEFAULT_PARAMETERS.samples = 10

begin
    Inds = 1000
    nX = 4
    Tlen = rand([15, 10, 20], Inds)
    X = Vector{Matrix{Float64}}()
    for i in eachindex(Tlen)
        push!(X, randn(seed, (nX, Tlen[i])))
    end
    beta = [1.0, 2, -3, 4]
    y = Vector{Vector{Int64}}()
    for i in 1:Inds
        Xi = X[i]
        yi = Vector{Int64}()
        for t in axes(Xi, 1)
            U = @views dot(Xi[:, t], beta)
            push!(yi, rand(BernoulliLogit(-U)))
        end
        push!(y, yi)
    end
end

function nll_threading(X, beta::Vector{T}, y) where {T <: Real}
    # 使用线程上下文
    ind_ll = zeros(T, Threads.nthreads())
    Threads.@threads for i in eachindex(y)
        Xi = X[i]
        yi = y[i] 
        for t in axes(Xi, 1)
            U = @views dot(Xi[:, t], beta)
            ind_ll[Threads.threadid()] += logpdf(BernoulliLogit(-U), yi[t])
        end
    end
    return - sum(ind_ll)
end

function diary(tr)
        println("Iter ", tr.iteration, " ---> f_val: ", tr.value, " g_norm: ", tr.g_norm)
        println("\t x_hat: ", tr.metadata["x"])
        false # 不知道为啥要加false
end

fit_optim = optimize(
    pars -> nll_threading(X, pars, y),
    zeros(4),
    Newton(),
    # 使用callback就必须关闭store_trace
    Optim.Options(show_trace = false, extended_trace = true, store_trace = false, callback = diary);
    autodiff = :forward
)

# fit_optim.minimizer

Iter 0 ---> f_val: 2772.588722 g_norm: 1123.279946
	 x_hat: [0.000000, 0.000000, 0.000000, 0.000000]
Iter 1 ---> f_val: 949.441080 g_norm: 27.357037
	 x_hat: [1.325210, 2.549662, -3.970691, 5.016058]
Iter 2 ---> f_val: 914.524226 g_norm: 8.540237
	 x_hat: [1.027858, 2.025966, -3.214843, 4.031323]
Iter 3 ---> f_val: 912.105887 g_norm: 0.245877
	 x_hat: [0.935645, 1.869017, -2.999550, 3.742520]
Iter 4 ---> f_val: 912.105455 g_norm: 0.000084
	 x_hat: [0.934208, 1.867128, -2.998153, 3.739742]
Iter 5 ---> f_val: 912.105455 g_norm: 0.000000
	 x_hat: [0.934208, 1.867129, -2.998154, 3.739743]


 * Status: success

 * Candidate solution
    Final objective value:     9.121055e+02

 * Found with
    Algorithm:     Newton's Method

 * Convergence measures
    |x - x'|               = 8.99e-07 ≰ 0.0e+00
    |x - x'|/|x'|          = 2.40e-07 ≰ 0.0e+00
    |f(x) - f(x')|         = 3.83e-11 ≰ 0.0e+00
    |f(x) - f(x')|/|f(x')| = 4.20e-14 ≰ 0.0e+00
    |g(x)|                 = 6.96e-12 ≤ 1.0e-08

 * Work counters
    Seconds run:   0  (vs limit Inf)
    Iterations:    5
    f(x) calls:    13
    ∇f(x) calls:   13
    ∇²f(x) calls:  5
