Skip to content

Commit

Permalink
Merge cb1f98e into 506205e
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Sep 12, 2018
2 parents 506205e + cb1f98e commit b478a81
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 33 deletions.
28 changes: 22 additions & 6 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,32 @@ func Configure(
func configureMetrics(serverType string) {
app.metricsReporters = make([]metrics.Reporter, 0)

defaultTags := app.config.GetStringMapString("pitaya.metrics.tags")
constTags := app.config.GetStringMapString("pitaya.metrics.constTags")
if app.config.GetBool("pitaya.metrics.prometheus.enabled") {
logger.Log.Infof("prometheus is enabled, configuring the metrics reporter on port %d", app.config.GetInt("pitaya.metrics.prometheus.port"))
AddMetricsReporter(metrics.GetPrometheusReporter(serverType, app.config.GetString("pitaya.game"), app.config.GetInt("pitaya.metrics.prometheus.port"), defaultTags))
port := app.config.GetInt("pitaya.metrics.prometheus.port")
game := app.config.GetString("pitaya.game")
logger.Log.Infof(
"prometheus is enabled, configuring the metrics reporter on port %d", port,
)

additionalTags := app.config.GetStringMapString("pitaya.metrics.additionalTags")
prometheus := metrics.GetPrometheusReporter(serverType, game, port,
constTags, additionalTags)
AddMetricsReporter(prometheus)
} else {
logger.Log.Info("prometheus is disabled, the metrics reporter will not be enabled")
}

if app.config.GetBool("pitaya.metrics.statsd.enabled") {
logger.Log.Infof("statsd is enabled, configuring the metrics reporter with host: %s", app.config.Get("pitaya.metrics.statsd.host"))
metricsReporter, err := metrics.NewStatsdReporter(app.config, serverType, defaultTags)
logger.Log.Infof(
"statsd is enabled, configuring the metrics reporter with host: %s",
app.config.Get("pitaya.metrics.statsd.host"),
)
metricsReporter, err := metrics.NewStatsdReporter(
app.config,
serverType,
constTags,
)
if err != nil {
logger.Log.Errorf("failed to start statds metrics reporter, skipping %v", err)
} else {
Expand Down Expand Up @@ -515,7 +530,8 @@ func GetDefaultLoggerFromCtx(ctx context.Context) logger.Logger {
}

// AddMetricTagsToPropagateCtx adds a key and metric tags that will
// be propagated through RPC calls
// be propagated through RPC calls. Use the same tags that are at
// 'pitaya.metrics.additionalTags' config
func AddMetricTagsToPropagateCtx(
ctx context.Context,
tags map[string]string,
Expand Down
6 changes: 3 additions & 3 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,19 @@ func TestAddRoute(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())
app.router = nil
err := AddRoute("somesv", func(session *session.Session, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
err := AddRoute("somesv", func(ctx context.Context, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
return nil, nil
})
assert.EqualError(t, constants.ErrRouterNotInitialized, err.Error())

app.router = router.New()
err = AddRoute("somesv", func(session *session.Session, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
err = AddRoute("somesv", func(ctx context.Context, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
return nil, nil
})
assert.NoError(t, err)

app.running = true
err = AddRoute("somesv", func(session *session.Session, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
err = AddRoute("somesv", func(ctx context.Context, route *route.Route, payload []byte, servers map[string]*cluster.Server) (*cluster.Server, error) {
return nil, nil
})
assert.EqualError(t, constants.ErrChangeRouteWhileRunning, err.Error())
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.metrics.statsd.rate": 1,
"pitaya.metrics.prometheus.port": 9090,
"pitaya.metrics.prometheus.enabled": false,
"pitaya.metrics.tags": map[string]string{},
"pitaya.metrics.constTags": map[string]string{},
"pitaya.metrics.additionalTags": map[string]string{},
"pitaya.metrics.periodicMetrics.period": "15s",
"pitaya.defaultpipelines.structvalidation.enabled": false,
}
Expand Down
8 changes: 6 additions & 2 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ Metrics Reporting
- 9090
- int
- Port to expose prometheus metrics
* - pitaya.metrics.tags
* - pitaya.metrics.constTags
- map[string]string{}
- map[string]string
- Tags to be added to reported metrics
- Constant tags to be added to reported metrics
* - pitaya.metrics.additionalTags
- map[string]string{}
- map[string]string
- Additional tags to reported metrics, the values must be added on ctx with the method AddMetricTagsToPropagateCtx
* - pitaya.metrics.periodicMetrics.period
- 15s
- string
Expand Down
52 changes: 40 additions & 12 deletions metrics/prometheus_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,21 @@ type PrometheusReporter struct {
countReportersMap map[string]*prometheus.CounterVec
summaryReportersMap map[string]*prometheus.SummaryVec
gaugeReportersMap map[string]*prometheus.GaugeVec
additionalLabels map[string]string
}

func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
func (p *PrometheusReporter) registerMetrics(
constLabels, additionalLabels map[string]string,
) {
constLabels["game"] = p.game
constLabels["serverType"] = p.serverType

p.additionalLabels = additionalLabels
additionalLabelsKeys := make([]string, 0, len(additionalLabels))
for key := range additionalLabels {
additionalLabelsKeys = append(additionalLabelsKeys, key)
}

// HandlerResponseTimeMs summary
p.summaryReportersMap[ResponseTime] = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Expand All @@ -58,7 +67,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Objectives: map[float64]float64{0.7: 0.02, 0.95: 0.005, 0.99: 0.001},
ConstLabels: constLabels,
},
[]string{"route", "status", "type"},
append([]string{"route", "status", "type"}, additionalLabelsKeys...),
)

// ProcessDelay summary
Expand All @@ -71,7 +80,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Objectives: map[float64]float64{0.7: 0.02, 0.95: 0.005, 0.99: 0.001},
ConstLabels: constLabels,
},
[]string{"route", "type"},
append([]string{"route", "type"}, additionalLabelsKeys...),
)

// ConnectedClients gauge
Expand All @@ -83,7 +92,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the number of clients connected right now",
ConstLabels: constLabels,
},
[]string{},
additionalLabelsKeys,
)

p.gaugeReportersMap[CountServers] = prometheus.NewGaugeVec(
Expand All @@ -94,7 +103,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the number of discovered servers by service discovery",
ConstLabels: constLabels,
},
[]string{"type"},
append([]string{"type"}, additionalLabelsKeys...),
)

p.gaugeReportersMap[ChannelCapacity] = prometheus.NewGaugeVec(
Expand All @@ -105,7 +114,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the available capacity of the channel",
ConstLabels: constLabels,
},
[]string{"channel"},
append([]string{"channel"}, additionalLabelsKeys...),
)

p.gaugeReportersMap[DroppedMessages] = prometheus.NewGaugeVec(
Expand All @@ -116,7 +125,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the number of rpc server dropped messages (messages that are not handled)",
ConstLabels: constLabels,
},
[]string{},
additionalLabelsKeys,
)

p.gaugeReportersMap[Goroutines] = prometheus.NewGaugeVec(
Expand All @@ -127,7 +136,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the current number of goroutines",
ConstLabels: constLabels,
},
[]string{},
additionalLabelsKeys,
)

p.gaugeReportersMap[HeapSize] = prometheus.NewGaugeVec(
Expand All @@ -138,7 +147,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the current heap size",
ConstLabels: constLabels,
},
[]string{},
additionalLabelsKeys,
)

p.gaugeReportersMap[HeapObjects] = prometheus.NewGaugeVec(
Expand All @@ -149,7 +158,7 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
Help: "the current number of allocated heap objects",
ConstLabels: constLabels,
},
[]string{},
additionalLabelsKeys,
)

toRegister := make([]prometheus.Collector, 0)
Expand All @@ -169,7 +178,11 @@ func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
}

// GetPrometheusReporter gets the prometheus reporter singleton
func GetPrometheusReporter(serverType string, game string, port int, constLabels map[string]string) *PrometheusReporter {
func GetPrometheusReporter(
serverType, game string,
port int,
constLabels, additionalLabels map[string]string,
) *PrometheusReporter {
once.Do(func() {
prometheusReporter = &PrometheusReporter{
serverType: serverType,
Expand All @@ -178,7 +191,7 @@ func GetPrometheusReporter(serverType string, game string, port int, constLabels
summaryReportersMap: make(map[string]*prometheus.SummaryVec),
gaugeReportersMap: make(map[string]*prometheus.GaugeVec),
}
prometheusReporter.registerMetrics(constLabels)
prometheusReporter.registerMetrics(constLabels, additionalLabels)
http.Handle("/metrics", prometheus.Handler())
go (func() {
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
Expand All @@ -191,6 +204,7 @@ func GetPrometheusReporter(serverType string, game string, port int, constLabels
func (p *PrometheusReporter) ReportSummary(metric string, labels map[string]string, value float64) error {
sum := p.summaryReportersMap[metric]
if sum != nil {
labels = p.ensureLabels(labels)
sum.With(labels).Observe(value)
return nil
}
Expand All @@ -201,6 +215,7 @@ func (p *PrometheusReporter) ReportSummary(metric string, labels map[string]stri
func (p *PrometheusReporter) ReportCount(metric string, labels map[string]string, count float64) error {
cnt := p.countReportersMap[metric]
if cnt != nil {
labels = p.ensureLabels(labels)
cnt.With(labels).Add(count)
return nil
}
Expand All @@ -211,8 +226,21 @@ func (p *PrometheusReporter) ReportCount(metric string, labels map[string]string
func (p *PrometheusReporter) ReportGauge(metric string, labels map[string]string, value float64) error {
g := p.gaugeReportersMap[metric]
if g != nil {
labels = p.ensureLabels(labels)
g.With(labels).Set(value)
return nil
}
return constants.ErrMetricNotKnown
}

// ensureLabels check if labels contains the additionalLabels values,
// otherwise adds them with the default values
func (p *PrometheusReporter) ensureLabels(labels map[string]string) map[string]string {
for key, defaultVal := range p.additionalLabels {
if _, ok := labels[key]; !ok {
labels[key] = defaultVal
}
}

return labels
}
8 changes: 4 additions & 4 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package router

import (
"context"
"math/rand"
"time"

Expand All @@ -30,7 +31,6 @@ import (
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/protos"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/session"
)

// Router struct
Expand All @@ -41,7 +41,7 @@ type Router struct {

// RoutingFunc defines a routing function
type RoutingFunc func(
session *session.Session,
ctx context.Context,
route *route.Route,
payload []byte,
servers map[string]*cluster.Server,
Expand Down Expand Up @@ -74,9 +74,9 @@ func (r *Router) defaultRoute(

// Route gets the right server to use in the call
func (r *Router) Route(
ctx context.Context,
rpcType protos.RPCType,
svType string,
session *session.Session,
route *route.Route,
msg *message.Message,
) (*cluster.Server, error) {
Expand All @@ -97,7 +97,7 @@ func (r *Router) Route(
server := r.defaultRoute(serversOfType)
return server, nil
}
return routeFunc(session, route, msg.Data, serversOfType)
return routeFunc(ctx, route, msg.Data, serversOfType)
}

// AddRoute adds a routing function to a server type
Expand Down
8 changes: 4 additions & 4 deletions router/router_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"context"
"errors"
"testing"

Expand All @@ -11,7 +12,6 @@ import (
"github.com/topfreegames/pitaya/internal/message"
"github.com/topfreegames/pitaya/protos"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/session"
)

var (
Expand All @@ -24,7 +24,7 @@ var (
}

routingFunction = func(
session *session.Session,
ctx context.Context,
route *route.Route,
payload []byte,
servers map[string]*cluster.Server,
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestDefaultRoute(t *testing.T) {
func TestRoute(t *testing.T) {
t.Parallel()

session := &session.Session{}
ctx := context.Background()
route := route.NewRoute(serverType, "service", "method")

for name, table := range routerTables {
Expand All @@ -86,7 +86,7 @@ func TestRoute(t *testing.T) {
router.AddRoute(serverType, routingFunction)
router.SetServiceDiscovery(mockServiceDiscovery)

retServer, err := router.Route(table.rpcType, table.serverType, session, route, &message.Message{
retServer, err := router.Route(ctx, table.rpcType, table.serverType, route, &message.Message{
Data: []byte{0x01},
})
assert.Equal(t, table.server, retServer)
Expand Down
1 change: 1 addition & 0 deletions service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (h *HandlerService) processMessage(a *agent.Agent, msg *message.Message) {
"msg.type": strings.ToLower(msg.Type.String()),
}
ctx = tracing.StartSpan(ctx, msg.Route, tags)
ctx = context.WithValue(ctx, constants.SessionCtxKey, a.Session)

r, err := route.Decode(msg.Route)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (r *RemoteService) remoteCall(
target := server

if target == nil {
target, err = r.router.Route(rpcType, svType, session, route, msg)
target, err = r.router.Route(ctx, rpcType, svType, route, msg)
if err != nil {
return nil, e.NewError(err, e.ErrInternalCode)
}
Expand Down

0 comments on commit b478a81

Please sign in to comment.