-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
57 lines (49 loc) · 1.81 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package scheduler
import (
"sync"
"time"
)
// ProcessFunc is the callback used to process a single item taken from the
// work queue.
type ProcessFunc func(obj interface{})
// ScheduledWorkQueue describes a queue that runs a ProcessFunc for each object
// handed to Add once that object's delay, measured from the Add call, has
// elapsed.
type ScheduledWorkQueue interface {
	// Add schedules obj so that the ProcessFunc is executed after duration has
	// passed since Add was called. A timer that already exists for obj is
	// cancelled and replaced.
	Add(obj interface{}, duration time.Duration)
	// Forget cancels the timer for obj, if there is one.
	Forget(obj interface{})
}
// scheduledWorkQueue is the timer-backed queue implementation returned by
// NewScheduledWorkQueue.
type scheduledWorkQueue struct {
// processFunc is called with the queued object when that object's timer fires.
processFunc ProcessFunc
// work holds the pending timer for each queued object, keyed by the object
// itself.
work map[interface{}]*time.Timer
// workLock serializes access to work; take it before reading or modifying
// the map.
workLock sync.Mutex
}
// NewScheduledWorkQueue builds an empty work queue that will run processFunc
// for every object whose timer expires.
func NewScheduledWorkQueue(processFunc ProcessFunc) ScheduledWorkQueue {
	queue := &scheduledWorkQueue{
		processFunc: processFunc,
		work:        make(map[interface{}]*time.Timer),
		workLock:    sync.Mutex{},
	}
	return queue
}
// Add will add an item to this queue, executing the ProcessFunc after the
// Duration has come (since the time Add was called). If an existing Timer for
// obj already exists, the previous timer will be cancelled.
//
// The whole replace operation runs under workLock, so the map is never
// modified concurrently with timer callbacks, Forget, or other Add calls.
// Add may safely be called from inside the ProcessFunc: the callback does not
// hold the lock while the ProcessFunc runs, and its cleanup leaves a newer
// timer for the same obj in place.
func (s *scheduledWorkQueue) Add(obj interface{}, duration time.Duration) {
	s.workLock.Lock()
	defer s.workLock.Unlock()

	// Cancel and drop any timer that is still registered for obj.
	if prev, ok := s.work[obj]; ok {
		prev.Stop()
		delete(s.work, obj)
	}

	// timer is declared first so the callback can recognise its own entry.
	// The callback only reads it while holding workLock, which this call
	// holds until after the assignment below.
	var timer *time.Timer
	timer = time.AfterFunc(duration, func() {
		defer func() {
			s.workLock.Lock()
			defer s.workLock.Unlock()
			// Remove the entry only if it is still ours; obj may have been
			// re-added (or forgotten) while the ProcessFunc was running.
			if s.work[obj] == timer {
				delete(s.work, obj)
			}
		}()
		s.processFunc(obj)
	})
	s.work[obj] = timer
}
// Forget will cancel the timer for the given object, if the timer exists.
// The timer is stopped and its entry is removed from the queue; calling
// Forget for an object that has no timer does nothing. A callback that has
// already started running is not interrupted.
func (s *scheduledWorkQueue) Forget(obj interface{}) {
// clearTimer acquires workLock itself, so no locking is done here.
s.clearTimer(obj)
}
// clearTimer stops the timer registered for obj and removes its entry from
// the work map. It does nothing when obj has no entry. The lookup, stop and
// delete all happen while workLock is held.
func (s *scheduledWorkQueue) clearTimer(obj interface{}) {
	s.workLock.Lock()
	defer s.workLock.Unlock()

	pending, found := s.work[obj]
	if !found {
		return
	}
	pending.Stop()
	delete(s.work, obj)
}