Skip to content

Commit

Permalink
expose remote service route for user custom
Browse files Browse the repository at this point in the history
  • Loading branch information
lacorey committed Mar 29, 2023
1 parent abcf7c9 commit ee534d4
Show file tree
Hide file tree
Showing 12 changed files with 1,368 additions and 6 deletions.
30 changes: 24 additions & 6 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var (

type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool)

// CustomerRemoteServiceRoute customer remote service route
type CustomerRemoteServiceRoute func(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo

func cache() {
hrdata := map[string]interface{}{
"code": 200,
Expand Down Expand Up @@ -336,14 +339,29 @@ func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Mess
}

// Select a remote service address
// 1. Use the service address directly if the router contains binding item
// 2. Select a remote service address randomly and bind to router
// 1. if exist customer remote service route ,use it, otherwise use default strategy
// 2. Use the service address directly if the router contains binding item
// 3. Select a remote service address randomly and bind to router
var remoteAddr string
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
if h.currentNode.Options.RemoteServiceRoute != nil {
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
} else {
member := h.currentNode.Options.RemoteServiceRoute(service, session, members)
if member == nil {
log.Println(fmt.Sprintf("customize remoteServiceRoute handler: %s is not found", msg.Route))
return
}
remoteAddr = member.ServiceAddr
session.Router().Bind(service, remoteAddr)
}
} else {
remoteAddr = members[rand.Intn(len(members))].ServiceAddr
session.Router().Bind(service, remoteAddr)
if addr, found := session.Router().Find(service); found {
remoteAddr = addr
} else {
remoteAddr = members[rand.Intn(len(members))].ServiceAddr
session.Router().Bind(service, remoteAddr)
}
}
pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Options struct {
TSLCertificate string
TSLKey string
UnregisterCallback func(Member)
RemoteServiceRoute CustomerRemoteServiceRoute
}

// Node represents a node in nano cluster, which will contains a group of services.
Expand Down
27 changes: 27 additions & 0 deletions examples/customerroute/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Nano cluster example

## About this example



## How to run the example?

```shell
cd examples/customerroute
go build

# run master server
./customerroute master
./customerroute chat --listen "127.0.0.1:34580"
./customerroute chat --listen "127.0.0.1:34581"
./customerroute gate --listen "127.0.0.1:34570" --gate-address "127.0.0.1:34590"
```

## open browser and visit url for 4 times
```
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
http://127.0.0.1:12345/web/
```
input content and send, the same ChatRoomService node will sync the message each other
186 changes: 186 additions & 0 deletions examples/customerroute/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package main

import (
"fmt"
"github.com/lonng/nano/cluster/clusterpb"
"log"
"net/http"
"os"
"path/filepath"
"runtime"

"github.com/lonng/nano"
"github.com/lonng/nano/examples/customerroute/onegate"
"github.com/lonng/nano/examples/customerroute/tworoom"
"github.com/lonng/nano/serialize/json"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
"github.com/urfave/cli"
)

func main() {
app := cli.NewApp()
app.Name = "NanoCustomerRouteDemo"
app.Author = "Lonng"
app.Email = "heng@lonng.org"
app.Description = "Nano cluster demo"
app.Commands = []cli.Command{
{
Name: "master",
Flags: []cli.Flag{
cli.StringFlag{
Name: "listen,l",
Usage: "Master service listen address",
Value: "127.0.0.1:34567",
},
},
Action: runMaster,
},
{
Name: "gate",
Flags: []cli.Flag{
cli.StringFlag{
Name: "master",
Usage: "master server address",
Value: "127.0.0.1:34567",
},
cli.StringFlag{
Name: "listen,l",
Usage: "Gate service listen address",
Value: "",
},
cli.StringFlag{
Name: "gate-address",
Usage: "Client connect address",
Value: "",
},
},
Action: runGate,
},
{
Name: "chat",
Flags: []cli.Flag{
cli.StringFlag{
Name: "master",
Usage: "master server address",
Value: "127.0.0.1:34567",
},
cli.StringFlag{
Name: "listen,l",
Usage: "Chat service listen address",
Value: "",
},
},
Action: runChat,
},
}
log.SetFlags(log.LstdFlags | log.Lshortfile)
if err := app.Run(os.Args); err != nil {
log.Fatalf("Startup server error %+v", err)
}
}

func srcPath() string {
_, file, _, _ := runtime.Caller(0)
return filepath.Dir(file)
}

func runMaster(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("master listen address cannot empty")
}

webDir := filepath.Join(srcPath(), "onemaster", "web")
log.Println("Nano master server web content directory", webDir)
log.Println("Nano master listen address", listen)
log.Println("Open http://127.0.0.1:12345/web/ in browser")

http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir(webDir))))
go func() {
if err := http.ListenAndServe(":12345", nil); err != nil {
panic(err)
}
}()

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithMaster(),
nano.WithSerializer(json.NewSerializer()),
nano.WithDebugMode(),
)

return nil
}

func runGate(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("gate listen address cannot empty")
}

masterAddr := args.String("master")
if masterAddr == "" {
return errors.Errorf("master address cannot empty")
}

gateAddr := args.String("gate-address")
if gateAddr == "" {
return errors.Errorf("gate address cannot empty")
}

log.Println("Current server listen address", listen)
log.Println("Current gate server address", gateAddr)
log.Println("Remote master server address", masterAddr)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
nano.WithClientAddr(gateAddr),
nano.WithComponents(onegate.Services),
nano.WithSerializer(json.NewSerializer()),
nano.WithIsWebsocket(true),
nano.WithWSPath("/nano"),
nano.WithCheckOriginFunc(func(_ *http.Request) bool { return true }),
nano.WithDebugMode(),
//set remote service route for gate
nano.WithCustomerRemoteServiceRoute(customerRemoteServiceRoute),
nano.WithNodeId(2), // if you deploy multi gate, option set nodeId, default nodeId = os.Getpid()
)
return nil
}

func runChat(args *cli.Context) error {
listen := args.String("listen")
if listen == "" {
return errors.Errorf("chat listen address cannot empty")
}

masterAddr := args.String("master")
if listen == "" {
return errors.Errorf("master address cannot empty")
}

log.Println("Current chat server listen address", listen)
log.Println("Remote master server address", masterAddr)

// Register session closed callback
session.Lifetime.OnClosed(tworoom.OnSessionClosed)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
nano.WithComponents(tworoom.Services),
nano.WithSerializer(json.NewSerializer()),
nano.WithDebugMode(),
)

return nil
}

func customerRemoteServiceRoute(service string, session *session.Session, members []*clusterpb.MemberInfo) *clusterpb.MemberInfo {
count := int64(len(members))
var index = session.UID() % count
fmt.Printf("remote service:%s route to :%v \n", service, members[index])
return members[index]
}
43 changes: 43 additions & 0 deletions examples/customerroute/onegate/gate_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package onegate

import (
"fmt"
"github.com/lonng/nano/component"
"github.com/lonng/nano/examples/cluster/protocol"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
)

type RegisterService struct {
component.Base
nextGateUid int64
}

func newRegisterService() *RegisterService {
return &RegisterService{}
}

type (
RegisterRequest struct {
Nickname string `json:"nickname"`
}
RegisterResponse struct {
Code int `json:"code"`
}
)

func (bs *RegisterService) Login(s *session.Session, msg *RegisterRequest) error {
bs.nextGateUid++
uid := bs.nextGateUid
s.Bind(uid)
fmt.Println("Login uid:", uid)
chat := &protocol.JoinRoomRequest{
Nickname: msg.Nickname,
GateUid: uid,
MasterUid: uid,
}
if err := s.RPC("ChatRoomService.JoinRoom", chat); err != nil {
return errors.Trace(err)
}
return s.Response(&RegisterResponse{})
}
14 changes: 14 additions & 0 deletions examples/customerroute/onegate/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package onegate

import "github.com/lonng/nano/component"

var (
// All services in master server
Services = &component.Components{}

bindService = newRegisterService()
)

func init() {
Services.Register(bindService)
}
Loading

0 comments on commit ee534d4

Please sign in to comment.