/
resolver_resolver.go
123 lines (111 loc) · 3.21 KB
/
resolver_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this file,
// You can obtain one at https://github.com/joy12825/gf.
package resolver
import (
"context"
"errors"
"time"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"github.com/joy12825/gf/encoding/gjson"
"github.com/joy12825/gf/net/gsvc"
"github.com/joy12825/gf/os/glog"
)
// Resolver implements grpc resolver.Resolver.
// It searches services under watchKey through the discovery component, follows
// later changes through the watcher, and pushes the resulting address list to
// the gRPC client connection.
type Resolver struct {
discovery gsvc.Discovery // Service discovery, used by watch for the initial Search.
watcher gsvc.Watcher // Service watcher, polled by watch via Proceed and closed by Close.
watchKey string // Watched key, passed as the Search prefix.
cc resolver.ClientConn // GRPC client conn that receives the UpdateState calls.
ctx context.Context // Context passed to discovery and logging calls; watch returns once it is done.
cancel context.CancelFunc // Invoked by Close. NOTE(review): presumably cancels ctx; confirm where the Resolver is built.
logger *glog.Logger // Logger used for all resolver diagnostics.
}
// watch keeps the client connection's address list in sync with service
// discovery.
//
// It first performs a one-off discovery.Search using watchKey as the prefix
// and forwards any services found to update. It then loops on
// watcher.Proceed, forwarding every non-empty result to update, until r.ctx
// is done.
//
// A Search or Proceed error other than context.Canceled is logged as a
// warning. After a failed Proceed the loop pauses for one second before
// retrying; the pause also listens on r.ctx, so the loop returns as soon as
// the context is done instead of sleeping out the full second.
func (r *Resolver) watch() {
	// Initial lookup, so the resolver state is published right away.
	services, err := r.discovery.Search(r.ctx, gsvc.SearchInput{
		Prefix: r.watchKey,
	})
	if err != nil && !errors.Is(err, context.Canceled) {
		r.logger.Warningf(r.ctx, `discovery.Search error: %+v`, err)
	}
	if len(services) > 0 {
		r.update(services)
	}
	// Then watch.
	for {
		// Non-blocking check: stop before issuing another Proceed call.
		select {
		case <-r.ctx.Done():
			return
		default:
		}
		services, err = r.watcher.Proceed()
		if err != nil && !errors.Is(err, context.Canceled) {
			r.logger.Warningf(r.ctx, `watcher.Proceed error: %+v`, err)
			// Back off before retrying, but stay responsive to cancellation.
			timer := time.NewTimer(time.Second)
			select {
			case <-r.ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
			continue
		}
		// A context.Canceled error is not logged; the next iteration
		// re-checks r.ctx.
		if len(services) > 0 {
			r.update(services)
		}
	}
}
// update converts the given services into gRPC resolver addresses and pushes
// them to the client connection.
//
// One address is produced per endpoint of every service. Each address carries
// the endpoint string as Addr, the service name as ServerName, and attributes
// built from the service metadata with the service itself additionally stored
// under rawSvcKeyInSubConnInfo.
//
// When no address results, a notice is logged and the connection state is
// left untouched. A failing UpdateState call is logged, not returned.
func (r *Resolver) update(services []gsvc.Service) {
	var addrs []resolver.Address
	for _, svc := range services {
		for _, ep := range svc.GetEndpoints() {
			item := resolver.Address{
				Addr:       ep.String(),
				ServerName: svc.GetName(),
			}
			// Attach the raw service on top of the metadata-derived attributes.
			item.Attributes = newAttributesFromMetadata(svc.GetMetadata()).
				WithValue(rawSvcKeyInSubConnInfo, svc)
			addrs = append(addrs, item)
		}
	}
	if len(addrs) == 0 {
		r.logger.Noticef(r.ctx, "empty addresses parsed from: %+v", services)
		return
	}
	r.logger.Debugf(r.ctx, "client conn updated with addresses %s", gjson.MustEncodeString(addrs))
	updateErr := r.cc.UpdateState(resolver.State{Addresses: addrs})
	if updateErr != nil {
		r.logger.Errorf(r.ctx, "UpdateState failed: %+v", updateErr)
	}
}
// Close shuts the resolver down. It logs the event, closes the service
// watcher (a close failure is logged rather than returned), and finally
// invokes the resolver's cancel function.
func (r *Resolver) Close() {
	r.logger.Debugf(r.ctx, `resolver closed`)
	closeErr := r.watcher.Close()
	if closeErr != nil {
		r.logger.Errorf(r.ctx, `%+v`, closeErr)
	}
	r.cancel()
}
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It is only a hint, and a resolver may ignore it when re-resolving
// is not necessary. This implementation does nothing: the body is empty, so
// the hint is ignored.
//
// It could be called multiple times concurrently.
func (r *Resolver) ResolveNow(options resolver.ResolveNowOptions) {
}
// newAttributesFromMetadata builds a gRPC attribute set holding every
// key/value pair of metadata. It returns nil when metadata has no entries.
func newAttributesFromMetadata(metadata map[string]interface{}) *attributes.Attributes {
	var attrs *attributes.Attributes
	for key, value := range metadata {
		if attrs != nil {
			attrs = attrs.WithValue(key, value)
			continue
		}
		// First entry: start a fresh attribute set.
		attrs = attributes.New(key, value)
	}
	return attrs
}