Skip to content

Commit

Permalink
other: v2 基于 Nats 的 RPC 基本实现
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 2, 2024
1 parent ac1f2d0 commit e15ed85
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
10 changes: 9 additions & 1 deletion rpc/rpccore/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/kercylan98/minotaur/toolkit"
"github.com/kercylan98/minotaur/toolkit/codec"
"github.com/nats-io/nats.go"
"sync"
"time"
)

Expand Down Expand Up @@ -49,6 +50,7 @@ type Nats struct {
base64 codec.Base64

curr natsService
rw sync.RWMutex
services map[string]map[string]natsService
targets map[string]natsService
}
Expand Down Expand Up @@ -86,7 +88,9 @@ func (n *Nats) OnCall(routes ...rpc.Route) func(request any) error {
if err != nil {
return err
}
n.rw.RLock()
target := n.targets[string(b64)]
n.rw.RUnlock()
return n.conn.Publish("services.rpc."+target.ServerInfo.UniqueId, toolkit.MarshalJSON(packet))
}
}
Expand All @@ -96,11 +100,13 @@ func (n *Nats) Close() {
}

func (n *Nats) refresh() error {
_, err := n.kv.Put("services."+n.info.UniqueId, toolkit.MarshalJSON(n.info))
_, err := n.kv.Put("services."+n.info.UniqueId, toolkit.MarshalJSON(n.curr))
return err
}

func (n *Nats) update(info natsService) {
n.rw.Lock()
defer n.rw.Unlock()
nodes, exist := n.services[info.ServerInfo.Name]
if !exist {
nodes = make(map[string]natsService)
Expand Down Expand Up @@ -148,10 +154,12 @@ func (n *Nats) loop() {
case nats.KeyValueDelete:
var info natsService
toolkit.UnmarshalJSON(entry.Value(), &info)
n.rw.Lock()
delete(n.services[info.ServerInfo.Name], info.ServerInfo.UniqueId)
if len(n.services[info.ServerInfo.Name]) == 0 {
delete(n.services, info.ServerInfo.Name)
}
n.rw.Unlock()
default:
}

Expand Down
23 changes: 19 additions & 4 deletions rpc/rpccore/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,34 @@ type TestService struct {

func (t *TestService) OnRPCSetup(router rpc.Router) {
t.router = router
router.Route("account", t.onAccount)
router.Route("system", t.onSystem)
router.Register("account", "login").Bind(t.onAccountLogin)
}

func (t *TestService) testCall() {
if err := t.router.Call("account")(123); err != nil {
if err := t.router.Call("system")(123); err != nil {
panic(err)
}
if err := t.router.Call("account", "login")(struct {
Username, Password string
}{Username: "username", Password: "pwd"}); err != nil {
panic(err)
}
}

func (t *TestService) onAccount(reader rpc.Reader) {
func (t *TestService) onSystem(reader rpc.Reader) {
var id int
reader.ReadTo(&id)
fmt.Println("call account", id)
fmt.Println("call system", id)
}

func (t *TestService) onAccountLogin(reader rpc.Reader) {
var params struct {
Username string
Password string
}
reader.ReadTo(&params)
fmt.Println("call account login", params.Username, params.Password)
}

func TestNats_OnServiceRegister(t *testing.T) {
Expand Down

0 comments on commit e15ed85

Please sign in to comment.