Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote hooks on user RPC #365

Merged
merged 18 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pitaya [![Build Status][7]][8] [![Coverage Status][9]][10] [![GoDoc][1]][2] [![Docs][11]][12] [![Go Report Card][3]][4] [![MIT licensed][5]][6]

*WARNING*: The [version v1.x](https://github.com/topfreegames/pitaya/tree/v1) of pitaya is deprecated, complete guide of the new version v2.x can be found [here](https://github.com/topfreegames/pitaya/issues/240).
The current README is for version 2, make sure you look at branch v1 if you'd like to see the documentation for the older version.
The current README is for version v2.x, make sure you look at branch v1 if you'd like to see the documentation for the older version.

---

Expand Down
3 changes: 3 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Builder struct {
Groups groups.GroupService
SessionPool session.SessionPool
Worker *worker.Worker
RemoteHooks *pipeline.RemoteHooks
HandlerHooks *pipeline.HandlerHooks
}

Expand Down Expand Up @@ -202,6 +203,7 @@ func NewBuilder(isFrontend bool,
Server: server,
ServerMode: serverMode,
Groups: gsi,
RemoteHooks: pipeline.NewRemoteHooks(),
HandlerHooks: handlerHooks,
ServiceDiscovery: serviceDiscovery,
SessionPool: sessionPool,
Expand Down Expand Up @@ -248,6 +250,7 @@ func (builder *Builder) Build() Pitaya {
builder.MessageEncoder,
builder.Server,
builder.SessionPool,
builder.RemoteHooks,
builder.HandlerHooks,
handlerPool,
)
Expand Down
4 changes: 3 additions & 1 deletion examples/testing/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ services:
ports:
- 4222:4222
etcd:
image: appcelerator/etcd
image: bitnami/etcd
environment:
- ALLOW_NONE_AUTHENTICATION=yes
ports:
- 2379:2379
redis:
Expand Down
28 changes: 24 additions & 4 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,38 @@ type (
Handlers []AfterHandlerTempl
}

// HandlerHooks contains before and after channels
HandlerHooks struct {
// Hooks contains before and after channels
Hooks struct {
BeforeHandler *Channel
AfterHandler *AfterChannel
}

HandlerHooks struct {
Hooks
}

RemoteHooks struct {
Hooks
}
)

// NewHandlerHooks ctor
func NewHandlerHooks() *HandlerHooks {
return &HandlerHooks{
BeforeHandler: NewChannel(),
AfterHandler: NewAfterChannel(),
Hooks: Hooks{
BeforeHandler: NewChannel(),
AfterHandler: NewAfterChannel(),
},
}
}

// NewRemoteHooks ctor
func NewRemoteHooks() *RemoteHooks {
return &RemoteHooks{
Hooks: Hooks{
BeforeHandler: NewChannel(),
AfterHandler: NewAfterChannel(),
},
}
}

Expand Down
2 changes: 1 addition & 1 deletion rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestDoSendRPC(t *testing.T) {
sessionPool := sessionmocks.NewMockSessionPool(ctrl)
router := router.New()
handlerPool := service.NewHandlerPool()
svc := service.NewRemoteService(mockRPCClient, mockRPCServer, mockSD, packetEncoder, mockSerializer, router, messageEncoder, &cluster.Server{}, sessionPool, pipeline.NewHandlerHooks(), handlerPool)
svc := service.NewRemoteService(mockRPCClient, mockRPCServer, mockSD, packetEncoder, mockSerializer, router, messageEncoder, &cluster.Server{}, sessionPool, pipeline.NewRemoteHooks(), pipeline.NewHandlerHooks(), handlerPool)
assert.NotNil(t, svc)
app.remoteService = svc
app.server.ID = "notmyserver"
Expand Down
2 changes: 1 addition & 1 deletion service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"time"
"net"

"github.com/nats-io/nuid"

Expand Down
32 changes: 27 additions & 5 deletions service/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type RemoteService struct {
messageEncoder message.Encoder
server *cluster.Server // server obj
remoteBindingListeners []cluster.RemoteBindingListener
remoteHooks *pipeline.RemoteHooks
sessionPool session.SessionPool
handlerPool *HandlerPool
remotes map[string]*component.Remote // all remote method
Expand All @@ -78,6 +79,7 @@ func NewRemoteService(
messageEncoder message.Encoder,
server *cluster.Server,
sessionPool session.SessionPool,
remoteHooks *pipeline.RemoteHooks,
handlerHooks *pipeline.HandlerHooks,
handlerPool *HandlerPool,
) *RemoteService {
Expand All @@ -97,6 +99,7 @@ func NewRemoteService(
remotes: make(map[string]*component.Remote),
}

remote.remoteHooks = remoteHooks
remote.handlerHooks = handlerHooks

return remote
Expand Down Expand Up @@ -334,8 +337,6 @@ func processRemoteMessage(ctx context.Context, req *protos.Request, r *RemoteSer
}

func (r *RemoteService) handleRPCUser(ctx context.Context, req *protos.Request, rt *route.Route) *protos.Response {
response := &protos.Response{}

remote, ok := r.remotes[rt.Short()]
if !ok {
logger.Log.Warnf("pitaya/remote: %s not found", rt.Short())
Expand All @@ -350,9 +351,13 @@ func (r *RemoteService) handleRPCUser(ctx context.Context, req *protos.Request,
}
return response
}
params := []reflect.Value{remote.Receiver, reflect.ValueOf(ctx)}

var ret interface{}
var arg interface{}
var err error

if remote.HasArgs {
arg, err := unmarshalRemoteArg(remote, req.GetMsg().GetData())
arg, err = unmarshalRemoteArg(remote, req.GetMsg().GetData())
if err != nil {
response := &protos.Response{
Error: &protos.Error{
Expand All @@ -362,10 +367,26 @@ func (r *RemoteService) handleRPCUser(ctx context.Context, req *protos.Request,
}
return response
}
}

ctx, arg, err = r.remoteHooks.BeforeHandler.ExecuteBeforePipeline(ctx, arg)
if err != nil {
response := &protos.Response{
Error: &protos.Error{
Code: e.ErrInternalCode,
Msg: err.Error(),
},
}
return response
}

params := []reflect.Value{remote.Receiver, reflect.ValueOf(ctx)}
if remote.HasArgs {
params = append(params, reflect.ValueOf(arg))
}
ret, err = util.Pcall(remote.Method, params)

ret, err := util.Pcall(remote.Method, params)
ret, err = r.remoteHooks.AfterHandler.ExecuteAfterPipeline(ctx, ret, err)
if err != nil {
response := &protos.Response{
Error: &protos.Error{
Expand Down Expand Up @@ -405,6 +426,7 @@ func (r *RemoteService) handleRPCUser(ctx context.Context, req *protos.Request,
}
}

response := &protos.Response{}
response.Data = b
return response
}
Expand Down
Loading