-
Notifications
You must be signed in to change notification settings - Fork 0
/
runner.go
127 lines (110 loc) · 2.48 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package runner
import (
"errors"
"fmt"
"route_populator/publisher"
"sync"
"time"
)
type Runner struct {
stopped bool
cc publisher.ConnectionCreator
job publisher.Job
numGoRoutines int
heartbeatInterval time.Duration
publishDelay time.Duration
wg *sync.WaitGroup
errsChan chan error
quitChan chan struct{}
}
func NewRunner(c publisher.ConnectionCreator, j publisher.Job, numGoRoutines int, heartbeatInterval time.Duration, publishDelay time.Duration) *Runner {
return &Runner{
cc: c,
job: j,
numGoRoutines: numGoRoutines,
wg: &sync.WaitGroup{},
errsChan: make(chan error, numGoRoutines),
quitChan: make(chan struct{}, 1),
heartbeatInterval: heartbeatInterval,
publishDelay: publishDelay,
}
}
func (r *Runner) Start() error {
if r.stopped {
return errors.New("Cannot restart a runner.")
}
numRoutes := r.job.EndRange - r.job.StartRange
rangeSize := numRoutes / r.numGoRoutines
ranges := PartitionRange(r.job.StartRange, r.job.EndRange, rangeSize)
for i := 0; i < r.numGoRoutines; i += 1 {
r.wg.Add(1)
go func(id int) {
defer r.wg.Done()
job := r.job
job.StartRange = ranges[id]
job.EndRange = ranges[id+1]
p := publisher.NewPublisher(job, r.publishDelay)
err := p.Initialize(r.cc)
if err != nil {
r.errsChan <- fmt.Errorf("initializing connection: %s", err)
r.Stop()
return
}
err = p.PublishRouteRegistrations()
if err != nil {
r.errsChan <- fmt.Errorf("publishing: %s", err)
r.Stop()
return
}
for {
select {
case <-time.After(r.heartbeatInterval):
err := p.PublishRouteRegistrations()
if err != nil {
r.errsChan <- fmt.Errorf("publishing: %s", err)
r.Stop()
return
}
case <-r.quitChan:
// Exit upon closed quit channel
return
}
}
}(i)
}
return nil
}
func (r *Runner) Wait() error {
r.wg.Wait()
if len(r.errsChan) > 0 {
err := <-r.errsChan
return err
}
return nil
}
func (r *Runner) Stop() {
if r.stopped == false {
r.stopped = true
close(r.quitChan)
}
}
func min(a, b int) int {
if b >= a {
return a
}
return b
}
func PartitionRange(start, end, partitionSize int) []int {
var result []int
rangeSize := (end - start)
if partitionSize == rangeSize {
return []int{start, end}
}
for i := start; i <= end; i += partitionSize {
result = append(result, i)
}
if (rangeSize % partitionSize) > 0 {
result[len(result)-1] = end
}
return result
}