This repository has been archived by the owner on Mar 31, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 80
/
notify_db.go
121 lines (100 loc) · 2.21 KB
/
notify_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package firetest
import (
"encoding/json"
"strings"
_sync "sync"
"time"
"github.com/zabawaba99/firego/sync"
)
// event is a single notification delivered to watchers of the database.
type event struct {
	// Name labels the kind of event.
	Name string
	// Data carries the affected path and the node associated with it.
	Data eventData
}
// eventData is the payload of an event. It has a custom MarshalJSON that
// prefixes Path with "/" when encoding.
type eventData struct {
	// Path is the location the event refers to; encoded under the "path" key.
	Path string `json:"path"`
	// Data is the node attached to the event (nil for deletions); encoded
	// under the "data" key.
	Data *sync.Node `json:"data"`
}
// MarshalJSON encodes the event data as JSON with a leading "/" prepended to
// the path. The receiver is a value, so the caller's eventData is untouched.
func (ed eventData) MarshalJSON() ([]byte, error) {
	// A local type with the same fields and tags but no methods, so that
	// json.Marshal uses the default struct encoding instead of calling
	// this method again.
	type plain eventData

	return json.Marshal(plain{
		Path: "/" + ed.Path,
		Data: ed.Data,
	})
}
// newEvent builds an event called name that refers to path and carries the
// node n (which may be nil).
func newEvent(name, path string, n *sync.Node) event {
	return event{
		// Use the caller-supplied name; previously this was hard-coded to
		// "put", which mislabelled events created with any other name.
		Name: name,
		Data: eventData{
			Path: path,
			Data: n,
		},
	}
}
// notifyDB wraps a sync.Database and notifies registered watchers about
// changes made through it.
type notifyDB struct {
	// intDB is the underlying database that stores the nodes.
	intDB *sync.Database
	// watchersMtx guards access to watchers.
	watchersMtx _sync.RWMutex
	// watchers maps a watched path to the channels registered for it.
	watchers map[string][]chan event
}
func newNotifyDB() *notifyDB {
return ¬ifyDB{
intDB: sync.NewDB(),
watchers: map[string][]chan event{},
}
}
// add stores n at path in the underlying database and then notifies
// watchers with a "put" event from a separate goroutine.
func (db *notifyDB) add(path string, n *sync.Node) {
	db.intDB.Add(path, n)

	evt := newEvent("put", path, n)
	go db.notify(evt)
}
// update applies n at path via the underlying database's Update and then
// notifies watchers with an event built using the "patch" name, from a
// separate goroutine.
func (db *notifyDB) update(path string, n *sync.Node) {
	db.intDB.Update(path, n)

	evt := newEvent("patch", path, n)
	go db.notify(evt)
}
// del removes whatever is stored at path in the underlying database and then
// notifies watchers with a "put" event carrying a nil node, from a separate
// goroutine.
func (db *notifyDB) del(path string) {
	db.intDB.Del(path)

	evt := newEvent("put", path, nil)
	go db.notify(evt)
}
// get returns the node the underlying database holds for path. No watchers
// are notified.
func (db *notifyDB) get(path string) *sync.Node {
	node := db.intDB.Get(path)
	return node
}
// notify delivers e to every listener registered under a watched path that
// is a prefix of the event's path. Each listener receives a copy of the
// event whose path is relative to the path it is watching.
//
// The read lock on the watcher registry is held for the whole delivery.
// Each send waits up to 250ms for a receiver; if none is ready, the event is
// dropped for that listener and delivery moves on to the next one.
func (db *notifyDB) notify(e event) {
	db.watchersMtx.RLock()
	defer db.watchersMtx.RUnlock()

	// Remember the full path of the event. Matching and trimming must always
	// be done against it: overwriting e.Data.Path inside the loop would make
	// later watchers compare against an already-trimmed path and either miss
	// the event or receive a wrong relative path, depending on map order.
	fullPath := e.Data.Path

	for path, listeners := range db.watchers {
		if !strings.HasPrefix(fullPath, path) {
			continue
		}

		// Make sure to not return the full path when notifying;
		// only return the path relative to the watcher.
		rel := e
		rel.Data.Path = sanitizePath(strings.TrimPrefix(fullPath, path))

		for _, c := range listeners {
			select {
			case c <- rel:
			case <-time.After(250 * time.Millisecond):
				// Listener is not receiving; skip it rather than block.
			}
		}
	}
}
// stopWatching unregisters channel c from the listeners of path and closes
// it. If c is not registered under path, nothing is removed and c is left
// open. The write lock is held for the duration of the call.
func (db *notifyDB) stopWatching(path string, c chan event) {
	db.watchersMtx.Lock()

	listeners := db.watchers[path]
	for pos := range listeners {
		if listeners[pos] != c {
			continue
		}

		// Drop the first matching entry, close the channel and stop looking.
		db.watchers[path] = append(listeners[:pos], listeners[pos+1:]...)
		close(c)
		break
	}

	db.watchersMtx.Unlock()
}
// watch registers a new unbuffered channel as a listener for path and
// returns it. Events are sent to the channel by notify.
func (db *notifyDB) watch(path string) chan event {
	ch := make(chan event)

	db.watchersMtx.Lock()
	registered := db.watchers[path]
	db.watchers[path] = append(registered, ch)
	db.watchersMtx.Unlock()

	return ch
}