From f03c4a05257f1a9860fa814e4838359a8b24c69d Mon Sep 17 00:00:00 2001 From: David_zdw Date: Thu, 20 Jun 2024 15:36:17 +0800 Subject: [PATCH] feature registry namespace --- cluster/gate/gate.go | 15 ++++++++------- cluster/gate/options.go | 4 ++++ cluster/gate/proxy.go | 1 + cluster/master/master.go | 17 +++++++++-------- cluster/master/options.go | 16 ++++++++++------ cluster/master/proxy.go | 1 + cluster/node/node.go | 19 ++++++++++--------- cluster/node/options.go | 12 ++++++++---- cluster/node/proxy.go | 1 + common/link/link.go | 4 +++- registry/consul/registrar.go | 4 ++-- registry/registry.go | 3 ++- 12 files changed, 59 insertions(+), 38 deletions(-) diff --git a/cluster/gate/gate.go b/cluster/gate/gate.go index 8b0b9281..598a41e3 100644 --- a/cluster/gate/gate.go +++ b/cluster/gate/gate.go @@ -280,13 +280,14 @@ func (g *Gate) stopCatServer() { // 注册服务实例 func (g *Gate) registerServiceInstance() { g.instance = ®istry.ServiceInstance{ - ID: g.opts.id, - Name: string(cluster.Gate), - Kind: cluster.Gate, - Alias: g.opts.name, - State: cluster.Work, - Endpoint: g.rpc.Endpoint().String(), - MetaMap: g.opts.registry.GetMetaMap(), + ID: g.opts.id, + Name: string(cluster.Gate), + Kind: cluster.Gate, + Alias: g.opts.name, + State: cluster.Work, + Endpoint: g.rpc.Endpoint().String(), + MetaMap: g.opts.registry.GetMetaMap(), + Namespace: g.opts.namespace, } if g.opts.promServer.Enable() { metricsPort, err := strconv.Atoi(g.opts.promServer.GetMetricsPort()) diff --git a/cluster/gate/options.go b/cluster/gate/options.go index 3a2934ed..c1246860 100644 --- a/cluster/gate/options.go +++ b/cluster/gate/options.go @@ -29,6 +29,7 @@ const ( defaultTimeout = 30 * time.Second // 默认超时时间 defaultCodec = "proto" // 默认编解码器名称 defaultWorkloadStat = 500 + defaultNamespace = "" ) const ( @@ -39,6 +40,7 @@ const ( defaultDecryptorKey = "config.cluster.gate.decryptor" defaultEncryptorKey = "config.cluster.node.encryptor" defaultWorkloadStatKey = "config.workload.stat" + defaultNamespaceKey = "config.cluster.gate.namespace" ) type Option func(o *options) @@ -60,6 +62,7 @@ type options struct { encryptor crypto.Encryptor // 消息加密器 decryptor crypto.Decryptor workloadStat int32 + namespace string } func defaultOptions() *options { @@ -69,6 +72,7 @@ func defaultOptions() *options { timeout: defaultTimeout, codec: encoding.Invoke(defaultCodec), workloadStat: config.Get(defaultWorkloadStatKey, defaultWorkloadStat).Int32(), + namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(), } if id := config.Get(defaultIDKey).String(); id != "" { diff --git a/cluster/gate/proxy.go b/cluster/gate/proxy.go index b35de0a6..8727241f 100644 --- a/cluster/gate/proxy.go +++ b/cluster/gate/proxy.go @@ -38,6 +38,7 @@ func newProxy(gate *Gate) *Proxy { Transporter: gate.opts.transporter, Codec: gate.opts.codec, Encryptor: gate.opts.encryptor, + Namespace: gate.opts.namespace, })} } diff --git a/cluster/master/master.go b/cluster/master/master.go index 708b7bed..0e6a25ae 100644 --- a/cluster/master/master.go +++ b/cluster/master/master.go @@ -3,6 +3,8 @@ package master import ( "context" "github.com/symsimmy/due/cluster" + "github.com/symsimmy/due/common/endpoint" + xnet "github.com/symsimmy/due/common/net" "github.com/symsimmy/due/component" "github.com/symsimmy/due/config" _ "github.com/symsimmy/due/crypto/ecc" @@ -10,8 +12,6 @@ import ( _ "github.com/symsimmy/due/encoding/json" _ "github.com/symsimmy/due/encoding/proto" _ "github.com/symsimmy/due/encoding/xml" - "github.com/symsimmy/due/common/endpoint" - xnet "github.com/symsimmy/due/common/net" "github.com/symsimmy/due/log" "github.com/symsimmy/due/registry" "sync/atomic" @@ -119,12 +119,13 @@ func (m *Master) registerServiceInstance() { m.endpoint = endpoint.NewEndpoint("http", exposeAddr, false) m.instance = ®istry.ServiceInstance{ - ID: m.opts.id, - Name: string(cluster.Master), - Kind: cluster.Node, - Alias: m.opts.name, - State: m.getState(), - Endpoint: m.endpoint.String(), + ID: m.opts.id, + Name: string(cluster.Master), + Kind: cluster.Node, + Alias: m.opts.name, + State: m.getState(), + Endpoint: m.endpoint.String(), + Namespace: m.opts.namespace, } ctx, cancel := context.WithTimeout(m.ctx, 10*time.Second) diff --git a/cluster/master/options.go b/cluster/master/options.go index ada0486a..07b81f14 100644 --- a/cluster/master/options.go +++ b/cluster/master/options.go @@ -14,9 +14,10 @@ import ( ) const ( - defaultName = "master" // 默认节点名称 - defaultCodec = "proto" // 默认编解码器名称 - defaultTimeout = 3 * time.Second // 默认超时时间 + defaultName = "master" // 默认节点名称 + defaultCodec = "proto" // 默认编解码器名称 + defaultTimeout = 3 * time.Second // 默认超时时间 + defaultNamespace = "" ) const ( @@ -26,6 +27,7 @@ const ( defaultTimeoutKey = "config.cluster.master.timeout" defaultEncryptorKey = "config.cluster.master.encryptor" defaultDecryptorKey = "config.cluster.master.decryptor" + defaultNamespaceKey = "config.cluster.master.namespace" ) type Option func(o *options) @@ -42,13 +44,15 @@ type options struct { catServer *cat.Server encryptor crypto.Encryptor // 消息加密器 decryptor crypto.Decryptor // 消息解密器 + namespace string } func defaultOptions() *options { opts := &options{ - ctx: context.Background(), - name: defaultName, - timeout: defaultTimeout, + ctx: context.Background(), + name: defaultName, + timeout: defaultTimeout, + namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(), } if id := config.Get(defaultIDKey).String(); id != "" { diff --git a/cluster/master/proxy.go b/cluster/master/proxy.go index f2fdf401..c57989c0 100644 --- a/cluster/master/proxy.go +++ b/cluster/master/proxy.go @@ -34,6 +34,7 @@ func newProxy(master *Master) *Proxy { Registry: master.opts.registry, Encryptor: master.opts.encryptor, Transporter: master.opts.transporter, + Namespace: master.opts.namespace, })} } diff --git a/cluster/node/node.go b/cluster/node/node.go index c187c946..291843b2 100644 --- a/cluster/node/node.go +++ b/cluster/node/node.go @@ -224,15 +224,16 @@ func (n *Node) registerServiceInstance() { } n.instance = ®istry.ServiceInstance{ - ID: n.opts.id, - Name: string(cluster.Node), - Kind: cluster.Node, - Alias: n.opts.name, - State: n.getState(), - Routes: routes, - Events: events, - Endpoint: n.rpc.Endpoint().String(), - MetaMap: n.opts.registry.GetMetaMap(), + ID: n.opts.id, + Name: string(cluster.Node), + Kind: cluster.Node, + Alias: n.opts.name, + State: n.getState(), + Routes: routes, + Events: events, + Endpoint: n.rpc.Endpoint().String(), + MetaMap: n.opts.registry.GetMetaMap(), + Namespace: n.opts.namespace, } if n.opts.promServer.Enable() { metricsPort, err := strconv.Atoi(n.opts.promServer.GetMetricsPort()) diff --git a/cluster/node/options.go b/cluster/node/options.go index dd4dab7c..4454614b 100644 --- a/cluster/node/options.go +++ b/cluster/node/options.go @@ -20,6 +20,7 @@ const ( defaultTimeout = 3 * time.Second // 默认超时时间 defaultEnableAsyncEventHandle = true // event事件异步执行 defaultEnableAsyncRouterHandle = true // route事件异步执行 + defaultNamespace = "" ) const ( @@ -31,6 +32,7 @@ const ( defaultDecryptorKey = "config.cluster.node.decryptor" defaultAsyncEventHandleKey = "config.cluster.node.async_event_handle" defaultAsyncRouterHandleKey = "config.cluster.node.async_router_handle" + defaultNamespaceKey = "config.cluster.node.namespace" ) type Option func(o *options) @@ -50,14 +52,16 @@ type options struct { decryptor crypto.Decryptor // 消息解密器 asyncEventHandle bool // 异步处理连接事件 asyncRouterHandle bool // 异步处理消息事件 + namespace string } func defaultOptions() *options { opts := &options{ - ctx: context.Background(), - name: defaultName, - codec: encoding.Invoke(defaultCodec), - timeout: defaultTimeout, + ctx: context.Background(), + name: defaultName, + codec: encoding.Invoke(defaultCodec), + timeout: defaultTimeout, + namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(), } if id := config.Get(defaultIDKey).String(); id != "" { diff --git a/cluster/node/proxy.go b/cluster/node/proxy.go index 593a1938..6359d8a3 100644 --- a/cluster/node/proxy.go +++ b/cluster/node/proxy.go @@ -49,6 +49,7 @@ func newProxy(node *Node) *Proxy { Registry: node.opts.registry, Encryptor: node.opts.encryptor, Transporter: node.opts.transporter, + Namespace: node.opts.namespace, })} } diff --git a/common/link/link.go b/common/link/link.go index 8e74a5f8..036a8443 100644 --- a/common/link/link.go +++ b/common/link/link.go @@ -2,6 +2,7 @@ package link import ( "context" + "fmt" "github.com/symsimmy/due/cluster" "github.com/symsimmy/due/common/dispatcher" "github.com/symsimmy/due/common/endpoint" @@ -50,6 +51,7 @@ type Options struct { Encryptor crypto.Encryptor // 加密器 Transporter transport.Transporter // 传输器 BalanceStrategy dispatcher.BalanceStrategy // 负载均衡策略 + Namespace string } func NewLink(opts *Options) *Link { @@ -251,7 +253,7 @@ func (l *Link) FetchServiceAliasIDs(ctx context.Context, kind cluster.Kind, alia } func (l *Link) FetchServiceAliasList(ctx context.Context, kind cluster.Kind, alias string, states ...cluster.State) ([]*registry.ServiceInstance, error) { - services, err := l.opts.Registry.Services(ctx, string(kind)) + services, err := l.opts.Registry.Services(ctx, fmt.Sprintf("%s%s", l.opts.Namespace, string(kind))) if err != nil { return nil, err } diff --git a/registry/consul/registrar.go b/registry/consul/registrar.go index 60c5d6c4..e7f041c0 100644 --- a/registry/consul/registrar.go +++ b/registry/consul/registrar.go @@ -68,7 +68,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance) registration := &api.AgentServiceRegistration{ ID: ins.ID, - Name: ins.Name, + Name: fmt.Sprintf("%s%s", ins.Namespace, ins.Name), Tags: make([]string, 0, len(ins.Events)), Meta: make(map[string]string, 3), Address: overwriteHost, @@ -137,7 +137,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance) metricsInsId := ins.ID + "-exporter" metricsRegistration := &api.AgentServiceRegistration{ ID: metricsInsId, - Name: ins.Name + "-exporter", + Name: fmt.Sprintf("%s%s-exporter", ins.Namespace, ins.Name), Address: overwriteHost, Port: ins.MetricsPort, TaggedAddresses: map[string]api.ServiceAddress{raw.Scheme: { diff --git a/registry/registry.go b/registry/registry.go index 13db0cfc..cb967d6f 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -52,7 +52,8 @@ type ServiceInstance struct { MetaMap map[string]string `json:"metaMap"` // prometheus enable // prometheus metrics暴露端口 - MetricsPort int `json:"metricsPort"` + MetricsPort int `json:"metricsPort"` + Namespace string `json:"namespace"` } type Route struct {