# Scaling out numerical computing in Julia
Przemysław Szufel




<a class="anchor" id="toc"></a>
## Table of content

    
1. [Parallelize via Single Instruction Multiple Data (SIMD)](#simd)
2. [Green threading](#green)
3. [Multithreading](#multithreading)
4. [Multi-processing and distributed computing](#multiprocessing)

Before running Jupyter notebook set in Julia number of threads.
This should be done *before* actually running the `notebook()` command.
The number of threads can be also set up in Julia options in Visual Studio code (if this is used to run this notebook).
```
# run this code from Julia console just before starting Jupyter Notebook
ENV["JULIA_NUM_THREADS"]=4
```

In [1]:
println("Number of threads that your Julia is run: ## $(Threads.nthreads())")

Number of threads that your Julia is run: ## 4


In [3]:
using Pkg
Pkg.status()

[32m[1mStatus[22m[39m `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed\Project.toml`
  [90m[1520ce14] [39mAbstractTrees v0.4.5
  [90m[6e4b80f9] [39mBenchmarkTools v1.5.0
  [90m[336ed68f] [39mCSV v0.10.14
  [90m[34f1f09b] [39mClusterManagers v0.4.6
  [90m[a93c6f00] [39mDataFrames v1.6.1
  [90m[31c24e10] [39mDistributions v0.25.108
  [90m[cd3eb016] [39mHTTP v1.10.8
  [90m[0f8b85d8] [39mJSON3 v1.14.0
  [90m[91a5bcdd] [39mPlots v1.40.4


In [4]:
using BenchmarkTools, Distributed

<a class="anchor" id="simd"></a>
### Parallelize via Single Instruction Multiple Data (SIMD)
---- [Return to table of contents](#toc) ---



In [6]:
function dot1(x, y)
    s = 0.0
    for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end

dot1 (generic function with 1 method)

In [7]:
function dot2(x, y)
    s = 0.0
    @simd for i in 1:length(x)
        @inbounds s += x[i]*y[i]
    end
    s
end

dot2 (generic function with 1 method)

In [16]:
x'

1×10000 adjoint(::Vector{Float64}) with eltype Float64:
 59.9256  51.1308  95.4121  4.47716  …  64.5328  31.1439  2.26223  87.9916

In [8]:
x = 100*rand(10000)
y = 100*rand(10000);

@btime dot1($x, $y)
@btime dot2($x, $y)

  4.000 μs (0 allocations: 0 bytes)
  764.035 ns (0 allocations: 0 bytes)


2.504822558295526e7

In [11]:
using LinearAlgebra
@btime dot($x, $y)

  1.140 μs (0 allocations: 0 bytes)


2.504822558295526e7

In [12]:
res1 =  dot1(x, y)

2.5048225582955126e7

In [13]:
res2 ≈  dot2(x, y)

2.504822558295526e7

In [72]:
res1 == res2

false

In [18]:
@show res1 
@show res2

res1 = 2.5048225582955126e7
res2 = 2.504822558295526e7


2.504822558295526e7

In [15]:
res1 ≈ res2

true

<a class="anchor" id="green"></a>
### Green threading 
---- [Return to table of contents](#toc) ---


In [19]:
@time sleep(2)

  2.016791 seconds (44 allocations: 1.297 KiB)


In [21]:
@time t = @async sleep(4)

  0.000069 seconds (27 allocations: 2.086 KiB)


Task (runnable) @0x000002be45d95180

In [23]:
t

Task (done) @0x000002be45d95180

In [24]:
function dojob(i)
    fname = download("https://szufel.pl/pliki/plik.txt?id=$i")  # this could be external computations with I/O
    i, fname
end

dojob (generic function with 1 method)

In [26]:
_, myfname = dojob(2)

(2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_3eDO92zmlo")

In [29]:
result = Vector{Tuple{Int,String}}(undef, 8)

8-element Vector{Tuple{Int64, String}}:
 #undef
 #undef
 #undef
 #undef
 #undef
 #undef
 #undef
 #undef

In [30]:
@time for i=1:8
    result[i] = dojob(i)
end
result

  1.355866 seconds (3.78 k allocations: 223.281 KiB, 0.43% compilation time)


8-element Vector{Tuple{Int64, String}}:
 (1, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_9yl2RqzSJa")
 (2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_5iT2r4JY5E")
 (3, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_DMfE38lkRy")
 (4, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_luNYHCl6fC")
 (5, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_Dkb6ZM1Ofe")
 (6, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_FqVG56R8JI")
 (7, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_xyrqNsdadk")
 (8, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_3ybqBW9MZC")

In [34]:
result = Vector{Tuple{Int,String}}(undef, 8)
@time for i=1:8
   @async result[i] = dojob(i)
end
sleep(0.12)
result

  0.000104 seconds (81 allocations: 7.055 KiB)


8-element Vector{Tuple{Int64, String}}:
    (1, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_1shivUl6lq")
    (2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_RqJSLi1mro")
 #undef
    (4, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_Duj0hyJkhM")
 #undef
 #undef
    (7, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_xyHMLuneVm")
 #undef

In [35]:
result


8-element Vector{Tuple{Int64, String}}:
 (1, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_1shivUl6lq")
 (2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_RqJSLi1mro")
 (3, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_ZOrMt6t8F8")
 (4, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_Duj0hyJkhM")
 (5, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_P6PW5iZW1y")
 (6, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_hw5mPiVOtu")
 (7, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_xyHMLuneVm")
 (8, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_pwLo1ijWvi")

In [37]:
result = Vector{Tuple{Int,String}}(undef, 8);
@time for i=1:8
   result[i] = dojob(i)
end

  1.325766 seconds (3.18 k allocations: 180.594 KiB)


In [38]:
result = Vector{Tuple{Int,String}}(undef, 8);
@time @sync for i=1:8
   @async result[i] = dojob(i)
end
result

  0.518670 seconds (4.53 k allocations: 262.211 KiB, 1.35% compilation time)


8-element Vector{Tuple{Int64, String}}:
 (1, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_1mhMB430Bg")
 (2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_ZcxIXKliz0")
 (3, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_5gnYpyPO7a")
 (4, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_5E7q9ATERo")
 (5, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_LW7whgbS1q")
 (6, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_REZs1SLUbk")
 (7, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_bW3iPgDwFK")
 (8, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_PGtCNipwd8")

In [49]:
function f(c)
    val = take!(c)
    println("DONE $val")
end

c= Channel()
@async f(c)


Task (runnable) @0x000002be45e49180

In [50]:
put!(c, "done")

DONE done

"done"




In [42]:
function jobstat(sem::Base.RefValue{Bool})
    sleep(4)
    sem[] = true
end


jobstat (generic function with 1 method)

In [43]:
completed = Ref(false)
@async jobstat(completed)

Task (runnable) @0x000002be45a083f0

In [45]:
completed

Base.RefValue{Bool}(true)

In [51]:
asyncmap(dojob, 1:20; ntasks=5)

20-element Vector{Tuple{Int64, String}}:
 (1, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_J2Fw3chSDO")
 (2, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_RmtinExWPI")
 (3, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_b4xQRQHk9y")
 (4, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_F8fY9YzMDU")
 (5, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_1ghuVm7Iba")
 (6, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_7IBkp0HYXW")
 (7, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_3A1qLYXQNG")
 (8, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_pSrWjSpCdG")
 (9, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_nufCfKDWd6")
 (10, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_1UtczC9eXo")
 (11, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_5sLMVgXSZA")
 (12, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_py96fOFunA")
 (13, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_p0Jw54BcPY")
 (14, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_ri16HwdoNu")
 (15, "C:\\Users\\pszuf\\AppData\\Local\\Temp\\jl_N4jglUDin2")
 (16, "C:\\Users\\pszuf

In [25]:
?asyncmap

search: [0m[1ma[22m[0m[1ms[22m[0m[1my[22m[0m[1mn[22m[0m[1mc[22m[0m[1mm[22m[0m[1ma[22m[0m[1mp[22m [0m[1ma[22m[0m[1ms[22m[0m[1my[22m[0m[1mn[22m[0m[1mc[22m[0m[1mm[22m[0m[1ma[22m[0m[1mp[22m!



```
asyncmap(f, c...; ntasks=0, batch_size=nothing)
```

Uses multiple concurrent tasks to map `f` over a collection (or multiple equal length collections). For multiple collection arguments, `f` is applied elementwise.

`ntasks` specifies the number of tasks to run concurrently. Depending on the length of the collections, if `ntasks` is unspecified, up to 100 tasks will be used for concurrent mapping.

`ntasks` can also be specified as a zero-arg function. In this case, the number of tasks to run in parallel is checked before processing every element and a new task started if the value of `ntasks_func` is greater than the current number of tasks.

If `batch_size` is specified, the collection is processed in batch mode. `f` must then be a function that must accept a `Vector` of argument tuples and must return a vector of results. The input vector will have a length of `batch_size` or less.

The following examples highlight execution in different tasks by returning the `objectid` of the tasks in which the mapping function is executed.

First, with `ntasks` undefined, each element is processed in a different task.

```
julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
```

With `ntasks=2` all elements are processed in 2 tasks.

```
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
```

With `batch_size` defined, the mapping function needs to be changed to accept an array of argument tuples and return an array of results. `map` is used in the modified mapping function to achieve this.

```
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
```


#### Programming a simple web server
You should be able to connect using the address <a href="http://localhost:9992/3+4" target="about:blank">http://localhost:9992/3+4</a>

To stop web server click <a href="http://localhost:9992/stopme" target="about:blank">http://localhost:9992/stopme</a>

In [52]:
using Sockets
println("Starting the web server...")
server = Sockets.listen(9992)

Starting the web server...


Sockets.TCPServer(Base.Libc.WindowsRawSocket(0x0000000000000744) active)

In [53]:
begin
    contt = Ref(true)
    while contt[]
        sock = Sockets.accept(server)
        @async begin
            data = readline(sock)
            print("Got request:\n", data, "\n")
            cmd = split(data, " ")[2][2:end]
            println(sock, "\nHTTP/1.1 200 OK\nContent-Type: text/html\n")
            contt[] = contt[] && (!occursin("stopme", data))
            if contt[]
                 println(sock, string("<html><body>", cmd, "=", 
                     eval(Meta.parse(cmd)), "</body></html>"))
            else
                println(sock,"<html><body>stopping</body></html>")
            end
            close(sock)
        end
    end
    println("Handling requests stopped")
end

Got request:
GET /3+4 HTTP/1.1
Got request:
GET /favicon.ico HTTP/1.1
Got request:
GET /30+40 HTTP/1.1
Got request:
GET /favicon.ico HTTP/1.1
Got request:
GET /stopme HTTP/1.1
Handling requests stopped
Got request:
GET /favicon.ico HTTP/1.1


<a class="anchor" id="multithreading"></a>
### Multithreading
---- [Return to table of contents](#toc) ---


In [54]:
Threads.nthreads()

4

In [55]:
function ssum(x)
    r, c = size(x)
    y = zeros(c)
    for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end

ssum (generic function with 1 method)

In [56]:
function tsum(x)
    r, c = size(x)
    y = zeros(c)
    Threads.@threads for i in 1:c
        for j in 1:r
            @inbounds y[i] += x[j, i]
        end
    end
    y
end


tsum (generic function with 1 method)

In [57]:
x = rand(1000,10000);

In [58]:
@time ssum(x)
@time ssum(x);

  0.030382 seconds (7.23 k allocations: 575.148 KiB, 58.13% compilation time)
  0.013528 seconds (2 allocations: 78.172 KiB)


In [59]:
@time tsum(x)
@time tsum(x);

  0.116192 seconds (44.37 k allocations: 3.113 MiB, 166.48% compilation time)
  0.005133 seconds (26 allocations: 81.266 KiB)


#### Locking mechanism for threads

In [60]:
function f_bad()
    x = 0
    Threads.@threads for i in 1:10^6
        x += 1
    end
    return x
end


f_bad (generic function with 1 method)

In [71]:
f_bad()

252309

In [74]:
using BenchmarkTools
function f_add()
    x = 0 
    for i in 1:10^9
        x += 1
    end
    x
end
@btime f_add() 

  1.400 ns (0 allocations: 0 bytes)


1000000000

In [81]:
x[]

150

In [82]:
function f_atomic()
    x = Threads.Atomic{Int}(0)
    Threads.@threads for i in 1:10^6
        Threads.atomic_add!(x, 1)
    end
    return x[]
end
f_atomic()

1000000

In [84]:
function f_bins()
    result = zeros(Int, Threads.nthreads())
    Threads.@threads for i in 1:10^6
        result[Threads.threadid()] += 1
    end
    return sum(result)
end
f_easy()

1000000

In [113]:
fetch(Threads.@spawn Threads.threadid())

3

In [89]:
t = Threads.@spawn begin sleep(8);3+3;end

Task (runnable) @0x000002be10cbc9c0

In [90]:
fetch(t)

6

In [91]:
function f_chunks()
    chunks = Iterators.partition(1:1000_000, 1000_000 ÷ Threads.nthreads())
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            s = 0
            for i in chunk
                s += 1
            end
            s
        end
    end
    chunk_sums = fetch.(tasks)
    return sum(chunk_sums)
end

f_chunks (generic function with 1 method)

In [92]:
# this is good up till 16 threads
function f_reentrant()
    l = ReentrantLock()
    # create shared data
    x = 0    
    df = DataFrame()
    Threads.@threads for i in 1:10^6        
        # provide the code that does all computation
        # ....
        Threads.lock(l) do
            # provide the code that updates the shared data
            x += 1
            push!(df, ...)
        end
    end
    return x
end


f_reentrant (generic function with 1 method)

In [93]:
using DataFrames
stats = DataFrame()
for f in [f_bad, f_atomic, f_easy, f_chunks, f_reentrant]
    for i = 1:2
        value, elapsedtime  = @timed f()
        push!(stats, (f=string(f),i=i, value=value, timems=elapsedtime*1000))
    end
end
println(stats)


[1m10×4 DataFrame[0m
[1m Row [0m│[1m f           [0m[1m i     [0m[1m value   [0m[1m timems   [0m
     │[90m String      [0m[90m Int64 [0m[90m Int64   [0m[90m Float64  [0m
─────┼───────────────────────────────────────
   1 │ f_bad            1   250509   29.0828
   2 │ f_bad            2   250000   28.0122
   3 │ f_atomic         1  1000000   16.3139
   4 │ f_atomic         2  1000000   16.9462
   5 │ f_easy           1  1000000    0.0507
   6 │ f_easy           2  1000000    0.0176
   7 │ f_chunks         1  1000000  179.431
   8 │ f_chunks         2  1000000    0.2488
   9 │ f_reentrant      1  1000000  367.541
  10 │ f_reentrant      2  1000000  309.614


<a class="anchor" id="multiprocessing"></a>
### Multi-processing and distributed computing
---- [Return to table of contents](#toc) ---


In [115]:
using Distributed

This code adds 4 workers (and avoids adding more)

In [114]:
addprocs(max(0, 5-nprocs()));

In [116]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [117]:
function s_rand()
    n = 10^4
    x = 0.0
    for i in 1:n
        x += sum(rand(10^4))
    end
    x / n
end
 
@time s_rand()
@time s_rand()


  0.706433 seconds (20.00 k allocations: 763.397 MiB, 13.87% gc time)
  0.710811 seconds (20.00 k allocations: 763.397 MiB, 14.33% gc time)


5000.073882932142

In [118]:
using Distributed
 
function p_rand()
    n = 10^4
    x = @distributed (+) for i in 1:n
        # more lines coul dbe here
        # the last line will be aggregated
        sum(rand(10^4))
    end
    x / n
end

@time p_rand()
@time p_rand()


  1.918265 seconds (465.10 k allocations: 31.231 MiB, 38.88% compilation time)
  0.192752 seconds (431 allocations: 24.094 KiB)


5000.144259169032

In [46]:
workers()'

1×4 adjoint(::Vector{Int64}) with eltype Int64:
 2  3  4  5

In [125]:
t = Distributed.@spawnat 5 myid()

Future(5, 1, 20, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 3015316826112, 3015316838800)), nothing)

In [126]:
fetch(t)

5

In [47]:
fetch(@spawnat 3 4+3)

7

In [139]:
@everywhere using DataFrames
@everywhere function myf() 
    println("I am on worker ", myid())
    DataFrame(c=rand())
end
myf()

I am on worker 1


Row,c
Unnamed: 0_level_1,Float64
1,0.0767119


In [140]:
fetch(@spawnat 2 myf())

      From worker 2:	I am on worker 2


Row,c
Unnamed: 0_level_1,Float64
1,0.728635


In [49]:
a = nothing
try 
    fetch(@spawnat 4 myf())
catch e
    println(e)
end

RemoteException(4, CapturedException(UndefVarError(Symbol("#myf")), Any[(deserialize_datatype at Serialization.jl:1399, 1), (handle_deserialize at Serialization.jl:867, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:874, 1), (deserialize at Serialization.jl:814 [inlined], 1), (deserialize_global_from_main at clusterserialize.jl:160, 1), (#5 at clusterserialize.jl:72 [inlined], 1), (foreach at abstractarray.jl:3097, 1), (deserialize at clusterserialize.jl:72, 1), (handle_deserialize at Serialization.jl:960, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:871, 1), (deserialize at Serialization.jl:814, 1), (handle_deserialize at Serialization.jl:874, 1), (deserialize at Serialization.jl:814 [inlined], 1), (deserialize_msg at messages.jl:87, 1), (#invokelatest#2 at essentials.jl:892 [inlined], 1), (invokelatest at essentials.jl:889 [inlined], 1), (message_handler_loop at process_messages.jl:176, 1), (process_tcp_s

In [50]:
@everywhere function myf() 
    println("I am on worker ", myid())
    rand()
end
fetch(@spawnat 4 myf())

      From worker 4:	I am on worker 4


0.43466090764226784

#### A typical pattern for setting an intial state across workers

In [141]:
using Distributed
# assumes you have already done addprocs

@everywhere using Pkg
# This assumes that the package is in the same folder as the running code
@everywhere Pkg.activate(".")
@everywhere using Distributed, Random, DataFrames

@everywhere function calc(x, y)
    2x + y + rand()
end

@everywhere function init_worker()    
   Random.seed!(myid())
    # reading initial data from files or other actions
end

@sync for wid in workers()
    @async fetch(@spawnat wid init_worker())
end


[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed`


      From worker 4:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed`
      From worker 5:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed`
      From worker 3:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed`
      From worker 2:	[32m[1m  Activating[22m[39m project at `C:\AAABIBLIOTEKA\EuropeanCommission\2024_Julica_EC_Distributed_GPU\1_ParallelDistributed`


Typically results are collected to a `DataFrame`

In [142]:
data = @distributed (append!) for (i, j, k) = vec(collect(Iterators.product(1:4, 1:3, 1:2)))
    a = rand(1:499)
    b = rand(1:9)*1000
    c = calc(a, b)
    DataFrame(;i,j,k,a,b,c,procid = myid())
end

Row,i,j,k,a,b,c,procid
Unnamed: 0_level_1,Int64,Int64,Int64,Int64,Int64,Float64,Int64
1,1,1,1,278,1000,1556.74,2
2,2,1,1,175,6000,6350.9,2
3,3,1,1,67,8000,8134.54,2
4,4,1,1,455,7000,7910.49,2
5,1,2,1,163,5000,5326.79,2
6,2,2,1,127,6000,6254.37,2
7,3,2,1,357,2000,2714.47,3
8,4,2,1,217,6000,6434.57,3
9,1,3,1,238,9000,9476.88,3
10,2,3,1,95,6000,6190.5,3


In [147]:
@everywhere myfun(i) =  (;i, procid=myid(), c= calc(i,i) )

In [148]:
pmap(myfun, 1:10) |> DataFrame

Row,i,procid,c
Unnamed: 0_level_1,Int64,Int64,Float64
1,1,2,3.75133
2,2,4,6.72633
3,3,3,9.25065
4,4,5,12.8475
5,5,5,15.5972
6,6,4,18.7157
7,7,5,21.664
8,8,4,24.248
9,9,5,27.8401
10,10,4,30.9266
