# <center><span style="color:red">Distributed Computing with Julia</span></center>

---

# Overview of Distributed Programming

### Sequential execution

* Our minds are single-core
 * pseudo-parallelism tricks us
 * e.g., round-robin scheduling algorithm

### Memory

* we only know what is stored at our memory (at least we hope so)
* we do not precisely know what is inside of others' memory

### Cooperative tasks $\rightarrow$ Communication

* suppose that we need to do a cooperative work together
* E.g.: writing a paper
* for that, we need to share data $\rightarrow$ we need to communicate

### Distributed communication

* **message passing**
 * the most basic communication primitive of computer systems
 * mimcs how human beings have been communicating to each other
 * hard to manage the data since they are spread among processors
 * Approaches: MPI, RPC, RMI, HTTP, etc.
* **shared memory**
 * requires a higher-level programming support (indirect communication)
 * easier to communicate (to code)
 * might creates bottlnecks (the shared memory)
 * Indirect Communication: when we need loosely coupled (time and space)
      * shared memory (Distributed File System like HDFS, etc.)
      * group communication
      * pub-subscribe systems (event-based)
      * message queues
      * produce-consume
      * ...
  
### Example

* Message passing

E.g. me $\rightarrow$ audience = message passing (broadcast, multicast, point-to-point)

* Shared memory

E.g. board (presentation) $\rightarrow$ audience = shared memory, e.g., $f(x)=2x$ 


# Master/Worker programming pattern

<img src="figures/master-worker.svg" width="425" height="300" alt="Master/Worker"></a>


Julia relies on the Master/Worker paradigm. While the master process controls the distributed computation, the **workers performs all the computation**.


* Each process has an associated identifier
    * **Master** process ID (PID) = 1
    * **Workers** process IDs (PID) >= 2 
* Only workers execute distributed calls
    * Except in a single-process environment
        * Process 1 is both master and worker
        * I.e., Julia interactive prompt

## Managing distributed processes

### Getting the current processes (master and workers)

* [`nprocs()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.nprocs)
* [`procs()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.procs-Tuple{SharedArray})
* [`nworkers()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.nworkers)
* [`workers()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.workers)

### Adding workers

* [`addprocs(...)`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.addprocs):

### Removing workers

* 

Let's use [`nprocs()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.nprocs) to know the number of process in use:

In [3]:
procs()

1

We can either add more processes via **SSH** or add more **local processors** to take advantage of multi-core computers with [addprocs(...)](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.addprocs):

In [4]:
addprocs(2)

2-element Array{Int64,1}:
 2
 3

Then list again to see the just-added processes:

In [5]:
nprocs()

3

You can get the list of processes with [procs()](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.procs):

In [6]:
procs()

3-element Array{Int64,1}:
 1
 2
 3

On the other hand, you can list only the [workers()](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.workers):

In [11]:
workers()

2-element Array{Int64,1}:
 3
 4

We can also iterate over the `workers()` array:

In [12]:
for pid in workers()
    println("Listing worker #", pid)
end

Listing worker #3
Listing worker #4


Now we will remove both workers (the processes we previously added) with [`rmprocs(pids...)`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.rmprocs):

In [7]:
rmprocs(3,2)

Task (done) @0x000000011a677850

In [8]:
procs()

1-element Array{Int64,1}:
 1

# <span style="color:green">Exercise</span>


<span style="color:green">Add 2 workers and then remove them assuming that you don't know their IDs.</span>

In [None]:
#TODO

# [Clusters](http://docs.julialang.org/en/release-0.3/manual/parallel-computing/?highlight=cluster#clustermanagers)


## Basics

* Julia processes run on Julia [Clusters](http://docs.julialang.org/en/release-0.3/manual/parallel-computing/?highlight=cluster#clustermanagers)
    * **Local** cluster
        * Leverage local multi-cores 
    * **Distributed** cluster
        * On top of SSH/TCP/IP connections
* Clusters can be customized

* Clusters can be created as Julia starts. For instance, **to create a 2-core local cluster**:

```
$ julia -p 2
```

* A **distributed SSH cluster** can be created as follows:

```
julia --machinefile HOSTS_FILE common_code.jl
```

* Julia will start worker processes remotely through a password-less SSH login on the computers listed at the HOSTS_FILE file. In addition, it will deploy the *common_code.jl* file on all computers.

* It is important to remark that these computers can be any computer reachable by password-less SSH logins. For example, LAN computers, Amazon EC2 instances, [Docker](http://docker.com) containers, and do forth.

## Advanced

* See [Cluster Manager Interface](https://docs.julialang.org/en/stable/stdlib/parallel/#Cluster-Manager-Interface-1)
    

# Cloud Computing support

### [JuliaRun](https://juliacomputing.com/products/juliarun.html)

* Propriertary and payed solution from [Julia Computing](https://juliacomputing.com)
* JuliaRun provides batch and interactive deployment in the Cloud


### [Infra.jl]()

* **An automatic deployment support for processing remote sensing data in the Cloud**
* Paper presented at [IGARSS 2018 - Invited Special Session: Earth Data for Global Scale Applications](https://www.igarss2018.org/Papers/PublicSessionIndex3.asp?Sessionid=1310)
 * Actually, was presented today :-) Tuesday, July 24, 09:10 - 09:30 (Valencia, Spain timezone)



<!--
# Cloud support

## Virtual machines (VMs)

* [`Amazon Web Services (AWS)`](https://github.com/amitmurthy/AWS.jl)
* [`OpenStack.jl`](https://github.com/Keno/OpenStack.jl/blob/master/src/OpenStack.jl)
    * Uses [OpenStack Restful API](http://developer.openstack.org/api-ref-compute-v2.html)

## Containers

* [Infra.jl](https://github.com/gsd-ufal/CloudArray.jl/blob/master/src/Infra.jl)
    * Set of **management functions for managing Julia workers on Docker**
* [Docker.jl](https://github.com/Keno/Docker.jl)
    * Wrapper to [Docker HTTP API](https://docs.docker.com/reference/api/docker_remote_api/)

## Higher-level support 
-->

# [<span style="color:red">Julia programming support for distributed communication</span>](https://docs.julialang.org/en/latest/manual/parallel-computing/)

## Overview

Let's read the first paragraphs of the [Julia Official Documentation on Parallel Computing](https://docs.julialang.org/en/latest/manual/parallel-computing/)

* "Julia’s implementation of message passing is **different from other environments such as MPI**. 
* **Communication in Julia is generally “one-sided”**
    * The programmer needs **to explicitly manage only one process in a two-process operation**. 
    * These operations typically **do not look like “message send” and “message receive”** but rather resemble **higher-level operations like calls to user functions**.

* Parallel programming in Julia is built on two primitives: **remote references** and **remote calls**. 
    * A **remote reference** is an object that can be used from any process to **refer to an object stored on a particular process.**
    * A **remote call** is a **request by one process to call a certain function** on certain arguments on another (possibly the same) process. 
        * A remote call **returns a [`Future`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.Future)** to its result. 
        * Remote calls return immediately (**Asynchronous**); the process that made the call proceeds to its next operation while the remote call happens somewhere else. 
        * You can wait for a remote call to finish by calling [`wait`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.wait) on the returned Future
        * You can obtain the full value of the result using [`fetch()`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.fetch-Tuple{Channel}).

* [...] multiple processes can co-ordinate their processing by referencing the same remote Channel.on its remote reference (**Synchronous**).
 * You can obtain the full value of the result using **fetch**. 
 * You can store a value to a remote reference using **put**."

#### In other words...

* It is **not a send-receive paradigm**
    * <span style="color:red">There is no receive: it assumes that communication is managed by the calling process</span>
    * <span style="color:red">The calling process calls remote functions or expressions</span>
    
<img src="figures/master-worker.svg" width="425" height="300" alt="Master/Worker"></a>
 
#### Further supports

* [**Macros**](https://docs.julialang.org/en/stable/manual/parallel-computing/) to make communication easier
* **Tasks**
    * Coroutines, green threads that run on a single actual thread
    * **Tasks can be combined with remote calls for parallel programming**
    * [Julia multithreading](https://docs.julialang.org/en/latest/base/multi-threading/): on-going work (experimental)   
    
## Programming model


* Programming model
    * **Primitive: remote calls**
        * A remote request similiar to _send_
            * However, remote calls specify the operation to be performed
            * Similar to embed the _receive_ operation into _send_
        * [`remotecall(id, func, args...)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.remotecall)
            * `id` is the process that will perform the computation
            * `func` is the function or expression to be executed
            * `args` ar the function arguments
    * **Abstraction: remote reference**
        * Remote calls return a remote reference (`RemoteRef`)
        * `RemoteRef` can be used by any processes
    
## Julia support for distributed communication - <span style="color:red">In a nutshell</span>

### 1. Add Workers

In order to put workers to execute some code, let's add two processes since we currently have a single (master) process:

In [14]:
addprocs(2)

2-element Array{Int64,1}:
 8
 9

In [13]:
workers()

2-element Array{Int64,1}:
 3
 4

### 2. If necessary, let them know about your code

* Use [@everywhere](https://docs.julialang.org/en/latest/stdlib/Distributed/#Distributed.@everywhere) macro
* See more on [Code Availability and Loading Packages](https://docs.julialang.org/en/latest/manual/parallel-computing/#Code-Availability-and-Loading-Packages-1)

In [17]:
@everywhere my_function(x,y) = x + y

### 3. Ask Workers to process what you need

In [19]:
remotecall(my_function,8,10,20)

Future(8, 1, 18, Nullable{Any}())

In [21]:
output_reference = remotecall(my_function,8,10,20)

Future(8, 1, 19, Nullable{Any}())

In [12]:
println(output_reference)

Future(2, 1, 15, Nullable{Any}())


### 4. Get the result back

In [22]:
fetch(output_reference)

30

### Remark

We can perform Steps 3 and 4 at once in an elegant fasion:

In [27]:
@fetchfrom 3 my_function(17,24)

41

In fact, we don't need to specify the Worker that will process the function:

In [43]:
@fetch my_function(120,2)

122

---

# A more comprenhesive overview of distributed communication in Julia

## [Remote calls](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.remotecall-Tuple{Any,Integer,Vararg{Any,N}%20where%20N}) and Synchronism

* Julia low-level communication primitives
* Remote calls can be performed **Synchronously** or **Asynchronously**.


### Asynchronous remote calls

* The calling process **does not wait** the worker computation
    * **Non-blocking call**
* [`remotecall(func, worker, args...)`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.remotecall-Tuple{Any,Integer,Vararg{Any,N}%20where%20N})

### Synchronous remote calls

* **Synchronous** communications block the sender till they receive the response
    * Makes the caller **wait**
* [`remotecall_wait(func, worker, args...)`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.remotecall_wait-Tuple{Any,Integer,Vararg{Any,N}%20where%20N})

The following example puts _Worker 2_ to _sleep 5 seconds_ and waits:

In [30]:
remotecall_wait(sleep, 6, 5)

println("This message waited for the Worker")

This message waited for the Worker


This call blocked the caller process (master, process whose ID is 1) for 5 seconds. 

Remark that the result from the Output appeared 5 seconds later.

Now let's execute the same command **asynchronously** (non-blocking call):

In [23]:
remotecall(sleep, 2, 5)

println("This message did NOT wait for the Worker")

This message did NOT wait for the Worker


### @sync and @async

* Macros that ease the construction of synchronous and asynchronous calls.
* These macros return [`Tasks`](https://docs.julialang.org/en/stable/stdlib/parallel/#Tasks-1).
* [**`@sync`**](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.@sync)
    * Makes the calling process **wait** for a function/expression execution
        * _Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and @parallel are complete._
    * Similiar to [monitors](https://en.wikipedia.org/wiki/Monitor_%28synchronization%29)
    * Optionally, we can use  `wait` instead:
        * [`wait([x])`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.wait)
            * _Block the current task until some event occurs, depending on the type of the argument._
        * [`timedwait(testcb, secs; pollint)`](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.timedwait)
            * _Waits till `testcb` returns true or for `secs` seconds, whichever is earlier._
            * _`testcb` is polled every `pollint` seconds._
            
* [**`@async`**](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.@async)
    * Free the calling process (Task) from a blocked operation
    * Similar to running a command in background in Bash
   

### Remark on Asynchronous vs. Synchronous calls

* Usage depends on the distributed application
* Use **Synchronous** calls when you need to wait a result to carry on the computation:
    * Workflows
    * Mutual exclusion: consistency, integrity
* Otherwise, use **Asynchronous** calls since blocking communications degrade the system performance and can eventually imply [deadlocks](https://en.wikipedia.org/wiki/Deadlock).

In [48]:
tic()
println("Calling distributed functions...")

# this @sync blocks the code execution till the end of the for
@sync for w in workers() 
    # this @async allows to launch all workers at once so their execution will be performed in parallel
     @async remotecall_wait(sleep, w, 3)
end

println("This line waited $(toc()) secs")

Calling distributed functions...
elapsed time: 3.025307282 seconds
This line waited 3.025307282 secs


# <span style="color:green">Exercise</span>


<span style="color:green">Remove the `@async` macro from the code above and run it again. Did the code change its behavior? Why? Use the following figure to help your explanation.</span>

<img src="figures/master-worker.svg" width="300" alt="Master/Worker"></a>



## @spawn and @spawnat

* The macro [@spawn](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@spawn) spawns (runs) an expression/function at a worker.
* All function computation is done at **Worker side**

In [66]:
remote_matrix = @spawn rand(2,2)

Future(4, 1, 81, Nullable{Any}())

Now we fetch the `remote_matrix` variable with fetch(...):

In [67]:
fetch(remote_matrix)

2×2 Array{Float64,2}:
 0.16161   0.341916
 0.490374  0.11404 

If you need to execute an expression over the `remote_matrix`, you can still use @spawn combined with fetch(...):

For instance:

In [29]:
remote_matrix_plus_one = @spawn 1 .+ fetch(remote_matrix)

Future(2, 1, 33, Nullable{Any}())

Now we fetch the result:

In [31]:
fetch(remote_matrix_plus_one)

2×2 Array{Float64,2}:
 1.41991  1.28423
 1.21198  1.15855

If you are curious about where is `remote_matrix_plus_one`:

In [32]:
remote_matrix_plus_one.where

2

If you need to have more control of the distributed computation, you may use the macro [@spawnat](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@spawnat) which allows to spawn an expression/function at a specific worker.

In the following example, we execute the sqtr(...) function at the worker whose ID is 3:

In [68]:
raiz = @spawnat 3 sqrt(2)

Future(3, 1, 83, Nullable{Any}())

In [69]:
?addprocs

search: [1ma[22m[1md[22m[1md[22m[1mp[22m[1mr[22m[1mo[22m[1mc[22m[1ms[22m



```
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
```

Launches worker processes via the specified cluster manager.

For example, Beowulf clusters are supported via a custom cluster manager implemented in the package `ClusterManagers.jl`.

The number of seconds a newly launched worker waits for connection establishment from the master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's environment. Relevant only when using TCP/IP as transport.

```
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
```

Add processes on remote machines via SSH. Requires `julia` to be installed in the same location on each node, or to be available via a shared file system.

`machines` is a vector of machine specifications. Workers are started for each specification.

A machine specification is either a string `machine_spec` or a tuple - `(machine_spec, count)`.

`machine_spec` is a string of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults to current user, `port` to the standard ssh port. If `[bind_addr[:port]]` is specified, other workers will connect to this worker at the specified `bind_addr` and `port`.

`count` is the number of workers to be launched on the specified host. If specified as `:auto` it will launch as many workers as the number of cores on the specific host.

Keyword arguments:

  * `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the master process. Default is `false`.
  * `sshflags`: specifies additional ssh options, e.g. ```sshflags=`-i /home/foo/bar.pem````
  * `max_parallel`: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10.
  * `dir`: specifies the working directory on the workers. Defaults to the host's current directory (as found by `pwd()`)
  * `enable_threaded_blas`: if `true` then  BLAS will run on multiple threads in added processes. Default is `false`.
  * `exename`: name of the `julia` executable. Defaults to `"$JULIA_HOME/julia"` or `"$JULIA_HOME/julia-debug"` as the case may be.
  * `exeflags`: additional flags passed to the worker processes.
  * `topology`: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error.

      * `topology=:all_to_all`: All processes are connected to each other. The default.
      * `topology=:master_slave`: Only the driver process, i.e. `pid` 1 connects to the workers. The workers do not connect to each other.
      * `topology=:custom`: The `launch` method of the cluster manager specifies the connection topology via fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster manager identity `ident` will connect to all workers specified in `connect_idents`.

Environment variables :

If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`. The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a newly launched worker waits for connection establishment.

```
addprocs(; kwargs...) -> List of process identifiers
```

Equivalent to `addprocs(Sys.CPU_CORES; kwargs...)`

Note that workers do not run a `.juliarc.jl` startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.

```
addprocs(np::Integer; restrict=true, kwargs...) -> List of process identifiers
```

Launches workers using the in-built `LocalManager` which only launches workers on the local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4 processes on the local machine. If `restrict` is `true`, binding is restricted to `127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and `enable_threaded_blas` have the same effect as documented for `addprocs(machines)`.


Now we fetch the `raiz` variable with `fetch` and get the worker where `raiz` is stored:

In [66]:
@show fetch(raiz)

@show raiz.where

fetch(raiz) = 1.4142135623730951
raiz.where = 3


3

## [@fetch](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.fetch-Tuple{Any}) and [@fetchfrom](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@fetchfrom)

We can also use spawn and fetch at once with macro `@fetch(...)` which is equivalent to **`fetch(@spawn expr)`**

In [39]:
@fetch 2 + 6

8

For more control, use the macro `@fetchfrom(...)` which is equivalent to **`fetch(@spawnat p expr)`**:

In [40]:
@fetchfrom 3 12^9

5159780352

## [@everywhere](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@everywhere)

* The [@everywhere](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@everywhere) macro forces a command/expression to run on all processes.
* Example: let's print everywhere (at every workers) processes' IDs:

In [68]:
#addprocs(2)
@everywhere println(myid())

1
	From worker 2:	2
	From worker 3:	3


For instance, let's define on all processes a function that prints workers' IDs:

In [69]:
@everywhere who_are_you() = println("I'm worker $(myid())")

Now we call it:

In [70]:
for i=1:5
    @spawn who_are_you()
end

	From worker 3:	I'm worker 3
	From worker 3:	I'm worker 3
	From worker 2:	I'm worker 2
	From worker 2:	I'm worker 2
	From worker 2:	I'm worker 2


* @everywhere can also be used to load modules on all processes:

In [71]:
#Pkg.add("Calculus")
@everywhere using Calculus

In [76]:
@fetch derivative(x-> x^3, 2)

12.000000000099304

* For loading [Julia Modules](https://docs.julialang.org/en/stable/manual/modules/) use [`require`(...)](https://docs.julialang.org/en/stable/stdlib/base/#Base.require)
* For loading code source files, use [`include("file_path")`](https://docs.julialang.org/en/stable/stdlib/base/#Base.include)

Let's make the Workers know about the `MyCode.jl` file and then call its function:

In [83]:
@everywhere include("MyCode.jl")

In [87]:
@fetch custom_function(1,2,3)

-1

In [99]:
print(readstring("MyCode.jl"))

function custom_function(α,β,γ)
    return α * β - γ
end


## [@parallel](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.Distributed.@parallel)

* Parallelizes a for loop or a comprenhesion automatically among the available workers.
    * Split the range
    * Distribute the range to each process

In [100]:
@everywhere plus_my_id(x) = x + myid()

a = 100
@parallel for i=1:10
    println(plus_my_id(a))
end

2-element Array{Future,1}:
 Future(3, 1, 127, #NULL)
 Future(2, 1, 128, #NULL)

	From worker 3:	103
	From worker 3:	103
	From worker 3:	103
	From worker 3:	103
	From worker 3:	103
	From worker 2:	102
	From worker 2:	102
	From worker 2:	102
	From worker 2:	102
	From worker 2:	102


* **`@parallel`** may also performs a **reduce function on each worker and the final reduction at the calling process** (e.g., Master):
    * Optionally, @parallel may take a “reducer” as its first argument
    * The results from each remote reducer will be aggregated using the reducer locally
    * Tasks are assigned asynchronously when no reducer operation is used
    

In [123]:
@parallel (+) for i=1:10^7
   plus_my_id(a)
end

1025000000

---
# [<span style="color:red">Tasks (coroutines)</span>](https://docs.julialang.org/en/stable/manual/control-flow/#man-tasks-1)


* **Tasks are NOT thread**
    * Julia Tasks are **not thread-safe**.
    * See [Julia Multithreading](https://docs.julialang.org/en/stable/manual/parallel-computing/#Multi-Threading-(Experimental)-1)
* Julia tasks are **green threads (coroutines)**
    * Tasks do **not** spam over different CPUs
    * Julia scheduler decides which tasks runs at a given time
    * Tasks can yield, notify, schedule, etc.
* Useful when you need concurrency programming with no actual parallelization (<span style="color:red">**pseudo-parallelization**</span> instead)
    * Context change is very fast as tasks belongs to a single kernel thread


### Macro `@task `

* Optionally, you can create a task with macro [@task](https://docs.julialang.org/en/stable/stdlib/parallel/#Base.@task)

# [<span style="color:red">Distributed Arrays</span>](https://github.com/JuliaParallel/DistributedArrays.jl)


## Introduction

* **Distributed Memory** model
    * A multi-dimensional array stored at distributed processes
    * No shared memory
* **Dense Arrays**
    * E.g., non-sparse matrices
* All processes know about DistributedArrays
* **Only metadata is exchanged**
    * Each process holds a part of data the whole DistributedArray

```
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
```


In [130]:
workers()

2-element Array{Int64,1}:
 2
 3

In [132]:
Pkg.add("DistributedArrays")

[1m[36mINFO: [39m[22m[36mCloning cache of DistributedArrays from https://github.com/JuliaParallel/DistributedArrays.jl.git
[39m[1m[36mINFO: [39m[22m[36mCloning cache of Primes from https://github.com/JuliaMath/Primes.jl.git
[39m[1m[36mINFO: [39m[22m[36mInstalling DistributedArrays v0.4.0
[39m[1m[36mINFO: [39m[22m[36mInstalling Primes v0.3.0
[39m[1m[36mINFO: [39m[22m[36mPackage database updated
[39m[1m[36mINFO: [39m[22m[36mMETADATA is out-of-date — you may not have the latest version of DistributedArrays
[39m[1m[36mINFO: [39m[22m[36mUse `Pkg.update()` to get the latest versions of your packages
[39m

In [2]:
@everywhere using DistributedArrays

### Creating DistributedArrays

Use `Array` constructors with **d** at the beginning:

    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x,100,100,10)

In [3]:
d = dzeros(10)

10-element DistributedArrays.DArray{Float64,1,Array{Float64,1}}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

In [4]:
d[4]

0.0

In [5]:
i = d[4]
typeof(i)

Float64

#### Creating from an `Array`

In [6]:
arr = rand(10)
darr = distribute(arr)

10-element DistributedArrays.DArray{Float64,1,Array{Float64,1}}:
 0.321366
 0.236102
 0.214213
 0.756046
 0.541592
 0.11205 
 0.795864
 0.312882
 0.110152
 0.350713

#### Converting `DistributedArrays` to `Array`

In [7]:
arr2 = convert(Array, darr)

10-element Array{Float64,1}:
 0.321366
 0.236102
 0.214213
 0.756046
 0.541592
 0.11205 
 0.795864
 0.312882
 0.110152
 0.350713

In [8]:
typeof(arr2)

Array{Float64,1}

### Getting information on where DistributedArrays are stored

* **`localpart(a::DArray)`** obtains the locally-stored portion of a  DArray.

* **`localindexes(a::DArray)`** gives a tuple of the index ranges owned by the local process.

In [9]:
localpart(darr)

0-element Array{Float64,1}

In [10]:
for w in workers()
    @spawnat w println(localindexes(darr))
end

	From worker 2:	(1:3,)
	From worker 3:	(4:5,)
	From worker 4:	(6:7,)
	From worker 5:	(8:10,)


In [11]:
for w in workers()
    @spawnat w println(localpart(darr))
end

	From worker 2:	[0.321366, 0.236102, 0.214213]
	From worker 4:	[0.11205, 0.795864]
	From worker 5:	[0.312882, 0.110152, 0.350713]
	From worker 3:	[0.756046, 0.541592]


In [12]:
@show d.cuts
@show d.dims
@show d.id
@show d.indexes
@show d.localpart
@show d.pids
@show d.release

d.cuts = Array{Int64,1}[[1, 4, 6, 8, 11]]
d.dims = (10,)
d.id = (1, 1)
d.indexes = Tuple{UnitRange{Int64}}[(1:3,), (4:5,), (6:7,), (8:10,)]
d.localpart = Nullable{Array{Float64,1}}(Float64[])
d.pids = [2, 3, 4, 5]
d.release = true


true

In [13]:
?DistributedArrays

search: [1mD[22m[1mi[22m[1ms[22m[1mt[22m[1mr[22m[1mi[22m[1mb[22m[1mu[22m[1mt[22m[1me[22m[1md[22m[1mA[22m[1mr[22m[1mr[22m[1ma[22m[1my[22m[1ms[22m



No documentation found.

Displaying the `README.md` for the module instead.

---

# DistributedArrays.jl

[![Build Status](https://travis-ci.org/JuliaParallel/DistributedArrays.jl.svg?branch=master)](https://travis-ci.org/JuliaParallel/DistributedArrays.jl) [![Coverage Status](https://coveralls.io/repos/github/JuliaParallel/DistributedArrays.jl/badge.svg?branch=master)](https://coveralls.io/github/JuliaParallel/DistributedArrays.jl?branch=master) [![codecov](https://codecov.io/gh/JuliaParallel/DistributedArrays.jl/branch/master/graph/badge.svg)](https://codecov.io/gh/JuliaParallel/DistributedArrays.jl)

Distributed Arrays for Julia

***NOTE*** Distributed Arrays will only work on Julia v0.4.0 or later.

`DArray`s have been removed from Julia Base library in v0.4 so it is now necessary to import the `DistributedArrays` package on all spawned processes.

```julia
@everywhere using DistributedArrays
```

## Distributed Arrays

Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to distribute arrays among several processes. This combines the memory resources of multiple machines, allowing use of arrays too large to fit on one machine. Each process operates on the part of the array it owns, providing a ready answer to the question of how a program should be divided among machines.

Julia distributed arrays are implemented by the `DArray` type. A `DArray` has an element type and dimensions just like an `Array`. A `DArray` can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a `DArray` is distributed by dividing the index space into some number of blocks in each dimension.

Common kinds of arrays can be constructed with functions beginning with `d`:

```julia
    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x,100,100,10)
```

In the last case, each element will be initialized to the specified value `x`. These functions automatically pick a distribution for you. For more control, you can specify which processes to use, and how the data should be distributed:

```julia
    dzeros((100,100), workers()[1:4], [1,4])
```

The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, one often sees diminishing returns in performance. Placing `DArray`\ s on a subset of processes allows multiple `DArray` computations to happen at once, with a higher ratio of work to communication on each process.

The third argument specifies a distribution; the nth element of this array specifies how many pieces dimension n should be divided into. In this example the first dimension will not be divided, and the second dimension will be divided into 4 pieces. Therefore each local chunk will be of size `(100,25)`. Note that the product of the distribution array must equal the number of processes.

  * `distribute(a::Array)` converts a local array to a distributed array.
  * `localpart(d::DArray)` obtains the locally-stored portion

of a  `DArray`.

  * Localparts can be retrived and set via the indexing syntax too.

Indexing via symbols is used for this, specifically symbols `:L`,`:LP`,`:l`,`:lp` which are all equivalent. For example, `d[:L]` returns the localpart of `d` while `d[:L]=v` sets `v` as the localpart of `d`.

  * `localindexes(a::DArray)` gives a tuple of the index ranges owned by the

local process.

  * `convert(Array, a::DArray)` brings all the data to the local process.

Indexing a `DArray` (square brackets) with ranges of indexes always creates a `SubArray`, not copying any data.

## Constructing Distributed Arrays

The primitive `DArray` constructor has the following somewhat elaborate signature:

```julia
    DArray(init, dims[, procs, dist])
```

`init` is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified indices. `dims` is the overall size of the distributed array. `procs` optionally specifies a vector of process IDs to use. `dist` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension.

The last two arguments are optional, and defaults will be used if they are omitted.

As an example, here is how to turn the local array constructor `fill` into a distributed array constructor:

```julia
    dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
```

In this case the `init` function only needs to call `fill` with the dimensions of the local piece it is creating.

`DArray`s can also be constructed from multidimensional `Array` comprehensions with the `@DArray` macro syntax.  This syntax is just sugar for the primitive `DArray` constructor:

```julia
julia> [i+j for i = 1:5, j = 1:5]
5x5 Array{Int64,2}:
 2  3  4  5   6
 3  4  5  6   7
 4  5  6  7   8
 5  6  7  8   9
 6  7  8  9  10

julia> @DArray [i+j for i = 1:5, j = 1:5]
5x5 DistributedArrays.DArray{Int64,2,Array{Int64,2}}:
 2  3  4  5   6
 3  4  5  6   7
 4  5  6  7   8
 5  6  7  8   9
 6  7  8  9  10
```

## Distributed Array Operations

At this time, distributed arrays do not have much functionality. Their major utility is allowing communication to be done via array indexing, which is convenient for many problems. As an example, consider implementing the "life" cellular automaton, where each cell in a grid is updated according to its neighboring cells. To compute a chunk of the result of one iteration, each process needs the immediate neighbor cells of its local chunk. The following code accomplishes this::

```julia
    function life_step(d::DArray)
        DArray(size(d),procs(d)) do I
            top   = mod(first(I[1])-2,size(d,1))+1
            bot   = mod( last(I[1])  ,size(d,1))+1
            left  = mod(first(I[2])-2,size(d,2))+1
            right = mod( last(I[2])  ,size(d,2))+1

            old = Array(Bool, length(I[1])+2, length(I[2])+2)
            old[1      , 1      ] = d[top , left]   # left side
            old[2:end-1, 1      ] = d[I[1], left]
            old[end    , 1      ] = d[bot , left]
            old[1      , 2:end-1] = d[top , I[2]]
            old[2:end-1, 2:end-1] = d[I[1], I[2]]   # middle
            old[end    , 2:end-1] = d[bot , I[2]]
            old[1      , end    ] = d[top , right]  # right side
            old[2:end-1, end    ] = d[I[1], right]
            old[end    , end    ] = d[bot , right]

            life_rule(old)
        end
    end
```

As you can see, we use a series of indexing expressions to fetch data into a local array `old`. Note that the `do` block syntax is convenient for passing `init` functions to the `DArray` constructor. Next, the serial function `life_rule` is called to apply the update rules to the data, yielding the needed `DArray` chunk. Nothing about `life_rule` is `DArray`\ -specific, but we list it here for completeness::

```julia
    function life_rule(old)
        m, n = size(old)
        new = similar(old, m-2, n-2)
        for j = 2:n-1
            for i = 2:m-1
                nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
                       old[i  ,j-1],             old[i  ,j+1],
                       old[i+1,j-1], old[i+1,j], old[i+1,j+1])
                new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
            end
        end
        new
    end
```

## Numerical Results of Distributed Computations

Floating point arithmetic is not associative and this comes up when performing distributed computations over `DArray`s.  All `DArray` operations are performed over the `localpart` chunks and then aggregated. The change in ordering of the operations will change the numeric result as seen in this simple example:

```julia
julia> addprocs(8);

julia> @everywhere using DistributedArrays

julia> A = fill(1.1, (100,100));

julia> sum(A)
11000.000000000013

julia> DA = distribute(A);

julia> sum(DA)
11000.000000000127

julia> sum(A) == sum(DA)
false
```

The ultimate ordering of operations will be dependent on how the Array is distributed.

## Garbage Collection and DArrays

When a DArray is constructed (typically on the master process), the returned DArray objects stores information on how the array is distributed, which procesor holds which indexes and so on. When the DArray object on the master process is garbage collected, all particpating workers are notified and localparts of the DArray freed on each worker.

Since the size of the DArray object itself is small, a problem arises as `gc` on the master faces no memory pressure to collect the DArray immediately. This results in a delay of the memory being released on the participating workers.

Therefore it is highly recommended to explcitly call `close(d::DArray)` as soon as user code has finished working with the distributed array.

It is also important to note that the localparts of the DArray is collected from all particpating workers when the DArray object on the process creating the DArray is collected. It is therefore important to maintain a reference to a DArray object on the creating process for as long as it is being computed upon.

`d_closeall()` is another useful function to manage distributed memory. It releases all darrays created from the calling process, including any temporaries created during computation.

## Working with distributed non-array data (requires Julia 0.6)

The function `ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Vector=[])` can be used to created a distributed vector whose localparts need not be Arrays.

It returns a `DArray{T,1,T}`, i.e., the element type and localtype of the array are the same.

`ddata()` constructs a distributed vector of length `nworkers()` where each localpart can hold any value, initially initialized to `nothing`.

Argument `data` if supplied is distributed over the `pids`. `length(data)` must be a multiple of `length(pids)`. If the multiple is 1, returns a `DArray{T,1,T}` where T is `eltype(data)`. If the multiple is greater than 1, returns a `DArray{T,1,Array{T,1}}`, i.e., it is equivalent to calling `distribute(data)`.

`gather{T}(d::DArray{T,1,T})` returns an Array{T,1} consisting of all distributed elements of `d`

Given a `DArray{T,1,T}` object `d`, `d[:L]` returns the localpart on a worker. `d[i]` returns the `localpart` on the ith worker that `d` is distributed over.

## SPMD Mode (An MPI Style SPMD mode with MPI like primitives, requires Julia 0.6)

SPMD, i.e., a Single Program Multiple Data mode is implemented by submodule `DistributedArrays.SPMD`. In this mode the same function is executed in parallel on all participating nodes. This is a typical style of MPI programs where the same program is executed on all processors. A basic subset of MPI-like primitives are currently supported. As a programming model it should be familiar to folks with an MPI background.

The same block of code is executed concurrently on all workers using the `spmd` function.

```
# define foo() on all workers
@everywhere function foo(arg1, arg2)
    ....
end

# call foo() everywhere using the `spmd` function
d_in=DArray(.....)
d_out=ddata()
spmd(foo,d_in,d_out; pids=workers()) # executes on all workers
```

`spmd` is defined as `spmd(f, args...; pids=procs(), context=nothing)`

`args` is one or more arguments to be passed to `f`. `pids` identifies the workers that `f` needs to be run on. `context` identifies a run context, which is explained later.

The following primitives can be used in SPMD mode.

  * `sendto(pid, data; tag=nothing)` - sends `data` to `pid`
  * `recvfrom(pid; tag=nothing)` - receives data from `pid`
  * `recvfrom_any(; tag=nothing)` - receives data from any `pid`
  * `barrier(;pids=procs(), tag=nothing)` - all tasks wait and then proceeed
  * `bcast(data, pid; tag=nothing, pids=procs())` - broadcasts the same data over `pids` from `pid`
  * `scatter(x, pid; tag=nothing, pids=procs())` - distributes `x` over `pids` from `pid`
  * `gather(x, pid; tag=nothing, pids=procs())` - collects data from `pids` onto worker `pid`

Tag `tag` should be used to differentiate between consecutive calls of the same type, for example, consecutive `bcast` calls.

`spmd` and spmd related functions are defined in submodule `DistributedArrays.SPMD`. You will need to import it explcitly, or prefix functions that can can only be used in spmd mode with `SPMD.`, for example, `SPMD.sendto`.

## Example

This toy example exchanges data with each of its neighbors `n` times.

```
using DistributedArrays
addprocs(8)
@everywhere importall DistributedArrays
@everywhere importall DistributedArrays.SPMD

d_in=d=DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1])
d_out=ddata()

# define the function everywhere
@everywhere function foo_spmd(d_in, d_out, n)
    pids = sort(vec(procs(d_in)))
    pididx = findfirst(pids, myid())
    mylp = d_in[:L]
    localsum = 0

    # Have each worker exchange data with its neighbors
    n_pididx = pididx+1 > length(pids) ? 1 : pididx+1
    p_pididx = pididx-1 < 1 ? length(pids) : pididx-1

    for i in 1:n
        sendto(pids[n_pididx], mylp[2])
        sendto(pids[p_pididx], mylp[1])

        mylp[2] = recvfrom(pids[p_pididx])
        mylp[1] = recvfrom(pids[n_pididx])

        barrier(;pids=pids)
        localsum = localsum + mylp[1] + mylp[2]
    end

    # finally store the sum in d_out
    d_out[:L] = localsum
end

# run foo_spmd on all workers
spmd(foo_spmd, d_in, d_out, 10)

# print values of d_in and d_out after the run
println(d_in)
println(d_out)
```

## SPMD Context

Each SPMD run is implictly executed in a different context. This allows for multiple `spmd` calls to be active at the same time. A SPMD context can be explicitly specified via keyword arg `context` to `spmd`.

`context(pids=procs())` returns a new SPMD context.

A SPMD context also provides a context local storage, a dict, which can be used to store key-value pairs between spmd runs under the same context.

`context_local_storage()` returns the dictionary associated with the context.

NOTE: Implicitly defined contexts, i.e., `spmd` calls without specifying a `context` create a context which live only for the duration of the call. Explictly created context objects can be released early by calling `close(ctxt::SPMDContext)`. This will release the local storage dictionaries on all participating `pids`. Else they will be released when the context object is gc'ed on the node that created it.

## Nested `spmd` calls

As `spmd` executes the the specified function on all participating nodes, we need to be careful with nesting `spmd` calls.

An example of an unsafe(wrong) way:

```
function foo(.....)
    ......
    spmd(bar, ......)
    ......
end

function bar(....)
    ......
    spmd(baz, ......)
    ......
end

spmd(foo,....)
```

In the above example, `foo`, `bar` and `baz` are all functions wishing to leverage distributed computation. However, they themselves may be currenty part of a `spmd` call. A safe way to handle such a scenario is to only drive parallel computation from the master process.

The correct way (only have the driver process initiate `spmd` calls):

```
function foo()
    ......
    myid()==1 && spmd(bar, ......)
    ......
end

function bar()
    ......
    myid()==1 && spmd(baz, ......)
    ......
end

spmd(foo,....)
```

This is also true of functions which automatically distribute computation on DArrays.

```
function foo(d::DArray)
    ......
    myid()==1 && map!(bar, d)
    ......
end
spmd(foo,....)
```

Without the `myid()` check, the `spmd` call to `foo` would execute `map!` from all nodes, which is not what we probably want.

Similarly `@everywhere` from within a SPMD run should also be driven from the master node only.


## Advanced topics in DArray

### Customizing DistributedArrays creation

Let's add 2 more workers and customize the DistributedArray creation:

In [35]:
#addprocs(2)
#@everywhere using DistributedArrays

In [14]:
darray = drand((10,4), workers()[1:4], [1,4])

10×4 DistributedArrays.DArray{Float64,2,Array{Float64,2}}:
 0.492231   0.586387  0.786584  0.793067
 0.35221    0.754233  0.963894  0.387072
 0.427639   0.889434  0.980305  0.678837
 0.956464   0.37371   0.538015  0.56272 
 0.351742   0.763186  0.901514  0.241628
 0.178048   0.378676  0.243198  0.688606
 0.0295433  0.572676  0.597314  0.387573
 0.385754   0.8526    0.588251  0.547671
 0.153816   0.615701  0.407551  0.772227
 0.966094   0.616491  0.199936  0.975261

In [15]:
for w in workers()
    @spawnat w println(localindexes(darray))
end

	From worker 4:	(1:10, 3:3)
	From worker 2:	(1:10, 1:1)
	From worker 5:	(1:10, 4:4)
	From worker 3:	(1:10, 2:2)


* We can also know which processes hold `darray`:

In [18]:
darray.pids

1×4 Array{Int64,2}:
 2  3  4  5

* Or we can know the splitting indexes of `darray`:

In [19]:
darray.indexes

1×4 Array{Tuple{UnitRange{Int64},UnitRange{Int64}},2}:
 (1:10, 1:1)  (1:10, 2:2)  (1:10, 3:3)  (1:10, 4:4)

* From a process, we can know the indexes it holds:

In [20]:
for w in workers()
    @spawnat w @show localindexes(darray)
end

	From worker 4:	localindexes(darray) = (1:10, 3:3)
	From worker 3:	localindexes(darray) = (1:10, 2:2)
	From worker 2:	localindexes(darray) = (1:10, 1:1)
	From worker 5:	localindexes(darray) = (1:10, 4:4)


* We can get any data of `darray` from any process, for instance, from the master:



In [21]:
darray[5]

0.3517419161884381

* We can also access `darray` from any process such as worker 4:

In [22]:
fetch(@spawnat 4 darray[5])

0.3517419161884381

### Distributed reduce using DistributedArrays 
* From Jeff Bezanson's Tutorial on [Parallel and Distributed Computing with Julia](https://www.youtube.com/watch?v=JoRn4ryMclc)

* If we wanted to sum `darray` locally, we can simply call `+(darray)`

In [24]:
+(darray)

10×4 DistributedArrays.DArray{Float64,2,Array{Float64,2}}:
 0.492231   0.586387  0.786584  0.793067
 0.35221    0.754233  0.963894  0.387072
 0.427639   0.889434  0.980305  0.678837
 0.956464   0.37371   0.538015  0.56272 
 0.351742   0.763186  0.901514  0.241628
 0.178048   0.378676  0.243198  0.688606
 0.0295433  0.572676  0.597314  0.387573
 0.385754   0.8526    0.588251  0.547671
 0.153816   0.615701  0.407551  0.772227
 0.966094   0.616491  0.199936  0.975261

* The previous command was performed **locally**, at the Master process

* **Let's take advantage of parallelism** by putting all workers to execute the sum on the portion of `darray` each worker holds in order **to not imply data movement and to take advantage of parallel execution**
* It's similiar to a distributed `reduce` operation:
    * The way you think and prgram in Julia is natural, i.e., you can read the following line as you would say to some people to do some task:
    * _"spwan at process `p` the sum of its `darray` local part for all processes"_


In [25]:
local_sums = [ @spawnat w +(localpart(darray)) for w in workers()]

4-element Array{Future,1}:
 Future(2, 1, 611, #NULL)
 Future(3, 1, 612, #NULL)
 Future(4, 1, 613, #NULL)
 Future(5, 1, 614, #NULL)

* Remark: `@everywhere` could not be use in the previous example since it does not return any value/RemoteRef.

* Now let's see the contents of each `RemoteRef`:

In [26]:
for s in local_sums println("Local sum of $s is $(fetch(s))") end

Local sum of Future(2, 1, 611, Nullable{Any}([0.492231; 0.35221; 0.427639; 0.956464; 0.351742; 0.178048; 0.0295433; 0.385754; 0.153816; 0.966094])) is [0.492231; 0.35221; 0.427639; 0.956464; 0.351742; 0.178048; 0.0295433; 0.385754; 0.153816; 0.966094]
Local sum of Future(3, 1, 612, Nullable{Any}([0.586387; 0.754233; 0.889434; 0.37371; 0.763186; 0.378676; 0.572676; 0.8526; 0.615701; 0.616491])) is [0.586387; 0.754233; 0.889434; 0.37371; 0.763186; 0.378676; 0.572676; 0.8526; 0.615701; 0.616491]
Local sum of Future(4, 1, 613, Nullable{Any}([0.786584; 0.963894; 0.980305; 0.538015; 0.901514; 0.243198; 0.597314; 0.588251; 0.407551; 0.199936])) is [0.786584; 0.963894; 0.980305; 0.538015; 0.901514; 0.243198; 0.597314; 0.588251; 0.407551; 0.199936]
Local sum of Future(5, 1, 614, Nullable{Any}([0.793067; 0.387072; 0.678837; 0.56272; 0.241628; 0.688606; 0.387573; 0.547671; 0.772227; 0.975261])) is [0.793067; 0.387072; 0.678837; 0.56272; 0.241628; 0.688606; 0.387573; 0.547671; 0.772227; 0.975261]


* So basically `local_sums` is an array with the results of sums which were executed **at each worker**
* In other words, it reduced the values of `darray` each worker holds by executing the sum remotely.
* **Now lets merge all the reduces to generate the final output**:
    * First we `fetch` the results with `map`
    * Then we merge them (i.e. sum) localy, at the master process
        * This is similar to merging the _Output files_ of the [Google MapReduce framework (c.f. Figure 1](http://static.googleusercontent.com/media/research.google.com/pt-BR//archive/mapreduce-osdi04.pdf))

In [27]:
.+(map(fetch, [ @spawnat p +(localpart(darray)) for p in workers()] ))

4-element Array{Array{Float64,2},1}:
 [0.492231; 0.35221; … ; 0.153816; 0.966094] 
 [0.586387; 0.754233; … ; 0.615701; 0.616491]
 [0.786584; 0.963894; … ; 0.407551; 0.199936]
 [0.793067; 0.387072; … ; 0.772227; 0.975261]

* Indeed, **we can directly use the `reduce` function** instead of `sum`:

In [28]:
reduce(+, map(fetch, [ @spawnat p reduce(+, localpart(darray)) for p in workers()] ))

22.937860904099917

* Finallly, we can define a **parallel reduce** function

In [29]:
parallel_reduce(f,darray) = reduce(f, map(fetch, [ @spawnat p reduce(f, localpart(darray)) for p in workers()] ))

parallel_reduce (generic function with 1 method)

* Now we use the **`parallel_reduce` function to operate over other `DistributedArray`s**:

In [30]:
d = drand(10)
parallel_reduce(+, d)

2.998251991240546

* You can test with further function since it reduces, i.e., it's a binary operator:

In [31]:
parallel_reduce(*, d)
parallel_reduce(max, d)

0.5799447952120753

### Simple convolution using DistributedArrays

* [See code here](https://github.com/proflage/2015-julia-hands-on/blob/master/SimpleConvolution.jl)

### More on distributed reduce

* Intersting [discussion at Julia mailing list on ```pmap_reduce```](https://groups.google.com/forum/#!msg/julia-users/WJBIAYzrZgg/3xzjoMfpKVMJ)

## What if <span style="color:green">I don't know the size of the data I need to load</span>  and <span style="color:gree">I don't want to deal with low-level details</span> on DistributedArrays? $\rightarrow$ CloudArray

* [**CloudArray**](https://github.com/gsd-ufal/CloudArray.jl) is a research prototype which proposes a transparent solution for this problem
* Microsoft Azure project
* More information at [CloudArray paper (IGARSS 2016)](http://ieeexplore.ieee.org/document/7729158/)
 * _CloudArray: Easing huge image processing_. André Lage-Freitas, Alejandro C. Frery, Naelson D. C. Oliveira, Raphael P. Ribeiro and Rivo Sarmento (2016) 2016 IEEE International Geoscience and Remote Sensing Symposium (IGARSS): 631-634. doi: 10.1109/IGARSS.2016.7729158.

---
# <span style="color:red">Distributed Maps and Reduces</span>


* Map, Filter and Reduce are common [**functional programming**](https://en.wikipedia.org/wiki/Functional_programming) features
* [**Google MapReduce framework**](http://research.google.com/archive/mapreduce-osdi04.pdf) leveraged Map and Reduce concepts and proposed a _distributed framewrok for data-intensive applications_
    * [**Hadoop**](http://hadoop.apache.org) is an open-source implementation of Google MapReduce framework
    * [**Rhipe**](https://www.datadr.org) allows to transparently execute R codes on top of Hadoop (no need to deal with Hadoop details)
* Julia functional features include:
    * **Map, Filter, Reduce, and MapReduce**
    * However, they are not distributed
    * E.g., `mapreduce` uses the same worker to execute both map and reduce
* Native **Distribute MapReduce** in Julia is done in different ways
    * Spawning `map, reduce, mapreduce` calls on workers
        * @parallel, @spawn, remotecall, etc.
    * **Distributed map** called [**pmap**](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.pmap)

<!--
    * pmap
        * https://groups.google.com/forum/#!msg/julia-users/WJBIAYzrZgg/_UPc8vhkCAAJ
    * Discussion on MapReduce, ShredArrays, etc. at [julia-users maling list](https://groups.google.com/forum/#!msg/julia-users/1sNXYtIbS1Q/1sNd0h92AQAJ)
    * distributed mapreduce by using @spawn
    * @distribute
    * @dmap
-->

## Distributed `reduce`

See example with `DistributedArrays`.

## `pmap`

* Context (motivation)
    * No reduction operator is needed
    * Any variables used inside the @parallel loop will be copied (broadcasted) to each process.
* `pmap` function
    * Applies a function to all elements in some collection
* `pmap(f, coll)`
    * `f: function`
    * `coll: collection`

In [33]:
M = [-1.02, 3.14, -5.63]

pmap(abs,M)

3-element Array{Float64,1}:
 1.02
 3.14
 5.63

Where `pmap` distributed my processing?

In [34]:
M = ["Where am I?", "Where am I?", "Where am I?", "Where am I?", "Where am I?"]

pmap(println,M)

	From worker 3:	Where am I?
	From worker 3:	Where am I?
	From worker 4:	Where am I?
	From worker 2:	Where am I?
	From worker 5:	Where am I?


5-element Array{Void,1}:
 nothing
 nothing
 nothing
 nothing
 nothing

In [35]:
@everywhere g(x) = x.+1

M = rand(10^4)
pmap(g, M)

10000-element Array{Float64,1}:
 1.9583 
 1.80685
 1.18382
 1.11332
 1.65904
 1.99119
 1.16482
 1.2815 
 1.89489
 1.73083
 1.78553
 1.5138 
 1.33964
 ⋮      
 1.23932
 1.72218
 1.90454
 1.6432 
 1.34595
 1.96735
 1.01152
 1.47903
 1.53329
 1.878  
 1.2016 
 1.20702

Now, suppose that we have to calculate the rank of a number of large matrices

In [43]:
function rank_marray()
   marr = [rand(1000,1000) for i=1:4]
   for arr in marr
      println(rank(arr))
   end
end

rank_marray (generic function with 1 method)

In [44]:
@time rank_marray()

1000
1000
1000
1000
  1.706015 seconds (12.64 k allocations: 64.036 MiB, 0.69% gc time)


In [46]:
marr = [rand(1000,1000) for i=1:4]

@time println(pmap(rank, marr))

[1000, 1000, 1000, 1000]
  2.080494 seconds (677 allocations: 60.859 KiB)


In [48]:
# [<span style="color:red">Distributed Arrays</span>](https://github.com/JuliaParallel/DistributedArrays.jl)


## Introduction

* **Distributed Memory** model
    * A multi-dimensional array stored at distributed processes
    * No shared memory
* **Dense Arrays**
    * E.g., non-sparse matrices
* All processes know about DistributedArrays
* **Only metadata is exchanged**
    * Each process holds a part of data the whole DistributedArray

```
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
```

?pmap

search: [1mp[22m[1mm[22m[1ma[22m[1mp[22m Ty[1mp[22me[1mM[22m[1ma[22m[1mp[22mLevel Ty[1mp[22me[1mM[22m[1ma[22m[1mp[22mEntry [1mp[22mro[1mm[22mote_sh[1ma[22m[1mp[22me re[1mp[22m[1mm[22m[1ma[22mt ty[1mp[22me[1mm[22m[1ma[22mx



```
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[]), retry_check=nothing) -> collection
```

Transform collection `c` by applying `f` to each element using available workers and tasks.

For multiple collection arguments, apply `f` elementwise.

Note that `f` must be made available to all worker processes; see [Code Availability and Loading Packages](@ref) for details.

If a worker pool is not specified, all available workers, i.e., the default worker pool is used.

By default, `pmap` distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify `distributed=false`. This is equivalent to using [`asyncmap`](@ref). For example, `pmap(f, c; distributed=false)` is equivalent to `asyncmap(f,c; ntasks=()->nworkers())`

`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes greater than 1, the collection is processed in multiple batches, each of length `batch_size` or less. A batch is sent as a single request to a free worker, where a local [`asyncmap`](@ref) processes elements from the batch using multiple concurrent tasks.

Any error stops `pmap` from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller.

Consider the following two examples. The first one returns the exception object inline, the second a 0 in place of any exception:

```julia-repl
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0
```

Errors can also be handled by retrying failed computations. Keyword arguments `retry_delays` and `retry_check` are passed through to [`retry`](@ref) as keyword arguments `delays` and `check` respectively. If batching is specified, and an entire batch fails, all items in the batch are retried.

Note that if both `on_error` and `retry_delays` are specified, the `on_error` hook is called before retrying. If `on_error` does not throw (or rethrow) an exception, the element will not be retried.

Example: On errors, retry `f` on an element a maximum of 3 times without any delay between retries.

```julia
pmap(f, c; retry_delays = zeros(3))
```

Example: Retry `f` only if the exception is not of type `InexactError`, with exponentially increasing delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences.

```julia
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow(e)), retry_delays = ExponentialBackOff(n = 3))
```


# [<span style="color:red">Shared Arrays</span>](https://github.com/JuliaParallel/SharedArray.jl)

* **Shared Memmory** model
* All local processors access the same data space
* See further information at [Shared Array manual page](https://docs.julialang.org/en/stable/manual/parallel-computing/#man-shared-arrays-1)

# [<span style="color:red">Multithreading</span>](https://docs.julialang.org/en/latest/base/multi-threading/)

* [Documentation](https://docs.julialang.org/en/latest/base/multi-threading/)
* [Manual](https://docs.julialang.org/en/latest/manual/parallel-computing/#Multi-Threading-(Experimental)-1)

# [GPU support](https://github.com/JuliaGPU)

# More on Distributed/Parallel Computing in Julia

* TODO update it with JuliaCon 2016, 2017, and 2018 contributions
* JuliaCon 2015
    * [JuliaCon2015 Workshop](https://github.com/JuliaParallel/JuliaCon2015_Workshop)    
    * [JuliaCon 2015 | Amit Murthy: Cluster Managers and parallel julia](https://www.youtube.com/watch?v=XJAQ24NS458) 
        * [IJulia Notebbok on Parallel Julia (internals)](https://github.com/JuliaParallel/JuliaCon2015_Workshop/blob/master/JuliaCon%202015.ipynb) 
    * [JuliaCon India 2015 - Concurrent and Parallel programming](https://github.com/ViralBShah/JuliaConIndia2015/blob/master/JuliaCon_Parallel_Workshop.ipynb)
    * [JuliaCon 2015 - Julia Parallel Workshop](https://github.com/JuliaParallel/JuliaCon2015_Workshop/blob/master/Workshop.ipynb)
* [Julia Oficcial Documentation on Parallel Computing](http://docs.julialang.org/en/latest/manual/parallel-computing/)
* [Official Julia packages on Parallel Computing](https://github.com/JuliaParallel)
* [Julia Lightning Round (Alan Edelman, Viral B. Shah)](https://www.youtube.com/watch?v=37L1OMk_3FU&list=PLP8iPy9hna6Si2sjMkrPY-wt2mEouZgaZ)
* [Stefan Karpinski - Julia: Fast Performance, Distributed Computing & Multiple Dispatch](https://www.youtube.com/watch?v=rUczbQ6ZPd8)
* [Viral's IJulia Notebooks](https://github.com/ViralBShah?tab=repositories)
* [Distributed DataFrame](https://github.com/JuliaParallel/Blocks.jl)
    * Discontinued but there are plans to carry on its implementation
    * See [this topic at julia-dev mailing list](https://groups.google.com/forum/#!msg/julia-dev/ja3ienKR0-g/wZg1NMZvB_QJ)