-
Notifications
You must be signed in to change notification settings - Fork 0
/
mdns.go
108 lines (84 loc) · 2.48 KB
/
mdns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Package mdns is a multicast dns registry
package mdns
import (
"context"
"github.com/grandcat/zeroconf"
"github.com/pubgo/funk/assert"
"github.com/pubgo/funk/errors"
"github.com/pubgo/funk/log"
"github.com/pubgo/funk/merge"
"github.com/pubgo/funk/recovery"
"github.com/pubgo/funk/typex"
"github.com/pubgo/lava/core/registry"
"github.com/pubgo/lava/core/service"
)
const (
zeroconfService = "_lava._tcp"
zeroconfDomain = "local."
zeroconfInstance = "lava"
)
func New(conf *registry.Config, log log.Logger) registry.Registry {
if conf.Driver != Name {
return nil
}
var cfg Cfg
merge.MapStruct(&cfg, conf.DriverCfg).Unwrap()
resolver, err := zeroconf.NewResolver()
assert.MustF(err, "Failed to initialize zeroconf resolver")
return &mdnsRegistry{resolver: resolver, cfg: cfg, log: log.WithName(registry.Name).WithName(Name)}
}
type serverNode struct {
srv *zeroconf.Server
name string
id string
}
var _ registry.Registry = (*mdnsRegistry)(nil)
type mdnsRegistry struct {
cfg Cfg
services typex.SyncMap
resolver *zeroconf.Resolver
log log.Logger
}
func (m *mdnsRegistry) Close() {
}
func (m *mdnsRegistry) Init() {
}
func (m *mdnsRegistry) Register(ctx context.Context, service *service.Service, optList ...registry.RegOpt) (gErr error) {
defer recovery.Recovery(func(err error) {
gErr = errors.WrapKV(err, "service", service)
})
assert.If(service == nil, "[service] should not be nil")
assert.If(len(service.Nodes) == 0, "[service] nodes should not be zero")
node := service.Nodes[0]
// 已经存在
if m.services.Has(node.Id) {
return
}
server, err := zeroconf.Register(node.Id, service.Name, zeroconfDomain, node.GetPort(), []string{node.Id}, nil)
assert.MustF(err, "[mdns] service %s register error", service.Name)
var opts registry.RegOpts
for i := range optList {
optList[i](&opts)
}
m.services.Set(node.Id, &serverNode{
srv: server,
id: node.Id,
name: service.Name,
})
return
}
func (m *mdnsRegistry) Deregister(ctx context.Context, service *service.Service, opt ...registry.DeregOpt) (gErr error) {
defer recovery.Recovery(func(err error) {
gErr = errors.WrapKV(err, "service", service)
})
assert.If(service == nil, "[service] should not be nil")
assert.If(len(service.Nodes) == 0, "[service] nodes should not be zero")
node := service.Nodes[0]
val, ok := m.services.LoadAndDelete(node.Id)
if !ok || val == nil {
return
}
val.(*serverNode).srv.Shutdown()
return
}
func (m *mdnsRegistry) String() string { return Name }