forked from yunify/metad
/
client.go
106 lines (94 loc) · 2.1 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package local
import (
"github.com/yunify/metad/log"
"github.com/yunify/metad/store"
)
// Client is a backend intended only for tests. It keeps everything in two
// local store.Store instances instead of talking to an external service.
type Client struct {
	// data backs Get, Put, Delete and Sync.
	data store.Store
	// mapping backs GetMapping, PutMapping, DeleteMapping and SyncMapping.
	mapping store.Store
}
// NewLocalClient builds a Client whose data and mapping stores are both
// freshly created via store.New. The returned error is always nil.
func NewLocalClient() (*Client, error) {
	client := new(Client)
	client.data = store.New()
	client.mapping = store.New()
	return client, nil
}
// Get looks up nodePath in the client's local data store.
//
// When the store yields a non-nil value it is returned as-is. Otherwise an
// empty placeholder is returned: an empty map when dir is true, an empty
// string when it is false. The error result is always nil.
func (c *Client) Get(nodePath string, dir bool) (interface{}, error) {
	// The first result of the store lookup is not needed here.
	_, value := c.data.Get(nodePath)

	switch {
	case value != nil:
		return value, nil
	case dir:
		return map[string]interface{}{}, nil
	default:
		return "", nil
	}
}
// Put stores value at nodePath in the local data store. When replace is true
// the existing entry at nodePath is deleted before the new value is written.
// The error result is always nil.
func (c *Client) Put(nodePath string, value interface{}, replace bool) error {
	if !replace {
		c.data.Put(nodePath, value)
		return nil
	}

	// Replace: clear whatever is currently at nodePath, then write.
	c.data.Delete(nodePath)
	c.data.Put(nodePath, value)
	return nil
}
// Delete removes nodePath from the local data store. The dir argument is
// accepted but not consulted. The error result is always nil.
func (c *Client) Delete(nodePath string, dir bool) error {
	_ = dir // not used by this backend

	c.data.Delete(nodePath)

	return nil
}
// Sync starts a goroutine that mirrors the client's data store into s via
// internalSync, passing stopChan through as the stop signal. It returns
// immediately after launching the goroutine.
func (c *Client) Sync(s store.Store, stopChan chan bool) {
	const label = "data"
	source := c.data
	go c.internalSync(label, source, s, stopChan)
}
// GetMapping looks up nodePath in the client's local mapping store.
//
// A non-nil stored value is returned unchanged. If nothing is found, the
// result is an empty map when dir is true and an empty string otherwise.
// The error result is always nil.
func (c *Client) GetMapping(nodePath string, dir bool) (interface{}, error) {
	// Only the second result of the store lookup is used.
	_, found := c.mapping.Get(nodePath)
	if found != nil {
		return found, nil
	}
	if dir {
		return map[string]interface{}{}, nil
	}
	return "", nil
}
// PutMapping writes mapping at nodePath in the local mapping store. With
// replace set, the current entry at nodePath is deleted first. The error
// result is always nil.
func (c *Client) PutMapping(nodePath string, mapping interface{}, replace bool) error {
	var err error // never assigned; this backend cannot fail

	if replace {
		c.mapping.Delete(nodePath)
	}
	c.mapping.Put(nodePath, mapping)

	return err
}
// DeleteMapping removes nodePath from the local mapping store. The dir
// argument is accepted but not consulted. The error result is always nil.
func (c *Client) DeleteMapping(nodePath string, dir bool) error {
	_ = dir // not used by this backend

	c.mapping.Delete(nodePath)

	return nil
}
// SyncMapping starts a goroutine that mirrors the client's mapping store
// into the supplied mapping store via internalSync, using stopChan as the
// stop signal. It returns immediately after launching the goroutine.
func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) {
	const label = "mapping"
	source := c.mapping
	go c.internalSync(label, source, mapping, stopChan)
}
// internalSync copies the contents of from into to and then keeps to up to
// date by replaying the events delivered by a watcher on the root of from.
//
// name is used only in the stop log message. The function blocks until
// either the watcher's event channel is closed or a value is received from
// stopChan (including the zero value produced by a closed stopChan); callers
// run it in its own goroutine.
func (c *Client) internalSync(name string, from store.Store, to store.Store, stopChan chan bool) {
	// The watcher is registered before the snapshot below is read,
	// presumably so that changes made in between still arrive as events.
	// NOTE(review): 5000 looks like the watcher's buffer size — confirm
	// against store.Watch.
	w := from.Watch("/", 5000)

	// Seed the target with the current root content, if there is any.
	_, meta := from.Get("/")
	if meta != nil {
		to.Put("/", meta)
	}

	for {
		select {
		case e, ok := <-w.EventChan():
			if !ok {
				// Event channel closed: nothing more to replay.
				return
			}
			log.Debug("processEvent %s %s %s", e.Action, e.Path, e.Value)
			switch e.Action {
			case store.Delete:
				to.Delete(e.Path)
			case store.Update:
				to.Put(e.Path, e.Value)
			}
		case <-stopChan:
			log.Info("Stop sync %s", name)
			w.Remove()
			// Return explicitly instead of looping again. Without this the
			// goroutine only ends if Remove happens to close the event
			// channel, and a closed stopChan would fire this case on every
			// iteration, calling Remove repeatedly.
			return
		}
	}
}