mirrored from https://chromium.googlesource.com/infra/luci/luci-go
/
buffer.go
184 lines (159 loc) · 4.99 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parallel
import (
"container/list"
"sync"
"sync/atomic"
)
// A Buffer embeds a Runner, overriding its RunOne method to buffer tasks
// indefinitely without blocking.
type Buffer struct {
	Runner

	// lifo, if non-zero, indicates a LIFO task dispatch or, if zero, a FIFO task
	// dispatch. Read/written atomically; for more information, see SetFIFO.
	lifo int32

	// initOnce ensures that the Buffer is initialized at most once.
	initOnce sync.Once
	// workC receives enqueued tasks for processing. It is created lazily by
	// init and closed by Close.
	workC chan WorkItem
	// tasksFinishedC is used to signal Close when our list has finished
	// dispatching tasks (closed by process on exit).
	tasksFinishedC chan struct{}
}
// init lazily sets up the Buffer's channels and starts its dispatch
// goroutine. It is safe to call repeatedly and from multiple goroutines;
// the work happens at most once.
func (b *Buffer) init() {
	b.initOnce.Do(func() {
		b.tasksFinishedC = make(chan struct{})
		b.workC = make(chan WorkItem)

		// Dispatch loop; exits after Close drains all buffered work.
		go b.process()
	})
}
// process enqueues tasks into the Buffer and dispatches them to the underlying
// Runner when available.
//
// It runs in a dedicated goroutine (started by init) and exits once workC has
// been closed and every buffered task has been handed to the Runner, at which
// point it closes tasksFinishedC to unblock Close.
func (b *Buffer) process() {
	defer close(b.tasksFinishedC)

	// outC is the channel that we send work to. We toggle it between nil and our
	// Runner's WorkC depending on whether we have work. (A send on a nil channel
	// blocks forever, so setting outC to nil disables that select case.)
	//
	// cur is the current work to send. It is only meaningful while outC is
	// non-nil.
	var outC chan<- WorkItem
	var cur WorkItem

	// This is our work buffer. If we have unsent work, any additional work will
	// be written to this buffer.
	var buf list.List

	// Our main processing loop. inC is set to nil after workC closes, which
	// disables the receive case while the buffer drains.
	inC := b.workC
	for {
		select {
		case work, ok := <-inC:
			if !ok {
				// Our work channel has been closed. We aren't accepting any new tasks.
				if outC == nil && buf.Len() == 0 {
					// We have no buffered work; exit immediately.
					return
				}

				// Mark that we're closed. When all of our work drains, we will exit.
				inC = nil
				break
			}

			// If we have no immediate work, send "work" directly; otherwise, buffer
			// work for future sending. ("work" is a fresh variable each iteration,
			// so taking its address here is safe.)
			if outC == nil {
				cur = work
				outC = b.Runner.WorkC()
			} else {
				buf.PushBack(&work)
			}

		case outC <- cur:
			// "cur" has been sent. Dequeue the next work item, or set outC to nil if
			// there are no more items.
			switch {
			case buf.Len() > 0:
				// Dispatch order depends on the current FIFO/LIFO mode.
				var e *list.Element
				if b.isFIFO() {
					e = buf.Front()
				} else {
					e = buf.Back()
				}
				cur = *(buf.Remove(e).(*WorkItem))

			case inC == nil:
				// There's no more immediate work, no buffered work, and we're closed,
				// so we're finished.
				return

			default:
				// No more work to send.
				outC = nil
			}
		}
	}
}
// Run implements the same semantics as Runner's Run. However, if the
// dispatch pipeline is full, Run will buffer the work and return immediately
// rather than block.
func (b *Buffer) Run(gen func(chan<- func() error)) <-chan error {
	// A plain Run has no completion callback.
	return b.runThen(gen, nil)
}
// runThen implements the same semantics as Run, additionally forwarding the
// "then" callback to runImpl (presumably invoked once the generated work has
// completed — see runImpl for the exact callback semantics).
func (b *Buffer) runThen(gen func(chan<- func() error), then func()) <-chan error {
	b.init()
	return runImpl(gen, b.workC, then)
}
// RunOne implements the same semantics as Runner's RunOne. However, if the
// dispatch pipeline is full, RunOne will buffer the work and return immediately
// rather than block.
func (b *Buffer) RunOne(f func() error) <-chan error {
	b.init()

	resultC := make(chan error)
	item := WorkItem{
		F:    f,
		ErrC: resultC,
		// Closing the error channel tells the caller the task is done.
		After: func() { close(resultC) },
	}
	b.workC <- item
	return resultC
}
// WorkC implements the same semantics as Runner's WorkC. However, this channel
// will not block pending work dispatch. Any tasks written to this channel that
// would block are instead buffered pending dispatch availability.
func (b *Buffer) WorkC() chan<- WorkItem {
	b.init()

	// Writes land in process(), which buffers rather than blocking the writer.
	c := b.workC
	return c
}
// Close flushes the remaining tasks in the Buffer and Closes the underlying
// Runner.
//
// Adding new tasks to the Buffer after Close has been invoked will cause a
// panic.
func (b *Buffer) Close() {
	b.init()

	// Stop accepting new work; process() keeps draining what is buffered.
	close(b.workC)
	// Wait until the dispatch goroutine has handed off every buffered task.
	<-b.tasksFinishedC
	// Only then is it safe to shut down the underlying Runner.
	b.Runner.Close()
}
// SetFIFO sets the Buffer's task dispatch order to FIFO (true) or LIFO
// (false). This determines the order in which buffered tasks will be
// dispatched. In FIFO (first in, first out) mode, the first tasks to be
// buffered will be dispatched first. In LIFO (last in, first out) mode, the
// last tasks to be buffered will be dispatched first.
func (b *Buffer) SetFIFO(fifo bool) {
	var v int32
	if !fifo {
		v = 1
	}
	atomic.StoreInt32(&b.lifo, v)
}
// isFIFO reports whether buffered tasks are currently dispatched in FIFO
// order (the default; see SetFIFO).
func (b *Buffer) isFIFO() bool {
	lifo := atomic.LoadInt32(&b.lifo)
	return lifo == 0
}