

# Distributed arrays

Parallel computing is a mess: there are too many different kinds of parallelism:
- MPI
- CUDA
- OpenMP
- Threads

Idea: `DistributedArrays` *will* be the best and easiest way to do parallel computing (it is not quite there yet).

It's an *easy* form of parallelism, when it works.

A distributed array is an array that lives on several processors: each processor holds one part of the array (its "local part"). The array is **partitioned** across the processors.

Note that the idea of referring to a whole vector as just one object, `v`, was a big deal in Matlab, Python, etc.

Now `v` will refer to an array that is a much more complicated object underneath.

A distributed array (a `DArray`, from the `DistributedArrays` package) will still just **look like** (to us) a standard Julia array. The complicated machinery inside is hidden behind an **abstraction**.

In [1]:
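using Distributed  # addprocs/@everywhere live here (a separate standard library since Julia 0.7)

# Add 4 worker processes:
addprocs(4)

# Load the "distributed arrays" package on every process:
@everywhere using DistributedArrays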



In [6]:
# Make some data:

a = 1:10^3  # a range: behaves like an array but stores only its endpoints

b = map(t->t^2, a)  # a standard Julia array

# Equivalent to:
a .^ 2  # Matlab-style elementwise notation

1000-element Array{Int64,1}:
       1
       4
       9
      16
      25
      36
      49
      64
      81
     100
     121
     144
     169
       ⋮
  978121
  980100
  982081
  984064
  986049
  988036
  990025
  992016
  994009
  996004
  998001
 1000000

In [5]:
# Equivalent to:
sqr(t) = t^2
sqr.(a)

# Also equivalent to:
(t->t^2).(a)  # works, but harder to read



1000-element Array{Int64,1}:
       1
       4
       9
      16
      25
      36
      49
      64
      81
     100
     121
     144
     169
       ⋮
  978121
  980100
  982081
  984064
  986049
  988036
  990025
  992016
  994009
  996004
  998001
 1000000

**Exercise**: Check that these different ways of squaring the elements perform the same (to within about 10%). If they do not, that is probably a bug.
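One way to approach the exercise is sketched below: time each variant inside a function with `@elapsed` from Base, after a warm-up call, and keep the best of many runs. `time_variants` is a hypothetical helper, and plain `@elapsed` timings are noisy for inputs this small; `BenchmarkTools` (used later in this notebook) gives more reliable numbers.

```julia
sqr(t) = t^2   # named squaring function, as in the notebook

# Time each variant inside a function, keeping the best of `reps` runs.
function time_variants(a; reps = 1000)
    variants = [
        "map(t -> t^2, a)" => () -> map(t -> t^2, a),
        "a .^ 2"           => () -> a .^ 2,
        "sqr.(a)"          => () -> sqr.(a),
        "(t -> t^2).(a)"   => () -> (t -> t^2).(a),
    ]
    times = Dict{String,Float64}()
    for (name, f) in variants
        f()   # warm-up call, so compilation time is not measured
        times[name] = minimum(@elapsed(f()) for _ in 1:reps)
    end
    return times
end

a = 1:10^3
for (name, t) in sort(collect(time_variants(a)), by = last)
    println(rpad(name, 20), round(t * 1e6, digits = 2), " μs")
end
```

If one variant is consistently more than about 10% slower than the others across several runs, that is worth investigating.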

## Parallelize it (where "it" is embarrassingly parallel)

In [7]:
procs()

5-element Array{Int64,1}:
 1
 2
 3
 4
 5

One "master" process and 4 "workers":

In [9]:
workers()

4-element Array{Int64,1}:
 2
 3
 4
 5

In [10]:
a  # standard Julia object

1:1000

In [11]:
# Distribute the data:

D = distribute(a)

1000-element DistributedArrays.DArray{Int64,1,UnitRange{Int64}}:
    1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
    ⋮
  989
  990
  991
  992
  993
  994
  995
  996
  997
  998
  999
 1000

In [13]:
T = typeof(D)

DistributedArrays.DArray{Int64,1,UnitRange{Int64}}

In [14]:
supertype(T)

AbstractArray{Int64,1}
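Since the supertype of `DArray` is `AbstractArray`, generic code written for `AbstractArray` accepts a `DArray` unchanged. A minimal single-process sketch of that abstraction follows: the hypothetical `ChunkedVector` type stores its data in separate chunks (stand-ins for the local parts), yet only needs `size` and `getindex` to behave like an ordinary vector. It illustrates the idea only; it is not how `DistributedArrays` is implemented, and nothing here runs in parallel.

```julia
# Hypothetical toy type: data split into chunks, hidden behind the AbstractVector interface.
struct ChunkedVector{T} <: AbstractVector{T}
    chunks::Vector{Vector{T}}   # the "local parts"
    offsets::Vector{Int}        # global index just before each chunk starts
end

# Split v into `nchunks` contiguous, nearly equal chunks.
function ChunkedVector(v::AbstractVector{T}, nchunks::Integer) where {T}
    n = length(v)
    bounds = [div(k * n, nchunks) for k in 0:nchunks]   # chunk boundaries
    chunks = [collect(v[bounds[k]+1:bounds[k+1]]) for k in 1:nchunks]
    return ChunkedVector{T}(chunks, bounds[1:end-1])
end

Base.size(c::ChunkedVector) = (sum(length, c.chunks),)

function Base.getindex(c::ChunkedVector, i::Int)
    @boundscheck checkbounds(c, i)
    k = searchsortedlast(c.offsets, i - 1)   # which chunk holds global index i
    return c.chunks[k][i - c.offsets[k]]     # translate to a local index
end

c = ChunkedVector(1:1000, 4)
println(length.(c.chunks))       # sizes of the four "local parts"
println(c[251], " ", sum(c))     # indexing and generic functions just work
println(map(t -> t^2, c)[end])   # same code as for a plain array
```

Only the two interface methods know about chunks; `sum`, `map` and `collect` come for free from `AbstractArray`, which is the same reason `map` works on `D` below.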

In [18]:
show(D)

[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,259,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277

To get at the information inside the object, type `D.` and press <TAB> to list its fields with tab completion, or ask for the field names directly:

In [19]:
fieldnames(typeof(D))  # fieldnames takes a type, not an instance

6-element Array{Symbol,1}:
 :identity
 :dims    
 :pids    
 :indexes 
 :cuts    
 :release 

Which piece of the `DArray` is stored on each worker:

In [20]:
D.indexes

4-element Array{Tuple{UnitRange{Int64}},1}:
 (1:250,)   
 (251:500,) 
 (501:750,) 
 (751:1000,)

We want to write the *same* code and have it "just work":

In [28]:
dD = map(t -> t^2, D)  # dD is the distributed answer

1000-element DistributedArrays.DArray{Int64,1,Array{Int64,1}}:
       1
       4
       9
      16
      25
      36
      49
      64
      81
     100
     121
     144
     169
       ⋮
  978121
  980100
  982081
  984064
  986049
  988036
  990025
  992016
  994009
  996004
  998001
 1000000

In [29]:
# Compare with the serial result; the map call looks identical to the non-distributed case.
# (The comparison has to move data between the master and the workers.)

dD == map(t->t^2, a)

true

In [31]:
@fetchfrom 3 localpart(dD)  # the part computed by process 3, i.e. the second worker (elements 251:500)

250-element Array{Int64,1}:
  63001
  63504
  64009
  64516
  65025
  65536
  66049
  66564
  67081
  67600
  68121
  68644
  69169
      ⋮
 239121
 240100
 241081
 242064
 243049
 244036
 245025
 246016
 247009
 248004
 249001
 250000
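To see what `map` over a `DArray` amounts to, here is a hand-written sketch using only the `Distributed` standard library: split the data into contiguous chunks, have each worker square its own chunk with `@spawnat`, and `fetch` the pieces back in order. `chunk_ranges` and `distributed_square` are hypothetical helpers. For 1000 elements on 4 workers the even split reproduces the `D.indexes` shown above, but the exact rule `DistributedArrays` uses may differ, and unlike a `DArray` this sketch gathers the results onto the master immediately instead of leaving them on the workers.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # make sure at least one worker exists

# Hypothetical helper: split 1:n into p contiguous, nearly equal ranges.
chunk_ranges(n, p) = [(div((k - 1) * n, p) + 1):div(k * n, p) for k in 1:p]

# Each worker squares only its own chunk (its "local part");
# the master then fetches the pieces and concatenates them in order.
function distributed_square(a::AbstractVector)
    ws = workers()
    parts = [a[r] for r in chunk_ranges(length(a), length(ws))]
    futures = [@spawnat w map(t -> t^2, part) for (w, part) in zip(ws, parts)]
    return reduce(vcat, fetch.(futures))
end

println(chunk_ranges(1000, 4))
println(distributed_square(collect(1:1000))[end])
```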

Remember: NEVER do performance comparisons in global scope; always do them inside a function. (See later for the reason.)

In [33]:
using BenchmarkTools

In [36]:
procs()

5-element Array{Int64,1}:
 1
 2
 3
 4
 5

In [37]:
rmprocs(workers())  # remove all workers; the master (process 1) is never removed



:ok

In [38]:
procs()

1-element Array{Int64,1}:
 1

In [39]:
addprocs(2)

2-element Array{Int64,1}:
 6
 7
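The new workers are numbered 6 and 7 rather than 2 and 3, because process ids are not reused after `rmprocs`. A small sketch with the `Distributed` standard library that resets the workers and checks this (the variable names are only illustrative):

```julia
using Distributed

nprocs() == 1 && addprocs(2)   # make sure some workers exist
old_ids = workers()

rmprocs(workers())             # remove all workers; process 1 (the master) always stays
new_ids = addprocs(2)          # the replacements get fresh ids

println("old: ", old_ids, "  new: ", new_ids, "  procs(): ", procs())
```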

In [40]:
@everywhere begin
    using DistributedArrays
    using BenchmarkTools
end

In [46]:
function compare_timings()
    
    # serial
    a = [rand(100, 100) for i in 1:500]
    display(@benchmark map(t->t^2, $a))  # '$a' interpolates the local variable into the benchmark
    
    # parallel
    da = distribute(a)
    display(@benchmark map(t->t^2, $da))
end

compare_timings()

BenchmarkTools.Trial: 
  memory estimate:  38.20 mb
  allocs estimate:  1503
  --------------
  minimum time:     46.596 ms (7.82% GC)
  median time:      52.485 ms (11.20% GC)
  mean time:        52.732 ms (11.01% GC)
  maximum time:     59.618 ms (10.84% GC)
  --------------
  samples:          95
  evals/sample:     1
  time tolerance:   5.00%
  memory tolerance: 1.00%



BenchmarkTools.Trial: 
  memory estimate:  30.14 kb
  allocs estimate:  604
  --------------
  minimum time:     37.893 ms (0.00% GC)
  median time:      84.511 ms (0.00% GC)
  mean time:        107.572 ms (0.00% GC)
  maximum time:     533.922 ms (0.00% GC)
  --------------
  samples:          47
  evals/sample:     1
  time tolerance:   5.00%
  memory tolerance: 1.00%

In [47]:
function compare_timings()
    
    # serial
    a = [rand(100, 100) for i in 1:10]
    display(@benchmark map(t->t^2, $a))  # '$a' interpolates the local variable into the benchmark
    
    # parallel
    da = distribute(a)
    display(@benchmark map(t->t^2, $da))
end

compare_timings()

BenchmarkTools.Trial: 
  memory estimate:  782.53 kb
  allocs estimate:  33
  --------------
  minimum time:     647.882 μs (0.00% GC)
  median time:      733.827 μs (0.00% GC)
  mean time:        908.287 μs (10.05% GC)
  maximum time:     4.721 ms (79.06% GC)
  --------------
  samples:          5495
  evals/sample:     1
  time tolerance:   5.00%
  memory tolerance: 1.00%



BenchmarkTools.Trial: 
  memory estimate:  30.11 kb
  allocs estimate:  602
  --------------
  minimum time:     1.097 ms (0.00% GC)
  median time:      1.825 ms (0.00% GC)
  mean time:        2.415 ms (0.50% GC)
  maximum time:     414.721 ms (0.00% GC)
  --------------
  samples:          2068
  evals/sample:     1
  time tolerance:   5.00%
  memory tolerance: 1.00%
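Reading the two benchmarks together: with 500 matrices and 2 workers, the fastest distributed run beats the fastest serial one, but the median is worse and the spread is much larger (maximum about 534 ms). With only 10 matrices the distributed version is slower in every statistic shown. One plausible reading is that the fixed cost of sending work to the workers and collecting the results dominates when there is little work to share. The distributed memory estimate is tiny presumably because BenchmarkTools counts only allocations made on the master process. The ratios below are computed from the reported times (in ms); a value below 1 means the distributed version was slower.

```julia
# Times (ms) copied from the two BenchmarkTools outputs above.
timings = Dict(
    "500 matrices" => (serial   = (min = 46.596,   median = 52.485),
                       parallel = (min = 37.893,   median = 84.511)),
    "10 matrices"  => (serial   = (min = 0.647882, median = 0.733827),
                       parallel = (min = 1.097,    median = 1.825)),
)

for (case, t) in sort(collect(timings), by = first, rev = true)
    smin = t.serial.min / t.parallel.min        # speedup using minimum times
    smed = t.serial.median / t.parallel.median  # speedup using median times
    println(rpad(case, 14), "speedup (min): ", round(smin, digits = 2),
            "   speedup (median): ", round(smed, digits = 2))
end
```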

In [None]:
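# Distributed vectors are not restricted to numerical element types.
# (The old D lived on the workers removed above, so redistribute it.)
@everywhere using Dates
D = distribute(1:1000)
map(t -> Dates.monthname((t - 1) % 12 + 1), D)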
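The expression `(t - 1) % 12 + 1` wraps any positive integer onto `1:12`, so element 13 maps back to January. A quick single-process check with the `Dates` standard library (the helper name `month_index` is hypothetical):

```julia
using Dates

# Wrap a positive integer t onto 1:12 (assumes t ≥ 1; % keeps the sign of t).
month_index(t) = (t - 1) % 12 + 1

println([month_index(t) for t in 10:15])   # wraps from 12 back to 1
println(monthname(month_index(13)))        # "January"
```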

In [None]:
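# A slightly more complicated example of map and reduce:
# map builds one sentence per element on the workers, reduce concatenates them
sentences = map(t -> Dates.monthname((t - 1) % 12 + 1) * " is my favorite month.\n", D)
monthString = reduce(*, Array(sentences))  # Array(...) gathers the pieces onto the master
println(monthString)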
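For comparison, the same map-then-concatenate pattern can be written without `DistributedArrays`, using `pmap` from the `Distributed` standard library. This is an alternative sketch, not the notebook's method, and the range `1:24` is chosen only to keep the output short. `pmap` returns its results in input order, so concatenating them keeps the months in sequence.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # make sure some workers exist
@everywhere using Dates        # monthname must be available on every worker

# Each call may run on a different worker; results come back in input order.
sentences = pmap(t -> monthname((t - 1) % 12 + 1) * " is my favorite month.\n", 1:24)
monthString = join(sentences)  # string concatenation, like reduce(*, sentences)
print(monthString)
```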

In [None]:
# Distributed array comprehension

D55 = @DArray [randn(5,5) for i = 1:32]

In [None]:
# Compute singular values of the distributed vector of matrices
# (LinearAlgebra must be loaded on every process):
@everywhere using LinearAlgebra
Dsvd = map(svdvals, D55)
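Singular values and eigenvalues are different quantities: for a square matrix `A`, the singular values are the square roots of the eigenvalues of `A'A`, while the eigenvalues of `A` itself are generally complex. A single-matrix check with the `LinearAlgebra` standard library:

```julia
using LinearAlgebra

A = randn(5, 5)

σ = svdvals(A)                   # singular values, in decreasing order
λ = eigvals(Symmetric(A' * A))   # eigenvalues of A'A, in increasing order

# Singular values are the square roots of the eigenvalues of A'A
# (max(…, 0) guards against a tiny negative value from round-off).
@assert σ ≈ sqrt.(max.(reverse(λ), 0))

println(eigvals(A))              # eigenvalues of A itself: generally complex, not σ
```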