In [9]:
using Plots;
plotly()

Plots.PlotlyBackend()

# Group Seminar on Julia Pt. II  
# Parallel Techniques

- Julia's parallel architecture, like MPI, uses message passing between worker processes.
- Distributed Memory architecture $\rightarrow$ Each worker has its own memory.
- It is however more transparent and high level.

## 1. Add worker processes.

In [None]:
workers()

In [None]:
rmprocs(2:5)

In [2]:
addprocs();workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [None]:
[nworkers(),
    nprocs()]

   ## 2. Running functions on other processes.

In [None]:
r = remotecall(2, +, 1,1)

In [None]:
fetch(r)

In [None]:
remotecall_fetch(2, +, 1,1)

### `remotecall` and `fetch` are rather "low level" and cumbersome.
### Julia offers convenient macros `@spawn`, `@spawnat`, `@fetch`

In [None]:
@spawn svd(rand(10,10))

In [None]:
fetch(ans)

In [None]:
@fetch svd(rand(10,10))

### Transforming datasets in parallel with `pmap`.

In [None]:
matrices = [ rand(5,5) for i in 1:4 ];
pmap(svd, matrices);

In [None]:
matrices

In [3]:
function tictoq(ex::Expr)
    tic()
    eval(ex)
    return toq()
end

tictoq (generic function with 1 method)

In [7]:
blas_set_num_threads(4)
maxSize = 100;
@time times = hcat([ [size(m[1],1), tictoq(:(pmap(svd, $m))), tictoq( :(map(svd,$m)))]
    for m in [ [rand(k,k) for i in 1:8] for k in 5:maxSize ]]...)';

  5.671089 seconds (174.68 k allocations: 237.596 MB, 1.39% gc time)


In [8]:
scatter(times[:,1],times[:,2:end],label=[:pmap :map],legend=:topleft)

In [None]:
for i = 1:nworkers()
    local matrices = [ rand(500,500) for j in 1:8 ];
    @time pmap(svd, matrices, pids=workers()[1:i]);
end

__Caveat:__ `pmap` only suitable for distributing large chunks of work.

In [None]:
@time pmap(x->x+1, collect(1:Int(1e5)));

In [None]:
@time collect(1:Int(1e5)) + 1 ;

### `@parallel` to the rescue!

__Scenario:__ Parallel calculations that are reduced (Matrix$\rightarrow$Vector, Vector$\rightarrow$Number)

Example 1: Estimate $\pi$

In [None]:
piEst = @parallel (+) for i = 1:Int(1e9)
    ifelse( abs2(rand()) + abs2(rand()) <= 1, 1, 0)
end
piEst /= (1e9 / 4)

Example 2: 1d Random Walk

In [11]:
@time randWalk = @parallel (+) for i = 1:Int(1e8)
    randn()
end

  0.141425 seconds (8.48 k allocations: 488.591 KB)


-3097.2397244210415

In [13]:
@time reduce(+, randn(Int(1e8)))

  0.826767 seconds (8 allocations: 762.940 MB, 6.15% gc time)


-12919.228125279884

In [None]:
s = 0.0
@time for i = 1:Int(1e8)
    s += randn()
end
s

In [15]:
function randomWalk_serial(L::Int)
    s = 0.0
    for i = 1:L
        s += randn()
    end
    s
end
@time randomWalk_serial(Int(1e8))

  0.475726 seconds (2.62 k allocations: 129.758 KB)


-5756.965121685245

----

__What if we need the results of every run?__  
Naively one could concatenate the results ($=$reduce with `vcat`).

In [16]:
L = Int(1e5);

In [18]:
@time randn(L);

  0.000773 seconds (6 allocations: 781.469 KB)


In [20]:
@time n = @parallel (vcat) for i in 1:L
    randn()
end;

  1.353674 seconds (4.85 k allocations: 2.828 MB)


__Abysmal runtime! __ Note the allocations.  
`vcat` allocates new memory __each__ iteration $\rightarrow$ dynamical resizing of arrays is not a good idea in performance critical code.

Alright, allocate the memory beforehand...

In [22]:
a = zeros(Int64, L)
@time begin 
    local l::Int = L
    @parallel for i in 1:l
        a[i] = randn()
    end
end

  0.011072 seconds (4.68 k allocations: 341.630 KB)


4-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,2329)
 RemoteRef{Channel{Any}}(3,1,2330)
 RemoteRef{Channel{Any}}(4,1,2331)
 RemoteRef{Channel{Any}}(5,1,2332)

Ok, that was fast, but did it do what we wanted?

In [23]:
println(maximum(a))

0


__No__, because `a[]` inside the parallel for-loop is a __local variable__ to each process.

__Needed:__ Data structure that is shared between processes.

### Shared Arrays

In [24]:
L = 10^8;

__Initialize the shared array.__

`init` is a function that runs on each worker that holds a chunk of the array to fill it at construction.

In [25]:
ShA = SharedArray(Float64, L, init= S->S[ localindexes(S) ] = 0);

__Check which workers it is distributed amongst.__

In [26]:
ShA.pids

4-element Array{Int64,1}:
 2
 3
 4
 5

In [27]:
[@fetchfrom w localindexes(ShA) for w in workers()]

4-element Array{Any,1}:
 1:25000000        
 25000001:50000000 
 50000001:75000000 
 75000001:100000000

Try filling it with values...

In [29]:
@time @sync @parallel for i in 1:length(ShA)
    ShA[i] = randn()
end;

  3.886640 seconds (6.64 k allocations: 479.347 KB)


Not so great... `@parallel` doesn't distribute the for-loop to the right workers.

__Define a function that fills the _local_ part of the shared array...__

In [30]:
@everywhere function Randn(S::SharedArray)
    for i in localindexes(S)
        S[i] = randn()
    end
end

__...and call it on each worker that holds a chunk of the array.__

In [32]:
@time @sync begin 
    for p in ShA.pids
        @async remotecall_wait(p, Randn, ShA)
    end
end

  0.149377 seconds (1.98 k allocations: 155.699 KB)


__Compare to single-threaded performance.__

In [None]:
function Randn(L::Int)
    a = zeros(Float64,L)
    @time for i in 1:L
        a[i] = randn()
    end
    return a
end;
Randn(L);

__And check that we really did the right thing.__

In [33]:
histogram(ShA,nbins=100,legend=:none)



---