Skip to content

Commit

Permalink
Merge 00e1679 into 4159ed6
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Sep 18, 2018
2 parents 4159ed6 + 00e1679 commit 601c31c
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 20 deletions.
15 changes: 15 additions & 0 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,18 @@ func Documentation(getPtrNames bool) (map[string]interface{}, error) {
"remotes": remoteDocs,
}, nil
}

// AddGRPCInfoToMetadata adds host, external host and
// port into metadata
func AddGRPCInfoToMetadata(
metadata map[string]string,
region, host, externalHost, port string,
) map[string]string {
// TODO: should I get all this information here? Or just
// receive them as argument
metadata[constants.GRPCHostKey] = host
metadata[constants.GRPCExternalHostKey] = externalHost
metadata[constants.GRPCPortKey] = port
metadata[constants.RegionKey] = region
return metadata
}
22 changes: 22 additions & 0 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,25 @@ func TestDocumentationTrue(t *testing.T) {
"handlers": map[string]interface{}{},
}, doc)
}

func TestAddGRPCInfoToMetadata(t *testing.T) {
t.Parallel()

metadata := map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
}

metadata = AddGRPCInfoToMetadata(metadata, "region", "host", "external-host", "port")

assert.Equal(t, map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
constants.GRPCHostKey: "host",
constants.GRPCExternalHostKey: "external-host",
constants.GRPCPortKey: "port",
constants.RegionKey: "region",
}, metadata)
}
7 changes: 7 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ type RemoteBindingListener interface {
OnUserBind(uid, fid string)
}

// InfoRetriever gets cluster info
// It can be implemented, for exemple, by reading
// env var, config or by accessing the cluster API
type InfoRetriever interface {
Region() string
}

// Action type for enum
type Action int

Expand Down
21 changes: 21 additions & 0 deletions cluster/config_info_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cluster

import "github.com/topfreegames/pitaya/config"

// ConfigInfoRetriever gets cluster info from config
// Implements InfoRetriever interface
type ConfigInfoRetriever struct {
region string
}

// NewConfigInfoRetriever returns a *ConfigInfoRetriever
func NewConfigInfoRetriever(c *config.Config) *ConfigInfoRetriever {
return &ConfigInfoRetriever{
region: c.GetString("pitaya.cluster.info.region"),
}
}

// Region gets server's region from config
func (c *ConfigInfoRetriever) Region() string {
return c.region
}
21 changes: 21 additions & 0 deletions cluster/config_info_retriever_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cluster

import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/config"
)

func TestConfigInfoRetrieverRegion(t *testing.T) {
t.Parallel()

c := viper.New()
c.Set("pitaya.cluster.info.region", "us")
config := config.NewConfig(c)

infoRetriever := NewConfigInfoRetriever(config)

assert.Equal(t, "us", infoRetriever.Region())
}
58 changes: 48 additions & 10 deletions cluster/grpc_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,25 @@ type GRPCClient struct {
metricsReporters []metrics.Reporter
clientMap sync.Map
bindingStorage interfaces.BindingStorage
infoRetriever InfoRetriever
reqTimeout time.Duration
dialTimeout time.Duration
}

// NewGRPCClient returns a new instance of GRPCClient
func NewGRPCClient(config *config.Config, server *Server, metricsReporters []metrics.Reporter, bindingStorage ...interfaces.BindingStorage) (*GRPCClient, error) {
var bs interfaces.BindingStorage
if len(bindingStorage) > 0 {
bs = bindingStorage[0]
}
func NewGRPCClient(
config *config.Config,
server *Server,
metricsReporters []metrics.Reporter,
bindingStorage interfaces.BindingStorage,
infoRetriever InfoRetriever,
) (*GRPCClient, error) {
gs := &GRPCClient{
config: config,
server: server,
metricsReporters: metricsReporters,
bindingStorage: bs,
bindingStorage: bindingStorage,
infoRetriever: infoRetriever,
}

gs.configure()
Expand Down Expand Up @@ -201,14 +205,18 @@ func (gs *GRPCClient) SendPush(userID string, frontendSv *Server, push *protos.P
func (gs *GRPCClient) AddServer(sv *Server) {
var host, port string
var ok bool
if host, ok = sv.Metadata["grpc-host"]; !ok {
logger.Log.Errorf("server %s doesn't have a grpc-host specified in metadata", sv.ID)

host = gs.getServerHost(sv)
if host == "" {
logger.Log.Errorf("server %s has no grpc-host specified in metadata", sv.ID)
return
}
if port, ok = sv.Metadata["grpc-port"]; !ok {
logger.Log.Errorf("server %s doesn't have a grpc-port specified in metadata", sv.ID)

if port, ok = sv.Metadata[constants.GRPCPortKey]; !ok {
logger.Log.Errorf("server %s has no grpc-port specified in metadata", sv.ID)
return
}

address := fmt.Sprintf("%s:%s", host, port)
ctxT, done := context.WithTimeout(context.Background(), gs.dialTimeout)
defer done()
Expand Down Expand Up @@ -241,3 +249,33 @@ func (gs *GRPCClient) BeforeShutdown() {}
func (gs *GRPCClient) Shutdown() error {
return nil
}

func (gs *GRPCClient) getServerHost(sv *Server) string {
var (
serverRegion, hasRegion = sv.Metadata[constants.RegionKey]
externalHost, hasExternal = sv.Metadata[constants.GRPCExternalHostKey]
internalHost, _ = sv.Metadata[constants.GRPCHostKey]
)

hasRegion = hasRegion && serverRegion != ""
hasExternal = hasExternal && externalHost != ""

if !hasRegion {
if hasExternal {
msg := "server %s has no region specified in metadata, using external host"
logger.Log.Warnf(msg, sv.ID)
return externalHost
}

logger.Log.Warnf("server %s has no region nor external host specified in metadata, using internal host", sv.ID)
return internalHost
}

if gs.infoRetriever.Region() == serverRegion {
logger.Log.Warnf("server %s is in same region, using internal host", sv.ID)
return internalHost
}

logger.Log.Warnf("server %s is in other region, using external host", sv.ID)
return externalHost
}
8 changes: 4 additions & 4 deletions cluster/grpc_rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func getRPCClient(c *config.Config) (*GRPCClient, error) {
sv := getServer()
return NewGRPCClient(c, sv, []metrics.Reporter{})
return NewGRPCClient(c, sv, []metrics.Reporter{}, nil, nil)
}

func TestNewGRPCClient(t *testing.T) {
Expand Down Expand Up @@ -269,8 +269,8 @@ func TestAddServer(t *testing.T) {
ID: "someid",
Type: "sometype",
Metadata: map[string]string{
"grpc-host": "localhost",
"grpc-port": fmt.Sprintf("%d", port),
constants.GRPCHostKey: "localhost",
constants.GRPCPortKey: fmt.Sprintf("%d", port),
},
Frontend: false,
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestRemoveServer(t *testing.T) {

server := getServer()
conf := getConfig()
gc, err := NewGRPCClient(conf, server, []metrics.Reporter{})
gc, err := NewGRPCClient(conf, server, []metrics.Reporter{}, nil, nil)
assert.NoError(t, err)
gc.clientMap.Store(server.ID, mockPitayaClient)

Expand Down
5 changes: 4 additions & 1 deletion cluster/grpc_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (gs *GRPCServer) BeforeShutdown() {}

// Shutdown stops grpc rpc server
func (gs *GRPCServer) Shutdown() error {
gs.grpcSv.Stop()
// graceful: stops the server from accepting new connections and RPCs and
// blocks until all the pending RPCs are finished.
// source: https://godoc.org/google.golang.org/grpc#Server.GracefulStop
gs.grpcSv.GracefulStop()
return nil
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.sd.etcd.revoke.timeout": "5s",
"pitaya.cluster.sd.etcd.heartbeat.log": false,
"pitaya.cluster.sd.etcd.syncservers.interval": "120s",
"pitaya.cluster.info.region": "",
"pitaya.modules.bindingstorage.etcd.endpoints": "localhost:2379",
"pitaya.modules.bindingstorage.etcd.prefix": "pitaya/",
"pitaya.modules.bindingstorage.etcd.dialtimeout": "5s",
Expand Down
12 changes: 12 additions & 0 deletions constants/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,15 @@ var RouteKey = "req-route"
// MetricTagsKey is the key holding request tags to be sent over the context
// to be reported
var MetricTagsKey = "metric-tags"

// GRPCHostKey is the key for grpc host on server metadata
var GRPCHostKey = "grpc-host"

// GRPCExternalHostKey is the key for grpc external host on server metadata
var GRPCExternalHostKey = "grpc-external-host"

// GRPCPortKey is the key for grpc port on server metadata
var GRPCPortKey = "grpc-port"

// RegionKey is the key to save the region server is on
var RegionKey = "region"
13 changes: 10 additions & 3 deletions examples/demo/cluster_grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/topfreegames/pitaya/acceptor"
"github.com/topfreegames/pitaya/cluster"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/examples/demo/cluster_grpc/services"
"github.com/topfreegames/pitaya/modules"
"github.com/topfreegames/pitaya/route"
Expand Down Expand Up @@ -98,8 +99,8 @@ func main() {
confs.Set("pitaya.cluster.rpc.server.grpc.port", *rpcServerPort)

meta := map[string]string{
"grpc-host": "127.0.0.1",
"grpc-port": *rpcServerPort,
constants.GRPCHostKey: "127.0.0.1",
constants.GRPCPortKey: *rpcServerPort,
}

pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, meta, confs)
Expand All @@ -111,7 +112,13 @@ func main() {
bs := modules.NewETCDBindingStorage(pitaya.GetServer(), pitaya.GetConfig())
pitaya.RegisterModule(bs, "bindingsStorage")

gc, err := cluster.NewGRPCClient(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters(), bs)
gc, err := cluster.NewGRPCClient(
pitaya.GetConfig(),
pitaya.GetServer(),
pitaya.GetMetricsReporters(),
bs,
cluster.NewConfigInfoRetriever(pitaya.GetConfig()),
)
if err != nil {
panic(err)
}
Expand Down
13 changes: 11 additions & 2 deletions examples/testing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ func main() {
cfg.Set("pitaya.cluster.sd.etcd.prefix", *sdPrefix)
cfg.Set("pitaya.cluster.rpc.server.grpc.port", *grpcPort)

pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{"grpc-host": "127.0.0.1", "grpc-port": fmt.Sprintf("%d", *grpcPort)}, cfg)
pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{
constants.GRPCHostKey: "127.0.0.1",
constants.GRPCPortKey: fmt.Sprintf("%d", *grpcPort),
}, cfg)
if *grpc {
gs, err := cluster.NewGRPCServer(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters())
if err != nil {
Expand All @@ -298,7 +301,13 @@ func main() {
bs := modules.NewETCDBindingStorage(pitaya.GetServer(), pitaya.GetConfig())
pitaya.RegisterModule(bs, "bindingsStorage")

gc, err := cluster.NewGRPCClient(pitaya.GetConfig(), pitaya.GetServer(), pitaya.GetMetricsReporters(), bs)
gc, err := cluster.NewGRPCClient(
pitaya.GetConfig(),
pitaya.GetServer(),
pitaya.GetMetricsReporters(),
bs,
cluster.NewConfigInfoRetriever(pitaya.GetConfig()),
)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 601c31c

Please sign in to comment.