<img
src="http://www.telecom-em.eu/sites/default/files/logoimt2016.JPG"
WIDTH=180 HEIGHT=180>

<CENTER>
<p>
<font size="5"> Introduction to parallel computing</font></p>
</CENTER>

----------------

# Parallel programming

Use the official documentation 
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html

Julia evolves quickly so you will be sure that the official documentation is up to date.

Other resources can be found on the internet but not all of them are up to date...
(e.g. https://codingclubuc3m.github.io/2018-06-06-Parallel-computing-Julia.html , replace all occurences of  @parallel by @distributed)



In [1]:
# Using Pkg
# Pkg.add("Distributed")

using Distributed

CPU_CORES = 4 # number of cores on the machine

# Before adding workers.
nprocs()
nworkers() # when there are no no extra workers, nprocs() = nworkers().

# After adding them.
addprocs(CPU_CORES - 1) # 4 cores
# addprocs(1) # 2 cores
nprocs()
nworkers()
workers()

# nworkers() #Number of available worker processes. nworkers() = nprocs() - 1.
# rmproc()

3-element Array{Int64,1}:
 2
 3
 4

### Using tasks

In [2]:
a() = sum(i for i in 1:1000) # sum of integers from 1 to 1000 = 500500
b   = Task(a)  # or b = @task a()  # associate task to this function

Task (runnable) @0x00007f5339348280

In [3]:
istaskstarted(b) 

false

In [4]:
schedule(b) # Adds the task to the scheduler's queue

Task (done) @0x00007f5339348280

In [5]:
istaskstarted(b)

true

In [6]:
istaskdone(b)

true

In [7]:
b

Task (done) @0x00007f5339348280

In [8]:
fetch(b) # get the answer of task

500500

### Using several workers

In [9]:
workers() # find out how many workers are active

3-element Array{Int64,1}:
 2
 3
 4

In [10]:
addprocs(2) # add two workers

2-element Array{Int64,1}:
 5
 6

In [11]:
workers() 

5-element Array{Int64,1}:
 2
 3
 4
 5
 6

In [12]:
nprocs(),rmprocs(7) # list number of active processes, remove process with PID 7
                      # rmprocs(1) does not work because procs = 1 is not a worker

(6, Task (done) @0x00007f5339229ae0)

In [13]:
w = 2 # worker number 2
r = remotecall(rand, w, 2, 2) # RemoteRef: reference to computed result.
# call rand function to be executed on worker 2.A "Future" object is returned (something that will be fetched later on)

Future(2, 1, 7, nothing)

In [14]:
fetch(r)  # gets the computed result
          # blocks main processor until result is available 

2×2 Array{Float64,2}:
 0.164143   0.153965
 0.0850897  0.355181

In [15]:
x,y = 3,4
ψ= (x,y)->sqrt(x^2+y^2)
remotecall_fetch(ψ,w,x,y) # obtain value of computation of psi by worker number w and passing arguments x,y (both remotecall and fetch are performed at the same time)

5.0

In [16]:
using LinearAlgebra: diagm

w  = 3
d  = diagm(0=>[1,1]) # diagonal matrix with ones on the diagonal (first argument is the index of the diagonal used)
ex = :($d+fetch($r)) # this computes the value of d + r (which was computed by process 2)
s  = @spawnat w eval(ex) # worker w calculates expression ex
         # equivalent to s = @spawnat 3 [1 0;0 1]+fetch(r)
fetch(s)

# here we just passed information from worker 2 to 3 (if task performed at 3 requires info from task performed at 2)

2×2 Array{Float64,2}:
 1.16414    0.153965
 0.0850897  1.35518 

In [17]:
s= @spawn  sin.(randn(2,2)) #worker is chosen automatically if not specified
fetch(s)

2×2 Array{Float64,2}:
  0.0497165  -0.777812
 -0.534875   -0.175802

In [18]:
@everywhere using LinearAlgebra: det # execute this on all processes (load this function on every process)
result = [@spawnat w det(randn(10,10)) for w in workers()] # execute this function on all processes through a loop
# all processes have access to the function det
[fetch(result[k]) for k=1:nprocs()-1] # get results on all current processes

5-element Array{Float64,1}:
 -224.63620001490915
 -850.5071619017439 
 1277.9516953175375 
 -162.01719519848024
 -550.9830209805899 

In [19]:
@everywhere println(det(randn(10,10))) # same calculation on each worker

85.02692477386184
      From worker 6:	1518.6819854082423
      From worker 5:	-96.10086265240606
      From worker 3:	-1510.6954098784688
      From worker 4:	-42.661971487350804
      From worker 2:	-48.97098459936341


In [21]:
@everywhere function f(x::Float64)
    println(x +randn())
end 
@everywhere x=5.
@everywhere f(x)

5.152432445750008
      From worker 5:	5.38425178533312
      From worker 6:	3.9078623209019043
      From worker 3:	5.051712348202915
      From worker 4:	3.857434021569288
      From worker 2:	5.64050141776188


### Parallel loops

In [39]:
function MC_pi(n) # approximation of pi using MC method
    s = 0
    for k in 1:n
        if rand()^2+rand()^2<1. # if we fall in the disk (all possible numbers are in the square)
            s=s+1 # add 1
        end
    end
    4*s/n # ratio between area of circle and square is π/4, s/n is an estimation of that ratio 
    #(number of points falling in the circle/total)
end
@time MC_pi(10^5)

  0.023296 seconds (29.28 k allocations: 1.552 MiB)


3.1362

workers()
addprocs(3)
workers()

### The map and pmaps functions

map allows to call the same function for different inputs of the same sizes.

pmap does the same but distributes all the arguments to the different processes

In [42]:
function MC_pi_parallel(n)
    @everywhere function g(x) # function computing whether a point is in the circle or not
        rand()^2+rand()^2<1. ? 1 : 0 # no data is required here, function returns 1 if condition is true, 0 othw
    end
    s = sum(pmap(g,collect(1:n))) # we call function g with N different arguments, and distribute the computations
    4*s/n
end
@time MC_pi_parallel(10^5)


  5.464437 seconds (7.62 M allocations: 210.155 MiB, 1.19% gc time)


3.14084

"pmap" is slower here! The reason is that the tasks made by each worker are too simple and fast, but they are numerous, so the overhead from transferring data in and out for each worker dominates the computing time

In [45]:
M = [randn(1000,1000) for k=1:100];
@time [det(m) for m in M];
@time pmap(det,M);  #parallel mapping of a function


  2.236882 seconds (406 allocations: 763.726 MiB)
  0.919974 seconds (8.53 k allocations: 255.531 KiB)


### Recovering returned values, Sending data and passing arguments to pmap

In [47]:
M = 100
N = 1000
X = randn(M,N)

X_parallel = [X[:,k] for k = 1:N]; # this creates an array containing all columns of X that can be iterated over

@everywhere using LinearAlgebra: det

@time dets = pmap(x -> det(x*x'),X_parallel);
@time dets = map(x -> det(x*x'),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  0.143301 seconds (226.99 k allocations: 11.111 MiB)
  1.086187 seconds (53.17 k allocations: 156.034 MiB)
(1000,)
-0.0


In [49]:
# we need anonymous functions to call custom functions with pmap
# need the correct prototype to send parameters
@everywhere using LinearAlgebra: I

@everywhere function custom_det(x,λ,M) # computes the determinant of a new matrix with parameters
    return  determinant = det(x'*x .+ λ*Matrix{Float64}(I, M, M))
end
λ = 1

@time dets = pmap(x -> custom_det(x,λ,M),X_parallel);
@time dets = map(x -> custom_det(x,λ,M),X_parallel);

println(size(dets))
println(dets[2]) # this is 0 because matrix has always a rank of 1

  0.200573 seconds (216.85 k allocations: 10.591 MiB, 1.98% gc time)
  1.105608 seconds (91.74 k allocations: 310.457 MiB, 2.44% gc time)
(1000,)
10418.753351270703


### Distributed arrays

In [50]:
# using Pkg
# Pkg.add("DistributedArrays") # compatibility pb with julia 1.2


In [51]:
@everywhere using DistributedArrays # distributed arrays are arrays that can be accessed and modified by any process

┌ Info: Recompiling stale cache file /home/administrateur/.julia/compiled/v1.2/DistributedArrays/Gfdem.ji for DistributedArrays [aaf54ef3-cdf8-58ed-94cc-d582ad619b94]
└ @ Base loading.jl:1240


In [54]:
A = drandn(100,100) # directly generate distributed array
B = randn(100,100) 
dB = distribute(B) # convert array to distributed array
#C = @DArray [i+j for i = 1:100, j = 1:100];

In [55]:
@time A*A; # distributed array
@time dB*dB; # converted distributed array
@time B*B; # standard array

  5.339598 seconds (4.30 M allocations: 211.548 MiB, 1.34% gc time)
  0.011585 seconds (6.36 k allocations: 244.906 KiB)
  0.283996 seconds (725.71 k allocations: 36.320 MiB, 2.23% gc time)


In [57]:
@time [i+j for i = 1:100, j = 1:100];
@time @DArray [i+j for i = 1:100, j = 1:100];

  0.043379 seconds (99.87 k allocations: 4.705 MiB)
  0.172500 seconds (388.01 k allocations: 19.892 MiB, 3.35% gc time)


In [60]:
println(nprocs())
# addprocs(7)
# rmprocs(12:13)
@everywhere using DistributedArrays
# WorkerPool([2,3,4])
# @DArray [@show x^2 for x = 1:10]; # print all squares from 1 to 10 (not in order because taken care of by different processes)

9


In [62]:
# results depending on nb of procs and data size
@time @DArray [x^2 for x = 1:10^8]; # distributed version (only advantageous for large datasets)
@time [x^2 for x = 1:10^8]; # standard version

# test as a function of number of processes and data size

  0.288127 seconds (354.19 k allocations: 18.781 MiB)
  0.299925 seconds (51.18 k allocations: 765.501 MiB, 4.51% gc time)


In [64]:
dzeros((100,100), workers()[1:4], [1,4]) # creates a distributed zero array where 
# the second dimension is spread around workers 1 to 4

100×100 DArray{Float64,2,Array{Float64,2}}:
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  …  0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0     0.0  0.0  0.0  0.0  0.0  0.0  0.0
 0.0  0.0  0.0  0.0 

------------