Skip to content

Commit

Permalink
WIP: kick refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielerzinger committed Aug 16, 2018
1 parent 4114def commit 57b91c9
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 20 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RPCServer interface {
type RPCClient interface {
Send(route string, data []byte) error
SendPush(userID string, frontendSv *Server, push *protos.Push) error
SendKick(userID string, serverType string, kick *protos.KickMsg) error
BroadcastSessionBind(uid string) error
Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session *session.Session, msg *message.Message, server *Server) (*protos.Response, error)
interfaces.Module
Expand Down
22 changes: 22 additions & 0 deletions cluster/grpc_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,28 @@ func (gs *GRPCClient) BroadcastSessionBind(uid string) error {
return nil
}

// SendKick sends a kick to an user
func (gs *GRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error {
var svID string
var err error

if gs.bindingStorage == nil {
return constants.ErrNoBindingStorageModule
}
svID, err = gs.bindingStorage.GetUserFrontendID(userID, serverType)
if err != nil {
return err
}

if c, ok := gs.clientMap.Load(svID); ok {
ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout)
defer done()
_, err := c.(protos.PitayaClient).KickUser(ctxT, kick)
return err
}
return constants.ErrNoConnectionToServer
}

// SendPush sends a message to an user, if you dont know the serverID that the user is connected to, you need to set a BindingStorage when creating the client
// TODO: Jaeger?
func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error {
Expand Down
14 changes: 13 additions & 1 deletion cluster/mocks/cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *proto
return ns.Send(topic, msg)
}

// SendKick kicks an user
func (ns *NatsRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error {
topic := GetUserKickTopic(userID, serverType)
fmt.Println(topic)
msg, err := proto.Marshal(kick)
if err != nil {
return err
}
return ns.Send(topic, msg)
}

// Call calls a method remotelly
func (ns *NatsRPCClient) Call(
ctx context.Context,
Expand Down
33 changes: 33 additions & 0 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type NatsRPCServer struct {
bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
unhandledReqCh chan *protos.Request
userPushCh chan *protos.Push
userKickCh chan *nats.Msg
sub *nats.Subscription
dropped int
pitayaServer protos.PitayaServer
Expand Down Expand Up @@ -101,6 +102,7 @@ func (ns *NatsRPCServer) configure() error {
// the reason this channel is buffered is that we can achieve more performance by not
// blocking producers on a massive push
ns.userPushCh = make(chan *protos.Push, ns.pushBufferSize)
ns.userKickCh = make(chan *nats.Msg, ns.messagesBufferSize)
return nil
}

Expand All @@ -114,6 +116,11 @@ func GetUserMessagesTopic(uid string, svType string) string {
return fmt.Sprintf("pitaya/%s/user/%s/push", svType, uid)
}

// GetUserKickTopic get the topic for kicking an user
func GetUserKickTopic(uid string, svType string) string {
return fmt.Sprintf("pitaya/%s/user/%s/kick", svType, uid)
}

// GetBindBroadcastTopic gets the topic on which bind events will be broadcasted
func GetBindBroadcastTopic(svType string) string {
return fmt.Sprintf("pitaya/%s/bindings", svType)
Expand All @@ -126,6 +133,10 @@ func (ns *NatsRPCServer) onSessionBind(ctx context.Context, s *session.Session)
if err != nil {
return err
}
err = ns.subscribeToUserKickChannel(s.UID(), ns.server.Type)
if err != nil {
return err
}
s.Subscription = subs
}
return nil
Expand All @@ -141,6 +152,11 @@ func (ns *NatsRPCServer) subscribeToBindingsChannel() error {
return err
}

func (ns *NatsRPCServer) subscribeToUserKickChannel(uid string, svType string) error {
_, err := ns.conn.ChanSubscribe(GetUserKickTopic(uid, svType), ns.userKickCh)
return err
}

func (ns *NatsRPCServer) subscribeToUserMessages(uid string, svType string) (*nats.Subscription, error) {
subs, err := ns.conn.Subscribe(GetUserMessagesTopic(uid, svType), func(msg *nats.Msg) {
push := &protos.Push{}
Expand Down Expand Up @@ -203,6 +219,10 @@ func (ns *NatsRPCServer) getUserPushChannel() chan *protos.Push {
return ns.userPushCh
}

func (ns *NatsRPCServer) getUserKickChannel() chan *nats.Msg {
return ns.userKickCh
}

func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
p, err := proto.Marshal(res)
if err != nil {
Expand Down Expand Up @@ -267,6 +287,18 @@ func (ns *NatsRPCServer) processPushes() {
}
}

func (ns *NatsRPCServer) processKick() {
for kick := range ns.getUserKickChannel() {
k := &protos.KickMsg{}
err := proto.Unmarshal(kick.Data, k)
logger.Log.Debugf("sending kick to user %s", k.GetUserId())
_, err = ns.pitayaServer.KickUser(context.Background(), k)
if err != nil {
logger.Log.Errorf("error sending kick to user: %v", err)
}
}
}

// Init inits nats rpc server
func (ns *NatsRPCServer) Init() error {
// TODO should we have concurrency here? it feels like we should
Expand Down Expand Up @@ -294,6 +326,7 @@ func (ns *NatsRPCServer) Init() error {
// this should be so fast that we shoudn't need concurrency
go ns.processPushes()
go ns.processSessionBindings()
go ns.processKick()

return nil
}
Expand Down
24 changes: 24 additions & 0 deletions kick.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pitaya

import (
"context"

"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/protos"
"github.com/topfreegames/pitaya/session"
)

// SendKickToUser sends kick to an user
func SendKickToUser(uid, frontendType string) error {
if s := session.GetSessionByUID(uid); s != nil {
if err := s.Kick(context.Background()); err != nil {
logger.Log.Errorf("Session kick error, ID=%d, UID=%d, ERROR=%s", s.ID(), s.UID(), err.Error())
}
} else if app.rpcClient != nil {
kick := &protos.KickMsg{UserId: uid}
if err := app.rpcClient.SendKick(uid, frontendType, kick); err != nil {
logger.Log.Errorf("RPCClient send kick error, UID=%d, SvType=%s, Error=%s", uid, frontendType, err.Error())
}
}
return nil
}
33 changes: 32 additions & 1 deletion protos/mocks/pitaya.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 55 additions & 18 deletions protos/pitaya.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions service/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ func (r *RemoteService) PushToUser(ctx context.Context, push *protos.Push) (*pro
return nil, constants.ErrSessionNotFound
}

// KickUser sends a kick to user
func (r *RemoteService) KickUser(ctx context.Context, kick *protos.KickMsg) (*protos.KickAnswer, error) {
logger.Log.Debugf("sending kick to user %s", kick.GetUserId())
s := session.GetSessionByUID(kick.GetUserId())
if s != nil {
err := s.Kick(ctx)
if err != nil {
return nil, err
}
return &protos.KickAnswer{
Kicked: true,
}, nil
}
return nil, constants.ErrSessionNotFound
}

// DoRPC do rpc and get answer
func (r *RemoteService) DoRPC(ctx context.Context, serverID string, route *route.Route, protoData []byte) (*protos.Response, error) {
msg := &message.Message{
Expand Down

0 comments on commit 57b91c9

Please sign in to comment.