# <span style="color:red">Distributed Computing with Julia</span>

---
# Master/Worker architecture

Julia relies on the Master/Worker paradigm. While the master process controls the distributed computation, the **workers performs all the computation**.

<img src="figures/master-worker.png" width="400"height="400" />


* Each process has an associated identifier
    * **Master** PID = 1
    * **Workers** PID >= 2 
* Only workers execute distributed calls
    * Except in a single-process environment
        * Process 1 is both master and worker
        * I.e., Julia interactive prompt


Use [nprocs()](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.nprocs) to know the number of process in use:

In [1]:
nprocs()

1

We can either add more processes via **SSH** or add more **local processors** to take advantage of multi-core computers with [addprocs(...)](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.addprocs):

_Remark_: _"Julia 0.4 supports "auto\*host" which will launch as many workers on host as cores"_. Se [discusion at users mailing list](https://groups.google.com/forum/#!msg/julia-users/XH80LzxCERY/jMhbw5YpjvEJ) 

In [2]:
addprocs(2)

2-element Array{Int64,1}:
 2
 3

Then list again to see the just-added processes:

In [3]:
nprocs()

3

You can get the list of processes with [procs()](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.procs):

In [4]:
procs()

3-element Array{Int64,1}:
 1
 2
 3

On the other hand, you can list only the [workers()](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.workers):

In [5]:
workers()

2-element Array{Int64,1}:
 2
 3

We can also iterate over the `workers()` array:

In [6]:
for pid in workers()
    println("Listing worker #", pid)
end

Hi, it's the Master (process ID=1) listing the workers...
Listing worker #2
Listing worker #3


Now we will remove both workers (the processes we previously added) with [rmprocs(pids...)](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.rmprocs):

In [8]:
rmprocs(3,2)

:ok

In [9]:
procs()

1-element Array{Int64,1}:
 1

# [Clusters](http://docs.julialang.org/en/release-0.3/manual/parallel-computing/?highlight=cluster#clustermanagers)


## Basics

* Julia processes run on Julia [Clusters](http://docs.julialang.org/en/release-0.3/manual/parallel-computing/?highlight=cluster#clustermanagers)
    * **Local** cluster
        * Leverage local multi-cores 
    * **Distributed** cluster
        * On top of SSH/TCP/IP connections
* Clusters can be customized

* Clusters can be created as Julia starts. For instance, **to create a 2-core local cluster**:

```
$ julia -p 2
```

* A **distributed SSH cluster** can be created as follows:

```
julia --machinefile HOSTS_FILE common_code.jl
```

* Julia will start worker processes remotely through a password-less SSH login on the computers listed at the HOSTS_FILE file. In addition, it will deploy the *common_code.jl* file on all computers.

* It is important to remark that these computers can be any computer reachable by password-less SSH logins. For example, LAN computers, Amazon EC2 instances, [Docker](http://docker.com) containers, and do forth.

## Advanced

* See [Cluster Manager Interface](http://docs.julialang.org/en/latest/stdlib/parallel/#cluster-manager-interface)
    <!--
    * [launch](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.launch)
     -->
    
<!--
# Cloud support

## Virtual machines (VMs)

* [`Amazon Web Services (AWS)`](https://github.com/amitmurthy/AWS.jl)
* [`OpenStack.jl`](https://github.com/Keno/OpenStack.jl/blob/master/src/OpenStack.jl)
    * Uses [OpenStack Restful API](http://developer.openstack.org/api-ref-compute-v2.html)

## Containers

* [Infra.jl](https://github.com/gsd-ufal/CloudArray.jl/blob/master/src/Infra.jl)
    * Set of **management functions for managing Julia workers on Docker**
* [Docker.jl](https://github.com/Keno/Docker.jl)
    * Wrapper to [Docker HTTP API](https://docs.docker.com/reference/api/docker_remote_api/)

## Higher-level support 
-->

# [<span style="color:red">Julia support for distributed communication</span>](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#general-parallel-computing-support)

## Overview

Let's read the first paragraphs of the [Julia Official Documentation on Parallel Computing](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#general-parallel-computing-support)

* "Julia’s implementation of message passing is **different from other environments such as MPI**. 
* **Communication in Julia is generally “one-sided”**
    * The programmer needs **to explicitly manage only one process in a two-process operation**. 
    * These operations typically **do not look like “message send” and “message receive”** but rather resemble **higher-level operations like calls to user functions**.

* Parallel programming in Julia is built on two primitives: **remote references** and **remote calls**. 
    * A **remote reference** is an object that can be used from any process to **refer to an object stored on a particular process.**
    * A **remote call** is a **request by one process to call a certain function** on certain arguments on another (possibly the same) process. 
        * A remote call **returns a remote reference** to its result. 
        * Remote calls return immediately [**Asynchronous**]; the process that made the call proceeds to its next operation while the remote call happens somewhere else. 
        * You can wait for a remote call to finish by calling wait on its remote reference[**Synchronous**].
        * You can obtain the full value of the result using **fetch**. 
        * You can store a value to a remote reference using **put**.
"

#### In other words...

* It is **not a send-receive paradigm**
    * There is no receive: it assumes that communication is managed by the calling process
    * The calling process calls remote functions or expressions
    
#### Further supports

* **Macros** to make communication easier
* **Tasks**
    * Coroutines, green threads that run on a single actual thread
    * **Tasks can be combined with remote calls for parallel programming**
    * Julia multithreading is being developed at Julia 0.5     
    
## Programming model


* Programming model
    * **Primitive: remote calls**
        * A remote request similiar to _send_
            * However, remote calls specify the operation to be performed
            * Similar to embed the _receive_ operation into _send_
        * [`remotecall(id, func, args...)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.remotecall)
            * `id` is the process that will perform the computation
            * `func` is the function or expression to be executed
            * `args` ar the function arguments
    * **Abstraction: remote reference**
        * Remote calls return a remote reference (`RemoteRef`)
        * `RemoteRef` can be used by any processes
    


## [Remote calls](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.remotecall)

* Julia low-level communication primitives
* Remote calls can be performed **Synchronously** or **Asynchronously**.

In order to put workers to execute some code, let's add two processes since we currently have a single (master) process:

In [9]:
addprocs(2)

5-element Array{Int64,1}:
 1
 2
 3
 4
 5

As processes have unique ID, remark that the new processes' IDs are 4 and 5.



### Asynchronous remote calls

* The calling process **does not wait** the worker computation
    * **Non-blocking call**
* [`remotecall(id, func, args...)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.remotecall)
    * E.g., let's put *worker 4* to calculate the *square root of 144*:

In [10]:
remotecall(4, sqrt, 144)

RemoteRef{Channel{Any}}(4,1,5)

In [11]:
async_sqtr = remotecall(4, sqrt, 144)

RemoteRef{Channel{Any}}(4,1,6)

In [12]:
fetch(async_sqtr)

12.0

### Synchronous remote calls

* **Synchronous** communications block the sender till they receive the response
    * Makes the caller **wait**
* [`remotecall_wait(id, func, args...)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.remotecall_wait)
    * The following example puts *worker 2* to *sleep 3 seconds* and waits:

In [13]:
wait_till_worker_2_sleeps_5_secs = remotecall_wait(2, sleep, 3)

println("This message waited for worker 2")

This message waited for worker 2


This call blocked the caller process (master, process whose ID is 1) for 3 seconds. 

Remark that the result from the Output appeared 3 seconds later with the RemoteRef.

Now let's execute the same command **asynchronously** (non-blocking call):

In [14]:
dont_wait_till_worker_2_sleeps_5_secs = remotecall(2, sleep, 3)

println("This message did NOT wait for worker 2")

This message did NOT wait for worker 2


### @sync and @async

* Macros that ease the construction of synchronous and asynchronous calls.
* These macros return [`Tasks`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#tasks).
* [**`@sync`**](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.@sync)
    * Makes the calling process **wait** for a function/expression execution
        * _Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and @parallel are complete._
    * Similiar to [monitors](https://en.wikipedia.org/wiki/Monitor_%28synchronization%29)
    * Optionally, we can use  `wait` instead:
        * [`wait([x])`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.wait)
            * _Block the current task until some event occurs, depending on the type of the argument._
        * [`timedwait(testcb, secs; pollint)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.timedwait)
            * _Waits till `testcb` returns true or for `secs` seconds, whichever is earlier._
            * _`testcb` is polled every `pollint` seconds._
            
* [**`@async`**](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.@async)
    * Free the calling process (Task) from a blocked operation
    * Similar to running a command in background in Bash
   

### Remark on Asynchronous vs. Synchronous calls

* Usage depends on the distributed application
* Use **Synchronous** calls when you need to wait a result to carry on the computation:
    * Workflows
    * Mutual exclusion: consistency, integrity
* Otherwise, use **Asynchronous** calls since blocking communications degrade the system performance and can eventually imply [deadlocks](https://en.wikipedia.org/wiki/Deadlock).

## [Remote reference](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.RemoteRef)

* A [`RemoteRef`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.RemoteRef) is a **reference to an object stored at a remote process**.
    * It is NOT in the sense of the ~~Object-oriented Programming (OOP)~~
* All `RemoteRef` are **visible by all processes**
* **You can put to and get values from it**
    * It's like a data/function/expression recipient


### `RemoteRef` operations:

* [`RemoteRef()`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.RemoteRef)
* [`RemoteRef(p)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.RemoteRef)
* [`fetch(RemoteRef)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.fetch)
    * _Wait for and get the value of a remote reference._
* [`put!(RemoteRef, value)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.put!)
    * _Store a value to a remote reference. Implements **“shared queue of length 1”** semantics: if a value is already present, **blocks** until the value is removed with take!. Returns its first argument._
    * Example: let's create a RemoteRef that will run asynchronously (in background) till we put some value to it:
    

In [15]:
r = RemoteRef()
@async (fetch(r), println("hi"))

Task (waiting) @0x000000011295d7b0

* Now let's put some value to `r`:

In [16]:
put!(r,0)

hi

RemoteRef{Channel{Any}}(1,1,11)




* [`take!(RemoteRef)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.take!)
    * _Fetch the value of a remote reference, removing it so that the reference is empty again._
    * **Blocks** the caller if the `RemoteRef` is empty
* [`isready(r::RemoteRef)`](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.isready)
    * _Determine whether a `RemoteRef` has a value stored to it._
    * **Blocks** the caller
    * Should be used **only on `RemoteRef`s that are assigned once**.
        * `isready(..)` might cause race conditions -> consistency issues
    * Example: checking whether a `put!` was performed:

In [275]:
rr = RemoteRef()
@async put!(rr, remotecall_fetch(4, sleep, 10))
isready(rr)

false

If you wait 10 seconds, the condition will be set to true:

In [274]:
isready(rr)

true

## @spawn and @spawnat

* The macro [@spawn](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.@spawn) spawns (runs) an expression/function at a worker.
* All function computation is done at worker side

In [17]:
remote_matrix = @spawn rand(2,2)

RemoteRef{Channel{Any}}(2,1,12)

Now we fetch the `remote_matrix` variable with fetch(...):

In [18]:
fetch(remote_matrix)

2x2 Array{Float64,2}:
 0.179349  0.094475
 0.557984  0.185575

If you need to execute an expression over the `remote_matrix`, you can still use @spawn combined with fetch(...):

For instance:

In [20]:
remote_matrix_plus_one = @spawn 1 .+ fetch(remote_matrix)

RemoteRef{Channel{Any}}(4,1,15)

Now we fetch the result:

In [21]:
fetch(remote_matrix_plus_one)

2x2 Array{Float64,2}:
 1.17935  1.09448
 1.55798  1.18558

If you are curious about where is `remote_matrix_plus_one`:

In [25]:
remote_matrix_plus_one.where

4

If you need to have more control of the distributed computation, you may use the macro [@spawnat](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.@spawnat) which allows to spawn an expression/function at a specific worker.

In the following example, we execute the sqtr(...) function at the worker whose ID is 4:

In [26]:
sqrt_at_4 = @spawnat 4 sqrt(3)

RemoteRef{Channel{Any}}(4,1,17)

Now we fetch the `sqrt_at_4` variable with `fetch` and get the worker where `sqrt_at_4` is stored:

In [27]:
fetch(sqrt_at_4)

sqrt_at_4.where

4

## [@fetch](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.@fetch) and [@fetchfrom](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.@fetch)

We can also use spawn and fetch at once with macro [@fetch(...)](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.@fetch) which is equivalent to **`fetch(@spawn expr)`**:

In [28]:
@fetch 2 + 6

8

For more control, use the macro [@fetchfrom(...)](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.@fetchfrom) which is equivalent to **`fetch(@spawnat p expr)`**:

In [29]:
@fetchfrom 5 12^9

5159780352

## [@everywhere](http://julia.readthedocs.org/en/latest/stdlib/parallel/#Base.@everywhere)

* The [@everywhere](http://julia.readthedocs.org/en/latest/stdlib/parallel/#Base.@everywhere) macro forces a command/expression to run on all processes.
* Example: let's print everywhere (at every workers) processes' IDs:

In [31]:
#addprocs(2)
@everywhere println(myid())

1
	From worker 2:	2
	From worker 4:	4
	From worker 5:	5
	From worker 3:	3


For instance, let's define on all processes a function that prints workers' IDs:

In [32]:
@everywhere who_are_you() = println("I'm your father... oops, I'm worker $(myid())")

Now we call it:

In [57]:
@spawn who_are_you()

RemoteRef{Channel{Any}}(2,1,53)

	From worker 2:	I'm your father... oops, I'm worker 2


* @everywhere can also be used to load modules on all processes:

In [58]:
#Pkg.add("Calculus")
@everywhere using Calculus



In [60]:
@fetchfrom 4 derivative(x-> x^3, 2)

12.000000000099304

* For loading source files, use [`require(..)`](http://julia.readthedocs.org/en/latest/stdlib/base/#Base.require) instead.

In [62]:
require(MyCode.jl)

LoadError: LoadError: UndefVarError: MyCode not defined
while loading In[62], in expression starting on line 1

##[@parallel](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.@parallel)

* Parallelizes a for loop or a comprenhesion automatically among the available workers.
    * Split the range
    * Distribute the range to each process

In [66]:
@everywhere plus_my_id(x) = x + myid()

a = 100
@parallel for i=1:10
    println(plus_my_id(a))
end

4-element Array{Any,1}:
 RemoteRef{Channel{Any}}(3,1,96)
 RemoteRef{Channel{Any}}(4,1,97)
 RemoteRef{Channel{Any}}(5,1,98)
 RemoteRef{Channel{Any}}(2,1,99)

	From worker 4:	104
	From worker 4:	104
	From worker 4:	104
	From worker 2:	102
	From worker 3:	103
	From worker 2:	102
	From worker 3:	103
	From worker 3:	103
	From worker 5:	105
	From worker 5:	105


* **`@parallel`** may also performs a **reduce function on each worker and the final reduction at the calling process** (e.g., Master):
    * Optionally, @parallel may take a “reducer” as its first argument
    * The results from each remote reducer will be aggregated using the reducer locally
    * Tasks are assigned asynchronously when no reducer operation is used
    

In [71]:
@parallel (+) for i=1:10
   plus_my_id(a)
end

1033

---
# [<span style="color:red">Tasks (coroutines)</span>](http://docs.julialang.org/en/latest/stdlib/parallel/#tasks)


* There is **no thread**.
    * Multi-threading is begin developed at Julia 0.5
    * Julia Tasks are **not thread-safe**.
* Julia tasks are **green threads (coroutines)**
    * Tasks do **not** spam over different CPUs
    * Julia scheduler decides which tasks runs at a given time
    * Tasks can yield, notify, schedule, etc.
* Useful when you need concurrency programming with no actual parallelization (**pseudo-parallelization** instead)
    * Context change is very fast as tasks belongs to a single kernel thread
* Julia relies on ["**remote function execution** as opposed to message passing"](https://github.com/ViralBShah/JuliaConIndia2015/blob/master/JuliaCon_Parallel_Workshop.ipynb)
    * “... Our current approach to concurrent computation is a **shared-nothing multi-process model** that also works across machines.” [Stefan Karpinski at julia-users mailing list](https://groups.google.com/forum/#!searchin/julia-users/kernel$20thread/julia-users/Y9XYxI5Bgi4/yUSszl1ajioJ)


<img src="figures/julia-tasks.png" width="300" />


<!--
## Task as a programming abstraction 

* States
 * Runnable (“created”)
 * Running
 * Done
 * ...

* Communication **primitives**:
 * **produce**(..)
 * **consume**(..) -> synchronous call
 * From [Amit Murthy ipynb](https://github.com/ViralBShah/JuliaConIndia2015/blob/master/JuliaCon_Parallel_Workshop.ipynb)
     * "produce and consume allow a producer task to "feed" one or more consumers
     * produce blocks till a consumer removes a value
     * consume blocks till the producer adds a value or exits"
* High-level API (macros)
    * @task
    * @sync
    * @async
    * @schedule
        * "@schedule schedules an expression to be run like @async except: 
            * launched task is not waited on by enclosing @sync blocks
            * does not "localize" variables"
    * wait
    * 
    
## Channels 
# See: https://github.com/ViralBShah/JuliaConIndia2015/blob/master/JuliaCon_Parallel_Workshop.ipynb
-->

### Examples

####Using the **`produce`** function

In [73]:
t = Task( () -> produce(5) )

Task (runnable) @0x000000010fd75510

In [74]:
consume(t)

5

#### Implementing a **function with [produce](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.produce)**:

In [75]:
function producer(p)
    for i in 1:p
        produce(i)
    end
end

producer (generic function with 1 method)

* Create a task with the implemented function with [Task](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.Task)
    * We have to use an anonymous function (lambda) for that: 

In [81]:
t2 = Task( () -> producer(4) )

Task (runnable) @0x00000001102cc7f0

* Run the task with [consume](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.consume)
    * Let's perform a single consume:

In [82]:
consume(t2)

1

* Now, let's consume the last products from function `producer` and calculates their square root in a `for` loop.
    * Each iteration performs a `consume(tsk1)` call:

In [83]:
for t in t2
    println("t=", t, "; t^2=", t^2)
end

t=2; t^2=4
t=3; t^2=9
t=4; t^2=16


### Macro @task 

* Optionally, you can create a task with macro [@task](http://docs.julialang.org/en/release-0.3/stdlib/parallel/#Base.@task)
* Let's define a function which produces [Fibonacci numbers](https://en.wikipedia.org/wiki/Fibonacci_number):

In [84]:
function fib_producer(n)
    a, b = (0, 1)
    for i = 1:n
        produce(b)
        a, b = (b, a + b)
    end
end

fib_producer (generic function with 1 method)

* Now we create a 6-element Fibonacci series:

In [85]:
t3 = @task fib_producer(6)

Task (runnable) @0x000000011130a8c0

* We can perform a multiple consume in a for loop
* For instance, `println(t)` implicitly calls `consume(t)` as we iterate over `tsk2`:

<!--
a = @async 1+2
println("non-blocking call")
consume(a)
-->

In [86]:
for t in t3; println(t); end

1
1
2
3
5
8


## Example: concurrent (parallel) clients accessing an authentication server

### Goal

Show how we can **spawn and manage parallel codes wrapped as tasks**

### Defining type, variables, and functions

* `type User`
* The list of users
* Functions specific to the authentication server (Worker 2)

In [221]:
#
# workers 2:n
#

@everywhere type User
    username
    password
end

#
# worker 2
#

@everywhere list_of_users = User[]

@everywhere function auth(user::User)
    for u in list_of_users
        if user.username == u.username && user.password == u.password
            return true
        end
    end
    return false
end

@everywhere function add_user(user::User)
    push!(list_of_users,user)
end

@everywhere function list_users()
    for u in list_of_users
        @show u.username
    end
end

@everywhere function remove_user(username::AbstractString )
    i=1
    for u in list_of_users
        if username == u.username 
            deleteat!(list_of_users,i)
            return true
        end
        i=i+1
    end
    return false
end

@everywhere function remove_all_users()
    empty!(list_of_users)
end

### Defining client `sign_up` function

In [243]:
#
# workers 3:n
#

@everywhere function sign_up()
    u = User("client-$(myid())","$(myid())")
    remotecall(2,add_user,u)
end

### Each Worker signs up once

In [244]:
for w in workers()
    if w != 2 # skipping Worker 2   
        remotecall(w,sign_up)
    end
end

### Listing user at authentication server (at Worker 2)

In [245]:
remotecall(2,list_users)

RemoteRef{Channel{Any}}(2,1,7868)

	From worker 2:	u.username = "client-3"
	From worker 2:	u.username = "client-4"
	From worker 2:	u.username = "client-5"


### Cleaning up list of users

In [255]:
remotecall(2, remove_all_users)
remotecall(2,list_users)

RemoteRef{Channel{Any}}(2,1,7917)

### Using `Task` to define sign up functions

In [247]:
@everywhere function launch_sing_ups(n_of_sign_ups)
#    users_task = Task( () -> launch_sing_ups_task(n_of_sign_ups) )
    users_task = @task launch_sing_ups_task(n_of_sign_ups)
    for u in users_task
        remotecall_wait(2,add_user,u)
    end
end

@everywhere function launch_sing_ups_task(n_of_sign_ups)
    for n=1:n_of_sign_ups
        u = User("client-$(myid())-$n","$(myid())-$n")
        println("User created at process $(myid())")
        produce(u)
    end
end

### Launching sing up Tasks then listing users

In [256]:
for w in workers()
    if w != 2 # skipping Worker 2 
        remotecall(w,launch_sing_ups, 2)
    end
end

	From worker 3:	User created at process 3
	From worker 4:	User created at process 4
	From worker 5:	User created at process 5
	From worker 3:	User created at process 3
	From worker 4:	User created at process 4
	From worker 5:	User created at process 5


In [257]:
remotecall(2,list_users)

RemoteRef{Channel{Any}}(2,1,7921)

	From worker 2:	u.username = "client-3-1"
	From worker 2:	u.username = "client-4-1"
	From worker 2:	u.username = "client-5-1"
	From worker 2:	u.username = "client-3-2"
	From worker 2:	u.username = "client-5-2"
	From worker 2:	u.username = "client-4-2"


# [<span style="color:red">Distributed Arrays</span>](https://github.com/JuliaParallel/DistributedArrays.jl)


## Introduction

* **Distributed Memory** model
    * A multi-dimensional array stored at distributed processes
    * No shared memory
* **Dense Arrays**
    * E.g., non-sparse matrices
* All processes know about DistributedArrays
* **Only metadata is exchanged**
    * Each process holds a part of data the whole DistributedArray

```
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
```


In [1]:
addprocs(4)

4-element Array{Int64,1}:
 2
 3
 4
 5

In [87]:
@everywhere using DistributedArrays



### Creating DistributedArrays

Use `Array` constructors with **d** at the beginning:

    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x,100,100,10)

In [88]:
d = dzeros(10)

10-element DistributedArrays.DArray{Float64,1,Array{Float64,1}}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

In [89]:
d[4]

0.0

In [90]:
i = d[4]
typeof(i)

Float64

#### Creating from an `Array`

In [97]:
arr = rand(10)
darr = distribute(arr)

10-element DistributedArrays.DArray{Float64,1,Array{Float64,1}}:
 0.966206 
 0.643017 
 0.482076 
 0.216727 
 0.228819 
 0.933118 
 0.891193 
 0.0656999
 0.69795  
 0.0493549

#### Converting `DistributedArrays` to `Array`

In [92]:
arr2 = convert(Array, darr)

10-element Array{Float64,1}:
 0.812878
 0.84906 
 0.122719
 0.899384
 0.282108
 0.182757
 0.774173
 0.715845
 0.998593
 0.443607

In [94]:
typeof(arr2)

Array{Float64,1}

### Getting information on where DistributedArrays are stored

* **`localpart(a::DArray)`** obtains the locally-stored portion of a  DArray.

* **`localindexes(a::DArray)`** gives a tuple of the index ranges owned by the local process.

In [101]:
localpart(darr)

0-element Array{Float64,1}

In [102]:
for w in workers()
    @spawnat w println(localindexes(darr))
end

	From worker 2:	(1:3,)
	From worker 4:	(6:7,)
	From worker 3:	(4:5,)
	From worker 5:	(8:10,)


In [48]:
for w in workers()
    @spawnat w println(localpart(darr))
end

	From worker 2:	[0.7252225541990764,0.6608562929625534,0.8582114993759962]
	From worker 3:	[0.08690975874742035,0.6524378568671487]
	From worker 5:	[0.9784513095442966,0.29507647226882905,0.5295115322543209]
	From worker 4:	[0.6569416593382784,0.25312886183129724]


In [103]:
d = darr
@show d.chunks
@show d.cuts
@show d.dims
@show d.indexes
@show d.pids

d.chunks = RemoteRef[RemoteRef{Channel{Any}}(2,1,244),RemoteRef{Channel{Any}}(3,1,245),RemoteRef{Channel{Any}}(4,1,246),RemoteRef{Channel{Any}}(5,1,247)]

4-element Array{Int64,1}:
 2
 3
 4
 5


d.cuts = [[1,4,6,8,11]]
d.dims = (10,)
d.indexes = [(1:3,),(4:5,),(6:7,),(8:10,)]
d.pids = [2,3,4,5]


In [104]:
?DistributedArrays

search: 

# DistributedArrays.jl

[![Build Status](https://travis-ci.org/JuliaParallel/DistributedArrays.jl.svg?branch=master)](https://travis-ci.org/JuliaParallel/DistributedArrays.jl)

Distributed Arrays for Julia

***NOTE*** Distributed Arrays will only work on the latest development version of Julia (v0.4.0-dev).

`DArray`s have been removed from Julia Base library in v0.4 so it is now necessary to import the `DistributedArrays` package on all spawned processes.

```julia
@everywhere using DistributedArrays
```

## Distributed Arrays

Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to distribute arrays among several processes. This combines the memory resources of multiple machines, allowing use of arrays too large to fit on one machine. Each process operates on the part of the array it owns, providing a ready answer to the question of how a program should be divided among machines.

Julia distributed arrays are implemented by the `DArray` type. A `DArray` has an element type and dimensions just like an `Array`. A `DArray` can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a `DArray` is distributed by dividing the index space into some number of blocks in each dimension.

Common kinds of arrays can be constructed with functions beginning with `d`:

```julia
    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x,100,100,10)
```

In the last case, each element will be initialized to the specified value `x`. These functions automatically pick a distribution for you. For more control, you can specify which processes to use, and how the data should be distributed:

```julia
    dzeros((100,100), workers()[1:4], [1,4])
```

The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, one often sees diminishing returns in performance. Placing `DArray`\ s on a subset of processes allows multiple `DArray` computations to happen at once, with a higher ratio of work to communication on each process.

The third argument specifies a distribution; the nth element of this array specifies how many pieces dimension n should be divided into. In this example the first dimension will not be divided, and the second dimension will be divided into 4 pieces. Therefore each local chunk will be of size `(100,25)`. Note that the product of the distribution array must equal the number of processes.

  * `distribute(a::Array)` converts a local array to a distributed array.

  * `localpart(a::DArray)` obtains the locally-stored portion of a  `DArray`.

  * `localindexes(a::DArray)` gives a tuple of the index ranges owned by the local process.

  * `convert(Array, a::DArray)` brings all the data to the local process.

Indexing a `DArray` (square brackets) with ranges of indexes always creates a `SubArray`, not copying any data.

## Constructing Distributed Arrays

The primitive `DArray` constructor has the following somewhat elaborate signature:

```julia
    DArray(init, dims[, procs, dist])
```

`init` is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified indices. `dims` is the overall size of the distributed array. `procs` optionally specifies a vector of process IDs to use. `dist` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension.

The last two arguments are optional, and defaults will be used if they are omitted.

As an example, here is how to turn the local array constructor `fill` into a distributed array constructor:

```julia
    dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
```

In this case the `init` function only needs to call `fill` with the dimensions of the local piece it is creating.

`DArray`s can also be constructed from multidimensional `Array` comprehensions with the `@DArray` macro syntax.  This syntax is just sugar for the primitive `DArray` constructor:

```julia
julia> [i+j for i = 1:5, j = 1:5]
5x5 Array{Int64,2}:
 2  3  4  5   6
 3  4  5  6   7
 4  5  6  7   8
 5  6  7  8   9
 6  7  8  9  10

julia> @DArray [i+j for i = 1:5, j = 1:5]
5x5 DistributedArrays.DArray{Int64,2,Array{Int64,2}}:
 2  3  4  5   6
 3  4  5  6   7
 4  5  6  7   8
 5  6  7  8   9
 6  7  8  9  10
```

## Distributed Array Operations

At this time, distributed arrays do not have much functionality. Their major utility is allowing communication to be done via array indexing, which is convenient for many problems. As an example, consider implementing the "life" cellular automaton, where each cell in a grid is updated according to its neighboring cells. To compute a chunk of the result of one iteration, each process needs the immediate neighbor cells of its local chunk. The following code accomplishes this::

```julia
    function life_step(d::DArray)
        DArray(size(d),procs(d)) do I
            top   = mod(first(I[1])-2,size(d,1))+1
            bot   = mod( last(I[1])  ,size(d,1))+1
            left  = mod(first(I[2])-2,size(d,2))+1
            right = mod( last(I[2])  ,size(d,2))+1

            old = Array(Bool, length(I[1])+2, length(I[2])+2)
            old[1      , 1      ] = d[top , left]   # left side
            old[2:end-1, 1      ] = d[I[1], left]
            old[end    , 1      ] = d[bot , left]
            old[1      , 2:end-1] = d[top , I[2]]
            old[2:end-1, 2:end-1] = d[I[1], I[2]]   # middle
            old[end    , 2:end-1] = d[bot , I[2]]
            old[1      , end    ] = d[top , right]  # right side
            old[2:end-1, end    ] = d[I[1], right]
            old[end    , end    ] = d[bot , right]

            life_rule(old)
        end
    end
```

As you can see, we use a series of indexing expressions to fetch data into a local array `old`. Note that the `do` block syntax is convenient for passing `init` functions to the `DArray` constructor. Next, the serial function `life_rule` is called to apply the update rules to the data, yielding the needed `DArray` chunk. Nothing about `life_rule` is `DArray`\ -specific, but we list it here for completeness::

```julia
    function life_rule(old)
        m, n = size(old)
        new = similar(old, m-2, n-2)
        for j = 2:n-1
            for i = 2:m-1
                nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
                       old[i  ,j-1],             old[i  ,j+1],
                       old[i+1,j-1], old[i+1,j], old[i+1,j+1])
                new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
            end
        end
        new
    end
```

## Numerical Results of Distributed Computations

Floating point arithmetic is not associative and this comes up when performing distributed computations over `DArray`s.  All `DArray` operations are performed over the `localpart` chunks and then aggregated. The change in ordering of the operations will change the numeric result as seen in this simple example:

```julia
julia> addprocs(8);

julia> @everywhere using DistributedArrays

julia> A = fill(1.1, (100,100));

julia> sum(A)
11000.000000000013

julia> DA = distribute(A);

julia> sum(DA)
11000.000000000127

julia> sum(A) == sum(DA)
false
```

The ultimate ordering of operations will be dependent on how the Array is distributed.


DistributedArrays



## Advanced topics in DArray

### Customizing DistributedArrays creation


In [105]:
darray = drand((10,4), workers()[1:4], [1,4])

10x4 DistributedArrays.DArray{Float64,2,Array{Float64,2}}:
 0.587376    0.875714   0.557657   0.642919 
 0.00463682  0.543705   0.421258   0.894397 
 0.807876    0.21561    0.791013   0.115873 
 0.47104     0.0123357  0.594544   0.832076 
 0.156641    0.68095    0.31139    0.53989  
 0.0745466   0.72249    0.0258673  0.0449686
 0.0763677   0.212898   0.353551   0.59597  
 0.0559051   0.142527   0.215963   0.983642 
 0.428478    0.43608    0.35838    0.267668 
 0.0359058   0.10167    0.543784   0.317812 

In [106]:
for w in workers()
    @spawnat w println(localindexes(darray))
end

	From worker 4:	(1:10,3:3)
	From worker 2:	(1:10,1:1)
	From worker 3:	(1:10,2:2)
	From worker 5:	(1:10,4:4)


* Now let's know which `RemoteRef`s holds the chunks of `darray`:

In [91]:
darray.chunks

1x4 Array{RemoteRef{RemoteStore},2}:
 RemoteRef{Channel{Any}}(2,1,7147)  …  RemoteRef{Channel{Any}}(5,1,7150)

* We can also know which processes hold `darray`:

In [143]:
darray.pids

1x4 Array{Int64,2}:
 2  3  4  5

* Or we can know the splitting indexes of `darray`:

In [142]:
darray.indexes

1x4 Array{Tuple{UnitRange{Int64},UnitRange{Int64}},2}:
 (1:10,1:1)  (1:10,2:2)  (1:10,3:3)  (1:10,4:4)

* From a process, we can know the indexes it holds:

In [139]:
for w in workers()
    @spawnat w @show localindexes(darray)
end

	From worker 2:	localindexes(darray) = (1:10,1:1)
	From worker 3:	localindexes(darray) = (1:10,2:2)
	From worker 4:	localindexes(darray) = (1:10,3:3)
	From worker 5:	localindexes(darray) = (1:10,4:4)


* We can get any data of `darray` from any process, for instance, from the master:



In [141]:
darray[5]

0.32587670612620934

* We can also access `darray` from any process such as worker 4:

In [140]:
fetch(@spawnat 4 darray[5])

0.32587670612620934

### Distributed reduce using DistributedArrays
* From Jeff Bezanson's Tutorial on [Parallel and Distributed Computing with Julia](https://www.youtube.com/watch?v=JoRn4ryMclc)

* If we wanted to sum `darray` locally, we can simply call `sum(darray)`

In [104]:
sum(darray)

20.357692287307746

* The previous command was performed **locally**, at the master process

* **Let's take advantage of parallelism** by putting all workers to execute the sum on the portion of `darray` each worker holds in order to not imply data movement
* It's similiar to a distributed `reduce` operation:
    * The way you think and prgram in Julia is natural, i.e., you can read the following line as you would say to some people to do some task:
    * _"spwan at process `p` the sum of its `darray` local part for all processes"_


In [108]:
local_sums = [ @spawnat w sum(localpart(darray)) for w in workers()]

4-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,7522)
 RemoteRef{Channel{Any}}(3,1,7523)
 RemoteRef{Channel{Any}}(4,1,7524)
 RemoteRef{Channel{Any}}(5,1,7525)

* Remark: `@everywhere` could not be use in the previous example since it does not return any value/RemoteRef.

* Now let's see the contents of each `RemoteRef`:

In [110]:
for s in local_sums println("Local sum of $s is $(fetch(s))") end

Local sum of RemoteRef{Channel{Any}}(2,1,7522) is 5.272844798025005
Local sum of RemoteRef{Channel{Any}}(3,1,7523) is 5.755148538138882
Local sum of RemoteRef{Channel{Any}}(4,1,7524) is 5.093243829209424
Local sum of RemoteRef{Channel{Any}}(5,1,7525) is 4.2364551219344335


* So basically `local_sums` is an array with the results of sums which were executed **at each worker**
* In other words, it reduced the values of `darray` each worker holds by executing the sum remotely.
* **Now lets merge all the reduces to generate the final output**:
    * First we `fetch` the results with `map`
    * Then we merge them (i.e. sum) localy, at the master process
        * This is similar to merging the _Output files_ of the MapReduce framework ([c.f. Figure 1](http://static.googleusercontent.com/media/research.google.com/pt-BR//archive/mapreduce-osdi04.pdf))

In [112]:
sum(map(fetch, [ @spawnat p sum(localpart(darray)) for p in workers()] ))

20.357692287307746

* Indeed, **we can directly use the `reduce` function** instead of `sum`:

In [113]:
reduce(+, map(fetch, [ @spawnat p reduce(+, localpart(darray)) for p in workers()] ))

20.357692287307746

* Finallly, we can define a **parallel reduce** function

In [116]:
parallel_reduce(f,darray) = reduce(f, map(fetch, [ @spawnat p reduce(f, localpart(darray)) for p in workers()] ))

parallel_reduce (generic function with 1 method)

* Now we use the **`parallel_reduce` function to operate over other `DistributedArray`s**:

In [117]:
d = drand(10)
parallel_reduce(+, d)

4.142936730385074

* You can test with further function since it reduces, i.e., it's a binary operator:

In [118]:
parallel_reduce(*, d)
parallel_reduce(max, d)

0.8151657589779049

### More on distributed reduce

* Intersting [discussion at Julia mailing list on ```pmap_reduce```](https://groups.google.com/forum/#!msg/julia-users/WJBIAYzrZgg/3xzjoMfpKVMJ)

#### What if <span style="color:green">I don't know the size of the data I need to load</span>  and <span style="color:gree">I don't want to deal with low-level details</span> on DistributedArrays?

#### [<span style="color:red">CloudArray</span> solves this transparently!](cloudarray.ipynb)



---
# <span style="color:red">Distributed Maps and Reduces</span>


* Map, Filter and Reduce are common [**functional programming**](https://en.wikipedia.org/wiki/Functional_programming) features
* [**Google MapReduce framework**](http://research.google.com/archive/mapreduce-osdi04.pdf) leveraged Map and Reduce concepts and proposed a _distributed framewrok for data-intensive applications_
    * [**Hadoop**](http://hadoop.apache.org) is an open-source implementation of Google MapReduce framework
    * [**Rhipe**](https://www.datadr.org) allows to transparently execute R codes on top of Hadoop (no need to deal with Hadoop details)
* Julia functional features include:
    * **Map, Filter, Reduce, and MapReduce**
    * However, they are not distributed
    * E.g., `mapreduce` uses the same worker to execute both map and reduce
* Native **Distribute MapReduce** in Julia is done in different ways
    * Spawning `map, reduce, mapreduce` calls on workers
        * @parallel, @spawn, remotecall, etc.
    * **Distributed map** called [**pmap**](http://docs.julialang.org/en/latest/stdlib/parallel/#Base.pmap)

<!--
    * pmap
        * https://groups.google.com/forum/#!msg/julia-users/WJBIAYzrZgg/_UPc8vhkCAAJ
    * Discussion on MapReduce, ShredArrays, etc. at [julia-users maling list](https://groups.google.com/forum/#!msg/julia-users/1sNXYtIbS1Q/1sNd0h92AQAJ)
    * distributed mapreduce by using @spawn
    * @distribute
    * @dmap
-->

## Distributed `reduce`

See example with `DistributedArrays`.

## `pmap`

* Context (motivation)
    * No reduction operator is needed
    * Any variables used inside the @parallel loop will be copied (broadcasted) to each process.
* `pmap` function
    * Applies a function to all elements in some collection
* `pmap(f, coll)`
    * `f: function`
    * `coll: collection`

In [107]:
M = [-1.02, 3.14, -5.63]

pmap(abs,M)

3-element Array{Any,1}:
 1.02
 3.14
 5.63

Where `pmap` distributed my processing?

In [345]:
M = ["Where am I?", "Where am I?", "Where am I?", "Where am I?", "Where am I?"]

pmap(println,M)

	From worker 2:	Where am I?
	From worker 3:	Where am I?


5-element Array{Any,1}:
 nothing
 nothing
 nothing
 nothing
 nothing

	From worker 4:	Where am I?
	From worker 5:	Where am I?
	From worker 2:	Where am I?


In [405]:
@everywhere f(x) = x.+1

M = rand(10^4)
pmap(f, M)

10000-element Array{Any,1}:
 1.10063
 1.62362
 1.43912
 1.24644
 1.21631
 1.79434
 1.02257
 1.67295
 1.66239
 1.6951 
 1.2725 
 1.48917
 1.11586
 ⋮      
 1.68033
 1.52385
 1.71493
 1.3362 
 1.43307
 1.57744
 1.47357
 1.51247
 1.85764
 1.68235
 1.02524
 1.27252

Now, suppose that we have to calculate the rank of a number of large matrices

In [20]:
function rank_marray()
   marr = [rand(1000,1000) for i=1:4]
   for arr in marr
      println(rank(arr))
   end
end

rank_marray (generic function with 1 method)

In [22]:
@time rank_marray()

1000
1000
1000
1000
  5.118474 seconds (1.38 k allocations: 63.449 MB, 0.19% gc time)


In [25]:
marr = [rand(1000,1000) for i=1:4]

@time println(pmap(rank, marr))

Any[1000,1000,1000,1000]
  2.963267 seconds (1.03 k allocations: 70.031 KB)


In [1]:
?pmap

search: 

```rst
..  pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers())

Transform collections ``lsts`` by applying ``f`` to each element in parallel.
(Note that ``f`` must be made available to all worker processes; see :ref:`Code Availability and Loading Packages <man-parallel-computing-code-availability>` for details.)
If ``nprocs() > 1``, the calling process will be dedicated to assigning tasks.
All other available processes will be used as parallel workers, or on the processes specified by ``pids``.

If ``err_retry`` is ``true``, it retries a failed application of ``f`` on a different worker.
If ``err_stop`` is ``true``, it takes precedence over the value of ``err_retry`` and ``pmap`` stops execution on the first error.
```


pmap promote_shape repmat typemax permutations SparseMatrix



# More on Distributed/Parallel Computing in Julia

* JuliaCon 2015
    * [JuliaCon2015 Workshop](https://github.com/JuliaParallel/JuliaCon2015_Workshop)    
    * [JuliaCon 2015 | Amit Murthy: Cluster Managers and parallel julia](https://www.youtube.com/watch?v=XJAQ24NS458) 
        * [IJulia Notebbok on Parallel Julia (internals)](https://github.com/JuliaParallel/JuliaCon2015_Workshop/blob/master/JuliaCon%202015.ipynb) 
    * [JuliaCon India 2015 - Concurrent and Parallel programming](https://github.com/ViralBShah/JuliaConIndia2015/blob/master/JuliaCon_Parallel_Workshop.ipynb)
    * [JuliaCon 2015 - Julia Parallel Workshop](https://github.com/JuliaParallel/JuliaCon2015_Workshop/blob/master/Workshop.ipynb)
* [Julia Oficcial Documentation on Parallel Computing](http://docs.julialang.org/en/latest/manual/parallel-computing/)
* [Official Julia packages on Parallel Computing](https://github.com/JuliaParallel)
* [Julia Lightning Round (Alan Edelman, Viral B. Shah)](https://www.youtube.com/watch?v=37L1OMk_3FU&list=PLP8iPy9hna6Si2sjMkrPY-wt2mEouZgaZ)
* [Stefan Karpinski - Julia: Fast Performance, Distributed Computing & Multiple Dispatch](https://www.youtube.com/watch?v=rUczbQ6ZPd8)
* [Viral's IJulia Notebooks](https://github.com/ViralBShah?tab=repositories)
* [Distributed DataFrame](https://github.com/JuliaParallel/Blocks.jl)
    * Discontinued but there are plans to carry on its implementation
    * See [this topic at julia-dev mailing list](https://groups.google.com/forum/#!msg/julia-dev/ja3ienKR0-g/wZg1NMZvB_QJ)