/
logstaticpool.jl
75 lines (60 loc) · 2.05 KB
/
logstaticpool.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#############################
# Internal Structures
#############################
# Thread pool over a fixed list of thread ids that keeps a start and a stop
# record for every job it runs through `tmap`.
struct LoggedStaticPool <: AbstractThreadPool
    # Thread ids that receive work; `tmap` matches them against the loop's thread ids.
    tids::Vector{Int}
    # Held around every `push!` onto `recs`.
    lck::ReentrantLock
    # Start/stop entries appended by `tmap`, one pair per job.
    recs::Vector{ThreadRecord}
    # `time_ns()` value captured by the outer constructor.
    t0::UInt64
    # Handed to `_recordstolog!` together with `recs` and `t0` by `close`.
    log::ThreadLog
end
#############################
# Constructors / Finalizer
#############################
"""
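    LoggedStaticPool(init_thrd=1, nthrds=Threads.nthreads())

The main LoggedStaticPool object: a pool of up to `nthrds` consecutive threads starting
at thread `init_thrd`. Thread ids above `Threads.nthreads()` are clipped, so the pool may
hold fewer threads than requested.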
"""
function LoggedStaticPool(init_thrd::Integer=1, nthrds::Integer=Threads.nthreads())
    # Arguments:
    #   init_thrd - first thread id of the pool; clamped into 1:Threads.nthreads()
    #   nthrds    - requested number of consecutive threads; must be >= 1
    # Returns a pool over thread ids thrd0:thrd1 with an empty record list, a fresh
    # lock and log, and `time_ns()` as its start timestamp.
    # Throws ArgumentError when `nthrds < 1`: an empty pool cannot run any job
    # (`tmap` divides the work by the pool size).
    nthrds >= 1 || throw(ArgumentError("nthrds must be at least 1, got $nthrds"))
    # Clamp on both sides: an id below 1 never matches a running thread in `tmap`,
    # which would silently leave that slot's share of the results unassigned.
    thrd0 = clamp(init_thrd, 1, Threads.nthreads())
    # The pool is truncated rather than wrapped when it would run past the last thread.
    thrd1 = min(thrd0 + nthrds - 1, Threads.nthreads())
    return LoggedStaticPool(
        thrd0:thrd1, ReentrantLock(), ThreadRecord[], time_ns(), ThreadLog()
    )
end
# Pass the pool's log, its accumulated records and its start timestamp to
# `_recordstolog!`, returning whatever that call returns.
function Base.close(pool::LoggedStaticPool)
    sink, records, started = pool.log, pool.recs, pool.t0
    return _recordstolog!(sink, records, started)
end
#############################
# ThreadPool Interface
#############################
# tmap(fn, pool::LoggedStaticPool, itr)
#
# Apply `fn` to every element of `collect(itr)` and return an array of the same
# shape holding the results. The elements are split into contiguous chunks, one
# per entry of `pool.tids`. Around every call to `fn` a start record
# `(i, tid, true, time_ns())` and a stop record `(i, tid, false, time_ns())`
# are appended to `pool.recs` under `pool.lck`.
#
# Raises an error when `fn` is not applicable to the first element.
function tmap(fn, pool::LoggedStaticPool, itr)
    data = collect(itr)
    applicable(fn, first(data)) || error("function can't be applied to iterator contents")
    N = length(data)
    # Each job appends two records (start and stop) on top of those already stored.
    sizehint!(pool.recs, length(pool.recs) + 2N)
    result = Array{_detect_type(fn, data), ndims(data)}(undef, size(data))

    nthrds = length(pool.tids)
    njobs, remjobs = divrem(N, nthrds)
    # The last `remjobs` pool slots each take one extra job. `stops[ind]` is the last
    # linear index owned by slot `ind`; computed once so workers only do lookups.
    stops = cumsum([njobs + (nthrds - ind + 1 <= remjobs ? 1 : 0) for ind in 1:nthrds])

    # Append one record; the do-block form releases the lock even if `push!` throws.
    function record!(i, tid, isstart)
        lock(pool.lck) do
            push!(pool.recs, (i, tid, isstart, time_ns()))
        end
        return nothing
    end

    # Run the chunk of slot `ind`; the range is empty for a slot that received no jobs.
    function runslot(ind)
        tid = Threads.threadid()
        firstjob = ind == 1 ? 1 : stops[ind - 1] + 1
        for i in firstjob:stops[ind]
            record!(i, tid, true)
            @inbounds result[i] = fn(Base.unsafe_getindex(data, i))
            record!(i, tid, false)
        end
        return nothing
    end

    # NOTE(review): this assumes loop iteration `tid` executes on thread `tid`, which
    # depends on how `@threads` schedules iterations - confirm for the Julia versions
    # supported. Every slot still runs exactly once either way.
    Threads.@threads for tid in 1:Threads.nthreads()
        ind = findfirst(t -> t == tid, pool.tids)
        isnothing(ind) || runslot(ind)
    end
    return result
end