# Parallel computing and GPU programming with Julia 
## Part I: Multi-threading
Alexis Montoison

In [1]:
using Base.Threads
using BenchmarkTools

- A **thread** is the smallest unit of executable code that performs a particular task.
- An application can be divided into multiple tasks, and each task can be assigned to a thread.
- Running many threads simultaneously is called **multi-threading**.

In Julia, all relevant functions for multi-threading are in the `Base.Threads` module.
How many threads do we have access to?

In [2]:
Threads.nthreads()

12

We need more than one thread to gain any performance from multi-threading.

Julia can be started with a given number of threads in different ways:

```bash
JULIA_NUM_THREADS=4 julia  # we can also set the `JULIA_NUM_THREADS` environment variable in .bashrc.
julia -t 4
julia --threads 4
julia -t auto
```
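A script can check at startup that it really has more than one thread available. The sketch below is one possible way to do this; the helper name `check_threads` is hypothetical and not part of `Base.Threads`.

```julia
using Base.Threads

# Hypothetical helper: return the thread count and warn when only one thread is available.
function check_threads()
    n = nthreads()
    if n == 1
        @warn "Julia is running with a single thread; start it with `julia -t N` or set JULIA_NUM_THREADS."
    end
    return n
end

check_threads()
```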

The main multi-threading approach is to use the `Threads.@threads` macro, which parallelizes a `for` loop across the available threads. Let us fill the array `a` in parallel: each iteration writes the ID of the thread that executes it into the corresponding location.

**Note**: the thread IDs you see depend on how many threads Julia was started with. The output below comes from a session with 12 threads, as reported by `Threads.nthreads()` above.

In [3]:
a = zeros(Int, 10)
Threads.@threads for i = 1:10
    a[i] = Threads.threadid()
end
display(a)

10-element Vector{Int64}:
  2
  9
 12
  6
  7
 10
  1
  8
  3
 11

The iteration space is split among the threads. What is the difference between `:static` and `:dynamic` schedulers?

In [4]:
function busywait(seconds)
    tstart = time_ns()
    while (time_ns() - tstart) / 1e9 < seconds
    end
end

busywait (generic function with 1 method)

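The schedule used by the `Threads.@threads` macro can be specified as:
* `:static` creates one task per thread a priori and pins each task to its thread, so an iteration has to wait if its thread is busy with other work.
* `:dynamic` (default) lets the scheduler distribute the tasks among the available threads at run time.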

In [5]:
@time begin
    Threads.@spawn busywait(5)
    Threads.@threads :static for i in 1:Threads.nthreads()
        busywait(1)
    end
end

  6.046568 seconds (63.43 k allocations: 3.391 MiB, 0.93% compilation time)


In [6]:
@time begin
    Threads.@spawn busywait(5)
    Threads.@threads :dynamic for i in 1:Threads.nthreads()
        busywait(1)
    end
end

  2.037974 seconds (42.13 k allocations: 2.229 MiB, 1.86% compilation time)
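A plausible reading of the two timings above: with `:static`, one iteration is pinned to the thread that is still busy with `busywait(5)`, so the loop finishes after about 5 + 1 seconds; with `:dynamic`, a free thread picks up that iteration and the loop finishes after about 2 seconds. To inspect how iterations are assigned, one can record `threadid()` for every iteration under both schedules. This is a minimal sketch; the function name `iteration_owners` is hypothetical, and the explicit `:dynamic` option is assumed to be available (Julia 1.8 or later).

```julia
using Base.Threads

# Hypothetical helper: record which thread executes each of the n iterations.
function iteration_owners(n; static::Bool)
    ids = zeros(Int, n)
    if static
        @threads :static for i in 1:n
            ids[i] = threadid()
        end
    else
        @threads :dynamic for i in 1:n
            ids[i] = threadid()
        end
    end
    return ids
end

display(iteration_owners(10; static = true))
display(iteration_owners(10; static = false))
```

The actual thread IDs vary from run to run and from machine to machine, so no expected output is shown.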


The `@inbounds` macro skips the bounds check when the arrays `A` and `B` are indexed with `i`.

In [7]:
function sqrt_array(A)
    B = similar(A)
    for i in eachindex(A)
        @inbounds B[i] = sqrt(A[i])
    end
    B
end

sqrt_array (generic function with 1 method)

In [8]:
function threaded_sqrt_array(A)
    B = similar(A)
    @threads for i in eachindex(A)
        @inbounds B[i] = sqrt(A[i])
    end
    B
end

threaded_sqrt_array (generic function with 1 method)

In [9]:
n = 1000
A = rand(n, n)
@btime sqrt_array(A);
@btime threaded_sqrt_array(A);

  1.473 ms (2 allocations: 7.63 MiB)


  448.933 μs (75 allocations: 7.64 MiB)


Do we have the correct result?

In [10]:
sqrt_array(A) ≈ threaded_sqrt_array(A)

true

With 4 threads, the speedup could be about a factor of 3; the timings above show roughly a factor of 3.3 (1.473 ms versus 448.933 μs).

In [11]:
function sqrt_sum(A)
    s = zero(eltype(A))
    for i in eachindex(A)
        @inbounds s += sqrt(A[i])
    end
    return s
end

sqrt_sum (generic function with 1 method)

In [12]:
function threaded_sqrt_sum(A)
    s = zero(eltype(A))
    @threads for i in eachindex(A)
        @inbounds s += sqrt(A[i])
    end
    return s
end

threaded_sqrt_sum (generic function with 1 method)

In [13]:
n = 1000
A = rand(n, n)
@btime sqrt_sum(A);
@btime threaded_sqrt_sum(A);

  1.458 ms (1 allocation: 16 bytes)


  12.409 ms (2000074 allocations: 30.52 MiB)


In [14]:
sqrt_sum(A) ≈ threaded_sqrt_sum(A)

false

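The result of the multi-threaded function is wrong because the threads are not synchronized: `s += sqrt(A[i])` is a read-modify-write on a shared variable, so concurrent updates overwrite each other. The shared variable `s` is also captured by the threaded loop body, which likely explains the two million allocations and the slowdown.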

In [15]:
# Ref{Int} is an object that safely references data of type Int.
# This type is guaranteed to point to valid, Julia-allocated memory of the correct type.
acc = Ref{Int}(0)
@threads for i in 1:1000
    acc[] += 1
end
acc[]

251

With multi-threading we need to be aware of possible race conditions, i.e. when the order in which threads read from and write to memory can change the result of a computation.

![](./Graphics/update_int.png)

You are entirely responsible for ensuring that your program is data-race free. Be very careful about reading any data if another thread might write to it!

![](./Graphics/meme_race_conditions.jpg)

Julia supports accessing and modifying values atomically, that is, in a thread-safe way that avoids race conditions.
A value (which must be of a primitive type) can be wrapped in a `Threads.Atomic` to indicate that it must be accessed in this way. Here is an example:

In [16]:
acc = Atomic{Int}(0)
@threads for i in 1:1000
    atomic_add!(acc, 1)
end
acc[]

1000

In [17]:
i = Threads.Atomic{Int}(0)
old_i = zeros(4)
Threads.@threads for id in 1:4
    old_i[id] = atomic_add!(i, id) # Threads.atomic_add! returns the old value of i!
end
display(i[])
old_i

10

4-element Vector{Float64}:
 0.0
 1.0
 3.0
 6.0
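Since `Threads.Atomic` only wraps primitive types, shared data such as a growing `Vector` needs a different form of synchronization. One common option is a lock. The sketch below guards `push!` with a `ReentrantLock`; the function name `collect_multiples` is hypothetical and only serves as an illustration.

```julia
using Base.Threads

# Hypothetical example: collect all multiples of k in 1:n from a threaded loop.
function collect_multiples(n, k)
    found = Int[]
    lk = ReentrantLock()
    @threads for i in 1:n
        if i % k == 0
            # Only one thread at a time may modify the shared vector.
            lock(lk) do
                push!(found, i)
            end
        end
    end
    # The insertion order depends on thread timing, so sort before returning.
    return sort!(found)
end

collect_multiples(20, 7)  # returns [7, 14]
```

Like atomics, a lock serializes the protected section, so it pays off only when the work outside the lock dominates.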

Let's solve the race condition in our previous example:

In [18]:
function threaded_sqrt_sum_atomic(A)
    T = eltype(A)
    s = Atomic{T}(zero(T))
    @threads for i in eachindex(A)
        @inbounds atomic_add!(s, sqrt(A[i]))
    end
    return s[]
end

threaded_sqrt_sum_atomic (generic function with 1 method)

In [19]:
@btime threaded_sqrt_sum_atomic(A);

  70.470 ms (77 allocations: 6.39 KiB)


In [20]:
sqrt_sum(A) ≈ threaded_sqrt_sum_atomic(A)

true

In [21]:
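function threaded_sqrt_sum_optimized(A)
    T = eltype(A)
    # One accumulator slot per thread, so no two threads update the same entry.
    partial = zeros(T, nthreads())
    # `:static` pins each iteration to one thread, which keeps `threadid()`
    # stable while `partial[threadid()]` is read and updated.
    @threads :static for i in eachindex(A)
        @inbounds partial[threadid()] += sqrt(A[i])
    end
    # Serial reduction of the per-thread partial sums.
    s = zero(T)
    for i in eachindex(partial)
        s += partial[i]
    end
    return s
end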

threaded_sqrt_sum_optimized (generic function with 1 method)

In [22]:
@btime threaded_sqrt_sum_optimized(A);

  519.993 μs (75 allocations: 6.50 KiB)


We observe that:
- The serial version provides the correct value and reference execution time.
- The race condition version is both slow and wrong.
- The atomic version is correct but extremely slow.
- The optimized version is fast and correct, but required refactoring.

**Conclusion**: Multi-threading can be as easy as decorating `for` loops with `@threads`, but data dependencies (race conditions) must be avoided.
This sometimes requires refactoring the code.
Atomic operations add significant overhead and thus only make sense if each iteration of the loop takes significant time to compute.
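One way to do such a refactoring without any shared accumulator is to split the index range into chunks, let one task sum each chunk into a local variable, and combine the task results at the end. The sketch below assumes this chunking strategy; the name `chunked_sqrt_sum` and the `ntasks` keyword are hypothetical.

```julia
using Base.Threads

# Hypothetical chunked reduction: each task owns a local sum, so nothing is shared.
function chunked_sqrt_sum(A; ntasks = nthreads())
    isempty(A) && return zero(eltype(A))
    idx = eachindex(A)
    chunk_size = max(1, cld(length(idx), ntasks))
    tasks = map(Iterators.partition(idx, chunk_size)) do chunk
        Threads.@spawn begin
            s = zero(eltype(A))   # local to this task
            for i in chunk
                s += sqrt(A[i])
            end
            s
        end
    end
    # `fetch` waits for each task and returns its local sum.
    return sum(fetch, tasks)
end

A = rand(1000, 1000)
chunked_sqrt_sum(A) ≈ sum(sqrt, A)
```

Because every task writes only to its own local variable, the result does not depend on which thread runs which task.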

![](./Graphics/meme_multithreading.jpg)

#### Exercise: Multithread the computation of π

Consider the following function, which estimates π by “throwing darts”: it samples random points (x, y) uniformly in the unit square [0, 1] × [0, 1] and checks whether they fall within the unit circle.
<img src='./Graphics/pi_with_darts.png' width='400'>
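The factor 4 in the function below follows from comparing areas. For points drawn uniformly in the unit square, the probability of landing inside the quarter disk $x^2 + y^2 < 1$ equals the ratio of the two areas, so with the names `hits` and `num_points` used in the code:

$$
\frac{\text{hits}}{\text{num\_points}} \approx \frac{\text{area of quarter disk}}{\text{area of unit square}} = \frac{\pi / 4}{1}
\quad\Longrightarrow\quad
\pi \approx 4 \cdot \frac{\text{hits}}{\text{num\_points}}
$$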

In [23]:
function estimate_pi(num_points)
    hits = 0
    for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            hits += 1
        end
    end
    fraction = hits / num_points
    return 4 * fraction
end

estimate_pi (generic function with 1 method)

In [24]:
num_points = 100_000_000
@btime estimate_pi(num_points)  # 3.14147572...

  334.244 ms (1 allocation: 16 bytes)


3.141523

In [25]:
function threaded_estimate_pi_v1(num_points)
    hits = Atomic{Int}(0)
    @threads for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            atomic_add!(hits, 1)
        end
    end
    fraction = hits[] / num_points
    return 4 * fraction
end

threaded_estimate_pi_v1 (generic function with 1 method)

In [26]:
num_points = 100_000_000
@btime threaded_estimate_pi_v1(num_points)

  1.588 s (92 allocations: 6.70 KiB)


3.14147204

In [27]:
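function threaded_estimate_pi_v2(num_points)
    # One hit counter per thread, so no two threads update the same entry.
    partial_hits = zeros(Int, nthreads())
    # `:static` pins each iteration to one thread, which keeps `threadid()`
    # stable while `partial_hits[threadid()]` is read and updated.
    @threads :static for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            partial_hits[threadid()] += 1
        end
    end
    hits = sum(partial_hits)
    fraction = hits / num_points
    return 4 * fraction
end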

threaded_estimate_pi_v2 (generic function with 1 method)

In [28]:
num_points = 100_000_000
@btime threaded_estimate_pi_v2(num_points)

  269.257 ms (77 allocations: 6.53 KiB)


3.1417878

```text
julia -t 1 threaded_estimate_pi.jl
pi = 3.14176872
time = 950.957122

julia -t 2 threaded_estimate_pi.jl
pi = 3.1412234
time = 732.195929

julia -t 4 threaded_estimate_pi.jl
pi = 3.14180932
time = 663.25783
```

Parallel scaling is not linear with the number of threads! Comparing to the unthreaded version reveals the overhead from creating and managing threads.
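The scaling can be quantified from the three timings listed above. The snippet below computes the speedup relative to the single-thread run and the parallel efficiency (speedup divided by the number of threads). The timing values are copied from the runs above; their unit is whatever the script printed, which does not matter for the ratios.

```julia
# Timings copied from the runs above (unit as printed by the script).
nthreads_used = [1, 2, 4]
times = [950.957122, 732.195929, 663.25783]

speedup = times[1] ./ times              # relative to the 1-thread run
efficiency = speedup ./ nthreads_used    # 1.0 would be perfect linear scaling

for (p, s, e) in zip(nthreads_used, speedup, efficiency)
    println("threads = $p  speedup = $(round(s, digits = 2))  efficiency = $(round(e, digits = 2))")
end
```

The speedup is only about 1.3 with 2 threads and about 1.4 with 4 threads, far from the ideal values of 2 and 4.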

## Homework 🤓

- Implement a multi-threaded version of the dot product between two vectors.
- Implement a multi-threaded version of the matrix-vector products `A * v` and `Aᵀ * v` where `A` is a `SparseMatrixCSC`. Explain which product is better suited to multi-threading.
![label_image](https://matteding.github.io/images/csc.gif)

# References:
- https://docs.julialang.org/en/v1/base/multi-threading
- https://enccs.github.io/Julia-for-HPC/multithreading
- https://miro.medium.com/v2/resize:fit:4800/format:webp/1*2JKgg1exEuvgO8mLfEEmRg.png

## Vector product: $\mathbf{A}^\text{T}\mathbf{v}$, where $\mathbf{A}$ is a CSC matrix

In [47]:
using SparseArrays
using Random
using Printf

In [48]:
K = 2000
# generate a compressed sparse column matrix and a random vector
Random.seed!(1234)
m = 10 * K
n = 5 * K
A = sprand(m,n,0.1)
v = rand(m)

display(A)

# println("col ptr: ", A.colptr)
# println("row id:  ", A.rowval)
# println("data:    ", A.nzval)
@printf "number of non-zero values: %i\n" length(A.nzval)

20000×10000 SparseMatrixCSC{Float64, Int64} with 19996614 stored entries:
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿

number of non-zero values: 19996614
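To make the roles of `colptr`, `rowval` and `nzval` concrete, here is a small illustrative matrix (not the one generated above). Column `j` owns the stored entries at positions `colptr[j]` to `colptr[j+1] - 1` of `rowval` and `nzval`, which is exactly the range the loops in the following cells iterate over.

```julia
using SparseArrays

# 3×3 example with four stored entries:
#   [10  0   0]
#   [ 0 30   0]
#   [20  0  40]
rows = [1, 3, 2, 3]
cols = [1, 1, 2, 3]
vals = [10.0, 20.0, 30.0, 40.0]
S = sparse(rows, cols, vals, 3, 3)

S.colptr  # [1, 3, 4, 5]
S.rowval  # [1, 3, 2, 3]
S.nzval   # [10.0, 20.0, 30.0, 40.0]

# Stored entries of column 1 as (row, value) pairs.
col1 = [(S.rowval[k], S.nzval[k]) for k in S.colptr[1]:(S.colptr[2] - 1)]  # [(1, 10.0), (3, 20.0)]
```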


In [49]:
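# Computes Aᵀ * v: entry i is the dot product of column i of A with v.
function CSC_vector_product(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.n)

    for i in 1:A.n # columns
        # stored entries of column i
        for j in A.colptr[i]:(A.colptr[i+1]-1)
            product[i] += A.nzval[j]*v[A.rowval[j]]
        end
    end
    return product
end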

CSC_vector_product (generic function with 1 method)

In [50]:
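# Threaded Aᵀ * v: iteration i writes only to product[i], so the columns can be
# processed independently without any synchronization.
function threaded_CSC_vector_product(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.n)

    @threads for i in 1:A.n # columns
        for j in A.colptr[i]:(A.colptr[i+1]-1)
            product[i] += A.nzval[j]*v[A.rowval[j]]
        end
    end
    return product
end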

threaded_CSC_vector_product (generic function with 1 method)

In [51]:
println((A'*v) ≈ CSC_vector_product(A,v))
println((A'*v) ≈ threaded_CSC_vector_product(A,v))

true


true


In [52]:
@btime CSC_vector_product(A,v);
@btime threaded_CSC_vector_product(A,v);

  30.976 ms (2 allocations: 78.17 KiB)


  10.047 ms (75 allocations: 85.06 KiB)


## Vector product: $\mathbf{A}\mathbf{v}$, where $\mathbf{A}$ is a CSC matrix

In [53]:
Random.seed!(1234)
v = rand(n);

In [54]:
function CSC_vector_product_t(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.m)

    for i in 1:A.n # columns
        for j in A.colptr[i]:(A.colptr[i+1]-1)
            product[A.rowval[j]] += A.nzval[j]*v[i]
        end
    end
    return product
end

CSC_vector_product_t (generic function with 1 method)

In [55]:
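# Race condition: different columns can store entries in the same row, so several
# threads may update the same product[A.rowval[j]] concurrently.
function threaded_CSC_vector_product_t(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.m)

    @threads for i in 1:A.n # columns
        for j in A.colptr[i]:(A.colptr[i+1]-1)
            product[A.rowval[j]] += A.nzval[j]*v[i]
        end
    end
    return product
end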

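# Note: despite its name, this version uses no atomic operations. It threads the
# inner loop instead: within one column the stored row indices are distinct, so
# no two iterations write to the same entry of `product`. The price is one
# `@threads` launch per column.
function threaded_CSC_vector_product_t_atomic(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.m)

    for i in 1:A.n # columns
        @threads for j in A.colptr[i]:(A.colptr[i+1]-1)
            product[A.rowval[j]] += A.nzval[j]*v[i]
        end
    end
    return product
end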

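function threaded_CSC_vector_product_t_optimized(A::SparseMatrixCSC,v::Vector)
    product = zeros(A.m)
    T = eltype(A)
    # One private accumulator column per thread avoids concurrent writes.
    partial = zeros(T, A.m, nthreads())
    # `:static` pins each iteration to one thread, which keeps `threadid()`
    # stable while `partial[:, threadid()]` is updated.
    @threads :static for i in 1:A.n # columns
        for j in A.colptr[i]:(A.colptr[i+1]-1)
            partial[A.rowval[j],threadid()] += A.nzval[j]*v[i]
        end
    end
    # Serial reduction of the per-thread accumulators.
    for i in 1:nthreads()
        product += partial[:,i]
    end

    return product
end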

threaded_CSC_vector_product_t_optimized (generic function with 1 method)
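The function named `threaded_CSC_vector_product_t_atomic` above threads the inner loop rather than using atomic operations. For comparison, a variant that really relies on atomics might look like the sketch below. It assumes `Float64` data, stores one `Threads.Atomic{Float64}` per output row, and uses the accessor functions `rowvals`, `nonzeros` and `nzrange` from `SparseArrays`; the name `atomic_CSC_matvec` is hypothetical.

```julia
using Base.Threads
using SparseArrays

# Hypothetical atomic variant of A * v for a CSC matrix with Float64 entries.
function atomic_CSC_matvec(A::SparseMatrixCSC{Float64}, v::Vector{Float64})
    m, n = size(A)
    # One atomic accumulator per output row.
    acc = [Atomic{Float64}(0.0) for _ in 1:m]
    rows, vals = rowvals(A), nonzeros(A)
    @threads for j in 1:n                 # columns
        for k in nzrange(A, j)            # stored entries of column j
            atomic_add!(acc[rows[k]], vals[k] * v[j])
        end
    end
    # Unwrap the atomics into a plain vector.
    return [a[] for a in acc]
end

S = sprand(200, 100, 0.1)
w = rand(100)
atomic_CSC_matvec(S, w) ≈ S * w
```

Given how slow the atomic sum was earlier in this notebook, this variant is mainly useful as a correctness reference rather than as a fast implementation.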

In [56]:
println((A*v) ≈ CSC_vector_product_t(A,v))
println((A*v) ≈ threaded_CSC_vector_product_t(A,v))
println((A*v) ≈ threaded_CSC_vector_product_t_atomic(A,v))
println((A*v) ≈ threaded_CSC_vector_product_t_optimized(A,v))

true


false


true


true


In [57]:
@btime CSC_vector_product_t(A,v);
@btime threaded_CSC_vector_product_t(A,v);
@btime threaded_CSC_vector_product_t_atomic(A,v);
@btime threaded_CSC_vector_product_t_optimized(A,v);

  37.939 ms (2 allocations: 156.30 KiB)


  31.394 ms (76 allocations: 163.22 KiB)


  118.516 ms (741276 allocations: 67.79 MiB)


  11.944 ms (125 allocations: 5.65 MiB)
