## Add available processors

The Julia interpreter starts with a single process (the main Julia process, id 1).

In [2]:
using Distributed
nprocs()

1

To avoid allocating too many workers, we first check that only one process is currently running and only then add 3 worker processes.

In [3]:
nprocs() == 1 && addprocs(3)

3-element Array{Int64,1}:
 2
 3
 4

Let's check which workers we have, not counting the main Julia process (id 1).

In [4]:
workers()

3-element Array{Int64,1}:
 2
 3
 4

## Remote Calls and References

We use the `@spawn` macro to assign a task dynamically to an available remote worker. (On Julia 1.3 and later, `@spawnat :any` is the recommended equivalent.)

In [5]:
ref = @spawn sin(π/5)

Future(2, 1, 5, nothing)

The return type is a `Future` because the value may only become available later, once the worker has finished and the result has come back over the network. This is a non-blocking call: the main Julia process does not wait for the task to finish. If you want Julia to wait, you can use `@sync`, but waiting for each remote call to finish before starting other independent tasks defeats the purpose of parallelization.

To get the value at a later time, use `fetch`. This is a blocking call: Julia waits until the value is available.

In [6]:
fetch(ref)

0.5877852522924731
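
As a rough illustration of the blocking and non-blocking calls described above, the sketch below starts a deliberately slow task, polls it with `isready`, blocks on `fetch`, and finally uses `@sync` to wait for a group of remote calls. The `sleep` delays and the names `slow`, `val` and `elapsed` are assumptions made for this example and are not part of the original notebook; `@spawnat` with an explicit worker id is used so the sketch does not depend on which worker `@spawn` would pick.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # ensure some workers exist (the count is arbitrary)

# Start a deliberately slow task on the first worker; the call returns at once.
slow = @spawnat first(workers()) begin
    sleep(1)          # stand-in for real work (hypothetical delay)
    sin(π/5)
end

# isready never blocks: it only reports whether the result has arrived yet.
println("ready immediately? ", isready(slow))

# fetch blocks until the worker is done, then returns the value.
val = fetch(slow)
println("value: ", val)

# @sync blocks until every @spawnat started inside it has completed.
elapsed = @elapsed @sync for p in workers()
    @spawnat p sleep(0.5)
end
println("waited about ", round(elapsed, digits=2), " s for all workers")
```

Because the `sleep(0.5)` calls run on different workers at the same time, the `@sync` block should take roughly half a second in total rather than half a second per worker.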

Here's an example of remote calls inside a loop. To keep the discussion simple we use `sin`, but the tasks best suited to remote calls are those that take a long time, so that they can run in parallel in the background.

We first declare an `Array` of `Future`s, which `@spawn` then populates inside the loop.

In [7]:
n=10
refs = Array{Future,1}(undef,n)
for i = 1:n
    refs[i] = @spawn sin(i)
end

`refs` now contains an `Array` of `Future`s.

In [8]:
refs

10-element Array{Future,1}:
 Future(3, 1, 7, nothing) 
 Future(4, 1, 8, nothing) 
 Future(2, 1, 9, nothing) 
 Future(3, 1, 10, nothing)
 Future(4, 1, 11, nothing)
 Future(2, 1, 12, nothing)
 Future(3, 1, 13, nothing)
 Future(4, 1, 14, nothing)
 Future(2, 1, 15, nothing)
 Future(3, 1, 16, nothing)

We can map the `fetch` function to each of the `Futures` and aggregate (reduce) them by getting the sum.

In [9]:
reduce(+,map(fetch,refs))

1.4111883712180104

A more elegant way to do this is to use `@distributed`, as shown below.

In [10]:
res=@distributed (vcat) for i=1:n
    sin(i)
end

10-element Array{Float64,1}:
  0.8414709848078965 
  0.9092974268256817 
  0.1411200080598672 
 -0.7568024953079282 
 -0.9589242746631385 
 -0.27941549819892586
  0.6569865987187891 
  0.9893582466233818 
  0.4121184852417566 
 -0.5440211108893698 

In [11]:
reduce(+,res)

1.4111883712180104

You can also replace `(vcat)` with `(+)` in `@distributed`, so that the results are summed directly instead of being collected first.

In [12]:
res=@distributed (+) for i=1:n
    sin(i)
end

1.4111883712180104

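Here's another example, which concatenates one `DataFrame` per iteration, each recording the worker and its task result.

Notice the pattern of task assignment. `@distributed` splits the iteration range into contiguous chunks, one per worker: here worker 2 handles `i = 1:4`, worker 3 handles `i = 5:7`, and worker 4 handles `i = 8:10`. The printed lines are interleaved because the workers run concurrently. (By contrast, the `@spawn` loop earlier rotated through the workers one task at a time.)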

In [13]:
@everywhere using DataFrames
res=@distributed (vcat) for i = 1:10
    println((i,sin(i)))
    DataFrame(worker=myid(), vals=sin(i))
end
res

      From worker 4:	(8, 0.9893582466233818)
      From worker 2:	(1, 0.8414709848078965)
      From worker 2:	(2, 0.9092974268256817)
      From worker 4:	(9, 0.4121184852417566)
      From worker 3:	(5, -0.9589242746631385)
      From worker 3:	(6, -0.27941549819892586)
      From worker 2:	(3, 0.1411200080598672)
      From worker 4:	(10, -0.5440211108893698)
      From worker 2:	(4, -0.7568024953079282)
      From worker 3:	(7, 0.6569865987187891)


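| Row | worker (Int64) | vals (Float64) |
|----:|---------------:|---------------:|
| 1 | 2 | 0.841471 |
| 2 | 2 | 0.909297 |
| 3 | 2 | 0.14112 |
| 4 | 2 | -0.756802 |
| 5 | 3 | -0.958924 |
| 6 | 3 | -0.279415 |
| 7 | 3 | 0.656987 |
| 8 | 4 | 0.989358 |
| 9 | 4 | 0.412118 |
| 10 | 4 | -0.544021 |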
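
To make the assignment of iterations to workers in the table above easier to inspect, the following sketch records which worker ran each index and groups the indices by worker. The names `assignments` and `chunks` are made up for this example, and the exact split depends on how many workers are available.

```julia
using Distributed
nprocs() == 1 && addprocs(3)

# Each iteration returns a one-element vector holding (index, worker id);
# vcat joins them into a single vector of tuples.
assignments = @distributed (vcat) for i = 1:10
    [(i, myid())]
end

# Group the indices by the worker that executed them.
chunks = Dict{Int,Vector{Int}}()
for (i, w) in assignments
    push!(get!(chunks, w, Int[]), i)
end

for w in sort(collect(keys(chunks)))
    println("worker ", w, " => ", chunks[w])
end
```

With three workers one would expect each worker to report a single contiguous run of indices, matching the `worker` column of the table.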


## Monte-Carlo Simulation to estimate $\pi$

In [14]:
#==========================#
# monte-carlo simulation
# π r^2 / 4 r^2 = s/n 
#==========================#


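# Draw a uniform random point in the unit square;
# return 1 if it falls inside the quarter unit circle, 0 otherwise.
@everywhere function isInside()
    x = rand()
    y = rand()
    x^2 + y^2 < 1 ? 1 : 0
end;

# Parallel estimate: the n trials are split across the workers
# and the hit counts are combined with (+).
@everywhere function ppi(n)
    s = @distributed (+) for i = 1:n
        isInside()
    end
    4s/n
end;

# Serial estimate for comparison.
# Note: this definition shadows the built-in constant `pi` in this session.
function pi(n)
    s = 0.0
    for i = 1:n
        s += isInside()
    end
    4s/n
end;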
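
The ratio in the header comment of the cell above can be spelled out as a short derivation, assuming that `rand()` draws uniformly from $[0, 1)$, so that $(x, y)$ is uniform on the unit square. A circle of radius $r$ inscribed in a square of side $2r$ covers the same fraction of the square as the quarter circle sampled here covers of the unit square:

$$
P\left(x^2 + y^2 < 1\right) = \frac{\pi r^2}{4 r^2} = \frac{\pi}{4},
\qquad
\frac{s}{n} \approx \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4s}{n}
$$

Here $s$ is the number of points that fall inside the circle out of $n$ trials. Each trial is a Bernoulli draw with success probability $p = \pi/4$, so the standard error of the estimate is $4\sqrt{p(1-p)/n}$, which shrinks in proportion to $1/\sqrt{n}$.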


In [17]:
@time ppi(10^9)

  5.981965 seconds (515 allocations: 17.531 KiB)


3.141518076

In [16]:
@time pi(10^3)

  0.004024 seconds (1.37 k allocations: 80.397 KiB)


3.128

## Cross-validation in parallel

In [None]:
@everywhere using RDatasets
@everywhere using Statistics
@everywhere using DecisionTree
@everywhere using Random

@everywhere function partitionTrainTest(data, at = 0.7)
    n = nrow(data)
    idx = shuffle(1:n)
    train_idx = view(idx, 1:floor(Int, at*n))
    test_idx = view(idx, (floor(Int, at*n)+1):n)
    return (data[train_idx,:], data[test_idx,:])
end


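# Train a random forest on a random 70/30 split of iris
# and return its accuracy on the held-out 30%.
@everywhere function irisAcc()
    iris = dataset("datasets", "iris")
    train, test = partitionTrainTest(iris, 0.7) # 70% train
    xtrain = train[:, 1:4] |> Matrix            # four numeric features
    ytrain = train[:, 5] |> Vector{String}      # species labels
    xtest = test[:, 1:4] |> Matrix
    ytest = test[:, 5] |> Vector{String}
    # positional arguments: n_subfeatures=2, n_trees=4, partial_sampling=0.5, max_depth=6
    model = build_forest(ytrain, xtrain, 2, 4, 0.5, 6)
    pred = apply_forest(model, xtest)
    sum(ytest .== pred) / length(pred)          # fraction of correct predictions
end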

In [75]:
irisAcc()

0.9333333333333333

In [71]:
function mserial(n)
    sm=0.0
    for i=1:n
         sm += irisAcc()
    end
    print("Overall Acc:",sm/n*100.0)
end
@time mserial(1000)

Overall Acc:94.47555555555475  0.758686 seconds (1.40 M allocations: 192.921 MiB, 6.81% gc time)


In [73]:
function mparallel(n)
    s=@distributed (+) for i=1:n
        irisAcc()
    end
    print("Overall Acc:",s/n*100.0)
end
@time mparallel(1000)

Overall Acc:94.7422222222222  0.310889 seconds (136.63 k allocations: 6.636 MiB)


## Distributed Arrays

In [17]:
@everywhere using DistributedArrays

In [18]:
a = round.(10*rand(20,20))

20×20 Array{Float64,2}:
 9.0  6.0  6.0   9.0   5.0   7.0   7.0  …   0.0   1.0   5.0  8.0  7.0   4.0
 5.0  5.0  9.0   7.0   7.0   9.0  10.0      1.0   5.0   8.0  2.0  4.0   8.0
 0.0  6.0  4.0   4.0   1.0   7.0   3.0      3.0   4.0   2.0  9.0  4.0   3.0
 0.0  5.0  3.0   4.0   7.0   1.0   9.0      7.0   4.0  10.0  0.0  9.0  10.0
 7.0  9.0  7.0   2.0   2.0   6.0   8.0      3.0   7.0   8.0  9.0  3.0   0.0
 4.0  6.0  6.0   8.0   3.0   7.0   6.0  …  10.0   5.0   0.0  8.0  2.0   0.0
 6.0  5.0  1.0   2.0  10.0   8.0   5.0      1.0   0.0  10.0  1.0  3.0   2.0
 3.0  8.0  6.0   9.0   9.0   9.0   8.0      9.0  10.0   6.0  9.0  2.0  10.0
 1.0  9.0  2.0   5.0   5.0   4.0   7.0      0.0   8.0   9.0  0.0  4.0   7.0
 7.0  9.0  7.0   9.0   3.0   6.0   9.0      3.0   6.0   8.0  8.0  9.0   2.0
 7.0  8.0  5.0  10.0   8.0   6.0   6.0  …   6.0   8.0   9.0  1.0  6.0   5.0
 8.0  8.0  8.0   1.0   1.0   7.0   2.0      5.0   2.0   7.0  1.0  1.0   7.0
 8.0  9.0  8.0   2.0   2.0   7.0   1.0      5.0   7.0   3.0  9.0

In [19]:
map(x->myid(),a)

20×20 Array{Int64,2}:
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
 1  1  1  1  1  1 

In [20]:
da = distribute(a)

20×20 DArray{Float64,2,Array{Float64,2}}:
 9.0  6.0  6.0   9.0   5.0   7.0   7.0  …   0.0   1.0   5.0  8.0  7.0   4.0
 5.0  5.0  9.0   7.0   7.0   9.0  10.0      1.0   5.0   8.0  2.0  4.0   8.0
 0.0  6.0  4.0   4.0   1.0   7.0   3.0      3.0   4.0   2.0  9.0  4.0   3.0
 0.0  5.0  3.0   4.0   7.0   1.0   9.0      7.0   4.0  10.0  0.0  9.0  10.0
 7.0  9.0  7.0   2.0   2.0   6.0   8.0      3.0   7.0   8.0  9.0  3.0   0.0
 4.0  6.0  6.0   8.0   3.0   7.0   6.0  …  10.0   5.0   0.0  8.0  2.0   0.0
 6.0  5.0  1.0   2.0  10.0   8.0   5.0      1.0   0.0  10.0  1.0  3.0   2.0
 3.0  8.0  6.0   9.0   9.0   9.0   8.0      9.0  10.0   6.0  9.0  2.0  10.0
 1.0  9.0  2.0   5.0   5.0   4.0   7.0      0.0   8.0   9.0  0.0  4.0   7.0
 7.0  9.0  7.0   9.0   3.0   6.0   9.0      3.0   6.0   8.0  8.0  9.0   2.0
 7.0  8.0  5.0  10.0   8.0   6.0   6.0  …   6.0   8.0   9.0  1.0  6.0   5.0
 8.0  8.0  8.0   1.0   1.0   7.0   2.0      5.0   2.0   7.0  1.0  1.0   7.0
 8.0  9.0  8.0   2.0   2.0   7.0   1.0      5.

In [21]:
map(x->myid(),da)

20×20 DArray{Int64,2,Array{Int64,2}}:
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2  2  2  2  2  2  2  3  3  3  3  3  3  3  4  4  4  4  4  4
 2

In [22]:
da.pids

1×3 Array{Int64,2}:
 2  3  4

In [23]:
[@spawnat p sum(localpart(da)) for p in da.pids]

1×3 Array{Future,2}:
 Future(2, 1, 3358, nothing)  …  Future(4, 1, 3360, nothing)

In [24]:
map(fetch,[@spawnat p sum(localpart(da)) for p in da.pids])

1×3 Array{Float64,2}:
 785.0  694.0  595.0

In [25]:
reduce(+,map(fetch,[@spawnat p sum(localpart(da)) for p in da.pids]))

2074.0

In [26]:
sum(da)

2074.0

In [27]:
@code_typed sum(da)

CodeInfo(
1 ─ %1 = Base.identity::typeof(identity)
│   %2 = Base.add_sum::typeof(Base.add_sum)
│   %3 = invoke Base._mapreduce(%1::Function, %2::Function, $(QuoteNode(IndexCartesian()))::IndexCartesian, _2::DArray{Float64,2,Array{Float64,2}})::Any
└──      return %3
) => Any