In [1]:
import Base.Threads: AbstractLock, TatasLock, lock!, trylock!, unlock!

In [55]:
macro with_lock(lock, body)
    quote
        lock!($lock)
        try
            $body
        finally
            unlock!($lock)
        end
    end
end
macro try_with_lock(lock, successbody, failbody)
    quote
        if trylock!($lock) == 0 #Trylock returns 0 on true.
            try
                $successbody
            finally
                unlock!($lock)
            end
        else
            $failbody
        end
    end
end
    


@try_with_lock (macro with 1 method)

In [90]:
q= CircQueue{Int64}(2)
@show enqueue!(q, 20)
lock!(q.lock)
@show  try_dequeue!(q)
@show q

unlock!(q.lock)

enqueue!(q,20) = CircQueue{Int64}([140484525436752,140484483817360,20],3,2,Base.Threads.TatasLock(Base.Threads.Atomic{Int64}(0)))
try_dequeue!(q) = Nullable{Int64}()
q = CircQueue{Int64}([140484525436752,140484483817360,20],3,2,Base.Threads.TatasLock(Base.Threads.Atomic{Int64}(1)))


0

In [92]:
@show  try_dequeue!(q)
@show q

LoadError: LoadError: QueueEmptyError()
while loading In[92], in expression starting on line 218

In [78]:
q

CircQueue{Int64}([140484467794224,20,20],3,1,Base.Threads.TatasLock(Base.Threads.Atomic{Int64}(0)))

In [None]:
enqueue!(q,10)
lock!(q.lock)
@test try_enqueue!(q, 20) == false
@test try_dequeue!(q).value == Nullable{Int64}()
unlock!(q.lock)
@test dequeue!(q) == 10 

In [56]:
type CircQueue{T}
    buffer::Vector{T}
    front::Int64
    back::Int64
    lock::TatasLock
    
    CircQueue(len::Int64) = new(Vector{T}(len+1),len+1,len+1,TatasLock())
end

immutable QueueFullError <: Exception
end
immutable QueueEmptyError <: Exception
end


LoadError: LoadError: invalid redefinition of constant CircQueue
while loading In[56], in expression starting on line 1

In [181]:
wrap_index(ii::Int64, len) = mod(ii - 1, len) + 1
wrap_index(ii::Int64, q::CircQueue) = wrap_index(ii, length(q.buffer))


function _empty(q::CircQueue)
    q.front==q.back 
end

function _full(q::CircQueue)
    q.front==wrap_index(q.back-1 ,q)
end

function _enqueue!{T}(q::CircQueue{T}, v::T)
    if _full(q)
        throw(QueueFullError())
    end
    q.buffer[q.back] = v
    q.back = wrap_index(q.back-1, q)
    q
end

function enqueue!{T}(q::CircQueue{T}, v::T)
    @with_lock(q.lock, _enqueue!(q,v))
end

function try_enqueue!{T}(q::CircQueue{T}, v::T) ::Bool
    @try_with_lock(q.lock, (_enqueue!(q,v); true), false)
end


function _dequeue!{T}(q::CircQueue{T}) :: T
        if _empty(q)
        throw(QueueEmptyError())
    end
    len = length(q.buffer)
    
    ret = q.buffer[q.front]
    q.front=wrap_index(q.front-1, q)   
    ret
end

function dequeue!{T}(q::CircQueue{T}) ::T
    @with_lock(q.lock, _dequeue!(q))
end

function try_dequeue!{T}(q::CircQueue{T}) ::Nullable{T}
    @try_with_lock(q.lock, Nullable{T}(_dequeue!(q)), Nullable{T}())
end

try_dequeue! (generic function with 2 methods)

In [94]:
using Base.Test

In [109]:
q= CircQueue{Int64}(20)
enqueue!(q, 10)
@test dequeue!(q) == 10 

q= CircQueue{Int64}(5)
enqueue!(q, 10)
enqueue!(q, 20)
enqueue!(q, 30)
@test dequeue!(q) == 10 
@test dequeue!(q) == 20 
@test dequeue!(q) == 30


q= CircQueue{Int64}(5)
enqueue!(q, 10)
enqueue!(q, 20)
@test dequeue!(q) == 10 
enqueue!(q, 30)
@test dequeue!(q) == 20 
@test dequeue!(q) == 30

q=CircQueue{Int64}(2)
@test_throws(QueueEmptyError, dequeue!(q))

q=CircQueue{Int64}(2)
enqueue!(q, 10)
enqueue!(q, 20)
@test_throws(QueueFullError, enqueue!(q, 30))


q=CircQueue{Int64}(10)
@test try_enqueue!(q, 20) == true
@test get(try_dequeue!(q)) == 20
enqueue!(q,10)
lock!(q.lock)
@test try_enqueue!(q, 20) == false
@test isnull(try_dequeue!(q))
unlock!(q.lock)
@test dequeue!(q) == 10 


q=CircQueue{Int64}(3)
enqueue!(q, 10)
enqueue!(q, 20)
enqueue!(q, 30)
#It is now full
lock!(q.lock)
@test try_enqueue!(q, 20) == false
unlock!(q.lock)

0

In [179]:
function unbalanced_performance_test()
    len = 1000
    srand(1)
    rets = Vector{Float64}(len)
    for ii in 1:len
        dim = rand(2:200)
        facts = rand(dim,dim)\rand(dim)
        rets[ii]=facts[end÷2]
    end
    rets
end

gc()
a=@time unbalanced_performance_test()
println("---")
gc()
b=@time unbalanced_performance_test()
println("---")
gc()
c=@time unbalanced_performance_test()

a == b == c

  1.516305 seconds (18.30 k allocations: 217.259 MB, 1.59% gc time)
---
  8.336642 seconds (14.60 k allocations: 217.097 MB, 1.37% gc time)
---
  9.398417 seconds (14.60 k allocations: 217.097 MB, 1.52% gc time)


true

In [178]:
function unbalanced_performance_test2()
    len = 1000
    srand(1)
    rets = Vector{Float64}(len)
    for ii in 1:len
        dim = rand(2:200)
        facts = (Matrix{Float64}(dim,dim)+eye(dim)) \Vector{Float64}(dim)
        rets[ii]=facts[end÷2]
    end
    rets
end

gc()
a=@time unbalanced_performance_test2()
println("---")
gc()
b=@time unbalanced_performance_test2()
println("---")
gc()
c=@time unbalanced_performance_test2()

a == b == c

LoadError: LoadError: Base.LinAlg.SingularException(111)
while loading In[178], in expression starting on line 184

In [None]:
function _ws_threadsfor(iter,lbody)
    fun = gensym("_ws_threadsfor")
    lidx = iter.args[1]         # index
    range = iter.args[2]
    max_work = ceil(length(range)/nthreads())
    work_queues = [CircQueue{eltype(range)}(max_work) for tid in 1:nthreads()]
    tid=1
    for lidx_val in range
        tid=wrap_index(tid+1, length(work_queues))
        enqueue!(work_queues[tid],lidx_val) #I'd rather be loading the work after it is already started but can't
    end
    
    quote
        function $fun()
            tid = threadid()
            w_queues = $(esc(work_queues)) 
            #WRITE ME 
            try
            # run this thread's iterations
            
                local $(esc(lidx)) = Base.unsafe_getindex(r,i)
                $(esc(lbody))
            end
        end
        ccall(:jl_threading_run, Void, (Any,), Core.svec($fun))
    end
end

macro ws_threads(args...)
    na = length(args)
    if na != 1
        throw(ArgumentError("wrong number of arguments in @threads"))
    end
    ex = args[1]
    if !isa(ex, Expr)
        throw(ArgumentError("need an expression argument to @threads"))
    end
    if is(ex.head, :for)
        return _ws_threadsfor(ex.args[1],ex.args[2])
    else
        throw(ArgumentError("unrecognized argument to @threads"))
    end
end

In [None]:
;git add "WorkStealing.ipynb"

In [None]:
type Counter
  count
end

function docount(counter::Counter, id, iobuff)
  for i = 1:10
    println(iobuff, @sprintf("%f",time()),"\t $id: 1 i=$i counter.count=$(counter.count)")
    println(iobuff,@sprintf("%f",time()), "\t\t 2 $id: i=$i counter.count=$(counter.count)")
    counter.count += 1
    println(iobuff,@sprintf("%f",time()),"\t\t\t 3 $id: i=$i counter.count=$(counter.count)")
    if counter.count == 15
            println(iobuff, @sprintf("%f",time()),"\t $id terminating")
      return
    end
  end
end

function docount1(counter::Counter, id, iobuff)
  for i = 1:10
    println(iobuff, @sprintf("%f",time()),"\t $id: 1 i=$i counter.count=$(counter.count)")
    println(iobuff,@sprintf("%f",time()), "\t\t 2 $id: i=$i counter.count=$(counter.count)")
    counter.count += 1
    println(iobuff,@sprintf("%f",time()),"\t\t\t 3 $id: i=$i counter.count=$(counter.count)")
    if counter.count == 15
            println(iobuff, @sprintf("%f",time()),"\t $id terminating")
      return
    end
  end
end

counter = Counter(0)

for a in 1:100
    iobuff_$a = IOBuffer()
    
end


ll = TatasLock()
@sync begin
    @async @with_lock(ll,docount(counter,"a",iobuff_a))
    @async @with_lock(ll,docount(counter,"b",iobuff_b))
end

@show(counter.count)
notes_a = split( takebuf_string(iobuff_a),"\n" )
notes_b = split( takebuf_string(iobuff_b),"\n" )
println(join(sort([notes_a; notes_b]),"\n"))