# Concurrent and Parallel programming #
```






```
###### Slides thanks to Amit Murthy (https://github.com/amitmurthy), from JuliaCon India 2015 ######

## Tasks in Julia ##

- Coroutines / Cooperative multitasking
  - Multi-threading is a WIP under development in the 0.5 branch
- Useful for 
  - IO bound tasks - Network, File, etc
  - Implementation of Timers, background tasks
- High level API - `@async`, `@sync`, `@schedule`

In [None]:
t0=time()
# Start an async task
t = @async begin
    sleep(2.0)
    return pi
end

# Control comes here immediately
println("Started ", t)

# wait for its completion
resp = wait(t)
println("Response : ", resp, " after ", time()-t0, " seconds")

In [None]:
# @sync waits for all enclosed async operations to complete
t0=time()
@sync begin
    for i in 1:100
        @async (sleep(1.0); print("$i.."))
    end
end
println("\n\n All tasks finished in ", time() - t0, " seconds")

In [None]:
# compute-bound or blocking ccalls do not yield
t0=time()
@sync begin
    for i in 1:5
        @async begin
            ccall(:Sleep, Cint, (Cint,), 1000.0)
            print("$i..")
        end
    end
end
println("\n\n All tasks finished in ", time() - t0, " seconds")

- `@schedule` schedules an expression to be run
  - like `@async` except: 
    - launched task is not waited on by enclosing `@sync` blocks
    - does not "localize" variables

In [None]:
a=1

# @async localizes variables
@async (sleep(2.0); println("@async : ", a))

# @schedule does not
@schedule (sleep(2.0); println("@schedule : ", a))

a="Hello";


##  Tasks - produce/consume ##

- `produce` and `consume` allow a producer task to "feed" one or more consumers
- `produce` blocks till a consumer removes a value
- `consume` blocks till the producer adds a value or exits


In [None]:
function producer(cnt)
    @schedule begin
        for i in 1:cnt
            produce(i)
        end
        println("Producer DONE!")
    end
end

function consumer(t, id)
    @schedule begin
        sleep(1.0)
        while !istaskdone(t)
            v = consume(t)
            (v != nothing) && println("consumer $id ===> $v")
        end
        println("consumer $id DONE!")
    end
end

# start a producer task
t=producer(6);

# start consumers
consumer(t, 1);
consumer(t, 2);
consumer(t, 3);

In [None]:
# start a producer task
t=producer(6);

for v in t
    println("consumed $v")
end

## Channels ##

- Another means of inter-task communication
- Is type-aware and has a size
- API 
  - `put!`
  - `take!`
  - `fetch`
  - `isready`
  - `wait`

In [None]:
c=Channel{Int64}(100)
for i in 1:6
    put!(c, i)
end

@schedule begin 
    for v in c
        println(v)
    end
    println("Consumer task DONE!")
end;


In [None]:
close(c);

## Multi-processing ##

- Leverge multiple cores
- Distribute across machines
- Remote function execution as opposed to message passing


## @parallel for ##
- distrubutes the range across workers and executes in parallel 

In [None]:
nprocs() > 1 && rmprocs(workers(); waitfor=0.5)
addprocs(4)
@parallel (+) for i in 1:100
    rand(Bool)
end
@time @parallel (+) for i in 1:10^8
    rand(Bool)
end

## pmap ##
- distributed `map` function

In [None]:
nprocs() > 1 && rmprocs(workers(); waitfor=0.5)
addprocs(4)
files = filter(x->endswith(x, ".jl"),
    readdir(joinpath(JULIA_HOME,Base.DATAROOTDIR,"julia","base")))
# compilation run
@everywhere using SHA
pmap(x->begin
        path=joinpath(JULIA_HOME,Base.DATAROOTDIR,"julia","base",x)
        sha512(readall(path))
    end, files);

# timed run
@time pmap(x->begin
        path=joinpath(JULIA_HOME,Base.DATAROOTDIR,"julia","base",x)
        # simulate compute time by calculating SHA512 a few times
        for i in 1:5
            sha512(readall(path))
        end
        sha512(readall(path))
    end, files)

## distributing work with remote channels ##

In [None]:
nprocs() > 1 && rmprocs(workers(); waitfor=0.5)
addprocs(4)

jobs_c = RemoteRef(()->Channel(100))
results_c = RemoteRef(()->Channel(100))

In [None]:
# define function on all workers
@everywhere function do_some_work(jobs_c, results_c)
    println("Worker task started.")
    while true
        try
            rqst = take!(jobs_c)
            if rqst == :EXIT
                break
            else
                sleep(rqst)
                put!(results_c, (myid(), rqst))
            end
        catch e
            println(e)
        end
    end
    println("DONE!")
end

In [None]:
# start the worker tasks....
for p in workers()
    remotecall(p, do_some_work, jobs_c, results_c)
end

In [None]:
# adding jobs....
jobs = rand(1:10, 30)
for i in jobs
    put!(jobs_c, i)
end

# Have the worker tasks exit at the end
for i in 1:nworkers()
    put!(jobs_c, :EXIT)
end

In [None]:
for i in 1:length(jobs)
    (wrkr, data) = take!(results_c) 
    println("worker $wrkr : ", data)
end

## More examples ##

In [None]:
nprocs() > 1 && rmprocs(workers(); waitfor=0.5)
addprocs(4)

# execute expression on all processors
@everywhere println(myid())

In [None]:
rr = remotecall(workers()[1], ()->(sleep(1.0); pi))
isready(rr)

In [None]:
fetch(rr)

In [None]:
take!(rr)

In [None]:
isready(rr)

## Other calls ##
- `remotecall_fetch` blocks and returns with the result of function execution
- `put!(rr, v)` assigns a value to the reference. Blocks if reference is full
- `@spawnat pid expr` executes expression on worker `pid`
- `@spawn expr` executes expression on the next worker. Cycles through all workers

In [None]:
nprocs() > 1 && rmprocs(workers(); waitfor=0.5)
addprocs(4)

In [None]:
# @spawn cycles through all workers
fetch(@spawn myid())

In [None]:
rr = RemoteRef(()->Channel{AbstractString}(2), workers()[1])
put!(rr, "Hello")

In [None]:
put!(rr, 2)

In [None]:
put!(rr, "World")

In [None]:
# should be ready
isready(rr)

In [None]:
# take the first element
take!(rr)

In [None]:
# and then the next
take!(rr)

In [None]:
# should not be ready
isready(rr)

- RemoteRefs can be serialized across workers

In [None]:
p1 = workers()[1]

In [None]:
p2 = workers()[2]

In [None]:
rr = RemoteRef(p1)

In [None]:
put!(rr, "put! from $(myid())")

In [None]:
remotecall_fetch(p2, r->(println(take!(r)); put!(r, "put! from $(myid())"); nothing), rr)

In [None]:
fetch(rr)