From 30f38afda120a547c6355a824463d5710813a124 Mon Sep 17 00:00:00 2001 From: Gabriel Erzinger Date: Thu, 16 Aug 2018 17:41:21 -0300 Subject: [PATCH] WIP: kick refactor --- cluster/cluster.go | 1 + cluster/grpc_rpc_client.go | 22 ++++++++++++ cluster/mocks/cluster.go | 14 +++++++- cluster/nats_rpc_client.go | 10 ++++++ cluster/nats_rpc_server.go | 37 +++++++++++++++++++ kick.go | 24 +++++++++++++ protos/mocks/pitaya.go | 33 ++++++++++++++++- protos/pitaya.pb.go | 73 ++++++++++++++++++++++++++++---------- service/remote.go | 16 +++++++++ 9 files changed, 210 insertions(+), 20 deletions(-) create mode 100644 kick.go diff --git a/cluster/cluster.go b/cluster/cluster.go index f0830840..117fa97f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -44,6 +44,7 @@ type RPCServer interface { type RPCClient interface { Send(route string, data []byte) error SendPush(userID string, frontendSv *Server, push *protos.Push) error + SendKick(userID string, serverType string, kick *protos.KickMsg) error BroadcastSessionBind(uid string) error Call(ctx context.Context, rpcType protos.RPCType, route *route.Route, session *session.Session, msg *message.Message, server *Server) (*protos.Response, error) interfaces.Module diff --git a/cluster/grpc_rpc_client.go b/cluster/grpc_rpc_client.go index 001f313e..00e0904c 100644 --- a/cluster/grpc_rpc_client.go +++ b/cluster/grpc_rpc_client.go @@ -149,6 +149,28 @@ func (gs *GRPCClient) BroadcastSessionBind(uid string) error { return nil } +// SendKick sends a kick to an user +func (gs *GRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error { + var svID string + var err error + + if gs.bindingStorage == nil { + return constants.ErrNoBindingStorageModule + } + svID, err = gs.bindingStorage.GetUserFrontendID(userID, serverType) + if err != nil { + return err + } + + if c, ok := gs.clientMap.Load(svID); ok { + ctxT, done := context.WithTimeout(context.Background(), gs.reqTimeout) + defer done() + _, err := c.(protos.PitayaClient).KickUser(ctxT, kick) + return err + } + return constants.ErrNoConnectionToServer +} + // SendPush sends a message to an user, if you dont know the serverID that the user is connected to, you need to set a BindingStorage when creating the client // TODO: Jaeger? func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.Push) error { diff --git a/cluster/mocks/cluster.go b/cluster/mocks/cluster.go index 7c2fd7b7..18791dd8 100644 --- a/cluster/mocks/cluster.go +++ b/cluster/mocks/cluster.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./cluster.go +// Source: cluster.go // Package mocks is a generated GoMock package. package mocks @@ -139,6 +139,18 @@ func (mr *MockRPCClientMockRecorder) SendPush(userID, frontendSv, push interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPush", reflect.TypeOf((*MockRPCClient)(nil).SendPush), userID, frontendSv, push) } +// SendKick mocks base method +func (m *MockRPCClient) SendKick(userID, serverType string, kick *protos.KickMsg) error { + ret := m.ctrl.Call(m, "SendKick", userID, serverType, kick) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendKick indicates an expected call of SendKick +func (mr *MockRPCClientMockRecorder) SendKick(userID, serverType, kick interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendKick", reflect.TypeOf((*MockRPCClient)(nil).SendKick), userID, serverType, kick) +} + // BroadcastSessionBind mocks base method func (m *MockRPCClient) BroadcastSessionBind(uid string) error { ret := m.ctrl.Call(m, "BroadcastSessionBind", uid) diff --git a/cluster/nats_rpc_client.go b/cluster/nats_rpc_client.go index d8fef758..a656dd36 100644 --- a/cluster/nats_rpc_client.go +++ b/cluster/nats_rpc_client.go @@ -117,6 +117,16 @@ func (ns *NatsRPCClient) SendPush(userID string, frontendSv *Server, push *proto return ns.Send(topic, msg) } +// SendKick kicks an user +func (ns *NatsRPCClient) SendKick(userID string, serverType string, kick *protos.KickMsg) error { + topic := GetUserKickTopic(userID, serverType) + msg, err := proto.Marshal(kick) + if err != nil { + return err + } + return ns.Send(topic, msg) +} + // Call calls a method remotelly func (ns *NatsRPCClient) Call( ctx context.Context, diff --git a/cluster/nats_rpc_server.go b/cluster/nats_rpc_server.go index 1e94d0c0..c412de43 100644 --- a/cluster/nats_rpc_server.go +++ b/cluster/nats_rpc_server.go @@ -52,6 +52,7 @@ type NatsRPCServer struct { bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session unhandledReqCh chan *protos.Request userPushCh chan *protos.Push + userKickCh chan *nats.Msg sub *nats.Subscription dropped int pitayaServer protos.PitayaServer @@ -101,6 +102,7 @@ func (ns *NatsRPCServer) configure() error { // the reason this channel is buffered is that we can achieve more performance by not // blocking producers on a massive push ns.userPushCh = make(chan *protos.Push, ns.pushBufferSize) + ns.userKickCh = make(chan *nats.Msg, ns.messagesBufferSize) return nil } @@ -114,6 +116,11 @@ func GetUserMessagesTopic(uid string, svType string) string { return fmt.Sprintf("pitaya/%s/user/%s/push", svType, uid) } +// GetUserKickTopic get the topic for kicking an user +func GetUserKickTopic(uid string, svType string) string { + return fmt.Sprintf("pitaya/%s/user/%s/kick", svType, uid) +} + // GetBindBroadcastTopic gets the topic on which bind events will be broadcasted func GetBindBroadcastTopic(svType string) string { return fmt.Sprintf("pitaya/%s/bindings", svType) @@ -126,6 +133,10 @@ func (ns *NatsRPCServer) onSessionBind(ctx context.Context, s *session.Session) if err != nil { return err } + err = ns.subscribeToUserKickChannel(s.UID(), ns.server.Type) + if err != nil { + return err + } s.Subscription = subs } return nil @@ -141,6 +152,11 @@ func (ns *NatsRPCServer) subscribeToBindingsChannel() error { return err } +func (ns *NatsRPCServer) subscribeToUserKickChannel(uid string, svType string) error { + _, err := ns.conn.ChanSubscribe(GetUserKickTopic(uid, svType), ns.userKickCh) + return err +} + func (ns *NatsRPCServer) subscribeToUserMessages(uid string, svType string) (*nats.Subscription, error) { subs, err := ns.conn.Subscribe(GetUserMessagesTopic(uid, svType), func(msg *nats.Msg) { push := &protos.Push{} @@ -203,6 +219,10 @@ func (ns *NatsRPCServer) getUserPushChannel() chan *protos.Push { return ns.userPushCh } +func (ns *NatsRPCServer) getUserKickChannel() chan *nats.Msg { + return ns.userKickCh +} + func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) { p, err := proto.Marshal(res) if err != nil { @@ -267,6 +287,22 @@ func (ns *NatsRPCServer) processPushes() { } } +func (ns *NatsRPCServer) processKick() { + for kick := range ns.getUserKickChannel() { + k := &protos.KickMsg{} + err := proto.Unmarshal(kick.Data, k) + if err != nil { + logger.Log.Error("error processing kick msg: %v", err) + continue + } + logger.Log.Debugf("sending kick to user %s", k.GetUserId()) + _, err = ns.pitayaServer.KickUser(context.Background(), k) + if err != nil { + logger.Log.Errorf("error sending kick to user: %v", err) + } + } +} + // Init inits nats rpc server func (ns *NatsRPCServer) Init() error { // TODO should we have concurrency here? it feels like we should @@ -294,6 +330,7 @@ func (ns *NatsRPCServer) Init() error { // this should be so fast that we shoudn't need concurrency go ns.processPushes() go ns.processSessionBindings() + go ns.processKick() return nil } diff --git a/kick.go b/kick.go new file mode 100644 index 00000000..fde53536 --- /dev/null +++ b/kick.go @@ -0,0 +1,24 @@ +package pitaya + +import ( + "context" + + "github.com/topfreegames/pitaya/logger" + "github.com/topfreegames/pitaya/protos" + "github.com/topfreegames/pitaya/session" +) + +// SendKickToUser sends kick to an user +func SendKickToUser(uid, frontendType string) error { + if s := session.GetSessionByUID(uid); s != nil { + if err := s.Kick(context.Background()); err != nil { + logger.Log.Errorf("Session kick error, ID=%d, UID=%d, ERROR=%s", s.ID(), s.UID(), err.Error()) + } + } else if app.rpcClient != nil { + kick := &protos.KickMsg{UserId: uid} + if err := app.rpcClient.SendKick(uid, frontendType, kick); err != nil { + logger.Log.Errorf("RPCClient send kick error, UID=%d, SvType=%s, Error=%s", uid, frontendType, err.Error()) + } + } + return nil +} diff --git a/protos/mocks/pitaya.go b/protos/mocks/pitaya.go index 5d1196db..0a8d1d75 100644 --- a/protos/mocks/pitaya.go +++ b/protos/mocks/pitaya.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./pitaya.pb.go +// Source: pitaya.pb.go // Package mocks is a generated GoMock package. package mocks @@ -71,6 +71,24 @@ func (mr *MockPitayaClientMockRecorder) PushToUser(ctx, in interface{}, opts ... return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushToUser", reflect.TypeOf((*MockPitayaClient)(nil).PushToUser), varargs...) } +// KickUser mocks base method +func (m *MockPitayaClient) KickUser(ctx context.Context, in *protos.KickMsg, opts ...grpc.CallOption) (*protos.KickAnswer, error) { + varargs := []interface{}{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "KickUser", varargs...) + ret0, _ := ret[0].(*protos.KickAnswer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// KickUser indicates an expected call of KickUser +func (mr *MockPitayaClientMockRecorder) KickUser(ctx, in interface{}, opts ...interface{}) *gomock.Call { + varargs := append([]interface{}{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KickUser", reflect.TypeOf((*MockPitayaClient)(nil).KickUser), varargs...) +} + // SessionBindRemote mocks base method func (m *MockPitayaClient) SessionBindRemote(ctx context.Context, in *protos.BindMsg, opts ...grpc.CallOption) (*protos.Response, error) { varargs := []interface{}{ctx, in} @@ -138,6 +156,19 @@ func (mr *MockPitayaServerMockRecorder) PushToUser(arg0, arg1 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushToUser", reflect.TypeOf((*MockPitayaServer)(nil).PushToUser), arg0, arg1) } +// KickUser mocks base method +func (m *MockPitayaServer) KickUser(arg0 context.Context, arg1 *protos.KickMsg) (*protos.KickAnswer, error) { + ret := m.ctrl.Call(m, "KickUser", arg0, arg1) + ret0, _ := ret[0].(*protos.KickAnswer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// KickUser indicates an expected call of KickUser +func (mr *MockPitayaServerMockRecorder) KickUser(arg0, arg1 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KickUser", reflect.TypeOf((*MockPitayaServer)(nil).KickUser), arg0, arg1) +} + // SessionBindRemote mocks base method func (m *MockPitayaServer) SessionBindRemote(arg0 context.Context, arg1 *protos.BindMsg) (*protos.Response, error) { ret := m.ctrl.Call(m, "SessionBindRemote", arg0, arg1) diff --git a/protos/pitaya.pb.go b/protos/pitaya.pb.go index 82aa6f9e..1812175e 100644 --- a/protos/pitaya.pb.go +++ b/protos/pitaya.pb.go @@ -7,8 +7,10 @@ import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" -import context "golang.org/x/net/context" -import grpc "google.golang.org/grpc" +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -29,11 +31,13 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// Client API for Pitaya service - +// PitayaClient is the client API for Pitaya service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type PitayaClient interface { Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) PushToUser(ctx context.Context, in *Push, opts ...grpc.CallOption) (*Response, error) + KickUser(ctx context.Context, in *KickMsg, opts ...grpc.CallOption) (*KickAnswer, error) SessionBindRemote(ctx context.Context, in *BindMsg, opts ...grpc.CallOption) (*Response, error) } @@ -63,6 +67,15 @@ func (c *pitayaClient) PushToUser(ctx context.Context, in *Push, opts ...grpc.Ca return out, nil } +func (c *pitayaClient) KickUser(ctx context.Context, in *KickMsg, opts ...grpc.CallOption) (*KickAnswer, error) { + out := new(KickAnswer) + err := c.cc.Invoke(ctx, "/protos.Pitaya/KickUser", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *pitayaClient) SessionBindRemote(ctx context.Context, in *BindMsg, opts ...grpc.CallOption) (*Response, error) { out := new(Response) err := c.cc.Invoke(ctx, "/protos.Pitaya/SessionBindRemote", in, out, opts...) @@ -72,11 +85,11 @@ func (c *pitayaClient) SessionBindRemote(ctx context.Context, in *BindMsg, opts return out, nil } -// Server API for Pitaya service - +// PitayaServer is the server API for Pitaya service. type PitayaServer interface { Call(context.Context, *Request) (*Response, error) PushToUser(context.Context, *Push) (*Response, error) + KickUser(context.Context, *KickMsg) (*KickAnswer, error) SessionBindRemote(context.Context, *BindMsg) (*Response, error) } @@ -120,6 +133,24 @@ func _Pitaya_PushToUser_Handler(srv interface{}, ctx context.Context, dec func(i return interceptor(ctx, in, info, handler) } +func _Pitaya_KickUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(KickMsg) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PitayaServer).KickUser(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protos.Pitaya/KickUser", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PitayaServer).KickUser(ctx, req.(*KickMsg)) + } + return interceptor(ctx, in, info, handler) +} + func _Pitaya_SessionBindRemote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BindMsg) if err := dec(in); err != nil { @@ -150,6 +181,10 @@ var _Pitaya_serviceDesc = grpc.ServiceDesc{ MethodName: "PushToUser", Handler: _Pitaya_PushToUser_Handler, }, + { + MethodName: "KickUser", + Handler: _Pitaya_KickUser_Handler, + }, { MethodName: "SessionBindRemote", Handler: _Pitaya_SessionBindRemote_Handler, @@ -159,20 +194,22 @@ var _Pitaya_serviceDesc = grpc.ServiceDesc{ Metadata: "pitaya.proto", } -func init() { proto.RegisterFile("pitaya.proto", fileDescriptor_pitaya_0f049d2a113c2f60) } +func init() { proto.RegisterFile("pitaya.proto", fileDescriptor_pitaya_7d6e89e64bd0f510) } -var fileDescriptor_pitaya_0f049d2a113c2f60 = []byte{ - // 186 bytes of a gzipped FileDescriptorProto +var fileDescriptor_pitaya_7d6e89e64bd0f510 = []byte{ + // 220 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0xc8, 0x2c, 0x49, 0xac, 0x4c, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x52, 0xbc, 0x45, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x10, 0x61, 0x29, 0xbe, 0xa2, 0xd4, 0xe2, 0x82, 0xfc, 0xbc, - 0xe2, 0x54, 0x28, 0x9f, 0xab, 0xa0, 0xb4, 0x38, 0x03, 0xc6, 0x4e, 0xca, 0xcc, 0x4b, 0x81, 0xb0, - 0x8d, 0xe6, 0x33, 0x72, 0xb1, 0x05, 0x80, 0xcd, 0x13, 0xd2, 0xe6, 0x62, 0x71, 0x4e, 0xcc, 0xc9, - 0x11, 0xe2, 0x87, 0x48, 0x15, 0xeb, 0x05, 0x41, 0x4c, 0x94, 0x12, 0x40, 0x08, 0x40, 0xcc, 0x54, - 0x62, 0x10, 0xd2, 0xe3, 0xe2, 0x0a, 0x28, 0x2d, 0xce, 0x08, 0xc9, 0x0f, 0x2d, 0x4e, 0x2d, 0x12, - 0xe2, 0x81, 0xa9, 0x00, 0x89, 0x61, 0x55, 0x6f, 0xc1, 0x25, 0x18, 0x9c, 0x5a, 0x5c, 0x9c, 0x99, - 0x9f, 0xe7, 0x94, 0x99, 0x97, 0x12, 0x94, 0x9a, 0x9b, 0x5f, 0x92, 0x8a, 0xb0, 0x09, 0x24, 0xe6, - 0x5b, 0x9c, 0x8e, 0x4d, 0xa7, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, - 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0x43, 0x12, 0xc4, 0xcb, 0xc6, 0x80, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x99, 0xe0, 0x57, 0x3f, 0x09, 0x01, 0x00, 0x00, + 0xe2, 0x54, 0x28, 0x9f, 0xab, 0xa0, 0xb4, 0x38, 0x03, 0xc6, 0x4e, 0xca, 0xcc, 0x4b, 0x81, 0xb1, + 0xb3, 0x33, 0x93, 0xb3, 0x21, 0x6c, 0xa3, 0x4b, 0x8c, 0x5c, 0x6c, 0x01, 0x60, 0xb3, 0x85, 0xb4, + 0xb9, 0x58, 0x9c, 0x13, 0x73, 0x72, 0x84, 0xf8, 0x21, 0x52, 0xc5, 0x7a, 0x41, 0x10, 0xd3, 0xa5, + 0x04, 0x10, 0x02, 0x10, 0xf3, 0x95, 0x18, 0x84, 0xf4, 0xb8, 0xb8, 0x02, 0x4a, 0x8b, 0x33, 0x42, + 0xf2, 0x43, 0x8b, 0x53, 0x8b, 0x84, 0x78, 0x60, 0x2a, 0x40, 0x62, 0x58, 0xd5, 0x1b, 0x72, 0x71, + 0x78, 0x67, 0x26, 0x67, 0x83, 0x55, 0xc3, 0x2d, 0x00, 0x89, 0xf8, 0x16, 0xa7, 0x4b, 0x09, 0x21, + 0x0b, 0x38, 0xe6, 0x15, 0x97, 0xa7, 0x16, 0x29, 0x31, 0x08, 0x59, 0x70, 0x09, 0x06, 0xa7, 0x16, + 0x17, 0x67, 0xe6, 0xe7, 0x39, 0x65, 0xe6, 0xa5, 0x04, 0xa5, 0xe6, 0xe6, 0x97, 0xa4, 0x22, 0xf4, + 0x82, 0xc4, 0x40, 0x7a, 0xb1, 0x58, 0xe6, 0x24, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, + 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x90, 0x04, 0x09, 0x31, 0x63, 0x40, 0x00, + 0x00, 0x00, 0xff, 0xff, 0x36, 0x9c, 0xd4, 0xc0, 0x48, 0x01, 0x00, 0x00, } diff --git a/service/remote.go b/service/remote.go index b48979f1..dd7b3e3c 100644 --- a/service/remote.go +++ b/service/remote.go @@ -172,6 +172,22 @@ func (r *RemoteService) PushToUser(ctx context.Context, push *protos.Push) (*pro return nil, constants.ErrSessionNotFound } +// KickUser sends a kick to user +func (r *RemoteService) KickUser(ctx context.Context, kick *protos.KickMsg) (*protos.KickAnswer, error) { + logger.Log.Debugf("sending kick to user %s", kick.GetUserId()) + s := session.GetSessionByUID(kick.GetUserId()) + if s != nil { + err := s.Kick(ctx) + if err != nil { + return nil, err + } + return &protos.KickAnswer{ + Kicked: true, + }, nil + } + return nil, constants.ErrSessionNotFound +} + // DoRPC do rpc and get answer func (r *RemoteService) DoRPC(ctx context.Context, serverID string, route *route.Route, protoData []byte) (*protos.Response, error) { msg := &message.Message{