In [1]:
println("Hello World")

Hello World


# Multi-Core or Distributed Processing

Library needed for parallel programming:

In [4]:
using Distributed

Add 2 worker processes, which are needed to execute remote calls:

In [10]:
addprocs(2)

2-element Array{Int32,1}:
 4
 5

## Remote call

The first argument is the function to execute, the second is the id of the worker process on which to run it, and the remaining arguments are passed to that function.

In [12]:
r = remotecall(rand, 2, 3, 4)
print(r)
2+2

Future(2, 1, 11, nothing)

4

Worker 2 runs rand(3, 4), which generates a 3×4 matrix of random numbers. The call returns a Future, a placeholder for a computation whose completion time and status are not yet known.

Remote calls return the Future immediately, so the calling process moves on to the next instruction (here, 2+2) while the remote function runs on a separate process.
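To get the actual matrix, the Future has to be fetched. A minimal sketch, assuming a fresh session where `addprocs(2)` creates the workers; since worker ids vary between sessions, it uses `first(workers())` instead of a hard-coded id:

```julia
using Distributed
addprocs(2)  # worker ids depend on the session

w = first(workers())

# remotecall returns a Future right away; the work runs on worker w
r = remotecall(rand, w, 3, 4)

# fetch blocks until the result is available, then copies it to this process
m = fetch(r)
println(size(m))

# remotecall_fetch performs the call and the fetch in a single step
s = remotecall_fetch(+, w, 2, 2)
println(s)
```

remotecall_fetch is convenient when the caller has nothing else to do while waiting; remotecall plus a later fetch lets the caller overlap its own work with the remote computation.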

## Parallel Map and Loops

In [2]:
using Distributed
addprocs(3)

3-element Array{Int32,1}:
 2
 3
 4

We can use @spawnat to flip coins on two processes. First, define the function count_heads on every process with @everywhere:

In [11]:
@everywhere function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

In [96]:
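# Named parallel_count_heads rather than sum, to avoid shadowing Base.sum
function parallel_count_heads()
    a = @spawnat 1 count_heads(100000000)  # runs on the master process (id 1)
    b = @spawnat 2 count_heads(100000000)  # runs on worker 2
    fetch(a)+fetch(b)
end
@time parallel_count_heads()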

  1.419773 seconds (458.49 k allocations: 15.699 MiB, 2.16% gc time)


100000456

In [97]:
@time a = count_heads(200000000)

  2.032305 seconds (5 allocations: 156 bytes)


100015707

We used two explicit @spawnat statements, which limits the parallelism to two processes. To run on any number of processes, we can use a parallel for loop, running in distributed memory, which can be written in Julia using @distributed like this:

In [99]:
@time nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

  0.867010 seconds (57.53 k allocations: 1.826 MiB)


100003617

This construct implements the pattern of assigning iterations to multiple processes, and combining them with a specified reduction (in this case (+)). The result of each iteration is taken as the value of the last expression inside the loop. The whole parallel loop expression itself evaluates to the final answer.
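As a quick check of these semantics, the sketch below (assuming two added workers) sums i^2 in parallel and compares it with the serial result. The value contributed by each iteration is its last expression, `i^2`:

```julia
using Distributed
addprocs(2)

# Each iteration contributes its last expression (i^2);
# partial sums are computed on the workers and combined with (+)
total = @distributed (+) for i in 1:1000
    i^2
end

serial = sum(i^2 for i in 1:1000)
println(total == serial)
```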

Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process.

For example, the following code will not work as intended:

In [109]:
a = zeros(10)
@distributed for i = 1:10
    a[i] = i
end
a

10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

This code will not initialize all of a, since each process will have a separate copy of it. Parallel for loops like these must be avoided.
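When each iteration produces an independent value, one workaround that needs no shared memory is to return the value from the loop body and gather the pieces with a reducer such as `vcat`. A sketch, assuming two workers:

```julia
using Distributed
addprocs(2)

# Each worker concatenates the values of its own chunk of 1:10;
# vcat then combines the per-worker pieces on the calling process
a = @distributed (vcat) for i in 1:10
    Float64(i)
end
println(a)
```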

Fortunately, Shared Arrays can be used to get around this limitation:

In [110]:
using SharedArrays

a = SharedArray{Float64}(10)
@sync @distributed for i = 1:10  # @sync waits until every worker has finished writing
    a[i] = i
end
a

10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

SharedArray is explained in more detail below.

Using "outside" variables in parallel loops is perfectly reasonable if the variables are read-only:

In [120]:
a = randn(10)
@distributed for i = 1:10
    println(a[i])
end

Task (runnable) @0xec168e20

      From worker 4:	0.04822796371860417
      From worker 4:	-2.134310565996247
      From worker 4:	0.15369377912263554
      From worker 2:	0.2304838768074243
      From worker 2:	-0.46204039812900316
      From worker 2:	-0.21495007081177828
      From worker 2:	0.08370695408826308
      From worker 3:	0.5652795962176056
      From worker 3:	-0.4740936405924344
      From worker 3:	-0.04158722347476902
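Read-only access also combines with a reducer. In this sketch (assuming two workers) the captured vector `a` is copied to each worker, and only the partial sums travel back to the calling process:

```julia
using Distributed
addprocs(2)

a = randn(10)   # copied to every worker; never modified there

s = @distributed (+) for i in 1:10
    a[i]
end

# The summation order differs from the serial sum, so compare approximately
println(isapprox(s, sum(a)))
```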


## Channels and RemoteChannels

- A Channel is local to a process. Worker 2 cannot directly refer to a Channel on worker 3 and vice-versa. A RemoteChannel, however, can put and take values across workers.


- A RemoteChannel can be thought of as a handle to a Channel.


- The process id, pid, associated with a RemoteChannel identifies the process where the backing store, i.e., the backing Channel exists.


- Any process with a reference to a RemoteChannel can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a RemoteChannel is associated with.


- Serializing a Channel also serializes any data present in the channel. Deserializing it therefore effectively makes a copy of the original object.


- On the other hand, serializing a RemoteChannel only involves the serialization of an identifier that identifies the location and instance of Channel referred to by the handle. A deserialized RemoteChannel object (on any worker), therefore also points to the same backing store as the original.
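The points above can be seen in a small sketch, assuming two workers: the backing Channel is created on the master process, a worker `put!`s into it through the RemoteChannel handle and the master `take!`s the value, then the roles are reversed.

```julia
using Distributed
addprocs(2)

# The backing Channel lives on the process that creates the RemoteChannel (here, the master)
rc = RemoteChannel(() -> Channel{Int}(10))

w = first(workers())

# A worker puts a value through the handle; the data is sent to the master
remotecall_wait(c -> put!(c, 42), w, rc)
v = take!(rc)   # taken on the master

# The master puts a value; the worker takes it through the same handle
put!(rc, 7)
u = remotecall_fetch(c -> take!(c), w, rc)

println((v, u))
```

Only the small handle is serialized when `rc` is passed to the worker; the values themselves always go to or from the master, where the backing Channel lives.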

A producer/consumer example built on Channels can be adapted for inter-process communication by using RemoteChannels instead:

In [1]:
using Distributed
addprocs(4); # add worker processes

In [2]:
const jobs = RemoteChannel(()->Channel{Int}(32));
const results = RemoteChannel(()->Channel{Tuple}(32));

In [3]:
@everywhere function do_work(jobs, results) # define work function everywhere
   while true
       job_id = take!(jobs)
       exec_time = rand()
       sleep(exec_time) # simulates elapsed time doing actual work
       put!(results, (job_id, exec_time, myid()))
   end
end

In [4]:
function make_jobs(n)
   for i in 1:n
       put!(jobs, i)
   end
end;

In [5]:
n = 12;
@async make_jobs(n); # feed the jobs channel with "n" jobs

In [6]:
for p in workers() # start tasks on the workers to process requests in parallel
   remote_do(do_work, p, jobs, results)
end

In [7]:
@elapsed while n > 0 # print out results
   job_id, exec_time, where = take!(results)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
   global n = n - 1
end

1 finished in 0.03 seconds on worker 5
3 finished in 0.23 seconds on worker 4
2 finished in 0.65 seconds on worker 2
4 finished in 0.75 seconds on worker 3
7 finished in 0.26 seconds on worker 2
5 finished in 0.64 seconds on worker 5
6 finished in 0.73 seconds on worker 4
8 finished in 0.65 seconds on worker 3
10 finished in 0.34 seconds on worker 5
12 finished in 0.2 seconds on worker 3
9 finished in 0.85 seconds on worker 2
11 finished in 0.6 seconds on worker 4


3.920312231

## Shared array

Shared Arrays use system shared memory to map the same array across many processes. In a SharedArray each "participating" process has access to the entire array. A SharedArray is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine.

SharedArray indexing (assignment and accessing values) works just as with regular arrays, and is efficient because the underlying memory is available to the local process. 

The constructor for a shared array is of the form:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

This creates an N-dimensional shared array of a bits type T and size dims across the processes specified by pids.
A shared array is accessible only from those participating workers specified by the pids named argument (and the creating process too, if it is on the same host).

If an init function, of signature initfn(S::SharedArray), is specified, it is called on all the participating workers. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.
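A sketch of the `pids` and `init` keywords together, assuming three workers and restricting the array to the first two of them (an arbitrary choice for illustration). Each participating worker fills only its own `localindices` with its id:

```julia
using Distributed
addprocs(3)
@everywhere using SharedArrays

ps = workers()[1:2]   # hypothetical choice: only two workers participate

# init runs once on every process in ps; each one fills only its local chunk
S = SharedArray{Int}((4,); init = S -> (S[localindices(S)] .= myid()), pids = ps)

println(procs(S))
println(S)
```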

In [2]:
using Distributed
addprocs(3)

3-element Array{Int32,1}:
 2
 3
 4

In [3]:
@everywhere using SharedArrays

In [4]:
S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))

3×4 SharedArray{Int32,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

In [26]:
S[3,2] = 7
S

3×4 SharedArray{Int32,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

Since all processes have access to the underlying data, you do have to be careful not to set up conflicts. For example:

In [199]:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)  #Perform wait(remotecall(...))
        end
    end
end
S

3×4 SharedArray{Int32,2}:
 3  3  4  2
 3  4  4  2
 3  4  2  2

In [280]:
@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)  #Perform wait(remotecall(...))
        end
    end
end
S

3×4 SharedArray{Int32,2}:
 4  4  4  3
 4  4  4  3
 4  4  3  3
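The nondeterminism disappears if each process writes only to its own slice of the array. A sketch, assuming three workers in a fresh session; `localindices(S)` gives each participating process a disjoint range of linear indices:

```julia
using Distributed
addprocs(3)
@everywhere using SharedArrays

S = SharedArray{Int,2}((3, 4))

# Each participating process writes its own id, but only into localindices(S),
# a range of linear indices that no other process is assigned
@sync for p in procs(S)
    @async remotecall_wait(p, S) do S
        S[localindices(S)] .= myid()
    end
end

display(S)
```

Running this cell repeatedly always gives the same contents, unlike the fill! version above.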

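Each process overwrites the whole array with its own id at the same time, so the final contents depend on how the writes interleave and change from run to run, as the two outputs above show. With this in mind, the parallel loop example from earlier can now be better understood: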

In [121]:
using SharedArrays

a = SharedArray{Float64}(10)
@sync @distributed for i = 1:10  # @sync waits until every worker has finished writing
    a[i] = i
end
a

10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

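# TEST

Scratch cells that inspect which linear indices of S belong to worker 3 and write to them from that worker.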

In [11]:
t = @spawnat 3 localindices(S)  # linear indices of S mapped to worker 3
fetch(t)

5:8

In [13]:
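# Run the whole assignment on worker 3 (nesting a second @spawnat would assign a Future)
t = @spawnat 3 (S[localindices(S)] = repeat([myid()], length(localindices(S))))
fetch(t)  # the assignment returns its right-hand side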

4-element Array{Int32,1}:
 3
 3
 3
 3

In [18]:
l = @spawnat 3 S
fetch(l)

3×4 SharedArray{Int32,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

In [20]:
S

3×4 SharedArray{Int32,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

In [29]:
S[6]

3