# Adding Processes

In [1]:
procs()

1-element Array{Int64,1}:
 1

In [2]:
addprocs(3)

3-element Array{Int64,1}:
 2
 3
 4

In [3]:
workers()

3-element Array{Int64,1}:
 2
 3
 4

# How fast is our processor?

In [4]:
peakflops()

5.664936149053024e10

In [5]:
peakflops(parallel=true)

3.312115760498653e10

In [6]:
?peakflops

search: peakflops

```
peakflops(n::Integer=2000; parallel::Bool=false)
```

`peakflops` computes the peak flop rate of the computer by using double precision `gemm!` (`Base.LinAlg.BLAS.gemm!`). By default, if no arguments are specified, it multiplies a matrix of size `n x n`, where `n = 2000`. If the underlying BLAS is using multiple threads, higher flop rates are realized. The number of BLAS threads can be set with `BLAS.set_num_threads(n)`.

If the keyword argument `parallel` is set to `true`, `peakflops` is run in parallel on all the worker processors. The flop rate of the entire parallel computer is returned. When running in parallel, only 1 BLAS thread is used. The argument `n` still refers to the size of the problem that is solved on each processor.
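
The docstring doesn't spell out the arithmetic, so here is a rough sketch of the idea. It is an approximation, not the Base source: time one n×n double-precision product and divide the conventional 2n^3 flop count by the elapsed time. `approx_flops` is a hypothetical helper, and its result will vary with the machine and the BLAS thread count.

```julia
# Hypothetical helper approximating what peakflops measures (not the Base source).
function approx_flops(n::Integer = 2000)
    A = rand(n, n)
    B = rand(n, n)
    A * B                          # warm-up run so compilation time isn't measured
    t = @elapsed A * B             # seconds for one n×n double-precision product
    return 2 * Float64(n)^3 / t    # conventional count: n^3 multiplies + n^3 adds
end

approx_flops()
```

With the default `n = 2000`, one product counts as 2 × 2000^3 = 1.6 × 10^10 flops, so at the 5.66 × 10^10 flop/s that `peakflops()` reported above it should take roughly 0.28 s.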


# Using the pmap() function

... to run a function across multiple processes.

In [7]:
pmap(x -> ("pid=$(myid())", x), 1:4)

4-element Array{Tuple{String,Int64},1}:
 ("pid=2", 1)
 ("pid=4", 2)
 ("pid=3", 3)
 ("pid=2", 4)

In [8]:
# on_error is called with the exception thrown by the function; identity just returns it
pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)

4-element Array{Any,1}:
 1                     
  ErrorException("foo")
 3                     
  ErrorException("foo")

In [9]:
# So an ErrorException is what gets passed into the error handler.
# Let's prove it by returning its msg field!
pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=(e) -> e.msg)

4-element Array{Any,1}:
 1     
  "foo"
 3     
  "foo"

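To make the `on_error` behaviour concrete, here is a sequential, single-process sketch of the semantics seen in the two cells above: when the mapped function throws, the handler is called with the exception object, and whatever it returns takes that element's place in the result. `map_on_error` is a hypothetical helper, not part of Base, and it ignores everything `pmap` does to distribute the work.

```julia
# Hypothetical sequential model of pmap's on_error keyword (no workers involved).
function map_on_error(f, xs; on_error = throw)
    results = Any[]
    for x in xs
        try
            push!(results, f(x))
        catch e
            push!(results, on_error(e))   # the handler receives the exception object
        end
    end
    return results
end

g(x) = iseven(x) ? error("foo") : x
map_on_error(g, 1:4; on_error = identity)    # Any[1, ErrorException("foo"), 3, ErrorException("foo")]
map_on_error(g, 1:4; on_error = e -> e.msg)  # Any[1, "foo", 3, "foo"]
```

The default handler `throw` rethrows the exception, so without `on_error` the first failure aborts the whole map.
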
# Making calls to remote worker processes

In [10]:
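# usage: remotecall(function, pid, args...)
# returns a "future" reference immediately.
#    Future(where, whence, id, v): where = pid that holds the value,
#    whence = pid that created the reference, id = reference id (unique per whence),
#    v = locally cached value (empty until fetched)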
r = remotecall(rand, 2, 2, 2)

Future(2, 1, 20, Nullable{Any}())

In [11]:
# synchronously wait for the results
fetch(r)

2×2 Array{Float64,2}:
 0.649515  0.422219
 0.608687  0.1812  

In [12]:
# ok to fetch again
fetch(r)

2×2 Array{Float64,2}:
 0.649515  0.422219
 0.608687  0.1812  

In [13]:
r

Future(2, 1, 20, Nullable{Any}([0.649515 0.422219; 0.608687 0.1812]))
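
The `Nullable{Any}([...])` in `r`'s last field is the locally cached copy: after the first `fetch`, later fetches of the same Future are served from that cache instead of going back to worker 2. The hypothetical `CachingRef` type below is a single-process model written only to illustrate that behaviour; it is not how `Future` is implemented.

```julia
# Hypothetical single-process model of a Future's fetch-and-cache behaviour.
mutable struct CachingRef
    remote_value::Any   # stands in for the value stored on the worker
    cache::Any          # `nothing` until the first fetch
    transfers::Int      # how many times the value was "copied" from the worker
end
CachingRef(v) = CachingRef(v, nothing, 0)

function fetch_cached!(ref::CachingRef)
    if ref.cache === nothing
        ref.cache = deepcopy(ref.remote_value)   # simulated transfer from the worker
        ref.transfers += 1
    end
    return ref.cache
end

ref = CachingRef(rand(2, 2))
a = fetch_cached!(ref)
b = fetch_cached!(ref)   # served from the cache, no second transfer
println(a === b)         # true
println(ref.transfers)   # 1
```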

In [14]:
# try again with a single argument to the function
r = remotecall(rand, 2, 3)

Future(2, 1, 22, Nullable{Any}())

In [15]:
fetch(r)

3-element Array{Float64,1}:
 0.136689
 0.796799
 0.937846

# Call and fetch at the same time

In [16]:
remotecall_fetch(rand, 3, 3, 3)

3×3 Array{Float64,2}:
 0.300906  0.355753   0.532743
 0.132084  0.0367506  0.419235
 0.670273  0.38034    0.5176  

In [17]:
# not quite sure whether the operation went to pid 3?
# Let's return that as part of the result
remotecall_fetch((x...) -> (myid(), rand(x...)), 3, 3, 3)

(3, [0.130953 0.501367 0.962786; 0.16372 0.187953 0.707218; 0.676201 0.493621 0.566193])

# The Convenient @spawn macro

... lets us skip choosing a worker/pid, and lets us write an ordinary expression in our familiar syntax instead of passing a function and its arguments around.
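
How does `@spawn` pick a worker? As far as I can tell, Julia 0.6 simply cycles through `workers()` in turn; the `Future(2, ...)` in the next cell is consistent with that, though one sample proves little. The hypothetical `make_round_robin` below models that round-robin policy in plain Julia; it does not call the real scheduler.

```julia
# Hypothetical model of a round-robin worker choice (not Julia's actual code).
function make_round_robin(pids::Vector{Int})
    calls = 0
    return function ()
        p = pids[calls % length(pids) + 1]   # wrap around the worker list
        calls += 1
        return p
    end
end

pick = make_round_robin([2, 3, 4])
[pick() for i in 1:5]    # [2, 3, 4, 2, 3]
```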

In [18]:
# same example as above
r = @spawn rand(3,3)

Future(2, 1, 26, Nullable{Any}())

In [19]:
fetch(r)

3×3 Array{Float64,2}:
 0.757248  0.741234  0.844064 
 0.035747  0.974895  0.34351  
 0.454359  0.147437  0.0702147

In [20]:
# spawn and fetch in one go (fetch blocks until the result is ready)
fetch(@spawn 1 + rand(3))

3-element Array{Float64,1}:
 1.1591 
 1.28045
 1.65727

In [21]:
# we can still say where it goes to. 
# usage: @spawnat pid expression
@spawnat 2 1 + rand(3)

Future(2, 1, 30, Nullable{Any}())

In [22]:
# don't believe me?  try this.
display(fetch(@spawnat 2 myid()))
display(fetch(@spawnat 3 myid()))
display(fetch(@spawnat 4 myid()))

2

3

4

In [23]:
# pid doesn't exist
display(fetch(@spawnat 5 myid()))

LoadError: no process with id 5 exists

# Defining functions in all processes

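Functions defined only on the master process are not visible to the workers, and any Julia source files they depend on (loaded via `include`) must be loaded on the workers too. An easy way to achieve that is the `@everywhere` macro, which runs an expression on every process.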

In [24]:
@everywhere fact(n) = n > 1 ? n * fact(n-1) : 1

In [25]:
fact(10)

3628800

In [26]:
fetch(@spawn fact(10))

3628800

# Using @parallel for loops

In [28]:
# Generate a single random number 6 times, spread across 3 workers.
# Without a reducer, @parallel returns immediately with one Future per worker.
r = @parallel for i in 1:6 
    rand(1)
end

3-element Array{Future,1}:
 Future(2, 1, 45, #NULL)
 Future(3, 1, 46, #NULL)
 Future(4, 1, 47, #NULL)

In [29]:
# This is somewhat counterintuitive!
# A for loop evaluates to nothing, so each worker's Future resolves to nothing:
# fetching only waits for the workers to finish, and the rand(1) values are discarded.
# See https://discourse.julialang.org/t/parallel-macro/4059/2
fetch.(r)

3-element Array{Void,1}:
 nothing
 nothing
 nothing

In [49]:
# we must use a reducer to access the final result
r = @parallel (+) for i in 1:3
    i
end

6

In [50]:
# let's try to understand what the reducer does.
# in fact, let's not reduce at all...
r = @parallel (x)->x for i in 1:3
    i
end

LoadError: MethodError: no method matching (::##111#113)(::Int64, ::Int64)
Closest candidates are:
  #111(::Any) at In[50]:3

In [51]:
# let's try to understand what the reducer does.
# The reducer is called with two arguments, so let's give it a 2-argument
# function that just pairs them up. The nesting shows it was applied twice.
r = @parallel (x,y)->(x,y) for i in 1:3
    i
end

((1, 2), 3)

In [39]:
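# Try again with 6 iterations.
# It looks like a binary tree, but it's really a two-level reduction:
# the range is split into one chunk per worker (1:2, 3:4, 5:6), each worker
# folds its own chunk left to right, and the master then folds the 3 partial
# results. With P workers that is O(N/P + P) reducer calls, not an O(log(N)) tree.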
r = @parallel (x,y)->(x,y) for i in 1:6
    i
end

(((1, 2), (3, 4)), (5, 6))
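
The two-level strategy can be reproduced without any workers. `split_chunks` and `simulate_preduce` are hypothetical helpers written for this illustration, a sketch of what `@parallel` appears to do in Julia 0.6 rather than its source: split `1:n` into `p` near-equal contiguous chunks, fold each chunk, then fold the partial results.

```julia
# Hypothetical helper: split 1:n into p contiguous chunks of near-equal length.
function split_chunks(n, p)
    each, extra = divrem(n, p)
    chunks = UnitRange{Int}[]
    lo = 1
    for k in 1:p
        len = each + (k <= extra ? 1 : 0)   # the first `extra` chunks get one more
        len == 0 && continue                # more workers than iterations
        push!(chunks, lo:(lo + len - 1))
        lo += len
    end
    return chunks
end

# Two-level reduction: fold each chunk ("on a worker"), then fold the partials.
function simulate_preduce(op, n, p)
    partials = [foldl(op, c) for c in split_chunks(n, p)]
    return foldl(op, partials)
end

tuplepair(x, y) = (x, y)
simulate_preduce(tuplepair, 3, 3)   # ((1, 2), 3)
simulate_preduce(tuplepair, 6, 3)   # (((1, 2), (3, 4)), (5, 6))
simulate_preduce(tuplepair, 8, 3)   # ((((1, 2), 3), ((4, 5), 6)), (7, 8))
```

The 8-iteration case is where the two readings part ways: a balanced tree would give `(((1, 2), (3, 4)), ((5, 6), (7, 8)))`. If this sketch is right, running the real `@parallel (x,y)->(x,y) for i in 1:8` on 3 workers should show the chunked shape instead.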

# SharedArray - Simple Use Case of 1-dim array

In [54]:
# Let's create a shared array for all child processes
S = SharedArray{Int,1}((10,); init=false, pids=[2,3,4])

10-element SharedArray{Int64,1}:
 0
 0
 0
 0
 0
 0
 0
 0
 0
 0

In [55]:
S[1:3] = 1:3

1:3

In [56]:
fetch(@spawnat 2 S)

10-element SharedArray{Int64,1}:
 1
 2
 3
 0
 0
 0
 0
 0
 0
 0

In [57]:
fetch(@spawnat 3 sum(S))

6

# SharedArray - Using an init Function

In [58]:
# init runs on each participating worker; here each worker sets only the cell at index myid()
S = SharedArray{Int,1}((6,); init=(S)->S[myid()]=1, pids=[2,3,4])

6-element SharedArray{Int64,1}:
 0
 1
 1
 1
 0
 0

In [65]:
# what if we make a mistake and make all processes init every cell?
# wouldn't they run into conflict?  Let's try.
S = SharedArray{Int,1}((6,); init=(S)->S[1:end]=myid(), pids=[2,3,4])

6-element SharedArray{Int64,1}:
 3
 3
 3
 3
 3
 3

In [66]:
# Do it again.
# See? A different result: every worker writes every cell, so it's a
# "last one wins" situation and the outcome is nondeterministic.
S = SharedArray{Int,1}((6,); init=(S)->S[1:end]=myid(), pids=[2,3,4])

6-element SharedArray{Int64,1}:
 2
 2
 2
 2
 2
 2
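
The usual way out is to have each worker write only its own disjoint slice. In Julia 0.6, `localindexes(S)` returns the range of indexes assigned to the calling worker, so `init = S -> S[localindexes(S)] = myid()` should avoid the conflict. The plain-Julia sketch below (hypothetical `owned_range`, run sequentially on one process) only illustrates the idea of disjoint blocks; it is not the SharedArray implementation, and the real split may differ.

```julia
# Hypothetical helper: the contiguous block of 1:n owned by part k of nparts,
# with sizes as even as possible (illustrates the idea behind localindexes).
function owned_range(n::Integer, nparts::Integer, k::Integer)
    each, extra = divrem(n, nparts)
    lo  = (k - 1) * each + min(k - 1, extra) + 1   # first index of part k
    len = each + (k <= extra ? 1 : 0)              # the first `extra` parts get one more
    return lo:(lo + len - 1)
end

# Simulate three workers initialising a 6-element array, each touching only its block.
worker_pids = [2, 3, 4]
A = zeros(Int, 6)
for (k, pid) in enumerate(worker_pids)
    for i in owned_range(length(A), length(worker_pids), k)
        A[i] = pid
    end
end
println(A)    # [2, 2, 3, 3, 4, 4]
```

Because the blocks never overlap, the result no longer depends on which worker finishes last.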

# DistributedArrays

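...are arrays whose data is split into chunks held by multiple processes (possibly on different machines), so together they can hold arrays too big for a single process or machine.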

In [68]:
@everywhere using DistributedArrays

[1m[36mINFO: [39m[22m[36mPrecompiling module DistributedArrays.
[39m

In [69]:
nworkers()

3

In [83]:
# create a distributed 3-dim 100x100x10 array of random numbers
x = drand(100,100,10);

In [84]:
# anything local?  No: the master process (pid 1) holds none of the chunks.
localpart(x)

0×0×0 Array{Float64,3}

In [85]:
# regular size() function still works
size(x)

(100, 100, 10)

In [96]:
# take a look at the data
x[1,1,:]

10-element SubArray{Float64,1,DistributedArrays.DArray{Float64,3,Array{Float64,3}},Tuple{Int64,Int64,Base.Slice{Base.OneTo{Int64}}},false}:
 0.142759 
 0.53089  
 0.742233 
 0.588181 
 0.123825 
 0.834653 
 0.17278  
 0.798462 
 0.667298 
 0.0148088

In [97]:
# subscript notation works properly
x[1:10,1:10,1]

10×10 SubArray{Float64,2,DistributedArrays.DArray{Float64,3,Array{Float64,3}},Tuple{UnitRange{Int64},UnitRange{Int64},Int64},false}:
 0.142759   0.0946618  0.181801  0.377718  …  0.863596   0.791174  0.435259
 0.751871   0.547027   0.385095  0.857416     0.48839    0.295926  0.50896 
 0.482967   0.906022   0.750859  0.312312     0.251467   0.688171  0.530704
 0.794977   0.354648   0.601413  0.986505     0.861908   0.483634  0.730824
 0.628623   0.814769   0.549572  0.139536     0.0771304  0.364527  0.233271
 0.0810531  0.206933   0.405859  0.312057  …  0.795307   0.036346  0.193977
 0.118778   0.417618   0.066417  0.177226     0.178786   0.827911  0.930045
 0.348539   0.443414   0.429401  0.243409     0.189222   0.256215  0.928934
 0.990184   0.366593   0.22884   0.193941     0.80198    0.198111  0.865264
 0.473546   0.926863   0.673413  0.908315     0.446035   0.949319  0.412547

In [98]:
# what's the size of x in the local (master) process, which holds no data?
# answer: very, very small
Base.summarysize(x)

360

In [99]:
# what about the object size in the child processes?
for i in 2:4
    println(fetch(@spawnat i Base.summarysize(x)))
end

264360
272360
264360


In [100]:
# ok... let's create a local array and measure its size for comparison.
y = rand(100,100,10)
Base.summarysize(y)

800000

In [95]:
# Sum up the sizes above... nearly the same as the local array! :-)
264360 + 272360 + 264360

801080
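
The 1080-byte gap can be accounted for with a bit of arithmetic, assuming each worker's figure is its local chunk plus the same 360 bytes of DArray bookkeeping that the master reported in `In [98]`. All numbers below are copied from the outputs above.

```julia
# Worked check of the summarysize numbers above.
nelems       = 100 * 100 * 10                 # elements in the 100x100x10 array
data_bytes   = nelems * sizeof(Float64)       # 8 bytes per Float64
worker_sizes = [264360, 272360, 264360]       # Base.summarysize(x) on pids 2, 3, 4
metadata     = 360                            # Base.summarysize(x) on the master

chunk_elems = div.(worker_sizes .- metadata, sizeof(Float64))
println(data_bytes)                    # 800000
println(sum(worker_sizes) - data_bytes)   # 1080, i.e. 3 * 360
println(chunk_elems)                   # [33000, 34000, 33000]
println(sum(chunk_elems) == nelems)    # true
```

So, under that assumption, the 100,000 elements are split 33,000 / 34,000 / 33,000 across the three workers, and the local array's 800,000 bytes match the combined chunk data exactly.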

In [101]:
# sum all numbers
sum(x)

49970.16215910363

In [102]:
for i in 2:4
    println(fetch(@spawnat i sum(x)))
end

49970.16215910363
49970.16215910363
49970.16215910363
