# Running Native Julia Parallelism
## The Basics
### Jordan Jalving

### University of Wisconsin-Madison


## Starting a Parallel Session
There are two ways to start using Parallel processing in Julia.  
1. Start your Julia session with `~$ julia -p <n_workers>`
2. Run `addprocs(<n_workers>)` in a session

Since we are using an IJulia notebook, we will use the second option here to add our worker processes


In [2]:
#Add processors to your Julia session
addprocs(3) #Here, we add 3 workers
println("Processors in session = ",nprocs())

In [3]:
#gets number of worker processes.  The process running Julia is not a workers
n_workers = nworkers() 
println("The number of worker processors is: $n_workers")

In [4]:
#Get array of worker processors
worker_ids = workers()
println("The worker ids are: ", worker_ids)

Processors in session = 4
The number of worker processors is: 3


 --- 

## Distributed Parallelism: Remote Calls and Futures

Julia uses the concept of remote calls and futures to manage processing and communication. Think of it as if
each processor has its own memory domain and can send and request data from other processors.

We will showcase the basic functions using a simple 3x3 matrix

In [5]:
#This is how to make a random 3x3 matrix in Julia.  This creates the matrix on the local process.
rand(3,3)

3×3 Array{Float64,2}:
 0.479701  0.769008  0.537726
 0.883462  0.738838  0.185219
 0.679535  0.858712  0.312926

The worker ids are: [2, 3, 4]


### Use `remotecall` to evaluate a function on a given process
`remotecall(<function>,<process_id>,[arguments])` is the low-level function to request work to be done on a given process.
For instance, we can ask processor 2 to create a random 3x3 matrix.

In [6]:
#Call the rand function on worker 2 with arguments (3,3)
#This creates a Future r on the local process
r = remotecall(rand, 2, 3, 3)

Future(2, 1, 8, Nullable{Any}())

r is a Future (i.e. a remote reference).  Processor 1 (our julia session) has a reference to the result which will be available on processor 2.

Use `fetch(r)` to get the value processor 2 calculated.  **NOTE:  This will move the data from processor 2 to processor 1 (overhead).**

In [7]:
#Get the value of r.  The local process will wait until the 
#value is ready (i.e. fetch is blocking)
fetch(r)

3×3 Array{Float64,2}:
 0.125253  0.659167  0.338948 
 0.730049  0.173949  0.268018 
 0.264575  0.35669   0.0147856

--- 

### Use `@spawnat` to evaluate an expression on a given process
@spawnat is a macro, which means we can pass any valid expression as an argument.  It is the macro equivalent of `remotecall(<function>,<process_id>,[arguments])` and has the form `@spawnat <process_id> <expression>`
___

In [8]:
#run rand(3,3) command on processor 3
s1 = @spawnat 3 rand(3,3)

Future(3, 1, 10, Nullable{Any}())

In [9]:
#fetch the value the same way
fetch(s1)

3×3 Array{Float64,2}:
 0.0850143  0.515755  0.838141
 0.45166    0.722862  0.52113 
 0.896882   0.126582  0.770613

#### NOTE: `fetch(<Remote Reference>)` can be used to move data from worker to worker

In [10]:
#Call rand(3,3) on processor 2
r = remotecall(rand, 2, 3, 3) 

#Spawn the operation `1 + fetch(r)` on process 3 by passing the remote reference r
s2 = @spawnat 3 1 + fetch(r) 

Future(3, 1, 13, Nullable{Any}())

In [11]:
#fetch the value from processor 2
println(fetch(r))
#Fetch the value from processor 3
println(fetch(s2))

--- 

### Use `remotecall_fetch`  to obtain a remotely-computed value immediately

#### NOTE: This is equivalent to `fetch(remotecall())`, but it's more efficient.  This function is typically used inside loops to get values 

In [12]:
#run rand on process 2 with arguments (3,3).  Fetch the result immediately.
remotecall_fetch(rand, 2, 3, 3)

3×3 Array{Float64,2}:
 0.942117  0.0737329  0.195308 
 0.751556  0.257781   0.0448505
 0.648951  0.718478   0.648209 

In [13]:
#remotecall_fetch is the same as fetch(remotecall()), but usually more efficient
fetch(remotecall(rand,2,3,3))

3×3 Array{Float64,2}:
 0.893338  0.766448  0.389638 
 0.798965  0.813964  0.0714478
 0.48587   0.461909  0.433222 

--- 

### Use `@spawn` to make things easier.
`@spawn` is like `remotecall` and `@spawnat`, but Julia's task manager picks the processor to use

In [14]:
#Spawn the task rand(3,3).  Let Julia pick where to run it
s = @spawn rand(3,3)

[0.745895 0.533254 0.448283; 0.669988 0.842896 0.166619; 0.481021 0.356118 0.650462]
[1.74589 1.53325 1.44828; 1.66999 1.8429 1.16662; 1.48102 1.35612 1.65046]


Future(3, 1, 19, Nullable{Any}())

In [15]:
#Fetch the result the same way
fetch(s)

3×3 Array{Float64,2}:
 0.0741628  0.694528  0.115329
 0.147103   0.178699  0.657085
 0.869164   0.950188  0.919289

---
---

## Asynchronous Parallelism: Using @sync and @async to manage tasks
So far, we showed how to use `remotecall`,`@spawnat`, and `@spawn` to dispatch work to other processes.  In any useful parallel application however, we would want to manage communication between our workers.  This can be done using the `@async` and `@sync` macros.

### Use @sync and @async to manage asynchronous computing
`@async <expression>`: Create a task on the master process that runs the expression.  Julia will continue without waiting for `@async` to finish.

`@sync <expression>`: Wait for enclosed uses of `@async`,`@spawn`,`@spawnat` to finish.  Typically these to macros get used in the form:

```julia
@sync begin 
    for i = 1:n_workers
        @async begin #start a @async task on the Julia process
            #...
            #dispatch work to worker i (e.g. using remotecall_fetch())
            #...
        end
    end  #end for
end  #wait for all the @async tasks to finish
 ```

### @sync @async?  What's the difference?
These two macros often get confused.  Let's look at a couple examples to make their use more clear using 
some simple `sleep()` calls

In [16]:
#Sleep for 2 seconds
@time sleep(2)

#### Asynchrounous `sleep()`.  Remember, Julia will not wait for `@async` to complete

In [36]:
@time @async sleep(2)

Task (runnable) @0x00007f89bad63490

  0.000087 seconds (20 allocations: 1.875 KiB)


In [35]:
#@sync will wait for the enclosed use of @async to finish
@time @sync @async sleep(2)

Task (done) @0x00007f89bad4c010

  2.007089 seconds (235 allocations: 15.078 KiB)


### The wrong way to schedule workers.  How long will this code take?  Why?

In [37]:
@time begin
    for (idx, pid) in enumerate(workers())
        println("Sending work to $pid")
        remotecall_fetch(sleep,pid, 2)
    end
end

Sending work to 2
Sending work to 3
Sending work to 4
  6.012692 seconds (2.49 k allocations: 139.984 KiB)


### The correct way to schedule.  Use @sync and @async

In [38]:
@time begin
    @sync for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async remotecall_fetch(sleep,pid, 2)
    end
end

sending work to 2
sending work to 3
sending work to 4
  2.008188 seconds (1.32 k allocations: 83.664 KiB)


### What if we forget the @sync?

In [39]:
@time begin
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async remotecall_fetch(sleep,pid, 2)
    end
end

sending work to 2
sending work to 3
sending work to 4
  0.003885 seconds (432 allocations: 30.336 KiB)


--- 
---

## A more interesting example: `pmap`
**Now that we ~~have mastered~~ hopefully understand the basics of Julia's parallel functions, pmap may seem less mysterious.  Here, we have a slightly modified version of the pmap function.**

In [22]:
function my_pmap(f, lst)  #apply the function f to arguments in lst
    np = nprocs()         # determine the number of processes available
    n = length(lst)       #lst is a list of n arguments.
    results = Vector{Any}(n)
    i = 1
    # function to produce the next work item from the queue.
    # in this case it's just an index.
    nextidx() = (idx=i; i+=1; idx)
    @sync begin
        #Create a feeder task (using @async) for each worker
        for p = workers()
            @async begin  
                while true #feeder task runs constantly
                    idx = nextidx()
                    if idx > n
                        break
                    end
                    println("Sent argument $idx to worker $p")
                    results[idx] = remotecall_fetch(f, p, lst[idx])
                end
            end
        end
    end
    results
end

my_pmap (generic function with 1 method)

sending work to 3
sending work to 4
  0.005282 seconds (1.76 k allocations: 112.695 KiB)


#### Let's look at what happens when we want to do singular value decomposition on a list of matrices

In [23]:
@time begin 
    M = [rand(1000,1000) for _ = 1:20];
    my_pmap(svd,M);
end;

Sent argument 1 to worker 2
Sent argument 2 to worker 3
Sent argument 3 to worker 4
Sent argument 4 to worker 2
Sent argument 5 to worker 4
Sent argument 6 to worker 3
Sent argument 7 to worker 2
Sent argument 8 to worker 4
Sent argument 9 to worker 3
Sent argument 10 to worker 2
Sent argument 11 to worker 4
Sent argument 12 to worker 3
Sent argument 13 to worker 2
Sent argument 14 to worker 4
Sent argument 15 to worker 2
Sent argument 16 to worker 3
Sent argument 17 to worker 4
Sent argument 18 to worker 2
Sent argument 19 to worker 3
Sent argument 20 to worker 4


### How does the serial version compare?

In [24]:
@time begin 
    M = [rand(1000,1000) for _ = 1:20]
    map(svd,M)
end;

 20.367115 seconds (77.66 k allocations: 488.457 MiB, 2.09% gc time)


### Use `@everywhere` do define data on every process

In [25]:
#calculate the SVD of a random square matrix
@everywhere function calc_svd(i::Int)
    M = rand(i,i)
    return svd(M)
end

 25.661011 seconds (122.76 k allocations: 1.201 GiB, 2.16% gc time)


In [26]:
(U,E,V) = calc_svd(1000)


([-0.0317059 0.00404549 … -0.0342485 0.0111302; -0.0313829 -0.00418642 … -0.000998294 0.0451431; … ; -0.0312815 -0.0357524 … 0.00237379 -0.0343071; -0.0316478 0.0317083 … -0.0127114 0.0272311], [499.958, 18.1021, 17.9408, 17.8671, 17.8232, 17.7472, 17.7035, 17.6379, 17.5585, 17.5489  …  0.123851, 0.116806, 0.101283, 0.0936706, 0.0742818, 0.0563122, 0.0443592, 0.0282945, 0.0188325, 3.02856e-5], [-0.0320982 0.00691944 … 0.026676 0.0398728; -0.0311627 -0.00758673 … -0.0115975 0.0280118; … ; -0.0315299 0.0511513 … 0.0213456 -0.00964006; -0.0313196 -0.0353747 … 0.011678 0.0262191])

**Here, we can use a custom function to tell Julia to create the matrices on the workers instead of the master process.  However, since SVD is more computationally intense, this is unlikely to produce much speedup**

In [27]:
@time my_pmap(calc_svd,[1000 for _ = 1:20]);

Sent argument 1 to worker 2
Sent argument 2 to worker 3
Sent argument 3 to worker 4
Sent argument 4 to worker 2
Sent argument 5 to worker 3
Sent argument 6 to worker 4
Sent argument 7 to worker 4
Sent argument 8 to worker 2
Sent argument 9 to worker 3
Sent argument 10 to worker 3
Sent argument 11 to worker 4
Sent argument 12 to worker 2
Sent argument 13 to worker 3
Sent argument 14 to worker 2
Sent argument 15 to worker 4
Sent argument 16 to worker 2
Sent argument 17 to worker 3
Sent argument 18 to worker 4
Sent argument 19 to worker 2
Sent argument 20 to worker 4


---
---

## Another example: Calculating Pi using Monte Carlo Sampling

![](MonteCarlo.png)

**We can numerically estimate $\pi$ by randomly generating points inside a quadrantand counting how many land inside the unit circle**

$A_{square} = 1$

$A_{circle} = \frac{\pi}{4}$

$\pi = 4 * \frac{A_{circle}}{A_{square}}$

For any uniformly distributed point with uniformly distributed random coordinates $(x,y) \sim U(0,1)$, a point is in the unit circle if 
$x^2 + y^2 \le 1$

$x \sim U(0,1)$

$y \sim U(0,1)$

$x^2 + y^2 \le 1$

We can then estimate pi as the ratio of points inside the circle over total points generated:

$ \pi \approx 4 * \frac{\text{N points inside circle}}{\text{N total}}$

### A reasonable serial implementation for Julia
#### Here, we loop through 1 billion points and check if each is in the unit circle

In [28]:
tic()
function in_circle(numPoints)
    N_inside = 0           #initially, no points in circle
    for i = 1:numPoints
        x = rand()
        y = rand()
        x^2 + y^2 < 1 ? N_inside += 1 : N_inside += 0
    end
    return N_inside
end
numPoints = 1E9
pi_approx = 4 * in_circle(numPoints) / numPoints
toc()
println("approximation = ",pi_approx)

 19.086003 seconds (38.13 k allocations: 307.012 MiB)


## Exercise: Write a parallel code to estimate $\pi$

In [29]:
#Modify this serial code
function in_circle(numPoints)
    N_inside = 0
    for i = 1:numPoints
        x = rand()
        y = rand()
        x^2 + y^2 < 1 ? N_inside += 1 : N_inside += 0
    end
    return N_inside
end
numPoints = 1E9
pi_approx = 4 * in_circle(numPoints) / numPoints

#One hint: This can be done with partitions
#e.g. partitions = [convert(Int64,round(numPoints / 3)) for i = 1:3]

3.141629144

elapsed time: 8.978742288 seconds
approximation = 3.141486208


---
---
---
---
---

## One Solution: Partitioned pmap implementation
#### with pmap, we can partition into 3 subsets and have our workers run their own loops 

In [30]:
tic()
@everywhere function in_circle(numPoints)
    N_inside = 0
    for i = 1:numPoints
        x = rand()
        y = rand()
        x^2 + y^2 < 1? N_inside += 1 : N_inside += 0
    end
    return N_inside
end

numPoints = 1E9
partitions = [convert(Int64,round(numPoints / 3)) for i = 1:3]
N_inside = pmap(in_circle, partitions)
pi_approx = 4*sum(N_inside) / numPoints
toc()
println(pi_approx)

---

## Another solution: `@parallel for` ( This was replaced with `@distributed for` in Julia v1.0)
#### NOTE: `@parallel (<reduction>) for` allocates workers up front

In [31]:
tic()
numPoints = 1E9
inside = @parallel (+) for i = 1:numPoints
    x = rand()
    y = rand()
    Int(x^2 + y^2 < 1)
end
toc()
println(4 * inside / numPoints)

elapsed time: 5.4317022 seconds
3.141593004


## Be careful with `@parallel` when modifying variables

In [32]:
a = zeros(10)
@parallel for i = 1:10
    a[i] = i
end;
println(a)

elapsed time: 3.810343981 seconds
3.141624692


**NOTE: What happened here is that each process got its own copy of a to do its processing.  As a result, the vector a on the master process did not change.**

## Use a `SharedArray` to share values between all processes

In [34]:
a = SharedArray{Float64}(10)
result = @sync @parallel for i = 1:10
    a[i] = i
end;
println(a)

[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
