# Multithreading

By default Julia uses only a single thread. To start it with multiple threads we must tell it explicitly:

##### Command line argument

```bash
julia -t 4
```

or

```bash
julia --threads 4
```

##### Environment variable

On Linux/macOS:

```bash
export JULIA_NUM_THREADS=4
```

On Windows:

```bash
set JULIA_NUM_THREADS=4
```

Afterwards start julia *in the same terminal*.

##### Jupyter kernel

You can also create a *Jupyter kernel* for multithreaded Julia:

```julia
using IJulia
installkernel("Julia (4 threads)", "--project=@.", env=Dict("JULIA_NUM_THREADS"=>"4"))
```

*Note:* This has to be manually redone for every new Julia version and you have to restart your Jupyter process to see an effect.

To check that this has worked we use:

In [None]:
Threads.nthreads()
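
A script that relies on threading can guard against accidentally being started with a single thread. The following is only a minimal sketch; the wording of the messages is an arbitrary choice:

```julia
# Warn if Julia was started without extra threads
if Threads.nthreads() == 1
    @warn "Julia is running with a single thread; start it with `julia -t 4` or set JULIA_NUM_THREADS."
else
    @info "Julia is running with $(Threads.nthreads()) threads."
end
```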

## `Threads.@spawn`

The `Threads.@spawn` macro schedules a command for execution in the background. Programmatically, it creates a `Task` and puts it on the scheduler's to-do list. Whenever a thread is free, the task is dynamically assigned to that thread and execution of the work starts.

In [None]:
Threads.@spawn println("test")
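
To make the `Task` object a bit more tangible, here is a small sketch that inspects the value returned by `Threads.@spawn` using the standard functions `wait`, `istaskdone` and `fetch`:

```julia
# Threads.@spawn returns immediately with a Task object
task = Threads.@spawn 1 + 1

@show typeof(task)      # the returned object is a Task
wait(task)              # block until the task has finished
@show istaskdone(task)  # reports whether the task has completed
@show fetch(task)       # value of the spawned expression
```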

**Important:** `Threads.@spawn` returns the created task *immediately*, but we might have to wait until the task is done and fetch the result later:

In [None]:
t = Threads.@spawn begin
    sleep(3)
    4
end
println("got here")
# We immediately get here
@time fetch(t)  # This waits until the task is done

To prevent the immediate return, we need to explicitly synchronise the execution using an `@sync` macro barrier.
For example:

In [None]:
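# `@sync` wraps its body in a `let` block, so we assign its return value
# (the task) outside of the block to make `t` visible afterwards.
t = @sync begin
    Threads.@spawn begin
        sleep(3)
        4
    end
end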
println("got here")

@time fetch(t)  # No need to wait, the task is already done
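
`@sync` becomes more useful when several tasks are spawned at once, because it waits for all of them. A minimal sketch, assuming a hypothetical helper `squares_in_parallel` in which `sleep` stands in for real work:

```julia
function squares_in_parallel(n)
    results = zeros(Int, n)
    # @sync waits for every task spawned inside the loop
    @sync for i in 1:n
        Threads.@spawn begin
            sleep(0.1)          # stand-in for real work
            results[i] = i^2    # each task writes to its own slot
        end
    end
    return results              # all tasks have finished at this point
end

@show squares_in_parallel(4)
```

Since every task writes to a different element of `results`, the tasks do not conflict with each other.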

## Filling an array in parallel

Now, let's use this to actually parallelise something. We will fill an array in parallel:

In [None]:
function fill_array_parallel(a)
    @sync for i in 1:length(a)
        Threads.@spawn a[i] = Threads.threadid()
    end
    a
end

a = zeros(Threads.nthreads()*10);
fill_array_parallel(a)

In [None]:
@show count(a .== 1.0)
@show count(a .== 2.0)
@show count(a .== 3.0)
@show count(a .== 4.0)

Note: Due to the **dynamic scheduling**, some threads actually do more work (more values of `i`) than others!
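
Rather than hard-coding the thread ids 1 to 4, the distribution of work can also be tallied for whatever thread ids occur. This is only a sketch; the helper names `fill_with_threadid` and `work_per_thread` are made up for illustration:

```julia
# Fill the array in parallel, storing the id of the executing thread
function fill_with_threadid(a)
    @sync for i in eachindex(a)
        Threads.@spawn a[i] = Threads.threadid()
    end
    return a
end

# Count how many entries were written by each thread id
function work_per_thread(a)
    counts = Dict{Int,Int}()
    for id in a
        key = Int(id)
        counts[key] = get(counts, key, 0) + 1
    end
    return counts
end

a = fill_with_threadid(zeros(Threads.nthreads() * 10))
counts = work_per_thread(a)
for id in sort(collect(keys(counts)))
    println("thread $id processed $(counts[id]) entries")
end
```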

## Nesting threading

A key design goal of the Julia ecosystem is to support **nested threading**:

In [None]:
function threaded_fun()
    x = Threads.threadid()
    Threads.@spawn println("job1", " (spawned from $x, processed by $(Threads.threadid()))")
    Threads.@spawn println("job2", " (spawned from $x, processed by $(Threads.threadid()))")
    Threads.@spawn println("job3", " (spawned from $x, processed by $(Threads.threadid()))")
end

In [None]:
@sync for i in 1:Threads.nthreads()
    Threads.@spawn threaded_fun()
end

The key point is that, in this way, the threading of different layers of functions does not interfere: no more threads are spawned than there are workers (CPU cores).

Without such a mechanism, oversubscription happens rather easily whenever a parallelised routine like `threaded_fun` (e.g. a numerical integration routine) is called from an outer loop that is itself parallelised (e.g. a solver). To avoid the problem one needs to introduce some kind of coupling between the routines that tells the inner routine (`threaded_fun`) how many threads it may use. To avoid the need to do this explicitly, Julia has decided to base its threading mostly on dynamic scheduling and the `@spawn` formalism.
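
As a rough illustration of such a composition, consider the sketch below. Both function names are hypothetical: `inner_sum_of_squares` plays the role of a parallelised inner routine and `outer` that of a parallelised driver. Neither needs to know how many threads the other one uses:

```julia
# Inner routine: splits its work into two parts, one of them as a new task
function inner_sum_of_squares(v)
    half = length(v) ÷ 2
    task = Threads.@spawn sum(abs2, view(v, 1:half))
    rest = sum(abs2, view(v, half+1:length(v)))
    return fetch(task) + rest
end

# Outer routine: spawns one task per input, each of which spawns again
function outer(columns)
    tasks = map(columns) do c
        Threads.@spawn inner_sum_of_squares(c)
    end
    return fetch.(tasks)
end

@show outer([collect(1.0:4.0), collect(1.0:3.0)])
```

All tasks, no matter at which level they were created, end up on the same to-do list and are processed by the same fixed set of threads.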

## Threading takes extra care: Parallel summation

We consider the case of a parallel summation. First, a serial reference implementation and a naive parallel version:

In [None]:
function mysum(xs)
    s = zero(eltype(xs))
    for x in xs
        s += x
    end
    s
end

In [None]:
function mysum_parallel_naive(xs)
    s = zero(eltype(xs))
    @sync for x in xs
        Threads.@spawn (s += x)
    end
    s
end

In [None]:
xs = rand(100_000);

In [None]:
@show sum(xs);
@show mysum(xs);
@show mysum_parallel_naive(xs);

Hmmm ... the results differ. The problem is a so-called **race condition**: several threads read and write the shared variable `s` at the same time, so that updates get lost.

One way to solve this is by using [Atomic Operations](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations):

In [None]:
import Base.Threads: Atomic, atomic_add!

function mysum_parallel_atomics(xs)
    T = eltype(xs)
    s = Atomic{T}(zero(T))
    @sync for x in xs
        Threads.@spawn atomic_add!(s, x)
    end
    s[]
end

In [None]:
@show mysum(xs);
@show mysum_parallel_atomics(xs);
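
For comparison, the same race can also be avoided with a lock, which lets only one task at a time update `s`. This is a sketch using `ReentrantLock` from Base; the function name `mysum_parallel_lock` is made up here, and since every update is serialised this variant should not be expected to be fast:

```julia
function mysum_parallel_lock(xs)
    s = zero(eltype(xs))
    lk = ReentrantLock()
    @sync for x in xs
        Threads.@spawn begin
            # Only one task at a time may execute the body of the lock
            lock(lk) do
                s += x
            end
        end
    end
    s
end

@show mysum_parallel_lock(collect(1:100))
```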

In [None]:
using BenchmarkTools  # provides the @btime macro

@btime mysum($xs);
@btime mysum_parallel_atomics($xs);
@btime mysum_parallel_naive($xs);

**Note:** Atomics are generally slow, because every update of the shared variable has to be coordinated between the threads. Don't use this paradigm in production unless you know what you are doing. Use FLoops.jl (see below).
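
Another way to sidestep the race without atomics is to avoid shared state altogether: each task sums its own chunk of the data and the partial results are combined at the end. The following is a minimal sketch of this idea; the function name `mysum_parallel_chunks` and the keyword `ntasks` are assumptions made for illustration:

```julia
function mysum_parallel_chunks(xs; ntasks=Threads.nthreads())
    isempty(xs) && return zero(eltype(xs))
    chunksize = cld(length(xs), ntasks)   # ceiling division
    # One task per chunk, each with its own private accumulator
    tasks = map(Iterators.partition(xs, chunksize)) do chunk
        Threads.@spawn sum(chunk)
    end
    # Combine the partial sums serially
    return sum(fetch, tasks)
end

@show mysum_parallel_chunks(collect(1:100))
```

Only `ntasks` tasks are created instead of one per element, which also keeps the scheduling overhead small.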

## Is there no static scheduling option in Julia?

Yes, there is, and it can sometimes be faster than dynamic threading:

In [None]:
function mysum_parallel_threads(xs)
    T = eltype(xs)
    s = Atomic{T}(zero(T))
    Threads.@threads :static for x in xs
        atomic_add!(s, x)
    end
    s[]
end

In [None]:
@btime mysum_parallel_atomics($xs);
@btime mysum_parallel_threads($xs);

While at first glance this has the advantage of a roughly 10-fold reduction in runtime, the disadvantages are that nested threading is not supported and that severe load imbalance can occur, since the work is split statically when the loop starts.
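
To see how the work is split, one can record which thread processes which iteration. This sketch uses a hypothetical helper `static_assignment` and assumes it is called from the main thread, outside of any other threaded loop, as `:static` requires:

```julia
function static_assignment(n)
    ids = zeros(Int, n)
    # With :static the iteration range is divided up front among the threads
    Threads.@threads :static for i in 1:n
        ids[i] = Threads.threadid()
    end
    return ids
end

@show static_assignment(12)
```

Each thread id is expected to show up as one contiguous block of iterations, independent of how long the individual iterations take.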

## FLoops.jl: Easy and fast dynamic threads

As a way out, the Julia ecosystem has brought forward a number of carefully optimised packages for threaded execution based on *dynamic* scheduling. One example is [FLoops.jl](https://github.com/JuliaFolds/FLoops.jl). Our `mysum` function is parallelised using FLoops by just adding two macros:

In [None]:
using FLoops

function mysum_parallel_floops(xs)
    s = zero(eltype(xs))
    @floop for x in xs
        @reduce s += x
    end
    s
end

It still gives the right result and is faster than our statically scheduled `@threads` version:

In [None]:
@show mysum(xs);
@show mysum_parallel_floops(xs);

In [None]:
@btime mysum_parallel_threads($xs);
@btime mysum_parallel_floops($xs);

**Note:** The fact that `FLoops` is faster is a little misleading at first sight, but illustrates an important point nevertheless:

- *Perfectly written* statically scheduled threads are faster than dynamically scheduled threads.
- But this requires deep insight to obtain optimal load balancing, careful use of atomics etc.
- If you are not a parallelisation expert, carefully optimised packages based on *dynamic scheduling* will likely be faster for your use case. The plain reason is that the *learning time* to understand all the necessary tricks and the time needed to *fix all the subtle bugs* are not to be underestimated.

For even more flexibility, `@floop` supports different executors, which make it easy to switch between serial, parallel, CPU, GPU, ... execution:

In [None]:
function mysum_parallel_floops(xs, executor)
    @floop executor for i in eachindex(xs)
        @reduce s += xs[i]
    end
    s
end

In [None]:
@btime mysum_parallel_floops($xs, $(SequentialEx()));  # Sequential
@btime mysum_parallel_floops($xs, $(ThreadedEx()));    # Threaded

If you have a GPU you can even try:

In [None]:
using FoldsCUDA
using CUDA
xsgpu = cu(xs)  # Transfer data to GPU
@btime mysum_parallel_floops($xsgpu, $(CUDAEx()));     # GPU

## ThreadsX.jl: Parallelised Base functions

In [None]:
using ThreadsX

In [None]:
ys = rand(1_000_000);

In [None]:
@btime Base.sum($ys);

In [None]:
@btime ThreadsX.sum($ys);

## LoopVectorization.jl: Macros vectorising loops

In [None]:
using LoopVectorization

function mysum_turbo(xs)
    s = zero(eltype(xs))
    @tturbo for i in eachindex(xs)
        @inbounds s += xs[i]
    end
    s
end

In [None]:
@btime mysum_turbo($ys);

### Takeaways
- Julia's thread infrastructure is mostly based on *dynamic* threading
- The advantages are thread nesting and better load balancing in cases where load per iteration is not uniform.
- The disadvantage is a larger startup time per thread
- Packages like FLoops.jl make it easy to write fast parallel code.

##### More details
- https://juliafolds.github.io/data-parallelism/