From 6a3e7ed1b6bd6c6e016190230656fc3479c06fd3 Mon Sep 17 00:00:00 2001 From: xwi88 <278810732@qq.com> Date: Tue, 29 Mar 2022 12:10:27 +0800 Subject: [PATCH] refactor config and update examples (#5) --- config_etcd.go => config/config.go | 86 ++++++---------------- discover.go | 6 +- examples/discover/simple_discover.go | 49 +++++++----- examples/register/simple_register.go | 43 ++++++----- go.sum | 10 +-- internal/discovering/discover.go | 3 +- internal/discovering/etcd/discover_grpc.go | 56 +++++++------- internal/registering/etcd/register.go | 10 ++- internal/registering/register.go | 3 +- register.go | 6 +- 10 files changed, 126 insertions(+), 146 deletions(-) rename config_etcd.go => config/config.go (54%) diff --git a/config_etcd.go b/config/config.go similarity index 54% rename from config_etcd.go rename to config/config.go index 1fe4bc1..dc71154 100644 --- a/config_etcd.go +++ b/config/config.go @@ -1,37 +1,18 @@ -package rd +package config import ( "log" "time" - - disETCD "github.com/v8fg/rd/internal/discovering/etcd" - regETCD "github.com/v8fg/rd/internal/registering/etcd" ) -type CommonConfig struct { - ChannelBufferSize int // default 256, for errors or messages channel - - // Return specifies what will be populated. If they are set to true, - // you must read from them to prevent deadlock. - Return struct { - // If enabled, any errors that occurred while consuming are returned on - // the Errors channel (default disabled). - Errors bool - // If enabled, any messages that occurred while consuming are returned on - // the Messages channel (default disabled). - Messages bool - } - - ErrorsHandler func(err <-chan error) // consume errors, if not set drop. - MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop. - Logger *log.Logger // shall not set, use for debug -} - +// RegisterConfig is used to pass multiple configuration options to the register's constructors. +// Watch out the Immutable set, allow override your KV, shall use default value false. type RegisterConfig struct { - Name string // the name for store the instance, unique. If empty will use Key replace. - Key string // register key, unique. The format maybe like: /{scheme}/{service}/{endPoint}. - Val string - TTL time.Duration + Name string // the name for store the instance, unique. If empty will use Key replace. + Key string // register key, unique. The format maybe like: /{scheme}/{service}/{endPoint}. + Val string + TTL time.Duration + MaxLoopTry uint64 // default 64, if error and try max times, register effect only KeepAlive.Mode=1. MutableVal bool // If true you can override the 'Val', default false. Pls watch out other items shall not change, so dangerous. @@ -40,10 +21,10 @@ type RegisterConfig struct { Mode uint8 // 0=ticker(default, KeepAliveOnce), 1=KeepAlive(not support val update) } - MaxLoopTry uint64 // default 64, if error and try max times, register effect only KeepAlive.Mode=1. CommonConfig } +// DiscoverConfig is used to pass multiple configuration options to the discovery's constructors. type DiscoverConfig struct { Name string // the name for store the instance, unique. If empty will use Scheme, Service replace. Scheme string // register resolver with name scheme, like: services @@ -55,41 +36,22 @@ type DiscoverConfig struct { CommonConfig } -func convertRegisterConfigToInternalETCDRegisterConfig(rgc *RegisterConfig) (rge *regETCD.Config) { - if rgc != nil { - rge = ®ETCD.Config{ - Name: rgc.Name, - Key: rgc.Key, - Val: rgc.Val, - TTL: rgc.TTL, - MaxLoopTry: rgc.MaxLoopTry, - ChannelBufferSize: rgc.ChannelBufferSize, - MutableVal: rgc.MutableVal, - KeepAlive: rgc.KeepAlive, - Return: rgc.Return, - ErrorsHandler: rgc.ErrorsHandler, - MessagesHandler: rgc.MessagesHandler, - Logger: rgc.Logger, - } +// CommonConfig common config used for register and discovery +type CommonConfig struct { + ChannelBufferSize int // default 256, for errors or messages channel + // Return specifies what will be populated. If they are set to true, + // you must read from them to prevent deadlock. + Return struct { + // If enabled, any errors that occurred while consuming are returned on + // the Errors channel (default disabled). + Errors bool + // If enabled, any messages that occurred while consuming are returned on + // the Messages channel (default disabled). + Messages bool } - return rge -} -func convertDiscoverConfigToInternalETCDDiscoverConfig(dc *DiscoverConfig) (dce *disETCD.Config) { - if dc != nil { - dce = &disETCD.Config{ - Name: dc.Name, - Scheme: dc.Scheme, - Service: dc.Service, - ReturnResolve: dc.ReturnResolve, - ChannelBufferSize: dc.ChannelBufferSize, - Return: dc.Return, - ErrorsHandler: dc.ErrorsHandler, - MessagesHandler: dc.MessagesHandler, - AddressesParser: dc.AddressesParser, - Logger: dc.Logger, - } - } - return dce + ErrorsHandler func(err <-chan error) // consume errors, if not set drop. + MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop. + Logger *log.Logger // shall not set, use for debug } diff --git a/discover.go b/discover.go index 72446a3..f196874 100644 --- a/discover.go +++ b/discover.go @@ -3,6 +3,7 @@ package rd import ( clientV3 "go.etcd.io/etcd/client/v3" + "github.com/v8fg/rd/config" "github.com/v8fg/rd/internal/discovering" ) @@ -15,9 +16,8 @@ func init() { // DiscoverEtcd etcd discover with some configurations. // registry key: name or key, the name preferred. -func DiscoverEtcd(config *DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { - _cfg := convertDiscoverConfigToInternalETCDDiscoverConfig(config) - return discoverRegistry.Register(_cfg, client, etcdConfig) +func DiscoverEtcd(config *config.DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { + return discoverRegistry.Register(config, client, etcdConfig) } // DiscoverInfo return the basic info about discover: key and discover addr. diff --git a/examples/discover/simple_discover.go b/examples/discover/simple_discover.go index 6e1878f..eae48a4 100644 --- a/examples/discover/simple_discover.go +++ b/examples/discover/simple_discover.go @@ -5,47 +5,53 @@ import ( "encoding/json" "fmt" "log" + "os" + "os/signal" + "syscall" "time" clientV3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "github.com/v8fg/rd" + "github.com/v8fg/rd/config" ) +const moduleName = "simple_discover" + var ( errorsHandler = func(errors <-chan error) { for errMsg := range errors { - fmt.Printf("errors consume, count:%v, content:%v\n", len(errors), errMsg) + log.Printf("[%s] errors consume, count:%v, content:%v\n", moduleName, len(errors), errMsg) } } messagesHandler = func(messages <-chan string) { for message := range messages { - fmt.Printf("messages consume, count:%v, content:%v\n", len(messages), message) + log.Printf("[%s] messages consume, count:%v, content:%v\n", moduleName, len(messages), message) } } addressesParser = func(key string, data []byte) (addr string, err error) { - fmt.Printf("addressesParser consume, key:%v, data: %s\n", key, data) + log.Printf("[%s] addressesParser consume, key:%v, data: %s\n", moduleName, key, data) return string(data), err } addressesParserJSON = func(key string, data []byte) (addr string, err error) { - fmt.Printf("addressesParser consume, key:%v, data:%s\n", key, data) + log.Printf("[%s] addressesParser consume, key:%v, data:%s\n", moduleName, key, data) dict := make(map[string]interface{}) err = json.Unmarshal(data, &dict) - fmt.Printf("dict:%v, endPoint: %s\n", dict, dict["endPoint"].(string)) + log.Printf("[%s] dict:%v, endPoint: %s\n", moduleName, dict, dict["endPoint"].(string)) return addr, err } - // logger = log.New(log.Writer(), "[rd-test-discover] ", log.LstdFlags|log.Lshortfile) - logger = log.New(log.Writer(), "[rd-test-discover] ", log.LstdFlags) + // logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags|log.Lshortfile) + logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags) // your register key will be: /{scheme}/{service} scheme = "services" service = "test/v1.0" - discoverRegistryName = "my-rd-test-discover" + time.Now().Format("200601021504") + discoverRegistryName = fmt.Sprintf("%s-", moduleName) + time.Now().Format("200601021504") ) func init() { @@ -67,11 +73,11 @@ func main() { func initDiscoverGRPCETCD() { var err error - cfg := rd.DiscoverConfig{ + cfg := config.DiscoverConfig{ Name: discoverRegistryName, Scheme: scheme, Service: service, - CommonConfig: rd.CommonConfig{ + CommonConfig: config.CommonConfig{ ChannelBufferSize: 16, ErrorsHandler: errorsHandler, MessagesHandler: messagesHandler, @@ -87,7 +93,7 @@ func initDiscoverGRPCETCD() { Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: time.Second * 5, }) - fmt.Printf("DiscoverEtcd: %+v, err:%v\n", rd.DiscoverInfo(), err) + log.Printf("[%s] initDiscoverGRPCETCD: %+v, err:%v\n", moduleName, rd.DiscoverInfo(), err) if err != nil { panic(err) } @@ -96,33 +102,36 @@ func initDiscoverGRPCETCD() { func runDiscover() { errs := rd.DiscoverRun() if len(errs) > 0 { - fmt.Printf("DiscoverRun errors:%v\n", errs) + log.Printf("[%s] runDiscover errors:%v", moduleName, errs) } } func block() { - log.Printf("[block] block main exit\n") + log.Printf("[%s] block enter block main", moduleName) tk := time.NewTicker(time.Second * 5) defer tk.Stop() - done := make(chan struct{}) + quit := make(chan struct{}) + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) go func() { - <-time.After(time.Second * 900) - done <- struct{}{} + sig := <-ch + log.Printf("[%s] received signal: %v", moduleName, sig) + quit <- struct{}{} }() loop: for { select { case <-tk.C: - case <-done: + case <-quit: rd.DiscoverClose() - fmt.Printf("close success") + log.Printf("[%s] block close success and exit block main", moduleName) break loop } - log.Println("out select, but in for loop") + log.Printf("[%s] out select, but in for loop", moduleName) } - log.Println("[block] all is done") + log.Printf("[%s] block all is done", moduleName) } diff --git a/examples/register/simple_register.go b/examples/register/simple_register.go index f7102ba..f4d5faa 100644 --- a/examples/register/simple_register.go +++ b/examples/register/simple_register.go @@ -4,35 +4,41 @@ import ( "fmt" "log" "math/rand" + "os" + "os/signal" + "syscall" "time" clientV3 "go.etcd.io/etcd/client/v3" "github.com/v8fg/rd" + "github.com/v8fg/rd/config" ) +const moduleName = "simple_register" + var ( errorsHandler = func(errors <-chan error) { for errMsg := range errors { - fmt.Printf("errors consume, count:%v, content:%v\n", len(errors), errMsg) + log.Printf("[%s] errors consume, count:%v, content:%v", moduleName, len(errors), errMsg) } } messagesHandler = func(messages <-chan string) { for message := range messages { - fmt.Printf("messages consume, count:%v, content:%v\n", len(messages), message) + log.Printf("[%s] messages consume, count:%v, content:%v", moduleName, len(messages), message) } } - // logger = log.New(log.Writer(), "[rd-test-register] ", log.LstdFlags|log.Lshortfile) - logger = log.New(log.Writer(), "[rd-test-register] ", log.LstdFlags) + // logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags|log.Lshortfile) + logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags) key = fmt.Sprintf("/services/test/v1.0/grpc/127.0.0.1:33%v", time.Now().Second()+int(rand.Int31n(300))) ) -var cfg = rd.RegisterConfig{ - Name: "my-rd-test-register" + time.Now().Format("200601021504"), +var cfg = config.RegisterConfig{ + Name: fmt.Sprintf("%s-", moduleName) + time.Now().Format("200601021504"), Key: key, Val: key, TTL: time.Second * 15, - CommonConfig: rd.CommonConfig{ + CommonConfig: config.CommonConfig{ ChannelBufferSize: 64, ErrorsHandler: errorsHandler, MessagesHandler: messagesHandler, @@ -60,7 +66,7 @@ func initRegisterETCD() { DialTimeout: time.Second * 5, }) - fmt.Printf("RegisterEtcd: %+v, err:%v\n", rd.RegisterInfo(), err) + log.Printf("[%s] initRegisterETCD: %+v, err:%v", moduleName, rd.RegisterInfo(), err) if err != nil { panic(err) } @@ -75,11 +81,14 @@ func runRegister() { tk := time.NewTicker(time.Second * 20) defer tk.Stop() - done := make(chan struct{}) + quit := make(chan struct{}) + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) go func() { - <-time.After(time.Second * 6000) - done <- struct{}{} + sig := <-ch + log.Printf("[%s] received signal: %v", moduleName, sig) + quit <- struct{}{} }() loop: @@ -88,16 +97,16 @@ loop: case <-tk.C: newVal := fmt.Sprintf("127.0.0.1:33%v", time.Now().Second()+int(rand.Int31n(300))) err := rd.RegisterUpdateVal(cfg.Name, newVal) - log.Printf("ticker update val:%v, err:%v\n", newVal, err) - case <-done: + log.Printf("[%s] runRegister ticker update val:%v, err:%v", moduleName, newVal, err) + case <-quit: errs := rd.RegisterClose() if len(errs) != 0 { - panic(fmt.Sprintf("close err:%+v", errs)) + panic(fmt.Sprintf("[%s] close err:%+v", moduleName, errs)) } - fmt.Printf("close success") + log.Printf("[%s] close success", moduleName) break loop } - // log.Println("out select, but in for loop") + // log.Printf("[%s] out select, but in for loop", moduleName) } - log.Println("all is done") + log.Printf("[%s] all is done", moduleName) } diff --git a/go.sum b/go.sum index 32e9ae3..5f3bfc6 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -140,16 +141,14 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.2/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3 go.etcd.io/etcd/client/v3 v3.5.2 h1:WdnejrUtQC4nCxK0/dLTMqKOB+U5TP/2Ya0BJL+1otA= go.etcd.io/etcd/client/v3 v3.5.2/go.mod h1:kOOaWFFgHygyT0WlSmL8TJiXmMysO/nNUlEsSsN6W4o= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= @@ -180,7 +179,6 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20220325170049-de3da57026de h1:pZB1TWnKi+o4bENlbzAgLrEbY4RMYmUIRobMcSmfeYc= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -211,7 +209,6 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220325203850-36772127a21f h1:TrmogKRsSOxRMJbLYGrB4SBbW+LJcEllYBLME5Zk5pU= golang.org/x/sys v0.0.0-20220325203850-36772127a21f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -219,7 +216,6 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -245,7 +241,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c h1:wtujag7C+4D6KMoulW9YauvK2lgdvCMS260jsqqBXr0= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb h1:0m9wktIpOxGw+SSKmydXWB3Z3GTfcPP6+q75HCQa6HI= google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E= @@ -268,7 +263,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= diff --git a/internal/discovering/discover.go b/internal/discovering/discover.go index 1313dda..94518ee 100644 --- a/internal/discovering/discover.go +++ b/internal/discovering/discover.go @@ -8,6 +8,7 @@ import ( clientV3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver" + "github.com/v8fg/rd/config" "github.com/v8fg/rd/internal/discovering/etcd" ) @@ -65,7 +66,7 @@ func (r *DiscoverRegistry) Info() string { // } // Register the Discover for the service. -func (r *DiscoverRegistry) Register(config *etcd.Config, client *clientV3.Client, etcdConfig *clientV3.Config) error { +func (r *DiscoverRegistry) Register(config *config.DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/internal/discovering/etcd/discover_grpc.go b/internal/discovering/etcd/discover_grpc.go index 96ad293..798b7ca 100644 --- a/internal/discovering/etcd/discover_grpc.go +++ b/internal/discovering/etcd/discover_grpc.go @@ -12,40 +12,42 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientV3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver" + + "github.com/v8fg/rd/config" ) const moduleName = "discover-etcd-grpc" type none struct{} -type Config struct { - Name string // the name for registry store the instance, unique. If empty will use the combine: Scheme, Service replace. - Scheme string // register resolver with name scheme, like: services - Service string // service name, like: test/v1.0/grpc - - ReturnResolve bool // if true, will output Resolve info to messages. - ChannelBufferSize int // default 256, for errors or messages channel - - // Return specifies what will be populated. If they are set to true, - // you must read from them to prevent deadlock. - Return struct { - // If enabled, any errors that occurred while consuming are returned on - // the Errors channel (default disabled). - Errors bool - // If enabled, any messages that occurred while consuming are returned on - // the Messages channel (default disabled). - Messages bool - } - ErrorsHandler func(err <-chan error) // consume errors, if not set drop. - MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop. - AddressesParser func(string, []byte) (string, error) // parse address, k string, val []byte, return address string. - Logger *log.Logger // shall not set, use for debug -} +// type Config struct { +// Name string // the name for registry store the instance, unique. If empty will use the combine: Scheme, Service replace. +// Scheme string // register resolver with name scheme, like: services +// Service string // service name, like: test/v1.0/grpc +// +// ReturnResolve bool // if true, will output Resolve info to messages. +// ChannelBufferSize int // default 256, for errors or messages channel +// +// // Return specifies what will be populated. If they are set to true, +// // you must read from them to prevent deadlock. +// Return struct { +// // If enabled, any errors that occurred while consuming are returned on +// // the Errors channel (default disabled). +// Errors bool +// // If enabled, any messages that occurred while consuming are returned on +// // the Messages channel (default disabled). +// Messages bool +// } +// ErrorsHandler func(err <-chan error) // consume errors, if not set drop. +// MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop. +// AddressesParser func(string, []byte) (string, error) // parse address, k string, val []byte, return address string. +// Logger *log.Logger // shall not set, use for debug +// } // discover for etcd type discover struct { client *clientV3.Client - config *Config + config *config.DiscoverConfig errors chan error messages chan string @@ -63,7 +65,7 @@ type discover struct { start time.Time // used to calculate the time to the present } -func newDiscover(client *clientV3.Client, config *Config) (*discover, error) { +func newDiscover(client *clientV3.Client, config *config.DiscoverConfig) (*discover, error) { return &discover{ client: client, config: config, @@ -75,7 +77,7 @@ func newDiscover(client *clientV3.Client, config *Config) (*discover, error) { }, nil } -func NewDiscover(config *Config, client *clientV3.Client, etcdConfig *clientV3.Config) (*discover, error) { +func NewDiscover(config *config.DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) (*discover, error) { err := checkConfig(config) if err != nil { return nil, fmt.Errorf("[%s] NewDiscover err: %w", moduleName, err) @@ -123,7 +125,7 @@ func newClient(config clientV3.Config) (*clientV3.Client, error) { return client, err } -func checkConfig(config *Config) error { +func checkConfig(config *config.DiscoverConfig) error { if config == nil { return fmt.Errorf("[%s] checkConfig failed, config nil", moduleName) } diff --git a/internal/registering/etcd/register.go b/internal/registering/etcd/register.go index 6be7a9b..f2e9f05 100644 --- a/internal/registering/etcd/register.go +++ b/internal/registering/etcd/register.go @@ -12,6 +12,8 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientV3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" + + "github.com/v8fg/rd/config" ) const moduleName = "register-etcd" @@ -53,7 +55,7 @@ type Config struct { type register struct { client *clientV3.Client - config *Config + config *config.RegisterConfig errors chan error messages chan string @@ -77,7 +79,7 @@ type register struct { etcdConfig *clientV3.Config // can use reconnect, if you use config create the etcd client. } -func newRegister(client *clientV3.Client, config *Config) (*register, error) { +func newRegister(client *clientV3.Client, config *config.RegisterConfig) (*register, error) { return ®ister{ client: client, config: config, @@ -94,7 +96,7 @@ func newRegister(client *clientV3.Client, config *Config) (*register, error) { // NewRegister creates a new register with the given configurations. // Input client preferred, if nil will create with etcdConfig. -func NewRegister(config *Config, client *clientV3.Client, etcdConfig *clientV3.Config) (*register, error) { +func NewRegister(config *config.RegisterConfig, client *clientV3.Client, etcdConfig *clientV3.Config) (*register, error) { err := checkConfig(config) if err != nil { return nil, fmt.Errorf("[%s] NewRegister err: %w", moduleName, err) @@ -155,7 +157,7 @@ func newClient(config clientV3.Config) (*clientV3.Client, error) { return client, err } -func checkConfig(config *Config) error { +func checkConfig(config *config.RegisterConfig) error { if config == nil { return fmt.Errorf("[%s] checkConfig failed, config nil", moduleName) } diff --git a/internal/registering/register.go b/internal/registering/register.go index 2a11d8e..7917243 100644 --- a/internal/registering/register.go +++ b/internal/registering/register.go @@ -7,6 +7,7 @@ import ( clientV3 "go.etcd.io/etcd/client/v3" + "github.com/v8fg/rd/config" "github.com/v8fg/rd/internal/registering/etcd" ) @@ -50,7 +51,7 @@ func (r *RegisterRegistry) Info() string { // Register the service with some configurations. You can pass the etcd client or only pass the related config items. // registry key: name or key, the name preferred. -func (r *RegisterRegistry) Register(config *etcd.Config, client *clientV3.Client, etcdConfig *clientV3.Config) error { +func (r *RegisterRegistry) Register(config *config.RegisterConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/register.go b/register.go index b4ef28f..8dfd50c 100644 --- a/register.go +++ b/register.go @@ -5,6 +5,7 @@ import ( clientV3 "go.etcd.io/etcd/client/v3" + "github.com/v8fg/rd/config" "github.com/v8fg/rd/internal/registering" ) @@ -17,9 +18,8 @@ func init() { // RegisterEtcd etcd register with some configurations. // registry key: name or key, the name preferred. -func RegisterEtcd(config *RegisterConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { - _cfg := convertRegisterConfigToInternalETCDRegisterConfig(config) - return registerRegistry.Register(_cfg, client, etcdConfig) +func RegisterEtcd(config *config.RegisterConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error { + return registerRegistry.Register(config, client, etcdConfig) } // RegisterInfo return the basic info about register: key and register addr.