forked from VolantMQ/volantmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
expiry.go
123 lines (103 loc) · 2.44 KB
/
expiry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package clients
import (
"strconv"
"sync"
"time"
"github.com/VolantMQ/vlapi/mqttp"
"github.com/VolantMQ/vlapi/plugin/persistence"
"github.com/VolantMQ/volantmq/types"
)
// expiryEvent is the notification sink an expiry reports to when its timer
// has run its course.
type expiryEvent interface {
// sessionTimer is called from the timer callback with the session id.
// The flag is true when the remaining expiry interval has reached zero and
// false when no expiry interval was configured at all.
sessionTimer(string, bool)
}
// expiryConfig carries everything an expiry needs to schedule a delayed will
// and/or a session expiration.
type expiryConfig struct {
// embedded receiver of the sessionTimer notification
expiryEvent
// session identifier handed back through sessionTimer
id string
// NOTE(review): not read by any method visible in this file — confirm usage elsewhere
createdAt time.Time
// moment the expiry countdown began; start fills it with time.Now() when zero,
// and persistedState stores it in RFC3339 form
expiringSince time.Time
// used by the timer callback to publish the will message
messenger types.TopicMessenger
// pending will message; start drops it when willIn is zero
will *mqttp.Publish
// remaining expiry interval in seconds; nil means no expiry is configured.
// The pointed-to value is rewritten by start and by the timer callback.
expireIn *uint32
// will delay in seconds
willIn uint32
}
// expiry couples an expiryConfig with the timer that drives it.
type expiry struct {
expiryConfig
// taken by start, cancel and timerCallback around their access to timer
timerLock sync.Mutex
// created by start via time.AfterFunc; nil until start has been called
timer *time.Timer
}
// newExpiry builds an expiry around the supplied configuration.
// No timer is created here; it is armed later by start.
func newExpiry(c expiryConfig) *expiry {
	e := new(expiry)
	e.expiryConfig = c

	return e
}
// start works out how long the first timer period should be and arms the timer.
//
// The period is chosen as follows:
//   - a will with a non-zero delay proposes willIn seconds; a will without a
//     delay is dropped from this expiry;
//   - when an expiry interval is configured and the will delay is shorter than
//     it, the timer fires after the will delay and the value behind expireIn is
//     reduced to the remainder, so the callback can re-arm for the rest;
//   - otherwise the timer fires after the full expiry interval and the value
//     behind expireIn is set to zero.
//
// expiringSince is stamped with the current time unless it is already set.
func (s *expiry) start() {
	var period uint32

	// only a will that exists and carries a delay is kept for deferred publishing
	if s.will == nil || s.willIn == 0 {
		s.will = nil
	} else {
		period = s.willIn
	}

	if remaining := s.expireIn; remaining != nil {
		if period == 0 || period >= *remaining {
			// single shot: expiry is reached on the first fire
			period = *remaining
			*remaining = 0
		} else {
			// will delay elapses first, the timer has to fire twice
			*remaining -= period
		}
	}

	if s.expiringSince.IsZero() {
		s.expiringSince = time.Now()
	}

	delay := time.Duration(period) * time.Second

	s.timerLock.Lock()
	defer s.timerLock.Unlock()

	s.timer = time.AfterFunc(delay, s.timerCallback)
}
// cancel stops the pending expiry timer.
//
// It returns true when this call prevented the timer from firing. It returns
// false when the timer had already fired or been stopped, and also when no
// timer has been armed yet.
//
// The timer is created with time.AfterFunc, so its C channel is nil and must
// not be drained: a receive on it never completes and would block the caller
// forever while timerLock is held. A false result from Stop only means the
// callback has already been started in its own goroutine; Stop does not wait
// for it to finish.
func (s *expiry) cancel() bool {
	s.timerLock.Lock()
	defer s.timerLock.Unlock()

	if s.timer == nil {
		// start has not been called, there is nothing to stop
		return false
	}

	return s.timer.Stop()
}
// persistedState captures the current delay state in a form suitable for storage.
//
// The result always carries expiringSince formatted as RFC3339. The encoded
// will message is included when a will is pending, and the remaining expiry
// interval (decimal seconds) is included when an expiry is configured.
func (s *expiry) persistedState() *persistence.SessionDelays {
	exp := &persistence.SessionDelays{
		Since: s.expiringSince.Format(time.RFC3339),
	}

	if s.will != nil {
		// NOTE(review): the encode error is discarded, so a will that fails to
		// encode is stored as whatever Encode returned — confirm this is intended
		exp.Will, _ = mqttp.Encode(s.will)
	}

	if s.expireIn != nil {
		// format as unsigned: converting uint32 to int wraps to a negative
		// number on 32-bit platforms for values above math.MaxInt32
		exp.ExpireIn = strconv.FormatUint(uint64(*s.expireIn), 10)
	}

	return exp
}
// timerCallback runs each time the expiry timer fires. It holds timerLock for
// its whole duration, including the call into sessionTimer.
//
// A pending will is published first and then cleared. After that:
//   - with no expiry configured, sessionTimer is called with false;
//   - with the remaining expiry at zero, sessionTimer is called with true;
//   - otherwise the timer is re-armed for the remaining interval and the
//     remainder is zeroed, so the next fire reports expiration.
func (s *expiry) timerCallback() {
	s.timerLock.Lock()
	defer s.timerLock.Unlock()

	// deliver the delayed will, if any, and forget about it
	if pending := s.will; pending != nil {
		s.messenger.Publish(pending) // nolint: errcheck
		s.will = nil
		s.willIn = 0
	}

	switch {
	case s.expireIn == nil:
		// delayed will has been handled and no expiry is set: shut the session down
		s.sessionTimer(s.id, false)
	case *s.expireIn == 0:
		// session has expired: wipe it
		s.sessionTimer(s.id, true)
	default:
		// wait out the rest of the expiry interval; zero the remainder first so
		// the next fire takes the expired branch
		rest := time.Duration(*s.expireIn) * time.Second
		*s.expireIn = 0
		s.timer.Reset(rest)
	}
}