# Scaling computations using parallel computing

## Przemysław Szufel

<a class="anchor" id="toc"></a>
## Table of contents
  

1. [Multithreading](#multithreading)
2. [Green threading](#green)
3. [Multi-processing and distributed computing](#multiprocessing)
4. [(background material) Parallelize via Single Instruction Multiple Data (SIMD)](#simd)

Before starting the Jupyter notebook, set the number of threads that Julia will use.
This must be done *before* actually running the `notebook()` command.
The number of threads can also be set in the Julia extension settings of Visual Studio Code (if it is used to run this notebook).
```
# run this code from Julia console just before starting Jupyter Notebook
ENV["JULIA_NUM_THREADS"]=4
```

In [None]:
# Report how many threads this Julia session was started with.
println("Number of threads that your Julia is run: ## $(Threads.nthreads())")

In [None]:
using BenchmarkTools, Distributed

<a class="anchor" id="multithreading"></a>
### Multithreading
---- [Return to table of contents](#toc) ---


In [None]:
# Number of threads available to this session (shown as the cell output).
Threads.nthreads()

In [None]:
"""
    ssum(x)

Return the column sums of the matrix `x` as a `Vector{Float64}`, computed
sequentially on a single thread.

The inner iteration runs down the rows of one column, which matches the
column-major memory layout of Julia arrays.
"""
function ssum(x)
    nrows, ncols = size(x)
    totals = zeros(ncols)
    # Outer iterator is the column, inner iterator is the row.
    for col in 1:ncols, row in 1:nrows
        @inbounds totals[col] += x[row, col]
    end
    return totals
end

In [None]:
"""
    tsum(x)

Multithreaded column sums of the matrix `x`, returned as a `Vector{Float64}`.

The columns are split across the available threads. Every output element is
written by exactly one loop iteration, so no locking is needed.
"""
function tsum(x)
    nrows, ncols = size(x)
    totals = zeros(ncols)
    Threads.@threads for col in 1:ncols
        @inbounds for row in 1:nrows
            totals[col] += x[row, col]
        end
    end
    return totals
end


In [None]:
# Test matrix: 1000 rows by 10000 columns of uniform random numbers.
x = rand(1000,10000);

In [None]:
# Timed twice: the first call includes compilation, the second shows the
# steady-state run time.
@time ssum(x)
@time ssum(x);

In [None]:
# Timed twice: the first call includes compilation, the second shows the
# steady-state run time of the threaded version.
@time tsum(x)
@time tsum(x);

#### Locking mechanism for threads

In [None]:
# Deliberately broken counter, kept as a counter-example: every thread performs
# an unsynchronized read-modify-write on the shared variable `x`, so increments
# can be lost. With more than one thread the returned value may be below 10^6
# and may differ between runs.
function f_bad()
    x = 0
    Threads.@threads for i in 1:10^6
        x += 1  # data race: neither atomic nor protected by a lock
    end
    return x
end


In [None]:
# With several threads the result may differ from 10^6 because of the race.
f_bad()

In [None]:
"""
    f_add()

Single-threaded baseline: count to 10^6 by repeated increments and return
the final count.
"""
function f_add()
    counter = 0
    for _ in 1:10^6
        counter += 1
    end
    return counter
end
# Benchmark the single-threaded baseline (uses @btime from BenchmarkTools).
@btime f_add()
    

In [None]:
"""
    f_atomic()

Count to 10^6 from a threaded loop using an atomic integer, so that no
increment is lost. Returns the final count as an `Int`.
"""
function f_atomic()
    counter = Threads.Atomic{Int}(0)
    Threads.@threads for k in 1:10^6
        # atomic_add! performs the read-modify-write as one indivisible step.
        Threads.atomic_add!(counter, 1)
    end
    return counter[]
end
f_atomic()

In [None]:
"""
    f_spin()

Count to 10^6 from a threaded loop, guarding each increment of the shared
counter with a `Threads.SpinLock`. Returns the final count.
"""
function f_spin()
    guard = Threads.SpinLock()
    total = 0
    Threads.@threads for k in 1:10^6
        lock(guard)
        try
            total += 1
        finally
            # Always release, even if the protected section throws.
            unlock(guard)
        end
    end
    return total
end

"""
    f_reentrant()

Count to 10^6 from a threaded loop, guarding each increment of the shared
counter with a `ReentrantLock`. Returns the final count.
"""
function f_reentrant()
    guard = ReentrantLock()
    total = 0
    Threads.@threads for k in 1:10^6
        lock(guard)
        try
            total += 1
        finally
            # Always release, even if the protected section throws.
            unlock(guard)
        end
    end
    return total
end


In [None]:
using DataFrames
# Run every counter variant twice and record the returned value together with
# the elapsed time in milliseconds. The first run of each function includes
# compilation, which is why each one is measured twice.
stats = DataFrame()
for f in [f_bad, f_atomic, f_spin, f_reentrant], i in 1:2
    # @timed yields the call's value first and the elapsed seconds second.
    value, elapsedtime = @timed f()
    push!(stats, (f=string(f), i=i, value=value, timems=elapsedtime * 1000))
end
println(stats)


<a class="anchor" id="green"></a>
### Green threading 
---- [Return to table of contents](#toc) ---


In [None]:
# Blocking call: the cell takes about two seconds to finish.
@time sleep(2)

In [None]:
# @async wraps the sleep in a task and returns at once, so the measured time is
# only the cost of creating the task; `t` holds the Task.
@time t = @async sleep(4)

In [None]:
# Display the task, including its current state.
t

In [None]:
"""
    dojob(i)

Simulate a job that waits on something external: draw a random delay in
`[0, 1]` rounded to two digits, sleep for that long, and return the tuple
`(i, delay)`.
"""
function dojob(i)
    delay = round(rand(); digits=2)
    sleep(delay)   # stands in for external computations with I/O
    return (i, delay)
end

In [None]:
# Uninitialised storage for eight (job index, delay) results.
result = Vector{Tuple{Int,Float64}}(undef, 8)

In [None]:
# Sequential execution: each dojob call blocks until its sleep is over, so the
# total time is the sum of all delays.
@time for i=1:8
    result[i] = dojob(i)
end
result

In [None]:
# The tasks are started but never awaited: the loop returns immediately and
# `result` is displayed before the tasks have written to it, so it may still
# show uninitialised contents.
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time for i=1:8
   @async result[i] = dojob(i)
end
result

In [None]:
# @sync waits for every enclosed @async task, so `result` is fully populated
# when it is displayed; the jobs sleep concurrently instead of one after another.
result = Vector{Tuple{Int,Float64}}(undef, 8);
@time @sync for i=1:8
   @async result[i] = dojob(i)
end
result

<a class="anchor" id="multiprocessing"></a>
### Multi-processing and distributed computing
---- [Return to table of contents](#toc) ---


In [None]:
using Distributed

This code adds worker processes until there are 4 of them (5 processes in total, counting the main one) and adds none if they already exist.

In [None]:
# Top up to 5 processes in total; max(0, ...) makes re-running the cell a no-op
# once that many processes exist.
addprocs(max(0, 5-nprocs()));

In [None]:
# Ids of the worker processes.
workers()

In [None]:
"""
    s_rand()

Sequential baseline: sum 10^4 uniform random numbers, repeat this 10^4 times,
and return the average of those sums.
"""
function s_rand()
    repetitions = 10^4
    total = 0.0
    for _ in 1:repetitions
        total += sum(rand(10^4))
    end
    return total / repetitions
end
 
# Timed twice: the first call includes compilation.
@time s_rand()
@time s_rand()


In [None]:
using Distributed
 
"""
    p_rand()

Distributed counterpart of the sequential average: the 10^4 repetitions are
split across the available processes and the partial sums are combined
with `+`. Returns the average of the sums.
"""
function p_rand()
    repetitions = 10^4
    # The value of the last expression in the loop body is what gets reduced.
    total = @distributed (+) for k in 1:repetitions
        sum(rand(10^4))
    end
    return total / repetitions
end

# Timed twice: the first call includes compilation.
@time p_rand()
@time p_rand()


In [None]:
# Adjoint of the id vector, so the worker ids are displayed as a single row.
workers()'

In [None]:
# Evaluate 4+3 on the process with id 3 and bring the result back; this
# assumes a process with id 3 exists.
fetch(@spawnat 3 4+3)

In [None]:
"""
    myf()

Print the id of the process executing the call and return a random number.
"""
function myf()
    wid = myid()
    println("I am on worker ", wid)
    return rand()
end
myf()

In [None]:
a = nothing  # NOTE(review): `a` is not used anywhere in this cell
try 
    # myf has so far been defined only with a plain `function` on the current
    # process, so running it on process 4 is expected to fail; the exception
    # is printed instead of aborting the cell.
    fetch(@spawnat 4 myf())
catch e
    println(e)
end

In [None]:
# @everywhere evaluates the definition on every process, so the remote call on
# process 4 below can find myf.
@everywhere function myf() 
    println("I am on worker ", myid())
    rand()
end
fetch(@spawnat 4 myf())

#### A typical pattern for setting an initial state across workers

In [None]:
using Distributed
@everywhere using Pkg
@everywhere Pkg.activate(".")
@everywhere using Distributed, Random, DataFrames

# Toy computation defined on every process: returns twice `x` plus `y`.
@everywhere calc(x, y) = 2x + y

# Per-process setup hook, defined on every process. It seeds the default random
# number generator with the process id, giving each process its own
# reproducible seed. Reading initial data from files or other start-up actions
# would also go here.
@everywhere function init_worker()
    return Random.seed!(myid())
end

# Run init_worker on every worker concurrently; @sync blocks until all of the
# remote calls have been fetched.
@sync for wid in workers()
    @async fetch(@spawnat wid init_worker())
end


Typically, the results are collected into a `DataFrame`.

In [None]:
# Evaluate all 12 (i, j) combinations of 1:4 and 1:3 across the processes. Each
# iteration ends with a DataFrame holding that combination's values, and the
# per-iteration results are merged with append!.
data = @distributed (append!) for (i, j) = vec(collect(Iterators.product(1:4, 1:3)))
    a = rand(1:499)
    b = rand(1:9)*1000
    c = calc(a, b)
    # procid records which process computed this row.
    DataFrame(;i,j,a,b,c,procid = myid())
end

<a class="anchor" id="simd"></a>
### (background material) Parallelize via Single Instruction Multiple Data (SIMD)
---- [Return to table of contents](#toc) ---



In [None]:
"""
    dot1(x, y)

Return the dot product of `x` and `y`, accumulated strictly in index order
(no SIMD reordering) starting from a `Float64` zero.

Throws `DimensionMismatch` when `x` and `y` do not have the same indices.
"""
function dot1(x, y)
    s = 0.0
    # eachindex(x, y) checks that both arrays share the same indices, which is
    # what makes @inbounds safe here. Iterating over 1:length(x) would read
    # past the end of `y` whenever `y` is shorter than `x`.
    for i in eachindex(x, y)
        @inbounds s += x[i]*y[i]
    end
    return s
end

In [None]:
"""
    dot2(x, y)

Return the dot product of `x` and `y`, starting from a `Float64` zero, with
the loop marked `@simd` so the compiler may vectorize it. `@simd` allows the
floating-point additions to be reordered, so the result can differ in the
last bits from a strictly sequential accumulation.

Throws `DimensionMismatch` when `x` and `y` do not have the same indices.
"""
function dot2(x, y)
    s = 0.0
    # eachindex(x, y) checks that both arrays share the same indices, which is
    # what makes @inbounds safe here. Iterating over 1:length(x) would read
    # past the end of `y` whenever `y` is shorter than `x`.
    @simd for i in eachindex(x, y)
        @inbounds s += x[i]*y[i]
    end
    return s
end

In [None]:
# Two random vectors of length 10000 with entries scaled by 100.
x = 100*rand(10000)
y = 100*rand(10000);

# `$` interpolates the global vectors into the benchmark expressions.
@btime dot1($x, $y)
@btime dot2($x, $y)

In [None]:
# Result of the plain loop.
res1 =  dot1(x, y)

In [None]:
# Result of the @simd loop.
res2 =  dot2(x, y)

In [None]:
# Exact comparison: the two results may differ in the last bits because @simd
# permits reordering of the floating-point additions.
res1 == res2

In [None]:
# Print both values so any difference between them is visible.
@show res1 
@show res2