Skip to content

Commit

Permalink
Lazy gRPC connections (#109)
Browse files Browse the repository at this point in the history
* Improve grpc client logging

* Lazy grpc connections

* Avoiding concurrent connection

* Add locking on grpc disconnect
  • Loading branch information
andrehp committed Sep 6, 2019
1 parent d7bd48f commit c2f787d
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 62 deletions.
104 changes: 71 additions & 33 deletions cluster/grpc_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ import (

// GRPCClient rpc server struct
type GRPCClient struct {
server *Server
config *config.Config
metricsReporters []metrics.Reporter
clientMap sync.Map
bindingStorage interfaces.BindingStorage
clientMap sync.Map
dialTimeout time.Duration
infoRetriever InfoRetriever
lazy bool
metricsReporters []metrics.Reporter
reqTimeout time.Duration
dialTimeout time.Duration
server *Server
}

// NewGRPCClient returns a new instance of GRPCClient
Expand All @@ -63,31 +63,33 @@ func NewGRPCClient(
infoRetriever InfoRetriever,
) (*GRPCClient, error) {
gs := &GRPCClient{
config: config,
server: server,
metricsReporters: metricsReporters,
bindingStorage: bindingStorage,
infoRetriever: infoRetriever,
metricsReporters: metricsReporters,
server: server,
}

gs.configure()

gs.configure(config)
return gs, nil
}

type grpcClient struct {
conn *grpc.ClientConn
cli protos.PitayaClient
address string
cli protos.PitayaClient
conn *grpc.ClientConn
connected bool
lock sync.Mutex
}

// Init inits grpc rpc client
func (gs *GRPCClient) Init() error {
return nil
}

func (gs *GRPCClient) configure() {
gs.reqTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout")
gs.dialTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout")
func (gs *GRPCClient) configure(cfg *config.Config) {
gs.dialTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout")
gs.lazy = cfg.GetBool("pitaya.cluster.rpc.client.grpc.lazyconnection")
gs.reqTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout")
}

// Call makes a RPC Call
Expand All @@ -106,7 +108,7 @@ func (gs *GRPCClient) Call(

parent, err := tracing.ExtractSpan(ctx)
if err != nil {
logger.Log.Warnf("failed to retrieve parent span: %s", err.Error())
logger.Log.Warnf("[grpc client] failed to retrieve parent span: %s", err.Error())
}
tags := opentracing.Tags{
"span.kind": "client",
Expand All @@ -132,7 +134,7 @@ func (gs *GRPCClient) Call(
defer metrics.ReportTimingFromCtx(ctxT, gs.metricsReporters, "rpc", err)
}

res, err := c.(*grpcClient).cli.Call(ctxT, &req)
res, err := c.(*grpcClient).call(ctxT, &req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,32 +233,30 @@ func (gs *GRPCClient) AddServer(sv *Server) {

host, portKey = gs.getServerHost(sv)
if host == "" {
logger.Log.Errorf("server %s has no grpcHost specified in metadata", sv.ID)
logger.Log.Errorf("[grpc client] server %s has no grpcHost specified in metadata", sv.ID)
return
}

if port, ok = sv.Metadata[portKey]; !ok {
logger.Log.Errorf("server %s has no %s specified in metadata", sv.ID, portKey)
logger.Log.Errorf("[grpc client] server %s has no %s specified in metadata", sv.ID, portKey)
return
}

address := fmt.Sprintf("%s:%s", host, port)
conn, err := grpc.Dial(address,
grpc.WithInsecure(),
)
if err != nil {
logger.Log.Errorf("unable to connect to server %s at %s: %v", sv.ID, address, err)
return
client := &grpcClient{address: address}
if !gs.lazy {
if err := client.connect(); err != nil {
logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err)
}
}
c := protos.NewPitayaClient(conn)
gs.clientMap.Store(sv.ID, &grpcClient{conn: conn, cli: c})
gs.clientMap.Store(sv.ID, client)
logger.Log.Debugf("[grpc client] added server %s at %s", sv.ID, address)
}

// RemoveServer is called when a server is removed
func (gs *GRPCClient) RemoveServer(sv *Server) {
if c, ok := gs.clientMap.Load(sv.ID); ok {
c.(*grpcClient).conn.Close()
c.(*grpcClient).disconnect()
gs.clientMap.Delete(sv.ID)
logger.Log.Debugf("[grpc client] removed server %s", sv.ID)
}
Expand Down Expand Up @@ -285,20 +285,58 @@ func (gs *GRPCClient) getServerHost(sv *Server) (host, portKey string) {

if !hasRegion {
if hasExternal {
msg := "server %s has no region specified in metadata, using external host"
logger.Log.Warnf(msg, sv.ID)
logger.Log.Warnf("[grpc client] server %s has no region specified in metadata, using external host", sv.ID)
return externalHost, constants.GRPCExternalPortKey
}

logger.Log.Warnf("server %s has no region nor external host specified in metadata, using internal host", sv.ID)
logger.Log.Warnf("[grpc client] server %s has no region nor external host specified in metadata, using internal host", sv.ID)
return internalHost, constants.GRPCPortKey
}

if gs.infoRetriever.Region() == serverRegion || !hasExternal {
logger.Log.Infof("server %s is in same region or external host not provided, using internal host", sv.ID)
logger.Log.Infof("[grpc client] server %s is in same region or external host not provided, using internal host", sv.ID)
return internalHost, constants.GRPCPortKey
}

logger.Log.Infof("server %s is in other region, using external host", sv.ID)
logger.Log.Infof("[grpc client] server %s is in other region, using external host", sv.ID)
return externalHost, constants.GRPCExternalPortKey
}

func (gc *grpcClient) connect() error {
gc.lock.Lock()
defer gc.lock.Unlock()
if gc.connected {
return nil
}

conn, err := grpc.Dial(
gc.address,
grpc.WithInsecure(),
)
if err != nil {
return err
}
c := protos.NewPitayaClient(conn)
gc.cli = c
gc.conn = conn
gc.connected = true
return nil
}

func (gc *grpcClient) disconnect() {
gc.lock.Lock()
if gc.connected {
gc.conn.Close()
gc.connected = false
}
gc.lock.Unlock()
}

func (gc *grpcClient) call(ctx context.Context, req *protos.Request) (*protos.Response, error) {
if !gc.connected {
if err := gc.connect(); err != nil {
return nil, err
}
}
return gc.cli.Call(ctx, req)
}
48 changes: 47 additions & 1 deletion cluster/grpc_rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func TestCall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockPitayaClient := protosmocks.NewMockPitayaClient(ctrl)
g.clientMap.Store(g.server.ID, &grpcClient{cli: mockPitayaClient})
g.clientMap.Store(g.server.ID, &grpcClient{
cli: mockPitayaClient,
connected: true,
})

ctx := context.Background()
rpcType := protos.RPCType_Sys
Expand Down Expand Up @@ -290,6 +293,49 @@ func TestAddServer(t *testing.T) {
sv, ok := g.clientMap.Load(server.ID)
assert.NotNil(t, sv)
assert.True(t, ok)
cli := sv.(*grpcClient)
assert.True(t, cli.connected)
assert.NotNil(t, cli.cli)
})

t.Run("lazy", func(t *testing.T) {
// listen
c := viper.New()
port := helpers.GetFreePort(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

c.Set("pitaya.cluster.rpc.server.grpc.port", port)
c.Set("pitaya.cluster.rpc.client.grpc.lazyconnection", true)
conf := getConfig(c)
server := &Server{
ID: "someid",
Type: "sometype",
Metadata: map[string]string{
constants.GRPCHostKey: "localhost",
constants.GRPCPortKey: fmt.Sprintf("%d", port),
},
Frontend: false,
}
gs, err := NewGRPCServer(conf, server, []metrics.Reporter{})
assert.NoError(t, err)

mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl)
gs.SetPitayaServer(mockPitayaServer)

err = gs.Init()
assert.NoError(t, err)
g, err := getRPCClient(conf)
assert.NoError(t, err)
// --- should not connect to the server and add it to the client map
g.AddServer(server)

sv, ok := g.clientMap.Load(server.ID)
assert.NotNil(t, sv)
assert.True(t, ok)
cli := sv.(*grpcClient)
assert.False(t, cli.connected)
assert.Nil(t, cli.cli)
})
}

Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.info.region": "",
"pitaya.cluster.rpc.client.grpc.dialtimeout": "5s",
"pitaya.cluster.rpc.client.grpc.requesttimeout": "5s",
"pitaya.cluster.rpc.client.grpc.lazyconnection": false,
"pitaya.cluster.rpc.client.nats.connect": "nats://localhost:4222",
"pitaya.cluster.rpc.client.nats.maxreconnectionretries": 15,
"pitaya.cluster.rpc.client.nats.requesttimeout": "5s",
Expand Down
12 changes: 8 additions & 4 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ The configurations only need to be set if the RPC Service is enabled with the gi
- 100
- int
- Size of the buffer that the nats RPC server creates for push messages
* - pitaya.cluster.rpc.client.grpc.requesttimeout
- 5s
- time.Time
- Request timeout for RPC calls with the gRPC client
* - pitaya.cluster.rpc.client.grpc.dialtimeout
- 5s
- time.Time
- Timeout for the gRPC client to establish the connection
* - pitaya.cluster.rpc.client.grpc.lazyconnection
- false
- bool
- Whether the gRPC client should use a lazy connection, that is, connect only when a request is made to that server
* - pitaya.cluster.rpc.client.grpc.requesttimeout
- 5s
- time.Time
- Request timeout for RPC calls with the gRPC client
* - pitaya.cluster.rpc.client.nats.connect
- nats://localhost:4222
- string
Expand Down
41 changes: 21 additions & 20 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestHandlerCallToFront(t *testing.T) {
port := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())

defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)()
c := client.New(logrus.InfoLevel)

err := c.ConnectTo(fmt.Sprintf("localhost:%d", port))
Expand All @@ -93,7 +93,7 @@ func TestGroupFront(t *testing.T) {
port := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -141,7 +141,7 @@ func TestKick(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -173,7 +173,7 @@ func TestSameUIDUserShouldBeKicked(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -205,8 +205,8 @@ func TestSameUIDUserShouldBeKickedInDifferentServersFromSameType(t *testing.T) {
port2 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -238,8 +238,8 @@ func TestSameUIDUserShouldNotBeKickedInDifferentServersFromDiffType(t *testing.T
port2 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -270,8 +270,8 @@ func TestKickOnBack(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
c1 := client.New(logrus.DebugLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
Expand All @@ -294,10 +294,10 @@ func TestPushToUsers(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
port2 := helpers.GetFreePort(t)
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -350,8 +350,8 @@ func TestPushToUsers(t *testing.T) {
func TestForwardToBackend(t *testing.T) {
portFront := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc, true)()

tables := []struct {
req string
Expand Down Expand Up @@ -389,9 +389,9 @@ func TestGroupBack(t *testing.T) {
port2 := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())

defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -439,8 +439,9 @@ func TestUserRPC(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
// set lazy connections
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, true)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
Expand Down

0 comments on commit c2f787d

Please sign in to comment.