From e0384a79a9db7c2201e40b0434c4847d650222b3 Mon Sep 17 00:00:00 2001 From: whalecold Date: Thu, 21 Mar 2024 13:00:55 +0800 Subject: [PATCH] feat --- core/manager/client.go | 2 +- core/xdsresource/matcher.go | 13 ++++++++++++ core/xdsresource/rds.go | 15 ++++++++++++++ xdssuite/limiter.go | 2 +- xdssuite/retry.go | 41 +++++++++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 xdssuite/retry.go diff --git a/core/manager/client.go b/core/manager/client.go index 22336cc..c338e34 100644 --- a/core/manager/client.go +++ b/core/manager/client.go @@ -455,7 +455,7 @@ func (c *xdsClient) getListenerName(rName string) (string, error) { if len(cip) > 0 { return cip + "_" + port, nil } - return "", fmt.Errorf("failed to convert listener name for %s", rName) + return "", fmt.Errorf("failed to convert listener name for %s addr %s", rName, addr) } // handleLDS handles the lds response diff --git a/core/xdsresource/matcher.go b/core/xdsresource/matcher.go index cbd56ef..99e5a94 100644 --- a/core/xdsresource/matcher.go +++ b/core/xdsresource/matcher.go @@ -31,12 +31,20 @@ func (pm PrefixMatcher) Match(other string) bool { return strings.HasPrefix(other, string(pm)) } +func (pm PrefixMatcher) GetExact() string { + return "" +} + type ExactMatcher string func (em ExactMatcher) Match(other string) bool { return string(em) == other } +func (em ExactMatcher) GetExact() string { + return string(em) +} + type RegexMatcher struct { re *regexp.Regexp } @@ -45,8 +53,13 @@ func (rm *RegexMatcher) Match(other string) bool { return rm.re.MatchString(other) } +func (rm *RegexMatcher) GetExact() string { + return "" +} + type Matcher interface { Match(string) bool + GetExact() string } type Matchers map[string]Matcher diff --git a/core/xdsresource/rds.go b/core/xdsresource/rds.go index 376b5ac..063ae0d 100644 --- a/core/xdsresource/rds.go +++ b/core/xdsresource/rds.go @@ -54,10 +54,18 @@ type WeightedCluster struct { Weight uint32 } +type RetryPolicy struct { + RetryOn string + NumRetries int + PerTryTimeout int64 +} + type Route struct { Match RouteMatch WeightedClusters []*WeightedCluster Timeout time.Duration + RetryPolicy RetryPolicy + Domains map[string]struct{} } type RouteMatch interface { @@ -160,6 +168,13 @@ func unmarshalRoutes(rs []*v3routepb.Route) ([]*Route, error) { route.WeightedClusters = clusters } route.Timeout = a.Route.GetTimeout().AsDuration() + if retryPolicy := a.Route.GetRetryPolicy(); retryPolicy != nil { + route.RetryPolicy = RetryPolicy{ + RetryOn: retryPolicy.GetRetryOn(), + NumRetries: int(retryPolicy.GetNumRetries().GetValue()), + PerTryTimeout: retryPolicy.GetPerTryTimeout().AsDuration().Milliseconds(), + } + } } routes[i] = route } diff --git a/xdssuite/limiter.go b/xdssuite/limiter.go index 04b36a9..4922884 100644 --- a/xdssuite/limiter.go +++ b/xdssuite/limiter.go @@ -1,5 +1,5 @@ /* - * Copyright 2022 CloudWeGo Authors + * Copyright 2024 CloudWeGo Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/xdssuite/retry.go b/xdssuite/retry.go new file mode 100644 index 0000000..aff905d --- /dev/null +++ b/xdssuite/retry.go @@ -0,0 +1,41 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdssuite + +import ( + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/retry" + "github.com/kitex-contrib/xds/core/xdsresource" +) + +func updateRetryPolicy(rc *retry.Container, res map[string]xdsresource.Resource) { + +} + +// NewRetryPolicy integrate xds config and kitex circuitbreaker +func NewRetryPolicy() client.Option { + m := xdsResourceManager.getManager() + if m == nil { + panic("xds resource manager has not been initialized") + } + retryContainer := retry.NewRetryContainerWithPercentageLimit() + + m.RegisterXDSUpdateHandler(xdsresource.RouteConfigType, func(res map[string]xdsresource.Resource) { + updateRetryPolicy(retryContainer, res) + }) + return client.WithRetryContainer(retryContainer) +}