diff --git a/cluster/grpc_rpc_client.go b/cluster/grpc_rpc_client.go index 6bae5287..1f54013d 100644 --- a/cluster/grpc_rpc_client.go +++ b/cluster/grpc_rpc_client.go @@ -44,14 +44,14 @@ import ( // GRPCClient rpc server struct type GRPCClient struct { - server *Server - config *config.Config - metricsReporters []metrics.Reporter - clientMap sync.Map bindingStorage interfaces.BindingStorage + clientMap sync.Map + dialTimeout time.Duration infoRetriever InfoRetriever + lazy bool + metricsReporters []metrics.Reporter reqTimeout time.Duration - dialTimeout time.Duration + server *Server } // NewGRPCClient returns a new instance of GRPCClient @@ -63,21 +63,22 @@ func NewGRPCClient( infoRetriever InfoRetriever, ) (*GRPCClient, error) { gs := &GRPCClient{ - config: config, - server: server, - metricsReporters: metricsReporters, bindingStorage: bindingStorage, infoRetriever: infoRetriever, + metricsReporters: metricsReporters, + server: server, } - gs.configure() - + gs.configure(config) return gs, nil } type grpcClient struct { - conn *grpc.ClientConn - cli protos.PitayaClient + address string + cli protos.PitayaClient + conn *grpc.ClientConn + connected bool + lock sync.Mutex } // Init inits grpc rpc client @@ -85,9 +86,10 @@ func (gs *GRPCClient) Init() error { return nil } -func (gs *GRPCClient) configure() { - gs.reqTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout") - gs.dialTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout") +func (gs *GRPCClient) configure(cfg *config.Config) { + gs.dialTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout") + gs.lazy = cfg.GetBool("pitaya.cluster.rpc.client.grpc.lazyconnection") + gs.reqTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout") } // Call makes a RPC Call @@ -106,7 +108,7 @@ func (gs *GRPCClient) Call( parent, err := tracing.ExtractSpan(ctx) if err != nil { - logger.Log.Warnf("failed to retrieve parent span: %s", err.Error()) + logger.Log.Warnf("[grpc client] failed to retrieve parent span: %s", err.Error()) } tags := opentracing.Tags{ "span.kind": "client", @@ -132,7 +134,7 @@ func (gs *GRPCClient) Call( defer metrics.ReportTimingFromCtx(ctxT, gs.metricsReporters, "rpc", err) } - res, err := c.(*grpcClient).cli.Call(ctxT, &req) + res, err := c.(*grpcClient).call(ctxT, &req) if err != nil { return nil, err } @@ -231,32 +233,30 @@ func (gs *GRPCClient) AddServer(sv *Server) { host, portKey = gs.getServerHost(sv) if host == "" { - logger.Log.Errorf("server %s has no grpcHost specified in metadata", sv.ID) + logger.Log.Errorf("[grpc client] server %s has no grpcHost specified in metadata", sv.ID) return } if port, ok = sv.Metadata[portKey]; !ok { - logger.Log.Errorf("server %s has no %s specified in metadata", sv.ID, portKey) + logger.Log.Errorf("[grpc client] server %s has no %s specified in metadata", sv.ID, portKey) return } address := fmt.Sprintf("%s:%s", host, port) - conn, err := grpc.Dial(address, - grpc.WithInsecure(), - ) - if err != nil { - logger.Log.Errorf("unable to connect to server %s at %s: %v", sv.ID, address, err) - return + client := &grpcClient{address: address} + if !gs.lazy { + if err := client.connect(); err != nil { + logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err) + } } - c := protos.NewPitayaClient(conn) - gs.clientMap.Store(sv.ID, &grpcClient{conn: conn, cli: c}) + gs.clientMap.Store(sv.ID, client) logger.Log.Debugf("[grpc client] added server %s at %s", sv.ID, address) } // RemoveServer is called when a server is removed func (gs *GRPCClient) RemoveServer(sv *Server) { if c, ok := gs.clientMap.Load(sv.ID); ok { - c.(*grpcClient).conn.Close() + c.(*grpcClient).disconnect() gs.clientMap.Delete(sv.ID) logger.Log.Debugf("[grpc client] removed server %s", sv.ID) } @@ -285,20 +285,58 @@ func (gs *GRPCClient) getServerHost(sv *Server) (host, portKey string) { if !hasRegion { if hasExternal { - msg := "server %s has no region specified in metadata, using external host" - logger.Log.Warnf(msg, sv.ID) + logger.Log.Warnf("[grpc client] server %s has no region specified in metadata, using external host", sv.ID) return externalHost, constants.GRPCExternalPortKey } - logger.Log.Warnf("server %s has no region nor external host specified in metadata, using internal host", sv.ID) + logger.Log.Warnf("[grpc client] server %s has no region nor external host specified in metadata, using internal host", sv.ID) return internalHost, constants.GRPCPortKey } if gs.infoRetriever.Region() == serverRegion || !hasExternal { - logger.Log.Infof("server %s is in same region or external host not provided, using internal host", sv.ID) + logger.Log.Infof("[grpc client] server %s is in same region or external host not provided, using internal host", sv.ID) return internalHost, constants.GRPCPortKey } - logger.Log.Infof("server %s is in other region, using external host", sv.ID) + logger.Log.Infof("[grpc client] server %s is in other region, using external host", sv.ID) return externalHost, constants.GRPCExternalPortKey } + +func (gc *grpcClient) connect() error { + gc.lock.Lock() + defer gc.lock.Unlock() + if gc.connected { + return nil + } + + conn, err := grpc.Dial( + gc.address, + grpc.WithInsecure(), + ) + if err != nil { + return err + } + c := protos.NewPitayaClient(conn) + gc.cli = c + gc.conn = conn + gc.connected = true + return nil +} + +func (gc *grpcClient) disconnect() { + gc.lock.Lock() + if gc.connected { + gc.conn.Close() + gc.connected = false + } + gc.lock.Unlock() +} + +func (gc *grpcClient) call(ctx context.Context, req *protos.Request) (*protos.Response, error) { + if !gc.connected { + if err := gc.connect(); err != nil { + return nil, err + } + } + return gc.cli.Call(ctx, req) +} diff --git a/cluster/grpc_rpc_client_test.go b/cluster/grpc_rpc_client_test.go index aaad3171..5f4f2e72 100644 --- a/cluster/grpc_rpc_client_test.go +++ b/cluster/grpc_rpc_client_test.go @@ -41,7 +41,10 @@ func TestCall(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockPitayaClient := protosmocks.NewMockPitayaClient(ctrl) - g.clientMap.Store(g.server.ID, &grpcClient{cli: mockPitayaClient}) + g.clientMap.Store(g.server.ID, &grpcClient{ + cli: mockPitayaClient, + connected: true, + }) ctx := context.Background() rpcType := protos.RPCType_Sys @@ -290,6 +293,49 @@ func TestAddServer(t *testing.T) { sv, ok := g.clientMap.Load(server.ID) assert.NotNil(t, sv) assert.True(t, ok) + cli := sv.(*grpcClient) + assert.True(t, cli.connected) + assert.NotNil(t, cli.cli) + }) + + t.Run("lazy", func(t *testing.T) { + // listen + c := viper.New() + port := helpers.GetFreePort(t) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + c.Set("pitaya.cluster.rpc.server.grpc.port", port) + c.Set("pitaya.cluster.rpc.client.grpc.lazyconnection", true) + conf := getConfig(c) + server := &Server{ + ID: "someid", + Type: "sometype", + Metadata: map[string]string{ + constants.GRPCHostKey: "localhost", + constants.GRPCPortKey: fmt.Sprintf("%d", port), + }, + Frontend: false, + } + gs, err := NewGRPCServer(conf, server, []metrics.Reporter{}) + assert.NoError(t, err) + + mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl) + gs.SetPitayaServer(mockPitayaServer) + + err = gs.Init() + assert.NoError(t, err) + g, err := getRPCClient(conf) + assert.NoError(t, err) + // --- should not connect to the server and add it to the client map + g.AddServer(server) + + sv, ok := g.clientMap.Load(server.ID) + assert.NotNil(t, sv) + assert.True(t, ok) + cli := sv.(*grpcClient) + assert.False(t, cli.connected) + assert.Nil(t, cli.cli) }) } diff --git a/config/config.go b/config/config.go index c038ecce..7478c402 100644 --- a/config/config.go +++ b/config/config.go @@ -59,6 +59,7 @@ func (c *Config) fillDefaultValues() { "pitaya.cluster.info.region": "", "pitaya.cluster.rpc.client.grpc.dialtimeout": "5s", "pitaya.cluster.rpc.client.grpc.requesttimeout": "5s", + "pitaya.cluster.rpc.client.grpc.lazyconnection": false, "pitaya.cluster.rpc.client.nats.connect": "nats://localhost:4222", "pitaya.cluster.rpc.client.nats.maxreconnectionretries": 15, "pitaya.cluster.rpc.client.nats.requesttimeout": "5s", diff --git a/docs/configuration.rst b/docs/configuration.rst index 0ef76e13..d8efcab5 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -86,14 +86,18 @@ The configurations only need to be set if the RPC Service is enabled with the gi - 100 - int - Size of the buffer that the nats RPC server creates for push messages - * - pitaya.cluster.rpc.client.grpc.requesttimeout - - 5s - - time.Time - - Request timeout for RPC calls with the gRPC client * - pitaya.cluster.rpc.client.grpc.dialtimeout - 5s - time.Time - Timeout for the gRPC client to establish the connection + * - pitaya.cluster.rpc.client.grpc.lazyconnection + - false + - bool + - Whether the gRPC client should use a lazy connection, that is, connect only when a request is made to that server + * - pitaya.cluster.rpc.client.grpc.requesttimeout + - 5s + - time.Time + - Request timeout for RPC calls with the gRPC client * - pitaya.cluster.rpc.client.nats.connect - nats://localhost:4222 - string diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 14194c55..1fcd60ec 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -70,7 +70,7 @@ func TestHandlerCallToFront(t *testing.T) { port := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)() c := client.New(logrus.InfoLevel) err := c.ConnectTo(fmt.Sprintf("localhost:%d", port)) @@ -93,7 +93,7 @@ func TestGroupFront(t *testing.T) { port := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -141,7 +141,7 @@ func TestKick(t *testing.T) { port1 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -173,7 +173,7 @@ func TestSameUIDUserShouldBeKicked(t *testing.T) { port1 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -205,8 +205,8 @@ func TestSameUIDUserShouldBeKickedInDifferentServersFromSameType(t *testing.T) { port2 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -238,8 +238,8 @@ func TestSameUIDUserShouldNotBeKickedInDifferentServersFromDiffType(t *testing.T port2 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -270,8 +270,8 @@ func TestKickOnBack(t *testing.T) { port1 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() - defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)() c1 := client.New(logrus.DebugLevel) err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1)) @@ -294,10 +294,10 @@ func TestPushToUsers(t *testing.T) { port1 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() + defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() port2 := helpers.GetFreePort(t) - defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)() + defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -350,8 +350,8 @@ func TestPushToUsers(t *testing.T) { func TestForwardToBackend(t *testing.T) { portFront := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc)() + defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc, true)() tables := []struct { req string @@ -389,9 +389,9 @@ func TestGroupBack(t *testing.T) { port2 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)() + defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() + defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) c2 := client.New(logrus.InfoLevel) @@ -439,8 +439,9 @@ func TestUserRPC(t *testing.T) { port1 := helpers.GetFreePort(t) sdPrefix := fmt.Sprintf("%s/", uuid.New().String()) - defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)() - defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)() + // set lazy connections + defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, true)() + defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)() c1 := client.New(logrus.InfoLevel) err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1)) diff --git a/helpers/helpers.go b/helpers/helpers.go index 8ebc834c..4f7c3355 100644 --- a/helpers/helpers.go +++ b/helpers/helpers.go @@ -105,7 +105,14 @@ func waitForServerToBeReady(t testing.TB, out *bufio.Reader) { } // StartServer starts a server -func StartServer(t testing.TB, frontend bool, debug bool, svType string, port int, sdPrefix string, grpc bool) func() { +func StartServer( + t testing.TB, + frontend, debug bool, + svType string, + port int, + sdPrefix string, + grpc, lazyConnection bool, +) func() { grpcPort := GetFreePort(t) promPort := GetFreePort(t) var useGRPC string @@ -133,8 +140,12 @@ func StartServer(t testing.TB, frontend bool, debug bool, svType string, port in "../examples/testing/server", args..., ) - // always use a random port for prometheus, for avoiding e2e conflicts - cmd.Env = []string{fmt.Sprintf("PITAYA_METRICS_PROMETHEUS_PORT=%d", promPort)} + + // always use a random port for prometheus, to avoid e2e conflicts + cmd.Env = []string{ + fmt.Sprintf("PITAYA_METRICS_PROMETHEUS_PORT=%d", promPort), + fmt.Sprintf("PITAYA_CLUSTER_RPC_CLIENT_GRPC_LAZYCONNECTION=%v", lazyConnection), + } outPipe, err := cmd.StderrPipe() if err != nil { @@ -147,7 +158,6 @@ func StartServer(t testing.TB, frontend bool, debug bool, svType string, port in } waitForServerToBeReady(t, bufio.NewReader(outPipe)) - return func() { err := cmd.Process.Kill() if err != nil {