# Parallel Computing in Julia
[Official Document](https://docs.julialang.org/en/v1/manual/parallel-computing/)

Tianjing Zhao

## Overall

There are three categories:
1. Coroutines
2. Multi-Threading
3. Multi-Core or Distributed Processing

## Other packages

`MPI.jl`, `DistributedArrays.jl`

1. Low-level (C kernel) operations: `OpenCL.jl` and `CUDAdrv.jl`, which are an OpenCL interface and a CUDA wrapper, respectively.

2. Low-level (Julia Kernel) interfaces like `CUDAnative.jl` which is a Julia native CUDA implementation.

3. High-level vendor specific abstractions like `CuArrays.jl` and `CLArrays.jl`

4. High-level libraries like `ArrayFire.jl` and `GPUArrays.jl`

## Background
1. Program
  * Application
  * web browsing/email a message/...
2. Process (program + resources) 
  * has separate memory;
  * each process runs **independently** (a crash in one window does not affect the others).
![image info](3.png)
3. Thread
  * **unit of execution** within process
  * Each thread in the process shares memory and resources.
  * With multiple cores/processors, threads can run at the same time.
![image info](1.png)
![image info](2.png)

Threads in a process share the same heap, so communication between them is easy.

## Method 1: Coroutines

* sequential tasks
* execute only one task at a time
* Julia: put all tasks onto a single thread.


### Overall
* Use **Tasks** (aka Coroutines) to switch among multiple computations.
* Use **Channels** to pass data between running tasks.
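
As a minimal sketch of the two bullets above (the names `ch`, `producer` and `squares` are illustrative and not part of the original notes), one task can feed a channel while the main task drains it:

```julia
# Producer task: puts the squares 1, 4, 9 into the channel, then closes it.
ch = Channel{Int}(4)
producer = @async begin
    for i in 1:3
        put!(ch, i^2)
    end
    close(ch)
end

# Consumer: iterating over a channel takes values until it is closed and empty.
squares = Int[]
for v in ch
    push!(squares, v)
end
wait(producer)
println(squares)
```

Closing the channel is what lets the `for v in ch` loop terminate; without `close(ch)` the consumer would block waiting for more data.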

### Example

In [64]:
# create channel to receive job data
const jobs = Channel{Int}(32);


In [2]:
# create channel to receive result data
const results = Channel{Tuple}(32);

In [3]:
# function: do jobs and put result into result channel
function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)    # simulates elapsed time doing actual work
                                   # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

In [4]:
# function: create job data and put into job channel
function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

In [5]:
# in total 12 jobs
n = 12;

In [6]:
@async make_jobs(n); # feed the jobs channel with "n" jobs

In [7]:
# start 4 tasks to process requests concurrently
for i in 1:4 
    @async do_work()
end

In [8]:
#print result channel
@elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end

4 finished in 0.09 seconds
2 finished in 0.12 seconds
1 finished in 0.34 seconds
7 finished in 0.02 seconds
3 finished in 0.43 seconds
5 finished in 0.33 seconds
8 finished in 0.08 seconds
10 finished in 0.56 seconds
6 finished in 0.96 seconds
11 finished in 0.83 seconds
9 finished in 0.87 seconds
12 finished in 0.38 seconds


1.090924772

## Method 2: Multi-Threading
![image info](4.png)

### Overall
* Use the **`Threads.@threads`** macro to run loops in parallel.
* Use atomic operations to avoid race conditions.

Race condition: the output depends on the order in which threads happen to run, so events may not occur in the order the programmer intended.

### Example (parallel loops)

In [19]:
# check number of threads
Threads.nthreads()

1

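The number of threads is fixed when Julia starts. Set it in the shell before launching Julia (typing this line into a Julia session is a syntax error), or start Julia with `julia --threads 4` (Julia 1.5 or later):

export JULIA_NUM_THREADS=4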

In [65]:
# check thread id
Threads.threadid()

1

In [12]:
a = zeros(10)
Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

The iteration space is **split** among the threads, and each thread writes its own ID into the elements of `a` that it handles.

a
10-element Array{Float64,1}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

### Example 1 (Atomic Operations)

#### with Atomic Operations

In [23]:
using Base.Threads

In [24]:
acc = Atomic{Int64}(0)

Atomic{Int64}(0)

In [25]:
@threads for i in 1:1000
          atomic_add!(acc, 1)
       end

In [26]:
acc[]

1000

#### without Atomic Operations

In [35]:
acc = Ref(0)

Base.RefValue{Int64}(0)

In [36]:
@threads for i in 1:1000
          acc[] += 1
       end

In [37]:
acc[]

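1000

The result is 1000 here only because the updates happened not to interleave (for example, when the session runs with a single thread). With several threads, `acc[] += 1` is an unsynchronized read-modify-write, so updates can be lost and the result is typically less than 1000.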
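
The two variants can be put side by side in one loop. This is only a sketch: the function name `count_both` and the loop size are assumptions chosen for illustration, and the plain `Ref` total varies from run to run when more than one thread is active.

```julia
using Base.Threads

# Count to n with an atomic counter (safe) and with a plain Ref (racy).
function count_both(n)
    safe = Atomic{Int}(0)
    racy = Ref(0)
    @threads for i in 1:n
        atomic_add!(safe, 1)   # one indivisible read-modify-write
        racy[] += 1            # separate read, add, write: updates can be lost
    end
    return safe[], racy[]
end

safe_total, racy_total = count_both(100_000)
println("threads = ", nthreads(), ", atomic = ", safe_total, ", plain Ref = ", racy_total)
```

The atomic total always equals the number of iterations, whatever the thread count.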

## Method 3: Multi-Core or Distributed Processing

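A CPU with multiple cores behaves much like several CPUs: a dual-core CPU is effectively two single-core processors in one package.

In this method the work is split across separate processes that run independently, each with its own memory.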

### Overall
#### 1. remote references

  A remote reference is an object that can be used from any process to refer to an **object** stored on a particular process.
  

  
   * ##### RemoteChannel
  `RemoteChannel`s are rewritable. For example, multiple processes can coordinate their processing by referencing the same remote `Channel`.


#### 2. remote calls

 A remote call is a request by one process to call a certain **function** on certain arguments on another (possibly the same) process.
 
   * #####  Future 
  
  A remote call returns a `Future` to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else. 
  
  You can wait for a remote call to finish by calling `wait` on the returned `Future`, and you can obtain the full value of the result using `fetch`.
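
A minimal sketch of this call, wait, fetch cycle, assuming one local worker can be started with `addprocs(1)` (the variable names are illustrative):

```julia
using Distributed
addprocs(1)                      # start one local worker process
w = first(workers())             # id of a worker process

f = remotecall(+, w, 1, 2)       # returns a Future immediately
wait(f)                          # block until the remote call has finished
total = fetch(f)                 # retrieve the value of the result

# remotecall_fetch does the call and the fetch in one step
same = remotecall_fetch(+, w, 1, 2)
println((total, same))
```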

### Clusters

* A local cluster specified with the -p option

Starting with `julia -p n` provides `n` worker processes.



In [1]:
using Distributed
addprocs(2) 

2-element Array{Int64,1}:
 2
 3

Master process: 1

Worker processes: 2, 3

### Example 1. remotecall()

In [2]:
# ask process 2 to construct a 5-by-5 random matrix
r = remotecall(rand, 2, 5, 5)  # remotecall(f, pid, args...): run rand(5, 5) on process 2

Future(2, 1, 4, nothing)

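The fields are `Future(where, whence, id, value)`: the process that runs the call and holds the result, the process that created the reference, the reference id, and the locally cached value (`nothing` until it is fetched).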

In [3]:
fetch(r)   

5×5 Array{Float64,2}:
 0.430077   0.9979    0.0233998  0.637561  0.261268
 0.0936379  0.361854  0.952373   0.111028  0.7041  
 0.30238    0.199314  0.652515   0.794073  0.785154
 0.826905   0.720616  0.513937   0.442484  0.195432
 0.58626    0.415829  0.222701   0.972524  0.921059

In [None]:
# evaluate r[1, 1] on process 2 and send back only that element
remotecall_fetch(getindex, 2, r, 1, 1)

### Example 2.  @spawn

Create a closure around an expression and run it on an automatically chosen process, returning a `Future` to the result. In Julia 1.3 and later, `Distributed.@spawn` is deprecated in favor of `@spawnat :any`.

In [4]:
r = @spawn rand(2000,2000)

Future(2, 1, 6, nothing)

In [58]:
fetch(r)

2000×2000 Array{Float64,2}:
 0.782044   0.728519   0.521057   …  0.574093   0.619523   0.702463  
 0.685303   0.106052   0.674226      0.168966   0.909303   0.219053  
 0.880893   0.457408   0.0667173     0.657994   0.260433   0.0743496 
 0.995482   0.305453   0.99011       0.375518   0.712771   0.899068  
 0.695709   0.374857   0.434968      0.67448    0.224529   0.454395  
 0.705327   0.670299   0.910327   …  0.114623   0.22566    0.678743  
 0.610953   0.624547   0.653904      0.243574   0.390657   0.30878   
 0.619398   0.207289   0.0866107     0.774062   0.169693   0.669659  
 0.926544   0.520002   0.726469      0.213022   0.3713     0.769833  
 0.531774   0.0886039  0.0450317     0.203762   0.416715   0.9958    
 0.0830001  0.614697   0.12427    …  0.865373   0.480189   0.588488  
 0.354107   0.416322   0.264461      0.410401   0.637225   0.682085  
 0.908731   0.945972   0.971682      0.193317   0.448617   0.00207508
 ⋮                                ⋱                           
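
On Julia 1.3 or later the same pattern can be written with `@spawnat :any`. A small sketch with a cheap expression in place of the large random matrix (if no workers have been added, the expression simply runs on process 1):

```julia
using Distributed

# Let Julia choose the process; a Future comes back immediately.
r = @spawnat :any sum(1:100)

# fetch blocks until the value is available and returns it.
value = fetch(r)
println(value)
```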

### No data movement ("Monte Carlo" simulation)

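Here `count_heads(n)` sums the integers from 1 to `n`; it is a stand-in for a real simulation and needs no input data beyond `n`.

In [None]:
# file: count_head2.jl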

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += i
    end
    c
end

In [1]:
using Distributed
addprocs(2) 

2-element Array{Int64,1}:
 2
 3

In [2]:
@everywhere include_string(Main, $(read("count_head2.jl", String)), "count_head2.jl")

In [3]:
ncounts= 1_000_000_000

1000000000

In [21]:
a4 = @spawn count_heads(ncounts)

Future(2, 1, 24, nothing)

In [22]:
b4 = @spawn count_heads(ncounts)

Future(3, 1, 25, nothing)

In [23]:
@elapsed fetch(a4)+fetch(b4)  # 0.00064,0.00059, 0.0068

0.006797852

In [24]:
fetch(a4)+fetch(b4)

1000000001000000000
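
For comparison, here is a sketch of an actual coin-flipping version. The function name `flip_heads` and the sample size are assumptions made for this illustration; they are not taken from `count_head2.jl`.

```julia
using Distributed
addprocs(2)

# Define the function on every process so that workers can run it.
@everywhere function flip_heads(n)
    c = 0
    for i in 1:n
        c += rand(Bool)   # true counts as 1 (heads), false as 0
    end
    return c
end

# Each spawned task only receives the integer n, so no data has to be moved.
a = @spawnat :any flip_heads(1_000_000)
b = @spawnat :any flip_heads(1_000_000)
heads = fetch(a) + fetch(b)
println(heads)
```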

In [None]:
# test: time each fetch separately

In [30]:
aa = @spawn count_heads(ncounts)

Future(2, 1, 32, nothing)

In [26]:
bb = @spawn count_heads(ncounts)

Future(3, 1, 29, nothing)

In [31]:
@elapsed aa_v =  fetch(aa)  #0.00036,0.0004

0.000402233

In [28]:
@elapsed bb_v =  fetch(bb) #0.00034, 0.00034

0.000346074

In [20]:
aa_v+bb_v

1000000001000000000

### Example 3. 

`@everywhere`: run an expression (here, loading a file) on every process.

When `DummyModule.jl` needs to be loaded on every process

In [2]:
@everywhere include("DummyModule.jl")

loaded
      From worker 2:	loaded
      From worker 3:	loaded


In [3]:
using .DummyModule

In [4]:
MyType(7)

MyType(7)

In [5]:
fetch(@spawnat 2 DummyModule.MyType(7))

MyType(7)

### Data Movement
![image info](6.png)

**Case 1**. A random matrix is constructed locally, then sent to another process where it is squared:

`A = rand(1000,1000);
Bref = @spawn A^2;
fetch(Bref);`


In [6]:
A = rand(10000,10000);

In [29]:
Bref = @spawn A^2

Future(3, 1, 14, nothing)

In [30]:
fetch(Bref)   

55.199853766

**Case 2.** A random matrix is both constructed and squared on another process:

`Bref = @spawn rand(1000,1000)^2;
fetch(Bref);`


**This sends much less data than the first case**, because the input matrix never has to be copied to the worker.

In [31]:
Bref = @spawn rand(10000,10000)^2

Future(2, 1, 16, nothing)

In [32]:
fetch(Bref)  

57.092824349

### Parallel for loop


**Reduction**: Many iterations run independently over several processes, and then their results are combined using some function.


In [1]:
using Distributed
addprocs(10) 

10-element Array{Int64,1}:
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11

In [2]:
nprocs()

11

In [50]:
nheads = @distributed (+) for i = 1:10
    2
end

20

In [60]:
# add up random 0/1 values (coin flips)
nheads = @distributed (+) for i = 1:10
    Int(rand(Bool))
end

6

In [10]:
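# subtraction is not associative, so the result depends on how the iterations are split across workers
nheads = @distributed (-) for i = 1:10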
    Int(rand(Bool))
end

-4

In [43]:
nheads = @distributed (*) for i = 1:10
    Int(rand(Bool))
end

0

#### Wrong example

This code will not initialize all of `a`, since each process will have a **separate copy** of it. Parallel for loops like this must be avoided.

In [51]:
a = zeros(10);
@distributed for i = 1:10
    a[i] = i
end

Task (queued) @0x000000000677f3d0

In [52]:
a

10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

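Fortunately, shared arrays can be used to get around this limitation. Note that `@distributed` without a reducer returns immediately; prefix it with `@sync` if the array must be complete before it is read.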

In [53]:
using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

Task (queued) @0x00000000077cdcd0

In [54]:
a

10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0
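
A sketch of the same loop with `@sync`, so that the array is guaranteed to be filled before it is read (assuming two local workers on the same machine):

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

a = SharedArray{Float64}(10)

# @sync blocks until every worker has finished its share of the iterations.
@sync @distributed for i in 1:10
    a[i] = i
end

println(a)
```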

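### Parallel map

`pmap` is the tool to use when **no reduction** is needed: it applies a function to every element of a collection and returns the results.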

In [16]:
@everywhere using LinearAlgebra

In [17]:
M = Matrix{Float64}[rand(1000,1000) for i = 1:10]; # 10 large matrices

In [18]:
@elapsed res=pmap(svdvals, M)

4.073649893

### Comparison
* Use `@distributed for` when each iteration is tiny, optionally with a reduction.
* Use `pmap` when each function call does a large amount of work (e.g. `svdvals`) and no reduction is needed.

Both `pmap` and `@distributed for` use only worker processes for the computation.
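
The two cases can be sketched side by side. The matrix size and loop range here are assumptions, chosen small so that the example runs quickly:

```julia
using Distributed
addprocs(2)
@everywhere using LinearAlgebra

# Few, expensive calls: pmap hands one matrix at a time to a free worker.
M = [rand(200, 200) for _ in 1:4]
sv = pmap(svdvals, M)

# Many tiny iterations with a reduction: @distributed splits the range
# into one chunk per worker and combines the partial sums with +.
total = @distributed (+) for i in 1:1000
    i
end

println(length(sv), " vectors of singular values, total = ", total)
```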

#### Test: timing `pmap` against `@distributed for`

In [19]:
addprocs(10)

10-element Array{Int64,1}:
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21

In [20]:
nprocs()

21

In [21]:
M = Matrix{Float64}[rand(1000,1000) for i = 1:10]; # 10 large matrices

In [22]:
@everywhere using LinearAlgebra

In [23]:
using Distributed

Time for pmap:    2.596805109, 2.567780132, 2.640953034

Time for forloop: 4.264198688, 4.091798161, 4.112079761

In [24]:
# distributed for loop, for comparison with pmap
# without a reducer, @distributed returns a Task immediately, so this times only the launch
@elapsed @distributed for i = 1:length(M)
    svdvals(M[i])
    println(nprocs())
    
end

0.011179386

In [25]:
using SharedArrays
res = SharedArray{Float64,3}(1000,1,10);

      From worker 12:	21
      From worker 13:	21
      From worker 14:	21
      From worker 15:	21
      From worker 16:	21
      From worker 17:	21
      From worker 18:	21
      From worker 19:	21
      From worker 20:	21
      From worker 21:	21


In [26]:
# distributed for loop, for comparison with pmap
# with a reducer, @distributed waits for all iterations to finish
@elapsed @distributed (+) for i = 1:length(M)
    println(nprocs())
    res[:,:,i] = svdvals(M[i]);
end

      From worker 7:	21
      From worker 2:	21
      From worker 11:	21
      From worker 8:	21
      From worker 9:	21
      From worker 10:	21
      From worker 4:	21
      From worker 5:	21
      From worker 3:	21
      From worker 6:	21


5.269636677

In [27]:
res

1000×1×10 SharedArray{Float64,3}:
[:, :, 1] =
 499.9966446561154     
  18.14744934860105    
  17.999192544153328   
  17.94711051155583    
  17.81996302328169    
  17.79433182149896    
  17.733745093404664   
  17.724415935905427   
  17.675067603744985   
  17.617520414536234   
  17.50299516376098    
  17.473893162760792   
  17.42104677970939    
   ⋮                   
   0.15898633754650254 
   0.15250690185569232 
   0.12126857401034243 
   0.11730649317743988 
   0.10273861584037713 
   0.09614711599875211 
   0.07527041467919135 
   0.06257702914386455 
   0.04995372080769225 
   0.04477314472394897 
   0.03071901457165942 
   0.006593787433589939

[:, :, 2] =
 499.9765180739922     
  18.188199787701986   
  17.999552061529908   
  17.988348051709753   
  17.93634985210346    
  17.836709272712174   
  17.806652797250713   
  17.724303280857217   
  17.705112316335157   
  17.6168652869468     
  17.55748291424697    
  17.51481060749364    
  17.50503502545473    
   ⋮ 

In [28]:
@elapsed @distributed (+) for i = 1:length(M)
    println(nprocs())
    svdvals(M[i]);
end

      From worker 2:	21
      From worker 3:	21
      From worker 4:	21
      From worker 5:	21
      From worker 6:	21
      From worker 7:	21
      From worker 8:	21
      From worker 9:	21
      From worker 10:	21
      From worker 11:	21


2.493232416

### Remote Channel
A `RemoteChannel` lets a process refer to a channel that lives on another process.

 `RemoteChannel(()->Channel{Int}(10), pid)` returns a reference to a channel of element type `Int` and size 10. The channel exists on worker `pid`.
 
 * A Channel is local to a process. Worker 2 cannot directly refer to a Channel on worker 3 and vice-versa. A RemoteChannel, however, can put and take values across workers.

 * A RemoteChannel can be thought of as a handle to a Channel.
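
A minimal sketch of the handle idea, assuming one local worker; the value `42` and the variable names are arbitrary. The channel lives on process 1, and the worker writes into it through the `RemoteChannel`:

```julia
using Distributed
addprocs(1)
w = first(workers())

# The underlying Channel{Int} is created on process 1; rc is a handle to it.
rc = RemoteChannel(() -> Channel{Int}(10), 1)

# Run put!(rc, 42) on the worker and wait until it has finished.
remotecall_wait(put!, w, rc, 42)

# Back on the master, take the value out of the same channel.
got = take!(rc)
println(got)
```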

#### Example
We start 4 workers to process a single jobs remote channel. Jobs, identified by an id (job_id), are written to the channel. Each remotely executing task in this simulation reads a job_id, waits for a random amount of time and writes back a tuple of job_id, time taken and its own pid to the results channel. Finally all the results are printed out on the master process.

In [1]:
using Distributed
addprocs(4)

4-element Array{Int64,1}:
 2
 3
 4
 5

In [2]:
const jobs = RemoteChannel(()->Channel{Int}(32))

RemoteChannel{Channel{Int64}}(1, 1, 6)

In [3]:
const results = RemoteChannel(()->Channel{Tuple}(32))

RemoteChannel{Channel{Tuple}}(1, 1, 7)

In [4]:
@everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))  # feed the result channel
           end
       end

In [5]:
function make_jobs(n)
           for i in 1:n
               put!(jobs, i) # feed the jobs channel with "n" jobs
           end
       end;

In [6]:
n = 12;

In [7]:
# @async wraps an expression in a Task
@async make_jobs(n) # feed the jobs channel with "n" jobs in the background
# jobs channel: 1, 2, 3, 4, ..., 11, 12

Task (done) @0x000000000774f3d0

In [8]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [9]:
for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)  # remote_do(f, pid, args...)
       end

In [10]:
@elapsed while n > 0 # print out results
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end

2 finished in 0.0 seconds on worker 3
1 finished in 0.06 seconds on worker 5
4 finished in 0.09 seconds on worker 2
6 finished in 0.02 seconds on worker 5
5 finished in 0.22 seconds on worker 3
7 finished in 0.23 seconds on worker 2
9 finished in 0.19 seconds on worker 3
8 finished in 0.59 seconds on worker 5
12 finished in 0.02 seconds on worker 5
3 finished in 0.79 seconds on worker 4
10 finished in 0.82 seconds on worker 2
11 finished in 0.66 seconds on worker 3


0.159630441

### Shared Arrays

`SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])`

creates an `N`-dimensional shared array of a bits type `T` and size `dims` across the processes specified by `pids`.
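
The `init` keyword accepts a function that is run on each participating process. The sketch below assumes two local workers; the helper name `fill_with_pid!` is hypothetical. Each process fills the part of the array assigned to it with its own id:

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

# Hypothetical helper: each participating process writes its id into the
# index range that localindices assigns to it.
@everywhere function fill_with_pid!(s)
    s[localindices(s)] .= myid()
    return nothing
end

S = SharedArray{Int,2}((3, 4); init = fill_with_pid!)
println(S)
```

With the default `pids`, the participating processes are the workers on the current host, so every entry of `S` should be a worker id.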

#### Compare
 
 In a `DArray`, each process can **read** and **write** to the part of the array it owns and has **read-only** access to the parts it doesn't own.
 
 A `SharedArray` uses system shared memory to map the same array across many processes. While there are some similarities to a `DArray`, the behavior of a `SharedArray` is quite different. In a `DArray`, each process has local access to just a chunk of the data, and no two processes share the same chunk; in contrast, in a `SharedArray` each "participating" process has access to the entire array. **A `SharedArray` is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine**.

In [38]:
using Distributed
addprocs(3)

3-element Array{Int64,1}:
 26
 27
 28

In [39]:
@everywhere using SharedArrays

In [40]:
S = SharedArray{Int,2}(3,4) 

3×4 SharedArray{Int64,2}:
 0  0  0  0
 0  0  0  0
 0  0  0  0

In [42]:
q = SharedArray{Float64,3}((5,5,2))

5×5×2 SharedArray{Float64,3}:
[:, :, 1] =
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0

[:, :, 2] =
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0