In [1]:
using Distributed, LinearAlgebra, Statistics

In [2]:
nprocs()

1

In [3]:
addprocs(4)

4-element Array{Int64,1}:
 2
 3
 4
 5

In [4]:
nprocs()

5
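After `addprocs(4)` there are five processes: the master (id 1) and four workers. Below is a minimal sketch of the functions that inspect this setup. It assumes a fresh session in which no workers have been added yet.

```julia
using Distributed

addprocs(4)            # start 4 local worker processes (assumes a fresh session)

@show nprocs()         # master plus workers: 5 in a fresh session
@show nworkers()       # workers only: 4 in a fresh session
@show procs()          # ids of all processes, master (1) included
@show workers()        # ids of the worker processes only
```

Worker ids are handed out incrementally and are not reused. After `rmprocs`/`addprocs` they need not form a contiguous range, so iterating over `workers()` or `procs()` is safer than assuming `2:nprocs()`.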

In [5]:
A = rand(100);

In [6]:
sum(A)

47.03562619033149

In [7]:
mean(A)

0.4703562619033149

In [8]:
?pmap

search: [0m[1mp[22m[0m[1mm[22m[0m[1ma[22m[0m[1mp[22m [0m[1mp[22mro[0m[1mm[22mote_sh[0m[1ma[22m[0m[1mp[22me ty[0m[1mp[22me[0m[1mm[22m[0m[1ma[22mx [0m[1mP[22mer[0m[1mm[22mutedDims[0m[1mA[22mrray [0m[1mp[22mrocess_[0m[1mm[22mess[0m[1ma[22mges



```
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
```

Transform collection `c` by applying `f` to each element using available workers and tasks.

For multiple collection arguments, apply `f` elementwise.

Note that `f` must be made available to all worker processes; see [Code Availability and Loading Packages](@ref code-availability) for details.

If a worker pool is not specified, all available workers (i.e., the default worker pool) are used.

By default, `pmap` distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify `distributed=false`. This is equivalent to using [`asyncmap`](@ref). For example, `pmap(f, c; distributed=false)` is equivalent to `asyncmap(f,c; ntasks=()->nworkers())`.
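The `distributed=false` equivalence can be checked directly. This is a minimal sketch using a toy squaring function, which is an illustration and not part of the original notebook.

```julia
using Distributed

# Runs on this process only, using concurrent tasks instead of worker processes.
square_all_local = pmap(x -> x^2, 1:10; distributed = false)

# The equivalent asyncmap call given in the docstring.
square_all_async = asyncmap(x -> x^2, 1:10; ntasks = () -> nworkers())

@show square_all_local == square_all_async
```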

`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes greater than 1, the collection is processed in multiple batches, each of length `batch_size` or less. A batch is sent as a single request to a free worker, where a local [`asyncmap`](@ref) processes elements from the batch using multiple concurrent tasks.
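Below is a sketch of batching. It assumes at least two local workers and starts them if they are missing.

Each entry of `owners` records the id of the worker that produced it. Which worker receives which batch depends on scheduling, and batches can be shorter than `batch_size`, so only the order of the results is fixed.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers

# Batches of at most 3 elements go to workers; inside a batch the worker
# processes the elements with asyncmap.
squares = pmap(x -> x^2, 1:10; batch_size = 3)
owners  = pmap(x -> myid(), 1:10; batch_size = 3)

@show squares    # results keep the order of the input
@show owners     # worker ids; the assignment can vary from run to run
```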

Any error stops `pmap` from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument `on_error` which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller.

Consider the following two examples. The first one returns the exception object inline, the second a 0 in place of any exception:

```julia-repl
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0
```

Errors can also be handled by retrying failed computations. Keyword arguments `retry_delays` and `retry_check` are passed through to [`retry`](@ref) as keyword arguments `delays` and `check` respectively. If batching is specified, and an entire batch fails, all items in the batch are retried.

Note that if both `on_error` and `retry_delays` are specified, the `on_error` hook is called before retrying. If `on_error` does not throw (or rethrow) an exception, the element will not be retried.

Example: On errors, retry `f` on an element a maximum of 3 times without any delay between retries.

```julia
pmap(f, c; retry_delays = zeros(3))
```
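`pmap` forwards `retry_delays` to `retry` as `delays`, so the same policy can be tried on a single process by calling `retry` directly. In the sketch below, `flaky` is a hypothetical function that fails on its first two calls.

```julia
# Hypothetical flaky function: fails on its first two calls, then succeeds.
const attempts = Ref(0)
function flaky(x)
    attempts[] += 1
    attempts[] < 3 && error("transient failure")
    return 2x
end

# Same policy as `retry_delays = zeros(3)`: up to 3 retries, no delay between them.
g = retry(flaky; delays = zeros(3))

result = g(21)     # the first two attempts throw, the third succeeds
@show result attempts[]
```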

Example: Retry `f` only if the exception is not of type [`InexactError`](@ref), with exponentially increasing delays up to 3 times. Return a `NaN` in place for all `InexactError` occurrences.

```julia
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
```
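Here is a small sketch of the `on_error` pattern from the last example, with the retries left out. `Int(2.5)` throws an `InexactError`, which the handler turns into `NaN`, and any other exception is re-thrown. The input values are illustrative only.

```julia
using Distributed

# Converting a non-integral Float64 to Int throws an InexactError.
r = pmap(x -> Int(x), [1.0, 2.5, 3.0];
         on_error = e -> e isa InexactError ? NaN : throw(e))

@show r   # the middle element is NaN; the others are converted
```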


In [9]:
r1 = remotecall(rand, 2, 2, 2)
r2 = remotecall(rand, 2, 1:8, 3, 4)

Future(2, 1, 7, nothing)

In [10]:
fetch(r1)

2×2 Array{Float64,2}:
 0.984883   0.0781281
 0.0874078  0.524966

In [11]:
fetch(r2)

3×4 Array{Int64,2}:
 6  7  6  2
 2  2  5  4
 8  2  2  4
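When the value is needed right away, `remotecall_fetch` combines `remotecall` and `fetch` into one call, and it is what `f_pmap` further down relies on. A sketch using `+` as the remote function, assuming at least one worker:

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers
w = first(workers())

# Two steps: remotecall returns a Future at once; fetch waits for the value.
fut = remotecall(+, w, 1, 2)
two_step = fetch(fut)

# One step: remotecall_fetch sends the call and waits for the result.
one_step = remotecall_fetch(+, w, 1, 2)

@show two_step one_step
```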

In [12]:
r1

Future(2, 1, 6, Some([0.98488338931112 0.07812808525097914; 0.08740779845484892 0.5249656669290184]))

In [13]:
typeof(r1)

Future

In [14]:
r3 = fetch(r1)

2×2 Array{Float64,2}:
 0.984883   0.0781281
 0.0874078  0.524966
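After the first `fetch`, the `Future` keeps a local copy of the value (the `Some(...)` visible in `r1` above). Fetching it again therefore does not go back to the worker. A sketch, assuming at least one worker:

```julia
using Distributed
nprocs() < 2 && addprocs(1)   # ensure at least 1 worker
w = first(workers())

r = remotecall(rand, w, 2, 2)
a = fetch(r)          # waits for worker w, then caches the value in r
cached = isready(r)   # true once the value is available
b = fetch(r)          # served from the local cache

@show cached a == b
```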

In [15]:
s2 = @spawnat 2 rand(2, 2)
s3 = @spawnat 3 1 .+ fetch(s2)

Future(3, 1, 11, nothing)

In [16]:
fetch(s3)

2×2 Array{Float64,2}:
 1.34205  1.63544
 1.18633  1.65805
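In the cell above, the `Future` `s2` is passed to worker 3, and the `fetch(s2)` there retrieves the matrix from worker 2. The sketch below repeats the pattern on two workers and checks the result against the same computation done on the master. It assumes at least two workers.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers
w1, w2 = workers()[1], workers()[2]

s2 = @spawnat w1 rand(2, 2)            # matrix created and stored on w1
s3 = @spawnat w2 1 .+ fetch(s2)        # w2 fetches it from w1 and adds 1

@show fetch(s3) == 1 .+ fetch(s2)
```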

In [17]:
? @everywhere

```
@everywhere [procs()] expr
```

Execute an expression under `Main` on all `procs`. Errors on any of the processes are collected into a [`CompositeException`](@ref) and thrown. For example:

```
@everywhere bar = 1
```

will define `Main.bar` on all processes.

Unlike [`@spawnat`](@ref), `@everywhere` does not capture any local variables. Instead, local variables can be broadcast using interpolation:

```
foo = 1
@everywhere bar = $foo
```

The optional argument `procs` allows specifying a subset of all processes on which to execute the expression.

Equivalent to calling `remotecall_eval(Main, procs, expr)`.
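The sketch below shows both uses described above. It assumes at least two workers; `square`, `foo` and `bar` are illustrative names.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers

# Define the function on every process so pmap can call it on any worker.
@everywhere square(x) = x^2
squares = pmap(square, 1:4)

# Broadcast a local value with $-interpolation, then read it back on each worker.
foo = 10
@everywhere bar = $foo
bars = [remotecall_fetch(getfield, w, Main, :bar) for w in workers()]

@show squares bars
```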


# Dead Simple Parallelism

In [18]:
nprocs()

5

In [19]:
# svd calls into multithreaded BLAS/LAPACK; we limit BLAS to 1 thread so the timings
# show the typical process-parallel pattern (svd is just an example workload)
BLAS.set_num_threads(1)
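`BLAS.set_num_threads(1)` only affects the process it runs on, and every worker is a separate process with its own BLAS. The sketch below applies the setting everywhere and then reads it back. `BLAS.get_num_threads` assumes Julia 1.6 or later.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers

@everywhere using LinearAlgebra
@everywhere BLAS.set_num_threads(1)      # runs on the master and on every worker

# Read the BLAS thread count back from each process.
counts = [remotecall_fetch(BLAS.get_num_threads, p) for p in procs()]
@show counts
```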

In [20]:
# A collection of 10 matrices
M = Matrix{Float64}[rand(1000, 1000) for i = 1:10];

In [21]:
# I want the svd for all of them
@time begin
    for i in 1:length(M)
        svd(M[i]);
    end
end

  6.545078 seconds (226.13 k allocations: 470.770 MiB, 2.64% gc time)


In [22]:
@time pmap(svd, M); # Compute the svd for each of them.

  2.887047 seconds (1.28 M allocations: 225.899 MiB, 0.69% gc time)
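`pmap` returns results in the order of its input, so they can be compared one-to-one with a serial `map`. The sketch below does this on smaller random matrices, chosen only to keep it fast. It compares singular values, because singular vectors are only determined up to sign.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers
@everywhere using LinearAlgebra

M = [rand(200, 200) for _ in 1:6]

par = pmap(svd, M)   # distributed over the workers
ser = map(svd, M)    # serial, on the master

same = all(p.S ≈ s.S for (p, s) in zip(par, ser))
@show same
```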


In [23]:
myid()

1
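`myid` returns the id of whichever process evaluates it. The sketch below calls it remotely on each worker, assuming at least two workers.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # ensure at least 2 workers

master = myid()                                      # 1 on the master process
ids = [remotecall_fetch(myid, w) for w in workers()] # each worker reports its own id

@show master ids
```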

In [24]:
M = Matrix{Float64}[rand(800, 800), rand(600, 600), rand(800, 800), rand(600, 600)];

In [25]:
# The arguments are: 1) a function `f` and 2) a list with the inputs.
function f_pmap(f, lst)
    np = nprocs()                   # Number of processes available.
    n  = length(lst)                # Number of elements to apply the function to.
    results = Vector{Any}(undef, n) # Where we will write the results. As we do not know
                                    # the result type (Integer, Tuple...) we use `Any`.
    i = 1
    nextidx() = (idx = i; i += 1; idx) # Returns the next work item (here just an index).
                                       # No lock is needed: the @async tasks share one
                                       # thread and only switch at remotecall_fetch.
    @sync begin # Waits for every @async task below. See below the discussion about all this part.
        for p in procs()                # Process ids need not be 1:np (e.g. after rmprocs).
            if p != myid() || np == 1   # Keep the master free unless it is the only process.
                @async begin            # One feeder task per process.
                    while true
                        idx = nextidx()
                        if idx > n
                            break
                        end
                        results[idx] = remotecall_fetch(f, p, lst[idx])
                    end
                end
            end
        end
    end
    results
end

f_pmap (generic function with 1 method)

In [27]:
@time f_pmap(svd, M);

  0.424848 seconds (1.02 k allocations: 30.572 MiB)


In [28]:
size(M)

(4,)

In [29]:
@time begin
    rlin = Array{Any}(undef, length(M))
    for i in 1:length(M)
        rlin[i] = svd(M[i])
    end
end

  1.009041 seconds (58 allocations: 91.897 MiB, 1.15% gc time)
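The speedups implied by the timings above can be worked out directly. These are single `@time` runs that may include compilation, so the numbers are only rough.

With four matrices and four workers, `f_pmap` hands each worker exactly one matrix. Its wall time therefore cannot be shorter than the slowest single 800×800 decomposition.

```julia
# Speedup = serial time / parallel time; efficiency = speedup / number of workers.
speedup(t_serial, t_parallel) = t_serial / t_parallel
efficiency(t_serial, t_parallel, nw) = speedup(t_serial, t_parallel) / nw

# Timings copied from the @time outputs above; 4 workers in both cases.
s_pmap  = speedup(6.545078, 2.887047)   # 10 matrices of 1000×1000: loop vs pmap
s_fpmap = speedup(1.009041, 0.424848)   # 4 matrices (800 and 600): loop vs f_pmap
e_fpmap = efficiency(1.009041, 0.424848, 4)

@show round(s_pmap, digits = 2)    # ≈ 2.27
@show round(s_fpmap, digits = 2)   # ≈ 2.38
@show round(e_fpmap, digits = 2)   # ≈ 0.59
```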
