-
Notifications
You must be signed in to change notification settings - Fork 0
/
trie_db.go
80 lines (68 loc) · 1.76 KB
/
trie_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package trie
import (
"sync"
"github.com/morgeq/iotfast/server/mqtt/retained"
gmqtt "github.com/morgeq/iotfast/server/mqtt"
)
// trieDB stores retained messages in two topic tries: one for topics that
// isSystemTopic reports as system topics and one for all other topics.
// It is intended to implement retained.Store.
type trieDB struct {
	// The embedded RWMutex is taken by every exported method before the
	// tries are read or replaced.
	sync.RWMutex

	// userTrie holds messages for non-system topics; systemTrie holds
	// messages for system topics (see getTrie for the routing rule).
	userTrie, systemTrie *topicTrie
}
// Iterate hands fn to a pre-order traversal of the user trie and then of the
// system trie. The system trie is only traversed when the user-trie traversal
// returns true.
//
// The read lock is held for the entire walk, so fn should not call back into
// methods of this store that take the write lock.
func (t *trieDB) Iterate(fn retained.IterateFn) {
	t.RLock()
	defer t.RUnlock()

	if keepGoing := t.userTrie.preOrderTraverse(fn); keepGoing {
		t.systemTrie.preOrderTraverse(fn)
	}
}
// getTrie selects the trie responsible for topicName: the system trie when
// isSystemTopic reports true, the user trie otherwise. It takes no lock
// itself; the callers in this file invoke it while holding the store's lock.
func (t *trieDB) getTrie(topicName string) *topicTrie {
	if !isSystemTopic(topicName) {
		return t.userTrie
	}
	return t.systemTrie
}
// GetRetainedMessage looks up topicName in the trie chosen by getTrie and
// returns a copy (made with Message.Copy) of the message held by the node
// that find returns. It returns nil when find yields no node.
func (t *trieDB) GetRetainedMessage(topicName string) *gmqtt.Message {
	t.RLock()
	defer t.RUnlock()

	found := t.getTrie(topicName).find(topicName)
	if found == nil {
		return nil
	}
	return found.msg.Copy()
}
// ClearAll discards every retained message by replacing both the user trie
// and the system trie with new ones from newTopicTrie, under the write lock.
func (t *trieDB) ClearAll() {
	t.Lock()
	defer t.Unlock()

	t.userTrie, t.systemTrie = newTopicTrie(), newTopicTrie()
}
// AddOrReplace stores message as the retained message for message.Topic in
// the trie chosen by getTrie, under the write lock. message must be non-nil,
// because its Topic field is read unconditionally.
func (t *trieDB) AddOrReplace(message *gmqtt.Message) {
	t.Lock()
	defer t.Unlock()

	topic := message.Topic
	t.getTrie(topic).addRetainMsg(topic, message)
}
// Remove deletes the retained message stored for topicName from the trie
// chosen by getTrie, under the write lock.
func (t *trieDB) Remove(topicName string) {
	t.Lock()
	defer t.Unlock()

	target := t.getTrie(topicName)
	target.remove(topicName)
}
// GetMatchedMessages returns the messages that match topicFilter. Only one
// trie is searched: the one getTrie selects for topicFilter itself. The
// search runs under the read lock.
func (t *trieDB) GetMatchedMessages(topicFilter string) []*gmqtt.Message {
	t.RLock()
	defer t.RUnlock()

	source := t.getTrie(topicFilter)
	return source.getMatchedMessages(topicFilter)
}
// NewStore returns a trieDB whose user and system tries are each freshly
// created by newTopicTrie.
func NewStore() *trieDB {
	store := new(trieDB)
	store.userTrie = newTopicTrie()
	store.systemTrie = newTopicTrie()
	return store
}