## Chapter 18: Parallel Computing

Briefly, parallel computing is a method of running code on multiple processors (or multiple cores of the same processor) at the same time. In general, this is a difficult task, because the work has to be divided up and the data each piece needs has to be available where that piece runs. The Julia documentation on parallel computing is a good place to start.

The following is a simple function that counts the number of heads out of n coin flips:

In [1]:
function countHeads(n::Int)
    c::Int = 0
    for i=1:n
        c += rand(Bool)
    end
    c
end

countHeads (generic function with 1 method)

This finds the fraction of heads from 2 billion coin flips. 

In [9]:
@time countHeads(2*10^9)/(2*10^9)

  3.271717 seconds


0.500002681

The `Distributed` standard library contains a lot of functionality for using the multiple cores in a CPU:

In [3]:
using Distributed

The following will add one worker process, which the operating system can run on its own core:

In [4]:
addprocs(1)

1-element Vector{Int64}:
 2

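And now we can ask for the number of processes (the main process plus the workers). The output below is 10 because this cell was rerun after the `addprocs()` call further down; immediately after `addprocs(1)` it would be 2: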

In [23]:
nprocs()

10
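
Since `nprocs()` counts processes rather than physical cores, it can help to look at the related functions side by side. The following is a small sketch, assuming a session in which a worker is added only if none exists yet; `nworkers`, `workers` and `myid` are all exported by `Distributed`.

```julia
using Distributed

# Add a single worker only when the session has none, so rerunning is harmless.
if nprocs() == 1
    addprocs(1)
end

println("total processes:  ", nprocs())    # main process + workers
println("worker processes: ", nworkers())
println("worker ids:       ", workers())
println("this process id:  ", myid())      # the main process has id 1
```

The main process always has id 1, and the workers are numbered from 2 upward, which matches the `2` returned by `addprocs(1)` above.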

The following is the same function as above, but it is available on all processes that exist when the cell is run:

In [6]:
@everywhere function countHeads(n::Int)
   c::Int = 0
   for i=1:n
       c += rand(Bool)
   end
   c
end

Here's a simple way to "send" two calls of the function to the workers:

In [10]:
a= @spawn countHeads(10^9)
b= @spawn countHeads(10^9)

Future(2, 1, 10, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 0, 0)), nothing)

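Note that that took no time. That's because `@spawn` does not wait for the result: it starts the work on a worker and immediately returns a `Future`. The following uses `fetch`, which waits until each result is ready, and adds the results: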
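
To see the difference between starting remote work and waiting for it, here is a minimal sketch. It assumes at least one worker (one is added if needed) and uses `@spawnat :any`, which is the form the Julia documentation recommends in place of `@spawn`; the one-second `sleep` just stands in for a long computation.

```julia
using Distributed
nprocs() == 1 && addprocs(1)

# Start a task that takes about one second on some worker.
t0 = time()
fut = @spawnat :any (sleep(1); 42)
t_spawn = time() - t0      # short: we only get a Future back

# fetch blocks until the worker has finished and returns its value.
t0 = time()
result = fetch(fut)
t_fetch = time() - t0      # roughly the remaining running time of the task

println("spawn: ", round(t_spawn, digits=3), " s, fetch: ",
        round(t_fetch, digits=3), " s, result: ", result)
```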

In [11]:
@time fetch(a)+fetch(b)

  1.643784 seconds (190 allocations: 7.531 KiB)


999995777

Note that this is about twice as fast as the original (1.64 seconds versus 3.27 seconds), although the timing only covers the `fetch` calls, and the work had already started when `@spawn` was called. Basically, there is overhead in splitting code up and then bringing it back together, so the speedup is rarely perfect. Also, as we add more cores, spawning and fetching every piece by hand becomes cumbersome. We're going to see an alternative way.
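
The spawn-and-fetch pattern can be generalized by hand to any number of workers. The following is only a sketch: it assumes two workers are added when none exist, uses a smaller number of flips so it runs quickly, and the names `total_flips`, `chunk` and `futures` are ours.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Define the function on every process (after the workers exist).
@everywhere function countHeads(n::Int)
    c = 0
    for _ in 1:n
        c += rand(Bool)
    end
    c
end

total_flips = 10^8
# Assumes total_flips divides evenly among the workers; any remainder is dropped.
chunk = total_flips ÷ nworkers()

# Start one task on each worker, then wait for all of them and add the counts.
futures = [@spawnat w countHeads(chunk) for w in workers()]
heads = sum(fetch, futures)

println(heads / (chunk * nworkers()))
```

This works, but the bookkeeping (chunk sizes, futures, summing) is exactly what the `@distributed` macro takes care of.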

The following gives the information about the individual cores in the CPU.

In [12]:
Sys.cpu_info()

8-element Vector{Base.Sys.CPUinfo}:
 Intel(R) Core(TM) i5-1030NG7 CPU @ 1.10GHz: 
        speed         user         nice          sys         idle          irq
     1100 MHz    1573927 s          0 s    1135760 s    2141322 s          0 s
 Intel(R) Core(TM) i5-1030NG7 CPU @ 1.10GHz: 
        speed         user         nice          sys         idle          irq
     1100 MHz     647068 s          0 s     436726 s    3766907 s          0 s
 Intel(R) Core(TM) i5-1030NG7 CPU @ 1.10GHz: 
        speed         user         nice          sys         idle          irq
     1100 MHz    1463183 s          0 s     966000 s    2421523 s          0 s
 Intel(R) Core(TM) i5-1030NG7 CPU @ 1.10GHz: 
        speed         user         nice          sys         idle          irq
     1100 MHz     658162 s          0 s     436329 s    3756208 s          0 s
 Intel(R) Core(TM) i5-1030NG7 CPU @ 1.10GHz: 
        speed         user         nice          sys         idle          irq
     1100 MHz    138949

Calling `addprocs()` with no arguments adds one worker for each available core:

In [13]:
addprocs()

8-element Vector{Int64}:
  3
  4
  5
  6
  7
  8
  9
 10

In [16]:
@time let
 nheads = @distributed (+) for i = 1:2*10^9
   Int(rand(Bool))
 end
end

  1.139562 seconds (21.79 k allocations: 1.107 MiB, 2.53% compilation time)


999994035

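As you can see, this has reduced the time to about 1.14 seconds. The `@distributed (+)` loop splits the range among the workers, and the value of the last expression in each iteration is combined using `+`.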
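
Because the coin flips are random, it is hard to check the reduction by eye. Here is a small deterministic sketch of the same pattern, assuming two workers are added when none exist: each worker adds up the squares in its share of the range, and the partial sums are combined with `+`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# The value of the last expression in the loop body is what gets reduced with (+).
total = @distributed (+) for i in 1:100
    i^2
end

println(total)                              # 338350
println(total == sum(i^2 for i in 1:100))   # true
```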

#### 18.2 Writing a parallel card simulator

We will now look at writing a parallel version of the simulation that uses the `PlayingCards` module:

In [17]:
include("../julia-files/PlayingCards.jl")
using .PlayingCards, Random

Here's the original runTrials function:

In [18]:
function runTrials(trials::Int,f::Function)
    local deck=map(Card,1:52)
    local numhands=0
    for i=1:trials
        shuffle!(deck)
        h = Hand(deck[1:5])
        if(f(h))
            numhands+=1
        end
    end
    numhands
end

runTrials (generic function with 1 method)

In [19]:
@time runTrials(10_000_000,isFullHouse)

  9.804040 seconds (30.00 M allocations: 3.129 GiB, 7.61% gc time)


14449

Here's a parallel version of this. There are two important aspects:
* use `@everywhere` to load every module and define every function that the workers need
* we switch the `for` loop to a `@distributed` loop that adds up the results with `(+)`

In [20]:
@everywhere include("../julia-files/PlayingCards.jl")
@everywhere using .PlayingCards, Random



In [21]:
@everywhere function paraCountHands(trials::Integer,f::Function)
  local deck=map(Card,1:52)
  function checkHand(f::Function) ## shuffle the deck then check the hand.
    shuffle!(deck)
    f(Hand(deck[1:5]))
  end
  @distributed (+) for i = 1:trials
    Int(checkHand(f))
  end  
end

In [24]:
@time fh = paraCountHands(10_000_000,isFullHouse)

  3.736395 seconds (997 allocations: 54.016 KiB)


14269

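This has cut the time from about 9.8 seconds to about 3.7 seconds. The two counts differ (14449 versus 14269) because each run uses different random shuffles.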
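
As a sanity check on both simulations, we can compare the counts with the exact probability of a full house. The following sketch uses the standard counting argument (this is not part of the `PlayingCards` module): pick the rank and suits of the three of a kind, then the rank and suits of the pair.

```julia
# 13 ranks for the triple with binomial(4, 3) suit choices,
# then 12 remaining ranks for the pair with binomial(4, 2) suit choices.
full_houses = 13 * binomial(4, 3) * 12 * binomial(4, 2)
hands = binomial(52, 5)          # number of 5-card hands

p = full_houses / hands          # about 0.00144
expected = p * 10_000_000        # expected count in 10 million trials
sd = sqrt(10_000_000 * p * (1 - p))

println("full houses: ", full_houses, " of ", hands, " hands")
println("expected count: ", round(expected, digits=1), ", standard deviation: ", round(sd, digits=1))
```

The expected count is about 14406 with a standard deviation of about 120, so both 14449 and 14269 are consistent with the exact probability.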

#### 18.3 A parallel map function

In [25]:
num_coins = 1_000_000_000*ones(Int64,12)

12-element Vector{Int64}:
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000
 1000000000

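Here's a parallel map function, `pmap`, which applies a function to each element of a collection and distributes the calls among the workers.

Running this, you'll see an error. The reason is that `countHeads` was defined with `@everywhere` before the extra workers were added with `addprocs()`, so workers 3 through 10 do not have it. Go back above and rerun the `@everywhere countHeads` cell, then run this cell again.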

In [26]:
@time pmap(countHeads,num_coins)

LoadError: On worker 5:
UndefVarError: #countHeads not defined
Stacktrace:
  [1] deserialize_datatype
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:1364
  [2] handle_deserialize
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:866
  [3] deserialize
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813
  [4] handle_deserialize
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:873
  [5] deserialize
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Serialization/src/Serialization.jl:813 [inlined]
  [6] deserialize_msg
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Distributed/src/messages.jl:87
  [7] #invokelatest#2
    @ ./essentials.jl:729 [inlined]
  [8] invokelatest
    @ ./essentials.jl:726 [inlined]
  [9] message_handler_loop
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:176
 [10] process_tcp_streams
    @ /Applications/Julia-1.8.app/Contents/Resources/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:133
 [11] #103
    @ ./task.jl:484

And here is the regular (serial) version, using the `map` function:

In [27]:
@time map(countHeads,num_coins)

 23.979835 seconds (46.62 k allocations: 2.384 MiB, 0.17% compilation time)


12-element Vector{Int64}:
 500013495
 500003571
 499994721
 499977769
 499997136
 500000355
 499982740
 500018941
 499993033
 499984270
 500008373
 500020481
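
To see `pmap` succeed, the function has to be defined on every worker after the workers have been added. The following is a minimal sketch of that ordering, assuming two workers are added when none exist and using a much smaller number of flips so that it runs quickly; the name `results` is ours.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Define the function only after all workers exist, so every process has it.
@everywhere function countHeads(n::Int)
    c = 0
    for _ in 1:n
        c += rand(Bool)
    end
    c
end

num_coins = fill(10^7, 6)

# Each element of num_coins is handed to whichever worker is free.
results = pmap(countHeads, num_coins)
println(results ./ 10^7)
```

If more workers are added later with `addprocs`, the `@everywhere` definition has to be run again before they can take part.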

#### 18.4 Shared Arrays

In [None]:
using Plots

One of the harder things to code in parallel is a computation in which every process needs access to the same data, so the work cannot simply be broken into independent pieces. This example shows that when we have an array that we wish to smooth out, we can use a shared array (from the `SharedArrays` standard library), which all processes on the same machine can read from and write to.

In [None]:
arr = [50+50*sin(x/1_000_000)+25*rand() for x=1:10_000_000];

In [None]:
plot(arr[1:5000:end])

The following function computes a windowed mean: the mean of the elements of the array that lie within `width` positions of index `i`.

In [None]:
function windowMean(arr::Vector{T},i::Integer,width::Integer) where T <: Real
  ## find a range of the window, making sure that it doesn't go beyond the bounds of the array
  window = max(1,i-width):min(i+width,length(arr))  
  sum(arr[window])/(last(window)-first(window)+1)
end
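
To make the clipping at the ends of the array concrete, here is a small worked example. The function is repeated so that the block runs on its own, and the five-element vector `v` is just an illustration.

```julia
function windowMean(arr::Vector{T}, i::Integer, width::Integer) where T <: Real
    # Clip the window so that it stays inside the bounds of the array.
    window = max(1, i - width):min(i + width, length(arr))
    sum(arr[window]) / (last(window) - first(window) + 1)
end

v = [1.0, 2.0, 3.0, 4.0, 5.0]

println(windowMean(v, 3, 1))   # mean of v[2:4] = 3.0
println(windowMean(v, 1, 1))   # window clipped to v[1:2], mean = 1.5
println(windowMean(v, 5, 2))   # window clipped to v[3:5], mean = 4.0
```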

This now smooths the array, storing the results in `smoothed_array`

In [None]:
smoothed_array = zeros(Float64,length(arr));
@time let
  for i=1:length(arr)
    smoothed_array[i]=windowMean(arr,i,100)
  end
end

In [None]:
plot(arr[1:5000:end])
plot!(smoothed_array[1:5000:end])

In [None]:
@everywhere function windowMean(arr::AbstractVector{T},i::Integer,width::Integer) where T <: Real
  ## accept any AbstractVector so that a SharedVector can be passed in
  ## find a range of the window, making sure that it doesn't go beyond the bounds of the array
  window = max(1,i-width):min(i+width,length(arr))  
  sum(arr[window])/(last(window)-first(window)+1)
end

In [None]:
@everywhere using SharedArrays

In [None]:
## create the arrays once, on the main process; the workers attach to the same shared memory.
## (with @everywhere, each process would build its own random array and its own SharedVector)
arr = [50+50*sin(x/1_000_000)+25*rand() for x=1:10_000_000];
orig_arr = SharedVector{Float64}(arr);
s_arr = SharedVector{Float64}(length(orig_arr));

In [None]:
@time let
  ## @sync waits until every worker has finished its part of the loop
  @sync @distributed for i=1:length(orig_arr)
    s_arr[i]=windowMean(orig_arr,i,100)
  end
end
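
The same pattern is easier to check on a small problem with a known answer. The following is a sketch, assuming two workers on the same machine are added when none exist; the array name `squares` is ours. Each worker writes its share of the entries into the shared array, and `@sync` makes the main process wait until all of them are done.

```julia
using Distributed
nprocs() == 1 && addprocs(2)
@everywhere using SharedArrays

# A shared vector of 1000 integers, created once on the main process.
squares = SharedArray{Int}((1000,))

# Without a reducer, @distributed returns immediately, so @sync is needed to wait.
@sync @distributed for i in 1:length(squares)
    squares[i] = i^2
end

println(squares[1:5])
println(sum(squares) == sum(i^2 for i in 1:1000))
```

Each index is written by exactly one worker, so no two processes ever write to the same entry.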

In [None]:
plot(orig_arr[1:5000:end])
plot!(s_arr[1:5000:end])