forked from mantl/mesos-consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zk.go
144 lines (119 loc) · 2.89 KB
/
zk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package mesos
import (
"encoding/binary"
"fmt"
"log"
"net"
"sync"
"time"
"github.com/mesos/mesos-go/mesosproto"
zoo "github.com/CiscoCloud/mesos-consul/mesos/zkdetect"
)
func (m *Mesos) zkDetector(zkURI string) {
if (zkURI == "") {
log.Fatal("[ERROR] Zookeeper address not provided")
}
dr, err := m.leaderDetect(zkURI)
if err != nil {
log.Fatal("[ERROR] ", err.Error())
}
log.Print("[INFO] Waiting for initial leader information from Zookeeper")
select {
case <-dr:
log.Print("[INFO] Done waiting for initial leader information from Zookeeper")
case <-time.After(2 * time.Minute):
log.Fatal("[ERROR] Timed out waiting for initial ZK detection")
}
}
func (m *Mesos) leaderDetect(zkURI string) (<-chan struct{}, error) {
log.Print("[INFO] Starting leader detector for ZK ", zkURI)
md, err := zoo.NewClusterDetector(zkURI)
if err != nil {
return nil, fmt.Errorf("failed to create master detector: %v", err)
}
var startedOnce sync.Once
started := make(chan struct{})
if err := md.Detect(zoo.OnClusterChanged(func(info *zoo.ClusterInfo) {
m.Lock.Lock()
defer m.Lock.Unlock()
m.Masters = new([]MesosHost)
// Handle list of masters
for _, ma := range *info.Masters {
mh := m.hostFromMasterInfo(ma)
*m.Masters = append(*m.Masters, mh)
}
// Handle leader
ma := m.hostFromMasterInfo(info.Leader)
if len(ma.host) > 0 {
ma.isLeader = true
}
*m.Masters = append(*m.Masters, ma)
startedOnce.Do(func() { close(started) })
})); err != nil {
return nil, fmt.Errorf("failed to initalize master detector: %v", err)
}
return started, nil
}
// Get the leader out of the list of masters
//
func (m *Mesos) getLeader() (string, string) {
m.Lock.Lock()
defer m.Lock.Unlock()
for _, ms := range *m.Masters {
if ms.isLeader {
return ms.host, ms.port
}
}
return "", ""
}
func (m *Mesos) getMasters() []MesosHost {
m.Lock.Lock()
defer m.Lock.Unlock()
ms := make([]MesosHost, len(*m.Masters))
for i, msp := range *m.Masters {
mh := MesosHost{
host: msp.host,
port: msp.port,
isLeader: msp.isLeader,
}
ms[i] = mh
}
return ms
}
func (m *Mesos) hostFromMasterInfo(mi *mesosproto.MasterInfo) MesosHost {
var ipstring = ""
var port = ""
if mi != nil {
if host := mi.GetHostname(); host != "" {
ip, err := net.LookupIP(host)
if err != nil {
ipstring = host
} else {
for _,i := range(ip) {
four := i.To4()
if four != nil {
ipstring = i.String()
break
}
}
// If control reaches here there are no IPv4 addresses
// returned by net.LookupIP. Use the hostname as ipstring
//
ipstring = host
}
} else {
octets := make([]byte, 4, 4)
binary.BigEndian.PutUint32(octets, mi.GetIp())
ipv4 := net.IP(octets)
ipstring = ipv4.String()
}
}
if len(ipstring) > 0 {
port = fmt.Sprint(mi.GetPort())
}
return MesosHost{
host: ipstring,
port: port,
isLeader: false,
}
}