
Julia is not fully thread-safe yet.

# Julia Tasks (aka Coroutines) 

## Channels

Channels can be quite useful to pass data between running tasks, particularly those involving I/O operations.



In [54]:
# Dot product sum(x[i] * y[i]) computed with a plain (non-`@simd`) loop.
# Marked `@noinline`, presumably so `timeit` measures a real call.
#
# Throws `DimensionMismatch` if `x` and `y` do not share the same indices.
@noinline function inner(x, y)
    # Start from the type of a product so the accumulator keeps one concrete
    # type in the loop even when `x` and `y` have different element types.
    s = zero(eltype(x)) * zero(eltype(y))
    # `eachindex(x, y)` checks that both arrays have identical indices, which is
    # what makes the `@inbounds` access to `y` (not only `x`) safe.
    for i in eachindex(x, y)
        @inbounds s += x[i] * y[i]
    end
    return s
end

# Dot product sum(x[i] * y[i]) with the loop annotated `@simd`, which lets the
# compiler reorder the floating-point additions in order to vectorize them
# (so the result may differ from a strictly sequential sum in the last bits).
#
# Throws `DimensionMismatch` if `x` and `y` do not share the same indices.
@noinline function innersimd(x, y)
    # Accumulate in the type of a product so `s` keeps one concrete type in
    # the loop; a type change inside the loop would also block vectorization.
    s = zero(eltype(x)) * zero(eltype(y))
    # `eachindex(x, y)` verifies both arrays have identical indices, making the
    # `@inbounds` access to `y` safe as well as the access to `x`.
    @simd for i in eachindex(x, y)
        @inbounds s += x[i] * y[i]
    end
    return s
end

# Time `inner` against `innersimd` on two random Float32 vectors of length `n`.
# Each kernel is called `reps` times, and its throughput is printed in GFlop/s
# (one multiply and one add per element, i.e. 2n flops per call).
function timeit(n, reps)
    a = rand(Float32, n)
    b = rand(Float32, n)
    acc = zero(Float64)        # running sum of kernel results (not otherwise used)
    flops = 2n * reps

    t_plain = @elapsed for _ in 1:reps
        acc += inner(a, b)
    end
    println("GFlop/sec        = ", flops / t_plain * 1E-9)

    t_simd = @elapsed for _ in 1:reps
        acc += innersimd(a, b)
    end
    println("GFlop/sec (SIMD) = ", flops / t_simd * 1E-9)
end

# Compare the plain and `@simd` dot-product kernels: 1000-element vectors, 1000 repetitions.
timeit(1000, 1000)

GFlop/sec        = 1.9827736624208876
GFlop/sec (SIMD) = 14.363172824877017


In [55]:
# Fill `u` with samples of sin(2π·x) at x = k·dx for k = 1..n, where dx = 1/(n-1).
# Note that the first sample sits at x = dx (not 0) and the last at x = n·dx.
# The `u::Vector` annotation guarantees 1-based indexing, which the `@inbounds`
# loop over 1:npts relies on.
function init!(u::Vector)
    npts = length(u)
    h = 1.0 / (npts - 1)
    @fastmath @inbounds @simd for k in 1:npts
        u[k] = sin(2pi * h * k)
    end
end

# Finite-difference derivative of `u` on a uniform grid with spacing dx = 1/(n-1),
# written into `du`. Interior points use the central difference
# (u[i+1] - u[i-1]) / (2dx); the two end points use one-sided differences.
#
# Throws `ArgumentError` if `u` has fewer than 2 elements and `DimensionMismatch`
# if `du` does not have the same indices as `u`. Both conditions are required
# for the `@inbounds` accesses below to stay in bounds.
function deriv!(u::Vector, du)
    n = length(u)
    n >= 2 || throw(ArgumentError("deriv! needs at least 2 points, got $n"))
    axes(du) == axes(u) ||
        throw(DimensionMismatch("du has axes $(axes(du)), expected $(axes(u))"))
    dx = 1.0 / (n-1)
    @fastmath @inbounds du[1] = (u[2] - u[1]) / dx
    @fastmath @inbounds @simd for i in 2:n-1
        du[i] = (u[i+1] - u[i-1]) / (2*dx)
    end
    @fastmath @inbounds du[n] = (u[n] - u[n-1]) / dx
end

# Root-mean-square of `u`: sqrt(sum(u[j]^2) / n).
# The sum is a `@simd` reduction under `@fastmath`, so the summation order (and
# therefore the last bits of the result) may differ from a left-to-right sum.
function mynorm(u::Vector)
    len = length(u)
    acc = zero(eltype(u))
    @fastmath @inbounds @simd for j in 1:len
        acc += u[j]^2
    end
    @fastmath @inbounds return sqrt(acc / len)
end

# Benchmark driver: build a 2000-point sine profile with `init!`, then time 10^6
# rounds of differentiating it (`deriv!`) and taking the RMS of the result
# (`mynorm`). Prints the final norm.
function main()
    npoints = 2000
    u = Vector{Float64}(undef, npoints)
    init!(u)
    du = similar(u)

    # One untimed round before the timed loop; it also gives `nu` its first value.
    deriv!(u, du)
    nu = mynorm(du)

    @time for _ in 1:10^6
        deriv!(u, du)
        nu = mynorm(du)
    end

    println(nu)
end

# Run the finite-difference benchmark defined above.
main()

  0.859857 seconds
4.443986180758243


In [1]:
using Distributed
# No processes have been added yet, so this lists only process 1 (see output below).
workers()

1-element Array{Int64,1}:
 1

In [2]:
# Launch 3 additional worker processes; the call returns their ids.
addprocs(3)

3-element Array{Int64,1}:
 2
 3
 4

In [3]:
workers() # proc 1 now acts only as the master and is no longer listed as a worker

3-element Array{Int64,1}:
 2
 3
 4

In [4]:
psize = 8
# Without a reducer, `@distributed` only spawns the work and returns a Task
# immediately; `@sync` makes the cell wait until every iteration has run.
@sync @distributed for prank = 1:psize
    println(myid())  # print the id of the worker that runs this iteration
end;

      From worker 4:	4
      From worker 2:	2
      From worker 3:	3
      From worker 3:	3
      From worker 3:	3
      From worker 4:	4
      From worker 2:	2
      From worker 2:	2


In [7]:
# Two buffered channels used by the tasks below: `jobs` holds work items and
# `results` holds finished results.
jobs = Channel(32) # can hold a maximum of 32 objects of any type.
results = Channel(32)

# Worker loop: for each item taken from `jobs`, wait 1 second (simulated work)
# and put `(myid(), 2 * item)` on `results`.
# Iterating the channel with `for` (rather than `while true; take!`) lets the
# task finish cleanly once `jobs` is closed and drained, instead of dying with
# an `InvalidStateError` from `take!` on a closed channel.
function slow_double()
    for data in jobs
        sleep(1)
        put!(results, (myid(), data*2))    # write out result
    end
end

# Enqueue the integers 1, 2, ..., n, in order, on the global `jobs` channel.
function make_jobs(n)
    foreach(job -> put!(jobs, job), 1:n)
end

n = 8

@async make_jobs(n) # feed the jobs channel with "n" jobs

# we can schedule `n` instances of `slow_double` to be active concurrently.
for _ in 1:n
    @async slow_double()
end

# Collect exactly `n` results without mutating the global `n`. Each result is
# `(process id, doubled value)`; the tasks were started with `@async` on this
# process, so the id is this process's `myid()`.
@elapsed for _ in 1:n # print out results
    pid, result = take!(results)
    println("$pid finished data = $result")
end

1 finished data = 2
1 finished data = 4
1 finished data = 6
1 finished data = 8
1 finished data = 10
1 finished data = 12
1 finished data = 14
1 finished data = 16


1.04675385

In [52]:
@everywhere using SharedArrays

# Defined on every process so the workers can call it: prints `x`, returns `2x`.
@everywhere function do_work(x)
    println(x)
    return x * 2
end

n = 8
# Shared array of length `n`. Each process initializes the chunk it owns
# (`localindices`) with those indices, so `S` holds 1:n after construction.
S = SharedArray{Int,1}(n, init = A -> A[localindices(A)] = localindices(A))

# Without a reducer, `@distributed` returns immediately; `@sync` waits until
# every worker has finished updating `S`.
@sync @distributed for i = 1:n
    S[i] = do_work(S[i])
end

Task (runnable) @0x0000000111e31b10

      From worker 3:	4
      From worker 3:	5
      From worker 3:	6
      From worker 2:	1
      From worker 2:	2
      From worker 2:	3
      From worker 4:	7
      From worker 4:	8


In [53]:
# Call `do_work` (defined with @everywhere above) locally: it prints `i` and
# returns `2i`, which is then printed as well. (`double` was not defined.)
for i = 1:5
    println(do_work(i))
end

1
2
2
4
3
6
4
8
5
10


In [12]:
workers()   # the 3 worker processes added earlier are still listed

3-element Array{Int64,1}:
 2
 3
 4

In [3]:
# One explicit time step of 1-D heat diffusion: for interior points, writes
# b[i] = a[i] + (a[i-1] - 2a[i] + a[i+1]) * Δt, with fixed boundary values
# b[1] = 1 and b[n] = 0.
#
# Throws `DimensionMismatch` if `a` and `b` have different lengths. (This used
# to be an `@assert`, which is meant for internal invariants, not for
# argument checks.)
function timestep(b::Vector{T}, a::Vector{T}, Δt::T) where T
    length(a) == length(b) ||
        throw(DimensionMismatch("a has length $(length(a)), b has length $(length(b))"))
    n = length(b)
    b[1] = 1                            # Boundary condition
    for i=2:n-1
        b[i] = a[i] + (a[i-1] - T(2)*a[i] + a[i+1]) * Δt
    end
    b[n] = 0                            # Boundary condition
end

# Advance the profile `a` in place by `nstep` calls to `timestep` with Δt = 0.1.
# Steps alternate between `a` and a scratch buffer `b`. For an odd `nstep`, the
# last step lands in `b` and is copied back into `a`, so exactly `nstep` steps
# are applied (previously an odd count silently dropped one step).
# A non-positive `nstep` leaves `a` unchanged.
function heatflow(a::Vector{T}, nstep::Integer) where T
    b = similar(a)
    Δt = T(0.1)
    npairs, extra = divrem(nstep, 2)
    for _ = 1:npairs
        timestep(b, a, Δt)
        timestep(a, b, Δt)
    end
    if extra == 1
        timestep(b, a, Δt)
        copyto!(a, b)
    end
    return nothing
end

heatflow(zeros(Float32,10),2)           # Force compilation
# Time heat flow with and without flushing subnormals to zero. The
# flush-to-zero mode is global floating-point state, so remember the current
# setting and restore it afterwards; otherwise later cells would silently run
# with non-IEEE arithmetic.
old_ftz = get_zero_subnormals()
try
    for trial=1:6
        a = zeros(Float32,1000)
        set_zero_subnormals(iseven(trial))  # Odd trials use strict IEEE arithmetic
        @time heatflow(a,1000)
    end
finally
    set_zero_subnormals(old_ftz)
end

  0.002301 seconds (1 allocation: 4.063 KiB)
  0.001591 seconds (1 allocation: 4.063 KiB)
  0.002324 seconds (1 allocation: 4.063 KiB)
  0.001675 seconds (1 allocation: 4.063 KiB)
  0.002320 seconds (1 allocation: 4.063 KiB)
  0.001584 seconds (1 allocation: 4.063 KiB)
