-
Notifications
You must be signed in to change notification settings - Fork 1
/
ttl.go
121 lines (96 loc) · 1.65 KB
/
ttl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package ttl
import (
"sync/atomic"
"time"
)
// Job is a single scheduled entry managed by a TTL.
type Job struct {
	// Key identifies the job; it is the value passed to the Runner when the
	// job is executed.
	Key string
	// Schedule is the instant at which the job becomes due.
	Schedule time.Time
}
// New returns a TTL whose due jobs are handed to exec by key.
//
// Jobs are kept in a heap ordered by Schedule (earliest first) and indexed
// by Key. The returned TTL does nothing until Run is called.
func New(exec Runner) *TTL {
	// earlier orders the heap so the soonest job sits at the top.
	earlier := func(a, b *Job) bool { return a.Schedule.Before(b.Schedule) }
	// keyOf tells the heap how to look a job up by its key.
	keyOf := func(j *Job) string { return j.Key }

	return &TTL{
		started: new(atomic.Bool),
		event:   make(chan struct{}),
		exec:    exec,
		jobs:    NewHeap(make([]*Job, 0), earlier, keyOf),
	}
}
// Runner is the callback a TTL invokes for a due job. It receives the job's
// Key; the scheduling loop launches it in its own goroutine and does not
// inspect the returned error.
type Runner func(string) error
// TTL keeps a set of Jobs ordered by their Schedule and, while Run is
// active, passes the key of each due job to a Runner.
type TTL struct {
	// started is set by Run and cleared by Stop; notify only signals while
	// it is true.
	started *atomic.Bool
	// event wakes the run loop when the job set changes or Stop is called.
	event chan struct{}
	// exec is the callback invoked with the key of each job that is popped.
	exec Runner
	// jobs holds the pending jobs, earliest Schedule first.
	jobs *Heap[*Job]
}
// Add pushes job onto the schedule and then signals the run loop so it can
// recompute how long to wait. A nil job is ignored.
func (t *TTL) Add(job *Job) {
	if job != nil {
		t.jobs.Push(job)
		t.notify()
	}
}
// Delete removes the job stored under key from the schedule and then
// signals the run loop so it can recompute how long to wait.
func (t *TTL) Delete(key string) {
	t.jobs.Remove(key)
	t.notify()
}
// Update hands job to the heap's Update and then signals the run loop.
// A nil job or a job with an empty Key is ignored.
func (t *TTL) Update(job *Job) {
	if job != nil && job.Key != "" {
		t.jobs.Update(job)
		t.notify()
	}
}
// Run marks the TTL as started and executes scheduling iterations until the
// started flag is cleared (see Stop). It blocks the calling goroutine and
// always returns nil.
func (t *TTL) Run() error {
	t.started.Store(true)
	for t.started.Load() {
		t.run()
	}
	return nil
}
// Stop clears the started flag and closes the event channel so that a
// blocked run iteration wakes up and Run returns.
//
// Stop is idempotent: only the call that flips started from true to false
// closes the channel. Previously a second call panicked with "close of
// closed channel", and a call made before Run closed the channel up front,
// leaving a later Run spinning on the closed channel. Calling Stop before
// Run has set the flag is now a no-op.
//
// NOTE(review): the event channel is never re-created, so a TTL cannot be
// restarted with Run after a successful Stop.
func (t *TTL) Stop() {
	if !t.started.CompareAndSwap(true, false) {
		// Not running (never started, or already stopped): nothing to close.
		return
	}
	close(t.event)
}
// IsExpired reports whether a job stored under key exists and its Schedule
// is not in the future. It returns false when no such job is found.
func (t *TTL) IsExpired(key string) bool {
	job := t.jobs.Get(key)
	if job == nil {
		return false
	}
	// "now is not before Schedule" is the same as "Schedule is not after now".
	return !time.Now().Before(job.Schedule)
}
// infTime is the largest value a time.Duration can hold. The run loop uses
// it as the wait when no job is scheduled.
const infTime = time.Duration(1<<63 - 1)
// run performs one iteration of the scheduling loop.
//
// It peeks at the earliest job. If that job is not yet due (or there is no
// job at all) it sleeps until either the job's Schedule arrives or an event
// is received, and then returns so the caller's loop re-evaluates the head
// of the heap. Only when the head is already due is it popped and handed to
// exec in a new goroutine.
//
// Returning after every wake-up, including a timer expiry, matters: a
// concurrent Delete or Update mutates the heap before it signals, so when
// the timer wins the select the head may no longer be the job the timer was
// armed for. Popping at that point could execute a job that is not due yet.
//
// NOTE(review): Peek and Pop are still two separate calls, so a mutation
// landing between them can change which job is popped; closing that window
// would need an atomic "pop if due" on the heap.
func (t *TTL) run() {
	wait := infTime // nothing scheduled: sleep until notified
	if head, ok := t.jobs.Peek(); ok {
		// Zero or negative means the head is already due.
		wait = time.Until(head.Schedule)
	}

	if wait > 0 {
		timer := time.NewTimer(wait)
		defer timer.Stop()
		select {
		case <-t.event:
		case <-timer.C:
		}
		// Re-check the heap on the next iteration instead of trusting the
		// head observed before sleeping.
		return
	}

	job, ok := t.jobs.Pop()
	if !ok {
		return
	}
	// The Runner's error is intentionally not collected here; the callback
	// runs detached so a slow job cannot stall the scheduling loop.
	go t.exec(job.Key)
}
// notify signals the run loop that the job set changed. It does nothing
// unless the started flag is set. The send is a plain blocking channel
// send, so the caller waits until the run loop receives it.
//
// NOTE(review): if the event channel is closed while a send is pending or
// about to start, the send panics; callers racing with Stop are exposed
// to this.
func (t *TTL) notify() {
	if !t.started.Load() {
		return
	}
	t.event <- struct{}{}
}