Skip to content

Commit

Permalink
Merge pull request #33 from symsimmy/feature/namespace
Browse files Browse the repository at this point in the history
feature registry namespace
  • Loading branch information
symsimmy committed Jun 20, 2024
2 parents fe8d8d9 + f03c4a0 commit 6600ec6
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 38 deletions.
15 changes: 8 additions & 7 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,14 @@ func (g *Gate) stopCatServer() {
// 注册服务实例
func (g *Gate) registerServiceInstance() {
g.instance = &registry.ServiceInstance{
ID: g.opts.id,
Name: string(cluster.Gate),
Kind: cluster.Gate,
Alias: g.opts.name,
State: cluster.Work,
Endpoint: g.rpc.Endpoint().String(),
MetaMap: g.opts.registry.GetMetaMap(),
ID: g.opts.id,
Name: string(cluster.Gate),
Kind: cluster.Gate,
Alias: g.opts.name,
State: cluster.Work,
Endpoint: g.rpc.Endpoint().String(),
MetaMap: g.opts.registry.GetMetaMap(),
Namespace: g.opts.namespace,
}
if g.opts.promServer.Enable() {
metricsPort, err := strconv.Atoi(g.opts.promServer.GetMetricsPort())
Expand Down
4 changes: 4 additions & 0 deletions cluster/gate/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultTimeout = 30 * time.Second // 默认超时时间
defaultCodec = "proto" // 默认编解码器名称
defaultWorkloadStat = 500
defaultNamespace = ""
)

const (
Expand All @@ -39,6 +40,7 @@ const (
defaultDecryptorKey = "config.cluster.gate.decryptor"
defaultEncryptorKey = "config.cluster.node.encryptor"
defaultWorkloadStatKey = "config.workload.stat"
defaultNamespaceKey = "config.cluster.gate.namespace"
)

type Option func(o *options)
Expand All @@ -60,6 +62,7 @@ type options struct {
encryptor crypto.Encryptor // 消息加密器
decryptor crypto.Decryptor
workloadStat int32
namespace string
}

func defaultOptions() *options {
Expand All @@ -69,6 +72,7 @@ func defaultOptions() *options {
timeout: defaultTimeout,
codec: encoding.Invoke(defaultCodec),
workloadStat: config.Get(defaultWorkloadStatKey, defaultWorkloadStat).Int32(),
namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(),
}

if id := config.Get(defaultIDKey).String(); id != "" {
Expand Down
1 change: 1 addition & 0 deletions cluster/gate/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func newProxy(gate *Gate) *Proxy {
Transporter: gate.opts.transporter,
Codec: gate.opts.codec,
Encryptor: gate.opts.encryptor,
Namespace: gate.opts.namespace,
})}
}

Expand Down
17 changes: 9 additions & 8 deletions cluster/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package master
import (
"context"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/common/endpoint"
xnet "github.com/symsimmy/due/common/net"
"github.com/symsimmy/due/component"
"github.com/symsimmy/due/config"
_ "github.com/symsimmy/due/crypto/ecc"
_ "github.com/symsimmy/due/crypto/rsa"
_ "github.com/symsimmy/due/encoding/json"
_ "github.com/symsimmy/due/encoding/proto"
_ "github.com/symsimmy/due/encoding/xml"
"github.com/symsimmy/due/common/endpoint"
xnet "github.com/symsimmy/due/common/net"
"github.com/symsimmy/due/log"
"github.com/symsimmy/due/registry"
"sync/atomic"
Expand Down Expand Up @@ -119,12 +119,13 @@ func (m *Master) registerServiceInstance() {
m.endpoint = endpoint.NewEndpoint("http", exposeAddr, false)

m.instance = &registry.ServiceInstance{
ID: m.opts.id,
Name: string(cluster.Master),
Kind: cluster.Node,
Alias: m.opts.name,
State: m.getState(),
Endpoint: m.endpoint.String(),
ID: m.opts.id,
Name: string(cluster.Master),
Kind: cluster.Node,
Alias: m.opts.name,
State: m.getState(),
Endpoint: m.endpoint.String(),
Namespace: m.opts.namespace,
}

ctx, cancel := context.WithTimeout(m.ctx, 10*time.Second)
Expand Down
16 changes: 10 additions & 6 deletions cluster/master/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

const (
defaultName = "master" // 默认节点名称
defaultCodec = "proto" // 默认编解码器名称
defaultTimeout = 3 * time.Second // 默认超时时间
defaultName = "master" // 默认节点名称
defaultCodec = "proto" // 默认编解码器名称
defaultTimeout = 3 * time.Second // 默认超时时间
defaultNamespace = ""
)

const (
Expand All @@ -26,6 +27,7 @@ const (
defaultTimeoutKey = "config.cluster.master.timeout"
defaultEncryptorKey = "config.cluster.master.encryptor"
defaultDecryptorKey = "config.cluster.master.decryptor"
defaultNamespaceKey = "config.cluster.master.namespace"
)

type Option func(o *options)
Expand All @@ -42,13 +44,15 @@ type options struct {
catServer *cat.Server
encryptor crypto.Encryptor // 消息加密器
decryptor crypto.Decryptor // 消息解密器
namespace string
}

func defaultOptions() *options {
opts := &options{
ctx: context.Background(),
name: defaultName,
timeout: defaultTimeout,
ctx: context.Background(),
name: defaultName,
timeout: defaultTimeout,
namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(),
}

if id := config.Get(defaultIDKey).String(); id != "" {
Expand Down
1 change: 1 addition & 0 deletions cluster/master/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func newProxy(master *Master) *Proxy {
Registry: master.opts.registry,
Encryptor: master.opts.encryptor,
Transporter: master.opts.transporter,
Namespace: master.opts.namespace,
})}
}

Expand Down
19 changes: 10 additions & 9 deletions cluster/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,16 @@ func (n *Node) registerServiceInstance() {
}

n.instance = &registry.ServiceInstance{
ID: n.opts.id,
Name: string(cluster.Node),
Kind: cluster.Node,
Alias: n.opts.name,
State: n.getState(),
Routes: routes,
Events: events,
Endpoint: n.rpc.Endpoint().String(),
MetaMap: n.opts.registry.GetMetaMap(),
ID: n.opts.id,
Name: string(cluster.Node),
Kind: cluster.Node,
Alias: n.opts.name,
State: n.getState(),
Routes: routes,
Events: events,
Endpoint: n.rpc.Endpoint().String(),
MetaMap: n.opts.registry.GetMetaMap(),
Namespace: n.opts.namespace,
}
if n.opts.promServer.Enable() {
metricsPort, err := strconv.Atoi(n.opts.promServer.GetMetricsPort())
Expand Down
12 changes: 8 additions & 4 deletions cluster/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
defaultTimeout = 3 * time.Second // 默认超时时间
defaultEnableAsyncEventHandle = true // event事件异步执行
defaultEnableAsyncRouterHandle = true // route事件异步执行
defaultNamespace = ""
)

const (
Expand All @@ -31,6 +32,7 @@ const (
defaultDecryptorKey = "config.cluster.node.decryptor"
defaultAsyncEventHandleKey = "config.cluster.node.async_event_handle"
defaultAsyncRouterHandleKey = "config.cluster.node.async_router_handle"
defaultNamespaceKey = "config.cluster.node.namespace"
)

type Option func(o *options)
Expand All @@ -50,14 +52,16 @@ type options struct {
decryptor crypto.Decryptor // 消息解密器
asyncEventHandle bool // 异步处理连接事件
asyncRouterHandle bool // 异步处理消息事件
namespace string
}

func defaultOptions() *options {
opts := &options{
ctx: context.Background(),
name: defaultName,
codec: encoding.Invoke(defaultCodec),
timeout: defaultTimeout,
ctx: context.Background(),
name: defaultName,
codec: encoding.Invoke(defaultCodec),
timeout: defaultTimeout,
namespace: config.Get(defaultNamespaceKey, defaultNamespace).String(),
}

if id := config.Get(defaultIDKey).String(); id != "" {
Expand Down
1 change: 1 addition & 0 deletions cluster/node/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func newProxy(node *Node) *Proxy {
Registry: node.opts.registry,
Encryptor: node.opts.encryptor,
Transporter: node.opts.transporter,
Namespace: node.opts.namespace,
})}
}

Expand Down
4 changes: 3 additions & 1 deletion common/link/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package link

import (
"context"
"fmt"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/common/dispatcher"
"github.com/symsimmy/due/common/endpoint"
Expand Down Expand Up @@ -50,6 +51,7 @@ type Options struct {
Encryptor crypto.Encryptor // 加密器
Transporter transport.Transporter // 传输器
BalanceStrategy dispatcher.BalanceStrategy // 负载均衡策略
Namespace string
}

func NewLink(opts *Options) *Link {
Expand Down Expand Up @@ -251,7 +253,7 @@ func (l *Link) FetchServiceAliasIDs(ctx context.Context, kind cluster.Kind, alia
}

func (l *Link) FetchServiceAliasList(ctx context.Context, kind cluster.Kind, alias string, states ...cluster.State) ([]*registry.ServiceInstance, error) {
services, err := l.opts.Registry.Services(ctx, string(kind))
services, err := l.opts.Registry.Services(ctx, fmt.Sprintf("%s%s", l.opts.Namespace, string(kind)))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions registry/consul/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance)

registration := &api.AgentServiceRegistration{
ID: ins.ID,
Name: ins.Name,
Name: fmt.Sprintf("%s%s", ins.Namespace, ins.Name),
Tags: make([]string, 0, len(ins.Events)),
Meta: make(map[string]string, 3),
Address: overwriteHost,
Expand Down Expand Up @@ -137,7 +137,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance)
metricsInsId := ins.ID + "-exporter"
metricsRegistration := &api.AgentServiceRegistration{
ID: metricsInsId,
Name: ins.Name + "-exporter",
Name: fmt.Sprintf("%s%s-exporter", ins.Namespace, ins.Name),
Address: overwriteHost,
Port: ins.MetricsPort,
TaggedAddresses: map[string]api.ServiceAddress{raw.Scheme: {
Expand Down
3 changes: 2 additions & 1 deletion registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type ServiceInstance struct {
MetaMap map[string]string `json:"metaMap"`
// prometheus enable
// prometheus metrics暴露端口
MetricsPort int `json:"metricsPort"`
MetricsPort int `json:"metricsPort"`
Namespace string `json:"namespace"`
}

type Route struct {
Expand Down

0 comments on commit 6600ec6

Please sign in to comment.