In [1]:
println("Number of threads that your Julia is run: ## $(Threads.nthreads())")

Number of threads that your Julia is run: ## 8


In [3]:
using BenchmarkTools, Distributed

### Parallelization via Single Instruction, Multiple Data (SIMD)

In [4]:
"""
    dot1(x, y)

Return the dot product `x[1]*y[1] + x[2]*y[2] + ...` computed with a plain
sequential loop (the non-SIMD baseline for the benchmark below). The accumulator
starts at `0.0`, so `Float64` or integer inputs give a `Float64` result.

Throws `DimensionMismatch` if `x` and `y` do not have the same indices.
"""
function dot1(x, y)
    s = 0.0
    # `eachindex(x, y)` verifies that both arrays share the same indices, which is
    # what makes the `@inbounds` below safe: with `1:length(x)` a shorter `y`
    # would be read out of bounds.
    for i in eachindex(x, y)
        @inbounds s += x[i] * y[i]
    end
    return s
end

dot1 (generic function with 1 method)

In [5]:
"""
    dot2(x, y)

Return the dot product of `x` and `y` like `dot1`, but with `@simd` on the loop.
`@simd` allows the compiler to reorder the floating-point additions, so the result
may differ from `dot1` in the last bits. The accumulator starts at `0.0`, so
`Float64` or integer inputs give a `Float64` result.

Throws `DimensionMismatch` if `x` and `y` do not have the same indices.
"""
function dot2(x, y)
    s = 0.0
    # `eachindex(x, y)` checks that the indices match, so `@inbounds` cannot read
    # past the end of the shorter array.
    @simd for i in eachindex(x, y)
        @inbounds s += x[i] * y[i]
    end
    return s
end

dot2 (generic function with 1 method)

In [6]:
x = 100*rand(10000)
y = 100*rand(10000)

res1 = @btime dot1($x, $y)
res2 = @btime dot2($x, $y)

println(res1)
println(res2)

  12.025 μs (0 allocations: 0 bytes)
  2.110 μs (0 allocations: 0 bytes)
2.5095465536428425e7
2.5095465536428474e7


### Green threading

In [9]:
@time sleep(2)

  2.005748 seconds (64 allocations: 1.922 KiB)


In [10]:
@time @async sleep(2)

  0.005964 seconds (8.01 k allocations: 462.538 KiB)


Task (runnable) @0x0000023ddb060200

In [1]:
"""
    dojob(i; maxdelay = 1.0)

Simulate a job that waits on something external: draw `maxdelay * rand()`, round
it to two decimals, sleep for that many seconds, and return `(i, delay)`.

`maxdelay` defaults to `1.0`, which reproduces the original delays. Pass a smaller
value to make demos or tests faster. Throws `ArgumentError` if `maxdelay` is
negative, because `sleep` cannot wait a negative time.
"""
function dojob(i; maxdelay::Real = 1.0)
    maxdelay >= 0 || throw(ArgumentError("maxdelay must be non-negative, got $maxdelay"))
    val = round(maxdelay * rand(), digits=2)
    sleep(val)   # this could be external computations with I/O
    return i, val
end

dojob (generic function with 1 method)

In [2]:
result = Vector{Tuple{Int,Float64}}(undef, 8)

8-element Vector{Tuple{Int64, Float64}}:
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)

In [3]:
@time for i=1:8
    result[i] = dojob(i)
end
result

  3.378491 seconds (1.48 k allocations: 85.574 KiB, 0.35% compilation time)


8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.23)
 (2, 0.77)
 (3, 0.11)
 (4, 0.47)
 (5, 0.61)
 (6, 0.22)
 (7, 0.14)
 (8, 0.76)

In [23]:
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time for i=1:8
   @async result[i] = dojob(i)
end
result

  0.000062 seconds (83 allocations: 7.139 KiB)


8-element Vector{Tuple{Int64, Float64}}:
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)
 (0, 0.0)

In [4]:
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time @sync for i=1:8
   @async result[i] = dojob(i)
end
result

  0.943283 seconds (16.85 k allocations: 1.017 MiB, 6.64% compilation time)


8-element Vector{Tuple{Int64, Float64}}:
 (1, 0.74)
 (2, 0.85)
 (3, 0.07)
 (4, 0.65)
 (5, 0.33)
 (6, 0.04)
 (7, 0.78)
 (8, 0.22)

#### Programming a simple web server
Once the server is running, you can connect to it at <a href="http://localhost:9991/3+4" target="about:blank">http://localhost:9991/3+4</a>

To stop the web server, open <a href="http://localhost:9991/stopme" target="about:blank">http://localhost:9991/stopme</a>

In [5]:
using Sockets
println("Starting the web server...")
server = Sockets.listen(9991)

Starting the web server...


Sockets.TCPServer(Base.Libc.WindowsRawSocket(0x000000000000042c) active)

In [6]:
# Accept loop for the toy HTTP server listening on `server`.
# Each connection is handled in its own task: read the request line, evaluate the
# path as a Julia expression, answer with a tiny HTML page, and close the socket.
# A request whose line contains "stopme" ends the loop and closes the server.
@async begin
    contt = Ref(true)   # shared "keep serving" flag, cleared by a handler
    while contt[]
        sock = try
            Sockets.accept(server)
        catch
            # A "stopme" handler closes `server`, which makes a pending `accept`
            # throw; that is the normal way out. Any other error is re-raised.
            contt[] && rethrow()
            nothing
        end
        sock === nothing && break
        @async try
            data = readline(sock)
            print("Got request:\n", data, "\n")
            parts = split(data, " ")
            # The status line must be the very first line of the response. Header
            # lines end in CRLF, and an empty line separates headers from the body.
            print(sock, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n")
            contt[] = contt[] && (!occursin("stopme", data))
            if !contt[]
                println(sock, "<html><body>stopping</body></html>")
                # Unblock the accept loop so it can exit and release the port.
                close(server)
            elseif length(parts) < 2
                # Empty or malformed request line (e.g. a client that connects and
                # sends nothing): there is no path to evaluate.
                println(sock, "<html><body>bad request</body></html>")
            else
                cmd = chop(parts[2]; head = 1, tail = 0)   # path without leading "/"
                # SECURITY: this evaluates arbitrary Julia code sent by any client
                # that can reach the port. Acceptable only as a local demo.
                value = try
                    eval(Meta.parse(cmd))
                catch err
                    sprint(showerror, err)
                end
                println(sock, string("<html><body>", cmd, "=", value, "</body></html>"))
            end
        catch err
            @warn "Request handler failed" exception = err
        finally
            # Always release the connection, even if the handler threw.
            close(sock)
        end
    end
    println("Handling requests stopped")
end

Task (runnable) @0x0000016a0d7d05e0

Got request:
GET /3+4 HTTP/1.1
Got request:
GET /stopme HTTP/1.1
Handling requests stopped
Got request:



### Multithreading

In [7]:
Threads.nthreads()

1

In [28]:
# Column sums of the matrix `x`, computed serially (the baseline for `tsum`).
# Returns a Float64 vector with one entry per column. Rows form the inner loop,
# so each column is walked contiguously in Julia's column-major layout.
function ssum(x)
    nrows, ncols = size(x)
    colsums = zeros(ncols)
    for col in 1:ncols, row in 1:nrows
        @inbounds colsums[col] += x[row, col]
    end
    return colsums
end

ssum (generic function with 1 method)

In [30]:
# Column sums of the matrix `x`, computed with `Threads.@threads` (the threaded
# counterpart of `ssum`). Each loop iteration owns one column and writes only
# `colsums[col]`, so no two threads ever write the same element.
# Returns a Float64 vector with one entry per column.
function tsum(x)
    nrows, ncols = size(x)
    colsums = zeros(ncols)
    Threads.@threads for col in 1:ncols
        for row in 1:nrows
            @inbounds colsums[col] += x[row, col]
        end
    end
    return colsums
end


tsum (generic function with 1 method)

In [42]:
x = rand(10000,1000);

In [43]:
@time ssum(x)
@time ssum(x);

  0.015025 seconds (1 allocation: 7.938 KiB)
  0.009474 seconds (1 allocation: 7.938 KiB)


In [44]:
@time tsum(x)
@time tsum(x);

  0.004882 seconds (52 allocations: 13.359 KiB)
  0.001982 seconds (52 allocations: 13.359 KiB)


#### Locking mechanisms for threads

In [45]:
"""
    f_bad(n::Integer = 10^7)

Increment a shared counter `n` times from a `Threads.@threads` loop WITHOUT any
synchronization, and return the counter.

This is the deliberately broken baseline for the locking comparison. `x += 1` is an
unsynchronized read-modify-write on a variable shared by all threads, so increments
get lost and the result is usually well below `n` when Julia runs with more than
one thread.
"""
function f_bad(n::Integer = 10^7)
    x = 0
    Threads.@threads for i in 1:n
        x += 1   # intentional data race; do not copy this pattern
    end
    return x
end


f_bad (generic function with 1 method)

In [48]:
@time f_bad()

  0.525740 seconds (10.00 M allocations: 152.578 MiB)


1293224

In [49]:
"""
    f_atomic(n::Integer = 10^7)

Increment a shared counter `n` times from a `Threads.@threads` loop using an atomic
add, and return the final count. The result is always exactly `n`.
"""
function f_atomic(n::Integer = 10^7)
    x = Threads.Atomic{Int}(0)
    Threads.@threads for i in 1:n
        Threads.atomic_add!(x, 1)
    end
    return x[]
end

"""
    f_spin(n::Integer = 10^7)

Increment a shared counter `n` times from a `Threads.@threads` loop, guarding each
increment with a `Threads.SpinLock`, and return the final count (always `n`).
"""
function f_spin(n::Integer = 10^7)
    l = Threads.SpinLock()
    # Mutate through a Ref instead of rebinding a captured local: rebinding `x`
    # inside the closure would force Julia to box it and allocate on every call.
    x = Ref(0)
    Threads.@threads for i in 1:n
        lock(l) do
            x[] += 1
        end
    end
    return x[]
end

"""
    f_reentrant(n::Integer = 10^7)

Increment a shared counter `n` times from a `Threads.@threads` loop, guarding each
increment with a `ReentrantLock`, and return the final count (always `n`).
"""
function f_reentrant(n::Integer = 10^7)
    l = ReentrantLock()
    # A Ref avoids boxing a reassigned captured variable (see `f_spin`).
    x = Ref(0)
    Threads.@threads for i in 1:n
        lock(l) do
            x[] += 1
        end
    end
    return x[]
end


f_reentrant (generic function with 1 method)

In [50]:
using DataFrames
# Benchmark the four counter implementations: run each one twice (the first run
# may include JIT compilation) and record its return value and wall time in ms.
stats = DataFrame()
for f in [f_bad, f_atomic, f_spin, f_reentrant]
    for i = 1:2
        # `@timed` yields the call's value followed by the elapsed time in seconds.
        value, elapsedtime  = @timed f()
        push!(stats, (f=string(f),i=i, value=value, timems=elapsedtime*1000))
    end
end
println(stats)


8×4 DataFrame
 Row │ f            i      value     timems
     │ String       Int64  Int64     Float64
─────┼────────────────────────────────────────
   1 │ f_bad            1   1306731   254.793
   2 │ f_bad            2   1494738   397.46
   3 │ f_atomic         1  10000000   231.957
   4 │ f_atomic         2  10000000   195.303
   5 │ f_spin           1  10000000  4758.85
   6 │ f_spin           2  10000000  4721.66
   7 │ f_reentrant      1  10000000  6788.93
   8 │ f_reentrant      2  10000000  6573.98


### Multi-processing and distributed computing

In [8]:
using Distributed

In [10]:
addprocs(max(0, 5-nworkers()));

In [11]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [63]:
"""
    s_rand(n::Integer = 10^4, m::Integer = 10^4)

Serial baseline for the distributed example. Draw `n` vectors of `m` uniform
random numbers, sum each vector, and return the mean of those `n` sums (about
`m/2`).

Throws `ArgumentError` if `n < 1`, because the mean of zero sums is undefined.
"""
function s_rand(n::Integer = 10^4, m::Integer = 10^4)
    n >= 1 || throw(ArgumentError("n must be at least 1, got $n"))
    x = 0.0
    for i in 1:n
        x += sum(rand(m))   # allocates a fresh length-m vector every iteration
    end
    return x / n
end
 
@time s_rand()
@time s_rand()


  0.435935 seconds (20.00 k allocations: 763.397 MiB, 22.88% gc time)
  0.419023 seconds (20.00 k allocations: 763.397 MiB, 16.58% gc time)


5000.002440794839

In [12]:
using Distributed
 
 
"""
    p_rand(n::Integer = 10^4, m::Integer = 10^4)

Distributed version of `s_rand`. The `n` iterations are split across the workers
with `@distributed (+)`. Each iteration sums `m` uniform random numbers, and the
mean of the `n` sums is returned.

Throws `ArgumentError` if `n < 1`.
"""
function p_rand(n::Integer = 10^4, m::Integer = 10^4)
    n >= 1 || throw(ArgumentError("n must be at least 1, got $n"))
    x = @distributed (+) for i in 1:n
        sum(rand(m))
    end
    return x / n
end

@time p_rand()
@time p_rand()


  2.651535 seconds (1.01 M allocations: 51.516 MiB, 0.62% gc time, 39.56% compilation time)
  0.318478 seconds (518 allocations: 30.836 KiB)


5000.396789794042

In [20]:
using Distributed
@everywhere using Distributed, Random, DataFrames

# Toy per-row computation, defined on every process so workers can call it:
# returns 2x + y.
@everywhere calc(x, y) = 2x + y

# Seed this process's default RNG with its own process id, so each process
# draws a different but reproducible random stream.
@everywhere init_worker() = Random.seed!(myid())

# Run `init_worker` on every worker. `@async` lets the remote calls proceed
# concurrently, `fetch` waits for each one's result, and `@sync` blocks until
# all of the tasks have finished.
@sync for wid in workers()
    @async fetch(@spawnat wid init_worker())
end


Typically, results are collected into a `DataFrame`:

In [25]:
# Spread 24 independent iterations across the workers. Each iteration builds a
# one-row DataFrame, and the `append!` reducer concatenates them into one table.
# The `procid` column records which process produced each row.
data = @distributed (append!) for i = 1:24
    a = rand(1:499)
    b = rand(1:9)*1000
    c = calc(a, b)
    DataFrame(;a,b,c,procid = myid())
end

Row,a,b,c,procid
,Int64,Int64,Int64,Int64
1,470,6000,6940,2
2,135,7000,7270,2
3,172,9000,9344,2
4,345,6000,6690,2
5,20,7000,7040,2
6,189,6000,6378,2
7,44,1000,1088,3
8,176,3000,3352,3
9,3,5000,5006,3
10,406,6000,6812,3


#### Advanced interprocess communication – cellular automaton example

In [16]:
using Distributed
@everywhere using ParallelDataTransfer, Distributed


# Advance this process's slice of the automaton (Main.caa) by one generation of
# rule 30, in place: each interior cell becomes left XOR (center OR right).
# `left` holds the pre-update value of the previous cell, because that cell has
# already been overwritten by the time its right neighbour is processed.
# The first and last cells are read but never modified.
@everywhere function rule30()
    cells = Main.caa
    left = cells[begin]
    for k in 2:lastindex(cells)-1
        center = cells[k]
        cells[k] = xor(left, center || cells[k+1])
        left = center
    end
end


# Return this process's automaton slice (the array itself, not a copy), so that
# neighbours can read individual cells from it via `@fetchfrom`.
@everywhere getcaa() = Main.caa
# Refresh this process's two border ("ghost") cells from its ring neighbours:
# caa[1] receives the left neighbour's last interior cell (caa[end-1]), and
# caa[end] receives the right neighbour's first interior cell (caa[2]).
# Main.neighbours is expected to hold the (left, right) worker ids.
@everywhere function getsetborder()
    left, right = Main.neighbours
    # Index from `end` instead of hard-coding the automaton width, so the border
    # exchange stays correct if the length of `caa` changes.
    Main.caa[1] = (@fetchfrom left getcaa()[end-1])
    Main.caa[end] = (@fetchfrom right getcaa()[2])
end

"""
    printsimdist(wids; io::IO = stdout)

Print one row of the distributed cellular automaton. For each worker id in `wids`
(in order), fetch that worker's interior cells `caa[2:end-1]`, skipping the two
border cells filled in by `getsetborder`. Print `#` for `true` and a space for
`false`, then a newline, and flush `io`.
"""
function printsimdist(wids::AbstractVector{<:Integer}; io::IO = stdout)
    for w in wids
        # Slice on the worker so only the interior cells are transferred.
        cells = @fetchfrom w Main.caa[2:end-1]
        for b in cells
            print(io, b ? "#" : " ")
        end
    end
    println(io)
    flush(io)
end

"""
    runca(steps::Integer, visualize::Bool)

Run the distributed rule-30 cellular automaton for `steps` generations.

The function first clears every worker's `caa` and sets a single live cell in the
first interior position of the middle worker. Each generation then runs two
phases:

1. Refresh every worker's border cells from its neighbours (`getsetborder`).
2. Apply `rule30` locally on each worker.

The phases are separated by `@sync`, so no worker rewrites its cells while a
neighbour may still be reading them. When `visualize` is true, the initial state
and every generation are printed with `printsimdist`.
"""
function runca(steps::Integer, visualize::Bool)
    ws = workers()
    # Integer division keeps this index valid when the worker count is odd.
    seed = ws[length(ws) ÷ 2 + 1]
    @sync for w in ws
        # Return `nothing` so the cleared array is not shipped back to the caller.
        @async @fetchfrom w (fill!(Main.caa, false); nothing)
    end
    @fetchfrom seed Main.caa[2] = true
    visualize && printsimdist(ws)
    for _ in 1:steps
        @sync for w in ws
            @async @fetchfrom w getsetborder()
        end
        @sync for w in ws
            @async @fetchfrom w rule30()
        end
        visualize && printsimdist(ws)
    end
end



runca (generic function with 1 method)

In [17]:
# Arrange the workers in a ring: worker wks[i] gets as `neighbours` the pair
# (left, right) = (wks[i-1], wks[i+1]), wrapping around at both ends.
# Each worker also gets its own automaton slice `caa`: 15 interior cells plus one
# border cell at each end, all initially false.
wks = workers()
nwks = length(wks)
for i in 1:nwks
    # `sendto` (ParallelDataTransfer) defines `neighbours` on that worker;
    # getsetborder reads it there as Main.neighbours.
    sendto(wks[i], neighbours = (i==1 ? wks[nwks] : wks[i-1],
                                i==nwks ? wks[1] : wks[i+1]))
    fetch(@defineat wks[i] const caa = zeros(Bool, 15+2));
end

# Simulate 20 generations and print each one.
runca(20,true)


                              #                             
                             ###                            
                            ##  #                           
                           ## ####                          
                          ##  #   #                         
                         ## #### ###                        
                        ##  #    #  #                       
                       ## ####  ######                      
                      ##  #   ###     #                     
                     ## #### ##  #   ###                    
                    ##  #    # #### ##  #                   
                   ## ####  ## #    # ####                  
                  ##  #   ###  ##  ## #   #                 
                 ## #### ##  ### ###  ## ###                
                ##  #    # ###   #  ###  #  #               
               ## ####  ## #  # #####  #######              
              ##  #   ##