/
work_queue.go
88 lines (76 loc) · 2.02 KB
/
work_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright © 2018 by PACE Telematics GmbH. All rights reserved.
// Created at 2018/10/12 by Vincent Landgraf
package synctx
import (
"context"
"fmt"
"sync"
)
// WorkFunc a function that receives an context and optionally returns
// an error. Returning an error will cancel all other worker functions
type WorkFunc func(ctx context.Context) error
// WorkQueue is a work queue implementation that respects cancellation
// using contexts
type WorkQueue struct {
wg WaitGroup
mu sync.Mutex
ctx context.Context
done chan struct{}
err error
cancel func()
}
// NewWorkQueue creates a new WorkQueue that respects
// the passed context for cancellation
func NewWorkQueue(ctx context.Context) *WorkQueue {
ctx, cancel := context.WithCancel(ctx)
return &WorkQueue{
ctx: ctx,
done: make(chan struct{}),
cancel: cancel,
}
}
// Add add work to the work queue. The passed description
// will be used for the error message, if any. The function
// will be immediately executed.
func (queue *WorkQueue) Add(description string, fn WorkFunc) {
queue.wg.Add(1)
go func() {
err := fn(queue.ctx)
// if one of the work queue items fails the whole
// queue will be canceled
if err != nil {
queue.setErr(fmt.Errorf("failed to %s: %v", description, err))
queue.cancel()
}
queue.wg.Done()
}()
}
// Wait waits until all worker functions are done,
// one worker is failing or the context is canceled
func (queue *WorkQueue) Wait() {
defer queue.cancel()
select {
case <-queue.wg.Finish():
case <-queue.ctx.Done():
err := queue.ctx.Err()
// if the queue was canceled and no error was set already
// store the error
if err != nil {
queue.setErr(err)
}
}
}
// Err returns the error if one of the work queue items failed
func (queue *WorkQueue) Err() error {
queue.mu.Lock()
defer queue.mu.Unlock()
return queue.err
}
// setErr sets the error on the queue if not set already
func (queue *WorkQueue) setErr(err error) {
queue.mu.Lock()
defer queue.mu.Unlock()
if queue.err == nil {
queue.err = err
}
}