# Parallel and Distributed Computing

**Multithreading** refers to the ability of a processor to execute multiple threads concurrently within a single process; all these threads share the memory of that process. **Multiprocessing** refers to running multiple processes concurrently, on one or several processors (or machines); each process has its own memory and can itself run one or more threads.

![](https://miro.medium.com/v2/resize:fit:720/format:webp/1*hZ3guTdmDMXevFiT5Z3VrA.png)


## Multithreading
Multithreading is a programming technique that allows **multiple threads** of execution to run concurrently within a single process. Julia has built-in support for multithreading, which makes it easy to write concurrent code. It is provided by the `Threads` module of Base (`Base.Threads`), which is available without any `using` statement.

The number of execution threads is controlled either by the `-t`/`--threads` command-line argument:

```shell
julia --threads 10 my_script.jl
```

or by the `JULIA_NUM_THREADS` environment variable. When using VS Code, the number of threads can also be set in the settings of the Julia extension.


When both `JULIA_NUM_THREADS` and `-t`/`--threads` are specified, then `-t`/`--threads` takes precedence.

The number of threads can be specified either as an integer (`--threads=4`) or as `auto` (`--threads=auto`), where `auto` sets the number of threads to the number of local CPU threads.

To check the number of threads available:

```julia
Threads.nthreads()
```

```
5
```


Multithreading in Julia is **super easy**: just put `Threads.@threads` in front of the loop you want to parallelize.

```julia
a = zeros(10)

Threads.@threads for i = 1:10
    a[i] = Threads.threadid()  # record which thread handled iteration i
end
println(a)
```

```
[5.0, 5.0, 2.0, 2.0, 4.0, 4.0, 3.0, 3.0, 1.0, 1.0]
```


### Be careful with race conditions!


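```julia
a = Float64[]
Threads.@threads for i in 1:100
    x = i^2
    push!(a, x)        # several threads may resize `a` at the same time
end
println(length(a)) # may differ from 100 (the program may even crash)
```

`push!` is not thread-safe: when several threads append to the same array at once, some elements can be lost and the internal state of the array can be corrupted.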

#### `lock`

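The `lock` function can be used to prevent this race condition: while one thread holds the lock, any other thread trying to acquire it must wait, so the `push!` calls can no longer overlap.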


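```julia
a = Int[]
lk = ReentrantLock()
Threads.@threads for i in 1:100
    x = i^2            # computed in parallel, outside the lock
    lock(lk) do
        push!(a, x)    # only one thread at a time runs this block
    end
end
println(length(a)) # == 100, although the order of the elements varies between runs
```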
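When the number of results is known in advance, a lock is not always needed. Below is a minimal sketch of a common lock-free alternative, assuming fixed loop bounds: preallocate the output and let each iteration write only to its own index. Since no two iterations touch the same element, there is no race, and the results also stay in loop order.

```julia
using Base.Threads

n = 100
a = zeros(Int, n)        # one slot per iteration, allocated before the loop
@threads for i in 1:n
    a[i] = i^2           # each iteration writes only to its own index: no race
end
println(length(a))       # → 100
```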

### Overhead
Parallelization can bring a performance benefit, but starting and scheduling threads has a cost of its own. For multithreading to be worthwhile, each thread needs a reasonably large amount of "real work"; only then does the speedup approach linear scaling with the number of cores. Conversely, with small workloads, the parallel version may be slower than the serial one.

```julia
N = 2^30              # 2^30 Float32 values = 4 GiB per vector
x = fill(1.0f0, N)    # a vector filled with 1.0 (Float32)
y = fill(2.0f0, N);   # a vector filled with 2.0 (Float32)
```


```julia
using BenchmarkTools

function sequential_add!(y, x)
    for i in eachindex(y, x)
        @inbounds y[i] += x[i]
    end
    return nothing
end

@btime sequential_add!($y, $x)
```

```
  437.580 ms (0 allocations: 0 bytes)
```


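```julia
function parallel_add!(y, x)
    Threads.@threads for i in eachindex(y, x)
        @inbounds y[i] += x[i]
    end
    return nothing
end

@btime parallel_add!($y, $x)
```

```
  437.179 ms (8 allocations: 592 bytes)
```

In this run the two timings are nearly identical. Element-wise addition does very little arithmetic for each element loaded from memory, so it tends to be limited by memory bandwidth rather than by the number of cores, which caps the benefit of extra threads.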


### 👍 How I use multithreading in my simulations

I typically have an expensive function that I want to call multiple times with different arguments.



```julia
function simul(noise, batch_size)
    # do something with noise and batch_size
    sleep(1)          # stand-in for an expensive computation
    return randn()
end
```

```
simul (generic function with 1 method)
```


`simul` does some simulation based on the `noise` and `batch_size` parameters, then returns the simulation result.

I want to loop through all combinations of the proposed arguments. Let's do so by creating a dictionary `pars` for each combination of arguments and adding it to an array `pars_arr`.


```julia
pars_arr = Dict[]

noises = [0.1, 0.2, 0.3]
batch_sizes = [1000, 2000, 3000]

for noise in noises, batch_size in batch_sizes
    pars = Dict()
    pars["noise"] = noise
    pars["batch_size"] = batch_size
    push!(pars_arr, pars)
end
```
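As an alternative sketch (not the approach used in the rest of this section), the same nine combinations can be built in a single comprehension with `NamedTuple`s instead of untyped `Dict`s. A comprehension with two `for` clauses produces a flat vector, in the same order as the nested loop above. NamedTuples are immutable and concretely typed, and their fields are read as `p.noise` rather than `p["noise"]`.

```julia
noises = [0.1, 0.2, 0.3]
batch_sizes = [1000, 2000, 3000]

# One NamedTuple per combination; the second `for` is the inner loop
pars_arr = [(noise = n, batch_size = b) for n in noises for b in batch_sizes]

println(length(pars_arr))   # → 9
println(pars_arr[1].noise)  # → 0.1
```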

We'll also create a `DataFrame` to store the results.


```julia
using DataFrames
df_results = DataFrame("Result" => [],
                       "noise" => [],
                       "batch_size" => [])
```

This creates an empty `DataFrame` (0 rows) with the columns `Result`, `noise` and `batch_size`, all of element type `Any`.


Here is how I would run the simulations.


```julia
using ProgressMeter
progr = Progress(length(pars_arr), showspeed = true, barlen = 10)

loc = ReentrantLock()  # protects df_results, which is shared by all threads

Threads.@threads for k in 1:length(pars_arr)
    p = pars_arr[k]
    noise = p["noise"]
    batch_size = p["batch_size"]
    try
        out = simul(noise, batch_size)
        # push! on a DataFrame is not thread-safe: only one thread writes at a time
        lock(loc) do
            push!(df_results, (out, noise, batch_size))
        end
    catch e
        # report the failing parameter set without stopping the other simulations
        println("problem with p = $(pars_arr[k])")
        println(e)
    end
    next!(progr)
end
```

```
Progress: 100%|██████████| Time: 0:00:02 ( 0.31  s/it)
```




I like using `ProgressMeter` to get a sense of where my computation is at.


### Atomic operations
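Note that you can also use atomic operations; see the [dedicated section](https://docs.julialang.org/en/v1/manual/multi-threading/#Atomic-Operations) of the Julia documentation. An atomic operation updates a shared value in one indivisible step, which prevents the same kind of race condition as a `lock`. Atomics can be faster than locks, but they are more limited: a `Threads.Atomic` holds a single primitive value (an integer, a float or a `Bool`) and supports only a fixed set of operations, such as addition or compare-and-swap.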
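A minimal sketch with `Threads.Atomic`: each iteration adds its contribution to a shared counter with `atomic_add!`, which performs the read-modify-write as one indivisible step, so no update is lost and no lock is required.

```julia
using Base.Threads

total = Atomic{Int}(0)           # shared counter that supports atomic updates
@threads for i in 1:100
    atomic_add!(total, i^2)      # indivisible read-modify-write: no update is lost
end
println(total[])                 # read the current value with [] → 338350
```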




## Multi-processing

### `Distributed`
Julia also ships with a standard library for distributed parallel computing, called `Distributed`. Although it is generally harder to set up than multithreading, it is useful in some situations: for example, when the workload is too large for a single machine and must be spread across several machines, or when tasks benefit from running in separate processes, each with its own memory.

Monte Carlo simulations, which consist of many independent runs, are another good use case for distributed computing.




`julia -p 4` starts `4` worker processes on the local machine. Alternatively, you can add workers from within Julia:
```julia
using Distributed
addprocs(4)  # add 4 worker processes
```
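For the Monte Carlo use case mentioned above, here is a sketch using the `@distributed` macro with a `(+)` reducer: the iterations are split among the workers and the partial results are summed. The example estimates π from the fraction of random points that fall inside the unit quarter circle; the number of workers (2) and of samples are arbitrary choices for illustration.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start 2 local workers if none are running yet

n = 1_000_000
# Each worker handles a slice of 1:n; (+) combines the partial counts
hits = @distributed (+) for i in 1:n
    Int(rand()^2 + rand()^2 <= 1.0)   # 1 if the random point lies in the quarter circle
end
pi_estimate = 4 * hits / n
println(pi_estimate)
```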

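The most straightforward way to perform distributed computing is with `pmap`, which applies a function to each element of a collection and dispatches the calls to the available workers. A good tutorial on how to use `pmap` can be found [here](https://github.com/Arpeggeo/julia-distributed-computing).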
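A minimal `pmap` sketch, assuming two local workers. `slow_square` is a hypothetical stand-in for an expensive function such as `simul`; it must be defined on every worker, which is what `@everywhere` does. `pmap` returns the results in the same order as the inputs.

```julia
using Distributed
nprocs() == 1 && addprocs(2)   # start 2 local workers if none are running yet

# Hypothetical expensive function, defined on the master and on all workers
@everywhere function slow_square(x)
    sleep(0.1)                 # simulate an expensive computation
    return x^2
end

results = pmap(slow_square, 1:10)   # calls are farmed out to the workers
println(results)
```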

Note that [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl) may be useful for launching workers on clusters managed by job schedulers such as Slurm.




#### `MPI.jl`
The `MPI.jl` package provides a Julia interface to MPI (Message Passing Interface). MPI is a standard for low-level message passing between processes, typically running on different nodes of a distributed system. Compared with `Distributed`, it may be a better choice thanks to its interoperability, customization options, performance, and scalability on large-scale systems. If you have never heard of it, you can safely forget about it!




## GPU computing

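Thanks to multiple dispatch, the same generic Julia code can run on GPUs: call a function written for `AbstractArray` with GPU arrays, and the GPU-specific methods are used automatically. Here is how.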



### GPU programming with CUDA

```julia
using BenchmarkTools

# generic function: works for any AbstractArray, on the CPU or on the GPU
function myfun(a::AbstractArray, b::AbstractArray)
    return sum(a.^2 .* b)
end

# generate CPU arrays
a = rand(Float32, 1000, 1000)
b = rand(Float32, 1000, 1000)

@btime myfun(a, b)
```

```
  456.008 μs (3 allocations: 3.81 MiB)
166483.64f0
```

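```julia
using CUDA

@assert CUDA.functional()  # fails if no usable NVIDIA GPU is found

# list the available GPUs
for d in devices()
    println(d)
end
CUDA.device!(1)            # select GPU 1 (this machine has 8 GPUs; adapt to yours)
CUDA.current_device()
```

```
CuDevice(0)
CuDevice(1)
CuDevice(2)
CuDevice(3)
CuDevice(4)
CuDevice(5)
CuDevice(6)
CuDevice(7)
```

```
CuDevice(1): NVIDIA TITAN RTX
```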

```julia
# random Float32 matrices allocated directly on the GPU
a = CUDA.rand(1000, 1000)
b = CUDA.rand(1000, 1000)
```

```
1000×1000 CuArray{Float32, 2, CUDA.Mem.DeviceBuffer}:
```

```julia
@btime myfun(a, b)  # the same generic function, now running on the GPU
```

```
  60.250 μs (125 allocations: 6.86 KiB)
166715.84f0
```


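### GPU programming on macOS
```julia
using Metal          # Metal.jl targets Apple silicon (M-series) GPUs
using BenchmarkTools

# Metal GPUs work with Float32, so create Float32 arrays on the CPU first
a = rand(Float32, 1000, 1000)
b = rand(Float32, 1000, 1000)

a_gpu = MtlArray(a)  # copy the arrays to the GPU
b_gpu = MtlArray(b)

@btime myfun(a_gpu, b_gpu)
```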


### Additional resources
- [Discourse category Julia at scale](https://discourse.julialang.org/c/domain/parallel/34)
- [Further explanations on multithreading vs multiprocessing](https://towardsdatascience.com/multithreading-and-multiprocessing-in-10-minutes-20d9b3c6a867)
- [Julia manual: Multi-Threading](https://docs.julialang.org/en/v1/manual/multi-threading/)