diff --git a/cmd/feedback_listener.go b/cmd/feedback_listener.go index 966cb40..3c5310d 100644 --- a/cmd/feedback_listener.go +++ b/cmd/feedback_listener.go @@ -45,7 +45,7 @@ func newFeedbackListener( log.Level = logrus.InfoLevel } - return feedback.NewListener(config, log) + return feedback.NewListener(config, log, nil) } // starFeedbackListenerCmd represents the start-feedback-listener command diff --git a/config/default.yaml b/config/default.yaml index ad830a0..8caf6c4 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -42,6 +42,8 @@ feedback: stats: reporters: - statsd + flush: + s: 5 statsd: host: "localhost:8125" prefix: "push" diff --git a/config/test.yaml b/config/test.yaml index 69cf000..bb70261 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -43,6 +43,8 @@ feedback: stats: reporters: - statsd + flush: + s: 5 statsd: host: "localhost:40001" prefix: "push" diff --git a/extensions/apns_message_handler_test.go b/extensions/apns_message_handler_test.go index 6d1a107..80adb67 100644 --- a/extensions/apns_message_handler_test.go +++ b/extensions/apns_message_handler_test.go @@ -382,7 +382,7 @@ var _ = FDescribe("APNS Message Handler", func() { handler.sendMessage(kafkaMessage) handler.sendMessage(kafkaMessage) - Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["sent"]).To(Equal(int64(2))) }) It("should call HandleNotificationSuccess upon message response received", func() { @@ -396,7 +396,7 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["ack"]).To(Equal(int64(2))) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -411,7 +411,7 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["failed"]).To(Equal(int64(2))) }) }) diff --git a/extensions/datadog_statsd.go b/extensions/datadog_statsd.go index 37532e7..9c38997 100644 --- a/extensions/datadog_statsd.go +++ b/extensions/datadog_statsd.go @@ -122,3 +122,47 @@ func (s *StatsD) Cleanup() error { s.Client.Close() return nil } + +// ReportMetricGauge reports a metric as a Gauge with hostname, game and platform +// as tags +func (s *StatsD) ReportMetricGauge( + metric string, value float64, + game, platform string, +) { + hostname, _ := os.Hostname() + tags := []string{ + fmt.Sprintf("hostname:%s", hostname), + } + + if game != "" { + tags = append(tags, game) + } + + if platform != "" { + tags = append(tags, platform) + } + + s.Client.Gauge(metric, value, tags, 1) +} + +// ReportMetricCount reports a metric as a Count with hostname, game and platform +// as tags +func (s *StatsD) ReportMetricCount( + metric string, value int64, + game, platform string, +) { + hostname, _ := os.Hostname() + tags := []string{ + fmt.Sprintf("hostname:%s", hostname), + } + + if game != "" { + tags = append(tags, game) + } + + if platform != "" { + tags = append(tags, platform) + } + + s.Client.Count(metric, value, tags, 1) +} diff --git a/extensions/datadog_statsd_test.go b/extensions/datadog_statsd_test.go index fbd9435..4b93908 100644 --- a/extensions/datadog_statsd_test.go +++ b/extensions/datadog_statsd_test.go @@ -53,7 +53,7 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationSent("game", "apns") statsd.HandleNotificationSent("game", "apns") - Expect(mockClient.Count["sent"]).To(Equal(2)) + Expect(mockClient.Counts["sent"]).To(Equal(int64(2))) }) }) @@ -65,7 +65,7 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationSuccess("game", "apns") statsd.HandleNotificationSuccess("game", "apns") - Expect(mockClient.Count["ack"]).To(Equal(2)) + Expect(mockClient.Counts["ack"]).To(Equal(int64(2))) }) }) @@ -96,7 +96,41 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationFailure("game", "apns", pErr) statsd.HandleNotificationFailure("game", "apns", pErr) - Expect(mockClient.Count["failed"]).To(Equal(2)) + Expect(mockClient.Counts["failed"]).To(Equal(int64(2))) + }) + }) + + Describe("Reporting metric count", func() { + It("should report metric increment in statsd", func() { + statsd, err := NewStatsD(config, logger, mockClient) + Expect(err).NotTo(HaveOccurred()) + defer statsd.Cleanup() + + statsd.ReportMetricCount("tokens_delete_success", 2, "game", "apns") + statsd.ReportMetricCount("tokens_delete_error", 3, "game", "apns") + + statsd.ReportMetricCount("tokens_delete_success", 3, "game", "apns") + statsd.ReportMetricCount("tokens_delete_error", 0, "game", "apns") + + Expect(mockClient.Counts["tokens_delete_success"]).To(Equal(int64(5))) + Expect(mockClient.Counts["tokens_delete_error"]).To(Equal(int64(3))) + }) + }) + + Describe("Reporting metric gauge", func() { + It("should report metric gauge in statsd", func() { + statsd, err := NewStatsD(config, logger, mockClient) + Expect(err).NotTo(HaveOccurred()) + defer statsd.Cleanup() + + statsd.ReportMetricGauge("in_chan_size", 10, "game", "apns") + Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(10))) + + statsd.ReportMetricGauge("in_chan_size", 0, "game", "apns") + Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(0))) + + statsd.ReportMetricGauge("in_chan_size", 3, "game", "apns") + Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(3))) }) }) }) diff --git a/extensions/gcm_message_handler_test.go b/extensions/gcm_message_handler_test.go index 8ebb874..28afac3 100644 --- a/extensions/gcm_message_handler_test.go +++ b/extensions/gcm_message_handler_test.go @@ -32,7 +32,7 @@ import ( uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" - "github.com/topfreegames/go-gcm" + gcm "github.com/topfreegames/go-gcm" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" . "github.com/topfreegames/pusher/testing" @@ -214,9 +214,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() + int64(1000000), @@ -245,9 +245,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() - int64(100), @@ -285,9 +285,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) @@ -315,9 +315,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() + int64(1000000), @@ -341,9 +341,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) @@ -460,9 +460,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) @@ -476,14 +476,14 @@ var _ = Describe("GCM Message Handler", func() { err = handler.sendMessage(kafkaMessage) Expect(err).NotTo(HaveOccurred()) - Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["sent"]).To(Equal(int64(2))) }) It("should call HandleNotificationSuccess upon message response received", func() { res := gcm.CCSMessage{} handler.handleGCMResponse(res) handler.handleGCMResponse(res) - Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["ack"]).To(Equal(int64(2))) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -493,7 +493,7 @@ var _ = Describe("GCM Message Handler", func() { handler.handleGCMResponse(res) handler.handleGCMResponse(res) - Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) + Expect(mockStatsDClient.Counts["failed"]).To(Equal(int64(2))) }) }) diff --git a/feedback/broker.go b/feedback/broker.go index 3084992..8d9ffe6 100644 --- a/feedback/broker.go +++ b/feedback/broker.go @@ -27,6 +27,7 @@ import ( "sync" gcm "github.com/topfreegames/go-gcm" + "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/structs" "github.com/sideshow/apns2" @@ -53,6 +54,7 @@ type Message struct { type Broker struct { Logger *log.Logger Config *viper.Viper + StatsReporters []interfaces.StatsReporter InChan chan QueueMessage pendingMessagesWG *sync.WaitGroup InvalidTokenOutChan chan *InvalidToken @@ -63,13 +65,14 @@ type Broker struct { // NewBroker creates a new Broker instance func NewBroker( - logger *log.Logger, cfg *viper.Viper, + logger *log.Logger, cfg *viper.Viper, statsReporters []interfaces.StatsReporter, inChan chan QueueMessage, pendingMessagesWG *sync.WaitGroup, ) (*Broker, error) { b := &Broker{ Logger: logger, Config: cfg, + StatsReporters: statsReporters, InChan: inChan, pendingMessagesWG: pendingMessagesWG, stopChannel: make(chan struct{}), diff --git a/feedback/broker_test.go b/feedback/broker_test.go index 53da4bd..54daafe 100644 --- a/feedback/broker_test.go +++ b/feedback/broker_test.go @@ -59,7 +59,7 @@ var _ = Describe("Broker", func() { Describe("[Unit]", func() { It("Should start and stop correctly", func() { - broker, err := NewBroker(logger, config, inChan, nil) + broker, err := NewBroker(logger, config, nil, inChan, nil) Expect(err).NotTo(HaveOccurred()) broker.Start() @@ -95,7 +95,7 @@ var _ = Describe("Broker", func() { }) It("Should route an invalid token feedback", func() { - broker, err := NewBroker(logger, config, inChan, nil) + broker, err := NewBroker(logger, config, nil, inChan, nil) Expect(err).NotTo(HaveOccurred()) broker.Start() @@ -140,7 +140,7 @@ var _ = Describe("Broker", func() { }) It("Should route an invalid token feedback from GCM", func() { - broker, err := NewBroker(logger, config, inChan, nil) + broker, err := NewBroker(logger, config, nil, inChan, nil) Expect(err).NotTo(HaveOccurred()) broker.Start() diff --git a/feedback/common.go b/feedback/common.go new file mode 100644 index 0000000..9babc3e --- /dev/null +++ b/feedback/common.go @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2019 TFG Co + * Author: TFG Co + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package feedback + +import ( + "fmt" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "github.com/topfreegames/pusher/extensions" + "github.com/topfreegames/pusher/interfaces" +) + +type statsReporterInitializer func(*viper.Viper, *logrus.Logger, interfaces.StatsDClient) (interfaces.StatsReporter, error) + +//AvailableStatsReporters contains functions to initialize all stats reporters +var AvailableStatsReporters = map[string]statsReporterInitializer{ + "statsd": func(config *viper.Viper, logger *logrus.Logger, clientOrNil interfaces.StatsDClient) (interfaces.StatsReporter, error) { + return extensions.NewStatsD(config, logger, clientOrNil) + }, +} + +func configureStatsReporters( + config *viper.Viper, logger *logrus.Logger, + clientOrNil interfaces.StatsDClient, +) ([]interfaces.StatsReporter, error) { + reporters := []interfaces.StatsReporter{} + reporterNames := config.GetStringSlice("stats.reporters") + for _, reporterName := range reporterNames { + reporterFunc, ok := AvailableStatsReporters[reporterName] + if !ok { + return nil, fmt.Errorf("failed to initialize %s. Stats Reporter not available", reporterName) + } + + r, err := reporterFunc(config, logger, clientOrNil) + if err != nil { + return nil, fmt.Errorf("failed to initialize %s. %s", reporterName, err.Error()) + } + reporters = append(reporters, r) + } + + return reporters, nil +} + +func statsReporterReportMetricCount( + statsReporters []interfaces.StatsReporter, + metric string, value int64, game string, platform string, +) { + for _, statsReporter := range statsReporters { + statsReporter.ReportMetricCount(metric, value, game, platform) + } +} + +func statsReporterReportMetricGauge( + statsReporters []interfaces.StatsReporter, + metric string, value float64, game string, platform string, +) { + for _, statsReporter := range statsReporters { + statsReporter.ReportMetricGauge(metric, value, game, platform) + } +} diff --git a/feedback/invalid_token.go b/feedback/invalid_token.go index 6c9ad96..4c406e6 100644 --- a/feedback/invalid_token.go +++ b/feedback/invalid_token.go @@ -35,6 +35,13 @@ import ( "github.com/topfreegames/pusher/util" ) +// Metrics name sent by the Handler +const ( + MetricsTokensDeleteSuccess = "tokens_delete_success" + MetricsTokensDeleteError = "tokens_delete_error" + MetricsTokensDeleteNonexistent = "tokens_delete_nonexistent" +) + // InvalidToken represents a token with the necessary information to be deleted type InvalidToken struct { Token string @@ -46,9 +53,10 @@ type InvalidToken struct { // When the buffer is full or after a timeout, it is flushed, triggering the deletion // of the tokens from the database type InvalidTokenHandler struct { - Logger *log.Logger - Config *viper.Viper - Client *extensions.PGClient + Logger *log.Logger + Config *viper.Viper + StatsReporter []interfaces.StatsReporter + Client *extensions.PGClient flushTime time.Duration @@ -62,16 +70,16 @@ type InvalidTokenHandler struct { // NewInvalidTokenHandler returns a new InvalidTokenHandler instance func NewInvalidTokenHandler( - logger *log.Logger, cfg *viper.Viper, + logger *log.Logger, cfg *viper.Viper, statsReporter []interfaces.StatsReporter, inChan chan *InvalidToken, dbOrNil ...interfaces.DB, - ) (*InvalidTokenHandler, error) { h := &InvalidTokenHandler{ - Logger: logger, - Config: cfg, - InChan: inChan, - stopChan: make(chan bool), + Logger: logger, + Config: cfg, + StatsReporter: statsReporter, + InChan: inChan, + stopChan: make(chan bool), } var db interfaces.DB @@ -213,7 +221,7 @@ func (i *InvalidTokenHandler) deleteTokensFromGame(tokens []string, game, platfo query := queryBuild.String() l.Debug("deleting tokens") - _, err := i.Client.DB.Exec(query, params...) + res, err := i.Client.DB.Exec(query, params...) if err != nil && err.Error() != "pg: no rows in result set" { raven.CaptureError(err, map[string]string{ "version": util.Version, @@ -221,8 +229,33 @@ func (i *InvalidTokenHandler) deleteTokensFromGame(tokens []string, game, platfo }) l.WithError(err).Error("error deleting tokens") + statsReporterReportMetricCount(i.StatsReporter, + MetricsTokensDeleteError, int64(len(tokens)), game, platform) + return err } + if err != nil && err.Error() == "pg: no rows in result set" { + statsReporterReportMetricCount(i.StatsReporter, + MetricsTokensDeleteNonexistent, int64(len(tokens)), + game, platform) + + return nil + } + + if res.RowsAffected() != len(tokens) { + statsReporterReportMetricCount(i.StatsReporter, + MetricsTokensDeleteNonexistent, int64(len(tokens)-res.RowsAffected()), + game, platform) + + statsReporterReportMetricCount(i.StatsReporter, + MetricsTokensDeleteSuccess, int64(res.RowsAffected()), game, platform) + + return nil + } + + statsReporterReportMetricCount(i.StatsReporter, + MetricsTokensDeleteSuccess, int64(len(tokens)), game, platform) + return nil } diff --git a/feedback/invalid_token_test.go b/feedback/invalid_token_test.go index f3c2005..96c2cc8 100644 --- a/feedback/invalid_token_test.go +++ b/feedback/invalid_token_test.go @@ -31,6 +31,8 @@ import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" + "github.com/topfreegames/pusher/extensions" + "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/testing" "github.com/topfreegames/pusher/util" @@ -38,6 +40,8 @@ import ( var _ = Describe("InvalidToken Handler", func() { var config *viper.Viper + var mockStatsDClient *mocks.StatsDClientMock + var statsReporters []interfaces.StatsReporter var err error configFile := "../config/test.yaml" @@ -54,7 +58,13 @@ var _ = Describe("InvalidToken Handler", func() { mockClient := mocks.NewPGMock(0, 1) inChan := make(chan *InvalidToken, 100) - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) }) @@ -79,17 +89,26 @@ var _ = Describe("InvalidToken Handler", func() { It("Should flush because buffer is full", func() { logger, hook := test.NewNullLogger() + logger.Level = logrus.DebugLevel + mockClient := mocks.NewPGMock(0, 1) inChan := make(chan *InvalidToken, 100) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + config.Set("feedbackListeners.invalidToken.flush.time.ms", 1000) config.Set("feedbackListeners.invalidToken.buffer.size", 2) - logger.Level = logrus.DebugLevel - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) + mockClient.RowsAffected = 2 + mockClient.RowsReturned = 0 handler.Start() for _, t := range tokens { inChan <- t @@ -97,21 +116,42 @@ var _ = Describe("InvalidToken Handler", func() { Eventually(func() []*logrus.Entry { return hook.Entries }). Should(testing.ContainLogMessage("buffer is full")) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteSuccess] + }).Should(BeEquivalentTo(2)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteError] + }).Should(BeEquivalentTo(0)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteNonexistent] + }).Should(BeEquivalentTo(0)) }) It("Should flush because reached flush timeout", func() { logger, hook := test.NewNullLogger() + logger.Level = logrus.DebugLevel + mockClient := mocks.NewPGMock(0, 1) inChan := make(chan *InvalidToken, 100) config.Set("feedbackListeners.invalidToken.flush.time.ms", 1) config.Set("feedbackListeners.invalidToken.buffer.size", 200) - logger.Level = logrus.DebugLevel - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) + mockClient.RowsAffected = 2 + mockClient.RowsReturned = 0 handler.Start() for _, t := range tokens { inChan <- t @@ -119,6 +159,18 @@ var _ = Describe("InvalidToken Handler", func() { Eventually(func() []*logrus.Entry { return hook.Entries }). Should(testing.ContainLogMessage("flush ticker")) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteSuccess] + }).Should(BeEquivalentTo(2)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteError] + }).Should(BeEquivalentTo(0)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteNonexistent] + }).Should(BeEquivalentTo(0)) }) }) @@ -131,7 +183,13 @@ var _ = Describe("InvalidToken Handler", func() { config.Set("feedbackListeners.invalidToken.flush.time.ms", 10000) config.Set("feedbackListeners.invalidToken.buffer.size", 6) - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) @@ -214,7 +272,15 @@ var _ = Describe("InvalidToken Handler", func() { mockClient := mocks.NewPGMock(0, 1) inChan := make(chan *InvalidToken, 100) - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + + config.Set("feedbackListeners.invalidToken.buffer.size", 1) + + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) @@ -223,6 +289,8 @@ var _ = Describe("InvalidToken Handler", func() { time.Sleep(10 * time.Millisecond) } mockClient.Error = fmt.Errorf("pg: no rows in result set") + mockClient.RowsAffected = 0 + mockClient.RowsReturned = 0 handler.Start() inChan <- &InvalidToken{ @@ -232,6 +300,18 @@ var _ = Describe("InvalidToken Handler", func() { } Consistently(func() []*logrus.Entry { return hook.Entries }). ShouldNot(testing.ContainLogMessage("error deleting tokens")) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteSuccess] + }).Should(BeEquivalentTo(0)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteError] + }).Should(BeEquivalentTo(0)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteNonexistent] + }).Should(BeEquivalentTo(1)) }) It("should not break if a pg error occurred", func() { @@ -239,7 +319,15 @@ var _ = Describe("InvalidToken Handler", func() { mockClient := mocks.NewPGMock(0, 1) inChan := make(chan *InvalidToken, 100) - handler, err := NewInvalidTokenHandler(logger, config, inChan, mockClient) + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsReporters = []interfaces.StatsReporter{c} + + config.Set("feedbackListeners.invalidToken.buffer.size", 1) + + handler, err := NewInvalidTokenHandler(logger, config, statsReporters, inChan, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handler).NotTo(BeNil()) handler.bufferSize = 1 @@ -266,6 +354,9 @@ var _ = Describe("InvalidToken Handler", func() { }).Should(testing.ContainLogMessage("error deleting tokens")) mockClient.Error = nil + mockClient.RowsAffected = 1 + mockClient.RowsReturned = 0 + inChan <- &InvalidToken{ Token: "BBBBBBBBBB", Game: "sniper", @@ -288,6 +379,18 @@ var _ = Describe("InvalidToken Handler", func() { } return nil }).Should(BeEquivalentTo(expTokens)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteSuccess] + }).Should(BeEquivalentTo(1)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteError] + }).Should(BeEquivalentTo(1)) + + Eventually(func() int64 { + return mockStatsDClient.Counts[MetricsTokensDeleteNonexistent] + }).Should(BeEquivalentTo(0)) }) }) }) diff --git a/feedback/listener.go b/feedback/listener.go index d9af5a3..e969432 100644 --- a/feedback/listener.go +++ b/feedback/listener.go @@ -32,30 +32,38 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/viper" + "github.com/topfreegames/pusher/interfaces" ) // Listener will consume push feedbacks from a queue and use a broker to route // the messages to a convenient handler type Listener struct { - Config *viper.Viper - Logger *log.Logger + Config *viper.Viper + Logger *log.Logger + StatsReporters []interfaces.StatsReporter + statsFlushTime time.Duration + Queue Queue Broker *Broker InvalidTokenHandler *InvalidTokenHandler GracefulShutdownTimeout int - run bool - stopChannel chan struct{} + + run bool + stopChannel chan struct{} } // NewListener creates and return a new Listener instance -func NewListener(config *viper.Viper, logger *log.Logger) (*Listener, error) { +func NewListener( + config *viper.Viper, logger *log.Logger, + statsdClientOrNil interfaces.StatsDClient, +) (*Listener, error) { l := &Listener{ Config: config, Logger: logger, stopChannel: make(chan struct{}), } - err := l.configure() + err := l.configure(statsdClientOrNil) if err != nil { return nil, err } @@ -65,12 +73,17 @@ func NewListener(config *viper.Viper, logger *log.Logger) (*Listener, error) { func (l *Listener) loadConfigurationDefaults() { l.Config.SetDefault("feedbackListeners.gracefulShutdownTimeout", 1) + l.Config.SetDefault("stats.flush.s", 5) } -func (l *Listener) configure() error { +func (l *Listener) configure(statsdClientrOrNil interfaces.StatsDClient) error { l.loadConfigurationDefaults() l.GracefulShutdownTimeout = l.Config.GetInt("feedbackListeners.gracefulShutdownTimeout") + if err := l.configureStatsReporters(statsdClientrOrNil); err != nil { + return fmt.Errorf("error configuring statsReporters") + } + q, err := NewKafkaConsumer( l.Config, l.Logger, &l.stopChannel, nil, @@ -80,13 +93,13 @@ func (l *Listener) configure() error { } l.Queue = q - broker, err := NewBroker(l.Logger, l.Config, q.MessagesChannel(), l.Queue.PendingMessagesWaitGroup()) + broker, err := NewBroker(l.Logger, l.Config, l.StatsReporters, q.MessagesChannel(), l.Queue.PendingMessagesWaitGroup()) if err != nil { return fmt.Errorf("error creating new broker: %s", err.Error()) } l.Broker = broker - handler, err := NewInvalidTokenHandler(l.Logger, l.Config, l.Broker.InvalidTokenOutChan) + handler, err := NewInvalidTokenHandler(l.Logger, l.Config, l.StatsReporters, l.Broker.InvalidTokenOutChan) if err != nil { return fmt.Errorf("error creating new invalid token handler: %s", err.Error()) } @@ -95,6 +108,17 @@ func (l *Listener) configure() error { return nil } +func (l *Listener) configureStatsReporters(clientOrNil interfaces.StatsDClient) error { + reporters, err := configureStatsReporters(l.Config, l.Logger, clientOrNil) + if err != nil { + return err + } + l.StatsReporters = reporters + l.statsFlushTime = time.Duration(l.Config.GetInt("stats.flush.s")) * time.Second + + return nil +} + // Start starts the listener func (l *Listener) Start() { l.run = true @@ -109,6 +133,10 @@ func (l *Listener) Start() { sigchan := make(chan os.Signal) signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + flushTicker := time.NewTicker(l.statsFlushTime) + defer flushTicker.Stop() + for l.run == true { select { case sig := <-sigchan: @@ -117,15 +145,29 @@ func (l *Listener) Start() { case <-l.stopChannel: log.Warn("Stop channel closed\n") l.run = false + case <-flushTicker.C: + l.flushStats() } } - l.Stop() + l.Cleanup() } -// Stop stops the execution of the Listener -func (l *Listener) Stop() { - l.run = false +func (l *Listener) flushStats() { + statsReporterReportMetricGauge(l.StatsReporters, + "queue_out_channel", float64(len(l.Queue.MessagesChannel())), "", "") + statsReporterReportMetricGauge(l.StatsReporters, + "broker_in_channel", float64(len(l.Broker.InChan)), "", "") + statsReporterReportMetricGauge(l.StatsReporters, + "broker_invalid_token_channel", float64(len(l.Broker.InvalidTokenOutChan)), "", "") + statsReporterReportMetricGauge(l.StatsReporters, + "invalid_token_handler_buffer", float64(len(l.InvalidTokenHandler.Buffer)), "", "") +} + +// Cleanup ends the Listener execution +func (l *Listener) Cleanup() { + l.flushStats() + l.Queue.StopConsuming() l.Queue.Cleanup() l.Broker.Stop() diff --git a/feedback/listener_test.go b/feedback/listener_test.go index 51cac9c..089746b 100644 --- a/feedback/listener_test.go +++ b/feedback/listener_test.go @@ -123,7 +123,7 @@ var _ = Describe("Feedback Listener", func() { }) It("should return a configured listener", func() { - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -169,7 +169,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -224,7 +224,7 @@ var _ = Describe("Feedback Listener", func() { return res.RowsReturned() }, 15*time.Second).Should(Equal(0)) - listener.Stop() + close(listener.stopChannel) }) It("should delete a batch of tokens from a single game", func() { @@ -233,7 +233,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -298,7 +298,7 @@ var _ = Describe("Feedback Listener", func() { }, 15*time.Second).Should(Equal(0)) } - listener.Stop() + close(listener.stopChannel) }) It("should delete a batch of tokens from different games", func() { @@ -307,7 +307,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -382,7 +382,7 @@ var _ = Describe("Feedback Listener", func() { } } - listener.Stop() + close(listener.stopChannel) }) }) @@ -425,7 +425,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -480,7 +480,7 @@ var _ = Describe("Feedback Listener", func() { return res.RowsReturned() }, 15*time.Second).Should(Equal(0)) - listener.Stop() + close(listener.stopChannel) }) It("should delete a batch of tokens from a single game", func() { @@ -489,7 +489,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -553,7 +553,7 @@ var _ = Describe("Feedback Listener", func() { }, 15*time.Second).Should(Equal(0)) } - listener.Stop() + close(listener.stopChannel) }) It("should delete a batch of tokens from different games", func() { @@ -562,7 +562,7 @@ var _ = Describe("Feedback Listener", func() { config.Set("feedbackListeners.queue.group", fmt.Sprintf("group-%s", uuid.NewV4().String())) - listener, err := NewListener(config, logger) + listener, err := NewListener(config, logger, nil) Expect(err).NotTo(HaveOccurred()) Expect(listener).NotTo(BeNil()) Expect(listener.Queue).NotTo(BeNil()) @@ -636,7 +636,8 @@ var _ = Describe("Feedback Listener", func() { }, 30*time.Second).Should(Equal(0)) } } - listener.Stop() + + close(listener.stopChannel) }) }) }) diff --git a/interfaces/stats_reporter.go b/interfaces/stats_reporter.go index 210f76a..947bfb7 100644 --- a/interfaces/stats_reporter.go +++ b/interfaces/stats_reporter.go @@ -30,4 +30,6 @@ type StatsReporter interface { HandleNotificationSuccess(game string, platform string) HandleNotificationFailure(game string, platform string, err *errors.PushError) ReportGoStats(numGoRoutines int, allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64) + ReportMetricGauge(metric string, value float64, game string, platform string) + ReportMetricCount(metric string, value int64, game string, platform string) } diff --git a/interfaces/statsd.go b/interfaces/statsd.go index a98541d..b998e11 100644 --- a/interfaces/statsd.go +++ b/interfaces/statsd.go @@ -30,6 +30,7 @@ import ( type StatsDClient interface { Incr(string, []string, float64) error Gauge(string, float64, []string, float64) error + Count(string, int64, []string, float64) error Timing(string, time.Duration, []string, float64) error Close() error } diff --git a/mocks/pg.go b/mocks/pg.go index 14a0796..5f60b73 100644 --- a/mocks/pg.go +++ b/mocks/pg.go @@ -54,7 +54,7 @@ func NewPGMock(rowsAffected, rowsReturned int, errOrNil ...error) *PGMock { } func (m *PGMock) getResult() *types.Result { - return types.NewResult([]byte(fmt.Sprintf(" %d", m.RowsAffected)), m.RowsReturned) + return types.NewResult([]byte(fmt.Sprintf(" %d\000", m.RowsAffected)), m.RowsReturned) } //Close records that it is closed diff --git a/mocks/statsd.go b/mocks/statsd.go index 4b002a4..ca77da5 100644 --- a/mocks/statsd.go +++ b/mocks/statsd.go @@ -29,7 +29,7 @@ import ( //StatsDClientMock should be used for tests that need to send xmpp messages to StatsD type StatsDClientMock struct { - Count map[string]int + Counts map[string]int64 Gauges map[string]interface{} Timings map[string]interface{} Closed bool @@ -39,7 +39,7 @@ type StatsDClientMock struct { func NewStatsDClientMock() *StatsDClientMock { return &StatsDClientMock{ Closed: false, - Count: map[string]int{}, + Counts: map[string]int64{}, Gauges: map[string]interface{}{}, Timings: map[string]interface{}{}, } @@ -50,7 +50,15 @@ var mutexCount, mutexGauges, mutexTimings sync.Mutex //Incr stores the new count in a map func (m *StatsDClientMock) Incr(bucket string, tags []string, rate float64) error { mutexCount.Lock() - m.Count[bucket]++ + m.Counts[bucket]++ + mutexCount.Unlock() + return nil +} + +// Count increases a metric value by the given value +func (m *StatsDClientMock) Count(bucket string, value int64, tags []string, rate float64) error { + mutexCount.Lock() + m.Counts[bucket] += value mutexCount.Unlock() return nil }