# Parallel computing in Julia

## On CPU: Multi-process on multiple machines

### Monte Carlo simulation

##### Running on one core

In [1]:
function calc_pi(samples)
    counter = 0
    for i in 1:samples
        x, y = rand(2)
        if (x^2 + y^2 <=1)
            counter += 1
        end
    end
    π = 4 * counter / samples
    return π
end;

In [2]:
samples = 1e8
@time calc_pi(samples)

  5.311127 seconds (100.06 M allocations: 8.944 GiB, 9.97% gc time)


3.1412788

In [5]:
nprocs()

4

#### Adding more processes is a one-liner

In [4]:
addprocs(3);

In [6]:
# Check number of processes
println("""
$(nprocs())
$(workers())
""")

4
[2, 3, 4]



In [7]:
randomVal_local = rand(2, 2)

2×2 Array{Float64,2}:
 0.196833  0.225983
 0.49847   0.787279

In [8]:
randomVal_worker = remotecall(rand, 3, 2, 2) # <--- Remote call returns remote reference (Future)

Future(3, 1, 5, Nullable{Any}())

In [9]:
blabla = fetch(randomVal_worker) # <--- Cached locally 

2×2 Array{Float64,2}:
 0.216832  0.655869
 0.611867  0.494522

##### Nicer syntax with @spawn

In [10]:
randomVal1 = @spawn rand(2,2)
#fetch(randomVal)

randomVal2 = @spawn rand(2,2)
randomVal3 = @spawn rand(2,2)

#

Future(4, 1, 9, Nullable{Any}())

In [11]:
sumVal = fetch(randomVal2) + fetch(randomVal2) + fetch(randomVal2)

2×2 Array{Float64,2}:
 2.6091    2.87389 
 0.316333  0.797901

#### Run @spawn on all available processes with @parallel. Pretty great.

In [12]:
function parallel_calc_pi(samples)
    counter = @parallel (+) for i=1:samples
        x, y = rand(2)
        ifelse(x^2 + y^2 <= 1, 1, 0)
    end
    π = 4 * counter / samples
end

parallel_calc_pi (generic function with 1 method)

In [14]:
samples=1e8
@time parallel_calc_pi(samples)

  1.595947 seconds (539 allocations: 36.953 KiB)


3.14138676

In [17]:
# Clear workers on all hosts
rmprocs(workers())

Task (done) @0x00007fe727e479d0

In [16]:
# Add some more on another host
addprocs([("root@10.4.1.6:6666",2), ("root@10.4.1.4:6666",2)], tunnel=true)

ErrorException("type LocalProcess has no field r_stream")CapturedException(ErrorException("type LocalProcess has no field r_stream"), Any[((::Base.Distributed.##call#19#20)(::VersionNumber, ::WorkerConfig, ::Type{T

4-element Array{Int64,1}:
 5
 6
 7
 8

}ErrorException( where T, ::Int64, ::TCPSocket, ::TCPSocket, ::Base.Distributed.DefaultClusterManager) at cluster.jl:74, 1), ((::Core.#kw#Type)(::Array{Any,1}, ::Type{Base.Distributed.Worker}, ::Int64, ::TCPSocket, ::TCPSocket, ::Base.Distributed.DefaultClusterManager) at <missing>:0, 1), (handle_msg(::Base.Distributed.IdentifySocketMsg, ::Base.Distributed.MsgHeader, ::TCPSocket, ::TCPSocket, ::VersionNumber) at process_messages.jl:290, 1), (message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:149, 1), (process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:118, 1), ((::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at event.jl:73, 1)])
Process(6) - Unknown remote, closing connection.
"AssertionError("wpid > 0")CapturedException(AssertionError("wpid > 0"), Any[(message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:151, 1), (process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:118, 1),

#### --> Multiple hosts are not necessarly a benefit
* Access to memory
* Communication
* Sharing of data

In [None]:
anotherStupidMatrix = rand(100,100)
calcMyStupidMatric = @spawn inv(anotherStupidMatrix)

lastPointlessMatrixIPromise = @spawn inv(rand(100,100))
# fetch() and so on and so forth

#### What about threads and co-routines?

## On a single GPU

In [None]:
using CUDAnative, CUDAdrv

In [None]:
function kernel_dist(X::AbstractVector{Float32}, Y::AbstractVector{Float32}, out::AbstractVector{Float32})
    i = (blockIdx().x-1) * blockDim().x + threadIdx().x
    out[i] = (X[i]-0.5)^2 + (Y[i]-0.5)^2
    return nothing
end

In [None]:
samples = Int64(5e8)
a = rand(Float32, (samples))
b = rand(Float32, (samples));
a_cu = CuArray(a)
b_cu = CuArray(b)
c_cu = similar(a_cu);
n = length(a)

ctx = CuCurrentContext()
dev = device(ctx)
max_threads = attribute(dev, CUDAdrv.MAX_THREADS_PER_BLOCK)
threads = min(max_threads, n)
blocks = ceil(Int, n/threads)

In [None]:
@time @cuda (blocks, threads) kernel_dist(a_cu, b_cu, c_cu)
@time c = Array(c_cu)
@time destroy!(ctx)
@time pi_single = 4*count(x->x<0.25,c)/length(c)

## On GPU's located on multiple machines

In [18]:
addprocs([("root@10.4.1.6:6666",1), ("root@10.4.1.4:6666",1)], sshflags=`-i id_rsa`, tunnel=true);

ErrorException("type LocalProcess has no field r_stream")CapturedException(ErrorException("type LocalProcess has no field r_stream"), Any[((::Base.Distributed.##call#19#20)(::VersionNumber, ::WorkerConfig, ::Type{T} where T, ::Int64, ::TCPSocket, ::TCPSocket, ::Base.Distributed.DefaultClusterManager) at cluster.jl:74, 1), ((::Core.#kw#Type)(::Array{Any,1}, ::Type{Base.Distributed.Worker}, ::Int64, ::TCPSocket, ::TCPSocket, ::Base.Distributed.DefaultClusterManager) at <missing>:0, 1), (handle_msg(::Base.Distributed.IdentifySocketMsg, ::Base.Distributed.MsgHeader, ::TCPSocket, ::TCPSocket, ::VersionNumber) at process_messages.jl:290, 1), (message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:149, 1), (process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at process_messages.jl:118, 1), ((::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at event.jl:73, 1)])
Process(10) - Unknown remote, closing connection.


In [19]:
nprocs()

3

In [20]:
@everywhere using CUDAnative, CUDAdrv

In [21]:
@everywhere function kernel_dist(X::AbstractVector{Float32}, Y::AbstractVector{Float32}, gpu_cu::AbstractVector{Float32})
    i = (blockIdx().x-1) * blockDim().x + threadIdx().x
    gpu_cu[i] = (X[i]-0.5)^2 + (Y[i]-0.5)^2
    return nothing
end

In [22]:
@everywhere function distmontegpu(samples)
    a = rand(Float32, (samples))
    b = rand(Float32, (samples));
    a_cu = CuArray(a)
    b_cu = CuArray(b)
    c_cu = similar(a_cu);
    n = length(a)
    ctx = CuCurrentContext()
    dev = device(ctx)
    max_threads = attribute(dev, CUDAdrv.MAX_THREADS_PER_BLOCK)
    threads = min(max_threads, n)
    blocks = ceil(Int, n/threads)
    
    @cuda (blocks, threads) kernel_dist(a_cu, b_cu, c_cu)
    c = Array(c_cu)
    destroy!(ctx)
    return c
end

In [23]:
workers()

2-element Array{Int64,1}:
  9
 10

In [24]:
samples = Int64(1e8)
n1 = @spawn distmontegpu(samples);
n2 = @spawn distmontegpu(samples);
"""
pi_double = 4*(count(x->x<0.25,remotecall_fetch(getindex, workers()[1], n1))+
                    count(x->x<0.25,remotecall_fetch(getindex, workers()[2], n2)))
                    /(length(workers())*samples)
"""

"pi_double = 4*(count(x->x<0.25,remotecall_fetch(getindex, workers()[1], n1))+\n                    count(x->x<0.25,remotecall_fetch(getindex, workers()[2], n2)))\n                    /(length(workers())*samples)\n"

In [25]:
pi_double = 4*(count(x->x<0.25,fetch(n1))+count(x->x<0.25,fetch(n2)))/(length(workers())*samples)

3.1417007

In [None]:
pi_double

In [None]:
samples = Int64(1e8)
r3 = remotecall(distmontegpu, 1, samples)
r1 = remotecall(distmontegpu, 2, samples)
r2 = remotecall(distmontegpu, 3, samples)

In [None]:
pi_double = 4*(count(x->x<0.25,remotecall_fetch(getindex, 2, r1))+
    count(x->x<0.25,remotecall_fetch(getindex, 3, r2))+
    count(x->x<0.25,remotecall_fetch(getindex, 1, r3)))/(3*samples)

* r1,r2,r3 continues to reside on each worker even after fetch()

In [None]:
samples = Int64(1e8)
pi_double = 4*(count(x->x<0.25,remotecall_fetch(distmontegpu, 2, samples))+
    count(x->x<0.25,remotecall_fetch(distmontegpu, 3, samples)))/(2*samples)

In [None]:
println(gpu1[1])
println(length(gpu1))

In [None]:
println(gpu2[1])
println(length(gpu2))

In [None]:
@printf "%.15f" abs(π - pi_single) 

In [None]:
@printf "%.15f" abs(π - pi_double)

# Parallel macro

#### @parallel - The go-to tool for handling small tasks

In [None]:
addprocs([("root@10.4.1.4:6666", 1)], tunnel=true)

In [None]:
sum = 0
tic()
for i in 1:200000000
    sum += i
end
toc()
println(sum)

In [None]:
tic()
sum = @parallel (+) for i = 1:200000000
    Int(i)
end
toc()
println(sum)

###### (daaaaaaaaaaamn!)