-
Notifications
You must be signed in to change notification settings - Fork 0
/
status.go
544 lines (459 loc) · 13.6 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
// Package status provides Status, LineBuffer and Group
//
//spellchecker:words status
package status
//spellchecker:words maps sync atomic time github gosuri uilive pkglib nobufio noop stream
import (
"fmt"
"io"
"maps"
"os"
"sync"
"sync/atomic"
"time"
"github.com/gosuri/uilive"
"github.com/tkw1536/pkglib/nobufio"
"github.com/tkw1536/pkglib/noop"
"github.com/tkw1536/pkglib/stream"
)
//spellchecker:words annot compat
// Status represents an interactive status display that can write to multiple lines at once.
//
// A Status must be initialized using [New], then started (and stopped again) to write messages.
// Status may not be reused.
//
// A typical usage is as follows:
//
//	st := New(os.Stdout, 10)
//	st.Start()
//	defer st.Stop()
//
//	// ... whatever usage here ...
//	st.Set(0, "line 0")
//
// Using the status to write messages outside of the Start / Stop process results in no-ops.
//
// In addition to writing to lines directly, a status keeps separate log files for each line.
// These are automatically deleted once the Stop method is called, unless a separate call to the Keep() method is made.
//
// Status should only be used on interactive terminals.
// On other [io.Writer]s, a so-called compatibility mode can be used, that writes updates to the terminal line by line.
// See [NewWithCompat].
type Status struct {
	state   atomic.Uint64 // lifecycle state, one of the state* constants
	keepLog atomic.Bool   // when true, Stop does not delete the log files
	counter atomic.Uint64 // id counter; New initializes it to count, Open hands out counter.Add(1)

	w      *uilive.Writer // underlying uilive writer
	compat bool           // compatibility mode enabled

	logPath    string                    // directory passed to os.CreateTemp when creating log files
	logWriters map[uint64]io.WriteCloser // open log files, keyed by line id

	logNamesLock sync.RWMutex      // protects the below
	logNames     map[uint64]string // the names of the log files, keyed by line id

	ids      []uint64          // ordered list of active message ids
	idsI     map[uint64]int    // inverse of ids: maps an active id to its index in ids
	messages map[uint64]string // current content of each active line

	lastFlush time.Time // last time flushNormal wrote to the terminal

	actions chan action   // channel that status updates are sent to; closed by Stop
	done    chan struct{} // closed by listen once actions has been drained
}
// state* describe the lifecycle of a Status.
// The current state is held in Status.state; Start and Stop advance it using CompareAndSwap.
const (
	stateInvalid     uint64 = iota // zero value: the Status was not created using New
	stateNewCalled                 // stored by New; Start has not been called yet
	stateStartCalled               // set by Start; only in this state are actions sent to the listener
	stateStopCalled                // set by Stop
)
// lineAction describes the type of action to perform on a line.
type lineAction uint8

const (
	setAction   lineAction = iota // replace the message of an active line
	openAction                    // add a new line with an initial message
	closeAction                   // remove an active line
)
// action describes an action to perform on a [Status].
// Actions are sent over Status.actions and processed one at a time by the listen method.
type action struct {
	action  lineAction // what kind of action to perform
	id      uint64     // id of line to perform action on
	message string     // content of the line (not set for closeAction)
}
// New creates a new Status that writes to writer and initially has count status lines.
//
// The ids of the initial status lines are guaranteed to be 0...(count-1).
// When count is less than 0, it is treated as 0.
//
// When writer is a null writer (see [stream.IsNullWriter]) no status is needed and New returns nil.
// Methods of Status that perform a nil check treat such a nil Status as a no-op.
func New(writer io.Writer, count int) *Status {
	// when a zero writer was passed, we don't need a status.
	// and everything should become a no-op.
	if stream.IsNullWriter(writer) {
		return nil
	}

	// negative counts are clamped; afterwards count always fits into a uint64
	if count < 0 {
		count = 0
	}

	w := uilive.New()
	w.Out = writer

	st := &Status{
		w:      w,
		compat: false,

		ids:      make([]uint64, count),
		idsI:     make(map[uint64]int, count),
		messages: make(map[uint64]string, count),

		actions: make(chan action, count),
		done:    make(chan struct{}),
	}
	st.state.Store(stateNewCalled)
	st.counter.Store(uint64(count))

	// setup the initial ids, each with their own log file
	for index := range st.ids {
		id := uint64(index)
		st.ids[index] = id
		st.idsI[id] = index
		st.openLogger(id)
	}

	return st
}
// NewWithCompat is like [New], but places the Status into a compatibility mode if and only if writer does not represent a terminal.
//
// In compatibility mode, Status automatically prints each line to the output, instead of putting them onto separate lines.
//
// Like [New], NewWithCompat returns nil when writer is a null writer.
func NewWithCompat(writer io.Writer, count int) (st *Status) {
	st = New(writer, count)
	if st == nil {
		// New returns nil for null writers; there is no Status to configure.
		return nil
	}

	st.compat = !nobufio.IsTerminal(writer)
	return st
}
// Start begins processing updates and writing output to the underlying writer.
//
// While the Status is running, nothing else should write to the underlying writer directly.
// Instead [Bypass] should be used.
// See also [Stop].
//
// Start panics if the Status was not created using [New], or if Start has been called before.
// Calling Start on a nil Status is a no-op.
func (st *Status) Start() {
	if st == nil {
		return
	}

	switch {
	case st.state.Load() == stateInvalid:
		panic("Status: Not created using New")
	case !st.state.CompareAndSwap(stateNewCalled, stateStartCalled):
		panic("Status: Start() called multiple times")
	}

	// process actions in the background until Stop closes the channel
	go st.listen()
}
// minFlushDelay is the minimum time between two non-forced flushes in normal mode (see flushNormal).
// Line uses ten times this value as the delay for flushing partial lines.
const minFlushDelay = 50 * time.Millisecond
// flush writes pending output of this Status.
//
// The log file belonging to the changed line is always updated first.
// In compatibility mode only the changed line is then printed, see [flushCompat].
// Otherwise all lines are redrawn, see [flushNormal]; force is passed along to it.
func (st *Status) flush(force bool, changed uint64) {
	st.flushLogs(changed)

	if !st.compat {
		st.flushNormal(force)
		return
	}
	st.flushCompat(changed)
}
// flushCompat implements flushing in compatibility mode.
// It prints the message of the changed line to the output, or does nothing if that line has no message.
func (st *Status) flushCompat(changed uint64) {
	if message, found := st.messages[changed]; found {
		fmt.Fprintln(st.w.Out, message)
	}
}
// flushLogs appends the current message of the changed line to its log file.
// It does nothing when the line has no message or no open log writer.
func (st *Status) flushLogs(changed uint64) {
	message, hasMessage := st.messages[changed]
	if !hasMessage {
		return
	}

	if log, hasLog := st.logWriters[changed]; hasLog {
		fmt.Fprintln(log, message)
	}
}
// flushNormal implements flushing in normal mode by redrawing every active line.
// Calls made less than [minFlushDelay] after the previous flush are skipped, unless force is set to true.
func (st *Status) flushNormal(force bool) {
	now := time.Now()
	tooSoon := now.Sub(st.lastFlush) < minFlushDelay
	if tooSoon && !force {
		return
	}
	st.lastFlush = now

	// the first message goes to the writer itself, every further one onto a new line
	for index, id := range st.ids {
		var target io.Writer = st.w
		if index != 0 {
			target = st.w.Newline()
		}
		fmt.Fprintln(target, st.messages[id])
	}

	// and send everything to the terminal
	st.w.Flush()
}
// Keep instructs this Status to keep its log files instead of deleting them in [Stop].
// It returns a copy of the map from line ids to log file names known at the time of the call.
//
// Calling Keep on a nil Status is a no-op and returns nil.
func (st *Status) Keep() map[uint64]string {
	// nil check for no-op status
	if st == nil {
		return nil
	}

	// we keep the log files!
	st.keepLog.Store(true)

	st.logNamesLock.RLock()
	defer st.logNamesLock.RUnlock()

	// hand out a copy, so that the caller cannot modify our internal state
	files := make(map[uint64]string, len(st.logNames))
	maps.Copy(files, st.logNames)
	return files
}
// Stop blocks until all pending updates have finished processing.
// It then stops writing updates to the underlying writer.
// Finally it deletes all log files, unless a call to Keep() has been made.
//
// Stop must be called after [Start] has been called.
// Stop may not be called more than once; out-of-order calls panic.
// Calling Stop on a nil Status is a no-op.
func (st *Status) Stop() {
	if st == nil {
		return
	}

	if swapped := st.state.CompareAndSwap(stateStartCalled, stateStopCalled); !swapped {
		panic("Status: Stop() called out-of-order")
	}

	// no more actions are accepted; wait for the listener to drain the channel
	close(st.actions)
	<-st.done

	// force a final flush, using a fresh id that does not belong to any line
	unused := st.counter.Add(1)
	st.flush(true, unused)

	// close the loggers of all lines that are still open
	for _, lineID := range st.ids {
		st.closeLogger(lineID)
	}

	// the log files were requested to be kept
	if st.keepLog.Load() {
		return
	}

	// otherwise delete them
	st.logNamesLock.Lock()
	defer st.logNamesLock.Unlock()

	for _, path := range st.logNames {
		os.Remove(path)
	}
}
// openLogger creates a temporary log file for the line with the given id,
// and records both the open file and its name.
//
// The file is created using [os.CreateTemp] inside st.logPath.
// Logging is best-effort: if the file cannot be created, the line simply has no log and no error is reported.
func (st *Status) openLogger(id uint64) {
	if st == nil {
		return
	}

	file, err := os.CreateTemp(st.logPath, "status-*.log")
	if err != nil {
		// deliberately ignored: the line works without a log file
		return
	}

	st.logNamesLock.Lock()
	defer st.logNamesLock.Unlock()

	// lazily create the maps on first use
	if st.logWriters == nil {
		st.logWriters = make(map[uint64]io.WriteCloser, 1)
	}
	if st.logNames == nil {
		st.logNames = make(map[uint64]string)
	}

	// store the file and name!
	st.logWriters[id] = file
	st.logNames[id] = file.Name()
}
// closeLogger closes the log writer of the line with the given id and forgets about it.
// It does nothing when no log writer is open for the line.
// Any panic raised while closing is recovered and silently dropped.
func (st *Status) closeLogger(id uint64) {
	defer func() { _ = recover() }() // silently ignore errors

	if st == nil {
		return
	}

	// remove the writer first, then close it
	if writer, open := st.logWriters[id]; open {
		delete(st.logWriters, id)
		writer.Close()
	}
}
// Set sets the status line with the given id to contain message.
// message should not contain newline characters.
// Set may block until the update has been queued for processing.
//
// Calling Set on a line which is not active is a no-op.
//
// Set may safely be called concurrently with other methods.
//
// Set may only be called after [Start] has been called, but before [Stop].
// Other calls, as well as calls on a nil Status, are silently ignored.
func (st *Status) Set(id uint64, message string) {
	// nil check for no-op status
	if st == nil {
		return
	}

	if st.state.Load() != stateStartCalled {
		return
	}

	st.actions <- action{
		action:  setAction,
		id:      id,
		message: message,
	}
}
// Line returns an [io.WriteCloser] linked to the status line with the provided id.
// Writing a complete newline-delimited line to it behaves just like [Set] with that line prefixed with prefix would.
// Calling [io.WriteCloser.Close] behaves just like [Close] would.
//
// Line may be called at any time.
// Line should not be called multiple times with the same id.
// On a nil Status, Line returns [stream.Null].
func (st *Status) Line(prefix string, id uint64) io.WriteCloser {
	if st == nil {
		return stream.Null
	}

	buffer := &LineBuffer{
		// by default, partial lines are flushed after a delay
		FlushPartialLineAfter: 10 * minFlushDelay,
		FlushLineOnClose:      true,

		Line:      func(message string) { st.Set(id, prefix+message) },
		CloseLine: func() { st.Close(id) },

		annot:   true,
		annotID: id,
	}

	// in compatibility mode the delay is turned off
	if st.compat {
		buffer.FlushPartialLineAfter = 0
	}

	return buffer
}
// NoLine indicates that the given writer does not have an associated line id.
// It is returned by [LineOf], and has all bits of a uint64 set.
const NoLine = ^uint64(0)
// LineOf returns the id of a line returned by the Line and OpenLine methods.
// For any other writer, or one that carries no id annotation, it returns [NoLine].
func LineOf(line io.WriteCloser) uint64 {
	if buffer, isBuffer := line.(*LineBuffer); isBuffer && buffer.annot {
		return buffer.annotID
	}
	return NoLine
}
// Open adds a new status line and returns its id.
// The new status line is initially set to message.
// It may be further updated with calls to [Set], or removed with [Close].
// Open may block until the addition has been queued for processing.
//
// Open may safely be called concurrently with other methods.
//
// Open may only be called after [Start] has been called, but before [Stop].
// Other calls do not add a line; they still return a fresh id, which does not refer to an active line.
// On a nil Status, Open returns 0.
func (st *Status) Open(message string) (id uint64) {
	if st == nil {
		return 0
	}

	// always reserve a new id, even when inactive:
	// no line with this id exists, so other calls using it are no-ops.
	id = st.counter.Add(1)

	if st.state.Load() == stateStartCalled {
		st.actions <- action{
			action:  openAction,
			id:      id,
			message: message,
		}
	}

	return id
}
// OpenLine behaves like a call to [Open] with prefix+data as the initial message,
// followed by a call to [Line] with the same prefix and the newly opened id.
//
// OpenLine may only be called after [Start] has been called, but before [Stop].
// On a nil Status, OpenLine returns a writer that discards everything written to it.
//
// To retrieve the id of the newly created line, use [LineOf].
func (st *Status) OpenLine(prefix, data string) io.WriteCloser {
	if st == nil {
		return noop.Writer{Writer: io.Discard}
	}

	id := st.Open(prefix + data)
	return st.Line(prefix, id)
}
// Close removes the status line with the provided id from this status.
// The last value of the status line is written to the top of the output.
// Close may block until the removal has been queued for processing.
//
// Calling Close on a line which is not active is a no-op.
//
// Close may safely be called concurrently with other methods.
//
// Close may only be called after [Start] has been called, but before [Stop].
// Other calls, as well as calls on a nil Status, are silently ignored.
func (st *Status) Close(id uint64) {
	// a nil status is a no-op, as is a status that is not running
	if st == nil || st.state.Load() != stateStartCalled {
		return
	}

	st.actions <- action{action: closeAction, id: id}
}
// listen processes actions sent to this Status, one at a time, until the actions channel is closed.
// Once the channel has been drained, it closes the done channel.
func (st *Status) listen() {
	if st == nil {
		return
	}

	// tell Stop that every action has been processed
	defer close(st.done)

	for msg := range st.actions {
		switch msg.action {
		case setAction:
			// only lines that are active can be updated
			if _, active := st.idsI[msg.id]; active {
				st.messages[msg.id] = msg.message
				st.flush(false, msg.id) // regular flush, may be skipped
			}

		case openAction:
			// duplicate id, shouldn't occur
			if _, exists := st.idsI[msg.id]; exists {
				continue
			}

			// register the id at the end of the list
			st.idsI[msg.id] = len(st.ids)
			st.ids = append(st.ids, msg.id)

			// store the initial message and create the log file
			st.messages[msg.id] = msg.message
			st.openLogger(msg.id)

			// force a flush so that the new line shows up
			st.flush(true, msg.id)

		case closeAction:
			// only lines that are active can be closed
			position, active := st.idsI[msg.id]
			if !active {
				continue
			}

			st.closeLogger(msg.id)

			// drop the id from the list, then rebuild the inverse index
			st.ids = append(st.ids[:position], st.ids[position+1:]...)
			for i, lineID := range st.ids {
				st.idsI[lineID] = i
			}
			delete(st.idsI, msg.id)

			// write the final message above the status lines, then forget it
			fmt.Fprintln(st.w.Bypass(), st.messages[msg.id])
			delete(st.messages, msg.id)

			// and redraw the remaining lines
			st.flush(true, msg.id)
		}
	}
}
// Bypass returns a writer that completely bypasses this Status, and writes directly to the underlying writer.
// [Start] must have been called.
//
// On a nil Status, Bypass returns [io.Discard].
func (st *Status) Bypass() io.Writer {
	if st != nil {
		return st.w.Bypass()
	}
	return io.Discard
}