# Multithreading in Julia


In [2]:
versioninfo()
Base.Threads.nthreads()

Julia Version 1.3.0-rc4.1
Commit 8c4656b97a (2019-10-15 14:08 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i7-7660U CPU @ 2.50GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-6.0.1 (ORCJIT, skylake)
Environment:
  JULIA_PKG_DEVDIR = /home/vchuravy/src
  JULIA_NUM_THREADS = 4


4

## Types of parallelism

- data parallelism
- structured parallelism

In [18]:
import Base.Threads: nthreads, threadid, @spawn, @threads
import Base.Threads: Atomic, atomic_add!
import Base.Threads: ReentrantLock, SpinLock
using BenchmarkTools

## Data parallelism `@threads`

In [None]:
a = zeros(Int, nthreads()*3)
@threads for i in 1:length(a)
    @show threadid()
    a[i] = threadid()
end
a

## Structured parallelism

In [None]:
function fib(n::Int)
    if n < 2
        return n
    end
    t = @spawn fib(n - 2)
    return fib(n - 1) + fetch(t)
end

## Racy-code

In [None]:
function count()
    acc = 0
    @threads for i in 1:10_000
        acc = acc + 1
    end
    return acc
end
# What is the result?

In [None]:
count()

In [None]:
@btime count()

In [None]:
function count_ref()
    acc = Ref(0)
    @threads for i in 1:10_000
        acc[] += 1
    end
    return acc[]
end
# What is the result?

In [None]:
count_ref()

In [None]:
@btime count_ref()

### Using Atomics

In [13]:
function count_atomic()
    acc = Atomic{Int64}(0)
    @threads for i in 1:10_000
        atomic_add!(acc, 1)
    end
    acc
end
# What is the result?

count_atomic (generic function with 1 method)

In [14]:
count_atomic()

Atomic{Int64}(10000)

In [15]:
@btime count_atomic()

  77.666 μs (30 allocations: 3.02 KiB)


Atomic{Int64}(10000)

### Using Locks

Atomics are good for protecting a single value,
for bigger amounts of data locks might more sense.

In [6]:
struct Accumulator{T, L}
    x::Base.RefValue{T}
    lock::L
end

Base.lock(a::Accumulator) = lock(a.lock)
Base.unlock(a::Accumulator) = unlock(a.lock)
Base.getindex(a::Accumulator) = a.x[]
Base.setindex!(a::Accumulator, val) = a.x[] = val

In [7]:
function count(acc)
    @threads for i in 1:10_000
        lock(acc)
        acc[] += 1
        unlock(acc)
    end
    return acc[]
end

count (generic function with 1 method)

### ReentrantLock
Standard lock, best used when having to lock large section of code.

In [24]:
acc = Accumulator(Ref{Int}(0), ReentrantLock());

In [25]:
@btime count($acc)

  1.095 ms (60 allocations: 3.48 KiB)


15430000

### SpinLock

Lower overhead version for very small sections of code, inefficient if we protect large sections of code.

In [26]:
acc = Accumulator(Ref{Int}(0), SpinLock());

In [27]:
@btime count($acc)

  119.399 μs (30 allocations: 3.02 KiB)


125640000

### Avoiding all of this

In [28]:
function count_racefree()
    # note this is prone to false-sharing
    acc = zeros(Int, nthreads())
    @threads for i in 1:10_000
        acc[threadid()] += 1
    end
    return sum(acc)
end

count_racefree (generic function with 1 method)

In [23]:
@btime count_racefree()

  16.755 μs (30 allocations: 3.11 KiB)


10000

### Other helpful tools

- https://docs.julialang.org/en/v1/manual/parallel-computing/#Channels-1
- https://docs.julialang.org/en/v1.2/base/parallel/#Base.Semaphore
- https://docs.julialang.org/en/v1.2/base/parallel/#Base.Event

## False sharing

https://software.intel.com/en-us/articles/avoiding-and-identifying-false-sharing-among-threads

In [29]:
function sum_simple()
    acc = zeros(Int64, nthreads())
    @threads for tid in 1:nthreads()
        for i in 1:10_000
             acc[tid] += 1
        end
    end
    sum(acc)
end

sum_simple (generic function with 1 method)

In [30]:
function sum_cacheaware()
    CACHE_LINE = 64 #bytes
    elems = div(CACHE_LINE, sizeof(Int64))
    acc = zeros(Int64, nthreads()*elems)
    @threads for tid in 1:nthreads()
        store = (tid-1)*elems+1
        for i in 1:10_000
             acc[store] += 1
        end
    end
    sum(acc)
end

sum_cacheaware (generic function with 1 method)

In [31]:
@benchmark sum_simple()

BenchmarkTools.Trial: 
  memory estimate:  3.11 KiB
  allocs estimate:  30
  --------------
  minimum time:     33.154 μs (0.00% GC)
  median time:      87.621 μs (0.00% GC)
  mean time:        254.371 μs (0.00% GC)
  maximum time:     21.260 ms (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

In [32]:
@benchmark sum_cacheaware()

BenchmarkTools.Trial: 
  memory estimate:  3.34 KiB
  allocs estimate:  30
  --------------
  minimum time:     24.693 μs (0.00% GC)
  median time:      35.112 μs (0.00% GC)
  mean time:        80.054 μs (0.00% GC)
  maximum time:     8.336 ms (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

## A fun user submission
Help me! this code is slow

In [33]:
using BenchmarkTools
using Base.Threads
using LinearAlgebra
using Random
println("Number of threads: ", nthreads())

Number of threads: 4


In [34]:
function myfun(rng::MersenneTwister)
    s = 0.0
    N = 10000
    for i=1:N
        s+=det(randn(rng, 3,3))
    end
    s/N
end

rgi   = [MersenneTwister(rand(UInt)) for _ in 1:nthreads()]
function bench(rgi)
    a  = zeros(1000)
    @threads for i=1:length(a)
        a[i] = myfun(rgi[threadid()])
    end
end

bench (generic function with 1 method)

In [35]:
result = @benchmark bench($rgi)
display(result)

BenchmarkTools.Trial: 
  memory estimate:  4.32 GiB
  allocs estimate:  40000046
  --------------
  minimum time:     6.938 s (14.96% GC)
  median time:      6.938 s (14.96% GC)
  mean time:        6.938 s (14.96% GC)
  maximum time:     6.938 s (14.96% GC)
  --------------
  samples:          1
  evals/sample:     1

#### Thread 1

```
BenchmarkTools.Trial: 
  memory estimate:  4.32 GiB
  allocs estimate:  40000002
  --------------
  minimum time:     4.063 s (4.03% GC)
  median time:      4.217 s (3.57% GC)
  mean time:        4.217 s (3.57% GC)
  maximum time:     4.371 s (3.14% GC)
  --------------
  samples:          2
  evals/sample:     1
```

#### Thread 4

```
BenchmarkTools.Trial: 
  memory estimate:  3.52 GiB
  allocs estimate:  34212074
  --------------
  minimum time:     3.346 s (0.00% GC)
  median time:      3.960 s (10.85% GC)
  mean time:        3.960 s (10.85% GC)
  maximum time:     4.574 s (18.78% GC)
  --------------
  samples:          2
  evals/sample:     1
```

#### How did I optimise this code?
1. Memory allocations in hot-loop
2. Eliminate allocs caused by rand
3. Investigate how det is implemented
4. Implement det!
5. Remove overhead to library call
6. Use profiling tools
7. Start using StaticArrays

Full story here: https://hackmd.io/@dLigA9a4SwKmdcaQtloXXw/BkyZ5Mmbb

```julia
@edit det(zeros(3, 3)) -> det(lufact(A))
lufact(A, pivot = true) = lufact!(copy(A), pivot)
```

```julia
det!(A) = det(lufact!(A))
det!(A) = det(LinearAlgebra.generic_lufact!(A))
```

`det!` originally was calling a `lufact!` from LAPACK,
which is overkill for the matrix size. First attempt switch to a pure Julia implementation.

In [None]:
using BenchmarkTools
using StaticArrays
using Base.Threads
println("Number of threads: ", nthreads())

In [None]:
function myfun(rng::MersenneTwister)
    s = 0.0
    N = 10000
    for i=1:N
        s += det(randn(rng, SMatrix{3, 3}))
    end
    s/N
end

In [None]:
rgi   = [MersenneTwister(abs(rand(Int))) for s in 1:nthreads()]

function bench(rgi)
    a  = zeros(1000)
    @threads for i=1:length(a)
        @inbounds a[i] = myfun(rgi[threadid()])
    end
end

In [None]:
result = @benchmark bench($rgi)
display(result)

## Examples

In [None]:
function psort(v::AbstractVector)
    hi = length(v)
    if hi < 100_000 # below some cutoff, run in serial
        return sort(view(v, 1:hi), alg = MergeSort)
    end
    
    # split the range and sort the halves in parallel recursively
    mid = (1+hi) ÷ 2
    half = @spawn psort(view(v, 1:mid))
    right = psort(view(v, (mid+1):hi))
    left = fetch(half)::typeof(right)
    
    # perform the merge on the result
    out = similar(v)
    i, il, ir = 1, 1, 1
    @inbounds while il <= mid && ir <= (hi-mid)
        l, r = left[il], right[ir]
        if l < r
            out[i] = l
            il += 1
        else
            out[i] = r
            ir += 1
        end
        i += 1
    end
    @inbounds while il <= length(left)
        out[i] = left[il]
        il += 1
        i += 1
    end
    @inbounds while ir <= length(right)
        out[i] = right[ir]
        ir += 1
        i += 1
    end
    return out
end

In [None]:
# function sieve(numPrimes, n::Integer=nthreads(), buffer = 5)
#     primes = Int[]
    
#     #completion signal
#     done = Atomic{Bool}(false)

#     sieves = [Channel{Int}(buffer) for i in 1:n]
#     for i in 1:n
#         @spawn begin
#             p = take!(sieves[i])
#             push!(primes, p)
#             if length(primes) == numPrimes
#                 done[] = true
#                 return
#             end
#             mp = p # non-prime multiples of p
#             for m in sieves[i]
#                 while m > mp
#                     mp += p
#                 end
#                 if m < mp # rem(m, p) > 0
#                     put!(sieves[i + 1], m)
#                 end
#             end
#         end
#     end
    
#     put!(sieves[1], 2)
#     n = 3
#     while !done[]
#         put!(sieves[1], n)
#         n += 2
#     end
#     return primes
# end

### CSP style programming

- https://github.com/NHDaly/CspExamples.jl/blob/master/src/CspExamples.jl