Skip to content

Commit

Permalink
Created MessageEncoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Danilo Luvizotto committed Apr 23, 2018
1 parent 4245089 commit 228cee8
Show file tree
Hide file tree
Showing 14 changed files with 362 additions and 243 deletions.
10 changes: 5 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type (
messagesBufferSize int // size of the pending messages buffer
serializer serialize.Serializer // message serializer
state int32 // current agent state
dataCompression bool
messageEncoder message.MessageEncoder
}

pendingMessage struct {
Expand All @@ -89,11 +89,11 @@ func NewAgent(
heartbeatTime time.Duration,
messagesBufferSize int,
dieChan chan bool,
dataCompression bool,
messageEncoder message.MessageEncoder,
) *Agent {
// initialize heartbeat and handshake data on first player connection
once.Do(func() {
hbdEncode(heartbeatTime, packetEncoder, dataCompression)
hbdEncode(heartbeatTime, packetEncoder, messageEncoder.CompressEnabled())
})

a := &Agent{
Expand All @@ -110,7 +110,7 @@ func NewAgent(
lastAt: time.Now().Unix(),
serializer: serializer,
state: constants.StatusStart,
dataCompression: dataCompression,
messageEncoder: messageEncoder,
}

// bindng session
Expand Down Expand Up @@ -326,7 +326,7 @@ func (a *Agent) write() {
ID: data.mid,
Err: data.err,
}
em, err := m.Encode(a.dataCompression)
em, err := a.messageEncoder.Encode(m)
if err != nil {
logger.Log.Error(err.Error())
break
Expand Down
9 changes: 5 additions & 4 deletions agent/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Remote struct {
Session *session.Session // session
Srv reflect.Value // cached session reflect.Value, this avoids repeated calls to reflect.value(a.Session)
chDie chan struct{} // wait for close
dataCompression bool
messageEncoder message.MessageEncoder
encoder codec.PacketEncoder // binary encoder
frontendID string // the frontend that sent the request
reply string // nats reply topic
Expand All @@ -61,7 +61,7 @@ func NewRemote(
serializer serialize.Serializer,
serviceDiscovery cluster.ServiceDiscovery,
frontendID string,
dataCompression bool,
messageEncoder message.MessageEncoder,
) (*Remote, error) {
a := &Remote{
chDie: make(chan struct{}),
Expand All @@ -71,7 +71,7 @@ func NewRemote(
rpcClient: rpcClient,
serviceDiscovery: serviceDiscovery,
frontendID: frontendID,
dataCompression: dataCompression,
messageEncoder: messageEncoder,
}

// binding session
Expand Down Expand Up @@ -151,7 +151,8 @@ func (a *Remote) serialize(m pendingMessage) ([]byte, error) {
ID: m.mid,
Err: m.err,
}
em, err := msg.Encode(a.dataCompression)

em, err := a.messageEncoder.Encode(msg)
if err != nil {
return nil, err
}
Expand Down
60 changes: 30 additions & 30 deletions agent/agent_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/topfreegames/pitaya/constants"
codecmocks "github.com/topfreegames/pitaya/internal/codec/mocks"
"github.com/topfreegames/pitaya/internal/message"
messagemocks "github.com/topfreegames/pitaya/internal/message/mocks"
"github.com/topfreegames/pitaya/internal/packet"
"github.com/topfreegames/pitaya/protos"
"github.com/topfreegames/pitaya/route"
Expand All @@ -50,7 +51,6 @@ func TestNewRemote(t *testing.T) {
ss := &protos.Session{Uid: uid}
reply := uuid.New().String()
frontendID := uuid.New().String()
dataCompression := true

ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -59,8 +59,9 @@ func TestNewRemote(t *testing.T) {
mockSD := clustermocks.NewMockServiceDiscovery(ctrl)
mockSerializer := serializemocks.NewMockSerializer(ctrl)
mockEncoder := codecmocks.NewMockPacketEncoder(ctrl)
mockMessageEncoder := messagemocks.NewMockMessageEncoder(ctrl)

remote, err := NewRemote(ss, reply, mockRPCClient, mockEncoder, mockSerializer, mockSD, frontendID, dataCompression)
remote, err := NewRemote(ss, reply, mockRPCClient, mockEncoder, mockSerializer, mockSD, frontendID, mockMessageEncoder)
assert.NoError(t, err)
assert.NotNil(t, remote)
assert.IsType(t, make(chan struct{}), remote.chDie)
Expand All @@ -78,21 +79,21 @@ func TestNewRemote(t *testing.T) {
func TestNewRemoteFailsIfFailedToSetEncodedData(t *testing.T) {
ss := &protos.Session{Data: []byte("invalid")}

remote, err := NewRemote(ss, "", nil, nil, nil, nil, "", false)
remote, err := NewRemote(ss, "", nil, nil, nil, nil, "", nil)
assert.Equal(t, errors.New("unexpected EOF"), err)
assert.Nil(t, remote)
}

func TestAgentRemoteClose(t *testing.T) {
remote, err := NewRemote(nil, "", nil, nil, nil, nil, "", false)
remote, err := NewRemote(nil, "", nil, nil, nil, nil, "", nil)
assert.NoError(t, err)
assert.NotNil(t, remote)
err = remote.Close()
assert.NoError(t, err)
}

func TestAgentRemoteRemoteAddr(t *testing.T) {
remote, err := NewRemote(nil, "", nil, nil, nil, nil, "", false)
remote, err := NewRemote(nil, "", nil, nil, nil, nil, "", nil)
assert.NoError(t, err)
assert.NotNil(t, remote)
addr := remote.RemoteAddr()
Expand All @@ -106,15 +107,14 @@ func TestAgentRemotePush(t *testing.T) {
uid string
rpcClient cluster.RPCClient
data interface{}
dataCompression bool
errSerialize error
err error
}{
{"nats_rpc_session_not_bound", "", &cluster.NatsRPCClient{}, nil, false, nil, constants.ErrNoUIDBind},
{"success_raw_message", uuid.New().String(), nil, []byte("ok"), false, nil, nil},
{"failed_struct_message_serialize", uuid.New().String(), nil, &someStruct{A: "ok"}, false, errors.New("failed serialize"), errors.New("failed serialize")},
{"success_struct_message", uuid.New().String(), nil, &someStruct{A: "ok"}, true, nil, nil},
{"failed_send", uuid.New().String(), nil, []byte("ok"), true, nil, errors.New("failed send")},
{"nats_rpc_session_not_bound", "", &cluster.NatsRPCClient{}, nil, nil, constants.ErrNoUIDBind},
{"success_raw_message", uuid.New().String(), nil, []byte("ok"), nil, nil},
{"failed_struct_message_serialize", uuid.New().String(), nil, &someStruct{A: "ok"}, errors.New("failed serialize"), errors.New("failed serialize")},
{"success_struct_message", uuid.New().String(), nil, &someStruct{A: "ok"}, nil, nil},
{"failed_send", uuid.New().String(), nil, []byte("ok"), nil, errors.New("failed send")},
}

for _, table := range tables {
Expand All @@ -127,7 +127,7 @@ func TestAgentRemotePush(t *testing.T) {
}
ss := &protos.Session{Uid: table.uid}
mockSerializer := serializemocks.NewMockSerializer(ctrl)
remote, err := NewRemote(ss, "", table.rpcClient, nil, mockSerializer, nil, "", table.dataCompression)
remote, err := NewRemote(ss, "", table.rpcClient, nil, mockSerializer, nil, "", nil)
assert.NoError(t, err)
assert.NotNil(t, remote)

Expand Down Expand Up @@ -164,18 +164,17 @@ func TestAgentRemoteResponseMID(t *testing.T) {
mid uint
data interface{}
msgErr bool
dataCompression bool
errEncode error
errSerialize error
err error
}{
{"success_raw_message", uint(rand.Int()), []byte("ok"), false, false, nil, nil, nil},
{"success_struct_message", uint(rand.Int()), &someStruct{A: "ok"}, false, true, nil, nil, nil},
{"success_struct_message_with_error", uint(rand.Int()), &someStruct{A: "ok"}, true, false, nil, nil, nil},
{"failed_struct_message_serialize", uint(rand.Int()), &someStruct{A: "ok"}, false, true, nil, errors.New("failed serialize"), errors.New("failed serialize")},
{"failed_encode", uint(rand.Int()), &someStruct{A: "ok"}, false, false, errors.New("failed encode"), nil, errors.New("failed encode")},
{"failed_send", uint(rand.Int()), &someStruct{A: "ok"}, false, true, nil, nil, errors.New("failed send")},
{"zero_mid", 0, nil, false, false, nil, nil, constants.ErrSessionOnNotify},
{"success_raw_message", uint(rand.Int()), []byte("ok"), false, nil, nil, nil},
{"success_struct_message", uint(rand.Int()), &someStruct{A: "ok"}, false, nil, nil, nil},
{"success_struct_message_with_error", uint(rand.Int()), &someStruct{A: "ok"}, true, nil, nil, nil},
{"failed_struct_message_serialize", uint(rand.Int()), &someStruct{A: "ok"}, false, nil, errors.New("failed serialize"), errors.New("failed serialize")},
{"failed_encode", uint(rand.Int()), &someStruct{A: "ok"}, false, errors.New("failed encode"), nil, errors.New("failed encode")},
{"failed_send", uint(rand.Int()), &someStruct{A: "ok"}, false, nil, nil, errors.New("failed send")},
{"zero_mid", 0, nil, false, nil, nil, constants.ErrSessionOnNotify},
}

for _, table := range tables {
Expand All @@ -189,7 +188,8 @@ func TestAgentRemoteResponseMID(t *testing.T) {
mockEnconder := codecmocks.NewMockPacketEncoder(ctrl)
mockSerializer := serializemocks.NewMockSerializer(ctrl)
mockRPCClient := clustermocks.NewMockRPCClient(ctrl)
remote, err := NewRemote(ss, reply, mockRPCClient, mockEnconder, mockSerializer, nil, "", table.dataCompression)
messageEncoder := message.NewEncoder(false)
remote, err := NewRemote(ss, reply, mockRPCClient, mockEnconder, mockSerializer, nil, "", messageEncoder)
assert.NoError(t, err)
assert.NotNil(t, remote)

Expand All @@ -208,7 +208,7 @@ func TestAgentRemoteResponseMID(t *testing.T) {
ID: table.mid,
Err: table.msgErr,
}
expectedMsg, _ := rawMsg.Encode(table.dataCompression)
expectedMsg, _ := messageEncoder.Encode(rawMsg)
mockEnconder.EXPECT().Encode(gomock.Any(), expectedMsg).Return(nil, table.errEncode).Do(
func(typ packet.Type, d []byte) {
// cannot compare inside the expect because they are equivalent but not equal
Expand Down Expand Up @@ -237,18 +237,17 @@ func TestAgentRemoteSendRequest(t *testing.T) {
serverID string
reqRoute string
data interface{}
dataCompression bool
errSerialize error
errGetServer error
err error
resp *protos.Response
}{
{"test_failed_bad_route", uuid.New().String(), uuid.New().String(), []byte("ok"), false, nil, nil, errors.New("invalid route"), nil},
{"test_success_raw", uuid.New().String(), "", []byte("ok"), true, nil, nil, nil, &protos.Response{Data: []byte("resp")}},
{"test_success_struct", uuid.New().String(), "", &someStruct{A: "ok"}, false, nil, nil, nil, &protos.Response{Data: []byte("resp")}},
{"test_failed_serialize", uuid.New().String(), "", &someStruct{A: "ok"}, true, errors.New("ser"), nil, errors.New("ser"), nil},
{"test_failed_get_server", uuid.New().String(), "", &someStruct{A: "ok"}, false, nil, errors.New("get sv"), errors.New("get sv"), nil},
{"test_failed_call", uuid.New().String(), "", &someStruct{A: "ok"}, true, nil, nil, errors.New("call"), nil},
{"test_failed_bad_route", uuid.New().String(), uuid.New().String(), []byte("ok"), nil, nil, errors.New("invalid route"), nil},
{"test_success_raw", uuid.New().String(), "", []byte("ok"), nil, nil, nil, &protos.Response{Data: []byte("resp")}},
{"test_success_struct", uuid.New().String(), "", &someStruct{A: "ok"}, nil, nil, nil, &protos.Response{Data: []byte("resp")}},
{"test_failed_serialize", uuid.New().String(), "", &someStruct{A: "ok"}, errors.New("ser"), nil, errors.New("ser"), nil},
{"test_failed_get_server", uuid.New().String(), "", &someStruct{A: "ok"}, nil, errors.New("get sv"), errors.New("get sv"), nil},
{"test_failed_call", uuid.New().String(), "", &someStruct{A: "ok"}, nil, nil, errors.New("call"), nil},
}

for _, table := range tables {
Expand All @@ -259,7 +258,8 @@ func TestAgentRemoteSendRequest(t *testing.T) {
mockSD := clustermocks.NewMockServiceDiscovery(ctrl)
mockSerializer := serializemocks.NewMockSerializer(ctrl)
mockRPCClient := clustermocks.NewMockRPCClient(ctrl)
remote, err := NewRemote(nil, "", mockRPCClient, nil, mockSerializer, mockSD, "", table.dataCompression)
mockMessageEncoder := messagemocks.NewMockMessageEncoder(ctrl)
remote, err := NewRemote(nil, "", mockRPCClient, nil, mockSerializer, mockSD, "", mockMessageEncoder)
assert.NoError(t, err)
assert.NotNil(t, remote)

Expand Down
Loading

0 comments on commit 228cee8

Please sign in to comment.