diff --git a/app.go b/app.go index 6f28d44f..151f6924 100644 --- a/app.go +++ b/app.go @@ -570,3 +570,18 @@ func Documentation(getPtrNames bool) (map[string]interface{}, error) { "remotes": remoteDocs, }, nil } + +// AddGRPCInfoToMetadata adds host, external host and +// port into metadata +func AddGRPCInfoToMetadata( + metadata map[string]string, + region, host, externalHost, port string, +) map[string]string { + // TODO: should I get all this information here? Or just + // receive them as argument + metadata[constants.GRPCHostKey] = host + metadata[constants.GRPCExternalHostKey] = externalHost + metadata[constants.GRPCPortKey] = port + metadata[constants.RegionKey] = region + return metadata +} diff --git a/app_test.go b/app_test.go index b97c66a9..74c5bd19 100644 --- a/app_test.go +++ b/app_test.go @@ -620,3 +620,25 @@ func TestDocumentationTrue(t *testing.T) { "handlers": map[string]interface{}{}, }, doc) } + +func TestAddGRPCInfoToMetadata(t *testing.T) { + t.Parallel() + + metadata := map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + } + + metadata = AddGRPCInfoToMetadata(metadata, "region", "host", "external-host", "port") + + assert.Equal(t, map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + constants.GRPCHostKey: "host", + constants.GRPCExternalHostKey: "external-host", + constants.GRPCPortKey: "port", + constants.RegionKey: "region", + }, metadata) +} diff --git a/cluster/cluster.go b/cluster/cluster.go index 117fa97f..4dbd5a4f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -61,6 +61,13 @@ type RemoteBindingListener interface { OnUserBind(uid, fid string) } +// InfoRetriever gets cluster info +// It can be implemented, for exemple, by reading +// env var, config or by accessing the cluster API +type InfoRetriever interface { + Region() string +} + // Action type for enum type Action int diff --git a/cluster/config_info_retriever.go b/cluster/config_info_retriever.go new file mode 100644 index 00000000..a2ba1320 --- /dev/null +++ b/cluster/config_info_retriever.go @@ -0,0 +1,21 @@ +package cluster + +import "github.com/topfreegames/pitaya/config" + +// ConfigInfoRetriever gets cluster info from config +// Implements InfoRetriever interface +type ConfigInfoRetriever struct { + region string +} + +// NewConfigInfoRetriever returns a *ConfigInfoRetriever +func NewConfigInfoRetriever(c *config.Config) *ConfigInfoRetriever { + return &ConfigInfoRetriever{ + region: c.GetString("pitaya.cluster.info.region"), + } +} + +// Region gets server's region from config +func (c *ConfigInfoRetriever) Region() string { + return c.region +} diff --git a/cluster/config_info_retriever_test.go b/cluster/config_info_retriever_test.go new file mode 100644 index 00000000..4782e046 --- /dev/null +++ b/cluster/config_info_retriever_test.go @@ -0,0 +1,21 @@ +package cluster + +import ( + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/topfreegames/pitaya/config" +) + +func TestConfigInfoRetrieverRegion(t *testing.T) { + t.Parallel() + + c := viper.New() + c.Set("pitaya.cluster.info.region", "us") + config := config.NewConfig(c) + + infoRetriever := NewConfigInfoRetriever(config) + + assert.Equal(t, "us", infoRetriever.Region()) +} diff --git a/cluster/grpc_rpc_client.go b/cluster/grpc_rpc_client.go index 0813856c..1846c47c 100644 --- a/cluster/grpc_rpc_client.go +++ b/cluster/grpc_rpc_client.go @@ -48,21 +48,25 @@ type GRPCClient struct { metricsReporters []metrics.Reporter clientMap sync.Map bindingStorage interfaces.BindingStorage + infoRetriever InfoRetriever reqTimeout time.Duration dialTimeout time.Duration } // NewGRPCClient returns a new instance of GRPCClient -func NewGRPCClient(config *config.Config, server *Server, metricsReporters []metrics.Reporter, bindingStorage ...interfaces.BindingStorage) (*GRPCClient, error) { - var bs interfaces.BindingStorage - if len(bindingStorage) > 0 { - bs = bindingStorage[0] - } +func NewGRPCClient( + config *config.Config, + server *Server, + metricsReporters []metrics.Reporter, + bindingStorage interfaces.BindingStorage, + infoRetriever InfoRetriever, +) (*GRPCClient, error) { gs := &GRPCClient{ config: config, server: server, metricsReporters: metricsReporters, - bindingStorage: bs, + bindingStorage: bindingStorage, + infoRetriever: infoRetriever, } gs.configure() @@ -201,14 +205,18 @@ func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.P func (gs *GRPCClient) AddServer(sv *Server) { var host, port string var ok bool - if host, ok = sv.Metadata["grpc-host"]; !ok { - logger.Log.Errorf("server %s doesn't have a grpc-host specified in metadata", sv.ID) + + host = gs.getServerHost(sv) + if host == "" { + logger.Log.Errorf("server %s has no grpc-host specified in metadata", sv.ID) return } - if port, ok = sv.Metadata["grpc-port"]; !ok { - logger.Log.Errorf("server %s doesn't have a grpc-port specified in metadata", sv.ID) + + if port, ok = sv.Metadata[constants.GRPCPortKey]; !ok { + logger.Log.Errorf("server %s has no grpc-port specified in metadata", sv.ID) return } + address := fmt.Sprintf("%s:%s", host, port) ctxT, done := context.WithTimeout(context.Background(), gs.dialTimeout) defer done() @@ -241,3 +249,33 @@ func (gs *GRPCClient) BeforeShutdown() {} func (gs *GRPCClient) Shutdown() error { return nil } + +func (gs *GRPCClient) getServerHost(sv *Server) string { + var ( + serverRegion, hasRegion = sv.Metadata[constants.RegionKey] + externalHost, hasExternal = sv.Metadata[constants.GRPCExternalHostKey] + internalHost, _ = sv.Metadata[constants.GRPCHostKey] + ) + + hasRegion = hasRegion && serverRegion != "" + hasExternal = hasExternal && externalHost != "" + + if !hasRegion { + if hasExternal { + msg := "server %s has no region specified in metadata, using external host" + logger.Log.Warnf(msg, sv.ID) + return externalHost + } + + logger.Log.Warnf("server %s has no region nor external host specified in metadata, using internal host", sv.ID) + return internalHost + } + + if gs.infoRetriever.Region() == serverRegion { + logger.Log.Warnf("server %s is in same region, using internal host", sv.ID) + return internalHost + } + + logger.Log.Warnf("server %s is in other region, using external host", sv.ID) + return externalHost +} diff --git a/cluster/grpc_rpc_client_test.go b/cluster/grpc_rpc_client_test.go index c91f6909..6024d8d8 100644 --- a/cluster/grpc_rpc_client_test.go +++ b/cluster/grpc_rpc_client_test.go @@ -24,7 +24,7 @@ import ( func getRPCClient(c *config.Config) (*GRPCClient, error) { sv := getServer() - return NewGRPCClient(c, sv, []metrics.Reporter{}) + return NewGRPCClient(c, sv, []metrics.Reporter{}, nil, nil) } func TestNewGRPCClient(t *testing.T) { @@ -269,8 +269,8 @@ func TestAddServer(t *testing.T) { ID: "someid", Type: "sometype", Metadata: map[string]string{ - "grpc-host": "localhost", - "grpc-port": fmt.Sprintf("%d", port), + constants.GRPCHostKey: "localhost", + constants.GRPCPortKey: fmt.Sprintf("%d", port), }, Frontend: false, } @@ -298,7 +298,7 @@ func TestRemoveServer(t *testing.T) { server := getServer() conf := getConfig() - gc, err := NewGRPCClient(conf, server, []metrics.Reporter{}) + gc, err := NewGRPCClient(conf, server, []metrics.Reporter{}, nil, nil) assert.NoError(t, err) gc.clientMap.Store(server.ID, mockPitayaClient) diff --git a/cluster/grpc_rpc_server.go b/cluster/grpc_rpc_server.go index a2e4ce0c..1e22a4b7 100644 --- a/cluster/grpc_rpc_server.go +++ b/cluster/grpc_rpc_server.go @@ -76,6 +76,9 @@ func (gs *GRPCServer) BeforeShutdown() {} // Shutdown stops grpc rpc server func (gs *GRPCServer) Shutdown() error { - gs.grpcSv.Stop() + // graceful: stops the server from accepting new connections and RPCs and + // blocks until all the pending RPCs are finished. + // source: https://godoc.org/google.golang.org/grpc#Server.GracefulStop + gs.grpcSv.GracefulStop() return nil } diff --git a/config/config.go b/config/config.go index bd978140..a92ef0db 100644 --- a/config/config.go +++ b/config/config.go @@ -80,6 +80,7 @@ func (c *Config) fillDefaultValues() { "pitaya.cluster.sd.etcd.revoke.timeout": "5s", "pitaya.cluster.sd.etcd.heartbeat.log": false, "pitaya.cluster.sd.etcd.syncservers.interval": "120s", + "pitaya.cluster.info.region": "", "pitaya.modules.bindingstorage.etcd.endpoints": "localhost:2379", "pitaya.modules.bindingstorage.etcd.prefix": "pitaya/", "pitaya.modules.bindingstorage.etcd.dialtimeout": "5s", diff --git a/constants/const.go b/constants/const.go index 895141fb..52c8dd57 100644 --- a/constants/const.go +++ b/constants/const.go @@ -74,3 +74,15 @@ var RouteKey = "req-route" // MetricTagsKey is the key holding request tags to be sent over the context // to be reported var MetricTagsKey = "metric-tags" + +// GRPCHostKey is the key for grpc host on server metadata +var GRPCHostKey = "grpc-host" + +// GRPCExternalHostKey is the key for grpc external host on server metadata +var GRPCExternalHostKey = "grpc-external-host" + +// GRPCPortKey is the key for grpc port on server metadata +var GRPCPortKey = "grpc-port" + +// RegionKey is the key to save the region server is on +var RegionKey = "region" diff --git a/examples/demo/cluster_grpc/main.go b/examples/demo/cluster_grpc/main.go index b354648a..57ed692c 100644 --- a/examples/demo/cluster_grpc/main.go +++ b/examples/demo/cluster_grpc/main.go @@ -13,6 +13,7 @@ import ( "github.com/topfreegames/pitaya/acceptor" "github.com/topfreegames/pitaya/cluster" "github.com/topfreegames/pitaya/component" + "github.com/topfreegames/pitaya/constants" "github.com/topfreegames/pitaya/examples/demo/cluster_grpc/services" "github.com/topfreegames/pitaya/modules" "github.com/topfreegames/pitaya/route" @@ -98,8 +99,8 @@ func main() { confs.Set("pitaya.cluster.rpc.server.grpc.port", *rpcServerPort) meta := map[string]string{ - "grpc-host": "127.0.0.1", - "grpc-port": *rpcServerPort, + constants.GRPCHostKey: "127.0.0.1", + constants.GRPCPortKey: *rpcServerPort, } pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, meta, confs) @@ -111,7 +112,13 @@ func main() { bs := modules.NewETCDBindingStorage(pitaya.GetServer(), pitaya.GetConfig()) pitaya.RegisterModule(bs, "bindingsStorage") - gc, err := cluster.NewGRPCClient(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters(), bs) + gc, err := cluster.NewGRPCClient( + pitaya.GetConfig(), + pitaya.GetServer(), + pitaya.GetMetricsReporters(), + bs, + cluster.NewConfigInfoRetriever(pitaya.GetConfig()), + ) if err != nil { panic(err) } diff --git a/examples/testing/main.go b/examples/testing/main.go index 46d31298..1ca9f233 100644 --- a/examples/testing/main.go +++ b/examples/testing/main.go @@ -288,7 +288,10 @@ func main() { cfg.Set("pitaya.cluster.sd.etcd.prefix", *sdPrefix) cfg.Set("pitaya.cluster.rpc.server.grpc.port", *grpcPort) - pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{"grpc-host": "127.0.0.1", "grpc-port": fmt.Sprintf("%d", *grpcPort)}, cfg) + pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{ + constants.GRPCHostKey: "127.0.0.1", + constants.GRPCPortKey: fmt.Sprintf("%d", *grpcPort), + }, cfg) if *grpc { gs, err := cluster.NewGRPCServer(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters()) if err != nil { @@ -298,7 +301,13 @@ func main() { bs := modules.NewETCDBindingStorage(pitaya.GetServer(), pitaya.GetConfig()) pitaya.RegisterModule(bs, "bindingsStorage") - gc, err := cluster.NewGRPCClient(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters(), bs) + gc, err := cluster.NewGRPCClient( + pitaya.GetConfig(), + pitaya.GetServer(), + pitaya.GetMetricsReporters(), + bs, + cluster.NewConfigInfoRetriever(pitaya.GetConfig()), + ) if err != nil { panic(err) }