-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
228 lines (192 loc) · 5.59 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package tasks
import (
"context"
"fmt"
"sync"
"time"
"github.com/robfig/cron/v3"
"github.com/tierklinik-dobersberg/logger"
)
// TaskFunc is the function that is executed when a task runs. It
// receives the context of the current execution and returns the
// error, if any, of that execution.
type TaskFunc func(context.Context) error
// Task is a workload that runs asynchronously on a given cron schedule.
// The same task is intended to never run twice at the same time.
// NOTE(review): nothing visible in this file enforces that; see the
// BUG note in Manager.Register about StartNow and the cron schedule.
type Task struct {
// Name is the name of the task. It is added to the log fields of
// every execution.
Name string
// TaskFunc is the actual function that should be executed
// when the task runs.
TaskFunc TaskFunc
// Deadline can be set to the maximum duration the task is
// allowed to run. The context passed to TaskFunc is cancelled
// once the deadline expires. A value <= 0 disables the deadline.
Deadline time.Duration
// StartNow can be set to true if the task should run immediately
// when the task manager is started.
StartNow bool
// Schedule holds the cron schedule at which the task should be
// executed. It is handed to the cron scheduler when the task is
// registered.
Schedule string
// rootCtx is used to create a new context for this task whenever
// it is executed. It is set by the manager; if it is nil, Run
// falls back to context.Background().
rootCtx context.Context
// cronEntry holds the ID of the task entry inside the cron scheduler.
cronEntry cron.EntryID
// lastLock guards lastErr and lastExecTime.
lastLock sync.Mutex
// lastErr is the error (or recovered panic) of the last execution.
lastErr error
// lastExecTime is the start time of the last execution.
lastExecTime time.Time
}
// LastResult reports the start time and the error of the most recent
// execution of the task. For a task that has not been executed yet the
// zero time and a nil error are returned.
func (task *Task) LastResult() (time.Time, error) {
	// Take a consistent snapshot of both values under the lock.
	task.lastLock.Lock()
	execTime, execErr := task.lastExecTime, task.lastErr
	task.lastLock.Unlock()

	return execTime, execErr
}
// Manager manages and schedules tasks registered on it. The manager
// must be started for any tasks to get scheduled. Once started
// tasks are scheduled as long as the manager is not stopped.
type Manager struct {
// l is held by Start, Stop and Register while they access the
// fields below.
l sync.RWMutex
// tasks holds all successfully registered tasks.
tasks []*Task
// ctx is the context passed to Start. It becomes the root context
// of every task.
ctx context.Context
// cron is the underlying cron scheduler.
cron *cron.Cron
// running is set to true by Start. It is never reset in this file.
running bool
}
// NewManager creates a task manager whose cron schedules are
// interpreted in the timezone loc. A nil loc selects time.UTC.
// The returned manager is not started yet.
func NewManager(loc *time.Location) *Manager {
	location := loc
	if location == nil {
		location = time.UTC
	}

	// TODO(ppacher): add WithLogger()?
	scheduler := cron.New(cron.WithLocation(location))

	return &Manager{cron: scheduler}
}
// Start starts the manager and the cron scheduler. Tasks that
// are marked as StartNow are executed immediately in a dedicated
// goroutine. Calling Start on an already started manager is a no-op.
//
// ctx becomes the root context of every task execution. When ctx is
// cancelled the cron scheduler is stopped. A nil ctx is treated as
// context.Background().
func (mng *Manager) Start(ctx context.Context) {
	mng.l.Lock()
	defer mng.l.Unlock()

	if mng.running {
		return
	}

	if ctx == nil {
		// Task.Run already tolerates a nil root context; do the same here
		// instead of panicking in the watcher goroutine below.
		ctx = context.Background()
	}

	mng.running = true
	mng.ctx = ctx

	// Hand the root context to every task BEFORE the scheduler is started.
	// Once the scheduler runs, a job may fire at any moment and would
	// otherwise read rootCtx while it is being written here.
	for _, t := range mng.tasks {
		t.rootCtx = ctx
	}

	mng.cron.Start()

	for _, t := range mng.tasks {
		if t.StartNow {
			// BUG(ppacher): see bug in Manager.Register(*Task)
			go t.Run()
		}
	}

	// Stop the scheduler as soon as ctx is cancelled. A context that can
	// never be cancelled has a nil Done channel; spawning a watcher for it
	// would only leak a goroutine.
	if done := ctx.Done(); done != nil {
		go func() {
			<-done
			<-mng.cron.Stop().Done()
		}()
	}
}
// Stop stops the manager and waits for all running tasks to
// complete. Running tasks are NOT cancelled! If the user wants
// to cancel all tasks the context passed to mng.Start() should
// be cancelled instead. Afterwards a call to Stop() will make
// sure that no more tasks get scheduled and will wait for all
// running tasks to finish.
func (mng *Manager) Stop() {
	// Only hold the manager lock while asking the scheduler to stop.
	// Waiting for running jobs with the lock held would deadlock as soon
	// as one of those jobs calls Register or Start on this manager.
	mng.l.Lock()
	stopped := mng.cron.Stop()
	mng.l.Unlock()

	// Block until all jobs that were running at the time of Stop finished.
	<-stopped.Done()
}
// Register registers task at mng. An error can only be
// returned when the Schedule field of task cannot be
// parsed; in that case the task is neither stored nor executed.
// If StartNow is set to true on task it will be executed
// as soon as the manager is started.
// If mng has already been started and StartNow is true
// the task is executed immediately in a dedicated go-routine.
func (mng *Manager) Register(task *Task) error {
	mng.l.Lock()
	defer mng.l.Unlock()

	if mng.running {
		// Must happen before AddJob: once the job is part of a running
		// scheduler it may be executed at any time and reads rootCtx.
		task.rootCtx = mng.ctx
	}

	entryID, err := mng.cron.AddJob(task.Schedule, task)
	if err != nil {
		return err
	}
	task.cronEntry = entryID
	mng.tasks = append(mng.tasks, task)

	// Only kick off the immediate run once the schedule has been accepted,
	// so a task whose registration failed is never executed.
	if mng.running && task.StartNow {
		// BUG(ppacher): right now it is possible for the
		// task to run twice at the same time because
		// the cron-schedule might also execute the task
		// the next moment.
		go task.Run()
	}

	return nil
}
// Run executes the task once and records the outcome. A panic raised
// by the TaskFunc is recovered and stored as the error of this
// execution. The start time and the resulting error can be queried
// with LastResult afterwards.
func (task *Task) Run() {
	startedAt := time.Now()

	// Fall back to a background context for tasks that were never
	// attached to a started manager.
	runCtx := task.rootCtx
	if runCtx == nil {
		runCtx = context.Background()
	}
	runCtx = logger.WithFields(runCtx, logger.Fields{
		"task":  task.Name,
		"start": startedAt,
	})

	logger.From(runCtx).Infof("starting task")

	if task.Deadline > 0 {
		deadlineCtx, cancel := context.WithTimeout(runCtx, task.Deadline)
		defer cancel()
		runCtx = deadlineCtx
	}

	var result error

	// Runs after TaskFunc returned or panicked: converts a panic into an
	// error, logs the outcome and publishes it for LastResult.
	defer func() {
		switch recovered := recover().(type) {
		case nil:
			// No panic, keep whatever TaskFunc returned.
		case error:
			result = recovered
		default:
			result = fmt.Errorf("%v", recovered)
		}

		var message string
		if result != nil {
			message = result.Error()
		}

		logger.From(runCtx).WithFields(logger.Fields{
			"lastErr":  message,
			"duration": time.Since(startedAt),
		}).Infof("task finished")

		task.lastLock.Lock()
		task.lastErr = result
		task.lastExecTime = startedAt
		task.lastLock.Unlock()
	}()

	result = task.TaskFunc(runCtx)
}
// DefaultManager is the default task manager of this package. It
// interprets cron schedules in the time.Local timezone.
// Users of the DefaultManager must make sure that no unwanted
// tasks are registered automatically by the init() functions
// of other packages. For safety, users are advised to use
// NewManager on their own and register all required tasks
// manually.
// Creators of tasks are advised to export a RegisterOn(mng *Manager)
// method that can be used to register a task on a certain
// manager.
var DefaultManager = NewManager(time.Local)
// Register registers t at the DefaultManager. It returns the error
// reported by DefaultManager.Register.
func Register(t *Task) error {
return DefaultManager.Register(t)
}
// Start starts the DefaultManager. ctx is passed on to
// DefaultManager.Start unchanged.
func Start(ctx context.Context) {
DefaultManager.Start(ctx)
}