-
Notifications
You must be signed in to change notification settings - Fork 107
/
fixed_rate.go
108 lines (87 loc) · 2.22 KB
/
fixed_rate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package scheduling
import (
"context"
"sync"
"time"
"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
)
// fixedRateTask is a named unit of work registered with the fixed rate
// scheduler.
type fixedRateTask struct {
	// name identifies the task in the error log when an execution fails.
	name string

	// fn is the function the scheduler calls on every round, passing the
	// context of the running scheduler.
	fn func(ctx context.Context) error
}
// Compile-time check that fixedRateScheduler satisfies Scheduler.
var _ Scheduler = (*fixedRateScheduler)(nil)

// fixedRateScheduler waits for an initial delay and then executes all
// registered tasks one after another, repeating the round on every tick of a
// fixed-interval ticker.
type fixedRateScheduler struct {
	logger *logging.Logger

	// delay is the time to wait before the first round of executions.
	delay time.Duration
	// interval is the period of the ticker that triggers repeated rounds.
	interval time.Duration

	// startOne allows running the scheduler only once at a time.
	startOne cmSync.One

	// mu guards tasks.
	mu sync.Mutex
	// tasks holds the registered tasks in the order they were added.
	tasks []*fixedRateTask
}
// NewFixedRateScheduler creates a new fixed rate scheduler that waits for
// delay before its first round of task executions and then repeats the round
// every interval.
//
// The interval must be greater than zero; if not, the scheduler will panic.
// The panic is raised when the running scheduler creates its ticker after the
// initial delay, not by this constructor.
func NewFixedRateScheduler(delay time.Duration, interval time.Duration) Scheduler {
	sched := &fixedRateScheduler{
		logger:   logging.GetLogger("scheduler/fixed-rate"),
		delay:    delay,
		interval: interval,
		startOne: cmSync.NewOne(),
		tasks:    make([]*fixedRateTask, 0),
	}
	return sched
}
// AddTask implements Scheduler.
//
// The task is appended to the list of registered tasks under the scheduler's
// mutex. The run loop re-reads that list at the start of every round, so a
// task added while the scheduler is running is executed from the next round
// onwards.
func (s *fixedRateScheduler) AddTask(name string, fn func(ctx context.Context) error) {
	task := &fixedRateTask{name: name, fn: fn}

	s.mu.Lock()
	s.tasks = append(s.tasks, task)
	s.mu.Unlock()
}
// Start implements Scheduler.
//
// It hands the run loop to startOne, which (per that field's description)
// allows the scheduler to run only once at a time.
// NOTE(review): presumably a call while the scheduler is already running is a
// no-op and run executes in its own goroutine — confirm against
// cmSync.One.TryStart.
func (s *fixedRateScheduler) Start() {
	s.startOne.TryStart(s.run)
}
// Stop implements Scheduler.
//
// It asks startOne to stop the run loop; run itself returns once the context
// it was given is done.
// NOTE(review): presumably TryStop cancels that context and is a no-op when
// the scheduler is not running — confirm against cmSync.One.TryStop.
func (s *fixedRateScheduler) Stop() {
	s.startOne.TryStop()
}
// run is the scheduler's main loop.
//
// It waits for the initial delay, then executes all registered tasks one
// after another and repeats the round on every tick of a ticker with period
// s.interval. A failing task is logged and does not prevent the remaining
// tasks from running. The loop returns as soon as ctx is done while it is
// waiting for the initial delay or for the next tick.
//
// time.NewTicker panics if s.interval is not greater than zero.
func (s *fixedRateScheduler) run(ctx context.Context) {
	// Use an explicit timer rather than time.After so that it can be released
	// immediately when the scheduler is stopped during the initial delay.
	initial := time.NewTimer(s.delay)
	select {
	case <-initial.C:
	case <-ctx.Done():
		initial.Stop()
		return
	}

	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()

	// Local snapshot of the registered tasks, so that the mutex is not held
	// while tasks are executing.
	var tasks []*fixedRateTask

	for {
		// Add new tasks, if any. s.tasks is append-only, so everything past
		// the snapshot's current length is new.
		s.mu.Lock()
		tasks = append(tasks, s.tasks[len(tasks):]...)
		s.mu.Unlock()

		// Execute tasks consecutively.
		for _, task := range tasks {
			if err := task.fn(ctx); err != nil {
				s.logger.Error("failed to execute task",
					"err", err,
					"task", task.name,
				)
			}
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}