# Cuidados con la paralelización en Julia

Reference: [PSA: Thread-local state is no longer recommended](https://julialang.org/blog/2023/07/PSA-dont-use-threadid/)

Podemos analizar la escritura de "codigo paralelo incorrecto" como sigue:

```julia
using Base.Threads: nthreads, @threads, threadid

states = [some_initial_value for _ in 1:nthreads()]
@threads for x in some_data
    tid = threadid()
    old_value = states[tid]
    new_value = some_operator(old_value, f(x))
    states[tid] = new_value
end
do_something(states)
```

El código anterior es incorrecto porque las *taks* generadas por `@threads` pueden ceder el paso a otras *tasks* durante su ejecución. 

Entre la lectura de `old_value` y el almacenamiento en memoria de `new_value`, la *task* actual podría ser pausada y una nueva *task* que se ejecuta en el mismo *thread* con el mismo `threadid()` podría escribir simultáneamente a la variable `states[tid]`, causando una *race condition* y por lo tanto la pérdida de trabajo.

Notemos que no se trata de un problema específico del *multithreading*, sino de un problema de concurrencia. (en la refencia hay un ejemplo demostrando que el problema presiste aún si usamos un único *thread* `julia --threads=1`).

# Ejemplo simple

In [6]:
using Pkg
Pkg.activate("./")
Pkg.status()

[32m[1m  Activating[22m[39m new project at `~/github_repositories/my_repositories/workshop_juliero/multithread`


[32m[1mStatus[22m[39m `~/github_repositories/my_repositories/workshop_juliero/multithread/Project.toml` (empty project)


## Comparación entre versiones seriales y paralelizadas

Supongamos que queremos calcular la siguiente operación:

$$
\Large
a=\sum _{i=1}^{N} f(i)
\normalsize
$$

donde $f$ es alguna función arbitraria de $i$. Entonces, creamos diversas funciones para computar esto y vemos si producen distintos resultados.

### Versiones seriales

In [21]:
#=
- Function
    reduce_serial_01(f,N)
- Description
    This function calculates the sum of the results of the function f from 1 to n.
- Arguments
    `f::Function`: Function to be calculated
    `N::Int`: The number of times the function is calculated
- Output
    `a`: Sum of the results of the function f from 1 to n
=#
function reduce_serial_01(f::Function,N::Int)
    a=0.0
    for i in 1:N
        a+=f(i)
    end
    return a;
end

reduce_serial_01 (generic function with 1 method)

Versión compacta y generalizada

In [22]:
#=
- Function
    reduce_serial_02(f,op,itr)
- Description
    This function calculates the result of the function f for each element of the iterator itr
    and then calculates the result of the operation op.
- Arguments
    `f::Function``: Function to be calculated
    `op`: Operation to be calculated
    `itr`: Iterator
- Output
    `a`: Result of the operation op
=#
reduce_serial_02(f::Function,op,itr) = reduce(op,f.(itr));

### Versiones paralelas

Función análoga a `reduce_serial_01(f,N)` pero usando el macro `Threads.@threads`. Notemos que esta es un codigo paralelo incorrecto porque cada hay muchos *tasks* tratando de escribir al mismo lugar de memoria (en este caso en la variable `a`).

In [23]:
function reduce_parallel_01(f::Function,N::Int)
    a=0.0
    Threads.@threads for i in 1:N
        a+=f(i)
    end
    return a;
end

reduce_parallel_01 (generic function with 1 method)

Función análoga a `reduce_serial_01(f,n)` pero usando el macro `Threads.@threads`. Además, tratamos de solucionar el problema de *race condition* de la función anterior `reduce_parallel_01(f,N)`.

In [24]:
function reduce_parallel_02(f::Function,N::Int)
    a_values=zeros(N)
    Threads.@threads for i in 1:N
        a_values[i]=f(i)
    end
    return sum(a_values);
end

reduce_parallel_02 (generic function with 1 method)

Versión extraida de la referencia (versión compacta y generalizada). Según la referencia es una función que posiblemente se agregue a codigo base de Julia.

In [25]:
using Base.Threads: nthreads, @spawn
#=
- Function
    tmapreduce(f, op, itr; tasks_per_thread::Int = 2, kwargs...)
- Aim
    Perform a mapreduce operation on the function f.
- Arguments
    - f::Function: A function that takes an integer as input and returns an integer.
    - op::Function: A function that takes two integers as input and returns an integer.
    - itr::AbstractArray: An array of integers.
    - tasks_per_thread::Int: An integer.
    - kwargs: Keyword arguments.
- Output
    The result of the mapreduce operation.
=#
function tmapreduce(f, op, itr; tasks_per_thread::Int = 2, kwargs...)
    chunk_size = max(1, length(itr) ÷ (tasks_per_thread * nthreads()))
    tasks = map(Iterators.partition(itr, chunk_size)) do chunk
        @spawn mapreduce(f, op, chunk; kwargs...)
    end
    mapreduce(fetch, op, tasks; kwargs...);
end

tmapreduce (generic function with 1 method)

Ahora hacemos una versión un poco más explícita para enteder la función anterior

**Overhead**: se refiere a que existen recursos adicionales (tiempo, memoria y poder de procesamiento) necesarios para gestionar y ejecutar una task más allá del trabajo real que se está realizando. Por ejemplo, en Julia crear, switchear y sincronizar tasks produce overhead.

**Load balancing**: si tenemos pocos tasks por thread y como no sabemos cuántas operaciones requerirá cada task, algunos threads terminarán de trabajar y otros no. Y como el código debe esperar a que términe todo el paralelismo para poder seguir, lo anterior no es conveniente, porque podríamos tener mucho tiempo de threads sin trabajar. Si aumentamos más tasks por threads nos aseguramos de que cada threads utilice un tiempo considerable de computo.


In [26]:
using Base.Threads: nthreads, @threads, @spawn
using Base.Iterators: partition

function reduce_parallel_03(f::Function,N::Int)
    # customize this as needed. More tasks have more overhead, but better load balancing
    tasks_per_thread = Threads.nthreads()
    itr=1:N
    # partition your data into chunks that individual tasks will deal with
    chunk_size = max(1, length(itr) ÷ (tasks_per_thread * nthreads()))
    tasks = map(Iterators.partition(itr, chunk_size)) do chunk
        # Each chunk of your data gets its own spawned task that does its own local,
        #  sequential work and then returns the result
        @spawn begin
            partial_sum=0.0
            for i in chunk
                partial_sum += f(i)
            end
            return partial_sum
        end
    end
    # get all the values returned by the individual tasks. fetch is type unstable,
    #  so you may optionally want to assert a specific return type.
    partial_sums=fetch.(tasks)
    # sum reduction
    return sum(partial_sums);
end

reduce_parallel_03 (generic function with 1 method)

En Julia, la "type instability" se refiere a situaciones en las que el tipo de una variable o expresión no puede determinarse en tiempo de compilación y puede cambiar durante la ejecución. Esto puede llevar a problemas de rendimiento porque el compilador de Julia no puede optimizar el código de manera eficiente, ya que no puede generar un código de máquina específico para un tipo conocido.

Cuando el comentario dice "fetch is type unstable", significa que el resultado de `fetch(task)` no tiene un tipo predecible o consistente. Dado que `fetch` recupera el resultado de una computación que podría devolver diferentes tipos dependiendo de la entrada o función utilizada, el compilador no puede saber de antemano qué tipo recibirá. Esta falta de estabilidad de tipo puede llevar a una ejecución del código más lenta porque el tiempo de ejecución de Julia tiene que manejar la variabilidad de manera dinámica.

Para mitigar esto, podrías especificar o asegurar un tipo de retorno específico cuando trabajes con `fetch` si sabes qué tipo de resultado esperar. Por ejemplo, si esperas que `fetch(task)` devuelva un `Float64`, podrías usar aserciones o anotaciones de tipo para informar al compilador, lo cual puede ayudar a mejorar el rendimiento.

### Ahora testeamos las funciones anteriores

Consideramos que $f(i) = i$ y que $N=100$ entonces queremos calcular la siguiente expresión

$$
\Large
a=\sum _{i=1}^{100} i = n(n+1)/2
\normalsize
$$

In [37]:
N=200
f(i)=i

f (generic function with 1 method)

In [41]:
exact_value = N*(N+1)/2
println("Exact value = $(exact_value)")

Exact value = 20100.0


A priori, parecería ser que todas las funciones nos dan lo mismo.

In [69]:
println("Single calculation version 1 = $(reduce_serial_01(f,N))")
println("Single calculation version 2 = $(reduce_serial_02(f,+,[i for i in 1:N]))")
println("Parallel calculation version 1 = $(reduce_parallel_01(f,N))")
println("Parallel calculation version 2 = $(reduce_parallel_02(f,N))")
println("Parallel calculation version 3 = $(tmapreduce(f,+,[i for i in 1:N]))")
println("Parallel calculation version 4 = $(reduce_parallel_03(f,N))")

Single calculation version 1 = 20100.0
Single calculation version 2 = 20100
Parallel calculation version 1 = 20100.0
Parallel calculation version 2 = 20100.0
Parallel calculation version 3 = 20100
Parallel calculation version 4 = 20100.0


# Evidenciemos el problema de "race condition"

In [42]:
using Test

Notemos que con la función `reduce_parallel_01(f,N)` tendremos errores en el resultado de forma no controlable.

In [66]:
@testset "Check reduce_parallel_01 function" begin
    for i in 1:2
        @test reduce_parallel_01(f,N) == exact_value
    end
end

[0m[1mTest Summary:                     | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check reduce_parallel_01 function | [32m   2  [39m[36m    2  [39m[0m0.0s


Test.DefaultTestSet("Check reduce_parallel_01 function", Any[], 2, false, false, true, 1.722179391816403e9, 1.722179391817438e9, false)

In [68]:
@testset "Check reduce_parallel_01 function" begin
    for i in 1:2
        @test reduce_parallel_01(f,N) == exact_value
    end
end

Check reduce_parallel_01 function: [91m[1mTest Failed[22m[39m at [39m[1m/home/martin/github_repositories/my_repositories/workshop_juliero/multithread/jl_notebook_cell_df34fa98e69747e1a8f8a730347b8e2f_Y102sZmlsZQ==.jl:3[22m
  Expression: reduce_parallel_01(f, N) == exact_value
   Evaluated: 16325.0 == 20100.0

Stacktrace:
 [1] [0m[1mmacro expansion[22m
[90m   @[39m [90m~/Downloads/julia-1.9.0/share/julia/stdlib/v1.9/Test/src/[39m[90m[4mTest.jl:478[24m[39m[90m [inlined][39m
 [2] [0m[1mmacro expansion[22m
[90m   @[39m [90m~/github_repositories/my_repositories/workshop_juliero/multithread/[39m[90m[4mjl_notebook_cell_df34fa98e69747e1a8f8a730347b8e2f_Y102sZmlsZQ==.jl:3[24m[39m[90m [inlined][39m
 [3] [0m[1mmacro expansion[22m
[90m   @[39m [90m~/Downloads/julia-1.9.0/share/julia/stdlib/v1.9/Test/src/[39m[90m[4mTest.jl:1498[24m[39m[90m [inlined][39m
 [4] top-level scope
[90m   @[39m [90m~/github_repositories/my_repositories/workshop_juliero/multi

TestSetException: Some tests did not pass: 1 passed, 1 failed, 0 errored, 0 broken.

Sin embargo el resto de funciónes nunca tienen problemas, por más de que las corramos muchas veces.

In [72]:
@testset "Check reduce_serial_01" begin
    for i in 1:100
        @test reduce_serial_01(f,N) == exact_value
    end
end

@testset "Check reduce_serial_02" begin
    for i in 1:100
        @test reduce_serial_02(f,+,[i for i in 1:N]) == exact_value
    end
end

@testset "Check reduce_parallel_02" begin
    for i in 1:100
        @test reduce_parallel_02(f,N) == exact_value
    end
end

@testset "Check tmapreduce" begin
    for i in 1:100
        @test tmapreduce(f,+,[i for i in 1:N]) == exact_value
    end
end

@testset "Check tmapreduce" begin
    for i in 1:100
        @test reduce_parallel_03(f,N) == exact_value
    end
end

[0m[1mTest Summary:          | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check reduce_serial_01 | [32m 100  [39m[36m  100  [39m[0m0.0s
[0m[1mTest Summary:          | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check reduce_serial_02 | [32m 100  [39m[36m  100  [39m[0m0.0s
[0m[1mTest Summary:            | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check reduce_parallel_02 | [32m 100  [39m[36m  100  [39m[0m0.0s
[0m[1mTest Summary:    | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check tmapreduce | [32m 100  [39m[36m  100  [39m[0m0.0s
[0m[1mTest Summary:    | [22m[32m[1mPass  [22m[39m[36m[1mTotal  [22m[39m[0m[1mTime[22m
Check tmapreduce | [32m 100  [39m[36m  100  [39m[0m0.0s


Test.DefaultTestSet("Check tmapreduce", Any[], 100, false, false, true, 1.7221795665905e9, 1.722179566608613e9, false)