/
processor_pool.go
212 lines (172 loc) · 5.49 KB
/
processor_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package worker
import (
"sort"
"sync"
"time"
gocontext "context"
"github.com/pborman/uuid"
"github.com/sirupsen/logrus"
"github.com/travis-ci/worker/backend"
"github.com/travis-ci/worker/context"
)
// A ProcessorPool spins up multiple Processors handling build jobs from the
// same queue.
type ProcessorPool struct {
	// Context is the root context; each processor derives its own context
	// from it via context.FromProcessor.
	Context gocontext.Context
	// Provider is the backend compute provider handed to each new processor.
	Provider backend.Provider
	// Generator builds the build script for jobs; passed to each processor.
	Generator BuildScriptGenerator
	// CancellationBroadcaster is passed to each processor so job
	// cancellations reach them.
	CancellationBroadcaster *CancellationBroadcaster

	// Hostname is passed to NewProcessor for each processor created.
	Hostname string
	// Per-processor timeouts, forwarded into each ProcessorConfig.
	HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
	// MaxLogLength is forwarded into each ProcessorConfig.
	MaxLogLength int
	// PayloadFilterExecutable is forwarded into each ProcessorConfig.
	PayloadFilterExecutable string

	// SkipShutdownOnLogTimeout is copied onto every processor after creation.
	SkipShutdownOnLogTimeout bool

	// queue is the job queue processors consume from; set by Run.
	queue JobQueue
	// poolErrors collects errors from processors that failed to start
	// (appended in Incr's goroutine, inspected in Run).
	poolErrors []error

	// processorsLock guards mutation of the processors slice.
	processorsLock sync.Mutex
	// processors holds the currently registered processors.
	processors []*Processor
	// processorsWG tracks running processor goroutines; Run waits on it.
	// GracefulShutdown also adds/releases one count to implement pausing.
	processorsWG sync.WaitGroup
	// pauseCount counts GracefulShutdown(togglePause=true) calls; see
	// GracefulShutdown for the pause/unpause protocol.
	pauseCount int
}
// ProcessorPoolConfig groups the scalar arguments consumed by
// NewProcessorPool; each field is copied onto the resulting ProcessorPool.
type ProcessorPoolConfig struct {
	// Hostname becomes ProcessorPool.Hostname.
	Hostname string
	// Context becomes the pool's root context.
	Context gocontext.Context

	// Timeouts copied verbatim onto the pool.
	HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
	// MaxLogLength becomes ProcessorPool.MaxLogLength.
	MaxLogLength int
	// PayloadFilterExecutable becomes ProcessorPool.PayloadFilterExecutable.
	PayloadFilterExecutable string
}
// NewProcessorPool creates a new processor pool using the given arguments.
func NewProcessorPool(ppc *ProcessorPoolConfig,
	provider backend.Provider, generator BuildScriptGenerator,
	cancellationBroadcaster *CancellationBroadcaster) *ProcessorPool {

	pool := &ProcessorPool{
		Context:  ppc.Context,
		Hostname: ppc.Hostname,

		HardTimeout:             ppc.HardTimeout,
		InitialSleep:            ppc.InitialSleep,
		LogTimeout:              ppc.LogTimeout,
		MaxLogLength:            ppc.MaxLogLength,
		PayloadFilterExecutable: ppc.PayloadFilterExecutable,
		ScriptUploadTimeout:     ppc.ScriptUploadTimeout,
		StartupTimeout:          ppc.StartupTimeout,

		CancellationBroadcaster: cancellationBroadcaster,
		Generator:               generator,
		Provider:                provider,
	}

	return pool
}
// Each loops through all the processors in the pool and calls the given
// function for each of them, passing in the index and the processor. The order
// of the processors is the same for the same set of processors.
func (p *ProcessorPool) Each(f func(int, *Processor)) {
	// Snapshot the processor set under the lock: runProcessor and Decr
	// mutate p.processors concurrently, so an unguarded range here is a
	// data race. f is invoked after releasing the lock so a callback that
	// touches the pool cannot deadlock.
	p.processorsLock.Lock()
	procIDs := make([]string, 0, len(p.processors))
	procsByID := make(map[string]*Processor, len(p.processors))
	for _, proc := range p.processors {
		id := proc.ID.String()
		procIDs = append(procIDs, id)
		procsByID[id] = proc
	}
	p.processorsLock.Unlock()

	// Sort by processor ID so iteration order is deterministic for a
	// given set of processors.
	sort.Strings(procIDs)

	for i, procID := range procIDs {
		f(i, procsByID[procID])
	}
}
// Size returns the number of processors in the pool.
func (p *ProcessorPool) Size() int {
	// Guard the read: runProcessor and Decr mutate p.processors under
	// processorsLock, so an unguarded len() here is a data race.
	p.processorsLock.Lock()
	defer p.processorsLock.Unlock()
	return len(p.processors)
}
// TotalProcessed returns the sum of all processor ProcessedCount values.
func (p *ProcessorPool) TotalProcessed() int {
	var sum int
	p.Each(func(_ int, proc *Processor) {
		sum += proc.ProcessedCount
	})
	return sum
}
// Run starts up a number of processors and connects them to the given queue.
// This method stalls until all processors have finished.
//
// If any processor fails to start, the accumulated errors are logged and the
// process panics via logrus.Panic.
func (p *ProcessorPool) Run(poolSize int, queue JobQueue) error {
	p.queue = queue
	p.poolErrors = []error{}

	for i := 0; i < poolSize; i++ {
		p.Incr()
	}

	// Incr launches each processor in its own goroutine, and those
	// goroutines append startup failures to p.poolErrors; copy the slice
	// under the pool lock so this read does not race with them.
	p.processorsLock.Lock()
	poolErrors := append([]error{}, p.poolErrors...)
	p.processorsLock.Unlock()

	if len(poolErrors) > 0 {
		context.LoggerFromContext(p.Context).WithFields(logrus.Fields{
			"self":        "processor_pool",
			"pool_errors": poolErrors,
		}).Panic("failed to populate pool")
	}

	p.processorsWG.Wait()
	return nil
}
// GracefulShutdown causes each processor in the pool to start its graceful
// shutdown.
//
// When togglePause is true the call implements a pause/unpause pair: the
// first toggled call adds one count to processorsWG so Run keeps blocking
// while paused, and the second toggled call releases that count. Any toggled
// call after the second returns immediately without shutting processors down.
//
// NOTE(review): pauseCount is never reset anywhere visible in this file, so
// the pause/unpause cycle appears to work at most once per pool — confirm
// whether that is intended.
func (p *ProcessorPool) GracefulShutdown(togglePause bool) {
	p.processorsLock.Lock()
	defer p.processorsLock.Unlock()

	logger := context.LoggerFromContext(p.Context).WithField("self", "processor_pool")

	if togglePause {
		p.pauseCount++

		if p.pauseCount == 1 {
			// First pause: hold Run open even after processors drain.
			logger.Info("incrementing wait group for pause")
			p.processorsWG.Add(1)
		} else if p.pauseCount == 2 {
			// Second call: release the extra count taken above.
			logger.Info("finishing wait group to unpause")
			p.processorsWG.Done()
		} else if p.pauseCount > 2 {
			// Beyond a full pause/unpause cycle: do nothing, and skip
			// the shutdown loop below.
			return
		}
	}

	for _, processor := range p.processors {
		processor.GracefulShutdown()
	}
}
// Incr adds a single running processor to the pool. The processor runs in its
// own goroutine; a startup failure is recorded in p.poolErrors for Run to
// inspect.
func (p *ProcessorPool) Incr() {
	p.processorsWG.Add(1)
	go func() {
		defer p.processorsWG.Done()
		if err := p.runProcessor(p.queue); err != nil {
			// Multiple Incr goroutines may fail concurrently and Run
			// reads poolErrors, so guard the append with the pool lock
			// to avoid a data race.
			p.processorsLock.Lock()
			p.poolErrors = append(p.poolErrors, err)
			p.processorsLock.Unlock()
		}
	}()
}
// Decr pops a processor out of the pool and issues a graceful shutdown.
// It is a no-op when the pool is empty.
func (p *ProcessorPool) Decr() {
	// runProcessor appends to p.processors under processorsLock, so the
	// pop must happen under the same lock to avoid a data race.
	p.processorsLock.Lock()
	if len(p.processors) == 0 {
		p.processorsLock.Unlock()
		return
	}
	last := len(p.processors) - 1
	proc := p.processors[last]
	p.processors = p.processors[:last]
	p.processorsLock.Unlock()

	// Shut down outside the lock so a slow shutdown doesn't block other
	// pool operations.
	proc.GracefulShutdown()
}
// runProcessor creates a single processor bound to the given queue, registers
// it with the pool, and runs it until it stops. Returns a non-nil error only
// if the processor could not be constructed.
func (p *ProcessorPool) runProcessor(queue JobQueue) error {
	procID := uuid.NewRandom()
	ctx := context.FromProcessor(p.Context, procID.String())

	cfg := ProcessorConfig{
		HardTimeout:             p.HardTimeout,
		InitialSleep:            p.InitialSleep,
		LogTimeout:              p.LogTimeout,
		MaxLogLength:            p.MaxLogLength,
		PayloadFilterExecutable: p.PayloadFilterExecutable,
		ScriptUploadTimeout:     p.ScriptUploadTimeout,
		StartupTimeout:          p.StartupTimeout,
	}

	proc, err := NewProcessor(ctx, p.Hostname, queue, p.Provider,
		p.Generator, p.CancellationBroadcaster, cfg)
	if err != nil {
		context.LoggerFromContext(p.Context).WithFields(logrus.Fields{
			"err":  err,
			"self": "processor_pool",
		}).Error("couldn't create processor")
		return err
	}

	proc.SkipShutdownOnLogTimeout = p.SkipShutdownOnLogTimeout

	p.processorsLock.Lock()
	p.processors = append(p.processors, proc)
	p.processorsLock.Unlock()

	proc.Run()
	return nil
}