diff --git a/cluster/gate/gate.go b/cluster/gate/gate.go index 598a41e..465842d 100644 --- a/cluster/gate/gate.go +++ b/cluster/gate/gate.go @@ -9,6 +9,7 @@ package gate import ( "context" + "fmt" "github.com/symsimmy/due/cluster" "github.com/symsimmy/due/common/link" "github.com/symsimmy/due/errors" @@ -281,7 +282,7 @@ func (g *Gate) stopCatServer() { func (g *Gate) registerServiceInstance() { g.instance = ®istry.ServiceInstance{ ID: g.opts.id, - Name: string(cluster.Gate), + Name: fmt.Sprintf("%s%s", g.opts.namespace, string(cluster.Gate)), Kind: cluster.Gate, Alias: g.opts.name, State: cluster.Work, diff --git a/cluster/master/master.go b/cluster/master/master.go index 0e6a25a..ca1c516 100644 --- a/cluster/master/master.go +++ b/cluster/master/master.go @@ -2,6 +2,7 @@ package master import ( "context" + "fmt" "github.com/symsimmy/due/cluster" "github.com/symsimmy/due/common/endpoint" xnet "github.com/symsimmy/due/common/net" @@ -120,7 +121,7 @@ func (m *Master) registerServiceInstance() { m.instance = ®istry.ServiceInstance{ ID: m.opts.id, - Name: string(cluster.Master), + Name: fmt.Sprintf("%s%s", m.opts.namespace, string(cluster.Master)), Kind: cluster.Node, Alias: m.opts.name, State: m.getState(), diff --git a/cluster/node/node.go b/cluster/node/node.go index 291843b..ae65b9b 100644 --- a/cluster/node/node.go +++ b/cluster/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "fmt" "github.com/symsimmy/due/cluster" "github.com/symsimmy/due/component" "github.com/symsimmy/due/log" @@ -223,9 +224,15 @@ func (n *Node) registerServiceInstance() { events = append(events, event) } + var name string + if n.opts.namespace != "" { + name = fmt.Sprintf("%s%s.%s", n.opts.namespace, string(cluster.Node), n.opts.name) + } else { + name = string(cluster.Node) + } n.instance = ®istry.ServiceInstance{ ID: n.opts.id, - Name: string(cluster.Node), + Name: name, Kind: cluster.Node, Alias: n.opts.name, State: n.getState(), diff --git a/common/link/link.go b/common/link/link.go index 036a844..d25b823 100644 --- a/common/link/link.go +++ b/common/link/link.go @@ -2,7 +2,6 @@ package link import ( "context" - "fmt" "github.com/symsimmy/due/cluster" "github.com/symsimmy/due/common/dispatcher" "github.com/symsimmy/due/common/endpoint" @@ -253,7 +252,7 @@ func (l *Link) FetchServiceAliasIDs(ctx context.Context, kind cluster.Kind, alia } func (l *Link) FetchServiceAliasList(ctx context.Context, kind cluster.Kind, alias string, states ...cluster.State) ([]*registry.ServiceInstance, error) { - services, err := l.opts.Registry.Services(ctx, fmt.Sprintf("%s%s", l.opts.Namespace, string(kind))) + services, err := l.opts.Registry.Services(ctx, l.opts.Namespace, kind, alias) if err != nil { return nil, err } @@ -1023,14 +1022,19 @@ func (l *Link) getNodeClientByNID(nid string) (transport.NodeClient, error) { // WatchServiceInstance 监听服务实例 func (l *Link) WatchServiceInstance(ctx context.Context, kinds ...cluster.Kind) { for _, kind := range kinds { - l.watchServiceInstance(ctx, kind) + if kind == cluster.Node { + l.watchServiceInstance(ctx, kind, "center") + l.watchServiceInstance(ctx, kind, "game") + } else { + l.watchServiceInstance(ctx, kind, "") + } } } // 监听服务实例 -func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind) { +func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind, alias string) { rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) - watcher, err := l.opts.Registry.Watch(rctx, string(kind)) + watcher, err := l.opts.Registry.Watch(rctx, l.opts.Namespace, kind, alias) rcancel() if err != nil { log.Fatalf("the dispatcher instance watch failed: %v", err) @@ -1049,7 +1053,6 @@ func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind) { if err != nil { continue } - if kind == cluster.Node { l.nodeDispatcher.ReplaceServices(services...) } else { diff --git a/registry/consul/registrar.go b/registry/consul/registrar.go index e7f041c..7773e63 100644 --- a/registry/consul/registrar.go +++ b/registry/consul/registrar.go @@ -68,7 +68,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance) registration := &api.AgentServiceRegistration{ ID: ins.ID, - Name: fmt.Sprintf("%s%s", ins.Namespace, ins.Name), + Name: ins.Name, Tags: make([]string, 0, len(ins.Events)), Meta: make(map[string]string, 3), Address: overwriteHost, diff --git a/registry/consul/registry.go b/registry/consul/registry.go index a2c090f..4078655 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -87,7 +87,8 @@ func (r *Registry) Deregister(ctx context.Context, ins *registry.ServiceInstance } // Services 获取服务实例列表 -func (r *Registry) Services(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { +func (r *Registry) Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*registry.ServiceInstance, error) { + serviceName := r.getServicesName(namespace, kind, alias) if r.err != nil { return nil, r.err } @@ -102,7 +103,8 @@ func (r *Registry) Services(ctx context.Context, serviceName string) ([]*registr } // Watch 监听服务 -func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { +func (r *Registry) Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (registry.Watcher, error) { + serviceName := r.getServicesName(namespace, kind, alias) if r.err != nil { return nil, r.err } @@ -121,6 +123,14 @@ func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watc return w.fork(), nil } +func (r *Registry) getServicesName(namespace string, kind cluster.Kind, alias string) string { + if alias == "" { + return fmt.Sprintf("%s%s", namespace, string(kind)) + } else { + return fmt.Sprintf("%s%s.%s", namespace, string(kind), alias) + } +} + // 获取服务实体列表 func (r *Registry) services(ctx context.Context, serviceName string, waitIndex uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) { opts := &api.QueryOptions{ diff --git a/registry/registry.go b/registry/registry.go index cb967d6..dbf15c8 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -13,16 +13,16 @@ type Registry interface { // Deregister 解注册服务实例 Deregister(ctx context.Context, ins *ServiceInstance) error // Watch 监听相同服务名的服务实例变化 - Watch(ctx context.Context, serviceName string) (Watcher, error) + Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (Watcher, error) // Services 获取服务实例列表 - Services(ctx context.Context, serviceName string) ([]*ServiceInstance, error) + Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*ServiceInstance, error) } type Discovery interface { // Watch 监听相同服务名的服务实例变化 - Watch(ctx context.Context, serviceName string) (Watcher, error) + Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (Watcher, error) // Services 获取服务实例列表 - Services(ctx context.Context, serviceName string) ([]*ServiceInstance, error) + Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*ServiceInstance, error) } type Watcher interface {