From e15ed8533cdac4644cb1415547a812c88cbf0b3e Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 2 May 2024 20:03:05 +0800 Subject: [PATCH] =?UTF-8?q?other:=20v2=20=E5=9F=BA=E4=BA=8E=20Nats=20?= =?UTF-8?q?=E7=9A=84=20RPC=20=E5=9F=BA=E6=9C=AC=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/rpccore/nats.go | 10 +++++++++- rpc/rpccore/nats_test.go | 23 +++++++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/rpc/rpccore/nats.go b/rpc/rpccore/nats.go index 2c9388db..e2d1d92e 100644 --- a/rpc/rpccore/nats.go +++ b/rpc/rpccore/nats.go @@ -7,6 +7,7 @@ import ( "github.com/kercylan98/minotaur/toolkit" "github.com/kercylan98/minotaur/toolkit/codec" "github.com/nats-io/nats.go" + "sync" "time" ) @@ -49,6 +50,7 @@ type Nats struct { base64 codec.Base64 curr natsService + rw sync.RWMutex services map[string]map[string]natsService targets map[string]natsService } @@ -86,7 +88,9 @@ func (n *Nats) OnCall(routes ...rpc.Route) func(request any) error { if err != nil { return err } + n.rw.RLock() target := n.targets[string(b64)] + n.rw.RUnlock() return n.conn.Publish("services.rpc."+target.ServerInfo.UniqueId, toolkit.MarshalJSON(packet)) } } @@ -96,11 +100,13 @@ func (n *Nats) Close() { } func (n *Nats) refresh() error { - _, err := n.kv.Put("services."+n.info.UniqueId, toolkit.MarshalJSON(n.info)) + _, err := n.kv.Put("services."+n.info.UniqueId, toolkit.MarshalJSON(n.curr)) return err } func (n *Nats) update(info natsService) { + n.rw.Lock() + defer n.rw.Unlock() nodes, exist := n.services[info.ServerInfo.Name] if !exist { nodes = make(map[string]natsService) @@ -148,10 +154,12 @@ func (n *Nats) loop() { case nats.KeyValueDelete: var info natsService toolkit.UnmarshalJSON(entry.Value(), &info) + n.rw.Lock() delete(n.services[info.ServerInfo.Name], info.ServerInfo.UniqueId) if len(n.services[info.ServerInfo.Name]) == 0 { delete(n.services, info.ServerInfo.Name) } + n.rw.Unlock() default: } diff --git a/rpc/rpccore/nats_test.go b/rpc/rpccore/nats_test.go index caedeba6..0a0548c4 100644 --- a/rpc/rpccore/nats_test.go +++ b/rpc/rpccore/nats_test.go @@ -15,19 +15,34 @@ type TestService struct { func (t *TestService) OnRPCSetup(router rpc.Router) { t.router = router - router.Route("account", t.onAccount) + router.Route("system", t.onSystem) + router.Register("account", "login").Bind(t.onAccountLogin) } func (t *TestService) testCall() { - if err := t.router.Call("account")(123); err != nil { + if err := t.router.Call("system")(123); err != nil { + panic(err) + } + if err := t.router.Call("account", "login")(struct { + Username, Password string + }{Username: "username", Password: "pwd"}); err != nil { panic(err) } } -func (t *TestService) onAccount(reader rpc.Reader) { +func (t *TestService) onSystem(reader rpc.Reader) { var id int reader.ReadTo(&id) - fmt.Println("call account", id) + fmt.Println("call system", id) +} + +func (t *TestService) onAccountLogin(reader rpc.Reader) { + var params struct { + Username string + Password string + } + reader.ReadTo(¶ms) + fmt.Println("call account login", params.Username, params.Password) } func TestNats_OnServiceRegister(t *testing.T) {