From 11423699912319b7fd12006a1b1e2301c1ee3df1 Mon Sep 17 00:00:00 2001 From: Mulavar <978007503@qq.com> Date: Mon, 25 Oct 2021 15:23:35 +0800 Subject: [PATCH] add dynamic route config (#1519) * remove struct DubboRouterRule which is useless * remove some error which is useless * rename DubboServiceRouterItem to DubboRouteDetail * 1. add zk mesh route dynamic configuration listener 2. add base64Enable flag to zookeeperDynamicConfiguration * rename Router to Routers * Revert "rename Router to Routers" This reverts commit dd0867d6 Co-authored-by: dongjianhui03 --- cluster/directory/base/directory_test.go | 2 +- cluster/directory/static/directory.go | 3 +- cluster/router/chain/chain.go | 21 +- cluster/router/router.go | 13 +- cluster/router/v3router/dubbo_rule.go | 61 ----- cluster/router/v3router/factory.go | 9 +- cluster/router/v3router/factory_test.go | 35 --- .../v3router/k8s_api/listener_handler_impl.go | 2 +- cluster/router/v3router/k8s_crd/client.go | 2 +- cluster/router/v3router/router_chain.go | 214 +++++++++--------- cluster/router/v3router/router_chain_test.go | 65 +++--- cluster/router/v3router/uniform_route.go | 26 ++- cluster/router/v3router/uniform_rule.go | 27 +-- common/constant/key.go | 4 +- common/extension/router_factory.go | 7 +- config/config_center_config.go | 31 ++- config/uniform_router_config.go | 21 +- config_center/mock_dynamic_config.go | 4 +- config_center/zookeeper/impl.go | 28 ++- config_center/zookeeper/listener.go | 13 +- imports/imports.go | 1 + registry/directory/directory.go | 6 +- registry/protocol/protocol.go | 4 +- remoting/zookeeper/listener.go | 9 +- 24 files changed, 266 insertions(+), 342 deletions(-) delete mode 100644 cluster/router/v3router/dubbo_rule.go delete mode 100644 cluster/router/v3router/factory_test.go diff --git a/cluster/directory/base/directory_test.go b/cluster/directory/base/directory_test.go index 79cca2cbcd..3e8c5c946e 100644 --- a/cluster/directory/base/directory_test.go +++ b/cluster/directory/base/directory_test.go @@ -50,7 +50,7 @@ func TestBuildRouterChain(t *testing.T) { regURL.AddParam(constant.INTERFACE_KEY, "mock-app") directory := NewDirectory(regURL) var err error - directory.routerChain, err = chain.NewRouterChain(regURL) + directory.routerChain, err = chain.NewRouterChain() assert.Error(t, err) } diff --git a/cluster/directory/static/directory.go b/cluster/directory/static/directory.go index 7e877e42ad..0e9ecc3852 100644 --- a/cluster/directory/static/directory.go +++ b/cluster/directory/static/directory.go @@ -91,8 +91,7 @@ func (dir *directory) BuildRouterChain(invokers []protocol.Invoker) error { if len(invokers) == 0 { return perrors.Errorf("invokers == null") } - url := invokers[0].GetURL() - routerChain, e := chain.NewRouterChain(url) + routerChain, e := chain.NewRouterChain() if e != nil { return e } diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index 250e183499..e366a2a811 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -36,11 +36,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) -var ( - virtualServiceConfigByte []byte - destinationRuleConfigByte []byte -) - // RouterChain Router chain type RouterChain struct { // Full list of addresses from registry, classified by method name. @@ -106,14 +101,9 @@ func (c *RouterChain) copyInvokers() []protocol.Invoker { return ret } -func SetVSAndDRConfigByte(vs, dr []byte) { - virtualServiceConfigByte = vs - destinationRuleConfigByte = dr -} - -// NewRouterChain Use url to init router chain +// NewRouterChain init router chain // Loop routerFactories and call NewRouter method -func NewRouterChain(url *common.URL) (*RouterChain, error) { +func NewRouterChain() (*RouterChain, error) { routerFactories := extension.GetRouterFactories() if len(routerFactories) == 0 { return nil, perrors.Errorf("No routerFactory exits , create one please") @@ -122,12 +112,9 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) { routers := make([]router.PriorityRouter, 0, len(routerFactories)) for key, routerFactory := range routerFactories { - if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil { - logger.Warnf("virtual Service ProtocolConfig or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n") - } - r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte) + r, err := routerFactory().NewPriorityRouter() if r == nil || err != nil { - logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err) + logger.Errorf("router chain build router fail! routerFactories key:%s error:%v", key, err) continue } routers = append(routers, r) diff --git a/cluster/router/router.go b/cluster/router/router.go index d3d2fe53d5..301510b294 100644 --- a/cluster/router/router.go +++ b/cluster/router/router.go @@ -26,25 +26,20 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) -// Extension - Router // PriorityRouterFactory creates creates priority router with url type PriorityRouterFactory interface { // NewPriorityRouter creates router instance with URL - NewPriorityRouter([]byte, []byte) (PriorityRouter, error) + NewPriorityRouter() (PriorityRouter, error) } -// Router -type router interface { +// PriorityRouter routes with priority +type PriorityRouter interface { // Route Determine the target invokers list. Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker // URL Return URL in router URL() *common.URL -} -// Router -type PriorityRouter interface { - router // Priority Return Priority in router // 0 to ^int(0) is better Priority() int64 @@ -66,7 +61,7 @@ type Poolable interface { // AddrPool is an address pool, backed by a snapshot of address list, divided into categories. type AddrPool map[string]*roaring.Bitmap -// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable. +// AddrMetadata is address metadata, collected from a snapshot of address list by a router, if it implements Poolable. type AddrMetadata interface { // Source indicates where the metadata comes from. Source() string diff --git a/cluster/router/v3router/dubbo_rule.go b/cluster/router/v3router/dubbo_rule.go deleted file mode 100644 index c809354b19..0000000000 --- a/cluster/router/v3router/dubbo_rule.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package v3router - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/config" - "dubbo.apache.org/dubbo-go/v3/protocol" -) - -// nolint -type DubboRouterRule struct { - uniformRules []*UniformRule -} - -func newDubboRouterRule(dubboRoutes []*config.DubboRoute, - destinationMap map[string]map[string]string) (*DubboRouterRule, error) { - - uniformRules := make([]*UniformRule, 0) - for _, v := range dubboRoutes { - uniformRule, err := newUniformRule(v, destinationMap) - if err != nil { - return nil, err - } - uniformRules = append(uniformRules, uniformRule) - } - - return &DubboRouterRule{ - uniformRules: uniformRules, - }, nil -} - -func (drr *DubboRouterRule) route(invokers []protocol.Invoker, url *common.URL, - invocation protocol.Invocation) []protocol.Invoker { - - resultInvokers := make([]protocol.Invoker, 0) - for _, v := range drr.uniformRules { - if resultInvokers = v.route(invokers, url, invocation); len(resultInvokers) == 0 { - continue - } - // once there is a uniformRule successfully get target invoker lists, return it - return resultInvokers - } - // return s empty invoker list - return resultInvokers -} diff --git a/cluster/router/v3router/factory.go b/cluster/router/v3router/factory.go index afd14af66d..e7512479d6 100644 --- a/cluster/router/v3router/factory.go +++ b/cluster/router/v3router/factory.go @@ -19,8 +19,13 @@ package v3router import ( "dubbo.apache.org/dubbo-go/v3/cluster/router" + "dubbo.apache.org/dubbo-go/v3/common/extension" ) +func init() { + extension.SetRouterFactory("mesh", NewUniformRouterFactory) +} + // UniformRouteFactory is uniform router's factory type UniformRouteFactory struct{} @@ -30,6 +35,6 @@ func NewUniformRouterFactory() router.PriorityRouterFactory { } // NewPriorityRouter construct a new UniformRouteFactory as PriorityRouter -func (f *UniformRouteFactory) NewPriorityRouter(vsConfigBytes, distConfigBytes []byte) (router.PriorityRouter, error) { - return NewUniformRouterChain(vsConfigBytes, distConfigBytes) +func (f *UniformRouteFactory) NewPriorityRouter() (router.PriorityRouter, error) { + return NewUniformRouterChain() } diff --git a/cluster/router/v3router/factory_test.go b/cluster/router/v3router/factory_test.go deleted file mode 100644 index a5ea41579c..0000000000 --- a/cluster/router/v3router/factory_test.go +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package v3router - -import ( - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -// TestUniformRouterFacotry created a new factory that can new uniform router -func TestUniformRouterFacotry(t *testing.T) { - factory := NewUniformRouterFactory() - assert.NotNil(t, factory) - router, err := factory.NewPriorityRouter([]byte{}, []byte{}) - assert.Nil(t, err) - assert.NotNil(t, router) -} diff --git a/cluster/router/v3router/k8s_api/listener_handler_impl.go b/cluster/router/v3router/k8s_api/listener_handler_impl.go index 38f5748bb9..70dc8be2e4 100644 --- a/cluster/router/v3router/k8s_api/listener_handler_impl.go +++ b/cluster/router/v3router/k8s_api/listener_handler_impl.go @@ -36,7 +36,7 @@ import ( const ( VirtualServiceEventKey = "virtualServiceEventKey" - DestinationRuleEventKey = "destinationRuleEventKe3y" + DestinationRuleEventKey = "destinationRuleEventKey" VirtualServiceResource = "virtualservices" DestRuleResource = "destinationrules" diff --git a/cluster/router/v3router/k8s_crd/client.go b/cluster/router/v3router/k8s_crd/client.go index 2dfe201961..0da410cbb4 100644 --- a/cluster/router/v3router/k8s_crd/client.go +++ b/cluster/router/v3router/k8s_crd/client.go @@ -105,7 +105,7 @@ func NewK8sCRDClient(groupName, groupVersion, namespace string, handlers ...List return newClient, nil } -// func (c *Client) WatchResources() []cache.Store { can only be called once +// WatchResources can only be called once func (c *Client) WatchResources() []cache.Store { stores := make([]cache.Store, 0) c.once.Do( diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go index ce8269493d..973f98e5a6 100644 --- a/cluster/router/v3router/router_chain.go +++ b/cluster/router/v3router/router_chain.go @@ -18,7 +18,6 @@ package v3router import ( - "encoding/json" "io" "strings" ) @@ -29,43 +28,47 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/cluster/router" - "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router/k8s_api" "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/protocol" - "dubbo.apache.org/dubbo-go/v3/remoting" ) // RouterChain contains all uniform router logic // it has UniformRouter list, type RouterChain struct { - routers []*UniformRouter - virtualServiceConfigBytes []byte - destinationRuleConfigBytes []byte - notify chan struct{} + routers []*UniformRouter + notify chan struct{} } -// NewUniformRouterChain return -func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte) (router.PriorityRouter, error) { - fromFileConfig := true - uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig) +// nolint +func NewUniformRouterChain() (router.PriorityRouter, error) { + // 1. add mesh route listener + r := &RouterChain{} + rootConfig := config.GetRootConfig() + dynamicConfiguration, err := rootConfig.ConfigCenter.GetDynamicConfiguration() if err != nil { - fromFileConfig = false - logger.Warnf("parse router config form local file failed, error = %+v", err) + return nil, err } - r := &RouterChain{ - virtualServiceConfigBytes: virtualServiceConfig, - destinationRuleConfigBytes: destinationRuleConfig, - routers: uniformRouters, + dynamicConfiguration.AddListener(rootConfig.Application.Name, r) + + // 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo" + key := rootConfig.Application.Name + constant.MeshRouteSuffix + meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group)) + if err != nil { + // the mesh route may not be initialized now + logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err) + return r, nil } - if err := k8s_api.SetK8sEventListener(r); err != nil { - logger.Warnf("try listen K8s router config failed, error = %+v", err) - if !fromFileConfig { - panic("No config file from both local file and k8s") - } + logger.Debugf("Successfully get mesh route:%s", meshRouteValue) + routes, err := parseRoute(meshRouteValue) + if err != nil { + logger.Warnf("Parse mesh route failed, error=%v", err) + return nil, err } + r.routers = routes return r, nil } @@ -77,84 +80,15 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca return invokers } +// Process process route config change event func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) { - logger.Debugf("on processed event = %+v\n", *event) - if event.ConfigType == remoting.EventTypeAdd || event.ConfigType == remoting.EventTypeUpdate { - switch event.Key { - case k8s_api.VirtualServiceEventKey: - logger.Debug("virtul service event") - newVSValue, ok := event.Value.(*config.VirtualServiceConfig) - if !ok { - logger.Error("event.Value assertion error") - return - } - - newVSJsonValue, ok := newVSValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] - if !ok { - logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration") - return - } - logger.Debugf("new virtual service json value = \n%v\n", newVSJsonValue) - newVirtualServiceConfig := &config.VirtualServiceConfig{} - if err := json.Unmarshal([]byte(newVSJsonValue), newVirtualServiceConfig); err != nil { - logger.Error("on process json data unmarshal error = ", err) - return - } - newVirtualServiceConfig.YamlAPIVersion = newVirtualServiceConfig.APIVersion - newVirtualServiceConfig.YamlKind = newVirtualServiceConfig.Kind - newVirtualServiceConfig.MetaData.Name = newVirtualServiceConfig.ObjectMeta.Name - logger.Debugf("get event after asseration = %+v\n", newVirtualServiceConfig) - data, err := yaml.Marshal(newVirtualServiceConfig) - if err != nil { - logger.Error("Process change of virtual service: event.Value marshal error:", err) - return - } - r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes) - if err != nil { - logger.Error("Process change of virtual service: parseFromConfigToRouters:", err) - return - } - case k8s_api.DestinationRuleEventKey: - logger.Debug("handling dest rule event") - newDRValue, ok := event.Value.(*config.DestinationRuleConfig) - if !ok { - logger.Error("event.Value assertion error") - return - } - - newDRJsonValue, ok := newDRValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] - if !ok { - logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration") - return - } - newDestRuleConfig := &config.DestinationRuleConfig{} - if err := json.Unmarshal([]byte(newDRJsonValue), newDestRuleConfig); err != nil { - logger.Error("on process json data unmarshal error = ", err) - return - } - newDestRuleConfig.YamlAPIVersion = newDestRuleConfig.APIVersion - newDestRuleConfig.YamlKind = newDestRuleConfig.Kind - newDestRuleConfig.MetaData.Name = newDestRuleConfig.ObjectMeta.Name - logger.Debugf("get event after asseration = %+v\n", newDestRuleConfig) - data, err := yaml.Marshal(newDestRuleConfig) - if err != nil { - logger.Error("Process change of dest rule: event.Value marshal error:", err) - return - } - r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data) - if err != nil { - logger.Error("Process change of dest rule: parseFromConfigToRouters:", err) - return - } - default: - logger.Error("unknown unsupported event key:", event.Key) - } + logger.Debugf("RouteChain process event:\n%+v", event) + routers, err := parseRoute(event.Value.(string)) + if err != nil { + return } - + r.routers = routers // todo delete router - //if event.ConfigType == remoting.EventTypeDel { - // - //} } // Name get name of ConnCheckerRouter @@ -179,7 +113,7 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte vsDecoder := yaml.NewDecoder(strings.NewReader(string(virtualServiceConfig))) drDecoder := yaml.NewDecoder(strings.NewReader(string(destinationRuleConfig))) - // parse virtual service + // 1. parse virtual service config for { virtualServiceCfg := &config.VirtualServiceConfig{} @@ -195,7 +129,7 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte virtualServiceConfigList = append(virtualServiceConfigList, virtualServiceCfg) } - // parse destination rule + // 2. parse destination rule config for { destRuleCfg := &config.DestinationRuleConfig{} err := drDecoder.Decode(destRuleCfg) @@ -219,33 +153,89 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte routers := make([]*UniformRouter, 0) + // 3. construct virtual service host to destination mapping for _, v := range virtualServiceConfigList { - tempSerivceNeedsDescMap := make(map[string]map[string]string) + tempServiceNeedsDescMap := make(map[string]map[string]string) for _, host := range v.Spec.Hosts { + // name -> labels targetDestMap := destRuleConfigsMap[host] - // copy to new Map - mapCombine(tempSerivceNeedsDescMap, targetDestMap) + // copy to new Map, FIXME name collision + mapCopy(tempServiceNeedsDescMap, targetDestMap) } - // change single config to one rule - newRule, err := newDubboRouterRule(v.Spec.Dubbo, tempSerivceNeedsDescMap) - if err != nil { - logger.Error("Parse config to uniform rule err = ", err) + // transform single config to one rule + routers = append(routers, NewUniformRouter(v.Spec.Dubbo, tempServiceNeedsDescMap)) + } + logger.Debug("parsed successfully with router size = ", len(routers)) + return routers, nil +} + +func parseRoute(routeContent string) ([]*UniformRouter, error) { + var virtualServiceConfigList []*config.VirtualServiceConfig + destRuleConfigsMap := make(map[string]map[string]map[string]string) + + meshRouteDecoder := yaml.NewDecoder(strings.NewReader(routeContent)) + for { + meshRouteMetadata := &config.MeshRouteMetadata{} + err := meshRouteDecoder.Decode(meshRouteMetadata) + if err == io.EOF { + break + } else if err != nil { + logger.Error("parseRoute route metadata err = ", err) return nil, err } - rtr, err := NewUniformRouter(newRule) + + bytes, err := yaml.Marshal(meshRouteMetadata.Spec) if err != nil { - logger.Error("new uniform router err = ", err) return nil, err } - routers = append(routers, rtr) + specDecoder := yaml.NewDecoder(strings.NewReader(string(bytes))) + switch meshRouteMetadata.YamlKind { + case "VirtualService": + meshRouteConfigSpec := &config.UniformRouterConfigSpec{} + err := specDecoder.Decode(meshRouteConfigSpec) + if err != nil { + return nil, err + } + virtualServiceConfigList = append(virtualServiceConfigList, &config.VirtualServiceConfig{ + YamlAPIVersion: meshRouteMetadata.YamlAPIVersion, + YamlKind: meshRouteMetadata.YamlKind, + TypeMeta: meshRouteMetadata.TypeMeta, + ObjectMeta: meshRouteMetadata.ObjectMeta, + MetaData: meshRouteMetadata.MetaData, + Spec: *meshRouteConfigSpec, + }) + case "DestinationRule": + meshRouteDestinationRuleSpec := &config.DestinationRuleSpec{} + err := specDecoder.Decode(meshRouteDestinationRuleSpec) + if err != nil { + return nil, err + } + destRuleCfgMap := make(map[string]map[string]string) + for _, v := range meshRouteDestinationRuleSpec.SubSets { + destRuleCfgMap[v.Name] = v.Labels + } + + destRuleConfigsMap[meshRouteDestinationRuleSpec.Host] = destRuleCfgMap + } + } + + routers := make([]*UniformRouter, 0) + + for _, v := range virtualServiceConfigList { + tempServiceNeedsDescMap := make(map[string]map[string]string) + for _, host := range v.Spec.Hosts { + targetDestMap := destRuleConfigsMap[host] + mapCopy(tempServiceNeedsDescMap, targetDestMap) + } + routers = append(routers, NewUniformRouter(v.Spec.Dubbo, tempServiceNeedsDescMap)) } - logger.Debug("parsed successed! with router size = ", len(routers)) + logger.Debug("parsed successfully with router size = ", len(routers)) return routers, nil } -func mapCombine(dist map[string]map[string]string, from map[string]map[string]string) { - for k, v := range from { +func mapCopy(dist map[string]map[string]string, source map[string]map[string]string) { + for k, v := range source { dist[k] = v } } diff --git a/cluster/router/v3router/router_chain_test.go b/cluster/router/v3router/router_chain_test.go index ac4176931e..31a824e95d 100644 --- a/cluster/router/v3router/router_chain_test.go +++ b/cluster/router/v3router/router_chain_test.go @@ -30,25 +30,21 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router/k8s_api" - "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/yaml" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" - "dubbo.apache.org/dubbo-go/v3/protocol" - "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) const ( mockVSConfigPath = "./test_file/virtual_service.yml" mockDRConfigPath = "./test_file/dest_rule.yml" + mockConfigPath = "./test_file/mesh_route.yml" ) func TestNewUniformRouterChain(t *testing.T) { - vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath) - drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath) - rc, err := NewUniformRouterChain(vsBytes, drBytes) - assert.Nil(t, err) - assert.NotNil(t, rc) + //rc, err := NewUniformRouterChain() + //assert.Nil(t, err) + //assert.NotNil(t, rc) } type ruleTestItemStruct struct { @@ -70,9 +66,9 @@ func TestParseConfigFromFile(t *testing.T) { routers, err := parseFromConfigToRouters(vsBytes, drBytes) fmt.Println(routers, err) assert.Equal(t, len(routers), 1) - assert.NotNil(t, routers[0].dubboRouter) - assert.Equal(t, len(routers[0].dubboRouter.uniformRules), 2) - for i, v := range routers[0].dubboRouter.uniformRules { + assert.NotNil(t, routers[0].uniformRules) + assert.Equal(t, len(routers[0].uniformRules), 2) + for i, v := range routers[0].uniformRules { if i == 0 { assert.Equal(t, len(v.services), 2) assert.Equal(t, "com.taobao.hsf.demoService:1.0.0", v.services[0].Exact) @@ -192,23 +188,21 @@ func TestParseConfigFromFile(t *testing.T) { } func TestRouterChain_Route(t *testing.T) { - vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath) - drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath) - rc, err := NewUniformRouterChain(vsBytes, drBytes) - assert.Nil(t, err) - assert.NotNil(t, rc) - newGoodURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") - newBadURL1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") - newBadURL2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") - goodIvk := protocol.NewBaseInvoker(newGoodURL) - b1 := protocol.NewBaseInvoker(newBadURL1) - b2 := protocol.NewBaseInvoker(newBadURL2) - invokerList := make([]protocol.Invoker, 3) - invokerList = append(invokerList, goodIvk) - invokerList = append(invokerList, b1) - invokerList = append(invokerList, b2) - result := rc.Route(invokerList, newGoodURL, invocation.NewRPCInvocation("GetUser", nil, nil)) - assert.Equal(t, 0, len(result)) + //rc, err := NewUniformRouterChain() + //assert.Nil(t, err) + //assert.NotNil(t, rc) + //newGoodURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") + //newBadURL1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") + //newBadURL2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0") + //goodIvk := protocol.NewBaseInvoker(newGoodURL) + //b1 := protocol.NewBaseInvoker(newBadURL1) + //b2 := protocol.NewBaseInvoker(newBadURL2) + //invokerList := make([]protocol.Invoker, 3) + //invokerList = append(invokerList, goodIvk) + //invokerList = append(invokerList, b1) + //invokerList = append(invokerList, b2) + //result := rc.Route(invokerList, newGoodURL, invocation.NewRPCInvocation("GetUser", nil, nil)) + //assert.Equal(t, 0, len(result)) //todo test find target invoker } @@ -223,11 +217,12 @@ func TestRouterChain_Process(t *testing.T) { }, }, } - + vsValue, err := yaml.MarshalYML(mockVirtualServiceConfig) + assert.Nil(t, err) // test virtual service config chage event mockVirtualServiceChangeEvent := &config_center.ConfigChangeEvent{ Key: k8s_api.VirtualServiceEventKey, - Value: mockVirtualServiceConfig, + Value: string(vsValue), ConfigType: 0, } rc.Process(mockVirtualServiceChangeEvent) @@ -241,16 +236,12 @@ func TestRouterChain_Process(t *testing.T) { }, }, } + drValue, err := yaml.MarshalYML(mockDestinationRuleConfig) + assert.Nil(t, err) mockDestinationRuleChangeEvent := &config_center.ConfigChangeEvent{ Key: k8s_api.DestinationRuleEventKey, - Value: mockDestinationRuleConfig, + Value: string(drValue), ConfigType: 0, } rc.Process(mockDestinationRuleChangeEvent) - - // test unknown event type - mockUnsupportedEvent := &config_center.ConfigChangeEvent{ - Key: "unknown", - } - rc.Process(mockUnsupportedEvent) } diff --git a/cluster/router/v3router/uniform_route.go b/cluster/router/v3router/uniform_route.go index 9ea12dd421..8d42ceefa3 100644 --- a/cluster/router/v3router/uniform_route.go +++ b/cluster/router/v3router/uniform_route.go @@ -19,6 +19,7 @@ package v3router import ( "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -29,20 +30,33 @@ const ( // UniformRouter have type UniformRouter struct { - dubboRouter *DubboRouterRule + uniformRules []*UniformRule } // NewUniformRouter construct an NewConnCheckRouter via url -func NewUniformRouter(dubboRouter *DubboRouterRule) (*UniformRouter, error) { - r := &UniformRouter{ - dubboRouter: dubboRouter, +func NewUniformRouter(dubboRoutes []*config.DubboRoute, destinationMap map[string]map[string]string) *UniformRouter { + uniformRules := make([]*UniformRule, 0) + for _, v := range dubboRoutes { + uniformRules = append(uniformRules, newUniformRule(v, destinationMap)) + } + + return &UniformRouter{ + uniformRules: uniformRules, } - return r, nil } // Route gets a list of routed invoker func (r *UniformRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { - return r.dubboRouter.route(invokers, url, invocation) + resultInvokers := make([]protocol.Invoker, 0) + for _, v := range r.uniformRules { + if resultInvokers = v.route(invokers, url, invocation); len(resultInvokers) == 0 { + continue + } + // once there is a uniformRule successfully get target invoker lists, return it + return resultInvokers + } + // return s empty invoker list + return resultInvokers } // Process there is no process needs for uniform Router, as it upper struct RouterChain has done it diff --git a/cluster/router/v3router/uniform_rule.go b/cluster/router/v3router/uniform_rule.go index 3b552b89f0..4dddc35adb 100644 --- a/cluster/router/v3router/uniform_rule.go +++ b/cluster/router/v3router/uniform_rule.go @@ -38,13 +38,13 @@ import ( // if match, get result destination key, which should be defined in DestinationRule yaml file type VirtualServiceRule struct { // routerItem store match router list and destination list of this router - routerItem *config.DubboServiceRouterItem + routerItem *config.DubboRouteDetail // uniformRule is the upper struct ptr uniformRule *UniformRule } -// match read from vsr's Match config +// match read from VirtualServiceRule's Match config // it judges if this invocation matches the router rule request defined in config one by one func (vsr *VirtualServiceRule) match(url *common.URL, invocation protocol.Invocation) bool { for _, v := range vsr.routerItem.Match { @@ -61,7 +61,7 @@ func (vsr *VirtualServiceRule) match(url *common.URL, invocation protocol.Invoca return false } - // atta match judge + // attachment match judge if v.Attachment != nil { attachmentMatchJudger := judger.NewAttachmentMatchJudger(v.Attachment) if attachmentMatchJudger.Judge(invocation) { @@ -198,7 +198,7 @@ func (vsr *VirtualServiceRule) getRuleTargetInvokers(invokers []protocol.Invoker return weightInvokerPairResult.getTargetInvokers(), nil } -// UniformRule +// UniformRule uniform rule type UniformRule struct { services []*config.StringMatch virtualServiceRules []VirtualServiceRule @@ -206,21 +206,22 @@ type UniformRule struct { } // NewDefaultConnChecker constructs a new DefaultConnChecker based on the url -func newUniformRule(dubboRoute *config.DubboRoute, destinationMap map[string]map[string]string) (*UniformRule, error) { - matchItems := dubboRoute.RouterDetail - virtualServiceRules := make([]VirtualServiceRule, 0) - newUniformRule := &UniformRule{ - DestinationLabelListMap: destinationMap, +func newUniformRule(dubboRoute *config.DubboRoute, destinationMap map[string]map[string]string) *UniformRule { + uniformRule := &UniformRule{ services: dubboRoute.Services, + DestinationLabelListMap: destinationMap, } - for _, v := range matchItems { + + routeDetail := dubboRoute.RouterDetail + virtualServiceRules := make([]VirtualServiceRule, 0) + for _, v := range routeDetail { virtualServiceRules = append(virtualServiceRules, VirtualServiceRule{ routerItem: v, - uniformRule: newUniformRule, + uniformRule: uniformRule, }) } - newUniformRule.virtualServiceRules = virtualServiceRules - return newUniformRule, nil + uniformRule.virtualServiceRules = virtualServiceRules + return uniformRule } func (u *UniformRule) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { diff --git a/common/constant/key.go b/common/constant/key.go index 5aeb9701da..d44542d94b 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -199,7 +199,7 @@ const ( ) const ( - CONFIGURATORS_SUFFIX = ".configurators" + ConfiguratorSuffix = ".configurators" ) const ( @@ -253,6 +253,8 @@ const ( TagRouterRuleSuffix = ".tag-router" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router" + // MeshRouteSuffix Specify mesh router suffix + MeshRouteSuffix = ".MESHAPPRULE" // ForceUseTag is the tag in attachment ForceUseTag = "dubbo.force.tag" Tagkey = "dubbo.tag" diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index 913e9c5ba8..454fe00df4 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -17,17 +17,12 @@ package extension -import ( - "sync" -) - import ( "dubbo.apache.org/dubbo-go/v3/cluster/router" ) var ( - routers = make(map[string]func() router.PriorityRouterFactory) - fileRouterFactoryOnce sync.Once + routers = make(map[string]func() router.PriorityRouterFactory) ) // SetRouterFactory sets create router factory function with @name diff --git a/config/config_center_config.go b/config/config_center_config.go index 025ca0eb2c..956a39df8f 100644 --- a/config/config_center_config.go +++ b/config/config_center_config.go @@ -60,6 +60,8 @@ type CenterConfig struct { AppID string `default:"dubbo" yaml:"app-id" json:"app-id,omitempty"` Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty"` Params map[string]string `yaml:"params" json:"parameters,omitempty"` + + DynamicConfiguration config_center.DynamicConfiguration } // Prefix dubbo.config-center @@ -129,11 +131,7 @@ func (c *CenterConfig) toURL() (*common.URL, error) { // it will prepare the environment func startConfigCenter(rc *RootConfig) error { cc := rc.ConfigCenter - configCenterUrl, err := cc.toURL() - if err != nil { - return err - } - strConf, err := cc.prepareEnvironment(configCenterUrl) + strConf, err := cc.prepareEnvironment() if err != nil { return errors.WithMessagef(err, "start config center error!") } @@ -150,7 +148,7 @@ func startConfigCenter(rc *RootConfig) error { return nil } -func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfiguration, error) { +func (c *CenterConfig) CreateDynamicConfiguration() (config_center.DynamicConfiguration, error) { configCenterUrl, err := c.toURL() if err != nil { return nil, err @@ -162,14 +160,23 @@ func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfigura return factory.GetDynamicConfiguration(configCenterUrl) } -func (c *CenterConfig) prepareEnvironment(configCenterUrl *common.URL) (string, error) { - factory := extension.GetConfigCenterFactory(configCenterUrl.Protocol) - if factory == nil { - return "", errors.New("get config center factory failed") +func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfiguration, error) { + if c.DynamicConfiguration != nil { + return c.DynamicConfiguration, nil + } + dynamicConfig, err := c.CreateDynamicConfiguration() + if err != nil { + logger.Errorf("Create dynamic configuration error , error message is %v", err) + return nil, errors.WithStack(err) } - dynamicConfig, err := factory.GetDynamicConfiguration(configCenterUrl) + c.DynamicConfiguration = dynamicConfig + return dynamicConfig, nil +} + +func (c *CenterConfig) prepareEnvironment() (string, error) { + dynamicConfig, err := c.GetDynamicConfiguration() if err != nil { - logger.Errorf("Get dynamic configuration error , error message is %v", err) + logger.Errorf("Create dynamic configuration error , error message is %v", err) return "", errors.WithStack(err) } envInstance := conf.GetEnvInstance() diff --git a/config/uniform_router_config.go b/config/uniform_router_config.go index 3d6f7283ea..b96a14b2ee 100644 --- a/config/uniform_router_config.go +++ b/config/uniform_router_config.go @@ -24,12 +24,22 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +type MeshRouteMetadata struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + YamlAPIVersion string `yaml:"apiVersion" ` + YamlKind string `yaml:"kind" ` + MetaData MetaDataStruct `yaml:"metadata"` + Spec interface{} +} + // nolint type MetaDataStruct struct { Name string `yaml:"name" json:"name"` } -// VirtualService Config Definition +// VirtualServiceConfig Config Definition type VirtualServiceConfig struct { YamlAPIVersion string `yaml:"apiVersion"` YamlKind string `yaml:"kind"` @@ -47,12 +57,13 @@ type UniformRouterConfigSpec struct { // nolint type DubboRoute struct { - Services []*StringMatch `yaml:"services" json:"service"` - RouterDetail []*DubboServiceRouterItem `yaml:"routedetail" json:"routedetail"` + Name string `yaml:"name" json:"name"` + Services []*StringMatch `yaml:"services" json:"service"` + RouterDetail []*DubboRouteDetail `yaml:"routedetail" json:"routedetail"` } // nolint -type DubboServiceRouterItem struct { +type DubboRouteDetail struct { Name string `yaml:"name" json:"name"` Match []*DubboMatchRequest `yaml:"match" json:"match"` Router []*DubboDestination `yaml:"route" json:"route"` @@ -146,7 +157,7 @@ type RouterDest struct { // todo port } -// DestinationRule Definition +// DestinationRuleConfig Definition type DestinationRuleConfig struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index f42b1ddd67..0abf808c39 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -168,7 +168,7 @@ func (c *MockDynamicConfiguration) MockServiceConfigEvent() { }, } value, _ := yaml.Marshal(config) - key := "group*" + mockServiceName + ":1.0.0" + constant.CONFIGURATORS_SUFFIX + key := "group*" + mockServiceName + ":1.0.0" + constant.ConfiguratorSuffix c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) } @@ -191,6 +191,6 @@ func (c *MockDynamicConfiguration) MockApplicationConfigEvent() { }, } value, _ := yaml.Marshal(config) - key := "test-application" + constant.CONFIGURATORS_SUFFIX + key := "test-application" + constant.ConfiguratorSuffix c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd}) } diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index b85d28abea..c5eb36cded 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -19,6 +19,7 @@ package zookeeper import ( "encoding/base64" + "strconv" "strings" "sync" ) @@ -34,15 +35,13 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/config_center/parser" "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper" ) const ( - // ZkClient - // zookeeper client name - ZkClient = "zk config_center" pathSeparator = "/" ) @@ -59,6 +58,8 @@ type zookeeperDynamicConfiguration struct { listener *zookeeper.ZkEventListener cacheListener *CacheListener parser parser.ConfigurationParser + + base64Enabled bool } func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) { @@ -66,6 +67,14 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu url: url, rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", } + if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok { + base64Enabled, err := strconv.ParseBool(v) + if err != nil { + panic("value of base64 must be bool, error=" + err.Error()) + } + c.base64Enabled = base64Enabled + } + err := zookeeper.ValidateZookeeperClient(c, url.Location) if err != nil { logger.Errorf("zookeeper client start error ,error message is %v", err) @@ -102,7 +111,6 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config if len(tmpOpts.Group) != 0 { key = tmpOpts.Group + "/" + key } else { - /** * when group is null, we are fetching governance rules, for example: * 1. key=org.apache.dubbo.DemoService.configurators @@ -115,6 +123,10 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config if err != nil { return "", perrors.WithStack(err) } + if !c.base64Enabled { + return string(content), nil + } + decoded, err := base64.StdEncoding.DecodeString(string(content)) if err != nil { return "", perrors.WithStack(err) @@ -130,9 +142,11 @@ func (c *zookeeperDynamicConfiguration) GetInternalProperty(key string, opts ... // PublishConfig will put the value into Zk with specific path func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string, value string) error { path := c.getPath(key, group) - strbytes := []byte(value) - encoded := base64.StdEncoding.EncodeToString(strbytes) - err := c.client.CreateWithValue(path, []byte(encoded)) + valueBytes := []byte(value) + if c.base64Enabled { + valueBytes = []byte(base64.StdEncoding.EncodeToString(valueBytes)) + } + err := c.client.CreateWithValue(path, valueBytes) if err != nil { return perrors.WithStack(err) } diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go index 9be9233a73..7b67a70f6e 100644 --- a/config_center/zookeeper/listener.go +++ b/config_center/zookeeper/listener.go @@ -24,6 +24,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -65,6 +66,10 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { return true } key := l.pathToKey(event.Path) + // TODO use common way + if strings.HasSuffix(key, constant.MeshRouteSuffix) { + key = key[:strings.Index(key, constant.MeshRouteSuffix)] + } if key != "" { if listeners, ok := l.keyListeners.Load(key); ok { for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) { @@ -78,11 +83,13 @@ func (l *CacheListener) DataChange(event remoting.Event) bool { func (l *CacheListener) pathToKey(path string) string { key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1) - if strings.HasSuffix(key, constant.CONFIGURATORS_SUFFIX) || + if strings.HasSuffix(key, constant.ConfiguratorSuffix) || strings.HasSuffix(key, constant.TagRouterRuleSuffix) || - strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) { + strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) || + strings.HasSuffix(key, constant.MeshRouteSuffix) { // governance config, so we remove the "dubbo." prefix - return key[strings.Index(key, ".")+1:] + key = key[strings.Index(key, ".")+1:] } + logger.Debugf("pathToKey path:%s, key:%s\n", path, key) return key } diff --git a/imports/imports.go b/imports/imports.go index 078be19ae9..02c685ea0a 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -30,6 +30,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" + _ "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router" _ "dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory" _ "dubbo.apache.org/dubbo-go/v3/config_center/apollo" _ "dubbo.apache.org/dubbo-go/v3/config_center/nacos" diff --git a/registry/directory/directory.go b/registry/directory/directory.go index b8a4b674b0..bec03343bb 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -85,7 +85,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director dir.consumerURL = dir.getConsumerUrl(url.SubURL) - if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil { + if routerChain, err := chain.NewRouterChain(); err == nil { dir.Directory.SetRouterChain(routerChain) } else { logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err) @@ -465,7 +465,7 @@ type referenceConfigurationListener struct { func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener { listener := &referenceConfigurationListener{directory: dir, url: url} listener.InitWith( - url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, + url.EncodedServiceKey()+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), ) @@ -489,7 +489,7 @@ func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigura listener := &consumerConfigurationListener{directory: dir} application := config.GetRootConfig().Application listener.InitWith( - application.Name+constant.CONFIGURATORS_SUFFIX, + application.Name+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), ) diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index add0c5d157..01dbdf2975 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -461,7 +461,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf listener := &providerConfigurationListener{} listener.overrideListeners = overrideListeners listener.InitWith( - config.GetRootConfig().Application.Name+constant.CONFIGURATORS_SUFFIX, + config.GetRootConfig().Application.Name+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), ) @@ -486,7 +486,7 @@ type serviceConfigurationListener struct { func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener { listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl} listener.InitWith( - providerUrl.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, + providerUrl.EncodedServiceKey()+constant.ConfiguratorSuffix, listener, extension.GetDefaultConfiguratorFunc(), ) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 1aef932c34..34d6c13908 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -152,6 +152,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newChildren, err := l.client.GetChildren(zkPath) if err != nil { + // FIXME always false if err == errNilChildren { content, _, connErr := l.client.Conn.Get(zkPath) if connErr != nil { @@ -240,7 +241,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen if MaxFailTimes <= failTimes { failTimes = MaxFailTimes } - logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err) + logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) // clear the event channel CLEAR: for { @@ -264,10 +265,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen continue case <-l.exit: l.client.UnregisterEvent(zkPath, &event) - logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath) + logger.Debugf("listen(path{%s}) goroutine exit now...", zkPath) return case <-event: - logger.Infof("get zk.EventNodeDataChange notify event") + logger.Debugf("get zk.EventNodeDataChange notify event") l.client.UnregisterEvent(zkPath, &event) l.handleZkNodeEvent(zkPath, nil, listener) continue @@ -358,7 +359,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen ticker = time.NewTicker(tickerTTL) } case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}", zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err) ticker.Stop() if zkEvent.Type != zk.EventNodeChildrenChanged {