forked from fagongzi/manba
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
138 lines (111 loc) · 2.95 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package model
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/fagongzi/util/task"
"github.com/toolkits/net"
)
var (
	// TICKER is a three-second interval.
	// NOTE(review): presumably the period of a recurring store task; its
	// consumers are not visible in this file.
	TICKER = 3 * time.Second

	// TTL is a timeout value of 5.
	// NOTE(review): the unit is presumably seconds; confirm against the
	// store implementations that read it.
	TTL int64 = 5
)
// supportSchema maps a lower-cased registry URL scheme to the factory that
// builds the matching Store. It starts empty and is filled in by init.
var supportSchema = map[string]func(string, string, *task.Runner) (Store, error){}
// EvtType is the kind of change an event reports (new, update or delete).
type EvtType int

// EvtSrc is the kind of object an event originates from.
type EvtSrc int

// Event types, numbered from 0 in declaration order.
const (
	// EventTypeNew is the "new" event type.
	EventTypeNew EvtType = iota
	// EventTypeUpdate is the "update" event type.
	EventTypeUpdate
	// EventTypeDelete is the "delete" event type.
	EventTypeDelete
)

// Event sources, numbered from 0 in declaration order.
const (
	// EventSrcCluster marks a cluster event.
	EventSrcCluster EvtSrc = iota
	// EventSrcServer marks a server event.
	EventSrcServer
	// EventSrcBind marks a bind event.
	EventSrcBind
	// EventSrcAPI marks an API event.
	EventSrcAPI
	// EventSrcRouting marks a routing event.
	EventSrcRouting
)
// Evt is a single change notification. Store.Watch takes a channel of *Evt.
type Evt struct {
// Src is the kind of object the event refers to (an EvtSrc value).
Src EvtSrc
// Type is the kind of change (an EvtType value).
Type EvtType
// Key identifies the affected entry.
// NOTE(review): the key format is not visible here; presumably the store key.
Key string
// Value is the event payload.
// NOTE(review): the concrete type is not visible here; presumably depends on Src.
Value interface{}
}
func init() {
supportSchema["consul"] = getConsulStoreFrom
supportSchema["etcd"] = getEtcdStoreFrom
}
// GetStoreFrom returns the Store implementation selected by the URL scheme of
// registryAddr.
//
// registryAddr is parsed as a URL. Its scheme, lower-cased, is looked up in
// supportSchema, and the matching factory is called with the URL host part,
// prefix and taskRunner.
//
// An error is returned when registryAddr cannot be parsed or when no factory
// is registered for its scheme. Otherwise the factory's own result is
// returned unchanged.
func GetStoreFrom(registryAddr, prefix string, taskRunner *task.Runner) (Store, error) {
	u, err := url.Parse(registryAddr)
	if err != nil {
		// Report a malformed address through the error return instead of
		// panicking, so callers can handle bad configuration gracefully.
		return nil, fmt.Errorf("parse registry addr failed, errors:%+v", err)
	}

	fn, ok := supportSchema[strings.ToLower(u.Scheme)]
	if !ok {
		return nil, fmt.Errorf("not support: %s", registryAddr)
	}

	return fn(u.Host, prefix, taskRunner)
}
// getConsulStoreFrom is the store factory registered for the "consul" scheme.
// It forwards addr, prefix and taskRunner to NewConsulStore and returns that
// call's result unchanged.
func getConsulStoreFrom(addr, prefix string, taskRunner *task.Runner) (Store, error) {
return NewConsulStore(addr, prefix, taskRunner)
}
// getEtcdStoreFrom is the store factory registered for the "etcd" scheme.
//
// addr is treated as a comma-separated list of endpoints. Each entry is
// prefixed with "http://" and the resulting list is passed to NewEtcdStore
// together with prefix and taskRunner.
func getEtcdStoreFrom(addr, prefix string, taskRunner *task.Runner) (Store, error) {
	hosts := strings.Split(addr, ",")

	endpoints := make([]string, len(hosts))
	for i, host := range hosts {
		endpoints[i] = "http://" + host
	}

	return NewEtcdStore(endpoints, prefix, taskRunner)
}
// convertIP fills in the host part of an address that has none.
//
// When addr starts with ":" (for example ":8080"), the first address reported
// by net.IntranetIP is placed in front of it, giving "<ip>:8080". In every
// other case addr is returned unchanged: when it does not start with ":",
// when the lookup fails, or when the lookup yields no address.
func convertIP(addr string) string {
	if !strings.HasPrefix(addr, ":") {
		return addr
	}

	ips, err := net.IntranetIP()
	// A nil error does not guarantee a non-empty result; indexing ips[0]
	// unguarded would panic on a host with no intranet address.
	if err != nil || len(ips) == 0 {
		return addr
	}

	// addr begins with ":", so prefixing the IP is the same as replacing the
	// first ":" with "<ip>:".
	return ips[0] + addr
}
// Store is the storage abstraction returned by GetStoreFrom. It groups
// save/update/delete/get methods for binds, clusters, servers, APIs and
// routings, together with Watch and Clean.
type Store interface {
	// Binds.
	SaveBind(bind *Bind) error
	UnBind(bind *Bind) error
	GetBinds() ([]*Bind, error)

	// Clusters. A single cluster is addressed by name.
	SaveCluster(cluster *Cluster) error
	UpdateCluster(cluster *Cluster) error
	DeleteCluster(name string) error
	GetClusters() ([]*Cluster, error)
	GetCluster(clusterName string) (*Cluster, error)

	// Servers. A single server is addressed by its address.
	SaveServer(svr *Server) error
	UpdateServer(svr *Server) error
	DeleteServer(addr string) error
	GetServers() ([]*Server, error)
	GetServer(serverAddr string) (*Server, error)

	// APIs. A single API is addressed by URL and method.
	SaveAPI(api *API) error
	UpdateAPI(api *API) error
	DeleteAPI(url, method string) error
	GetAPIs() ([]*API, error)
	GetAPI(url, method string) (*API, error)

	// Routings.
	SaveRouting(routing *Routing) error
	GetRoutings() ([]*Routing, error)

	// Watch takes an event channel and a stop channel.
	// NOTE(review): presumably change events are sent on evtCh until stopCh
	// is signalled; confirm against the implementations.
	Watch(evtCh chan *Evt, stopCh chan bool) error

	// Clean takes no arguments and reports an error.
	// NOTE(review): what is cleaned is not visible here; check the
	// implementations.
	Clean() error
}