forked from hashicorp/serf
/
mdns.go
133 lines (116 loc) · 2.85 KB
/
mdns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package agent
import (
"fmt"
"github.com/armon/mdns"
"io"
"log"
"net"
"time"
)
const (
	// mdnsPollInterval is the delay between successive mDNS queries
	// issued by the background run loop.
	mdnsPollInterval time.Duration = time.Minute

	// mdnsQuietInterval is how long the run loop waits after the most
	// recently discovered host before attempting to join the queued hosts.
	mdnsQuietInterval time.Duration = 100 * time.Millisecond
)
// AgentMDNS is used to advertise ourself using mDNS and to
// attempt to join peers periodically using mDNS queries.
//
// The seen map is read and written only from the run goroutine started
// by NewAgentMDNS.
type AgentMDNS struct {
// agent is the Agent whose Join method is called with discovered hosts.
agent *Agent
// discover is the cluster name; it is embedded in the mDNS service name
// that is both advertised and queried (see mdnsName).
discover string
// logger receives join and poll messages.
logger *log.Logger
// seen is the set of "ip:port" addresses that have already been handed
// to a join attempt; such hosts are skipped on later discoveries.
seen map[string]struct{}
// server is the mDNS server created in NewAgentMDNS that advertises
// this node.
server *mdns.Server
// replay is passed through unchanged as the second argument of agent.Join.
// NOTE(review): presumably controls replaying of past events on join — confirm
// against Agent.Join.
replay bool
// iface is the network interface handed to the mDNS server config and to
// each query.
iface *net.Interface
}
// NewAgentMDNS builds an AgentMDNS for the given agent.
//
// It registers an mDNS service record for node (instance name) under the
// service name derived from discover, bound to bind:port, starts an mDNS
// server for it on iface, and launches the background goroutine that
// periodically queries for peers and joins them. Log lines are written to
// logOutput. replay is forwarded to agent.Join for every join attempt.
//
// An error is returned if the service record cannot be initialized or the
// mDNS server cannot be created; in that case no goroutine is started.
func NewAgentMDNS(agent *Agent, logOutput io.Writer, replay bool,
	node, discover string, iface *net.Interface, bind net.IP, port int) (*AgentMDNS, error) {
	// Describe this node as an mDNS service record.
	zone := &mdns.MDNSService{
		Instance: node,
		Service:  mdnsName(discover),
		Addr:     bind,
		Port:     port,
		Info:     fmt.Sprintf("Serf '%s' cluster", discover),
	}
	err := zone.Init()
	if err != nil {
		return nil, err
	}

	// Bring up the mDNS server that answers for the record above.
	srv, err := mdns.NewServer(&mdns.Config{
		Zone:  zone,
		Iface: iface,
	})
	if err != nil {
		return nil, err
	}

	// Assemble the discovery state.
	discoverer := &AgentMDNS{
		agent:    agent,
		discover: discover,
		logger:   log.New(logOutput, "", log.LstdFlags),
		seen:     make(map[string]struct{}),
		server:   srv,
		replay:   replay,
		iface:    iface,
	}

	// Kick off the periodic discovery loop.
	go discoverer.run()
	return discoverer, nil
}
// run is the long-lived discovery loop. It never returns.
//
// It multiplexes three events:
//   - a discovered service entry: its address is queued for joining unless
//     it was already handled, and the quiet timer is (re)armed;
//   - the quiet timer firing: all queued addresses are passed to agent.Join
//     in one call and then recorded as seen, whether or not the join
//     succeeded;
//   - the poll timer firing: the next poll is scheduled and a query is
//     started in its own goroutine.
func (m *AgentMDNS) run() {
	var (
		// pending holds addresses discovered since the last join attempt.
		pending []string
		// debounce is nil until a host is queued; receiving from a nil
		// channel blocks, so that select case stays idle until then.
		debounce <-chan time.Time
	)
	entries := make(chan *mdns.ServiceEntry, 32)
	// Fire the first poll immediately.
	nextPoll := time.After(0)

	for {
		select {
		case entry := <-entries:
			// Key hosts by their "ip:port" string form.
			key := (&net.TCPAddr{IP: entry.Addr, Port: entry.Port}).String()

			// Only queue hosts that have not been handled before; every
			// newly queued host restarts the quiet period.
			if _, known := m.seen[key]; !known {
				pending = append(pending, key)
				debounce = time.After(mdnsQuietInterval)
			}

		case <-debounce:
			// The quiet period elapsed: join everything queued so far.
			joined, err := m.agent.Join(pending, m.replay)
			if err != nil {
				m.logger.Printf("[ERR] agent.mdns: Failed to join: %v", err)
			}
			if joined > 0 {
				m.logger.Printf("[INFO] agent.mdns: Joined %d hosts", joined)
			}

			// Record every attempted host so it is not queued again.
			for _, host := range pending {
				m.seen[host] = struct{}{}
			}
			pending = nil

		case <-nextPoll:
			// Schedule the following poll first, then query in the
			// background so this loop keeps draining entries.
			nextPoll = time.After(mdnsPollInterval)
			go m.poll(entries)
		}
	}
}
// poll is invoked periodically to check for new hosts
func (m *AgentMDNS) poll(hosts chan *mdns.ServiceEntry) {
params := mdns.QueryParam{
Service: mdnsName(m.discover),
Interface: m.iface,
Entries: hosts,
}
if err := mdns.Query(¶ms); err != nil {
m.logger.Printf("[ERR] agent.mdns: Failed to poll for new hosts: %v", err)
}
}
// mdnsName builds the mDNS service name for a cluster: the discover string
// wrapped as "_serf_<discover>._tcp". The same name is used both when
// registering this node and when querying for peers.
func mdnsName(discover string) string {
	const serviceFormat = "_serf_%s._tcp"
	name := fmt.Sprintf(serviceFormat, discover)
	return name
}