forked from kataras/iris
/
scheduler.go
153 lines (128 loc) · 3.62 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright 2017 Gerasimos Maropoulos, ΓΜ. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package host
import (
"sync/atomic"
)
// task pairs a scheduled TaskRunner with the bookkeeping needed to cancel it,
// both before it starts and after it has been started.
type task struct {
// runner is the registered runner, set once by `Schedule`.
runner TaskRunner
// proc is assigned by `runTaskNow` right before the runner is started;
// it keeps its zero value for tasks that never ran.
proc TaskProcess
// alreadyCanceled is accessed atomically; a non-zero value means that the
// task was canceled before it ever ran, so `runTasks` skips it.
// This applies to interrupt handlers too.
alreadyCanceled int32
// Cancel cancels the task. `Schedule` installs a version that only flags
// the task as canceled; `runTaskNow` replaces it with one that signals
// the started process instead.
Cancel func()
}
// isCanceled reports whether the task was flagged as canceled
// before it had the chance to run.
func (t *task) isCanceled() bool {
	flag := atomic.LoadInt32(&t.alreadyCanceled)
	return flag != 0
}
// Scheduler is a type of an event emitter.
// It can register a specific task for a specific event:
// when the host is starting the server or when the host is interrupted by CTRL+C/CMD+C.
// It's being used internally by the host supervisor.
type Scheduler struct {
// onServeTasks holds the tasks whose runner is not an `OnInterrupt`.
onServeTasks []*task
// onInterruptTasks holds the tasks whose runner is an `OnInterrupt`.
onInterruptTasks []*task
}
// TaskCancelFunc cancels a Task when called.
// It is the value returned by `Schedule` and `ScheduleFunc`.
type TaskCancelFunc func()
// Schedule registers a Task built around "runner".
// A runner of type `OnInterrupt` is queued with the interrupt (CTRL+C/CMD+C)
// tasks, any other runner is queued with the tasks that run when the host
// starts the server.
//
// The returned TaskCancelFunc cancels the registered task.
//
// See `OnInterrupt` and `ScheduleFunc` too.
func (s *Scheduler) Schedule(runner TaskRunner) TaskCancelFunc {
	t := &task{runner: runner}
	// The task is not running yet, so canceling it at this stage only
	// raises the flag that prevents it from being run at all.
	t.Cancel = func() {
		atomic.StoreInt32(&t.alreadyCanceled, 1)
	}

	switch runner.(type) {
	case OnInterrupt:
		s.onInterruptTasks = append(s.onInterruptTasks, t)
	default:
		s.onServeTasks = append(s.onServeTasks, t)
	}

	// Resolve t.Cancel at call time instead of returning the field's current
	// value: the field is replaced with a different function once the task
	// is started.
	cancel := func() {
		t.Cancel()
	}
	return cancel
}
// ScheduleFunc registers a task function: "runner" is converted
// with `TaskRunnerFunc` and the result is handed over to `Schedule`.
//
// The returned TaskCancelFunc cancels the registered task.
//
// See `OnInterrupt` and `Schedule` too.
func (s *Scheduler) ScheduleFunc(runner func(TaskProcess)) TaskCancelFunc {
	wrapped := TaskRunnerFunc(runner)
	return s.Schedule(wrapped)
}
// cancelTasks invokes the current Cancel function of every task in "tasks"
// that has not already been flagged as canceled.
//
// Each Cancel runs in its own goroutine, so this function returns without
// waiting for any of them; the Cancel of a started task sends on its
// process channel, which may block.
func cancelTasks(tasks []*task) {
	for _, t := range tasks {
		if t.isCanceled() {
			continue // already flagged as canceled, nothing left to cancel.
		}
		go t.Cancel()
	}
}
// CancelOnServeTasks cancels every task that is scheduled to run when the
// host is starting the server, i.e. when the server is alive and online.
func (s *Scheduler) CancelOnServeTasks() {
	serveTasks := s.onServeTasks
	cancelTasks(serveTasks)
}
// CancelOnInterruptTasks cancels every task that is scheduled to run when the
// host is being interrupted by CTRL+C/CMD+C, while the server is alive and
// online as well.
func (s *Scheduler) CancelOnInterruptTasks() {
	interruptTasks := s.onInterruptTasks
	cancelTasks(interruptTasks)
}
// runTaskNow creates a new process for "host", attaches it to the task
// and starts the task's runner in its own goroutine.
func runTaskNow(t *task, host TaskHost) {
	process := newTaskProcess(host)

	// From this point on, canceling the task means signaling its process
	// rather than flagging the task as canceled-before-run.
	t.Cancel = func() {
		process.canceledChan <- struct{}{}
	}
	t.proc = process

	go t.runner.Run(process)
}
// runTasks starts, in order, every task of "tasks" that
// has not been canceled beforehand.
func runTasks(tasks []*task, host TaskHost) {
	for i := range tasks {
		if current := tasks[i]; !current.isCanceled() {
			runTaskNow(current, host)
		}
	}
}
// runOnServe starts the non-canceled on-serve tasks for "host".
func (s *Scheduler) runOnServe(host TaskHost) {
	serveTasks := s.onServeTasks
	runTasks(serveTasks, host)
}
// runOnInterrupt starts the non-canceled on-interrupt tasks for "host".
func (s *Scheduler) runOnInterrupt(host TaskHost) {
	interruptTasks := s.onInterruptTasks
	runTasks(interruptTasks, host)
}
// visit calls "visitor" once for each registered task:
// first for all on-serve tasks, then for all on-interrupt tasks.
func (s *Scheduler) visit(visitor func(*task)) {
	each := func(tasks []*task) {
		for _, t := range tasks {
			visitor(t)
		}
	}

	each(s.onServeTasks)
	each(s.onInterruptTasks)
}
// notifyShutdown sends a signal on the host "done" channel of every
// registered task. Each send runs in its own goroutine, so the caller
// is not held up by a task that is not receiving.
//
// Tasks that never ran (canceled beforehand, or never started) have no
// process assigned to them. A send on a nil channel blocks forever, so
// such tasks are skipped instead of parking one goroutine each.
func (s *Scheduler) notifyShutdown() {
	s.visit(func(t *task) {
		// NOTE(review): assumes a never-started task exposes a nil doneChan
		// through its zero-value proc — confirm against TaskProcess/TaskHost.
		done := t.proc.Host().doneChan
		if done == nil {
			return
		}
		go func() {
			done <- struct{}{}
		}()
	})
}
// notifyErr sends "err" on the host error channel of every registered task.
// Each send runs in its own goroutine, so the caller is not held up by
// a task that is not receiving.
//
// Tasks that never ran (canceled beforehand, or never started) have no
// process assigned to them. A send on a nil channel blocks forever, so
// such tasks are skipped instead of parking one goroutine each.
func (s *Scheduler) notifyErr(err error) {
	s.visit(func(t *task) {
		// NOTE(review): assumes a never-started task exposes a nil errChan
		// through its zero-value proc — confirm against TaskProcess/TaskHost.
		errs := t.proc.Host().errChan
		if errs == nil {
			return
		}
		go func() {
			errs <- err
		}()
	})
}
// CopyTo schedules the runner of every task of "s" on the "to" Scheduler.
// Only the runners are carried over: each one gets a fresh task on "to",
// nothing else of the original task is copied.
func (s *Scheduler) CopyTo(to *Scheduler) {
	schedule := func(t *task) {
		to.Schedule(t.runner)
	}
	s.visit(schedule)
}