forked from CyCoreSystems/ari-proxy
/
cluster.go
149 lines (123 loc) · 3.37 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package cluster
import (
"strings"
"sync"
"time"
)
// Automatic purge tuning, consulted by Update on every call.
var (
	// AutoPurgeInterval is the time that must have elapsed since the last
	// purge before Update triggers another automatic purge.
	AutoPurgeInterval = 24 * time.Hour

	// AutoPurgeAge is the maxAge passed to Purge by an automatic purge:
	// members whose last update is older than this are removed.
	AutoPurgeAge = 12 * time.Hour
)
// Cluster describes the set of ARI proxies in a system. Each member is keyed
// by the string that hash builds from its Asterisk ID and ARI application, and
// the stored value is the time of last contact with that member.
type Cluster struct {
// lastPurge is the time Purge last ran; it stays at the zero time until the first purge.
lastPurge time.Time
// members maps a hash(id, app) key to the time Update last recorded that proxy.
members map[string]time.Time
// mu is held by every method while it reads or modifies members.
mu sync.Mutex
}
// New returns an empty Cluster whose member table is already allocated, so it
// can be passed straight to Update.
func New() *Cluster {
	c := new(Cluster)
	c.members = make(map[string]time.Time)
	return c
}
// hash builds the member-table key for a proxy instance by joining the
// Asterisk ID and the ARI application name with a "|" separator.
func hash(id, app string) string {
	const sep = "|"
	return id + sep + app
}
// dehash returns the Asterisk ID and ARI application represented by the given
// key, reversing hash.
//
// The key is split on the first "|" only, so an application name that itself
// contains "|" is returned intact rather than truncated. (An Asterisk ID
// containing "|" remains ambiguous and cannot be recovered.) If the key
// contains no separator at all, both results are empty.
func dehash(key string) (id string, app string) {
	pieces := strings.SplitN(key, "|", 2)
	if len(pieces) < 2 {
		return "", ""
	}
	return pieces[0], pieces[1]
}
// Member describes the state of a single member of an application cluster, as
// returned by the Cluster query methods (All, App and Matching).
type Member struct {
// ID is the unique identifier for the Asterisk node
ID string
// App indicates the ARI application of this proxy
App string
// LastActive is the time at which this member was last recorded in the
// cluster; the query methods copy it from the cluster's member table.
LastActive time.Time
}
// All returns every cluster member whose LastActive time is less than maxAge
// in the past. A maxAge of zero disables the age filter, so every known
// member is returned. The result follows map iteration order and is therefore
// unordered; it is nil when nothing qualifies.
func (c *Cluster) All(maxAge time.Duration) (list []Member) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for key, seen := range c.members {
		// Skip members that have aged out, unless filtering is disabled.
		if maxAge != 0 && time.Since(seen) >= maxAge {
			continue
		}

		var m Member
		m.ID, m.App = dehash(key)
		m.LastActive = seen
		list = append(list, m)
	}

	return list
}
// App returns the cluster members registered for the given ARI application
// whose LastActive time is less than maxAge in the past. A maxAge of zero
// disables the age filter. The result is unordered and is nil when nothing
// qualifies.
func (c *Cluster) App(app string, maxAge time.Duration) (list []Member) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for key, seen := range c.members {
		memberID, memberApp := dehash(key)

		// Only members of the requested application are of interest.
		if memberApp != app {
			continue
		}
		// Skip members that have aged out, unless filtering is disabled.
		if maxAge != 0 && time.Since(seen) >= maxAge {
			continue
		}

		list = append(list, Member{ID: memberID, App: memberApp, LastActive: seen})
	}

	return list
}
// Matching returns the cluster members that match the given Asterisk ID and
// ARI application and whose LastActive time is no more than maxAge in the
// past.
//
// An empty id or app acts as a wildcard for that field. A maxAge of zero
// disables the age filter, consistent with All and App; previously a zero
// maxAge filtered out every member. The result is unordered and is nil when
// nothing qualifies.
func (c *Cluster) Matching(id, app string, maxAge time.Duration) (list []Member) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for k, v := range c.members {
		// Skip members that have aged out, unless filtering is disabled.
		if maxAge != 0 && time.Since(v) > maxAge {
			continue
		}

		i, a := dehash(k)
		if id != "" && id != i {
			continue
		}
		if app != "" && app != a {
			continue
		}

		list = append(list, Member{
			ID:         i,
			App:        a,
			LastActive: v,
		})
	}

	return list
}
// Update adds the proxy identified by id and app to the cluster, or refreshes
// its last-contact time to now if it is already a member.
//
// As a side effect, if more than AutoPurgeInterval has passed since the last
// purge, Update purges members older than AutoPurgeAge.
func (c *Cluster) Update(id, app string) {
	c.mu.Lock()
	c.members[hash(id, app)] = time.Now()
	// lastPurge is written by Purge under c.mu, so it must be read under the
	// same lock to avoid a data race.
	purgeDue := time.Since(c.lastPurge) > AutoPurgeInterval
	c.mu.Unlock()

	// Purge acquires c.mu itself, so it has to run after the lock is released.
	if purgeDue {
		c.Purge(AutoPurgeAge)
	}
}
// Purge removes every proxy whose last update is older than the given maxAge
// and records the current time as the time of the last purge. A maxAge of
// zero removes all members.
func (c *Cluster) Purge(maxAge time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.lastPurge = time.Now()

	// Deleting entries from a map while ranging over it is permitted in Go.
	for key, seen := range c.members {
		if maxAge != 0 && time.Since(seen) <= maxAge {
			continue
		}
		delete(c.members, key)
	}
}