Skip to content

Commit

Permalink
Merge 8b485b3 into 26412ab
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielerzinger committed Aug 17, 2018
2 parents 26412ab + 8b485b3 commit 43429ad
Show file tree
Hide file tree
Showing 14 changed files with 540 additions and 20 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RPCServer interface {
type RPCClient interface {
Send(route string, data []byte) error
SendPush(userID string, frontendSv *Server, push *protos.Push) error
SendKick(userID string, serverType string, kick *protos.KickMsg) error
BroadcastSessionBind(uid string) error
Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session *session.Session, msg *message.Message, server *Server) (*protos.Response, error)
interfaces.Module
Expand Down
23 changes: 23 additions & 0 deletions cluster/grpc_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,29 @@ func (gs *GRPCClient) BroadcastSessionBind(uid string) error {
return nil
}

// SendKick sends a kick to an user
func (gs *GRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error {
var svID string
var err error

if gs.bindingStorage == nil {
return constants.ErrNoBindingStorageModule
}

svID, err = gs.bindingStorage.GetUserFrontendID(userID, serverType)
if err != nil {
return err
}

if c, ok := gs.clientMap.Load(svID); ok {
ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout)
defer done()
_, err := c.(protos.PitayaClient).KickUser(ctxT, kick)
return err
}
return constants.ErrNoConnectionToServer
}

// SendPush sends a message to an user, if you dont know the serverID that the user is connected to, you need to set a BindingStorage when creating the client
// TODO: Jaeger?
func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
Expand Down
60 changes: 60 additions & 0 deletions cluster/grpc_rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,66 @@ func TestBroadcastSessionBind(t *testing.T) {
}
}

func TestSendKick(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockBindingStorage := mocks.NewMockBindingStorage(ctrl)
mockPitayaClient := protosmocks.NewMockPitayaClient(ctrl)
tables := []struct {
name string
userId string
bindingStorage interfaces.BindingStorage
sv *Server
err error
}{
{"bindingstorage", "uid", mockBindingStorage, &Server{
Type: "tp",
Frontend: true}, nil,
},
{"nobindingstorage", "uid", nil, &Server{
Type: "tp",
Frontend: true,
}, constants.ErrNoBindingStorageModule},
{"nobindingstorage", "", mockBindingStorage, &Server{
Type: "tp",
Frontend: true,
}, nil},
}

for _, table := range tables {
t.Run(table.name, func(t *testing.T) {
c := getConfig()
g, err := getRPCClient(c)
assert.NoError(t, err)

if table.bindingStorage != nil {
g.clientMap.Store(table.sv.ID, mockPitayaClient)
g.bindingStorage = table.bindingStorage
mockBindingStorage.EXPECT().GetUserFrontendID(table.userId, gomock.Any()).DoAndReturn(func(u, svType string) (string, error) {
assert.Equal(t, table.userId, u)
assert.Equal(t, table.sv.Type, svType)
return table.sv.ID, nil
})

mockPitayaClient.EXPECT().KickUser(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, msg *protos.KickMsg) {
assert.Equal(t, table.userId, msg.UserId)
})
}

kick := &protos.KickMsg{
UserId: table.userId,
}

err = g.SendKick(table.userId, table.sv.Type, kick)
if table.err != nil {
assert.Equal(t, err, table.err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestSendPush(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
14 changes: 13 additions & 1 deletion cluster/mocks/cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *proto
return ns.Send(topic, msg)
}

// SendKick kicks an user
func (ns *NatsRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error {
topic := GetUserKickTopic(userID, serverType)
msg, err := proto.Marshal(kick)
if err != nil {
return err
}
return ns.Send(topic, msg)
}

// Call calls a method remotelly
func (ns *NatsRPCClient) Call(
ctx context.Context,
Expand Down
36 changes: 36 additions & 0 deletions cluster/nats_rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,42 @@ func TestNatsRPCClientBroadcastSessionBind(t *testing.T) {
subs.Unsubscribe()
}

func TestNatsRPCClientSendKick(t *testing.T) {
uid := "testuid"
s := helpers.GetTestNatsServer(t)
defer s.Shutdown()
cfg := viper.New()
cfg.Set("pitaya.cluster.rpc.client.nats.connect", fmt.Sprintf("nats://%s", s.Addr()))
config := getConfig(cfg)
sv := getServer()

rpcClient, _ := NewNatsRPCClient(config, sv, nil, nil)
err := rpcClient.Init()
assert.NoError(t, err)

kickChan := make(chan *nats.Msg)
subs, err := rpcClient.conn.ChanSubscribe(GetUserKickTopic(uid, sv.Type), kickChan)
assert.NoError(t, err)
time.Sleep(50 * time.Millisecond)

kick := &protos.KickMsg{
UserId: uid,
}

err = rpcClient.SendKick(uid, sv.Type, kick)
assert.NoError(t, err)

m := helpers.ShouldEventuallyReceive(t, kickChan).(*nats.Msg)

actual := &protos.KickMsg{}
err = proto.Unmarshal(m.Data, actual)
assert.NoError(t, err)

assert.Equal(t, kick.UserId, actual.UserId)
err = subs.Unsubscribe()
assert.NoError(t, err)
}

func TestNatsRPCClientSendPush(t *testing.T) {
uid := "testuid123"
s := helpers.GetTestNatsServer(t)
Expand Down
37 changes: 37 additions & 0 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NatsRPCServer struct {
bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
unhandledReqCh chan *protos.Request
userPushCh chan *protos.Push
userKickCh chan *nats.Msg
sub *nats.Subscription
dropped int
pitayaServer protos.PitayaServer
Expand Down Expand Up @@ -101,6 +102,7 @@ func (ns *NatsRPCServer) configure() error {
// the reason this channel is buffered is that we can achieve more performance by not
// blocking producers on a massive push
ns.userPushCh = make(chan *protos.Push, ns.pushBufferSize)
ns.userKickCh = make(chan *nats.Msg, ns.messagesBufferSize)
return nil
}

Expand All @@ -114,6 +116,11 @@ func GetUserMessagesTopic(uid string, svType string) string {
return fmt.Sprintf("pitaya/%s/user/%s/push", svType, uid)
}

// GetUserKickTopic get the topic for kicking an user
func GetUserKickTopic(uid string, svType string) string {
return fmt.Sprintf("pitaya/%s/user/%s/kick", svType, uid)
}

// GetBindBroadcastTopic gets the topic on which bind events will be broadcasted
func GetBindBroadcastTopic(svType string) string {
return fmt.Sprintf("pitaya/%s/bindings", svType)
Expand All @@ -126,6 +133,10 @@ func (ns *NatsRPCServer) onSessionBind(ctx context.Context, s *session.Session)
if err != nil {
return err
}
err = ns.subscribeToUserKickChannel(s.UID(), ns.server.Type)
if err != nil {
return err
}
s.Subscription = subs
}
return nil
Expand All @@ -141,6 +152,11 @@ func (ns *NatsRPCServer) subscribeToBindingsChannel() error {
return err
}

func (ns *NatsRPCServer) subscribeToUserKickChannel(uid string, svType string) error {
_, err := ns.conn.ChanSubscribe(GetUserKickTopic(uid, svType), ns.userKickCh)
return err
}

func (ns *NatsRPCServer) subscribeToUserMessages(uid string, svType string) (*nats.Subscription, error) {
subs, err := ns.conn.Subscribe(GetUserMessagesTopic(uid, svType), func(msg *nats.Msg) {
push := &protos.Push{}
Expand Down Expand Up @@ -203,6 +219,10 @@ func (ns *NatsRPCServer) getUserPushChannel() chan *protos.Push {
return ns.userPushCh
}

func (ns *NatsRPCServer) getUserKickChannel() chan *nats.Msg {
return ns.userKickCh
}

func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
p, err := proto.Marshal(res)
if err != nil {
Expand Down Expand Up @@ -267,6 +287,22 @@ func (ns *NatsRPCServer) processPushes() {
}
}

func (ns *NatsRPCServer) processKick() {
for kick := range ns.getUserKickChannel() {
k := &protos.KickMsg{}
err := proto.Unmarshal(kick.Data, k)
if err != nil {
logger.Log.Error("error processing kick msg: %v", err)
continue
}
logger.Log.Debugf("sending kick to user %s", k.GetUserId())
_, err = ns.pitayaServer.KickUser(context.Background(), k)
if err != nil {
logger.Log.Errorf("error sending kick to user: %v", err)
}
}
}

// Init inits nats rpc server
func (ns *NatsRPCServer) Init() error {
// TODO should we have concurrency here? it feels like we should
Expand Down Expand Up @@ -294,6 +330,7 @@ func (ns *NatsRPCServer) Init() error {
// this should be so fast that we shoudn't need concurrency
go ns.processPushes()
go ns.processSessionBindings()
go ns.processKick()

return nil
}
Expand Down
69 changes: 69 additions & 0 deletions cluster/nats_rpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func TestNatsRPCServerGetUserMessagesTopic(t *testing.T) {
assert.Equal(t, "pitaya/connector/user/1/push", GetUserMessagesTopic("1", "connector"))
}

func TestNatsRPCServerGetUserKickTopic(t *testing.T) {
t.Parallel()
assert.Equal(t, "pitaya/connector/user/0/kick", GetUserKickTopic("0", "connector"))
assert.Equal(t, "pitaya/game/user/1/kick", GetUserKickTopic("1", "game"))
assert.Equal(t, "pitaya/connector/user/10/kick", GetUserKickTopic("10", "connector"))
assert.Equal(t, "pitaya/game/user/11/kick", GetUserKickTopic("11", "game"))
}

func TestNatsRPCServerGetUnhandledRequestsChannel(t *testing.T) {
t.Parallel()
cfg := getConfig()
Expand Down Expand Up @@ -123,6 +131,7 @@ func TestNatsRPCServerOnSessionBind(t *testing.T) {
err = rpcServer.onSessionBind(context.Background(), sess)
assert.NoError(t, err)
assert.NotNil(t, sess.Subscription)
assert.NotNil(t, rpcServer.userKickCh)
}

func TestNatsRPCServerSubscribeToBindingsChannel(t *testing.T) {
Expand All @@ -143,6 +152,25 @@ func TestNatsRPCServerSubscribeToBindingsChannel(t *testing.T) {
assert.Equal(t, msg.Data, dt)
}

func TestNatsRPCServerSubscribeUserKickChannel(t *testing.T) {
t.Parallel()
cfg := getConfig()
sv := getServer()
rpcServer, _ := NewNatsRPCServer(cfg, sv, nil, nil)
s := helpers.GetTestNatsServer(t)
defer s.Shutdown()
conn, err := setupNatsConn(fmt.Sprintf("nats://%s", s.Addr()), nil)
assert.NoError(t, err)
rpcServer.conn = conn
err = rpcServer.subscribeToUserKickChannel("someuid", sv.Type)
assert.NoError(t, err)
dt := []byte("somedata")
err = conn.Publish(GetUserKickTopic("someuid", sv.Type), dt)
assert.NoError(t, err)
msg := helpers.ShouldEventuallyReceive(t, rpcServer.getUserKickChannel()).(*nats.Msg)
assert.Equal(t, msg.Data, dt)
}

func TestNatsRPCServerGetUserPushChannel(t *testing.T) {
t.Parallel()
cfg := getConfig()
Expand All @@ -152,6 +180,14 @@ func TestNatsRPCServerGetUserPushChannel(t *testing.T) {
assert.IsType(t, make(chan *protos.Push), n.getUserPushChannel())
}

func TestNatsRPCServerGetUserKickChannel(t *testing.T) {
t.Parallel()
cfg := getConfig()
sv := getServer()
n, _ := NewNatsRPCServer(cfg, sv, nil, nil)
assert.NotNil(t, n.getUserKickChannel())
}

func TestNatsRPCServerSubscribeToUserMessages(t *testing.T) {
cfg := getConfig()
sv := getServer()
Expand Down Expand Up @@ -374,6 +410,39 @@ func TestNatsRPCServerProcessPushes(t *testing.T) {
time.Sleep(30 * time.Millisecond)
}

func TestNatsRPCServerProcessKick(t *testing.T) {
s := helpers.GetTestNatsServer(t)
defer s.Shutdown()
cfg := viper.New()
cfg.Set("pitaya.cluster.rpc.server.nats.connect", fmt.Sprintf("nats://%s", s.Addr()))
config := getConfig(cfg)
sv := getServer()
rpcServer, _ := NewNatsRPCServer(config, sv, nil, nil)
err := rpcServer.Init()

assert.NoError(t, err)

ctrl := gomock.NewController(t)
pitayaSvMock := protosmocks.NewMockPitayaServer(ctrl)
defer ctrl.Finish()

rpcServer.SetPitayaServer(pitayaSvMock)

kick := &protos.KickMsg{
UserId: "someuid",
}

msg, err := proto.Marshal(kick)
assert.NoError(t, err)

pitayaSvMock.EXPECT().KickUser(gomock.Any(), kick).Do(func(ctx context.Context, p *protos.KickMsg) {
assert.Equal(t, kick.UserId, p.UserId)
})

rpcServer.userKickCh <- &nats.Msg{Data: msg}
time.Sleep(30 * time.Millisecond)
}

func TestNatsRPCServerReportMetrics(t *testing.T) {
cfg := getConfig()
sv := getServer()
Expand Down
Loading

0 comments on commit 43429ad

Please sign in to comment.