forked from xtaci/kcp-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
timedsched.go
146 lines (130 loc) · 3.27 KB
/
timedsched.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package kcp
import (
"container/heap"
"runtime"
"sync"
"time"
)
// SystemTimedSched is the library level timed-scheduler
var SystemTimedSched *TimedSched = NewTimedSched(runtime.NumCPU())
type timedFunc struct {
execute func()
ts time.Time
}
// a heap for sorted timed function
type timedFuncHeap []timedFunc
func (h timedFuncHeap) Len() int { return len(h) }
func (h timedFuncHeap) Less(i, j int) bool { return h[i].ts.Before(h[j].ts) }
func (h timedFuncHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *timedFuncHeap) Push(x interface{}) { *h = append(*h, x.(timedFunc)) }
func (h *timedFuncHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
old[n-1].execute = nil // avoid memory leak
*h = old[0 : n-1]
return x
}
// TimedSched represents the control struct for timed parallel scheduler
type TimedSched struct {
// prepending tasks
prependTasks []timedFunc
prependLock sync.Mutex
chPrependNotify chan struct{}
// tasks will be distributed through chTask
chTask chan timedFunc
dieOnce sync.Once
die chan struct{}
}
// NewTimedSched creates a parallel-scheduler with given parallelization
func NewTimedSched(parallel int) *TimedSched {
ts := new(TimedSched)
ts.chTask = make(chan timedFunc)
ts.die = make(chan struct{})
ts.chPrependNotify = make(chan struct{}, 1)
for i := 0; i < parallel; i++ {
go ts.sched()
}
go ts.prepend()
return ts
}
func (ts *TimedSched) sched() {
var tasks timedFuncHeap
timer := time.NewTimer(0)
drained := false
for {
select {
case task := <-ts.chTask:
now := time.Now()
if now.After(task.ts) {
// already delayed! execute immediately
task.execute()
} else {
heap.Push(&tasks, task)
// properly reset timer to trigger based on the top element
stopped := timer.Stop()
if !stopped && !drained {
<-timer.C
}
timer.Reset(tasks[0].ts.Sub(now))
drained = false
}
case now := <-timer.C:
drained = true
for tasks.Len() > 0 {
if now.After(tasks[0].ts) {
heap.Pop(&tasks).(timedFunc).execute()
} else {
timer.Reset(tasks[0].ts.Sub(now))
drained = false
break
}
}
case <-ts.die:
return
}
}
}
func (ts *TimedSched) prepend() {
var tasks []timedFunc
for {
select {
case <-ts.chPrependNotify:
ts.prependLock.Lock()
// keep cap to reuse slice
if cap(tasks) < cap(ts.prependTasks) {
tasks = make([]timedFunc, 0, cap(ts.prependTasks))
}
tasks = tasks[:len(ts.prependTasks)]
copy(tasks, ts.prependTasks)
for k := range ts.prependTasks {
ts.prependTasks[k].execute = nil // avoid memory leak
}
ts.prependTasks = ts.prependTasks[:0]
ts.prependLock.Unlock()
for k := range tasks {
select {
case ts.chTask <- tasks[k]:
tasks[k].execute = nil // avoid memory leak
case <-ts.die:
return
}
}
tasks = tasks[:0]
case <-ts.die:
return
}
}
}
// Put a function 'f' awaiting to be executed at 'deadline'
func (ts *TimedSched) Put(f func(), deadline time.Time) {
ts.prependLock.Lock()
ts.prependTasks = append(ts.prependTasks, timedFunc{f, deadline})
ts.prependLock.Unlock()
select {
case ts.chPrependNotify <- struct{}{}:
default:
}
}
// Close terminates this scheduler
func (ts *TimedSched) Close() { ts.dieOnce.Do(func() { close(ts.die) }) }