# Parallel and Distributed Computing


## Multithreading vs Multiprocessing




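- **Multithreading**: multiple threads run concurrently within a single process and share that process's memory.

- **Multiprocessing**: multiple processes run concurrently, each with its own separate memory. Processes can run on one machine or on several machines, and they exchange data by sending messages rather than through shared memory. Each process can itself run one or more threads.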

![](https://miro.medium.com/v2/resize:fit:720/format:webp/1*hZ3guTdmDMXevFiT5Z3VrA.png)


## Multithreading in Julia
🤯 built-in support with the `Threads` module (part of `Base`, so no package needs to be installed) 🤯


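```julia
a = zeros(10)

Threads.@threads for i = 1:10
    a[i] = Threads.threadid()   # record which thread ran iteration i
end
println(a)
```

```
[2.0, 2.0, 8.0, 8.0, 3.0, 5.0, 7.0, 6.0, 4.0, 1.0]
```

Which thread handles which iteration varies from run to run.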



The number of execution threads is controlled either by using the `-t`/`--threads` command line argument 

```shell
julia --threads 10 my_script.jl
```

or by using the `JULIA_NUM_THREADS` environment variable. In VS Code, the number of threads can also be set in the settings of the Julia extension.


- If both are given, `-t`/`--threads` takes precedence over `JULIA_NUM_THREADS`.

- The number of threads can be specified either as an integer (`--threads=4`) or as `auto` (`--threads=auto`), which uses the number of local CPU threads.



To check the number of threads available:

```julia
Threads.nthreads()
```

```
8
```

Here is another example to check that `Threads.@threads` does indeed run code in parallel. We compare a simple loop against the same loop using `Threads.@threads`, where each iteration simply sleeps for one second.

```julia
myfun() = sleep(1)
function myfun_loop()
    for i = 1:10
        myfun()
    end
end

using BenchmarkTools
@btime myfun_loop()
```

```
  10.020 s (249 allocations: 6.89 KiB)
```


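```julia
function myfun_loop_multithreading()
    Threads.@threads for i = 1:10
        myfun()
    end
end

@btime myfun_loop_multithreading()
```

```
  2.007 s (123 allocations: 6.42 KiB)
```

With 8 threads, the 10 iterations are split among the threads, so some threads run two iterations. The loop therefore takes about 2 s instead of 10 s.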


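### Be careful with race conditions!
When multiple threads (or processes) read and modify shared data concurrently without synchronization, the outcome depends on the unpredictable timing of their operations. Such a race condition leads to unexpected and incorrect results. In the example below, several threads call `push!` on the same vector at the same time, and some of the pushes are lost.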

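```julia
a = Float64[]
Threads.@threads for i in 1:1000
    push!(a, i)   # push! is not thread-safe: concurrent calls can overwrite each other
end
println(length(a))
@assert length(a) == 1000
```

```
927
AssertionError: length(a) == 1000
```

The exact length varies from run to run.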

#### `lock` 

A lock prevents this race condition: only one thread at a time can execute the code protected by the lock.


```julia
a = Int[]
lk = ReentrantLock()
Threads.@threads for i in 1:100
    x = i^2                 # computed in parallel
    lock(lk) do             # only one thread at a time may push!
        push!(a, x)
    end
end
println(length(a)) # == 100
```

```
100
```


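- `lk` is a `ReentrantLock`, a synchronisation primitive
- `lock(lk) do ... end` acquires the lock, runs the block, and releases the lock (even if the block throws). This ensures that only one thread at a time can access the shared resource (here, `a`).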
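Locks are not the only fix. A common alternative, sketched below, is to avoid shared mutable state altogether:

1. Split the input into chunks.
2. Let each task (spawned with `Threads.@spawn`) compute a partial result on its own chunk.
3. Combine the partial results once all tasks have finished.

`chunked_sum` and its `ntasks` keyword are hypothetical names chosen for this illustration; this is one possible pattern, not the only way to structure such code.

```julia
# Hypothetical helper (not part of Base): sum a vector with one task per chunk,
# so that no two tasks ever write to the same variable and no lock is needed.
function chunked_sum(xs::AbstractVector{<:Number}; ntasks::Int = Threads.nthreads())
    chunk_len = max(cld(length(xs), ntasks), 1)        # ceiling division, at least 1
    tasks = map(Iterators.partition(xs, chunk_len)) do chunk
        Threads.@spawn sum(chunk)                      # each task reads only its own chunk
    end
    # fetch waits for each task; the partial sums are combined on the calling thread
    return sum(fetch.(tasks); init = zero(eltype(xs)))
end

println(chunked_sum(collect(1:1000)))              # 500500
println(chunked_sum(fill(0.5, 10); ntasks = 3))    # 5.0
```

Because every task returns its own value instead of mutating a shared container, the result does not depend on thread timing.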

### Overhead
- Parallelization can speed code up, but starting and scheduling threaded tasks has a fixed cost that can outweigh the gain.
- For multithreading to be worth it, each thread needs a reasonably large amount of "real work".


```julia
N = 2^10
x = fill(1.0f0, N)  # a vector filled with 1.0 (Float32)
y = fill(2.0f0, N);  # a vector filled with 2.0
```


```julia
function sequential_add!(y, x)
    for i in eachindex(y, x)
        @inbounds y[i] += x[i]
    end
    return nothing
end

using BenchmarkTools
@btime sequential_add!($y, $x)
```

```
  69.744 ns (0 allocations: 0 bytes)
```


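```julia
function parallel_add!(y, x)
    Threads.@threads for i in eachindex(y, x)
        @inbounds y[i] += x[i]
    end
    return nothing
end

@btime parallel_add!($y, $x)
```

```
  7.333 μs (41 allocations: 4.23 KiB)
```

Here the multithreaded version is roughly 100 times slower. Adding 1024 numbers takes well under a microsecond, which is much less than the cost of starting the threaded tasks.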
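One way to act on this trade-off is to switch to the threaded loop only when the input is large enough. The sketch below does exactly that. `adaptive_add!` and its `threshold` keyword are hypothetical names. The default of `100_000` elements is an arbitrary placeholder, not a measured value; the break-even point depends on your machine and on how much work each iteration does.

```julia
# Hypothetical helper: use threads only when the vector is long enough.
# The default `threshold` is an arbitrary placeholder; measure on your own machine.
function adaptive_add!(y, x; threshold::Int = 100_000)
    if length(y) < threshold
        for i in eachindex(y, x)                   # small input: plain loop, no task start-up cost
            @inbounds y[i] += x[i]
        end
    else
        Threads.@threads for i in eachindex(y, x)  # large input: enough work to amortize the overhead
            @inbounds y[i] += x[i]
        end
    end
    return nothing
end

x = fill(1.0f0, 2^10)
y = fill(2.0f0, 2^10)
adaptive_add!(y, x)            # 1024 < threshold: runs the sequential loop
println(all(==(3.0f0), y))     # true
```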


### Practical use of multithreading: grid search


```julia
function simul(noise, batch_size)
    # do something with noise and batch_size
    sleep(1)
    return randn()
end
```

```
simul (generic function with 1 method)
```


- We may want to evaluate `simul` for many combinations of `noise` and `batch_size`.
- Let's do so by creating a dictionary `pars` for each combination of arguments and adding it to an array `pars_arr`.


```julia
pars_arr = Dict[]

noises = [0.1, 0.2, 0.3]
batch_sizes = [1000, 2000, 3000]

for noise in noises, batch_size in batch_sizes
    pars = Dict()
    pars["noise"] = noise
    pars["batch_size"] = batch_size
    push!(pars_arr, pars)
end
```
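As a possible variant, the same grid can be built more compactly as a vector of `NamedTuple`s using a comprehension. This sketch assumes you do not need the flexibility of `Dict`. The loop body would then read `p.noise` instead of `p["noise"]`. The name `pars_nt` is hypothetical. The iteration order matches the nested loop above, with `batch_size` varying fastest.

```julia
noises = [0.1, 0.2, 0.3]
batch_sizes = [1000, 2000, 3000]

# One NamedTuple per combination; batch_size varies fastest, as in the nested loop above.
pars_nt = [(noise = noise, batch_size = batch_size) for noise in noises for batch_size in batch_sizes]

println(length(pars_nt))   # 9
println(pars_nt[2])        # (noise = 0.1, batch_size = 2000)
```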

We'll also create a `DataFrame` to store the results.


```julia
using DataFrames
df_results = DataFrame("Result" => [],
                    "noise" => [],
                    "batch_size" => [])
```

This creates an empty `DataFrame` with the columns `Result`, `noise` and `batch_size`, all of element type `Any`.


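Now we can write the loop. Let's be fancy:

- A progress bar from ProgressMeter.jl tracks the runs.
- A lock protects the shared `DataFrame`.
- `try`/`catch` reports failing parameter combinations without stopping the others.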

```julia
using ProgressMeter
progr = Progress(length(pars_arr), showspeed = true, barlen = 10)

loc = ReentrantLock()   # protects df_results

Threads.@threads for k in 1:length(pars_arr)
    p = pars_arr[k]
    noise = p["noise"]
    batch_size = p["batch_size"]
    try
        out = simul(noise, batch_size)
        lock(loc) do
            push!(df_results, (out, noise, batch_size))   # only one thread at a time
        end
    catch e
        println("problem with p = $(pars_arr[k])")
        println(e)
    end
    next!(progr)
end
```

```
Progress: 100%|██████████| Time: 0:00:02 ( 0.33  s/it)
```

The 9 simulations of one second each complete in about 2 s.



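Another option is to drop the lock entirely. You can do this by preallocating one result slot per parameter combination, so that iteration `k` only ever writes to `results[k]`. Distinct elements of an already-allocated `Vector{Float64}` can safely be written from different threads, whereas `push!` resizes the container and is not thread-safe. In this sketch, `fake_simul` is a hypothetical deterministic stand-in for `simul`, so the result can be checked. A `DataFrame` can then be assembled from `pars_nt` and `results` after the loop.

```julia
# Hypothetical stand-in for `simul`: deterministic so the output can be verified.
fake_simul(noise, batch_size) = noise * batch_size

noises = [0.1, 0.2, 0.3]
batch_sizes = [1000, 2000, 3000]
pars_nt = [(noise = n, batch_size = b) for n in noises for b in batch_sizes]

# Preallocate one slot per combination; iteration k writes only to results[k],
# so no two threads ever touch the same element and no lock is needed.
results = Vector{Float64}(undef, length(pars_nt))
Threads.@threads for k in eachindex(pars_nt)
    p = pars_nt[k]
    results[k] = fake_simul(p.noise, p.batch_size)
end

println(results)
```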
### Atomic operations
Atomic operations can also be used to prevent race conditions; see the [dedicated section](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) of the Julia documentation. `Threads.Atomic` only supports primitive types (such as integers, floating-point numbers and `Bool`), but atomic operations can be faster than locks.
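The following minimal sketch of the atomic approach increments a shared counter from many threads. It uses `Threads.Atomic` and `Threads.atomic_add!` from Base. Each increment is a single indivisible read-modify-write, so, unlike the unsynchronized `push!` example, no update is lost and no lock is needed.

```julia
# Shared counter that many threads increment concurrently.
counter = Threads.Atomic{Int}(0)

Threads.@threads for i in 1:1000
    Threads.atomic_add!(counter, 1)   # atomic read-modify-write: no increments are lost
end

println(counter[])   # 1000
```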


## Multi-processing in Julia

built-in support with the `Distributed` standard library.

- More difficult to deploy than multithreading: processes do not share memory, so code and data must be made available explicitly on each worker.
- Useful when the work is too large for the threads of a single machine and needs to be distributed across multiple machines (e.g., large Monte Carlo simulations).


- `julia -p 4` provides `4` worker processes on the local machine. 
- Alternatively, within Julia you can add workers with:
```julia
using Distributed
addprocs(4)  # add 4 worker processes
```
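The minimal sketch below, which assumes local workers, illustrates that workers are separate processes. Each one has its own OS process id and its own memory, and any function a worker should run must first be defined on it, here with `@everywhere`. `square` is a hypothetical function used only for illustration. The block removes its workers at the end with `rmprocs`.

```julia
using Distributed

new_workers = addprocs(2)            # start 2 local worker processes; returns their ids

# Code that workers must run has to be defined on every process, e.g. with @everywhere.
@everywhere square(v) = v^2

# Each worker is a separate OS process: a different process id and its own memory.
pids    = [remotecall_fetch(getpid, w) for w in new_workers]
squares = [remotecall_fetch(square, w, w) for w in new_workers]   # worker w computes w^2

println("main process pid: ", getpid())
println("worker pids:      ", pids)
println("squares:          ", squares)

rmprocs(new_workers)                 # shut the workers down again
```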


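- The most straightforward way of performing distributed computing is using `pmap`, a parallel version of `map` that distributes the function calls over the available workers.
- A good tutorial on how to use `pmap` can be found in the [julia-distributed-computing](https://github.com/Arpeggeo/julia-distributed-computing) repository.
- [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl) may be useful for launching workers on clusters managed by job schedulers such as Slurm.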
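A minimal `pmap` sketch, assuming two local workers, follows. `slow_square` is a hypothetical function whose `sleep` stands in for real work. `pmap` hands each element of `1:10` to a free worker and returns the results in input order.

```julia
using Distributed

new_workers = addprocs(2)    # local workers; on a cluster they could come from ClusterManagers.jl instead

# The function must be defined on every process that may run it.
@everywhere function slow_square(v)
    sleep(0.1)               # stand-in for real work
    return v^2
end

# pmap sends each element to a free worker and collects the results in input order.
results = pmap(slow_square, 1:10)
println(results)

rmprocs(new_workers)         # shut the workers down again
```

Because each call runs in a separate process, `pmap` suits coarse-grained tasks. For very cheap functions, the cost of sending arguments and results between processes can dominate, just as thread start-up did in the `parallel_add!` example.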




#### `MPI.jl`
There exists an MPI (Message Passing Interface) interface for Julia, provided by the `MPI.jl` package. MPI is a low-level communication standard that enables message passing between processes running on different nodes of a distributed system. Compared with `Distributed`, it may be a better choice thanks to its interoperability with existing MPI codes, its customization options, and its performance and scalability on large-scale systems. If you have never heard of it, you can safely forget about it!




## GPU computing

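Thanks to multiple dispatch, the same generic code can run on GPUs. A function written for `AbstractArray` works unchanged when given GPU arrays, which provide their own implementations of the operations it uses. Here is how.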


```julia
# Reference function, written for any AbstractArray
function myfun(a::AbstractArray, b::AbstractArray)
    return sum(a.^2 * b)   # element-wise square of a, then matrix product with b
end

# generate CPU arrays
a = rand(Float32, 1000, 1000)
b = rand(Float32, 1000, 1000)

using BenchmarkTools
@btime myfun($a, $b)
```

```
  4.015 ms (4 allocations: 7.63 MiB)
1.6656771f8
```

### GPU programming with CUDA

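```julia
using CUDA

@assert CUDA.functional()   # requires an NVIDIA GPU and a working CUDA driver

for d in devices()
    println(d)
end
CUDA.device!(0)   # select the first GPU (device ordinals start at 0)
CUDA.current_device()
```

```julia
a_cuda = CUDA.rand(1000, 1000)   # Float32 arrays allocated on the GPU
b_cuda = CUDA.rand(1000, 1000)
```

```julia
@btime myfun($a_cuda, $b_cuda)
```

These cells require an NVIDIA GPU. On the machine used to run this notebook, no CUDA driver was found: `CUDA.functional()` returned `false`, so no timing is available here.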


### GPU programming with Metal


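Metal.jl targets the GPUs of Apple silicon Macs.

```julia
using Metal
a_mtl = MtlArray(a)   # copy the CPU arrays to the GPU
b_mtl = MtlArray(b)
```

```julia
@btime myfun($a_mtl, $b_mtl)
```

```
  1.254 ms (892 allocations: 25.27 KiB)
1.6690597f8
```

The unchanged `myfun` runs about 3 times faster than the CPU version.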

### Additional resources and acknowledgements
- [Discourse category: Julia at Scale](https://discourse.julialang.org/c/domain/parallel/34)
- [Further explanations on multithreading vs multiprocessing](https://towardsdatascience.com/multithreading-and-multiprocessing-in-10-minutes-20d9b3c6a867)
- [Julia manual: multi-threading](https://docs.julialang.org/en/v1/manual/multi-threading/)
- [CUDA.jl documentation](https://github.com/JuliaGPU/CUDA.jl)
- [A tutorial for using MPI.jl in ML](https://github.com/LuxDL/Lux.jl/tree/main/examples/ImageNet)