/
logger.jl
444 lines (369 loc) · 13.6 KB
/
logger.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
"""
struct TaskLog
Logs the execution trace of a [`DataFlowTask`](@ref).
## Fields:
- `tag` : task id in DAG
- `time_start` : time the task started running
- `time_finish` : time the task finished running
- `tid` : thread on which the task ran
- `inneighbors` : vector of incoming neighbors in DAG
- `label` : a string used for displaying and/or postprocessing tasks
"""
struct TaskLog
tag::Int64
time_start::UInt64
time_finish::UInt64
tid::Int
inneighbors::Vector{Int64}
label::String
end
"""
    tag(tl::TaskLog)

Identifier of the logged task in the DAG.
"""
tag(tl::TaskLog) = tl.tag

"""
    label(tl::TaskLog)

User-supplied label of the logged task (may be empty).
"""
label(tl::TaskLog) = tl.label

"""
    task_duration(tl::TaskLog)

Elapsed time between start and finish of the task, in the raw (nanosecond)
units stored in the log.
"""
task_duration(tl::TaskLog) = tl.time_finish - tl.time_start
"""
struct InsertionLog
Logs the execution trace of a [`DataFlowTask`](@ref) insertion.
## Fields:
- `time_start` : time the insertion began
- `time_finish` : time the insertion finished
- `taskid` : the task it is inserting
- `tid` : the thread on which the insertion is happening
"""
struct InsertionLog
time_start::UInt64
time_finish::UInt64
gc_time::Int64
taskid::Int
tid::Int
end
"""
struct LogInfo
Contains informations on the program's progress. For thread-safety, the
`LogInfo` structure uses one vector of [`TaskLog`](@ref) per thread.
You can visualize and postprocess a `LogInfo` using `GraphViz.Graph` and
`Makie.plot`.
"""
struct LogInfo
tasklogs::Vector{Vector{TaskLog}}
insertionlogs::Vector{Vector{InsertionLog}}
function LogInfo()
# internal constructor to guarantee that there is always one vector per
# thread to do the logging
return new(
[Vector{TaskLog}() for _ in 1:Threads.nthreads()],
[Vector{InsertionLog}() for _ in 1:Threads.nthreads()],
)
end
end
#TODO: show more relevant information
function Base.show(io::IO, l::LogInfo)
    n = nbtasknodes(l)
    # pluralize "task" only when there is not exactly one logged task
    plural = n == 1 ? "" : "s"
    return println(io, "LogInfo with $n logged task$plural")
end
"""
const LOGINFO::Ref{LogInfo}
Global `LogInfo` being used to record the events. Can be changed using
[`_setloginfo!`](@ref).
"""
const LOGINFO = Ref{Maybe{LogInfo}}()
"""
_setloginfo!(l::LogInfo)
Set the active logger to `l`.
"""
function _setloginfo!(l::Maybe{LogInfo})
return LOGINFO[] = l
end
"""
_getloginfo()
Return the active logger.
"""
function _getloginfo()
return LOGINFO[]
end
"""
    haslogger()

Return `true` when an active logger is currently set (see [`_setloginfo!`](@ref)).
"""
function haslogger()
    return _getloginfo() !== nothing
end
#= Utility function returning the total number of task nodes recorded in the
logger, summed over the per-thread log vectors. =#
function nbtasknodes(logger)
    return sum(length, logger.tasklogs)
end
"""
with_logging!(f,l::LogInfo)
Similar to [`with_logging`](@ref), but append events to `l`.
"""
function with_logging!(f, l::LogInfo)
# taskgraph must be empty before starting, or we may log dependencies on
# tasks that are not in the logger
tg = get_active_taskgraph()
if !isempty(tg)
msg = """logging requires an empty taskgraph to start. Waiting for
pending tasks to be completed...
"""
@warn msg
wait(tg)
@warn "done."
end
# check if logger is already active, switch to new logger, record, and
# switch back
_log_mode() == true ||
error("you must run `enable_log()` to activate the logger before profiling")
old_logger = _getloginfo()
_setloginfo!(l)
res = f()
_setloginfo!(old_logger)
return res, l
end
"""
with_logging(f) --> f(),loginfo
Execute `f()` and log `DataFlowTask`s into the `loginfo` object.
## Examples:
```jldoctest; output = false
using DataFlowTasks
A,B = zeros(2), ones(2);
out,loginfo = DataFlowTasks.with_logging() do
@dspawn fill!(@W(A),1)
@dspawn fill!(@W(B),1)
res = @dspawn sum(@R(A)) + sum(@R(B))
fetch(res)
end
#
out
# output
4.0
```
See also: [`LogInfo`](@ref)
"""
function with_logging(f)
l = LogInfo()
return with_logging!(f, l)
end
"""
DataFlowTasks.@log expr --> LogInfo
Execute `expr` and return a [`LogInfo`](@ref) instance with the recorded events.
The `Logger` waits for the current taskgraph (see [`get_active_taskgraph`](@ref)
to be empty before starting.
!!! warning
The returned `LogInfo` instance may be incomplete if `block` returns before all
`DataFlowTasks` spawened inside of it are completed. Typically `expr`
should `fetch` the outcome before returning to properly benchmark the code
that it runs (and not merely the tasks that it spawns).
See also: [`with_logging`](@ref), [`with_logging!`](@ref)
"""
macro log(ex)
quote
f = () -> $(esc(ex))
out, loginfo = with_logging(f)
loginfo
end
end
# These implement the required interface to consider a Logger as a graph and
# compute its longest path
Base.isless(a::TaskLog, b::TaskLog) = isless(a.tag, b.tag)

#= Task logs sorted by tag. NOTE(review): this yields a topological order only
if tags are assigned monotonically with insertion — confirm at the spawn/insert
sites. =#
function topological_sort(l::LogInfo)
    logs = collect(Iterators.flatten(l.tasklogs))
    return sort!(logs)
end

intags(tl::TaskLog) = tl.inneighbors
weight(tl::TaskLog) = task_duration(tl) * 1e-9 # duration in seconds
#= Data needed to plot the Gantt chart (parallel trace). Struct-of-arrays
layout: entry `i` of every vector describes the same task span. Internal
plotting helper — its docstring is deliberately kept as a comment:
#     struct Gantt
# Structure used to produce a [`Gantt
# chart`](https://en.wikipedia.org/wiki/Gantt_chart) of the parallel traces of
# the tasks recorded in a [`LogInfo`](@ref) instance, used when plotting the
# parallel trace with `Makie.plot`. See [`extractloggerinfo`](@ref) for how a
# `Gantt` instance is created. =#
struct Gantt
    threads::Vector{Int64}  # thread on which the task ran
    jobids::Vector{Int64}   # task category (see `jobid`)
    starts::Vector{Float64} # start time
    stops::Vector{Float64}  # end time
    # start out with empty columns; they are filled in `extractloggerinfo!`
    Gantt() = new(Int64[], Int64[], Float64[], Float64[])
end
#= Contains additional post-processed information on the LogInfo. Internal
structure (its docstring is deliberately a comment):
# """
#     struct ExtendedLogInfo
# Appends information to [`LogInfo`](@ref) to make it easier to visualize and
# extract useful information.
# See [`extractloggerinfo`](@ref) for more information on how to create an
# `ExtendedLogInfo` instance.
# """
=#
mutable struct ExtendedLogInfo
    firsttime::Float64 # First measured time (s)
    lasttime::Float64 # Last measured time (s)
    computingtime::Float64 # Cumulative time spent computing
    insertingtime::Float64 # Cumulative time spent inserting
    othertime::Float64 # Cumulative other time
    t∞::Float64 # Inf. proc time
    t_nowait::Float64 # Time if we didn't wait at all
    timespercat::Vector{Float64} # timespercat[i] cumulative time for category i
    categories::Vector{Pair{String,Regex}} # (label => regex) pairs for categories
    path::Vector{Int64} # Critical Path
    function ExtendedLogInfo(logger::LogInfo, categories, path)
        # convert the raw nanosecond timestamps to seconds
        (firsttime, lasttime) = timelimits(logger) .* 10^(-9)
        # start `othertime` at wall-time × number of per-thread logs; the
        # extraction pass (`extractloggerinfo!`) subtracts computing and
        # inserting time from it, leaving idle time
        othertime = (lasttime - firsttime) * length(logger.tasklogs)
        # allow categories given as plain strings: "foo" -> ("foo" => r"foo")
        normalize_category(x) = x
        normalize_category(x::String) = (x => Regex(x))
        return new(
            firsttime,
            lasttime,
            0,
            0,
            othertime,
            0,
            0,
            zeros(length(categories) + 1), # extra last slot for unmatched labels
            normalize_category.(categories),
            path,
        )
    end
end
"""
describe(loginfo::LogInfo; categories = String[])
describe(io, loginfo::LogInfo; categories = String[])
Analyses the information contained in `loginfo` and displays a summary on `io` (`stdout` by default).
Passing a `categories` argument allows grouping tasks by category. The
`categories` can be a vector of `String`s or a vector of `String => Regex`
pairs, which will be matched against the tasks' labels.
"""
describe(loginfo::LogInfo; categories = String[]) =
describe(stdout, loginfo; categories = categories)
function describe(io::IO, loginfo::LogInfo; categories = String[])
extloginfo, _ = extractloggerinfo(loginfo; categories = categories)
# format the output below using printf style. Right align so that all
# numbers are aligned
elapsed = extloginfo.lasttime - extloginfo.firsttime
@printf(io, "• Elapsed time %-10s: %.3f\n", "", elapsed)
@printf(io, " ├─ %-20s: %.3f\n", "Critical Path", extloginfo.t∞)
@printf(io, " ╰─ %-20s: %.3f\n", "No-Wait", extloginfo.t_nowait)
@printf(io, "\n")
runtime = extloginfo.computingtime + extloginfo.insertingtime + extloginfo.othertime
@printf(io, "• Run time %-14s: %.3f\n", "", runtime)
@printf(io, " ├─ %-20s: %.3f\n", "Computing", extloginfo.computingtime)
for i in eachindex(extloginfo.categories)
(title, _) = extloginfo.categories[i]
@printf(io, " │ ├─ %-17s: %.3f\n", title, extloginfo.timespercat[i])
end
@printf(io, " │ ╰─ %-17s: %.3f\n", "unlabeled", extloginfo.timespercat[end])
@printf(io, " ├─ %-20s: %.3f\n", "Task Insertion", extloginfo.insertingtime)
@printf(io, " ╰─ %-20s: %.3f\n", "Other (idle)", extloginfo.othertime)
end
#= Earliest start time and latest finish time over every task the logger has
measured (raw nanosecond units). =#
function timelimits(logger::LogInfo)
    logs = Iterators.flatten(logger.tasklogs)
    tmin = minimum(t -> t.time_start, logs)
    tmax = maximum(t -> t.time_finish, logs)
    return tmin, tmax
end
#= Index of the first category in `categories` whose regex matches `label`.
Falls back to `length(categories) + 1` (the "unlabeled" slot) when no
category matches. =#
function jobid(label::String, categories)
    idx = findfirst(cat -> occursin(last(cat), label), categories)
    return isnothing(idx) ? length(categories) + 1 : idx
end
#= Build the post-processed summary (`ExtendedLogInfo`) and the Gantt-chart
data associated with `loginfo`. =#
function extractloggerinfo(loginfo::LogInfo; categories = String[])
    info = ExtendedLogInfo(loginfo, categories, longest_path(loginfo))
    chart = Gantt()
    extractloggerinfo!(loginfo, info, chart)
    return info, chart
end
#= Initialize gantt and loginfo structures from logger.

Walks every `TaskLog` and `InsertionLog` recorded in `logger`, pushing one
span per event into `gantt` and accumulating summary times into `loginfo`.
Raw nanosecond timestamps are rescaled to seconds (× 10^-9) and shifted so
that `loginfo.firsttime` is the time origin. Returns `gantt`. =#
function extractloggerinfo!(logger::LogInfo, loginfo::ExtendedLogInfo, gantt::Gantt)
    # Gantt data : Initialization TASKLOGS
    # ------------------------------------
    for tasklog in Iterators.flatten(logger.tasklogs)
        # Gantt data: one span per executed task
        # ----------
        push!(gantt.threads, tasklog.tid)
        push!(gantt.jobids, jobid(tasklog.label, loginfo.categories))
        push!(gantt.starts, tasklog.time_start * 10^(-9) - loginfo.firsttime)
        push!(gantt.stops, tasklog.time_finish * 10^(-9) - loginfo.firsttime)
        # General Informations
        # --------------------
        task_duration = (tasklog.time_finish - tasklog.time_start) * 10^(-9)
        # time spent computing is, by definition, not idle time
        loginfo.othertime -= task_duration
        loginfo.computingtime += task_duration
        # per-category accumulation (last slot = unmatched labels, see `jobid`)
        loginfo.timespercat[jobid(tasklog.label, loginfo.categories)] += task_duration
        # tasks on the critical path contribute to the infinite-processor time t∞
        tasklog.tag ∈ loginfo.path && (loginfo.t∞ += task_duration)
        loginfo.t_nowait += task_duration
    end
    # Gantt data : Initialization INSERTIONLOGS
    # -----------------------------------------
    for insertionlog in Iterators.flatten(logger.insertionlogs)
        if insertionlog.gc_time != 0
            # GC ran during this insertion: split the span into a GC part
            # followed by the insertion proper
            gc_start = insertionlog.time_start * 10^(-9) - loginfo.firsttime
            gc_finish = gc_start + insertionlog.gc_time * 10^(-9)
            insertion_start = gc_finish
            insertion_finish = insertionlog.time_finish * 10^(-9) - loginfo.firsttime
            # GC Task — jobid `length(categories) + 3` appears reserved for GC
            # spans in the plotting recipes; confirm against the Makie extension
            push!(gantt.threads, insertionlog.tid)
            push!(gantt.jobids, length(loginfo.categories) + 3)
            push!(gantt.starts, gc_start)
            push!(gantt.stops, gc_finish)
        else
            insertion_start = insertionlog.time_start * 10^(-9) - loginfo.firsttime
            insertion_finish = insertionlog.time_finish * 10^(-9) - loginfo.firsttime
        end
        # Gantt data — jobid `length(categories) + 2` marks insertion spans
        # ----------
        push!(gantt.threads, insertionlog.tid)
        push!(gantt.jobids, length(loginfo.categories) + 2)
        push!(gantt.starts, insertion_start)
        push!(gantt.stops, insertion_finish)
        # General Informations
        # --------------------
        # NOTE(review): when gc_time != 0 this duration still includes the GC
        # part (time_finish - time_start) — presumably intentional; confirm
        task_duration = (insertionlog.time_finish - insertionlog.time_start) * 10^(-9)
        loginfo.othertime -= task_duration
        loginfo.insertingtime += task_duration
    end
    # average the no-wait time over the number of per-thread logs
    loginfo.t_nowait /= length(logger.tasklogs)
    return gantt
end
"""
loggertodot(logger) --> dagstring
Return a string in the
[DOT](https://en.wikipedia.org/wiki/DOT_(graph_description_language)) format
representing the underlying graph in `logger`.
If `GraphViz` is installed, you can use `GraphViz.Graph(logger)` to produce an
image.
"""
function loggertodot(logger)
path = longest_path(logger)
# Write DOT graph
# ---------------
str = "strict digraph dag {rankdir=LR;layout=dot;rankdir=TB;"
str *= """concentrate=true;"""
for tasklog in Iterators.flatten(logger.tasklogs)
# Tasklog.tag node attributes
str *= """ $(tasklog.tag) """
tasklog.label != "" && (str *= """ [label="$(tasklog.label)"] """)
tasklog.tag ∈ path && (str *= """ [color=red] """)
str *= """[penwidth=2];"""
# Defines edges
for neighbour in tasklog.inneighbors
red = false
# Is this connection is in critical path
(neighbour ∈ path && tasklog.tag ∈ path) && (red = true)
# Edge
str *= """ $neighbour -> $(tasklog.tag) """
red && (str *= """[color=red] """)
str *= """[penwidth=2];"""
end
end
return str *= "}"
end