Skip to content

Commit

Permalink
send default tags on reporter (#22)
Browse files Browse the repository at this point in the history
* send default tags on reporter

* fix tests and new ones
  • Loading branch information
henrod authored and felipejfc committed Jun 26, 2018
1 parent 4e4f74c commit 9765843
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 78 deletions.
10 changes: 5 additions & 5 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ func Configure(
app.messageEncoder = message.NewMessagesEncoder(app.config.GetBool("pitaya.handler.messages.compression"))
app.configured = true
app.metricsReporters = make([]metrics.Reporter, 0)
AddMetricsReporter(metrics.GetPrometheusReporter(serverType, app.config.GetString("pitaya.game"), app.config.GetInt("pitaya.metrics.prometheus.port")))

defaultTags := app.config.GetStringMapString("pitaya.metrics.tags")
AddMetricsReporter(metrics.GetPrometheusReporter(serverType, app.config.GetString("pitaya.game"), app.config.GetInt("pitaya.metrics.prometheus.port"), defaultTags))

if app.config.GetBool("pitaya.metrics.statsd.enabled") {
logger.Log.Infof(
"statsd is enabled, configuring the metrics reporter with host: %s",
app.config.Get("pitaya.metrics.statsd.host"))
metricsReporter, err := metrics.NewStatsdReporter(app.config, serverType)
logger.Log.Infof("statsd is enabled, configuring the metrics reporter with host: %s", app.config.Get("pitaya.metrics.statsd.host"))
metricsReporter, err := metrics.NewStatsdReporter(app.config, serverType, defaultTags)
if err != nil {
logger.Log.Errorf("failed to start statds metrics reporter, skipping %v", err)
} else {
Expand Down
4 changes: 3 additions & 1 deletion app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ func TestSetServiceDiscovery(t *testing.T) {
func TestAddMetricsReporter(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())
r, err := metrics.NewStatsdReporter(app.config, app.server.Type)
r, err := metrics.NewStatsdReporter(app.config, app.server.Type, map[string]string{
"tag1": "value1",
})
assert.NoError(t, err)
assert.NotNil(t, r)
AddMetricsReporter(r)
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (c *Config) fillDefaultValues() {
"pitaya.metrics.statsd.prefix": "pitaya.",
"pitaya.metrics.statsd.rate": 1,
"pitaya.metrics.prometheus.port": 9090,
"pitaya.metrics.tags": map[string]string{},
}

for param := range defaultsMap {
Expand Down Expand Up @@ -119,3 +120,8 @@ func (c *Config) GetStringSlice(s string) []string {
func (c *Config) Get(s string) interface{} {
return c.config.Get(s)
}

// GetStringMapString returns a string map string from the inner config
func (c *Config) GetStringMapString(s string) map[string]string {
return c.config.GetStringMapString(s)
}
88 changes: 38 additions & 50 deletions metrics/prometheus_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,76 +54,64 @@ type PrometheusReporter struct {
gaugeReportersMap map[string]*prometheus.GaugeVec
}

func (p *PrometheusReporter) registerMetrics() {
func (p *PrometheusReporter) registerMetrics(constLabels map[string]string) {
constLabels["game"] = p.game
constLabels["serverType"] = p.serverType

// HanadlerResponseTimeMs summaary
p.summaryReportersMap[ResponseTime] = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "pitaya",
Subsystem: "handler",
Name: ResponseTime,
Help: "the time to process a msg in nanoseconds",
Objectives: map[float64]float64{0.7: 0.02, 0.95: 0.005, 0.99: 0.001},
ConstLabels: map[string]string{
"game": p.game,
"serverType": p.serverType,
},
Namespace: "pitaya",
Subsystem: "handler",
Name: ResponseTime,
Help: "the time to process a msg in nanoseconds",
Objectives: map[float64]float64{0.7: 0.02, 0.95: 0.005, 0.99: 0.001},
ConstLabels: constLabels,
},
[]string{"route", "status", "type"},
)

// ConnectedClients gauge
p.gaugeReportersMap[ConnectedClients] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "acceptor",
Name: ConnectedClients,
Help: "the number of clients connected right now",
ConstLabels: map[string]string{
"game": p.game,
"serverType": p.serverType,
},
Namespace: "pitaya",
Subsystem: "acceptor",
Name: ConnectedClients,
Help: "the number of clients connected right now",
ConstLabels: constLabels,
},
[]string{},
)

p.gaugeReportersMap[CountServers] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "service_discovery",
Name: CountServers,
Help: "the number of discovered servers by service discovery",
ConstLabels: map[string]string{
"game": p.game,
"serverType": p.serverType,
},
Namespace: "pitaya",
Subsystem: "service_discovery",
Name: CountServers,
Help: "the number of discovered servers by service discovery",
ConstLabels: constLabels,
},
[]string{"type"},
)

p.gaugeReportersMap[ChannelCapacity] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "channel",
Name: ChannelCapacity,
Help: "the available capacity of the channel",
ConstLabels: map[string]string{
"game": p.game,
"serverType": p.serverType,
},
Namespace: "pitaya",
Subsystem: "channel",
Name: ChannelCapacity,
Help: "the available capacity of the channel",
ConstLabels: constLabels,
},
[]string{"channel"},
)

p.gaugeReportersMap[DroppedMessages] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "rpc_server",
Name: DroppedMessages,
Help: "the number of rpc server dropped messages (messages that are not handled)",
ConstLabels: map[string]string{
"game": p.game,
"serverType": p.serverType,
},
Namespace: "pitaya",
Subsystem: "rpc_server",
Name: DroppedMessages,
Help: "the number of rpc server dropped messages (messages that are not handled)",
ConstLabels: constLabels,
},
[]string{},
)
Expand All @@ -145,7 +133,7 @@ func (p *PrometheusReporter) registerMetrics() {
}

// GetPrometheusReporter gets the prometheus reporter singleton
func GetPrometheusReporter(serverType string, game string, port int) *PrometheusReporter {
func GetPrometheusReporter(serverType string, game string, port int, constLabels map[string]string) *PrometheusReporter {
once.Do(func() {
prometheusReporter = &PrometheusReporter{
serverType: serverType,
Expand All @@ -154,7 +142,7 @@ func GetPrometheusReporter(serverType string, game string, port int) *Prometheus
summaryReportersMap: make(map[string]*prometheus.SummaryVec),
gaugeReportersMap: make(map[string]*prometheus.GaugeVec),
}
prometheusReporter.registerMetrics()
prometheusReporter.registerMetrics(constLabels)
http.Handle("/metrics", prometheus.Handler())
go (func() {
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
Expand All @@ -164,30 +152,30 @@ func GetPrometheusReporter(serverType string, game string, port int) *Prometheus
}

// ReportSummary reports a summary metric
func (p *PrometheusReporter) ReportSummary(metric string, tags map[string]string, value float64) error {
func (p *PrometheusReporter) ReportSummary(metric string, labels map[string]string, value float64) error {
sum := p.summaryReportersMap[metric]
if sum != nil {
sum.With(tags).Observe(value)
sum.With(labels).Observe(value)
return nil
}
return constants.ErrMetricNotKnown
}

// ReportCount reports a summary metric
func (p *PrometheusReporter) ReportCount(metric string, tags map[string]string, count float64) error {
func (p *PrometheusReporter) ReportCount(metric string, labels map[string]string, count float64) error {
cnt := p.countReportersMap[metric]
if cnt != nil {
cnt.With(tags).Add(count)
cnt.With(labels).Add(count)
return nil
}
return constants.ErrMetricNotKnown
}

// ReportGauge reports a gauge metric
func (p *PrometheusReporter) ReportGauge(metric string, tags map[string]string, value float64) error {
func (p *PrometheusReporter) ReportGauge(metric string, labels map[string]string, value float64) error {
g := p.gaugeReportersMap[metric]
if g != nil {
g.With(tags).Set(value)
g.With(labels).Set(value)
return nil
}
return constants.ErrMetricNotKnown
Expand Down
48 changes: 31 additions & 17 deletions metrics/statsd_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,20 @@ type Client interface {

// StatsdReporter sends application metrics to statsd
type StatsdReporter struct {
client Client
rate float64
serverType string
hostname string
client Client
rate float64
serverType string
hostname string
defaultTags []string
}

// NewStatsdReporter returns an instance of statsd reportar and an error if something fails
func NewStatsdReporter(config *config.Config, serverType string, clientOrNil ...Client) (*StatsdReporter, error) {
func NewStatsdReporter(
config *config.Config,
serverType string,
tagsMap map[string]string,
clientOrNil ...Client,
) (*StatsdReporter, error) {
host := config.GetString("pitaya.metrics.statsd.host")
prefix := config.GetString("pitaya.metrics.statsd.prefix")
rate, err := strconv.ParseFloat(config.GetString("pitaya.metrics.statsd.rate"), 64)
Expand All @@ -64,6 +70,8 @@ func NewStatsdReporter(config *config.Config, serverType string, clientOrNil ...
hostname: hostname,
}

sr.buildDefaultTags(tagsMap)

if len(clientOrNil) > 0 {
sr.client = clientOrNil[0]
} else {
Expand All @@ -77,12 +85,24 @@ func NewStatsdReporter(config *config.Config, serverType string, clientOrNil ...
return sr, nil
}

func (s *StatsdReporter) buildDefaultTags(tagsMap map[string]string) {
defaultTags := make([]string, len(tagsMap)+2)

defaultTags[0] = fmt.Sprintf("serverType:%s", s.serverType)
defaultTags[1] = fmt.Sprintf("hostname:%s", s.hostname)

idx := 2
for k, v := range tagsMap {
defaultTags[idx] = fmt.Sprintf("%s:%s", k, v)
idx++
}

s.defaultTags = defaultTags
}

// ReportCount sends count reports to statsd
func (s *StatsdReporter) ReportCount(metric string, tagsMap map[string]string, count float64) error {
fullTags := []string{
fmt.Sprintf("serverType:%s", s.serverType),
fmt.Sprintf("hostname:%s", s.hostname),
}
fullTags := s.defaultTags

for k, v := range tagsMap {
fullTags = append(fullTags, fmt.Sprintf("%s:%s", k, v))
Expand All @@ -98,10 +118,7 @@ func (s *StatsdReporter) ReportCount(metric string, tagsMap map[string]string, c

// ReportGauge sents the gauge value and reports to statsd
func (s *StatsdReporter) ReportGauge(metric string, tagsMap map[string]string, value float64) error {
fullTags := []string{
fmt.Sprintf("serverType:%s", s.serverType),
fmt.Sprintf("hostname:%s", s.hostname),
}
fullTags := s.defaultTags

for k, v := range tagsMap {
fullTags = append(fullTags, fmt.Sprintf("%s:%s", k, v))
Expand All @@ -117,10 +134,7 @@ func (s *StatsdReporter) ReportGauge(metric string, tagsMap map[string]string, v

// ReportSummary observes the summary value and reports to statsd
func (s *StatsdReporter) ReportSummary(metric string, tagsMap map[string]string, value float64) error {
fullTags := []string{
fmt.Sprintf("serverType:%s", s.serverType),
fmt.Sprintf("hostname:%s", s.hostname),
}
fullTags := s.defaultTags

for k, v := range tagsMap {
fullTags = append(fullTags, fmt.Sprintf("%s:%s", k, v))
Expand Down
16 changes: 11 additions & 5 deletions metrics/statsd_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewStatsdReporter(t *testing.T) {
mockClient := metricsmocks.NewMockClient(ctrl)

cfg := config.NewConfig()
sr, err := NewStatsdReporter(cfg, "svType", mockClient)
sr, err := NewStatsdReporter(cfg, "svType", map[string]string{}, mockClient)
assert.NoError(t, err)
assert.Equal(t, mockClient, sr.client)
assert.Equal(t, float64(cfg.GetInt("pitaya.metrics.statsd.rate")), sr.rate)
Expand All @@ -53,7 +53,9 @@ func TestReportLatency(t *testing.T) {
mockClient := metricsmocks.NewMockClient(ctrl)

cfg := config.NewConfig()
sr, err := NewStatsdReporter(cfg, "svType", mockClient)
sr, err := NewStatsdReporter(cfg, "svType", map[string]string{
"defaultTag": "value",
}, mockClient)
assert.NoError(t, err)

expectedDuration, err := time.ParseDuration("200ms")
Expand All @@ -67,6 +69,7 @@ func TestReportLatency(t *testing.T) {
assert.Contains(t, tags, fmt.Sprintf("type:%s", expectedType))
assert.Contains(t, tags, fmt.Sprintf("status:%s", expectedErrored))
assert.Contains(t, tags, fmt.Sprintf("serverType:%s", sr.serverType))
assert.Contains(t, tags, "defaultTag:value")
})

err = sr.ReportSummary(ResponseTime, map[string]string{
Expand All @@ -84,7 +87,7 @@ func TestReportLatencyError(t *testing.T) {
mockClient := metricsmocks.NewMockClient(ctrl)

cfg := config.NewConfig()
sr, err := NewStatsdReporter(cfg, "svType", mockClient)
sr, err := NewStatsdReporter(cfg, "svType", map[string]string{}, mockClient)
assert.NoError(t, err)

expectedError := errors.New("some error")
Expand All @@ -100,7 +103,9 @@ func TestReportCount(t *testing.T) {
mockClient := metricsmocks.NewMockClient(ctrl)

cfg := config.NewConfig()
sr, err := NewStatsdReporter(cfg, "svType", mockClient)
sr, err := NewStatsdReporter(cfg, "svType", map[string]string{
"defaultTag": "value",
}, mockClient)
assert.NoError(t, err)

expectedCount := 123
Expand All @@ -115,6 +120,7 @@ func TestReportCount(t *testing.T) {
}
assert.Contains(t, tags, fmt.Sprintf("serverType:%s", sr.serverType))
assert.Contains(t, tags, fmt.Sprintf("hostname:%s", sr.hostname))
assert.Contains(t, tags, "defaultTag:value")
})

err = sr.ReportCount(expectedMetric, customTags, float64(expectedCount))
Expand All @@ -127,7 +133,7 @@ func TestReportCountError(t *testing.T) {
mockClient := metricsmocks.NewMockClient(ctrl)

cfg := config.NewConfig()
sr, err := NewStatsdReporter(cfg, "svType", mockClient)
sr, err := NewStatsdReporter(cfg, "svType", map[string]string{}, mockClient)
assert.NoError(t, err)

expectedError := errors.New("some error")
Expand Down

0 comments on commit 9765843

Please sign in to comment.