### Mechanizmy współbieżności i równoległości w Julii 

Kanały komunikacyjne pomiędzy zadaniami współbieżnymi (tutaj w ramach jednego procesu)

In [1]:
# Definijemy kanał komunikacyjny
c= Channel(32)

# funkcja consumer czyta dane z kanalu komunikacyjnego 
function consumer()
    for i in 1:3
        data = take!(c)
        println("Konsumuje ", data)
    end    
end

#uruchamiam funkcje consumer() jako współbieżne zadanie (Task)
consumertask= @async consumer()


Task (runnable) @0x00007f94a08a0a90

In [2]:
# pętla producenta
for i in 1:3
    put!(c,i) 
end

Konsumuje 1
Konsumuje 2
Konsumuje 3


### Programowanie równoległe

In [3]:
using Distributed
if nprocs() == 1
    addprocs(4)
end
# wszystkie procesy
nprocs() |> println
# procesy workerów
# tablica identyfikatorów
workers() |> println
# ilość
nworkers()|> println

5
[2, 3, 4, 5]
4


In [4]:
# uruchom funkcje na wybranym workerze, zwraca future
# future = "pudełko" na wynik, które wypełni się, jak wynik będzie gotowy
# argumenty:
# 1. funkcja jaka się ma wykonać
# 2. identyfikator workera, 
# 3. parametry funkcji
ref = remotecall(myid,workers()[1])

Future(2, 1, 6, nothing)

In [5]:
# czekamy na wynik i wypełniamy pudełko
_id = fetch(ref)
println(_id)

2


In [6]:
@time ref = remotecall(x->(sleep(x);10x), workers()[1], 2)
@time fetch(ref) |> println

  0.462858 seconds (854.22 k allocations: 42.966 MiB, 3.28% gc time)
20
  2.442863 seconds (325 allocations: 15.422 KiB)


In [7]:
# przekazywanie RemoteRef między procesami
# makro @spawnat oblicza na workerze, którego identyfikator jest podany w pierwszym argumencie
# wyrażenie podane w  drugim argumencie 
# makro - wygodniejsze w użyciu niż funkcja

r = remotecall(rand, 2, 2, 2)
s = @spawnat 2 1 .+ fetch(r)

fetch(s)

2×2 Array{Float64,2}:
 1.53511  1.3371 
 1.14919  1.57484

In [8]:
# remotecall i fetch w jednym
remotecall_fetch(getindex, 2, r, 1, 1)

0.5351106303820665

In [9]:
# makro @spawn wybiera zdalny proces. 
# Druga linijka wykona się na tym samym procesie co pierwsza, 
# aby nie trzeba było przesyłać danych
# można definiować własne makra typu @spawn
# raz ściągnięty wynik future jest przechowywany lokalnie (cache), 
# wartość przechowywana zdalnie jest kasowana.
r = @spawn rand(2,2)
s = @spawn 1 .+ fetch(r)
fetch(s)

2×2 Array{Float64,2}:
 1.50855  1.18484
 1.78214  1.65356

Przykład użycia zdalnych kanałow komunikacji (Remote Channels)

In [10]:
# zdalny kanał przekazujący numery zadań
const jobs = RemoteChannel(()->Channel{Int}(32));

In [11]:
# zdalny kanał przekazujący wyniki
const results = RemoteChannel(()->Channel{Tuple}(32));

In [12]:
# funkcja wykonująca pewną (symulowaną sleepem) "pracę"
 @everywhere function do_work(jobs, results) # define work function everywhere
           while true   
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
  end


In [13]:
# funkcja wkładająca  do kanału numery "prac"

function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
end;

In [14]:
n = 12;

In [15]:
@async make_jobs(n);

In [16]:
# starujemy zdalne zadania

for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)
end

In [17]:
# wypisujemy wyniki

while n > 0 # print out results
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           n = n - 1
end

3 finished in 0.2 seconds on worker 2
4 finished in 0.27 seconds on worker 4
1 finished in 0.55 seconds on worker 3
5 finished in 0.34 seconds on worker 2
6 finished in 0.3 seconds on worker 4
2 finished in 0.76 seconds on worker 5
8 finished in 0.3 seconds on worker 3
10 finished in 0.1 seconds on worker 5
9 finished in 0.2 seconds on worker 4
12 finished in 0.14 seconds on worker 5
7 finished in 0.87 seconds on worker 2
11 finished in 0.71 seconds on worker 3


In [18]:
# przesyłamy wiadomośc o zamknięciu do zdalnych procesów
finalize(jobs)
finalize(results)

### Zadanie

* Zaimplementować obliczanie zbioru Julii z poprzedniego laboratorium    za pomocą mechanizmu przydziału prac do workerów. Ilość zadań powinna być o rząd wielkości większa od liczby workerów
* Zmierzyć przyspieszenie i efektywność algorytmu. 

