/
interface.jl
171 lines (144 loc) · 3.9 KB
/
interface.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import Core.Compiler
abstract type AbstractThreadPool end
@deprecate pforeach(pool, fn::Function, itr) tforeach(pool, fn::Function, itr)
@deprecate pforeach(fn::Function, pool, itr) tforeach(fn::Function, pool, itr)
@deprecate pmap(pool, fn::Function, itr) tmap(pool, fn::Function, itr)
@deprecate pmap(fn::Function, pool, itr) tmap(fn::Function, pool, itr)
_detect_type(fn, itr) = Core.Compiler.return_type(fn, Tuple{eltype(itr)})
"""
tforeach(pool, fn::Function, itr)
tforeach(fn::Function, pool, itr)
Mimics `Base.foreach`, but launches the function evaluations onto the provided
pool to assign the tasks.
# Example
```
julia> pool = pwith(ThreadPools.LoggedQueuePool(1,2)) do pool
tforeach(pool, x -> println((x,Threads.threadid())), 1:8)
end;
(2, 2)
(1, 1)
(3, 2)
(5, 2)
(4, 1)
(6, 2)
(7, 1)
(8, 2)
julia> plot(pool)
```
"""
function tforeach(pool::AbstractThreadPool, fn::Function, itr)
tmap(pool, fn, itr)
nothing
end
tforeach(fn::Function, pool::AbstractThreadPool, itr) = tforeach(pool, fn, itr)
tforeach(pool::AbstractThreadPool, fn::Function, itr1, itrs...) = tforeach(pool, x -> fn(x...), zip(itr1, itrs...))
tforeach(fn::Function, pool::AbstractThreadPool, itr1, itrs...) = tforeach(pool, x -> fn(x...), zip(itr1, itrs...))
"""
tmap(pool, fn::Function, itr)
tmap(fn::Function, pool, itr)
Mimics `Base.map`, but launches the function evaluations onto the provided
pool to assign the tasks.
# Example
```
julia> pool = pwith(ThreadPools.LoggedQueuePool(1,2)) do pool
tmap(pool, 1:8) do x
println((x,Threads.threadid()))
end
end;
(2, 2)
(1, 1)
(3, 2)
(4, 1)
(5, 2)
(6, 1)
(7, 2)
(8, 1)
julia> plot(pool)
```
"""
tmap(fn::Function, pool::AbstractThreadPool, itr) = tmap(pool, fn, itr)
tmap(pool::AbstractThreadPool, fn::Function, itr1, itrs...) = tmap(pool, x -> fn(x...), zip(itr1, itrs...))
tmap(fn::Function, pool::AbstractThreadPool, itr1, itrs...) = tmap(pool, x -> fn(x...), zip(itr1, itrs...))
"""
pwith(fn::Function, pool) -> pool
Apply the functon `fn` to the provided pool and close the pool. Returns the
closed pool for any desired analysis or plotting.
# Example
```
julia> pwith(ThreadPools.QueuePool(1,2)) do pool
tforeach(pool, x -> println((x,Threads.threadid())), 1:8)
end;
(2, 2)
(1, 1)
(3, 2)
(4, 1)
(5, 2)
(6, 1)
(7, 2)
(8, 1)
```
Note in the above example, only two threads were used, as set by the
`QueuePool` setting.
"""
function pwith(fn::Function, pool)
fn(pool)
close(pool)
pool
end
"""
@pthreads pool
Mimic the `Base.Threads.@threads` macro, but uses the provided pool to
assign the tasks.
# Example
```
julia> pwith(ThreadPools.QueuePool(1,2)) do pool
@pthreads pool for x in 1:8
println((x,Threads.threadid()))
end
end;
(2, 2)
(3, 2)
(1, 1)
(4, 2)
(5, 1)
(6, 2)
(8, 2)
(7, 1)
```
"""
macro pthreads(pool, args...)
na = length(args)
if na != 1
throw(ArgumentError("wrong number of arguments in @pthreads"))
end
ex = args[1]
if !isa(ex, Expr)
throw(ArgumentError("need an expression argument to @pthreads"))
end
if ex.head === :for
if ex.args[1] isa Expr && ex.args[1].head === :(=)
index = ex.args[1].args[1]
range = ex.args[1].args[2]
body = ex.args[2]
return quote
tforeach($(esc(pool)), $(esc(range))) do $(esc(index))
$(esc(body))
end
end
else
throw(ArgumentError("nested outer loops are not currently supported by @pthreads"))
end
else
throw(ArgumentError("unrecognized argument to @pthreads"))
end
end
function Base.finalize(pool::AbstractThreadPool)
close(pool)
end
"""
Base.close(pool::AbstractThreadPool)
Closes the pool, shuts down any handlers and finalizes any logging activities.
"""
function Base.close(pool::AbstractThreadPool)
nothing
end