Skip to content

Commit

Permalink
[ISSUE #1131] 针对DataPlane -> ControlPlane 方向网络故障引起实例大面积不健康问题支持零实例保护 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed May 21, 2023
1 parent 83c6dfd commit febe7c0
Show file tree
Hide file tree
Showing 28 changed files with 1,089 additions and 1,031 deletions.
7 changes: 3 additions & 4 deletions apiserver/xdsserverv3/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ func (x *XDSServer) makeGatewayRoutes(namespace string, xdsNode *XDSClient) []*r
return
}

routeMatch := &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
}

for i := range rule.RuleRouting.Rules {
subRule := rule.RuleRouting.Rules[i]
// 先判断 dest 的服务是否满足目标 namespace
Expand All @@ -160,6 +156,9 @@ func (x *XDSServer) makeGatewayRoutes(namespace string, xdsNode *XDSClient) []*r
continue
}

routeMatch := &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
}
for _, source := range subRule.Sources {
if !isMatchGatewaySource(source, callerService, callerNamespace) {
continue
Expand Down
23 changes: 8 additions & 15 deletions apiserver/xdsserverv3/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bytes"
_ "embed"
"encoding/json"
"fmt"
"os"
"reflect"
"testing"
Expand Down Expand Up @@ -207,7 +206,10 @@ func Test_makeLocalRateLimit(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := mockXds.makeLocalRateLimit(tt.args.svc); !reflect.DeepEqual(got, tt.want) {
if got := mockXds.makeLocalRateLimit(model.ServiceKey{
Namespace: tt.args.svc.Namespace,
Name: tt.args.svc.Name,
}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("makeLocalRateLimit() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -454,28 +456,19 @@ func TestSnapshot(t *testing.T) {

snapshot, _ := x.cache.GetSnapshot("default")
dumpYaml := dumpSnapShot(snapshot)
if !bytes.Equal(noInboundDump, dumpYaml) {
t.Fatal("\n" + string(dumpYaml))
}
assert.Equal(t, noInboundDump, dumpYaml)

snapshot, _ = x.cache.GetSnapshot("default/permissive")
dumpYaml = dumpSnapShot(snapshot)
if !bytes.Equal(permissiveDump, dumpYaml) {
t.Fatal("\n" + string(dumpYaml))
}
assert.Equal(t, permissiveDump, dumpYaml)

snapshot, _ = x.cache.GetSnapshot("default/strict")
dumpYaml = dumpSnapShot(snapshot)
if !bytes.Equal(strictDump, dumpYaml) {
t.Fatal("\n" + string(dumpYaml))
}
assert.Equal(t, strictDump, dumpYaml)

snapshot, _ = x.cache.GetSnapshot("gateway~default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1")
dumpYaml = dumpSnapShot(snapshot)
if !bytes.Equal(gatewayDump, dumpYaml) {
fmt.Printf("\n%s\n", string(dumpYaml))
t.Fatal("\n" + string(dumpYaml))
}
assert.Equalf(t, string(gatewayDump), string(dumpYaml), "expect : %s, actual : %s", string(gatewayDump), string(dumpYaml))
}

// ParseArrayByText 通过字符串解析PB数组对象
Expand Down
12 changes: 6 additions & 6 deletions apiserver/xdsserverv3/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,12 @@ func buildSidecarRouteMatch(routeMatch *route.RouteMatch, source *traffic_manage
buildCommonRouteMatch(routeMatch, source)
}

func (x *XDSServer) makeLocalRateLimit(si *ServiceInfo) map[string]*anypb.Any {
func (x *XDSServer) makeLocalRateLimit(svcKey model.ServiceKey) map[string]*anypb.Any {
ratelimitGetter := x.RatelimitConfigGetter
if ratelimitGetter == nil {
ratelimitGetter = x.namingServer.Cache().RateLimit().GetRateLimitRules
}
conf, _ := ratelimitGetter(model.ServiceKey{
Namespace: si.Namespace,
Name: si.Name,
})
conf, _ := ratelimitGetter(svcKey)
filters := make(map[string]*anypb.Any)
if conf != nil {
rateLimitConf := &lrl.LocalRateLimit{
Expand Down Expand Up @@ -252,7 +249,10 @@ func (x *XDSServer) makeSidecarVirtualHosts(services []*ServiceInfo) []types.Res
Name: serviceInfo.Name,
Domains: generateServiceDomains(serviceInfo),
Routes: makeSidecarRoutes(serviceInfo),
// TypedPerFilterConfig: x.makeLocalRateLimit(serviceInfo),
// TypedPerFilterConfig: x.makeLocalRateLimit(model.ServiceKey{
// Namespace: serviceInfo.Namespace,
// Name: serviceInfo.Name,
// }),
}
hosts = append(hosts, vHost)
}
Expand Down
2 changes: 1 addition & 1 deletion cache/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newCacheManager(ctx context.Context, cacheOpt *Config, storage store.Store)

// call cache.addlistener here, need ensure that all of cache impl has been instantiated and loaded
mgr.AddListener(CacheNameInstance, []Listener{
&WatchInstanceReload{
&watchInstanceReload{
Handler: func(val interface{}) {
if svcIds, ok := val.(map[string]bool); ok {
mgr.caches[CacheService].(*serviceCache).notifyServiceCountReload(svcIds)
Expand Down
15 changes: 7 additions & 8 deletions cache/routing_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"golang.org/x/sync/singleflight"

"github.com/polarismesh/polaris/common/model"
routingcommon "github.com/polarismesh/polaris/common/routing"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/store"
)
Expand Down Expand Up @@ -310,7 +309,7 @@ func (rc *routingConfigCache) convertV1toV2(rule *model.RoutingConfig) (bool, []
return false, nil, fmt.Errorf("svc: %+v is alias", svc)
}

in, out, err := routingcommon.ConvertRoutingV1ToExtendV2(svc.Name, svc.Namespace, rule)
in, out, err := model.ConvertRoutingV1ToExtendV2(svc.Name, svc.Namespace, rule)
if err != nil {
return false, nil, err
}
Expand All @@ -328,22 +327,22 @@ func (rc *routingConfigCache) convertV2toV1(entries map[routingLevel][]*model.Ex
service, namespace string) ([]*apitraffic.Route, []*apitraffic.Route, []string) {
level1 := entries[level1RoutingV2]
sort.Slice(level1, func(i, j int) bool {
return routingcommon.CompareRoutingV2(level1[i], level1[j])
return model.CompareRoutingV2(level1[i], level1[j])
})

level2 := entries[level2RoutingV2]
sort.Slice(level2, func(i, j int) bool {
return routingcommon.CompareRoutingV2(level2[i], level2[j])
return model.CompareRoutingV2(level2[i], level2[j])
})

level3 := entries[level3RoutingV2]
sort.Slice(level3, func(i, j int) bool {
return routingcommon.CompareRoutingV2(level3[i], level3[j])
return model.CompareRoutingV2(level3[i], level3[j])
})

level1inRoutes, level1outRoutes, level1Revisions := routingcommon.BuildV1RoutesFromV2(service, namespace, level1)
level2inRoutes, level2outRoutes, level2Revisions := routingcommon.BuildV1RoutesFromV2(service, namespace, level2)
level3inRoutes, level3outRoutes, level3Revisions := routingcommon.BuildV1RoutesFromV2(service, namespace, level3)
level1inRoutes, level1outRoutes, level1Revisions := model.BuildV1RoutesFromV2(service, namespace, level1)
level2inRoutes, level2outRoutes, level2Revisions := model.BuildV1RoutesFromV2(service, namespace, level2)
level3inRoutes, level3outRoutes, level3Revisions := model.BuildV1RoutesFromV2(service, namespace, level3)

revisions := make([]string, 0, len(level1Revisions)+len(level2Revisions)+len(level3Revisions))
revisions = append(revisions, level1Revisions...)
Expand Down
13 changes: 6 additions & 7 deletions cache/routing_config_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"

"github.com/polarismesh/polaris/common/model"
routingcommon "github.com/polarismesh/polaris/common/routing"
)

type (
Expand Down Expand Up @@ -154,7 +153,7 @@ func (b *routeRuleBucket) saveV2(conf *model.ExtendRouterConfig) {
b.rules[conf.ID] = conf
handler := func(bt boundType, item serviceInfo) {
// level1 级别 cache 处理
if item.GetService() != routingcommon.MatchAll && item.GetNamespace() != routingcommon.MatchAll {
if item.GetService() != model.MatchAll && item.GetNamespace() != model.MatchAll {
key := buildServiceKey(item.GetNamespace(), item.GetService())
if _, ok := b.level1Rules[key]; !ok {
b.level1Rules[key] = map[string]struct{}{}
Expand All @@ -164,15 +163,15 @@ func (b *routeRuleBucket) saveV2(conf *model.ExtendRouterConfig) {
return
}
// level2 级别 cache 处理
if item.GetService() == routingcommon.MatchAll && item.GetNamespace() != routingcommon.MatchAll {
if item.GetService() == model.MatchAll && item.GetNamespace() != model.MatchAll {
if _, ok := b.level2Rules[bt][item.GetNamespace()]; !ok {
b.level2Rules[bt][item.GetNamespace()] = map[string]struct{}{}
}
b.level2Rules[bt][item.GetNamespace()][conf.ID] = struct{}{}
return
}
// level3 级别 cache 处理
if item.GetService() == routingcommon.MatchAll && item.GetNamespace() == routingcommon.MatchAll {
if item.GetService() == model.MatchAll && item.GetNamespace() == model.MatchAll {
b.level3Rules[bt][conf.ID] = struct{}{}
return
}
Expand Down Expand Up @@ -238,17 +237,17 @@ func (b *routeRuleBucket) deleteV2(id string) {
service := source.GetService()
namespace := source.GetNamespace()

if service == routingcommon.MatchAll && namespace == routingcommon.MatchAll {
if service == model.MatchAll && namespace == model.MatchAll {
delete(b.level3Rules[outBound], id)
delete(b.level3Rules[inBound], id)
}

if service == routingcommon.MatchAll && namespace != routingcommon.MatchAll {
if service == model.MatchAll && namespace != model.MatchAll {
delete(b.level2Rules[outBound][namespace], id)
delete(b.level2Rules[inBound][namespace], id)
}

if service != routingcommon.MatchAll && namespace != routingcommon.MatchAll {
if service != model.MatchAll && namespace != model.MatchAll {
key := buildServiceKey(namespace, service)
delete(b.level1Rules[key], id)
}
Expand Down
Loading

0 comments on commit febe7c0

Please sign in to comment.