# Scaling up numerical computing in Julia
Przemysław Szufel

This Jupyter notebook has two major parts.
Firstly, we are going discuss some typical pitfall with Julia performance.
Secondly we will see how parallel and distributed computing can further speed-up the Julia code.

Przemysław Szufel


<a class="anchor" id="toc"></a>
## Table of content

1. [Basic performance tips](#introduction)
    1. [Avoid global variables](#globals)
    2. [Avoid abstract lists and structs](#lists)
    3. [Do not change the type of a variable within a function](#fvariable)
    4. [Mitigate type uncertainty with barrier functions](#barrier)
    5. [Remember about column-major layout of matrices](#column)
    6. [Prealocate, use views, vectorize](#views)
    7. [Check type stability with the `@code_warntype` macro](#warntype)
    
2. [Parallel and distributed computing](#distributed)
    1. [Parallelize via Single Instruction Multiple Data (SIMD)](#simd)
    2. [Green threading](#green)
    3. [Multithreading](#multithreading)
    4. [Multi-processing and distributed computing](#multiprocessing)

<a class="anchor" id="introduction"></a>
## 1. Basic performance tips

---- [Return to table of contents](#toc) ---


<p style="font-size:8pt">
    Reference: several examples below are inspired by
https://docs.julialang.org/en/v1/manual/performance-tips/
 </p>

<a class="anchor" id="globals"></a>
### Avoid global variables
---- [Return to table of contents](#toc) ---

In [1]:
x = rand(10_000)  #global

function loop_over_global()
    s = 0.0
    for i in x
        s += i
    end
    return s
end

function loop_over_x(x)
    s = 0.0
    for i in x
        s += i
    end
    return s
end

function loop_over_global_type_assert()
    s = 0.0
    for i in x::Vector{Float64}
        s += i
    end
    return s
end

@time loop_over_global()
@time loop_over_global()
@time loop_over_x(x)
@time loop_over_x(x)

  0.009665 seconds (39.68 k allocations: 781.234 KiB, 94.41% compilation time)
  0.000514 seconds (39.49 k allocations: 773.281 KiB)
  0.006372 seconds (2.11 k allocations: 136.891 KiB, 99.53% compilation time)
  0.000010 seconds (1 allocation: 16 bytes)


5006.859998463825

In [2]:
using BenchmarkTools
BenchmarkTools.@btime  loop_over_global()
@btime loop_over_x(x)

  458.900 μs (39490 allocations: 773.28 KiB)
  3.986 μs (1 allocation: 16 bytes)


5006.859998463825

In [3]:
BenchmarkTools.@benchmark loop_over_global()

BenchmarkTools.Trial: 4033 samples with 1 evaluation.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m444.700 μs[22m[39m … [35m  7.200 ms[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m  1.300 ms               [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m  1.233 ms[22m[39m ± [32m462.583 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m1.95% ± 6.95%

  [39m [39m [39m [39m▆[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [32m▂[39m[39m▁[34m▃[39m[39m█[39m▇[39m▂[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m 
  [39m▆[39m▄[39m▄[39

In [4]:
BenchmarkTools.@benchmark loop_over_x($x)

BenchmarkTools.Trial: 10000 samples with 8 evaluations.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m 3.975 μs[22m[39m … [35m77.425 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m11.537 μs              [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m11.186 μs[22m[39m ± [32m 3.854 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m▄[39m [39m [39m [39m▄[39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m▆[39m▄[39m▃[39m▁[39m [39m [39m [39m▂[39m [39m [39m [39m [32m [39m[34m█[39m[39m▂[39m [39m▂[39m [39m▂[39m [39m▃[39m [39m [39m▄[39m [39m▅[39m [39m [39m▅[39m [39m [39m▂[39m [39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▁
  [39m█[39m█[39m▇[39m▆[39m█[39m█[

In [5]:
const zzz = x
function loop_over_global_zz()
    s = 0.0
    for i in zzz  
        s += i
    end
    return s
end

@benchmark loop_over_global_zz()

BenchmarkTools.Trial: 10000 samples with 8 evaluations.
 Range [90m([39m[36m[1mmin[22m[39m … [35mmax[39m[90m):  [39m[36m[1m 3.975 μs[22m[39m … [35m50.388 μs[39m  [90m┊[39m GC [90m([39mmin … max[90m): [39m0.00% … 0.00%
 Time  [90m([39m[34m[1mmedian[22m[39m[90m):     [39m[34m[1m11.537 μs              [22m[39m[90m┊[39m GC [90m([39mmedian[90m):    [39m0.00%
 Time  [90m([39m[32m[1mmean[22m[39m ± [32mσ[39m[90m):   [39m[32m[1m 9.877 μs[22m[39m ± [32m 3.820 μs[39m  [90m┊[39m GC [90m([39mmean ± σ[90m):  [39m0.00% ± 0.00%

  [39m▆[39m▂[39m [39m▅[39m▅[39m▁[39m [39m [39m [39m▁[39m [39m [39m [39m▁[39m▆[39m▄[39m▂[39m▁[39m▃[39m▁[32m▂[39m[39m▁[39m▃[39m▁[39m▁[39m█[34m▃[39m[39m▃[39m [39m▂[39m [39m▁[39m [39m▃[39m [39m▅[39m [39m▄[39m [39m [39m▁[39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m [39m▂
  [39m█[39m█[39m█[39m█[39m█[39m█[

<a class="anchor" id="lists"></a>
### Avoid abstract lists and  structs
---- [Return to table of contents](#toc) ---

In [6]:
# do not use abstract lists
function addelems_and_sum(arr)
    for i in 1.0:100.0
        push!(arr,i)
    end
    sum(arr)
end
myarr = [] # DO NOT DO IT!  Any[]
myarr1 = Real[]  # DO NOT DO IT !
myarr2 = Float64[] # Correct
myarr3 = Union{Float64, ComplexF32}[] # If you really need this type flexibility use small unions




Union{Float64, ComplexF32}[]

In [7]:
@btime addelems_and_sum($([]))
@btime addelems_and_sum($(Real[]))
@btime addelems_and_sum($(Float64[]))
@btime addelems_and_sum($(Union{Float64, ComplexF32}[]))


  2.377 ms (177399 allocations: 2.71 MiB)
  2.770 ms (177399 allocations: 2.71 MiB)
  412.640 μs (0 allocations: 0 bytes)
  949.020 μs (1023 allocations: 15.98 KiB)


2.60277e7

In [8]:
#do not create data structures with undefined types (abstract containers)
struct MyAmbiguousType
    a
end
struct MyNonambiguousType
    a::Float64
end
function sumarr(arr)
    res = 0.0
    for e in arr
        res += e.a
    end
end
tt1 = MyAmbiguousType.(1.:1000.)
tt2 = MyNonambiguousType.(1.:1000.)

@btime sumarr($tt1)
@btime sumarr($tt2)

  12.600 μs (1000 allocations: 15.62 KiB)
  1.400 ns (0 allocations: 0 bytes)


In [9]:
struct MyParametrizedType{T}
    a::T
end
tt_p = MyParametrizedType{Float64}.(1.0:1000.0)
@btime sumarr($tt_p)


  1.400 ns (0 allocations: 0 bytes)


In [10]:
tt_miss = MyParametrizedType{Union{Float64, Missing}}.(1.0:1000.0)


1000-element Vector{MyParametrizedType{Union{Missing, Float64}}}:
 MyParametrizedType{Union{Missing, Float64}}(1.0)
 MyParametrizedType{Union{Missing, Float64}}(2.0)
 MyParametrizedType{Union{Missing, Float64}}(3.0)
 MyParametrizedType{Union{Missing, Float64}}(4.0)
 MyParametrizedType{Union{Missing, Float64}}(5.0)
 MyParametrizedType{Union{Missing, Float64}}(6.0)
 MyParametrizedType{Union{Missing, Float64}}(7.0)
 MyParametrizedType{Union{Missing, Float64}}(8.0)
 MyParametrizedType{Union{Missing, Float64}}(9.0)
 MyParametrizedType{Union{Missing, Float64}}(10.0)
 MyParametrizedType{Union{Missing, Float64}}(11.0)
 MyParametrizedType{Union{Missing, Float64}}(12.0)
 MyParametrizedType{Union{Missing, Float64}}(13.0)
 ⋮
 MyParametrizedType{Union{Missing, Float64}}(989.0)
 MyParametrizedType{Union{Missing, Float64}}(990.0)
 MyParametrizedType{Union{Missing, Float64}}(991.0)
 MyParametrizedType{Union{Missing, Float64}}(992.0)
 MyParametrizedType{Union{Missing, Float64}}(993.0)
 MyParametrizedTy

In [11]:
@btime sumarr($tt_p) # parametrized
@btime sumarr($tt_miss)  # parametrized allowing missings

  2.000 ns (0 allocations: 0 bytes)
  610.857 ns (0 allocations: 0 bytes)


<a class="anchor" id="fvariable"></a>
### Do not change the type of a variable within a function
---- [Return to table of contents](#toc) ---

In [12]:
function foo()
    x = 1   # Int
    for i = 1:1_000
        x /= i
    end
    return x
end

function foo2()
    x = 1.0  #Float64
    for i = 1:1_000
        x /= i
    end
    return x
end
@btime foo()
@btime foo2()

  3.562 μs (0 allocations: 0 bytes)
  2.767 μs (0 allocations: 0 bytes)


0.0

<a class="anchor" id="barrier"></a>
### Mitigate type uncertainty with barrier functions
---- [Return to table of contents](#toc) ---

In [13]:
a = Vector{rand(Bool) ? Int64 : Float64}(undef, 4)

4-element Vector{Float64}:
 6.951706147848e-310
 6.95170614788753e-310
 6.95170614788753e-310
 6.951706147848e-310

In [14]:
function strange_twos(n)
    a = Vector{rand(Bool) ? Int64 : Float64}(undef, n)
    for i = 1:n
       a[i] = 2
    end
    return a
end;

# Let's refactor that to 2 functions

function fill_twos!(a)
   for i = eachindex(a)
       a[i] = 2
   end
end;

function strange_twos2(n)
   a = Vector{rand(Bool) ? Int64 : Float64}(undef, n)
   fill_twos!(a)
   return a
end



strange_twos2 (generic function with 1 method)

In [15]:
@btime strange_twos(1_000);
@btime strange_twos2(1_000);

  13.600 μs (491 allocations: 15.59 KiB)
  574.809 ns (2 allocations: 7.95 KiB)


<a class="anchor" id="column"></a>
### Remember about column-major layout of matrices
---- [Return to table of contents](#toc) ---

In [16]:
#note data in memory is aligned along columns
x = [1 2; 3 4]
display(x);
display(vec(x));


2×2 Matrix{Int64}:
 1  2
 3  4

4-element Vector{Int64}:
 1
 3
 2
 4

In [17]:
xx2 = rand(10_000,10_000);

In [18]:
function sum1(x)
    sum_ = 0.0
    for row in 1:100
        @simd for column in 1:100
            @inbounds sum_ += x[row,column]
        end
    end
    sum_
end

function sum2(x)
    sum_ = 0.0
    for column in 1:100
        @simd for row in 1:100
            @inbounds sum_ += x[row,column]
        end
    end
    sum_
end

@btime sum1($xx2)
@btime sum2($xx2)


  4.014 μs (0 allocations: 0 bytes)
  1.210 μs (0 allocations: 0 bytes)

4997.130127764881




<a class="anchor" id="views"></a>
### Prealocate, use views, vectorize
---- [Return to table of contents](#toc) ---

In [19]:
#preallocate vectors and matrices

function xinc(x)
    return [x, x+1, x+2]
end;

function loopinc()
   y = 0
   for i = 1:10^6
       ret = xinc(i)
       y += ret[2]
   end
   return y
end;

function xinc!(ret::AbstractVector{T}, x::T) where T
    ret[1] = x
    ret[2] = x+1
    ret[3] = x+2
    nothing
end;

function loopinc_prealloc()
   ret = Vector{Int}(undef, 3)
   y = 0
   for i = 1:10^6
       xinc!(ret, i)
       y += ret[2]
   end
   return y
end;

@btime loopinc()
@btime loopinc_prealloc()


  39.221 ms (1000000 allocations: 76.29 MiB)
  300.600 μs (1 allocation: 80 bytes)


500001500000

In [20]:
# if you need subranges of matrices use views - do not copy the data!
fcopy(x) = sum(x[2:end-1]);
fview(x) = sum(@view x[2:end-1]);  
@views fview2(x) = sum(x[2:end-1]);

xvv = rand(10^6);

@btime fcopy(xvv);
@btime fview(xvv);
@btime fview2(xvv);



  1.330 ms (3 allocations: 7.63 MiB)
  108.500 μs (1 allocation: 16 bytes)
  108.500 μs (1 allocation: 16 bytes)


In [21]:
#macro @. can aggregate several vectorization operations

f(x) =       3x.^2 + 4x + 7x.^3;

fdot(x) = @. 3x^2 + 4x + 7x^3

f2(x) =       3 .* x.^2 .+ 4 .* x .+ 7 .* x.^3;


xv = rand(10000)

@btime f(xv)
@btime fdot(xv)
@btime f2(xv);

  29.900 μs (12 allocations: 469.03 KiB)
  5.333 μs (6 allocations: 78.20 KiB)
  5.000 μs (6 allocations: 78.20 KiB)


<a class="anchor" id="warntype"></a>
### Check type stability with the `@code_warntype` macro
---- [Return to table of contents](#toc) ---

In [22]:
pos(x) = x < 0 ? 0 : x;

pos(rand()-0.5)

0

In [23]:
function f(x)
   y = pos(x)
   return sin(y*x + 1)
end;
f(3)

-0.5440211108893698

In [24]:
@code_warntype f(2.3)

MethodInstance for f(::Float64)
  from f([90mx[39m)[90m @[39m [90mMain[39m [90m[4mIn[23]:1[24m[39m
Arguments
  #self#[36m::Core.Const(f)[39m
  x[36m::Float64[39m
Locals
  y[33m[1m::Union{Float64, Int64}[22m[39m
Body[36m::Float64[39m
[90m1 ─[39m      (y = Main.pos(x))
[90m│  [39m %2 = (y * x)[36m::Float64[39m
[90m│  [39m %3 = (%2 + 1)[36m::Float64[39m
[90m│  [39m %4 = Main.sin(%3)[36m::Float64[39m
[90m└──[39m      return %4



In [25]:
pos(x) = x < 0 ? zero(x) : x;
@code_warntype f(2.3)

MethodInstance for f(::Float64)
  from f([90mx[39m)[90m @[39m [90mMain[39m [90m[4mIn[23]:1[24m[39m
Arguments
  #self#[36m::Core.Const(f)[39m
  x[36m::Float64[39m
Locals
  y[36m::Float64[39m
Body[36m::Float64[39m
[90m1 ─[39m      (y = Main.pos(x))
[90m│  [39m %2 = (y * x)[36m::Float64[39m
[90m│  [39m %3 = (%2 + 1)[36m::Float64[39m
[90m│  [39m %4 = Main.sin(%3)[36m::Float64[39m
[90m└──[39m      return %4



<a class="anchor" id="distributed"></a>
## Parallel and distributed computing

---- [Return to table of contents](#toc) ---

Before running Jupyter notebook set in Julia number of threads.
This should be done *before* actually running the `notebook()` command.
The number of threads can be also set up in Julia options in Visual Studio code (if this is used to run this notebook).
```
# run this code from Julia console just before starting Jupyter Notebook
ENV["JULIA_NUM_THREADS"]=4
```

In [26]:
println("Number of threads that your Julia is run: ## $(Threads.nthreads())")

Number of threads that your Julia is run: ## 4


In [27]:
using BenchmarkTools, Distributed

<a class="anchor" id="simd"></a>
### Parallelize via Single Instruction Multiple Data (SIMD)
---- [Return to table of contents](#toc) ---



In [28]:
function dot1(x, y)
    s = 0.0
    for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end

dot1 (generic function with 1 method)

In [29]:
function dot2(x, y)
    s = 0.0
    @simd for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end

dot2 (generic function with 1 method)

In [30]:
x = 100*rand(10000)
y = 100*rand(10000);

@btime dot1($x, $y)
@btime dot2($x, $y)

  4.000 μs (0 allocations: 0 bytes)
  763.063 ns (0 allocations: 0 bytes)


2.4943127090471864e7

In [31]:
res1 =  dot1(x, y)

2.4943127090471815e7

In [32]:
res2 =  dot2(x, y)

2.4943127090471864e7

In [33]:
res1 == res2

false

In [34]:
@show res1 
@show res2

res1 = 2.4943127090471815e7
res2 = 2.4943127090471864e7


2.4943127090471864e7

<a class="anchor" id="green"></a>
### Green threading 
---- [Return to table of contents](#toc) ---


In [35]:
@time sleep(2)

  2.015877 seconds (43 allocations: 1024 bytes)


In [36]:
@time t = @async sleep(4)

  0.013347 seconds (2.80 k allocations: 201.922 KiB, 99.11% compilation time)


Task (runnable) @0x0000029eabec9940

In [37]:
t

Task (runnable) @0x0000029eabec9940

In [38]:
function dojob(i)
    val = round(rand(), digits=2)
    sleep(val)   # this could be external computations with I/O
    i, val
end

dojob (generic function with 1 method)

In [39]:
result = Vector{Tuple{Int,Float64}}(undef, 8)

8-element Vector{Tuple{Int64, Float64}}:
 (1, 1.0e-323)
 (3, 2.0e-323)
 (5, 3.0e-323)
 (7, 4.0e-323)
 (9, 5.0e-323)
 (11, 8.4e-323)
 (18, 9.4e-323)
 (20, 1.04e-322)

In [40]:
@time for i=1:8
    result[i] = dojob(i)
end
result

  1.985352 seconds (622 allocations: 40.812 KiB, 0.52% compilation time)


8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.59)
 (2, 0.32)
 (3, 0.06)
 (4, 0.03)
 (5, 0.18)
 (6, 0.03)
 (7, 0.0)
 (8, 0.73)

In [41]:
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time for i=1:8
   @async result[i] = dojob(i)
end
result

  0.000098 seconds (81 allocations: 7.055 KiB)


8-element Vector{Tuple{Int64, Float64}}:
 (140704098874896, 6.95170614788753e-310)
 (140704098866272, 6.95170614788437e-310)
 (140704098874896, 1.424167516463e-311)
 (2882547144272, 1.4241675165105e-311)
 (2882547144400, 1.4241675165737e-311)
 (140704098866272, 1.424167249557e-311)
 (140704098874896, 6.95170614788753e-310)
 (2882538132032, 6.9517061483136e-310)

In [42]:
result


8-element Vector{Tuple{Int64, Float64}}:
 (140704098874896, 6.95170614788753e-310)
 (140704098866272, 6.95170614788437e-310)
 (140704098874896, 1.424167516463e-311)
 (2882547144272, 1.4241675165105e-311)
 (2882547144400, 1.4241675165737e-311)
 (140704098866272, 1.424167249557e-311)
 (140704098874896, 6.95170614788753e-310)
 (2882538132032, 6.9517061483136e-310)

In [43]:
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time @sync for i=1:8
   @async result[i] = dojob(i)
end
result

  0.822314 seconds (1.51 k allocations: 82.570 KiB, 3.52% compilation time)


8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.14)
 (2, 0.08)
 (3, 0.8)
 (4, 0.82)
 (5, 0.56)
 (6, 0.72)
 (7, 0.79)
 (8, 0.63)

#### Programming a simple web server
You should be able to connect using the address <a href="http://localhost:9992/3+4" target="about:blank">http://localhost:9992/3+4</a>

To stop web server click <a href="http://localhost:9992/stopme" target="about:blank">http://localhost:9992/stopme</a>

In [44]:
using Sockets
println("Starting the web server...")
server = Sockets.listen(9992)

Starting the web server...


LoadError: IOError: listen: address already in use (EADDRINUSE)

In [45]:
@async begin
    contt = Ref(true)
    while contt[]
        sock = Sockets.accept(server)
        @async begin
            data = readline(sock)
            print("Got request:\n", data, "\n")
            cmd = split(data, " ")[2][2:end]
            println(sock, "\nHTTP/1.1 200 OK\nContent-Type: text/html\n")
            contt[] = contt[] && (!occursin("stopme", data))
            if contt[]
                 println(sock, string("<html><body>", cmd, "=", 
                     eval(Meta.parse(cmd)), "</body></html>"))
            else
                println(sock,"<html><body>stopping</body></html>")
            end
            close(sock)
        end
    end
    println("Handling requests stopped")
end

Task (failed) @0x0000029f1b8bb840
UndefVarError: `server` not defined
Stacktrace:
 [1] [0m[1m(::var"#7#9")[22m[0m[1m([22m[0m[1m)[22m
[90m   @[39m [36mMain[39m [90m.\[39m[90m[4mIn[45]:4[24m[39m

<a class="anchor" id="multithreading"></a>
### Multithreading
---- [Return to table of contents](#toc) ---


In [46]:
Threads.nthreads()

4

In [47]:
function ssum(x)
    r, c = size(x)
    y = zeros(c)
    for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end

ssum (generic function with 1 method)

In [48]:
function tsum(x)
    r, c = size(x)
    y = zeros(c)
    Threads.@threads for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end


tsum (generic function with 1 method)

In [49]:
x = rand(1000,10000);

In [50]:
@time ssum(x)
@time ssum(x);

  0.029085 seconds (6.63 k allocations: 533.969 KiB, 56.86% compilation time)
  0.011706 seconds (2 allocations: 78.172 KiB)


In [51]:
@time tsum(x)
@time tsum(x);

  0.109102 seconds (44.38 k allocations: 3.117 MiB, 132.62% compilation time)
  0.005122 seconds (30 allocations: 81.047 KiB)


#### Locking mechanism for threads

In [52]:
function f_bad()
    x = 0
    Threads.@threads for i in 1:10^6
        x += 1
    end
    return x
end


f_bad (generic function with 1 method)

In [53]:
f_bad()

250346

In [54]:
function f_add()
    x = 0 
    for i in 1:10^6
        x += 1
    end
    x
end
@btime f_add()
    

  1.400 ns (0 allocations: 0 bytes)


1000000

In [55]:
function f_atomic()
    x = Threads.Atomic{Int}(0)
    Threads.@threads for i in 1:10^6
        Threads.atomic_add!(x, 1)
    end
    return x[]
end
f_atomic()

1000000

In [56]:
function f_spin()
    l = Threads.SpinLock()
    x = 0
    Threads.@threads for i in 1:10^6
        Threads.lock(l) do
            x += 1
        end
    end
    return x
end

function f_reentrant()
    l = ReentrantLock()
    x = 0
    Threads.@threads for i in 1:10^6
        Threads.lock(l) do
            x += 1
        end
    end
    return x
end


f_reentrant (generic function with 1 method)

In [57]:
using DataFrames
stats = DataFrame()
for f in [f_bad, f_atomic, f_spin, f_reentrant]
    for i = 1:2
        value, elapsedtime  = @timed f()
        push!(stats, (f=string(f),i=i, value=value, timems=elapsedtime*1000))
    end
end
println(stats)


[1m8×4 DataFrame[0m
[1m Row [0m│[1m f           [0m[1m i     [0m[1m value   [0m[1m timems   [0m
     │[90m String      [0m[90m Int64 [0m[90m Int64   [0m[90m Float64  [0m
─────┼───────────────────────────────────────
   1 │ f_bad            1   251023   35.483
   2 │ f_bad            2   252967   35.2314
   3 │ f_atomic         1  1000000   20.4998
   4 │ f_atomic         2  1000000   18.539
   5 │ f_spin           1  1000000  465.34
   6 │ f_spin           2  1000000  257.439
   7 │ f_reentrant      1  1000000  480.495
   8 │ f_reentrant      2  1000000  385.128


<a class="anchor" id="multiprocessing"></a>
### Multi-processing and distributed computing
---- [Return to table of contents](#toc) ---


In [58]:
using Distributed

This code adds 4 workers (and avoids adding more)

In [59]:
addprocs(max(0, 5-nprocs()));

In [60]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [61]:
function s_rand()
    n = 10^4
    x = 0.0
    for i in 1:n
        x += sum(rand(10^4))
    end
    x / n
end
 
@time s_rand()
@time s_rand()


  0.920285 seconds (20.00 k allocations: 763.397 MiB, 25.61% gc time)
  0.725482 seconds (20.00 k allocations: 763.397 MiB, 17.42% gc time)


5000.044150780158

In [62]:
using Distributed
 
function p_rand()
    n = 10^4
    x = @distributed (+) for i in 1:n
        # the last line will be aggregated
        sum(rand(10^4))
    end
    x / n
end

@time p_rand()
@time p_rand()


  1.612347 seconds (463.07 k allocations: 31.114 MiB, 47.33% compilation time)
  0.173494 seconds (426 allocations: 24.016 KiB)


4999.3877325494

In [63]:
workers()'

1×4 adjoint(::Vector{Int64}) with eltype Int64:
 2  3  4  5

In [64]:
fetch(@spawnat 3 4+3)

7

In [65]:
function myf() 
    println("I am on worker ", myid())
    rand()
end
myf()

I am on worker 1


0.21248315596630618

In [66]:
a = nothing
try 
    fetch(@spawnat 4 myf())
catch e
    println(e)
end

RemoteException(4, CapturedException(UndefVarError(Symbol("#myf")), Any[(deserialize_datatype at Serialization.jl:1399, 1), (handle_deserialize at Serialization.jl:867, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:874, 1), (deserialize at Serialization.jl:814 [inlined], 1), (deserialize_global_from_main at clusterserialize.jl:160, 1), (#5 at clusterserialize.jl:72 [inlined], 1), (foreach at abstractarray.jl:3086, 1), (deserialize at clusterserialize.jl:72, 1), (handle_deserialize at Serialization.jl:960, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:871, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:874, 1), (deserialize at Serialization.jl:814 [inlined], 1), (deserialize_msg at messages.jl:87, 1), (#invokelatest#2 at essentials.jl:887 [inlined], 1), (invokelatest at essentials.jl:884 [inlined], 1), (message_handler_loop at process_messages.jl:176, 1), (process_tcp_s

In [67]:
@everywhere function myf() 
    println("I am on worker ", myid())
    rand()
end
fetch(@spawnat 4 myf())

      From worker 4:	I am on worker 4


0.7642915859931477

#### A typical pattern for setting an intial state across workers

In [68]:
using Distributed
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using Distributed, Random, DataFrames

@everywhere function calc(x, y)
    2x + y
end

@everywhere function init_worker()    
   Random.seed!(myid())
    # reading initial data from files or other actions
end

@sync for wid in workers()
    @async fetch(@spawnat wid init_worker())
end


      From worker 5:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\Berkeley`
      From worker 2:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\Berkeley`
      From worker 3:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\Berkeley`
      From worker 4:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\Berkeley`


[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\Berkeley`


Typically results are collected to a `DataFrame`

In [69]:
data = @distributed (append!) for (i, j) = vec(collect(Iterators.product(1:4, 1:3)))
    a = rand(1:499)
    b = rand(1:9)*1000
    c = calc(a, b)
    DataFrame(;i,j,a,b,c,procid = myid())
end

Row,i,j,a,b,c,procid
Unnamed: 0_level_1,Int64,Int64,Int64,Int64,Int64,Int64
1,1,1,217,1000,1434,2
2,2,1,125,6000,6250,2
3,3,1,39,5000,5078,2
4,4,1,363,3000,3726,3
5,1,2,370,6000,6740,3
6,2,2,32,2000,2064,3
7,3,2,76,4000,4152,4
8,4,2,82,8000,8164,4
9,1,3,441,4000,4882,4
10,2,3,201,3000,3402,5


#### Advanced Interprocess communication - cellular automaton example

In [70]:
using Distributed
@everywhere using ParallelDataTransfer, Distributed


@everywhere function rule30()
    lastv = Main.caa[1]
    for i in 2:(length(Main.caa)-1)
        current = Main.caa[i]
        Main.caa[i] = xor(lastv, Main.caa[i] || Main.caa[i+1])
        lastv = current
    end
end


@everywhere function getcaa()
    Main.caa
end
@everywhere function getsetborder()
    #println(myid(),"gs");flush(stdout)
    Main.caa[1] = (@fetchfrom Main.neighbours[1] getcaa()[15+1])
    #println(myid(),"gs1");flush(stdout)
    Main.caa[end] = (@fetchfrom Main.neighbours[2] getcaa()[2])
    #println(myid(),"gse");flush(stdout)
end

function printsimdist(workers::Array{Int})
    for w in workers
        dat = @fetchfrom w caa
        for b in dat[2:end-1]
            print(b ? "#" : " ")
        end
    end
    println()
    flush(stdout)
end

function runca(steps::Int, visualize::Bool)
    @sync for w in workers()
        @async @fetchfrom w fill!(caa, false)
    end
    @fetchfrom wks[Int(nwks/2)+1] caa[2]=true
    visualize && printsimdist(workers())
    for i in 1:steps
        @sync for w in workers()
            @async @fetchfrom w getsetborder()
        end
        @sync for w in workers()
            @async @fetchfrom w rule30()
        end
        visualize && printsimdist(workers())
    end
end



runca (generic function with 1 method)

In [71]:
wks = workers()
nwks = length(wks)
for i in 1:nwks
    sendto(wks[i], neighbours = (i==1 ? wks[nwks] : wks[i-1],
                                i==nwks ? wks[1] : wks[i+1]))
    fetch(@defineat wks[i] const caa = zeros(Bool, 15+2));
end

runca(20,true)


                              #                             
                             ###                            
                            ##  #                           
                           ## ####                          
                          ##  #   #                         
                         ## #### ###                        
                        ##  #    #  #                       
                       ## ####  ######                      
                      ##  #   ###     #                     
                     ## #### ##  #   ###                    
                    ##  #    # #### ##  #                   
                   ## ####  ## #    # ####                  
                  ##  #   ###  ##  ## #   #                 
                 ## #### ##  ### ###  ## ###                
                ##  #    # ###   #  ###  #  #               
               ## ####  ## #  # #####  #######              
              ##  #   ##