-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_pipeline.go
190 lines (156 loc) · 4.92 KB
/
task_pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package wtclient
import (
"container/list"
"sync"
"time"
"github.com/btcsuite/btclog"
)
// taskPipeline implements a reliable, in-order queue that ensures its queue
// fully drained before exiting. Stopping the taskPipeline prevents the pipeline
// from accepting any further tasks, and will cause the pipeline to exit after
// all updates have been delivered to the downstream receiver. If this process
// hangs and is unable to make progress, users can optionally call ForceQuit to
// abandon the reliable draining of the queue in order to permit shutdown.
type taskPipeline struct {
started sync.Once
stopped sync.Once
forced sync.Once
log btclog.Logger
queueMtx sync.Mutex
queueCond *sync.Cond
queue *list.List
newBackupTasks chan *backupTask
quit chan struct{}
forceQuit chan struct{}
shutdown chan struct{}
}
// newTaskPipeline initializes a new taskPipeline.
func newTaskPipeline(log btclog.Logger) *taskPipeline {
rq := &taskPipeline{
log: log,
queue: list.New(),
newBackupTasks: make(chan *backupTask),
quit: make(chan struct{}),
forceQuit: make(chan struct{}),
shutdown: make(chan struct{}),
}
rq.queueCond = sync.NewCond(&rq.queueMtx)
return rq
}
// Start spins up the taskPipeline, making it eligible to begin receiving backup
// tasks and deliver them to the receiver of NewBackupTasks.
func (q *taskPipeline) Start() {
q.started.Do(func() {
go q.queueManager()
})
}
// Stop begins a graceful shutdown of the taskPipeline. This method returns once
// all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes
// the delivery of pending tasks to be interrupted.
func (q *taskPipeline) Stop() {
q.stopped.Do(func() {
q.log.Debugf("Stopping task pipeline")
close(q.quit)
q.signalUntilShutdown()
// Skip log if we also force quit.
select {
case <-q.forceQuit:
default:
q.log.Debugf("Task pipeline stopped successfully")
}
})
}
// ForceQuit signals the taskPipeline to immediately exit, dropping any
// backupTasks that have not been delivered via NewBackupTasks.
func (q *taskPipeline) ForceQuit() {
q.forced.Do(func() {
q.log.Infof("Force quitting task pipeline")
close(q.forceQuit)
q.signalUntilShutdown()
q.log.Infof("Task pipeline unclean shutdown complete")
})
}
// NewBackupTasks returns a read-only channel for enqueue backupTasks. The
// channel will be closed after a call to Stop and all pending tasks have been
// delivered, or if a call to ForceQuit is called before the pending entries
// have been drained.
func (q *taskPipeline) NewBackupTasks() <-chan *backupTask {
return q.newBackupTasks
}
// QueueBackupTask enqueues a backupTask for reliable delivery to the consumer
// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is
// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be
// delivered via NewBackupTasks unless ForceQuit is called before completion.
func (q *taskPipeline) QueueBackupTask(task *backupTask) error {
q.queueCond.L.Lock()
select {
// Reject new tasks after quit has been signaled.
case <-q.quit:
q.queueCond.L.Unlock()
return ErrClientExiting
// Reject new tasks after force quit has been signaled.
case <-q.forceQuit:
q.queueCond.L.Unlock()
return ErrClientExiting
default:
}
// Queue the new task and signal the queue's condition variable to wake up
// the queueManager for processing.
q.queue.PushBack(task)
q.queueCond.L.Unlock()
q.queueCond.Signal()
return nil
}
// queueManager processes all incoming backup requests that get added via
// QueueBackupTask. The manager will exit
//
// NOTE: This method MUST be run as a goroutine.
func (q *taskPipeline) queueManager() {
defer close(q.shutdown)
defer close(q.newBackupTasks)
for {
q.queueCond.L.Lock()
for q.queue.Front() == nil {
q.queueCond.Wait()
select {
case <-q.quit:
// Exit only after the queue has been fully drained.
if q.queue.Len() == 0 {
q.queueCond.L.Unlock()
q.log.Debugf("Revoked state pipeline flushed.")
return
}
case <-q.forceQuit:
q.queueCond.L.Unlock()
q.log.Debugf("Revoked state pipeline force quit.")
return
default:
}
}
// Pop the first element from the queue.
e := q.queue.Front()
task := q.queue.Remove(e).(*backupTask)
q.queueCond.L.Unlock()
select {
// Backup task submitted to dispatcher. We don't select on quit to
// ensure that we still drain tasks while shutting down.
case q.newBackupTasks <- task:
// Force quit, return immediately to allow the client to exit.
case <-q.forceQuit:
q.log.Debugf("Revoked state pipeline force quit.")
return
}
}
}
// signalUntilShutdown strobes the queue's condition variable to ensure the
// queueManager reliably unblocks to check for the exit condition.
func (q *taskPipeline) signalUntilShutdown() {
for {
select {
case <-time.After(time.Millisecond):
q.queueCond.Signal()
case <-q.shutdown:
return
}
}
}