Skip to content

Commit

Permalink
refactor config and update examples (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
xwi88 committed Mar 29, 2022
1 parent ad71635 commit 6a3e7ed
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 146 deletions.
86 changes: 24 additions & 62 deletions config_etcd.go → config/config.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,18 @@
package rd
package config

import (
"log"
"time"

disETCD "github.com/v8fg/rd/internal/discovering/etcd"
regETCD "github.com/v8fg/rd/internal/registering/etcd"
)

type CommonConfig struct {
ChannelBufferSize int // default 256, for errors or messages channel

// Return specifies what will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Return struct {
// If enabled, any errors that occurred while consuming are returned on
// the Errors channel (default disabled).
Errors bool
// If enabled, any messages that occurred while consuming are returned on
// the Messages channel (default disabled).
Messages bool
}

ErrorsHandler func(err <-chan error) // consume errors, if not set drop.
MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop.
Logger *log.Logger // shall not set, use for debug
}

// RegisterConfig is used to pass multiple configuration options to the register's constructors.
// Watch out the Immutable set, allow override your KV, shall use default value false.
type RegisterConfig struct {
Name string // the name for store the instance, unique. If empty will use Key replace.
Key string // register key, unique. The format maybe like: /{scheme}/{service}/{endPoint}.
Val string
TTL time.Duration
Name string // the name for store the instance, unique. If empty will use Key replace.
Key string // register key, unique. The format maybe like: /{scheme}/{service}/{endPoint}.
Val string
TTL time.Duration
MaxLoopTry uint64 // default 64, if error and try max times, register effect only KeepAlive.Mode=1.

MutableVal bool // If true you can override the 'Val', default false. Pls watch out other items shall not change, so dangerous.

Expand All @@ -40,10 +21,10 @@ type RegisterConfig struct {
Mode uint8 // 0=ticker(default, KeepAliveOnce), 1=KeepAlive(not support val update)
}

MaxLoopTry uint64 // default 64, if error and try max times, register effect only KeepAlive.Mode=1.
CommonConfig
}

// DiscoverConfig is used to pass multiple configuration options to the discovery's constructors.
type DiscoverConfig struct {
Name string // the name for store the instance, unique. If empty will use Scheme, Service replace.
Scheme string // register resolver with name scheme, like: services
Expand All @@ -55,41 +36,22 @@ type DiscoverConfig struct {
CommonConfig
}

func convertRegisterConfigToInternalETCDRegisterConfig(rgc *RegisterConfig) (rge *regETCD.Config) {
if rgc != nil {
rge = &regETCD.Config{
Name: rgc.Name,
Key: rgc.Key,
Val: rgc.Val,
TTL: rgc.TTL,
MaxLoopTry: rgc.MaxLoopTry,
ChannelBufferSize: rgc.ChannelBufferSize,
MutableVal: rgc.MutableVal,
KeepAlive: rgc.KeepAlive,
Return: rgc.Return,
ErrorsHandler: rgc.ErrorsHandler,
MessagesHandler: rgc.MessagesHandler,
Logger: rgc.Logger,
}
// CommonConfig common config used for register and discovery
type CommonConfig struct {
ChannelBufferSize int // default 256, for errors or messages channel

// Return specifies what will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Return struct {
// If enabled, any errors that occurred while consuming are returned on
// the Errors channel (default disabled).
Errors bool
// If enabled, any messages that occurred while consuming are returned on
// the Messages channel (default disabled).
Messages bool
}
return rge
}

func convertDiscoverConfigToInternalETCDDiscoverConfig(dc *DiscoverConfig) (dce *disETCD.Config) {
if dc != nil {
dce = &disETCD.Config{
Name: dc.Name,
Scheme: dc.Scheme,
Service: dc.Service,
ReturnResolve: dc.ReturnResolve,
ChannelBufferSize: dc.ChannelBufferSize,
Return: dc.Return,
ErrorsHandler: dc.ErrorsHandler,
MessagesHandler: dc.MessagesHandler,
AddressesParser: dc.AddressesParser,
Logger: dc.Logger,
}
}
return dce
ErrorsHandler func(err <-chan error) // consume errors, if not set drop.
MessagesHandler func(string <-chan string) // consume messages expect errors, if not set drop.
Logger *log.Logger // shall not set, use for debug
}
6 changes: 3 additions & 3 deletions discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rd
import (
clientV3 "go.etcd.io/etcd/client/v3"

"github.com/v8fg/rd/config"
"github.com/v8fg/rd/internal/discovering"
)

Expand All @@ -15,9 +16,8 @@ func init() {

// DiscoverEtcd etcd discover with some configurations.
// registry key: name or key, the name preferred.
func DiscoverEtcd(config *DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error {
_cfg := convertDiscoverConfigToInternalETCDDiscoverConfig(config)
return discoverRegistry.Register(_cfg, client, etcdConfig)
func DiscoverEtcd(config *config.DiscoverConfig, client *clientV3.Client, etcdConfig *clientV3.Config) error {
return discoverRegistry.Register(config, client, etcdConfig)
}

// DiscoverInfo return the basic info about discover: key and discover addr.
Expand Down
49 changes: 29 additions & 20 deletions examples/discover/simple_discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,53 @@ import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

clientV3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"

"github.com/v8fg/rd"
"github.com/v8fg/rd/config"
)

const moduleName = "simple_discover"

var (
errorsHandler = func(errors <-chan error) {
for errMsg := range errors {
fmt.Printf("errors consume, count:%v, content:%v\n", len(errors), errMsg)
log.Printf("[%s] errors consume, count:%v, content:%v\n", moduleName, len(errors), errMsg)
}
}

messagesHandler = func(messages <-chan string) {
for message := range messages {
fmt.Printf("messages consume, count:%v, content:%v\n", len(messages), message)
log.Printf("[%s] messages consume, count:%v, content:%v\n", moduleName, len(messages), message)
}
}

addressesParser = func(key string, data []byte) (addr string, err error) {
fmt.Printf("addressesParser consume, key:%v, data: %s\n", key, data)
log.Printf("[%s] addressesParser consume, key:%v, data: %s\n", moduleName, key, data)
return string(data), err
}

addressesParserJSON = func(key string, data []byte) (addr string, err error) {
fmt.Printf("addressesParser consume, key:%v, data:%s\n", key, data)
log.Printf("[%s] addressesParser consume, key:%v, data:%s\n", moduleName, key, data)
dict := make(map[string]interface{})
err = json.Unmarshal(data, &dict)
fmt.Printf("dict:%v, endPoint: %s\n", dict, dict["endPoint"].(string))
log.Printf("[%s] dict:%v, endPoint: %s\n", moduleName, dict, dict["endPoint"].(string))
return addr, err
}

// logger = log.New(log.Writer(), "[rd-test-discover] ", log.LstdFlags|log.Lshortfile)
logger = log.New(log.Writer(), "[rd-test-discover] ", log.LstdFlags)
// logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags|log.Lshortfile)
logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags)

// your register key will be: /{scheme}/{service}
scheme = "services"
service = "test/v1.0"
discoverRegistryName = "my-rd-test-discover" + time.Now().Format("200601021504")
discoverRegistryName = fmt.Sprintf("%s-", moduleName) + time.Now().Format("200601021504")
)

func init() {
Expand All @@ -67,11 +73,11 @@ func main() {

func initDiscoverGRPCETCD() {
var err error
cfg := rd.DiscoverConfig{
cfg := config.DiscoverConfig{
Name: discoverRegistryName,
Scheme: scheme,
Service: service,
CommonConfig: rd.CommonConfig{
CommonConfig: config.CommonConfig{
ChannelBufferSize: 16,
ErrorsHandler: errorsHandler,
MessagesHandler: messagesHandler,
Expand All @@ -87,7 +93,7 @@ func initDiscoverGRPCETCD() {
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
})
fmt.Printf("DiscoverEtcd: %+v, err:%v\n", rd.DiscoverInfo(), err)
log.Printf("[%s] initDiscoverGRPCETCD: %+v, err:%v\n", moduleName, rd.DiscoverInfo(), err)
if err != nil {
panic(err)
}
Expand All @@ -96,33 +102,36 @@ func initDiscoverGRPCETCD() {
func runDiscover() {
errs := rd.DiscoverRun()
if len(errs) > 0 {
fmt.Printf("DiscoverRun errors:%v\n", errs)
log.Printf("[%s] runDiscover errors:%v", moduleName, errs)
}
}

func block() {
log.Printf("[block] block main exit\n")
log.Printf("[%s] block enter block main", moduleName)

tk := time.NewTicker(time.Second * 5)
defer tk.Stop()

done := make(chan struct{})
quit := make(chan struct{})
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-time.After(time.Second * 900)
done <- struct{}{}
sig := <-ch
log.Printf("[%s] received signal: %v", moduleName, sig)
quit <- struct{}{}
}()

loop:
for {
select {
case <-tk.C:
case <-done:
case <-quit:
rd.DiscoverClose()
fmt.Printf("close success")
log.Printf("[%s] block close success and exit block main", moduleName)
break loop
}
log.Println("out select, but in for loop")
log.Printf("[%s] out select, but in for loop", moduleName)
}
log.Println("[block] all is done")
log.Printf("[%s] block all is done", moduleName)
}
43 changes: 26 additions & 17 deletions examples/register/simple_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,41 @@ import (
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"

clientV3 "go.etcd.io/etcd/client/v3"

"github.com/v8fg/rd"
"github.com/v8fg/rd/config"
)

const moduleName = "simple_register"

var (
errorsHandler = func(errors <-chan error) {
for errMsg := range errors {
fmt.Printf("errors consume, count:%v, content:%v\n", len(errors), errMsg)
log.Printf("[%s] errors consume, count:%v, content:%v", moduleName, len(errors), errMsg)
}
}
messagesHandler = func(messages <-chan string) {
for message := range messages {
fmt.Printf("messages consume, count:%v, content:%v\n", len(messages), message)
log.Printf("[%s] messages consume, count:%v, content:%v", moduleName, len(messages), message)
}
}
// logger = log.New(log.Writer(), "[rd-test-register] ", log.LstdFlags|log.Lshortfile)
logger = log.New(log.Writer(), "[rd-test-register] ", log.LstdFlags)
// logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags|log.Lshortfile)
logger = log.New(log.Writer(), fmt.Sprintf("[%s] ", moduleName), log.LstdFlags)
key = fmt.Sprintf("/services/test/v1.0/grpc/127.0.0.1:33%v", time.Now().Second()+int(rand.Int31n(300)))
)

var cfg = rd.RegisterConfig{
Name: "my-rd-test-register" + time.Now().Format("200601021504"),
var cfg = config.RegisterConfig{
Name: fmt.Sprintf("%s-", moduleName) + time.Now().Format("200601021504"),
Key: key,
Val: key,
TTL: time.Second * 15,
CommonConfig: rd.CommonConfig{
CommonConfig: config.CommonConfig{
ChannelBufferSize: 64,
ErrorsHandler: errorsHandler,
MessagesHandler: messagesHandler,
Expand Down Expand Up @@ -60,7 +66,7 @@ func initRegisterETCD() {
DialTimeout: time.Second * 5,
})

fmt.Printf("RegisterEtcd: %+v, err:%v\n", rd.RegisterInfo(), err)
log.Printf("[%s] initRegisterETCD: %+v, err:%v", moduleName, rd.RegisterInfo(), err)
if err != nil {
panic(err)
}
Expand All @@ -75,11 +81,14 @@ func runRegister() {
tk := time.NewTicker(time.Second * 20)
defer tk.Stop()

done := make(chan struct{})
quit := make(chan struct{})
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-time.After(time.Second * 6000)
done <- struct{}{}
sig := <-ch
log.Printf("[%s] received signal: %v", moduleName, sig)
quit <- struct{}{}
}()

loop:
Expand All @@ -88,16 +97,16 @@ loop:
case <-tk.C:
newVal := fmt.Sprintf("127.0.0.1:33%v", time.Now().Second()+int(rand.Int31n(300)))
err := rd.RegisterUpdateVal(cfg.Name, newVal)
log.Printf("ticker update val:%v, err:%v\n", newVal, err)
case <-done:
log.Printf("[%s] runRegister ticker update val:%v, err:%v", moduleName, newVal, err)
case <-quit:
errs := rd.RegisterClose()
if len(errs) != 0 {
panic(fmt.Sprintf("close err:%+v", errs))
panic(fmt.Sprintf("[%s] close err:%+v", moduleName, errs))
}
fmt.Printf("close success")
log.Printf("[%s] close success", moduleName)
break loop
}
// log.Println("out select, but in for loop")
// log.Printf("[%s] out select, but in for loop", moduleName)
}
log.Println("all is done")
log.Printf("[%s] all is done", moduleName)
}
Loading

0 comments on commit 6a3e7ed

Please sign in to comment.