# Computación paralela

El paralelismo en Julia se hace por CPU, no GPU, y se basa en el paso de mensajes entre programas corriendo en varios procesadores con memoria compartida. Al principio hay que específicar cuantos workers se van a usar para esto

```
julia -p 2
```

Esto corre 2 workers desde linea de comandos. Cuando se corre desde Jupyter o cuando ya está dentro del prompt de Julia se agregan workers de esta manera (se recomienda que sea el mismo número de _cores_ del procesador):

In [1]:
# revisamos el número actual de workers
println(nprocs())
addprocs(1)

# revisamos el número workers después
println(nprocs())

1
2


Cada trabajador tiene un **id**, que resulta importante para la asignación de tareas manualmente como se mostrará más adelante

In [2]:
@everywhere println(myid())

1
	From worker 2:	2


La programación paralela en Julia está construida sobre dos primitivas:

  - __remotes references__: objeto que se puede utilizar desde cualquier proceso para referirse a un objeto almacenado en un proceso particular
  - __remotes calls__: solicitud de un proceso para llamar a una función determinada con ciertos argumentos en otro (posiblemente el mismo) proceso

Una remote call devuelve una remote reference, de manera que el programa puede seguir su funcionamiento mientras esta tarea se ejecuta en otra parte. Cuando es necesario tener el valor de un remote reference se usa la función `fetch()`.

Para realizar el llamado de un remote call sobre un worker específico se puede hacer de dos formas:

In [3]:
# el primer parámetro es el id del worker, luego la función y después los parámetros de la función
r = remotecall(2, rand, 2, 2)

# el primer parámetro es el id del worker, luego la función con sus parámetros
s = @spawnat 2 rand(2, 2)

# fetch evalúa los valores del remote reference
println(fetch(r))
println(fetch(s))

[0.2628392974833349 0.6763392065375571
 0.5590070164639327 0.11981752344831476]
[0.1113960568297554 0.9256475723916022
 0.7547629762940384 0.6221259684508706]


Una simplificación del `fetch(remotecall(...))` es la función `remotecall_fetch(...)`.

La sintáxis de la función `remotecall()` no resulta muy ventajosa, por lo que se usa la macro `@spawn`, la cual evalúa una expresión en vez de una función y elige automáticamente el worker.

In [4]:
r = @spawn rand(2, 2)
s = @spawn 1 .+ fetch(r)
println(fetch(s))

[1.1861445980897083 1.9734900290726816
 1.1496917709575176 1.8356032502787187]


## Disposición del código

Como se va a estar trabajando en diferentes workers, es posible que al declararse una función o al cargarse una librería, esta no sea reconocida por todos los workers. Para evitar este tipo de problemas se puede usar la macro `@everywere`, la cual obliga a un comando a ejecutarse en todos los workers.

In [5]:
@everywhere function rand2(dims...)
    return 2*rand(dims...)
end

@spawnat 1 rand2(2,2)
@spawnat 2 rand2(2,2)

RemoteRef{Channel{Any}}(2,1,12)

## Flujo de datos

El envío de mensajes y el flujo de datos consisten en los problemas más usuales para la computación paralela, no solo en terminos de dificultad si no además de escalabilidad y rendimiento.

Estas operaciones usualmente se realizan con `fetch()` y `@spawn`, ya que con la primera se solicita que esos datos se traigan al procedimiento actual y con la segunda se envían datos a otro worker.

Es necesario tener en consideración la necesidad y practicidad de cada evento, ya que por ejemplo el cuadrado de una matrix de $1000 \times 1000$ puede realizarse por los siguientes métodos:

In [6]:
# método 1
A = rand(1000,1000)
Bref = @spawn A^2
fetch(Bref)

# método 2
Bref = @spawn rand(1000,1000)^2
fetch(Bref)

1000x1000 Array{Float64,2}:
 258.868  258.271  256.779  257.312  …  257.695  240.166  245.439  251.106
 259.845  251.679  259.477  253.688     260.96   240.673  248.111  244.996
 256.409  243.846  252.933  244.023     248.149  231.805  237.562  239.073
 258.268  243.266  253.209  253.813     256.462  236.576  246.222  240.14 
 260.717  248.235  257.361  254.639     258.458  238.572  248.888  248.854
 256.337  252.397  256.399  253.071  …  254.811  241.595  243.108  245.644
 249.313  244.945  251.67   251.498     252.791  238.949  245.325  238.127
 257.267  254.378  255.947  253.28      255.813  242.881  245.028  244.87 
 264.441  258.821  258.553  256.847     262.939  244.619  251.19   250.175
 257.502  249.004  254.683  259.16      259.526  241.488  243.576  244.864
 259.418  253.357  258.61   258.221  …  259.496  244.603  248.219  251.456
 270.496  261.178  268.336  264.702     264.043  245.077  256.042  256.902
 255.541  242.653  251.646  245.391     247.668  233.28   238.78   242.6

La diferencia entre estos dos es clara en este caso, y es que la matriz `A` es enviada a otro worker en el método 1, lo cual significa que tiene un mayor envío de datos, lo cual implica una baja en el rendimiento, pero puede permitir en otros casos usar esa información para otras tareas.

## Parallel map y ciclos

Existen varias operaciones que pueden trabajar completamente de manera paralela (sin flujos de datos entre ellas), como por ejemplo una simulación de Monte Carlo, como se muestra a continuación:

In [7]:
# Cuenta caras en n lanzamientos aleatorios
@everywhere function count_heads(n)
    c::Int = 0
    for i=1:n
        c += rand(Bool)
    end
    c
end

a = @spawn count_heads(100000000)
b = @spawn count_heads(100000000)
println(fetch(a)+fetch(b))

100007704


La última operación suele ser conocida como reducción (_reduction_), la cual hace parte de un patrón de computación paralela o distribuida bastante usual (_map-reduce_). Esta operación suele ser de la forma $x = f(x,v[i])$, donde $x$ es el acumulador, $f()$ la función de reducción (función asociativa) y $v[]$ la lista de elementos a reducir.

Cabe notar que el código de arriba puede ser generalizado, e incluso no restringirlo a que sean solo 2 workers los que realicen estas tareas, esto se puede hacer de la siguiente manera:

In [8]:
nheads = @parallel (+) for i=1:200000000
  Int(rand(Bool))
end

100004937

En esta operación se ejecuta el ciclo for de manera paralela entre los workers disponibles y al terminar se ejecuta la función de reducción, la cual en este caso es (+).

Aunque la notación del for paralelo es similar al normal su uso es completamente diferente, ya que no ejecuta las instrucciones en un órden específico y la escritura en arreglos y matrices resulta complejo, ya que no se está manejando el mismo worker. Por ejemplo el siguiente código no hará lo esperado:

In [9]:
a = zeros(100000)
@parallel for i=1:100000
  a[i] = i
end
a

100000-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 ⋮  
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

El operador de reducción puede omitirse en caso de no necesitarse, por lo que retornará un arreglo de remote references. Preferiblemente cuando se desea aplicar una función sobre todos los elementos de un arreglo o matriz se recomienda el uso de `pmap()`:

In [10]:
M = Matrix{Float64}[rand(10,10) for i=1:10]
pmap(svd, M)

10-element Array{Any,1}:
 (
10x10 Array{Float64,2}:
 -0.245542  -0.361681    0.132146   …  -0.0771622   0.639107    0.0590786
 -0.285159  -0.0630745  -0.336472       0.040401    0.156018   -0.234977 
 -0.326592   0.163739    0.429688       0.0324591  -0.41473    -0.244779 
 -0.306996  -0.328721   -0.0173562     -0.245929   -0.0113104   0.496277 
 -0.409207   0.400704   -0.246665      -0.53768    -0.218209    0.297007 
 -0.326613  -0.185657    0.464774   …   0.427765   -0.21823     0.279808 
 -0.285028   0.606614    0.0519806      0.37485     0.465113    0.208987 
 -0.342775  -0.0940527   0.300458      -0.384008    0.159364   -0.548494 
 -0.329604  -0.377654   -0.4583         0.305187   -0.237897   -0.0227785
 -0.275652   0.125936   -0.32329        0.278715   -0.0388625  -0.351486 ,

[4.8790806501452915,1.6899872210780877,1.3160539526274126,1.0071916251474002,0.8019319849929022,0.6030619505962146,0.529824457084005,0.36667642174819076,0.22414829962137345,0.056747154527782985],
10x10 Arra

## Sincronización con remote references

### Task

El paralelismo usa por "debajo" _Tasks_ (corutinas), las cuales son elementos de control de flujo que permiten una manera sencilla de suspenderse y reactivarse. A continuación se muestra un ejemplo de esto:

In [11]:
# Se genera la lista de tasks
t = @task Any[ for x in [1,2,4] println(x) end ]

# No hay tasks?
println(istaskdone(t))

# Cual es la siguiente task?
println(current_task())

# Ejecutar task
println(consume(t))

false
Task (runnable) @0x00000001081a0160
1
2
4
Any[nothing]


### Shared Arrays

En un shared array se puede compartir un arreglo entre los diferentes workers. A continuación se muestra un ejemplo:

In [12]:
S = SharedArray(Int, (3,4), init = S -> S[Base.localindexes(S)] = myid(), pids = Int[])
println(S)
S[3,2] = 7
println(S)

[2 2 2 2
 2 2 2 2
 2 2 2 2]
[2 2 2 2
 2 2 2 2
 2 7 2 2]


### Distributed Arrays

Se diferencian de los __Sharded Arrays__ en que los sharded son compartidos por completo entre todos los workers, mientras que los distributed son divididos en pedazos y repartidos entre los workers, compartiendo únicamente la metadata de estos. La inicialización de estos arrays debe ser de esta forma:

In [13]:
# Desde la versión 0.4 toca importar el paquete
# Pkg.add("DistributedArrays")
@everywhere using DistributedArrays

dzeros(100,100,10) # matriz de ceros
dones(100,100,10) # matriz de unos
drand(100,100,10) # matriz de números aleatorios
drandn(100,100,10) # matriz de números aleatorios (fuera del rango 0-1)
dfill(8,100,100,10) # el primer parámetro es el valor a llenar



100x100x10 DistributedArrays.DArray{Int64,3,Array{Int64,3}}:
[:, :, 1] =
 8  8  8  8  8  8  8  8  8  8  8  8  8  …  8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8  …  8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8  …  8  8  8  8  8  8  8  8  8  8  8  8
 8  8  8  8  8  8  8  8  8  8  8  8  8     8  8  8  8  8  8  8  8  8 

El constructor primitivo tiene la forma `DArray(init, dims[, procs, dist])`, donde `init` es una función que acepta una tupla de rango de indices, `dims` son las dimensiones, `procs` especifica un vector de id a usar y `dist` es un vector de enteros que especifica entre cuantas dimensiones realizar la división para dada dimensión.

Los métodos para estos arreglos son:
  * `distribute(a::Array)`: convierte un arreglo local a un arreglo distribuido
  * `localpart(a::DArray)`: obtiene la porción almacenada localmente del `DArray`
  * `localindexes(a::DArray)`: devuelve una tupla con los rangos de los indices almacenados localmente
  * `convert(Array, a::DArray)`: proceso contrario a `distribute`

In [14]:
A = fill(1.1, (100,100))
println(sum(A))
DA = distribute(A)
println(sum(DA))
println(sum(A) == sum(DA))

10999.999999999969
10999.999999999969
true


Dado el hecho de que el manejo del garbage collector es más compleja en este tipo de tareas, se recomienda cerrar estos arreglos al final del código de la siguiente forma:

In [15]:
darray_closeall()

LoadError: LoadError: UndefVarError: darray_closeall not defined
while loading In[15], in expression starting on line 1