Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy gRPC connections #109

Merged
merged 4 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 71 additions & 33 deletions cluster/grpc_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ import (

// GRPCClient rpc server struct
type GRPCClient struct {
server *Server
config *config.Config
metricsReporters []metrics.Reporter
clientMap sync.Map
bindingStorage interfaces.BindingStorage
clientMap sync.Map
dialTimeout time.Duration
infoRetriever InfoRetriever
lazy bool
metricsReporters []metrics.Reporter
reqTimeout time.Duration
dialTimeout time.Duration
server *Server
}

// NewGRPCClient returns a new instance of GRPCClient
Expand All @@ -63,31 +63,33 @@ func NewGRPCClient(
infoRetriever InfoRetriever,
) (*GRPCClient, error) {
gs := &GRPCClient{
config: config,
server: server,
metricsReporters: metricsReporters,
bindingStorage: bindingStorage,
infoRetriever: infoRetriever,
metricsReporters: metricsReporters,
server: server,
}

gs.configure()

gs.configure(config)
return gs, nil
}

type grpcClient struct {
conn *grpc.ClientConn
cli protos.PitayaClient
address string
cli protos.PitayaClient
conn *grpc.ClientConn
connected bool
lock sync.Mutex
}

// Init inits grpc rpc client
func (gs *GRPCClient) Init() error {
return nil
}

func (gs *GRPCClient) configure() {
gs.reqTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout")
gs.dialTimeout = gs.config.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout")
func (gs *GRPCClient) configure(cfg *config.Config) {
gs.dialTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.dialtimeout")
gs.lazy = cfg.GetBool("pitaya.cluster.rpc.client.grpc.lazyconnection")
gs.reqTimeout = cfg.GetDuration("pitaya.cluster.rpc.client.grpc.requesttimeout")
}

// Call makes a RPC Call
Expand All @@ -106,7 +108,7 @@ func (gs *GRPCClient) Call(

parent, err := tracing.ExtractSpan(ctx)
if err != nil {
logger.Log.Warnf("failed to retrieve parent span: %s", err.Error())
logger.Log.Warnf("[grpc client] failed to retrieve parent span: %s", err.Error())
andrehp marked this conversation as resolved.
Show resolved Hide resolved
}
tags := opentracing.Tags{
"span.kind": "client",
Expand All @@ -132,7 +134,7 @@ func (gs *GRPCClient) Call(
defer metrics.ReportTimingFromCtx(ctxT, gs.metricsReporters, "rpc", err)
}

res, err := c.(*grpcClient).cli.Call(ctxT, &req)
res, err := c.(*grpcClient).call(ctxT, &req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,32 +233,30 @@ func (gs *GRPCClient) AddServer(sv *Server) {

host, portKey = gs.getServerHost(sv)
if host == "" {
logger.Log.Errorf("server %s has no grpcHost specified in metadata", sv.ID)
logger.Log.Errorf("[grpc client] server %s has no grpcHost specified in metadata", sv.ID)
return
}

if port, ok = sv.Metadata[portKey]; !ok {
logger.Log.Errorf("server %s has no %s specified in metadata", sv.ID, portKey)
logger.Log.Errorf("[grpc client] server %s has no %s specified in metadata", sv.ID, portKey)
return
}

address := fmt.Sprintf("%s:%s", host, port)
conn, err := grpc.Dial(address,
grpc.WithInsecure(),
)
if err != nil {
logger.Log.Errorf("unable to connect to server %s at %s: %v", sv.ID, address, err)
return
client := &grpcClient{address: address}
if !gs.lazy {
if err := client.connect(); err != nil {
logger.Log.Errorf("[grpc client] unable to connect to server %s at %s: %v", sv.ID, address, err)
}
}
c := protos.NewPitayaClient(conn)
gs.clientMap.Store(sv.ID, &grpcClient{conn: conn, cli: c})
gs.clientMap.Store(sv.ID, client)
logger.Log.Debugf("[grpc client] added server %s at %s", sv.ID, address)
}

// RemoveServer is called when a server is removed
func (gs *GRPCClient) RemoveServer(sv *Server) {
if c, ok := gs.clientMap.Load(sv.ID); ok {
c.(*grpcClient).conn.Close()
c.(*grpcClient).disconnect()
gs.clientMap.Delete(sv.ID)
logger.Log.Debugf("[grpc client] removed server %s", sv.ID)
}
Expand Down Expand Up @@ -285,20 +285,58 @@ func (gs *GRPCClient) getServerHost(sv *Server) (host, portKey string) {

if !hasRegion {
if hasExternal {
msg := "server %s has no region specified in metadata, using external host"
logger.Log.Warnf(msg, sv.ID)
logger.Log.Warnf("[grpc client] server %s has no region specified in metadata, using external host", sv.ID)
return externalHost, constants.GRPCExternalPortKey
}

logger.Log.Warnf("server %s has no region nor external host specified in metadata, using internal host", sv.ID)
logger.Log.Warnf("[grpc client] server %s has no region nor external host specified in metadata, using internal host", sv.ID)
return internalHost, constants.GRPCPortKey
}

if gs.infoRetriever.Region() == serverRegion || !hasExternal {
logger.Log.Infof("server %s is in same region or external host not provided, using internal host", sv.ID)
logger.Log.Infof("[grpc client] server %s is in same region or external host not provided, using internal host", sv.ID)
return internalHost, constants.GRPCPortKey
}

logger.Log.Infof("server %s is in other region, using external host", sv.ID)
logger.Log.Infof("[grpc client] server %s is in other region, using external host", sv.ID)
return externalHost, constants.GRPCExternalPortKey
}

// connect establishes the gRPC connection for this client if it has not been
// established yet. It holds gc.lock for its whole duration, and returns nil
// immediately when the client is already marked as connected. On success it
// stores the dialed connection and a Pitaya client built on top of it, then
// flags the client as connected. A dial error is returned unchanged and leaves
// the client state untouched.
func (gc *grpcClient) connect() error {
	gc.lock.Lock()
	defer gc.lock.Unlock()

	if !gc.connected {
		conn, err := grpc.Dial(gc.address, grpc.WithInsecure())
		if err != nil {
			return err
		}

		gc.cli = protos.NewPitayaClient(conn)
		gc.conn = conn
		gc.connected = true
	}
	return nil
}

// disconnect closes the underlying gRPC connection, if any, and marks the
// client as not connected. It is a no-op when the client is not connected.
//
// The lock is released with defer so that it is not left held if closing the
// connection panics.
func (gc *grpcClient) disconnect() {
	gc.lock.Lock()
	defer gc.lock.Unlock()

	if !gc.connected {
		return
	}
	// connected can be true without a conn when the struct is built by hand
	// (the tests for this client do exactly that), so guard the dereference.
	if gc.conn != nil {
		// Best effort: the client is being discarded, so a close error is
		// deliberately ignored, as it was before.
		_ = gc.conn.Close()
	}
	gc.connected = false
}

// call sends req to the remote server, establishing the connection first when
// the client is not connected yet (lazy connection).
//
// gc.connected and gc.cli are written by connect and disconnect while holding
// gc.lock, so they are read here under the same lock; reading them without it
// is a data race. The lock is not held during the RPC itself, so concurrent
// calls on the same client do not serialize.
//
// It returns the error from connect if the connection cannot be established,
// otherwise whatever the underlying Pitaya client's Call returns.
func (gc *grpcClient) call(ctx context.Context, req *protos.Request) (*protos.Response, error) {
	gc.lock.Lock()
	connected, cli := gc.connected, gc.cli
	gc.lock.Unlock()

	if !connected {
		if err := gc.connect(); err != nil {
			return nil, err
		}
		// connect may have just created the client; pick it up under the lock.
		gc.lock.Lock()
		cli = gc.cli
		gc.lock.Unlock()
	}
	return cli.Call(ctx, req)
}
48 changes: 47 additions & 1 deletion cluster/grpc_rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func TestCall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockPitayaClient := protosmocks.NewMockPitayaClient(ctrl)
g.clientMap.Store(g.server.ID, &grpcClient{cli: mockPitayaClient})
g.clientMap.Store(g.server.ID, &grpcClient{
cli: mockPitayaClient,
connected: true,
})

ctx := context.Background()
rpcType := protos.RPCType_Sys
Expand Down Expand Up @@ -290,6 +293,49 @@ func TestAddServer(t *testing.T) {
sv, ok := g.clientMap.Load(server.ID)
assert.NotNil(t, sv)
assert.True(t, ok)
cli := sv.(*grpcClient)
assert.True(t, cli.connected)
assert.NotNil(t, cli.cli)
})

t.Run("lazy", func(t *testing.T) {
// listen
c := viper.New()
port := helpers.GetFreePort(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

c.Set("pitaya.cluster.rpc.server.grpc.port", port)
c.Set("pitaya.cluster.rpc.client.grpc.lazyconnection", true)
conf := getConfig(c)
server := &Server{
ID: "someid",
Type: "sometype",
Metadata: map[string]string{
constants.GRPCHostKey: "localhost",
constants.GRPCPortKey: fmt.Sprintf("%d", port),
},
Frontend: false,
}
gs, err := NewGRPCServer(conf, server, []metrics.Reporter{})
assert.NoError(t, err)

mockPitayaServer := protosmocks.NewMockPitayaServer(ctrl)
gs.SetPitayaServer(mockPitayaServer)

err = gs.Init()
assert.NoError(t, err)
g, err := getRPCClient(conf)
assert.NoError(t, err)
// --- should not connect to the server and add it to the client map
g.AddServer(server)

sv, ok := g.clientMap.Load(server.ID)
assert.NotNil(t, sv)
assert.True(t, ok)
cli := sv.(*grpcClient)
assert.False(t, cli.connected)
assert.Nil(t, cli.cli)
})
}

Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.info.region": "",
"pitaya.cluster.rpc.client.grpc.dialtimeout": "5s",
"pitaya.cluster.rpc.client.grpc.requesttimeout": "5s",
"pitaya.cluster.rpc.client.grpc.lazyconnection": false,
"pitaya.cluster.rpc.client.nats.connect": "nats://localhost:4222",
"pitaya.cluster.rpc.client.nats.maxreconnectionretries": 15,
"pitaya.cluster.rpc.client.nats.requesttimeout": "5s",
Expand Down
12 changes: 8 additions & 4 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ The configurations only need to be set if the RPC Service is enabled with the gi
- 100
- int
- Size of the buffer that the nats RPC server creates for push messages
* - pitaya.cluster.rpc.client.grpc.requesttimeout
- 5s
- time.Duration
- Request timeout for RPC calls with the gRPC client
* - pitaya.cluster.rpc.client.grpc.dialtimeout
- 5s
- time.Duration
- Timeout for the gRPC client to establish the connection
* - pitaya.cluster.rpc.client.grpc.lazyconnection
- false
- bool
- Whether the gRPC client should use a lazy connection, that is, connect only when a request is made to that server
* - pitaya.cluster.rpc.client.grpc.requesttimeout
- 5s
- time.Duration
- Request timeout for RPC calls with the gRPC client
* - pitaya.cluster.rpc.client.nats.connect
- nats://localhost:4222
- string
Expand Down
41 changes: 21 additions & 20 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestHandlerCallToFront(t *testing.T) {
port := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())

defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)()
c := client.New(logrus.InfoLevel)

err := c.ConnectTo(fmt.Sprintf("localhost:%d", port))
Expand All @@ -93,7 +93,7 @@ func TestGroupFront(t *testing.T) {
port := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -141,7 +141,7 @@ func TestKick(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -173,7 +173,7 @@ func TestSameUIDUserShouldBeKicked(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -205,8 +205,8 @@ func TestSameUIDUserShouldBeKickedInDifferentServersFromSameType(t *testing.T) {
port2 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -238,8 +238,8 @@ func TestSameUIDUserShouldNotBeKickedInDifferentServersFromDiffType(t *testing.T
port2 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector1", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector2", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -270,8 +270,8 @@ func TestKickOnBack(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
c1 := client.New(logrus.DebugLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
Expand All @@ -294,10 +294,10 @@ func TestPushToUsers(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
port2 := helpers.GetFreePort(t)
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -350,8 +350,8 @@ func TestPushToUsers(t *testing.T) {
func TestForwardToBackend(t *testing.T) {
portFront := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", portFront, sdPrefix, *grpc, true)()

tables := []struct {
req string
Expand Down Expand Up @@ -389,9 +389,9 @@ func TestGroupBack(t *testing.T) {
port2 := helpers.GetFreePort(t)
sdPrefix := fmt.Sprintf("%s/", uuid.New().String())

defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc)()
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
defer helpers.StartServer(t, true, true, "connector", port2, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)
c2 := client.New(logrus.InfoLevel)

Expand Down Expand Up @@ -439,8 +439,9 @@ func TestUserRPC(t *testing.T) {
port1 := helpers.GetFreePort(t)

sdPrefix := fmt.Sprintf("%s/", uuid.New().String())
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc)()
// set lazy connections
defer helpers.StartServer(t, false, true, "game", 0, sdPrefix, *grpc, true)()
defer helpers.StartServer(t, true, true, "connector", port1, sdPrefix, *grpc, false)()
c1 := client.New(logrus.InfoLevel)

err := c1.ConnectTo(fmt.Sprintf("localhost:%d", port1))
Expand Down
Loading