Skip to content

Commit

Permalink
[WIP] kick user
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed May 2, 2018
1 parent 80bdbad commit 35e314e
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 5 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Expand Up @@ -233,7 +233,7 @@ func (a *Agent) GetStatus() int32 {
// Kick sends a kick packet to a client
func (a *Agent) Kick(ctx context.Context) error {
// packet encode
p, err := a.encoder.Encode(packet.Kick, []byte{})
p, err := a.encoder.Encode(packet.Kick, nil)
if err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions agent/agent_remote_test.go
Expand Up @@ -21,6 +21,7 @@
package agent

import (
"context"
"errors"
"math/rand"
"reflect"
Expand Down Expand Up @@ -157,6 +158,27 @@ func TestAgentRemotePush(t *testing.T) {
}
}

func TestKickRemote(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

rpcClient := clustermocks.NewMockRPCClient(ctrl)
ss := &protos.Session{Uid: uuid.New().String()}
mockSD := clustermocks.NewMockServiceDiscovery(ctrl)
mockSerializer := serializemocks.NewMockSerializer(ctrl)
frontID := uuid.New().String()
remote, err := NewRemote(ss, "", rpcClient, nil, mockSerializer, mockSD, frontID, nil)
assert.NoError(t, err)

mockSD.EXPECT().GetServer(frontID)
c := context.Background()
r, _ := route.Decode("sys.kick")
rpcClient.EXPECT().Call(c, protos.RPCType_User, r, gomock.Nil(), gomock.Any(), gomock.Nil())
err = remote.Kick(c)

assert.NoError(t, err)
}

func TestAgentRemoteResponseMID(t *testing.T) {
tables := []struct {
name string
Expand Down
25 changes: 25 additions & 0 deletions agent/agent_test.go
Expand Up @@ -21,6 +21,7 @@
package agent

import (
"context"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -100,6 +101,30 @@ func TestNewAgent(t *testing.T) {
assert.NotNil(t, ag)
}

func TestKick(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockSerializer := serializemocks.NewMockSerializer(ctrl)
mockEncoder := codecmocks.NewMockPacketEncoder(ctrl)
mockDecoder := codecmocks.NewMockPacketDecoder(ctrl)
dieChan := make(chan bool)
hbTime := time.Second

mockConn := mocks.NewMockConn(ctrl)
mockEncoder.EXPECT().Encode(gomock.Any(), gomock.Nil()).Do(
func(typ packet.Type, d []byte) {
assert.EqualValues(t, packet.Kick, typ)
})
mockConn.EXPECT().Write(gomock.Any()).Return(0, nil)
messageEncoder := message.NewEncoder(false)

ag := NewAgent(mockConn, mockDecoder, mockEncoder, mockSerializer, hbTime, 10, dieChan, messageEncoder)
c := context.Background()
err := ag.Kick(c)
assert.NoError(t, err)
}

func TestAgentSend(t *testing.T) {
tables := []struct {
name string
Expand Down
3 changes: 3 additions & 0 deletions client/client.go
Expand Up @@ -228,6 +228,9 @@ func (c *Client) handlePackets() {
c.pendingReqMutex.Unlock()
}
c.IncomingMsgChan <- m
case packet.Kick:
logger.Log.Warn("got kick packet from the server! disconnecting...")
c.Disconnect()
}
case <-c.closeChan:
return
Expand Down
2 changes: 1 addition & 1 deletion cluster/nats_rpc_client.go
Expand Up @@ -145,7 +145,7 @@ func (ns *NatsRPCClient) Call(
) (*protos.Response, error) {
parent, err := jaeger.ExtractSpan(ctx)
if err != nil {
logger.Log.Errorf("failed to retrieve parent span: %s", err.Error())
logger.Log.Warnf("failed to retrieve parent span: %s", err.Error())
}
tags := opentracing.Tags{
"span.kind": "client",
Expand Down
57 changes: 57 additions & 0 deletions e2e/e2e_test.go
Expand Up @@ -135,6 +135,63 @@ func TestGroupFront(t *testing.T) {
}
}

func TestKick(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c1.Disconnect()

err = c2.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c2.Disconnect()

uid1 := uuid.New().String()
err = c1.SendRequest("connector.testsvc.testbindid", []byte(uid1))
assert.NoError(t, err)
err = c2.SendRequest("connector.testsvc.testrequestkickuser", []byte(uid1))
assert.NoError(t, err)

helpers.ShouldEventuallyReturn(t, func() bool {
return c1.Connected
}, false)
}

func TestKickOnBack(t *testing.T) {
// TODO do not skip
t.Skip()
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c1.Disconnect()

err = c2.ConnectTo(fmt.Sprintf("localhost:%d", port1))
assert.NoError(t, err)
defer c2.Disconnect()

uid1 := uuid.New().String()
err = c1.SendRequest("game.testsvc.testbindid", []byte(uid1))
assert.NoError(t, err)
err = c1.SendRequest("game.testsvc.testrequestkickme", nil)
assert.NoError(t, err)

helpers.ShouldEventuallyReturn(t, func() bool {
return c1.Connected
}, false)
}

func TestPushToUsers(t *testing.T) {
port1 := helpers.GetFreePort(t)

Expand Down
29 changes: 29 additions & 0 deletions examples/testing/main.go
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/acceptor"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/serialize/json"
"github.com/topfreegames/pitaya/serialize/protobuf"
"github.com/topfreegames/pitaya/session"
)

// TestSvc service for e2e tests
Expand Down Expand Up @@ -109,7 +111,34 @@ func (t *TestSvc) Init() {

// TestRequestKickUser handler for e2e tests
func (t *TestSvc) TestRequestKickUser(ctx context.Context, userID []byte) (*TestResponse, error) {
s := session.GetSessionByUID(string(userID))
if s == nil {
return nil, pitaya.Error(constants.ErrSessionNotFound, "PIT-404")
}
err := s.Kick(ctx)
if err != nil {
return nil, err
}
return &TestResponse{
Code: 200,
Msg: "ok",
}, nil
}

// TestRequestKickMe handler for e2e tests
func (t *TestSvc) TestRequestKickMe(ctx context.Context) (*TestResponse, error) {
s := pitaya.GetSessionFromCtx(ctx)
if s == nil {
return nil, pitaya.Error(constants.ErrSessionNotFound, "PIT-404")
}
err := s.Kick(ctx)
if err != nil {
return nil, err
}
return &TestResponse{
Code: 200,
Msg: "ok",
}, nil
}

// TestRequestOnlySessionReturnsPtr handler for e2e tests
Expand Down
2 changes: 1 addition & 1 deletion service/remote.go
Expand Up @@ -207,7 +207,7 @@ func (r *RemoteService) ProcessRemoteMessages(threadID int) {
}
parent, err := jaeger.ExtractSpan(ctx)
if err != nil {
logger.Log.Errorf("failed to retrieve parent span: %s", err.Error())
logger.Log.Warnf("failed to retrieve parent span: %s", err.Error())
}

tags := opentracing.Tags{
Expand Down
3 changes: 1 addition & 2 deletions session/session.go
Expand Up @@ -249,8 +249,7 @@ func (s *Session) Kick(ctx context.Context) error {
if err != nil {
return err
}
s.Close()
return nil
return s.entity.Close()
}

// OnClose adds the function it receives to the callbacks that will be called
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Expand Up @@ -169,6 +169,18 @@ func TestGetSessionByUIDDoenstExist(t *testing.T) {
assert.Nil(t, ss)
}

func TestKick(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
entity := mocks.NewMockNetworkEntity(ctrl)
ss := New(entity, true)
c := context.Background()
entity.EXPECT().Kick(c)
entity.EXPECT().Close()
err := ss.Kick(c)
assert.NoError(t, err)
}

func TestSessionUpdateEncodedData(t *testing.T) {
tables := []struct {
name string
Expand Down

0 comments on commit 35e314e

Please sign in to comment.