Skip to content

Commit

Permalink
Merge 095a4e3 into 51f5fa4
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed Jun 4, 2018
2 parents 51f5fa4 + 095a4e3 commit abea765
Show file tree
Hide file tree
Showing 55 changed files with 2,413 additions and 519 deletions.
4 changes: 3 additions & 1 deletion agent/agent_remote.go
Expand Up @@ -91,7 +91,9 @@ func (a *Remote) Kick(ctx context.Context) error {
if a.Session.UID() == "" {
return constants.ErrNoUIDBind
}
b, err := util.GobEncode([]byte(a.Session.UID()))
b, err := proto.Marshal(&protos.KickMsg{
UserId: a.Session.UID(),
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_remote_test.go
Expand Up @@ -80,7 +80,7 @@ func TestNewRemoteFailsIfFailedToSetEncodedData(t *testing.T) {
ss := &protos.Session{Data: []byte("invalid")}

remote, err := NewRemote(ss, "", nil, nil, nil, nil, "", nil)
assert.Equal(t, errors.New("unexpected EOF"), err)
assert.Equal(t, errors.New("invalid character 'i' looking for beginning of value").Error(), err.Error())
assert.Nil(t, remote)
}

Expand Down
3 changes: 0 additions & 3 deletions app.go
Expand Up @@ -22,7 +22,6 @@ package pitaya

import (
"context"
"encoding/gob"
"os"
"os/signal"
"reflect"
Expand Down Expand Up @@ -265,8 +264,6 @@ func startDefaultRPCClient() {
}

func initSysRemotes() {
gob.Register(&session.Data{})
gob.Register(map[string]interface{}{})
sys := &remote.Sys{}
RegisterRemote(sys,
component.WithName("sys"),
Expand Down
2 changes: 1 addition & 1 deletion cluster/nats_rpc_client.go
Expand Up @@ -106,7 +106,7 @@ func (ns *NatsRPCClient) buildRequest(
if err != nil {
logger.Log.Errorf("failed to inject span: %s", err)
}
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerIdKey, ns.server.ID)
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerIDKey, ns.server.ID)
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerServiceKey, ns.server.Type)
req.Metadata, err = pcontext.Encode(ctx)
if err != nil {
Expand Down
15 changes: 0 additions & 15 deletions cluster/nats_rpc_client_test.go
Expand Up @@ -22,10 +22,8 @@ package cluster

import (
"context"
"encoding/gob"
"errors"
"fmt"
"os"
"testing"
"time"

Expand All @@ -46,19 +44,6 @@ import (
"github.com/topfreegames/pitaya/session"
)

func TestMain(m *testing.M) {
setup()
code := m.Run()
shutdown()
os.Exit(code)
}

func setup() {
gob.Register(map[string]interface{}{})
}

func shutdown() {}

func TestNewNatsRPCClient(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
Expand Down
30 changes: 23 additions & 7 deletions component/method.go
Expand Up @@ -26,13 +26,15 @@ import (
"unicode"
"unicode/utf8"

"github.com/gogo/protobuf/proto"
"github.com/topfreegames/pitaya/internal/message"
)

var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfContext = reflect.TypeOf(new(context.Context)).Elem()
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfContext = reflect.TypeOf(new(context.Context)).Elem()
typeOfProtoMsg = reflect.TypeOf(new(proto.Message)).Elem()
)

func isExported(name string) bool {
Expand All @@ -50,20 +52,30 @@ func isRemoteMethod(method reflect.Method) bool {
}

// Method needs at least two ins: receiver and context.Context
if mt.NumIn() < 2 {
if mt.NumIn() != 2 && mt.NumIn() != 3 {
return false
}

if t1 := mt.In(1); !t1.Implements(typeOfContext) {
return false
}

// Method needs two outs: interface{}(or []byte), error
if mt.NumIn() == 3 {
if t2 := mt.In(2); !t2.Implements(typeOfProtoMsg) {
return false
}
}

// Method needs two outs: interface{}(that implements proto.Message), error
if mt.NumOut() != 2 {
return false
}

if (mt.Out(0).Kind() != reflect.Ptr && mt.Out(0) != typeOfBytes) || mt.Out(1) != typeOfError {
if (mt.Out(0).Kind() != reflect.Ptr) || mt.Out(1) != typeOfError {
return false
}

if o0 := mt.Out(0); !o0.Implements(typeOfProtoMsg) {
return false
}

Expand Down Expand Up @@ -107,6 +119,7 @@ func suitableRemoteMethods(typ reflect.Type, nameFunc func(string) string) map[s
methods := make(map[string]*Remote)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mt := method.Type
mn := method.Name
if isRemoteMethod(method) {
// rewrite remote name
Expand All @@ -115,7 +128,10 @@ func suitableRemoteMethods(typ reflect.Type, nameFunc func(string) string) map[s
}
methods[mn] = &Remote{
Method: method,
HasArgs: method.Type.NumIn() > 2,
HasArgs: method.Type.NumIn() == 3,
}
if mt.NumIn() == 3 {
methods[mn].Type = mt.In(2)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions component/method_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/protos/test"
)

type TestType struct {
Expand All @@ -42,10 +43,10 @@ func (t *TestType) ExportedHandlerWithSessionAndPointerWithRawOut(ctx context.Co
func (t *TestType) ExportedHandlerWithSessionAndPointerWithPointerOut(ctx context.Context, tt *TestType) (*TestType, error) {
return nil, nil
}
func (t *TestType) ExportedRemoteRawOut(ctx context.Context) ([]byte, error) {
func (t *TestType) ExportedRemoteRawOut(ctx context.Context) (*test.SomeStruct, error) {
return nil, nil
}
func (t *TestType) ExportedRemotePointerOut(ctx context.Context) (*TestType, error) {
func (t *TestType) ExportedRemotePointerOut(ctx context.Context) (*test.SomeStruct, error) {
return nil, nil
}

Expand Down
1 change: 1 addition & 0 deletions component/service.go
Expand Up @@ -43,6 +43,7 @@ type (
Receiver reflect.Value // receiver of method
Method reflect.Method // method stub
HasArgs bool // if remote has no args we won't try to serialize received data into arguments
Type reflect.Type // low-level type of method
}

// Service implements a specific service, some of it's methods will be
Expand Down
4 changes: 2 additions & 2 deletions constants/const.go
Expand Up @@ -56,8 +56,8 @@ var PropagateCtxKey = propagateKey{}
// the propagate key
var SpanPropagateCtxKey = "opentracing-span"

// PeerIdKey is the key holding the peer id to be sent over the context
var PeerIdKey = "peer.id"
// PeerIDKey is the key holding the peer id to be sent over the context
var PeerIDKey = "peer.id"

// PeerServiceKey is the key holding the peer service to be sent over the context
var PeerServiceKey = "peer.service"
Expand Down
1 change: 1 addition & 0 deletions constants/errors.go
Expand Up @@ -63,4 +63,5 @@ var (
ErrInvalidCertificates = errors.New("certificates must be exactly two")
ErrTimeoutTerminatingBinaryModule = errors.New("timeout waiting to binary module to die")
ErrFrontendTypeNotSpecified = errors.New("for using SendPushToUsers from a backend server you have to specify a valid frontendType")
ErrMetricNotKnown = errors.New("the provided metric does not exist")
)
12 changes: 5 additions & 7 deletions context/context.go
Expand Up @@ -21,12 +21,10 @@
package context

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"

"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/util"
)

// AddToPropagateCtx adds a key and value that will be propagated through RPC calls
Expand Down Expand Up @@ -63,7 +61,7 @@ func FromMap(val map[string]interface{}) context.Context {
func Encode(ctx context.Context) ([]byte, error) {
m := ToMap(ctx)
if len(m) > 0 {
return util.GobEncode(m)
return json.Marshal(m)
}
return nil, nil
}
Expand All @@ -74,10 +72,10 @@ func Decode(m []byte) (context.Context, error) {
// TODO maybe return an error
return nil, nil
}
args := make([]interface{}, 0)
err := gob.NewDecoder(bytes.NewReader(m)).Decode(&args)
mp := make(map[string]interface{}, 0)
err := json.Unmarshal(m, &mp)
if err != nil {
return nil, err
}
return FromMap(args[0].(map[string]interface{})), nil
return FromMap(mp), nil
}
25 changes: 3 additions & 22 deletions context/context_test.go
Expand Up @@ -22,39 +22,23 @@ package context

import (
"context"
"encoding/gob"
"encoding/json"
"errors"
"flag"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/helpers"
"github.com/topfreegames/pitaya/util"
)

var update = flag.Bool("update", false, "update .golden files")

type unregisteredStruct struct{}
type registeredStruct struct{}

func TestMain(m *testing.M) {
setup()
code := m.Run()
shutdown()
os.Exit(code)
}

func setup() {
gob.Register(map[string]interface{}{})
gob.Register(&registeredStruct{})
}

func shutdown() {}

func TestAddToPropagateCtx(t *testing.T) {
tables := []struct {
name string
Expand Down Expand Up @@ -166,8 +150,6 @@ func TestEncode(t *testing.T) {
}{
{"no_elements", map[string]interface{}{}, nil},
{"one_element", map[string]interface{}{"key1": "val1"}, nil},
{"unregistered_struct", map[string]interface{}{"key1": &unregisteredStruct{}}, errors.New("gob: type not registered for interface: context.unregisteredStruct")},
{"registered_struct", map[string]interface{}{"key1": &registeredStruct{}}, nil},
}

for _, table := range tables {
Expand All @@ -177,7 +159,7 @@ func TestEncode(t *testing.T) {
if len(table.items) > 0 && table.err == nil {
gp := filepath.Join("fixtures", table.name+".golden")
if *update {
b, err := util.GobEncode(table.items)
b, err := json.Marshal(table.items)
require.NoError(t, err)
t.Log("updating golden file")
helpers.WriteFile(t, gp, b)
Expand All @@ -202,7 +184,6 @@ func TestDecode(t *testing.T) {
items map[string]interface{}
}{
{"one_element", map[string]interface{}{"key1": "val1"}},
{"registered_struct", map[string]interface{}{"key1": &registeredStruct{}}},
}

for _, table := range tables {
Expand All @@ -221,7 +202,7 @@ func TestDecode(t *testing.T) {

func TestDecodeFailsIfBadEncodedData(t *testing.T) {
decoded, err := Decode([]byte("oh noes"))
assert.Equal(t, errors.New("unexpected EOF"), err)
assert.Equal(t, errors.New("invalid character 'o' looking for beginning of value").Error(), err.Error())
assert.Nil(t, decoded)
}

Expand Down
Binary file modified context/fixtures/one_element.golden
Binary file not shown.
Binary file modified context/fixtures/registered_struct.golden
Binary file not shown.
22 changes: 11 additions & 11 deletions e2e/e2e_test.go
Expand Up @@ -384,21 +384,21 @@ func TestForwardToBackend(t *testing.T) {
}

func TestGroupBack(t *testing.T) {
portFront1 := helpers.GetFreePort(t)
portFront2 := helpers.GetFreePort(t)
port1 := helpers.GetFreePort(t)
port2 := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())

defer helpers.StartServer(t, false, true, "game", 0, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", portFront1, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", portFront2, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", portFront1))
err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c1.Disconnect()

err = c2.ConnectTo(fmt.Sprintf("localhost:%d", portFront2))
err = c2.ConnectTo(fmt.Sprintf("localhost:%d", port2))
assert.NoError(t, err)
defer c2.Disconnect()

Expand Down Expand Up @@ -435,14 +435,14 @@ func TestGroupBack(t *testing.T) {
}

func TestUserRPC(t *testing.T) {
portFront1 := helpers.GetFreePort(t)
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", portFront1, sdPrefix)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix)()
c1 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", portFront1))
err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c1.Disconnect()

Expand All @@ -457,9 +457,9 @@ func TestUserRPC(t *testing.T) {
{"front_to_back_error", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"back_to_front_error", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"same_server", "connector.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":"PIT-000","msg":"you are making a rpc that may be processed locally, either specify a different server type or specify a server id"}`)},
{"front_to_back_ptr", "connector.testsvc.testsendrpcpointer", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"front_to_back_ptr", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"no_args", "connector.testsvc.testsendrpcnoargs", []byte(`{"route":"game.testremotesvc.rpctestnoargs"}`), []byte(`{"code":200,"msg":"got nothing"}`)},
{"not_found", "connector.testsvc.testsendrpcpointer", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), []byte(`{"code":"PIT-404","msg":"route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`)},
{"not_found", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), []byte(`{"code":"PIT-404","msg":"route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`)},
}

for _, table := range tables {
Expand Down
4 changes: 1 addition & 3 deletions examples/demo/cluster/main.go
Expand Up @@ -50,8 +50,7 @@ func configureBackend() {

func configureFrontend(port int) {
configureJaeger("connector")
ws := acceptor.NewWSAcceptor(fmt.Sprintf(":%d", port))
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port+1))
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))

pitaya.Register(&services.Connector{},
component.WithName("connector"),
Expand Down Expand Up @@ -90,7 +89,6 @@ func configureFrontend(port int) {
fmt.Printf("error setting route dictionary %s\n", err.Error())
}

pitaya.AddAcceptor(ws)
pitaya.AddAcceptor(tcp)
}

Expand Down

0 comments on commit abea765

Please sign in to comment.