/
allocator.go
73 lines (62 loc) 路 1.82 KB
/
allocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package pool
import (
"context"
"time"
"github.com/roadrunner-server/api/v2/ipc"
"github.com/roadrunner-server/api/v2/worker"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v2/events"
workerImpl "github.com/roadrunner-server/sdk/v2/worker"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
func (sp *Pool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd Command) worker.Allocator {
return func() (worker.BaseProcess, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(sp.cfg.Command))
if err != nil {
// context deadline
if errors.Is(errors.TimeOut, err) {
return nil, errors.Str("failed to spawn a worker, possible reasons: https://roadrunner.dev/docs/known-issues-allocate-timeout/2.x/en")
}
return nil, err
}
// wrap sync worker
sw := workerImpl.From(w)
sp.log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}
// allocate required number of stack
func (sp *Pool) parallelAllocator(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, numWorkers)
eg := new(errgroup.Group)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
ii := i
eg.Go(func() error {
w, err := sp.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
}
workers[ii] = w
return nil
})
}
err := eg.Wait()
if err != nil {
for j := 0; j < len(workers); j++ {
jj := j
if workers[jj] != nil {
go func() {
_ = workers[jj].Wait()
}()
_ = workers[jj].Kill()
}
}
return nil, err
}
return workers, nil
}