<img
src="http://www.telecom-em.eu/sites/default/files/logoimt2016.JPG"
WIDTH=180 HEIGHT=180>

<CENTER>
<p>
<font size="5"> Introduction to parallel computing</font></p>
</CENTER>

----------------

# Parallel programming

Refer to the official documentation:
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html

Julia evolves quickly, so the official documentation is the resource most likely to be up to date.

Other resources can be found online, but not all of them are up to date
(e.g. https://codingclubuc3m.github.io/2018-06-06-Parallel-computing-Julia.html; when following it, replace every occurrence of `@parallel` with `@distributed`).



In [1]:
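# using Pkg
# Pkg.add("Distributed") # Distributed ships with Julia as a standard library

using Distributed

CPU_CORES = 4 # number of cores on the machine (Sys.CPU_THREADS reports the number of logical threads)

# Before adding workers: only the master process (id 1) exists.
nprocs()
nworkers() # with no extra workers, the master counts as the only worker, so nprocs() == nworkers() == 1

# After adding them.
addprocs(CPU_CORES - 1) # one worker per remaining core: 4 processes in total
# addprocs(1)           # 2 processes in total
nprocs()
nworkers()
workers()

# nworkers() # number of available worker processes; once workers have been added, nworkers() == nprocs() - 1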


3-element Array{Int64,1}:
 2
 3
 4
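In a notebook cell only the value of the last expression is displayed, which is why the cell above shows only the output of `workers()`. A minimal sketch that prints each count explicitly, assuming a fresh Julia session (the number of workers added here, 2, is an arbitrary choice):

```julia
using Distributed

# Before any addprocs call, the master (id 1) is the only process,
# and it is also reported as the only worker.
println("before: nprocs = ", nprocs(), ", nworkers = ", nworkers())

added = addprocs(2)   # start two local workers; returns their ids
println("added workers: ", added)

# From now on the master is no longer counted as a worker.
println("after:  nprocs = ", nprocs(), ", nworkers = ", nworkers())
println("workers() = ", workers())
```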

### Using tasks

In [2]:
a() = sum(i for i in 1:1000) # sum of the integers from 1 to 1000 = 500500
b   = Task(a)  # wrap the function in a task (equivalently b = @task a()); the task is created but not started yet

Task (runnable) @0x00007f1388cfba90

In [3]:
istaskstarted(b) 

false

In [4]:
schedule(b) # Adds the task to the scheduler's queue

Task (done) @0x00007f1388cfba90

In [5]:
istaskstarted(b)

true

In [6]:
istaskdone(b)

true

In [7]:
b

Task (done) @0x00007f1388cfba90

In [8]:
fetch(b) # get the value returned by the task (waits for the task to finish if necessary)

500500
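Tasks are coroutines: they all run inside the same Julia process and take turns, so on their own they do not use the extra workers. Their benefit is overlapping waits (I/O, `sleep`, remote calls). A small sketch using `@async`, in which two 0.5 s sleeps overlap, so the total should be close to 0.5 s rather than 1 s (exact timings depend on the machine):

```julia
# Two tasks that mostly wait; @async creates and schedules each task immediately.
t0 = time()
t1 = @async (sleep(0.5); :first)
t2 = @async (sleep(0.5); :second)

# fetch waits for each task to finish and returns its value.
results = (fetch(t1), fetch(t2))
elapsed = time() - t0
println(results, " after ", round(elapsed; digits = 2), " s")
```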

### Using several workers

In [9]:
workers() # list the ids of the active workers

3-element Array{Int64,1}:
 2
 3
 4

In [3]:
addprocs(2) # add two workers (this output comes from a restarted session, hence the ids 2 and 3)

2-element Array{Int64,1}:
 2
 3

In [4]:
workers() 

2-element Array{Int64,1}:
 2
 3

In [7]:
nprocs(),rmprocs(7) # number of active processes, then remove the process with id 7 (a Julia process id, not an OS PID)
                      # rmprocs(1) does not work: process 1 is the master, not a worker

(3, Task (done) @0x00007f37b5395600)
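Rather than hard-coding an id such as 7, it is simpler to keep the ids returned by `addprocs` and remove workers through them. A sketch that only relies on the `Distributed` standard library:

```julia
using Distributed

new_ids = addprocs(2)        # ids of the workers just started
println("workers: ", workers())

# Remove the most recently added worker. By default rmprocs waits until
# the worker has been removed; it returns a Task.
rmprocs(new_ids[end])
println("workers after rmprocs: ", workers())
```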

In [12]:
w = 2 # worker number 2
r = remotecall(rand, w, 2, 2) # ask worker w to run rand(2, 2); the call returns immediately
# A Future is returned: a reference to a result that will be fetched later on

Future(2, 1, 7, nothing)

In [11]:
fetch(r)  # gets the computed result
          # blocks the calling process until the result is available

2×2 Array{Float64,2}:
 0.755939  0.880594 
 0.117979  0.0646422
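Since `remotecall` returns immediately, the master can keep working while the worker computes; `isready` checks, without blocking, whether the result is available. A sketch in which the remote work is artificially slowed down with `sleep` (the 1 s delay is arbitrary):

```julia
using Distributed
nprocs() == 1 && addprocs(1)   # make sure at least one worker exists
w = workers()[1]

# The anonymous function is sent to worker w together with its argument 3.
r = remotecall(x -> (sleep(1); x^2), w, 3)
println("ready right after the call? ", isready(r))   # most likely false

value = fetch(r)   # blocks until the worker has finished
println("value = ", value, ", ready now? ", isready(r))
```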

In [None]:
x, y = 3, 4
ψ = (x, y) -> sqrt(x^2 + y^2)
remotecall_fetch(ψ, w, x, y) # worker w evaluates ψ(x, y) = 5.0 and returns the value (remotecall and fetch in a single step)

In [13]:
using LinearAlgebra: diagm

w  = 3
d  = diagm(0=>[1,1]) # 2×2 identity: the pair key 0 selects the main diagonal, the value gives its entries
ex = :($d+fetch($r)) # expression adding d to the result referenced by r (computed by worker 2)
s  = @spawnat w eval(ex) # worker w evaluates the expression ex
# equivalent to s = @spawnat 3 [1 0;0 1]+fetch(r)
fetch(s)

# worker 3 fetched the data computed by worker 2: this is how a task running on one worker can use the result of a task run on another

2×2 Array{Float64,2}:
 1.22988   0.824312
 0.923996  1.16082 
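The same transfer can be written without building an expression and calling `eval`: the expression after `@spawnat` is already wrapped in a closure and run on the chosen worker, and a global of `Main` that it refers to, such as `r`, is sent along with it. A sketch, assuming at least two workers (added here if missing):

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least two workers
w1, w2 = workers()[1], workers()[2]

r = remotecall(rand, w1, 2, 2)            # worker w1 computes a random 2×2 matrix
# Run on w2: the Future r is sent along, and w2 fetches the matrix from w1 itself.
s = @spawnat w2 [1.0 0.0; 0.0 1.0] + fetch(r)
result = fetch(s)
println(result)
```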

In [None]:
s = @spawnat :any sin.(randn(2,2)) # :any lets Distributed choose the worker (older code uses @spawn, now deprecated)
fetch(s)

In [None]:
@everywhere using LinearAlgebra: det # load det on every process (master and workers)
result = [@spawnat w det(randn(10,10)) for w in workers()] # one remote task per worker
# every process has access to the function det
fetch.(result) # collect the results from all workers

In [None]:
@everywhere println(det(randn(10,10))) # same computation on every process, master included (each prints a different value)

In [None]:
@everywhere function f(x::Float64) # define f on every process
    println(x + randn())
end
@everywhere x = 5.0 # define the global x on every process
@everywhere f(x)
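`myid()` returns the id of the process executing the code, which makes it easy to check where a computation actually ran. A short sketch asking each worker for its own id:

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# remotecall_fetch runs myid on worker w and returns the answer to the master.
ids = [remotecall_fetch(myid, w) for w in workers()]
println("ids reported by the workers: ", ids)
println("id of the master: ", myid())
```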

### Parallel loops

In [None]:
function MC_pi(n) # approximation of π with a Monte Carlo method
    s = 0
    for k in 1:n
        if rand()^2 + rand()^2 < 1.0 # the random point of the unit square falls inside the quarter disk
            s = s + 1
        end
    end
    4 * s / n # the quarter disk covers π/4 of the unit square; s/n (points inside / total points) estimates that fraction
end
@time MC_pi(10^6)
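The cell above is serial. The same loop can be parallelized with `@distributed` and a reducer: `@distributed (+)` splits `1:n` into one chunk per worker, each worker sums its chunk, and the partial sums are added on the master, so there is one message per worker rather than one per sample. A sketch, where the name `MC_pi_distributed` is ours (hypothetical), not part of any library:

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # ensure there are workers to share the loop

# Hypothetical distributed variant of MC_pi: each iteration contributes 1 if a
# random point of the unit square falls inside the quarter disk, else 0.
function MC_pi_distributed(n)
    s = @distributed (+) for k in 1:n
        Int(rand()^2 + rand()^2 < 1.0)
    end
    return 4 * s / n
end

est = MC_pi_distributed(10^6)
println(est)
```

Wrapping the call in `@time` allows a comparison with `MC_pi(10^6)`; the first call also includes compilation time.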

In [None]:
workers()
addprocs(3) # add three more workers
workers()

### The map and pmap functions

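`map` applies the same function to every element of a collection (or to corresponding elements of several collections of the same size).

`pmap` does the same but distributes the calls among the worker processes. Every call implies communication with a worker, so `pmap` pays off when each call does a substantial amount of work.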
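A small sketch comparing the two: `map` runs everything on the master, while `pmap` hands each element to some worker. Tagging each result with `myid()` shows where it was computed, and the last line uses the several-collections form:

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# map: everything runs on the master.
squares = map(x -> x^2, 1:6)

# pmap: each element is sent to a worker; myid() records which one handled it.
tagged = pmap(x -> (x^2, myid()), 1:6)
println(tagged)

# map and pmap also accept several collections of the same size.
sums = pmap(+, [1, 2, 3], [10, 20, 30])
```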

In [None]:
@everywhere function g(x) # 1 if a random point of the unit square falls in the quarter disk, 0 otherwise
    rand()^2 + rand()^2 < 1.0 ? 1 : 0 # x is ignored: no data is required here
end

function MC_pi_parallel(n)
    s = sum(pmap(g, 1:n)) # each call of g is sent to a worker: one message per sample
    4 * s / n
end
@time MC_pi_parallel(10^4)
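With one remote call per sample, communication dominates and `MC_pi_parallel` is typically much slower than the serial `MC_pi`. `pmap` accepts a `batch_size` keyword that groups several elements into one message. A sketch comparing the two settings (the value 1000 is an arbitrary choice, and timings vary between machines; the first `pmap` call also includes compilation):

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Same point test as g above; x is ignored.
@everywhere g(x) = rand()^2 + rand()^2 < 1.0 ? 1 : 0

n = 10^4

t0 = time()
s_single = sum(pmap(g, 1:n))                      # one message per element
t_single = time() - t0

t0 = time()
s_batched = sum(pmap(g, 1:n; batch_size = 1000))  # 1000 elements per message
t_batched = time() - t0

println("batch_size = 1:    π ≈ ", 4 * s_single / n, " in ", t_single, " s")
println("batch_size = 1000: π ≈ ", 4 * s_batched / n, " in ", t_batched, " s")
```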

In [None]:
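M = [randn(1000,1000) for k=1:100]; # 100 random 1000×1000 matrices (about 800 MB in total)
@time [det(m) for m in M]; # serial (the first run of a cell also includes compilation time)
@time pmap(det,M);  # parallel mapping of a function: each matrix is sent to a worker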

### Recovering returned values, sending data and passing arguments to pmap

In [30]:
M = 100
N = 1000
X = randn(M,N)

X_parallel = [X[:,k] for k = 1:N]; # vector of the N columns of X: pmap hands one column to each call

@everywhere using LinearAlgebra: det

@time dets = pmap(x -> det(x*x'),X_parallel);
@time dets = map(x -> det(x*x'),X_parallel);

println(size(dets))
println(dets[2]) # 0.0: x*x' is a 100×100 matrix of rank 1, hence singular

  0.178263 seconds (224.81 k allocations: 11.327 MiB)
  1.816960 seconds (57.17 k allocations: 156.194 MiB, 0.88% gc time)
(1000,)
0.0


In [40]:
# an anonymous function lets pmap pass extra parameters (here λ and M) to a custom function;
# the custom function must be defined on every process
@everywhere using LinearAlgebra: I

@everywhere function custom_det(x,λ,M) # determinant of a matrix built from x and the parameters λ, M
    # note: x'*x is the scalar ‖x‖², so this adds ‖x‖² to every entry of λI
    # (write x*x' instead to regularise the rank-1 matrix of the previous cell)
    return det(x'*x .+ λ*Matrix{Float64}(I, M, M))
end
λ = 1

@time dets = pmap(x -> custom_det(x,λ,M),X_parallel);
@time dets = map(x -> custom_det(x,λ,M),X_parallel);

println(size(dets))
println(dets[2]) # nonzero: the λI term makes the matrix nonsingular

  0.223467 seconds (214.78 k allocations: 10.697 MiB)
  2.003184 seconds (97.06 k allocations: 310.673 MiB, 1.51% gc time)
(1000,)
9024.493195489647
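Instead of capturing the parameters in an anonymous function, they can also be passed as additional collections, since `pmap` (like `map`) applies the function elementwise across several collections. A self-contained sketch with small sizes; the helper `reg_det`, computing det(x x' + λI), is a hypothetical illustration rather than the notebook's `custom_det`:

```julia
using Distributed
nprocs() == 1 && addprocs(2)
@everywhere using LinearAlgebra

# Hypothetical helper: determinant of the rank-1 matrix x*x' regularised by λI.
@everywhere reg_det(x, λ) = det(x * x' + λ * I)

m, n = 5, 20
Y = randn(m, n)
cols = [Y[:, k] for k in 1:n]
λ = 2.0

d_closure = pmap(x -> reg_det(x, λ), cols)   # λ captured by the anonymous function
d_zipped  = pmap(reg_det, cols, fill(λ, n))  # λ passed as a second collection
```

By the matrix determinant lemma, each value equals λ^(m-1)(λ + ‖x‖²), which gives a quick sanity check of the results.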


### Distributed arrays

In [None]:
# using Pkg
# Pkg.add("DistributedArrays") # there was a compatibility problem with Julia 1.2 at the time of writing


In [41]:
@everywhere using DistributedArrays # a distributed array (DArray) is split into chunks stored on different processes, but can be used from the master as a single array

In [43]:
A = drandn(100,100) # directly generate distributed array
B = randn(100,100) 
dB = distribute(B) # convert array to distributed array
#C = @DArray [i+j for i = 1:100, j = 1:100];

In [45]:
@time A*A; # distributed array
@time dB*dB; # converted distributed array
@time B*B; # standard array: much faster here, since a 100×100 product is too small to outweigh the communication cost

  0.004561 seconds (1.74 k allocations: 73.469 KiB)
  0.003906 seconds (1.77 k allocations: 74.672 KiB)
  0.000148 seconds (6 allocations: 78.359 KiB)


In [47]:
@time [i+j for i = 1:100, j = 1:100];
@time @DArray [i+j for i = 1:100, j = 1:100];

  0.040618 seconds (104.26 k allocations: 4.849 MiB)
  0.144409 seconds (405.71 k allocations: 20.409 MiB, 4.49% gc time)


In [None]:
println(nprocs())
# addprocs(7)
# rmprocs(12:13)
@everywhere using DistributedArrays
# WorkerPool([2,3,4])
# @DArray [@show x^2 for x = 1:10]; # print the squares from 1 to 10 (out of order, since different processes compute different parts)

In [55]:
# the results depend on the number of processes and on the data size
@time @DArray [x^2 for x = 1:10^8]; # distributed version (only advantageous for large datasets)
@time [x^2 for x = 1:10^8]; # standard version

# try varying the number of processes and the data size

  0.291654 seconds (359.45 k allocations: 18.755 MiB)
  0.343424 seconds (54.34 k allocations: 765.607 MiB, 16.70% gc time)


------------