From e09037eecb104210935ce4c8894755d9492d8f8a Mon Sep 17 00:00:00 2001 From: Rafael Leite Date: Thu, 16 Aug 2018 13:40:02 -0300 Subject: [PATCH] Pipeline now receives the decoded arguments insted of raw data --- Gopkg.lock | 28 ++++++++++ examples/demo/chat/main.go | 23 -------- examples/demo/cluster/main.go | 4 -- examples/demo/cluster_grpc/main.go | 4 -- examples/demo/cluster_protobuf/main.go | 4 -- examples/demo/pipeline/main.go | 74 ++++++++++++++++++++++++++ pipeline/pipeline.go | 2 +- pipeline/pipeline_test.go | 4 +- pipeline_test.go | 2 +- service/util.go | 16 +++--- service/util_test.go | 16 +++--- 11 files changed, 123 insertions(+), 54 deletions(-) create mode 100644 examples/demo/pipeline/main.go diff --git a/Gopkg.lock b/Gopkg.lock index 6bf46b75..b640b39b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -179,6 +179,25 @@ revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" version = "v1.0.0" +[[projects]] + digest = "1:e1ff887e232b2d8f4f7c7db15a5fac7be418025afc4dda53c59c765dbb5aa6b4" + name = "github.com/go-playground/locales" + packages = [ + ".", + "currency", + ] + pruneopts = "UT" + revision = "f63010822830b6fe52288ee52d5a1151088ce039" + version = "v0.12.1" + +[[projects]] + digest = "1:e022cf244bcac1b6ef933f1a2e0adcf6a6dfd7b872d8d41e4d4179bb09a87cbc" + name = "github.com/go-playground/universal-translator" + packages = ["."] + pruneopts = "UT" + revision = "b32fa301c9fe55953584134cb6853a13c87ec0a1" + version = "v0.16.0" + [[projects]] digest = "1:bfc758d5a03d57d97226fac6934551c01bd76612adb119c177395b057a0a46db" name = "github.com/gogo/protobuf" @@ -682,6 +701,14 @@ revision = "8e4536a86ab602859c20df5ebfd0bd4228d08655" version = "v1.10.0" +[[projects]] + digest = "1:e2f64cca6e235f32cd4c2f9be9ae0cda1f8608fc6fdb68936e8d10e4e0bb074d" + name = "gopkg.in/go-playground/validator.v9" + packages = ["."] + pruneopts = "UT" + revision = "e69e9a28bb62b977fdc58d051f1bb477b7cbe486" + version = "v9.21.0" + [[projects]] digest = "1:342378ac4dcb378a5448dd723f0784ae519383532f5e70ade24132c4c8693202" name = "gopkg.in/yaml.v2" @@ -716,6 +743,7 @@ "github.com/uber/jaeger-client-go/config", "golang.org/x/net/context", "google.golang.org/grpc", + "gopkg.in/go-playground/validator.v9", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/examples/demo/chat/main.go b/examples/demo/chat/main.go index ff718224..9af6205b 100644 --- a/examples/demo/chat/main.go +++ b/examples/demo/chat/main.go @@ -25,7 +25,6 @@ type ( component.Base group *pitaya.Group timer *timer.Timer - stats *stats } // UserMessage represents a message that user sent @@ -49,28 +48,12 @@ type ( Code int `json:"code"` Result string `json:"result"` } - - stats struct { - outboundBytes int - inboundBytes int - } ) -func (stats *stats) outbound(ctx context.Context, in []byte) ([]byte, error) { - stats.outboundBytes += len(in) - return in, nil -} - -func (stats *stats) inbound(ctx context.Context, in []byte) ([]byte, error) { - stats.inboundBytes += len(in) - return in, nil -} - // NewRoom returns a new room func NewRoom() *Room { return &Room{ group: pitaya.NewGroup("room"), - stats: &stats{}, } } @@ -78,8 +61,6 @@ func NewRoom() *Room { func (r *Room) AfterInit() { r.timer = pitaya.NewTimer(time.Minute, func() { println("UserCount: Time=>", time.Now().String(), "Count=>", r.group.Count()) - println("OutboundBytes", r.stats.outboundBytes) - println("InboundBytes", r.stats.outboundBytes) }) } @@ -129,10 +110,6 @@ func main() { component.WithNameFunc(strings.ToLower), ) - // traffic stats - pitaya.AfterHandler(room.stats.outbound) - pitaya.BeforeHandler(room.stats.inbound) - log.SetFlags(log.LstdFlags | log.Llongfile) http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir("web")))) diff --git a/examples/demo/cluster/main.go b/examples/demo/cluster/main.go index 277544fa..194a417c 100644 --- a/examples/demo/cluster/main.go +++ b/examples/demo/cluster/main.go @@ -27,10 +27,6 @@ func configureBackend() { component.WithName("room"), component.WithNameFunc(strings.ToLower), ) - - // traffic stats - pitaya.AfterHandler(room.Stats.Outbound) - pitaya.BeforeHandler(room.Stats.Inbound) } func configureFrontend(port int) { diff --git a/examples/demo/cluster_grpc/main.go b/examples/demo/cluster_grpc/main.go index 169ee02d..36f7d03a 100644 --- a/examples/demo/cluster_grpc/main.go +++ b/examples/demo/cluster_grpc/main.go @@ -30,10 +30,6 @@ func configureBackend() { component.WithName("room"), component.WithNameFunc(strings.ToLower), ) - - // traffic stats - pitaya.AfterHandler(room.Stats.Outbound) - pitaya.BeforeHandler(room.Stats.Inbound) } func configureFrontend(port int) { diff --git a/examples/demo/cluster_protobuf/main.go b/examples/demo/cluster_protobuf/main.go index 7b98060f..492d9ff5 100644 --- a/examples/demo/cluster_protobuf/main.go +++ b/examples/demo/cluster_protobuf/main.go @@ -24,10 +24,6 @@ func configureBackend() { component.WithName("room"), component.WithNameFunc(strings.ToLower), ) - - // traffic stats - pitaya.AfterHandler(room.Stats.Outbound) - pitaya.BeforeHandler(room.Stats.Inbound) } func configureFrontend(port int) { diff --git a/examples/demo/pipeline/main.go b/examples/demo/pipeline/main.go new file mode 100644 index 00000000..c8e231a2 --- /dev/null +++ b/examples/demo/pipeline/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "context" + "flag" + "fmt" + + "github.com/topfreegames/pitaya" + "github.com/topfreegames/pitaya/acceptor" + "github.com/topfreegames/pitaya/component" + "github.com/topfreegames/pitaya/serialize/json" + validator "gopkg.in/go-playground/validator.v9" +) + +// MetagameServer ... +type MetagameServer struct { + component.Base +} + +// NewGameMock ... +func NewGameMock() *MetagameServer { + return &MetagameServer{} +} + +// CreatePlayerCheatArgs ... +type CreatePlayerCheatArgs struct { + Name string `json:"name"` + Email string `json:"email" validate:"email"` + SoftCurrency int `json:"softCurrency" validate:"gte=0,lte=1000"` + HardCurrency int `json:"hardCurrency" validate:"gte=0,lte=200"` +} + +// CreatePlayerCheatResponse ... +type CreatePlayerCheatResponse struct { + Msg string `json:"msg"` +} + +// CreatePlayerCheat ... +func (g *MetagameServer) CreatePlayerCheat(ctx context.Context, args *CreatePlayerCheatArgs) (*CreatePlayerCheatResponse, error) { + // Do nothing. This is just an example of how pipelines can be helpful + return &CreatePlayerCheatResponse{ + Msg: "ok", + }, nil +} + +func handlerParamsValidator(ctx context.Context, in interface{}) (interface{}, error) { + var validate *validator.Validate + validate = validator.New() + + if err := validate.Struct(in); err != nil { + return nil, err + } + + return in, nil +} + +func main() { + svType := flag.String("type", "metagameDemo", "the server type") + isFrontend := flag.Bool("frontend", true, "if server is frontend") + flag.Parse() + + defer pitaya.Shutdown() + + pitaya.SetSerializer(json.NewSerializer()) + pitaya.Register(NewGameMock(), + component.WithName("metagameHandler"), + ) + pitaya.BeforeHandler(handlerParamsValidator) + + tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", 3251)) + pitaya.AddAcceptor(tcp) + pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{}) + pitaya.Start() +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 52090654..157e9805 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -32,7 +32,7 @@ var ( type ( // Handler is a function that has the same signature as a handler and will // be called before or after handler methods - Handler func(ctx context.Context, in []byte) (out []byte, err error) + Handler func(ctx context.Context, in interface{}) (out interface{}, err error) pipelineChannel struct { Handlers []Handler diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 6665ba47..ee1d3cbe 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -29,10 +29,10 @@ import ( ) var ( - handler1 = func(ctx context.Context, in []byte) ([]byte, error) { + handler1 = func(ctx context.Context, in interface{}) (interface{}, error) { return in, errors.New("ohno") } - handler2 = func(ctx context.Context, in []byte) ([]byte, error) { + handler2 = func(ctx context.Context, in interface{}) (interface{}, error) { return nil, nil } p = &pipelineChannel{} diff --git a/pipeline_test.go b/pipeline_test.go index 54aa6133..cff55d51 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -33,7 +33,7 @@ func resetPipelines() { pipeline.AfterHandler.Handlers = make([]pipeline.Handler, 0) } -var myHandler = func(ctx context.Context, in []byte) ([]byte, error) { +var myHandler = func(ctx context.Context, in interface{}) (interface{}, error) { return []byte("test"), nil } diff --git a/service/util.go b/service/util.go index d35b7a7f..d3fe1d37 100644 --- a/service/util.go +++ b/service/util.go @@ -96,7 +96,7 @@ func getMsgType(msgTypeIface interface{}) (message.Type, error) { return msgType, nil } -func executeBeforePipeline(ctx context.Context, data []byte) ([]byte, error) { +func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) { var err error res := data if len(pipeline.BeforeHandler.Handlers) > 0 { @@ -114,7 +114,7 @@ func executeBeforePipeline(ctx context.Context, data []byte) ([]byte, error) { return res, nil } -func executeAfterPipeline(ctx context.Context, ser serialize.Serializer, res []byte) []byte { +func executeAfterPipeline(ctx context.Context, ser serialize.Serializer, res interface{}) interface{} { var err error ret := res if len(pipeline.AfterHandler.Handlers) > 0 { @@ -170,15 +170,17 @@ func processHandlerMessage( logger.Log.Warn(err.Error()) } - if data, err = executeBeforePipeline(ctx, data); err != nil { - return nil, err - } - + // First unmarshal the handler arg that will be passed to + // both handler and pipeline functions arg, err := unmarshalHandlerArg(h, serializer, data) if err != nil { return nil, e.NewError(err, e.ErrBadRequestCode) } + if arg, err = executeBeforePipeline(ctx, arg); err != nil { + return nil, err + } + logger.Log.Debugf("SID=%d, Data=%s", session.ID(), data) args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)} if arg != nil { @@ -199,11 +201,11 @@ func processHandlerMessage( resp = []byte("ack") } + resp = executeAfterPipeline(ctx, serializer, resp) ret, err := serializeReturn(serializer, resp) if err != nil { return nil, err } - ret = executeAfterPipeline(ctx, serializer, ret) return ret, nil } diff --git a/service/util_test.go b/service/util_test.go index ae47b72d..4c969164 100644 --- a/service/util_test.go +++ b/service/util_test.go @@ -217,12 +217,12 @@ func TestExecuteBeforePipelineSuccess(t *testing.T) { data := []byte("ok") expected1 := []byte("oh noes 1") expected2 := []byte("oh noes 2") - before1 := func(ctx context.Context, in []byte) ([]byte, error) { + before1 := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) assert.Equal(t, data, in) return expected1, nil } - before2 := func(ctx context.Context, in []byte) ([]byte, error) { + before2 := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) assert.Equal(t, expected1, in) return expected2, nil @@ -239,7 +239,7 @@ func TestExecuteBeforePipelineSuccess(t *testing.T) { func TestExecuteBeforePipelineError(t *testing.T) { c := context.Background() expected := errors.New("oh noes") - before := func(ctx context.Context, in []byte) ([]byte, error) { + before := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) return nil, expected } @@ -261,12 +261,12 @@ func TestExecuteAfterPipelineSuccess(t *testing.T) { data := []byte("ok") expected1 := []byte("oh noes 1") expected2 := []byte("oh noes 2") - after1 := func(ctx context.Context, in []byte) ([]byte, error) { + after1 := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) assert.Equal(t, data, in) return expected1, nil } - after2 := func(ctx context.Context, in []byte) ([]byte, error) { + after2 := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) assert.Equal(t, expected1, in) return expected2, nil @@ -285,7 +285,7 @@ func TestExecuteAfterPipelineError(t *testing.T) { mockSerializer := mocks.NewMockSerializer(ctrl) c := context.Background() - after := func(ctx context.Context, in []byte) ([]byte, error) { + after := func(ctx context.Context, in interface{}) (interface{}, error) { assert.Equal(t, c, ctx) return nil, errors.New("oh noes") } @@ -407,7 +407,7 @@ func TestProcessHandlerMessageBrokenBeforePipeline(t *testing.T) { handlers[rt.Short()] = &component.Handler{} defer func() { delete(handlers, rt.Short()) }() expected := errors.New("oh noes") - before := func(ctx context.Context, in []byte) ([]byte, error) { + before := func(ctx context.Context, in interface{}) (interface{}, error) { return nil, expected } pipeline.BeforeHandler.PushFront(before) @@ -428,7 +428,7 @@ func TestProcessHandlerMessageBrokenAfterPipeline(t *testing.T) { handlers[rt.Short()] = &component.Handler{Receiver: reflect.ValueOf(tObj), Method: m, Type: m.Type.In(2)} defer func() { delete(handlers, rt.Short()) }() - after := func(ctx context.Context, in []byte) ([]byte, error) { + after := func(ctx context.Context, in interface{}) (interface{}, error) { return nil, errors.New("oh noes") } pipeline.AfterHandler.PushFront(after)