forked from canonical/lxd
-
Notifications
You must be signed in to change notification settings - Fork 1
/
group.go
117 lines (102 loc) · 2.7 KB
/
group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package task
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
)
// Group of tasks sharing the same lifecycle.
//
// All tasks in a group will be started and stopped at the same time.
type Group struct {
cancel func()
wg sync.WaitGroup
tasks []Task
running map[int]bool
mu sync.Mutex
}
// Add a new task to the group, returning its index.
func (g *Group) Add(f Func, schedule Schedule) *Task {
g.mu.Lock()
defer g.mu.Unlock()
i := len(g.tasks)
g.tasks = append(g.tasks, Task{
f: f,
schedule: schedule,
reset: make(chan struct{}, 16), // Buffered to not block senders
})
return &g.tasks[i]
}
// Start all the tasks in the group.
func (g *Group) Start(ctx context.Context) {
// Lock access to the g.running and g.tasks map for the entirety of this function so that
// concurrent calls to Start() or Add(0) don't race. This ensures all tasks in this group
// are started based on a consistent snapshot of g.running and g.tasks.
g.mu.Lock()
defer g.mu.Unlock()
ctx, g.cancel = context.WithCancel(ctx)
g.wg.Add(len(g.tasks))
if g.running == nil {
g.running = make(map[int]bool)
}
for i := range g.tasks {
if g.running[i] {
continue
}
g.running[i] = true
task := g.tasks[i] // Local variable for the closure below.
go func(i int) {
task.loop(ctx)
// Ensure running map is updated before wait group Done() is called.
g.mu.Lock()
g.running[i] = false
g.mu.Unlock()
g.wg.Done()
}(i)
}
}
// Stop all tasks in the group.
//
// This works by sending a cancellation signal to all tasks of the
// group and waiting for them to terminate.
//
// If a task is idle (i.e. not executing its task function) it will terminate
// immediately.
//
// If a task is busy executing its task function, the cancellation signal will
// propagate through the context passed to it, and the task will block waiting
// for the function to terminate.
//
// In case the given timeout expires before all tasks complete, this method
// exits immediately and returns an error, otherwise it returns nil.
func (g *Group) Stop(timeout time.Duration) error {
if g.cancel == nil {
// We were not even started
return nil
}
g.cancel()
graceful := make(chan struct{}, 1)
go func() {
g.wg.Wait()
close(graceful)
}()
// Wait for graceful termination, but abort if the context expires.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
running := []string{}
g.mu.Lock()
defer g.mu.Unlock()
for i, value := range g.running {
if value {
running = append(running, strconv.Itoa(i))
}
}
return fmt.Errorf("tasks %s are still running", strings.Join(running, ", "))
case <-graceful:
return nil
}
}