From 403cec06ab0ab0cb0e6fa6f28ec18be3682e1839 Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 6 May 2024 23:13:00 +0800 Subject: [PATCH 1/2] fix: etcd scheme on grpc resolver, because of grpc resolver logic changed --- zrpc/resolver/internal/discovbuilder.go | 42 +++++++++++++++++++------ zrpc/resolver/internal/kubebuilder.go | 3 +- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/zrpc/resolver/internal/discovbuilder.go b/zrpc/resolver/internal/discovbuilder.go index cf432e23e5ba..89e09fdfbeeb 100644 --- a/zrpc/resolver/internal/discovbuilder.go +++ b/zrpc/resolver/internal/discovbuilder.go @@ -9,16 +9,44 @@ import ( "google.golang.org/grpc/resolver" ) -type discovBuilder struct{} +type discovBuilder struct { + cc resolver.ClientConn + update func() +} func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) ( resolver.Resolver, error) { + b.cc = cc + if err := b.updateState(target); err != nil { + return nil, err + } + + return &nopResolver{cc: cc}, nil +} + +func (b *discovBuilder) Scheme() string { + return DiscovScheme +} + +func (b *discovBuilder) updateState(target resolver.Target) error { + if b.update == nil { + if err := b.buildEndpointsUpdater(target); err != nil { + return err + } + } + + b.update() + + return nil +} + +func (b *discovBuilder) buildEndpointsUpdater(target resolver.Target) error { hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool { return r == EndpointSepChar }) sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target)) if err != nil { - return nil, err + return err } update := func() { @@ -28,18 +56,14 @@ func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ Addr: val, }) } - if err := cc.UpdateState(resolver.State{ + if err := b.cc.UpdateState(resolver.State{ Addresses: addrs, }); err != nil { logx.Error(err) } } sub.AddListener(update) - update() + b.update = update - return &nopResolver{cc: cc}, nil -} - -func (b *discovBuilder) Scheme() string { - return DiscovScheme + return nil } diff --git a/zrpc/resolver/internal/kubebuilder.go b/zrpc/resolver/internal/kubebuilder.go index 63900377d83c..5f8b75566572 100644 --- a/zrpc/resolver/internal/kubebuilder.go +++ b/zrpc/resolver/internal/kubebuilder.go @@ -26,7 +26,8 @@ type kubeResolver struct { stopCh chan struct{} } -func (r *kubeResolver) ResolveNow(_ resolver.ResolveNowOptions) {} +func (r *kubeResolver) ResolveNow(_ resolver.ResolveNowOptions) { +} func (r *kubeResolver) start() { threading.GoSafe(func() { From 4c13f14e7ba9a84c5e91a823f22b438ca09953cf Mon Sep 17 00:00:00 2001 From: kevin Date: Mon, 6 May 2024 23:31:33 +0800 Subject: [PATCH 2/2] chore: coding style --- zrpc/resolver/internal/discovbuilder.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/zrpc/resolver/internal/discovbuilder.go b/zrpc/resolver/internal/discovbuilder.go index 89e09fdfbeeb..3d6e346b5725 100644 --- a/zrpc/resolver/internal/discovbuilder.go +++ b/zrpc/resolver/internal/discovbuilder.go @@ -30,9 +30,12 @@ func (b *discovBuilder) Scheme() string { func (b *discovBuilder) updateState(target resolver.Target) error { if b.update == nil { - if err := b.buildEndpointsUpdater(target); err != nil { + update, err := b.buildEndpointsUpdater(target) + if err != nil { return err } + + b.update = update } b.update() @@ -40,13 +43,13 @@ func (b *discovBuilder) updateState(target resolver.Target) error { return nil } -func (b *discovBuilder) buildEndpointsUpdater(target resolver.Target) error { +func (b *discovBuilder) buildEndpointsUpdater(target resolver.Target) (func(), error) { hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool { return r == EndpointSepChar }) sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target)) if err != nil { - return err + return nil, err } update := func() { @@ -63,7 +66,6 @@ func (b *discovBuilder) buildEndpointsUpdater(target resolver.Target) error { } } sub.AddListener(update) - b.update = update - return nil + return update, nil }