From e309a3e4422e33c0c574cdb8555080586b337348 Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Tue, 5 Feb 2019 21:01:29 -0200 Subject: [PATCH 1/4] asynchronous invalidTokenHandlers --- Gopkg.lock | 27 ------- config/default.yaml | 1 + config/test.yaml | 1 + extensions/apns_message_handler_test.go | 6 +- extensions/common.go | 35 ++++++++ extensions/common_test.go | 55 +++++++++++-- extensions/gcm_message_handler_test.go | 44 +++++----- extensions/token_pg.go | 102 ++++++++++++++++++++++-- extensions/token_pg_test.go | 92 ++++++++++++++++++--- interfaces/invalid_token_handler.go | 2 + pusher/apns.go | 1 + pusher/gcm.go | 1 + 12 files changed, 295 insertions(+), 72 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 73f6097..adec1ea 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -311,24 +311,6 @@ revision = "31e6666875e2bf5aed58ae88c7c79f21a8188913" version = "v1.3" -[[projects]] - digest = "1:1aa50e504ff54351992a2324a4aba36f8eaa9ff34308ffa6a610a8dc188332b5" - name = "github.com/topfreegames/pusher" - packages = [ - "cmd", - "errors", - "extensions", - "interfaces", - "mocks", - "pusher", - "structs", - "testing", - "util", - ] - pruneopts = "" - revision = "7239579ceed0c5d4ae13af66a5e1ece9e8ccf5bd" - version = "3.4.2" - [[projects]] branch = "master" digest = "1:1710f8497e287491a719f21eb51b1068b224c63d31084b3d7121c6d65bc3bebf" @@ -439,15 +421,6 @@ "github.com/spf13/cobra", "github.com/spf13/viper", "github.com/topfreegames/go-gcm", - "github.com/topfreegames/pusher/cmd", - "github.com/topfreegames/pusher/errors", - "github.com/topfreegames/pusher/extensions", - "github.com/topfreegames/pusher/interfaces", - "github.com/topfreegames/pusher/mocks", - "github.com/topfreegames/pusher/pusher", - "github.com/topfreegames/pusher/structs", - "github.com/topfreegames/pusher/testing", - "github.com/topfreegames/pusher/util", "golang.org/x/crypto/pkcs12", "gopkg.in/pg.v5", "gopkg.in/pg.v5/types", diff --git a/config/default.yaml b/config/default.yaml index 7ed2fa3..dbdcb13 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -57,3 +57,4 @@ invalidToken: maxRetries: 3 database: push connectionTimeout: 100 + chanSize: 1000 diff --git a/config/test.yaml b/config/test.yaml index 0f9fc17..3c03315 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -59,3 +59,4 @@ invalidToken: maxRetries: 3 database: push connectionTimeout: 100 + chanSize: 1000 diff --git a/extensions/apns_message_handler_test.go b/extensions/apns_message_handler_test.go index 6d1a107..ba76686 100644 --- a/extensions/apns_message_handler_test.go +++ b/extensions/apns_message_handler_test.go @@ -133,7 +133,8 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) Expect(handler.responsesReceived).To(Equal(int64(1))) Expect(handler.failuresReceived).To(Equal(int64(1))) - Expect(hook.Entries).To(ContainLogMessage("deleting token")) + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(ContainLogMessage("deleting token")) //Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError")) }) @@ -146,7 +147,8 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) Expect(handler.responsesReceived).To(Equal(int64(1))) Expect(handler.failuresReceived).To(Equal(int64(1))) - Expect(hook.Entries).To(ContainLogMessage("deleting token")) + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(ContainLogMessage("deleting token")) //Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError")) }) diff --git a/extensions/common.go b/extensions/common.go index dd657f8..ea92f46 100644 --- a/extensions/common.go +++ b/extensions/common.go @@ -25,7 +25,9 @@ package extensions import ( "encoding/json" "regexp" + "time" + log "github.com/sirupsen/logrus" "github.com/topfreegames/pusher/errors" "github.com/topfreegames/pusher/interfaces" ) @@ -44,6 +46,39 @@ func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, t } } +func StopInvalidTokenHandlers( + logger *log.Logger, + invalidTokenHandlers []interfaces.InvalidTokenHandler, + timeout time.Duration, +) { + l := logger.WithField( + "method", "stopInvalidTokenHandlers", + ) + + for _, invalidTokenHandler := range invalidTokenHandlers { + done := make(chan struct{}) + stopCheck := false + go func() { + defer close(done) + for invalidTokenHandler.HasJob() && !stopCheck { + time.Sleep(10 * time.Millisecond) + } + }() + + select { + case <-done: + invalidTokenHandler.Stop() + return + + case <-time.After(timeout): + invalidTokenHandler.Stop() + stopCheck = true + l.Error("timeout reached - invalidTokenHandler was closed with jobs in queue") + return + } + } +} + func getGameAndPlatformFromTopic(topic string) ParsedTopic { res := topicRegex.FindStringSubmatch(topic) return ParsedTopic{ diff --git a/extensions/common_test.go b/extensions/common_test.go index f57a9df..66f0858 100644 --- a/extensions/common_test.go +++ b/extensions/common_test.go @@ -24,6 +24,7 @@ package extensions import ( "fmt" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -34,6 +35,7 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" + testing "github.com/topfreegames/pusher/testing" "github.com/topfreegames/pusher/util" ) @@ -71,19 +73,60 @@ var _ = Describe("Common", func() { token := uuid.NewV4().String() handleInvalidToken(invalidTokenHandlers, token, "test", "apns") query := "DELETE FROM test_apns WHERE token = ?0;" - Expect(db.Execs).To(HaveLen(2)) - Expect(db.Execs[1][0]).To(BeEquivalentTo(query)) - Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token})) + + Eventually(func() [][]interface{} { return db.Execs }). + Should(HaveLen(2)) + Eventually(func() interface{} { return db.Execs[1][0] }). + Should(BeEquivalentTo(query)) + Eventually(func() interface{} { return db.Execs[1][1] }). + Should(BeEquivalentTo([]interface{}{token})) }) It("should fail silently", func() { token := uuid.NewV4().String() db.Error = fmt.Errorf("pg: error") handleInvalidToken(invalidTokenHandlers, token, "test", "apns") - Expect(db.Execs).To(HaveLen(2)) + + Eventually(func() [][]interface{} { return db.Execs }). + Should(HaveLen(2)) query := "DELETE FROM test_apns WHERE token = ?0;" - Expect(db.Execs[1][0]).To(BeEquivalentTo(query)) - Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token})) + Eventually(func() interface{} { return db.Execs[1][0] }). + Should(BeEquivalentTo(query)) + Eventually(func() interface{} { return db.Execs[1][1] }). + Should(BeEquivalentTo([]interface{}{token})) + }) + + Describe("should stop handler gracefully", func() { + It("if there's no more job to do", func() { + token := uuid.NewV4().String() + handleInvalidToken(invalidTokenHandlers, token, "test", "apns") + + timeout := 1 * time.Second + StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout) + Consistently(func() []*logrus.Entry { return hook.Entries }, 2*timeout). + ShouldNot(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue")) + + Eventually(func() [][]interface{} { return db.Execs }). + Should(HaveLen(2)) + query := "DELETE FROM test_apns WHERE token = ?0;" + Eventually(func() interface{} { return db.Execs[1][0] }). + Should(BeEquivalentTo(query)) + Eventually(func() interface{} { return db.Execs[1][1] }). + Should(BeEquivalentTo([]interface{}{token})) + }) + + It("timeout reached", func() { + size := 1000 + for i := 0; i < size; i++ { + token := uuid.NewV4().String() + handleInvalidToken(invalidTokenHandlers, token, "test", "apns") + } + + timeout := 1 * time.Nanosecond + StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout) + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue")) + }) }) }) diff --git a/extensions/gcm_message_handler_test.go b/extensions/gcm_message_handler_test.go index 8ebb874..0e787ea 100644 --- a/extensions/gcm_message_handler_test.go +++ b/extensions/gcm_message_handler_test.go @@ -32,7 +32,7 @@ import ( uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" - "github.com/topfreegames/go-gcm" + gcm "github.com/topfreegames/go-gcm" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" . "github.com/topfreegames/pusher/testing" @@ -133,7 +133,8 @@ var _ = Describe("GCM Message Handler", func() { handler.handleGCMResponse(res) Expect(handler.responsesReceived).To(Equal(int64(1))) Expect(handler.failuresReceived).To(Equal(int64(1))) - Expect(hook.Entries).To(ContainLogMessage("deleting token")) + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(ContainLogMessage("deleting token")) }) It("if response has error BAD_REGISTRATION", func() { @@ -143,7 +144,8 @@ var _ = Describe("GCM Message Handler", func() { handler.handleGCMResponse(res) Expect(handler.responsesReceived).To(Equal(int64(1))) Expect(handler.failuresReceived).To(Equal(int64(1))) - Expect(hook.Entries).To(ContainLogMessage("deleting token")) + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(ContainLogMessage("deleting token")) }) It("if response has error INVALID_JSON", func() { @@ -214,9 +216,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() + int64(1000000), @@ -245,9 +247,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() - int64(100), @@ -285,9 +287,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) @@ -315,9 +317,9 @@ var _ = Describe("GCM Message Handler", func() { gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, }, metadata, makeTimestamp() + int64(1000000), @@ -341,9 +343,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) @@ -460,9 +462,9 @@ var _ = Describe("GCM Message Handler", func() { msg := &gcm.XMPPMessage{ TimeToLive: &ttl, DeliveryReceiptRequested: false, - DryRun: true, - To: uuid.NewV4().String(), - Data: map[string]interface{}{}, + DryRun: true, + To: uuid.NewV4().String(), + Data: map[string]interface{}{}, } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) diff --git a/extensions/token_pg.go b/extensions/token_pg.go index 0618095..567338f 100644 --- a/extensions/token_pg.go +++ b/extensions/token_pg.go @@ -23,7 +23,9 @@ package extensions import ( + "errors" "fmt" + "sync" raven "github.com/getsentry/raven-go" "github.com/sirupsen/logrus" @@ -32,11 +34,23 @@ import ( "github.com/topfreegames/pusher/util" ) -// TokenPG for sending metrics +// TokenPG for deleting invalid tokens from the database type TokenPG struct { Client *PGClient Config *viper.Viper Logger *logrus.Logger + + tokensToDelete chan *TokenMsg + done chan struct{} + closed bool + wg sync.WaitGroup +} + +// TokenMsg represents a token to be deleted in the db +type TokenMsg struct { + Token string + Game string + Platform string } // NewTokenPG for creating a new TokenPG instance @@ -50,6 +64,14 @@ func NewTokenPG(config *viper.Viper, logger *logrus.Logger, dbOrNil ...interface db = dbOrNil[0] } err := q.configure(db) + + q.tokensToDelete = make(chan *TokenMsg, config.GetInt("invalidToken.pg.chanSize")) + q.done = make(chan struct{}) + q.closed = false + + go q.tokenPGWorker() + q.wg.Add(1) + return q, err } @@ -70,21 +92,67 @@ func (t *TokenPG) configure(db interfaces.DB) error { return nil } -// HandleToken handles an invalid token +// tokenPGWorker picks TokenMsgs from the queue to delete them from the database +func (t *TokenPG) tokenPGWorker() { + defer t.wg.Done() + + l := t.Logger.WithField("method", "listenToTokenMsgQueue") + + for { + select { + case tkMsg := <-t.tokensToDelete: + err := t.deleteToken(tkMsg) + if err != nil { + l.WithError(err).Error("error deleting token") + } + + case <-t.done: + l.Warn("stopping tokenPGWorker") + return + } + } +} + +// HandleToken handles an invalid token. It sends it to a channel to be process by +// the PGToken worker. If the TokenPG was stopped, an error is returned func (t *TokenPG) HandleToken(token string, game string, platform string) error { l := t.Logger.WithFields(logrus.Fields{ "method": "HandleToken", "token": token, }) + + tkMsg := &TokenMsg{ + Token: token, + Game: game, + Platform: platform, + } + + if t.closed { + e := errors.New("can't handle more tokens. The TokenPG has been stopped") + l.Error(e) + return e + } + + t.tokensToDelete <- tkMsg + return nil +} + +// deleteToken deletes the given token in the database +func (t *TokenPG) deleteToken(tkMsg *TokenMsg) error { + l := t.Logger.WithFields(logrus.Fields{ + "method": "DeleteToken", + "token": tkMsg.Token, + }) + l.Debug("deleting token") - query := fmt.Sprintf("DELETE FROM %s WHERE token = ?0;", game+"_"+platform) - _, err := t.Client.DB.Exec(query, token) + query := fmt.Sprintf("DELETE FROM %s WHERE token = ?0;", tkMsg.Game+"_"+tkMsg.Platform) + _, err := t.Client.DB.Exec(query, tkMsg.Token) if err != nil && err.Error() != "pg: no rows in result set" { raven.CaptureError(err, map[string]string{ "version": util.Version, "extension": "token-pg", }) - l.WithError(err).Error("error deleting token") + return err } return nil @@ -95,3 +163,27 @@ func (t *TokenPG) Cleanup() error { t.Client.Close() return nil } + +// Stop stops the TokenPG's worker +func (t *TokenPG) Stop() error { + l := t.Logger.WithFields(logrus.Fields{ + "method": "Stop", + }) + + close(t.done) + l.Info("waiting the worker to finish") + t.wg.Wait() + l.Info("tokenPG closed") + t.closed = true + + return nil +} + +// HasJob returns if the TokenPG still has tokens to be processed +func (t *TokenPG) HasJob() bool { + if len(t.tokensToDelete) > 0 { + return true + } + + return false +} diff --git a/extensions/token_pg_test.go b/extensions/token_pg_test.go index d1137b6..3bd812f 100644 --- a/extensions/token_pg_test.go +++ b/extensions/token_pg_test.go @@ -24,13 +24,16 @@ package extensions import ( "fmt" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" uuid "github.com/satori/go.uuid" + "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" "github.com/topfreegames/pusher/mocks" + testing "github.com/topfreegames/pusher/testing" "github.com/topfreegames/pusher/util" ) @@ -74,32 +77,99 @@ var _ = Describe("TokenPG Extension", func() { }) Describe("Handle invalid token", func() { + deletionError := "error deleting token" It("should delete apns token", func() { - err := tokenPG.HandleToken(token, "test", "apns") - Expect(err).NotTo(HaveOccurred()) + tokenPG.HandleToken(token, "test", "apns") + Consistently(func() []*logrus.Entry { return hook.Entries }). + ShouldNot(testing.ContainLogMessage(deletionError)) query := "DELETE FROM test_apns WHERE token = ?0;" - Expect(mockClient.Execs).To(HaveLen(2)) - Expect(mockClient.Execs[1][0]).To(BeEquivalentTo(query)) - Expect(mockClient.Execs[1][1]).To(BeEquivalentTo([]interface{}{token})) + Eventually(func() [][]interface{} { return mockClient.Execs }). + Should(HaveLen(2)) + Eventually(func() interface{} { return mockClient.Execs[1][0] }). + Should(BeEquivalentTo(query)) + Eventually(func() interface{} { return mockClient.Execs[1][1] }). + Should(BeEquivalentTo([]interface{}{token})) }) It("should not break if token does not exist in db", func() { mockClient.Error = fmt.Errorf("pg: no rows in result set") - err := tokenPG.HandleToken(token, "test", "apns") - Expect(err).NotTo(HaveOccurred()) + tokenPG.HandleToken(token, "test", "apns") + Consistently(func() []*logrus.Entry { return hook.Entries }). + ShouldNot(testing.ContainLogMessage(deletionError)) query := "DELETE FROM test_apns WHERE token = ?0;" - Expect(mockClient.Execs).To(HaveLen(2)) - Expect(mockClient.Execs[1][0]).To(BeEquivalentTo(query)) - Expect(mockClient.Execs[1][1]).To(BeEquivalentTo([]interface{}{token})) + Eventually(func() [][]interface{} { return mockClient.Execs }). + Should(HaveLen(2)) + Eventually(func() interface{} { return mockClient.Execs[1][0] }). + Should(BeEquivalentTo(query)) + Eventually(func() interface{} { return mockClient.Execs[1][1] }). + Should(BeEquivalentTo([]interface{}{token})) }) It("should return an error if pg error occurred", func() { mockClient.Error = fmt.Errorf("pg: error") + tokenPG.HandleToken(token, "test", "apns") + + Eventually(func() []*logrus.Entry { return hook.Entries }). + Should(testing.ContainLogMessage(deletionError)) + }) + }) + + Describe("Stopping a tokenPG client", func() { + It("without waiting", func() { + tokenPG.Stop() + + Expect(hook.Entries). + To(testing.ContainLogMessage("waiting the worker to finish")) + Expect(hook.Entries). + To(testing.ContainLogMessage("tokenPG closed")) + }) + + It("lefting job undone", func() { + size := 1000 + tokens := make([]string, size) + for i := 0; i < len(tokens); i++ { + tokens[i] = uuid.NewV4().String() + tokenPG.HandleToken(tokens[i], "test", "apns") + } + + tokenPG.Stop() + Expect(hook.Entries). + To(testing.ContainLogMessage("waiting the worker to finish")) + Expect(hook.Entries). + To(testing.ContainLogMessage("tokenPG closed")) + err := tokenPG.HandleToken(token, "test", "apns") Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("pg: error")) + Expect(err.Error()).To(Equal("can't handle more tokens. The TokenPG has been stopped")) + Consistently(tokenPG.HasJob).Should(BeTrue()) + }) + + It("waiting finish jobs to stop", func() { + size := 1000 + tokens := make([]string, size) + for i := 0; i < len(tokens); i++ { + tokens[i] = uuid.NewV4().String() + tokenPG.HandleToken(tokens[i], "test", "apns") + } + + for tokenPG.HasJob() == true { + time.Sleep(10 * time.Millisecond) + } + tokenPG.Stop() + + Expect(hook.Entries). + To(testing.ContainLogMessage("waiting the worker to finish")) + Expect(hook.Entries). + To(testing.ContainLogMessage("tokenPG closed")) + + Expect(mockClient.Execs).To(HaveLen(size + 1)) + query := "DELETE FROM test_apns WHERE token = ?0;" + for i := 0; i < len(tokens); i++ { + Expect(mockClient.Execs[i+1][0]).To(BeIdenticalTo(query)) + Expect(mockClient.Execs[i+1][1]).To(BeEquivalentTo([]interface{}{tokens[i]})) + } }) }) }) diff --git a/interfaces/invalid_token_handler.go b/interfaces/invalid_token_handler.go index 43e75c7..bad03b7 100644 --- a/interfaces/invalid_token_handler.go +++ b/interfaces/invalid_token_handler.go @@ -25,4 +25,6 @@ package interfaces // InvalidTokenHandler interface for defining functions that handle invalid tokens easily pluggable type InvalidTokenHandler interface { HandleToken(token string, game string, platform string) error + HasJob() bool + Stop() error } diff --git a/pusher/apns.go b/pusher/apns.go index 1393b99..9bef273 100644 --- a/pusher/apns.go +++ b/pusher/apns.go @@ -217,6 +217,7 @@ func (a *APNSPusher) Start() { } a.Queue.StopConsuming() GracefulShutdown(a.Queue.PendingMessagesWaitGroup(), time.Duration(a.GracefulShutdownTimeout)*time.Second) + extensions.StopInvalidTokenHandlers(a.Logger, a.InvalidTokenHandlers, time.Duration(a.GracefulShutdownTimeout)*time.Second) } func (a *APNSPusher) reportGoStats() { diff --git a/pusher/gcm.go b/pusher/gcm.go index b32a9e1..03687a9 100644 --- a/pusher/gcm.go +++ b/pusher/gcm.go @@ -211,6 +211,7 @@ func (g *GCMPusher) Start() { } g.Queue.StopConsuming() GracefulShutdown(g.Queue.PendingMessagesWaitGroup(), time.Duration(g.GracefulShutdownTimeout)*time.Second) + extensions.StopInvalidTokenHandlers(g.Logger, g.InvalidTokenHandlers, time.Duration(g.GracefulShutdownTimeout)*time.Second) } func (g *GCMPusher) reportGoStats() { From d7cd32be2ab1eee1cb9728f677058b9a26979cd7 Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Wed, 6 Feb 2019 13:25:48 -0200 Subject: [PATCH 2/4] comment StopInvalidTokenHandlers --- docs/hosting.md | 1 + extensions/common.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/docs/hosting.md b/docs/hosting.md index 7f1cde3..732f8f1 100644 --- a/docs/hosting.md +++ b/docs/hosting.md @@ -47,6 +47,7 @@ If Pusher needs to connect to a PostgreSQL database in order to delete invalid t * `PUSHER_INVALIDTOKEN_PG_POOLSIZE` - PostgreSQL connection pool size; * `PUSHER_INVALIDTOKEN_PG_MAXRETRIES` - PostgreSQL connection max retries; * `PUSHER_INVALIDTOKEN_PG_CONNECTIONTIMEOUT` - Timeout for trying to establish connection; +* `PUSHER_INVALIDTOKEN_PG_CHANSIZE` - Size of the channel to buffer the tokens to be deleted; Other than that, there are a couple more configurations you can pass using environment variables: diff --git a/extensions/common.go b/extensions/common.go index ea92f46..d05c7ed 100644 --- a/extensions/common.go +++ b/extensions/common.go @@ -46,6 +46,9 @@ func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, t } } +// StopInvalidTokenHandlers stops the invalid token handlers +// For each handler, it waits its buffered queue to be completely empty and forces +// the termination after the given timeout func StopInvalidTokenHandlers( logger *log.Logger, invalidTokenHandlers []interfaces.InvalidTokenHandler, From b7af02f5d7803940e83374e34f2cb128716b0e26 Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Wed, 6 Feb 2019 13:47:19 -0200 Subject: [PATCH 3/4] add generic metric increment report method --- extensions/datadog_statsd.go | 14 ++++++++++++++ extensions/datadog_statsd_test.go | 21 +++++++++++++++++++++ interfaces/stats_reporter.go | 1 + 3 files changed, 36 insertions(+) diff --git a/extensions/datadog_statsd.go b/extensions/datadog_statsd.go index 37532e7..2c57f1b 100644 --- a/extensions/datadog_statsd.go +++ b/extensions/datadog_statsd.go @@ -117,6 +117,20 @@ func (s *StatsD) ReportGoStats( s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), tags, 1) } +// ReportMetricIncrement reports a custom metric increment in statsd +func (s *StatsD) ReportMetricIncrement( + metric string, + game, platform string, +) { + hostname, _ := os.Hostname() + tags := []string{ + fmt.Sprintf("hostname:%s", hostname), + fmt.Sprintf("game:%s", game), + fmt.Sprintf("platform:%s", platform), + } + s.Client.Incr(metric, tags, 1) +} + //Cleanup closes statsd connection func (s *StatsD) Cleanup() error { s.Client.Close() diff --git a/extensions/datadog_statsd_test.go b/extensions/datadog_statsd_test.go index fbd9435..c51c772 100644 --- a/extensions/datadog_statsd_test.go +++ b/extensions/datadog_statsd_test.go @@ -99,6 +99,27 @@ var _ = Describe("StatsD Extension", func() { Expect(mockClient.Count["failed"]).To(Equal(2)) }) }) + + Describe("Reporting metric increment", func() { + It("should report metric increment in statsd", func() { + statsd, err := NewStatsD(config, logger, mockClient) + Expect(err).NotTo(HaveOccurred()) + defer statsd.Cleanup() + + statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns") + statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns") + statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns") + + statsd.ReportMetricIncrement("tokens_deleted", "game", "apns") + statsd.ReportMetricIncrement("tokens_deleted", "game", "apns") + + statsd.ReportMetricIncrement("tokens_deletion_failed", "game", "apns") + + Expect(mockClient.Count["tokens_to_delete"]).To(Equal(3)) + Expect(mockClient.Count["tokens_deleted"]).To(Equal(2)) + Expect(mockClient.Count["tokens_deletion_failed"]).To(Equal(1)) + }) + }) }) Describe("[Integration]", func() { diff --git a/interfaces/stats_reporter.go b/interfaces/stats_reporter.go index 210f76a..a92d814 100644 --- a/interfaces/stats_reporter.go +++ b/interfaces/stats_reporter.go @@ -30,4 +30,5 @@ type StatsReporter interface { HandleNotificationSuccess(game string, platform string) HandleNotificationFailure(game string, platform string, err *errors.PushError) ReportGoStats(numGoRoutines int, allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64) + ReportMetricIncrement(metric string, game, platform string) } From ea09576c04410401384007d3efc37a50929a8dff Mon Sep 17 00:00:00 2001 From: Moises Silva Date: Wed, 6 Feb 2019 15:07:22 -0200 Subject: [PATCH 4/4] add statsReporter to tokenPG --- extensions/apns_message_handler_test.go | 12 ++++++-- extensions/common.go | 6 ++++ extensions/common_test.go | 18 ++++++++++- extensions/gcm_message_handler_test.go | 12 ++++++-- extensions/token_pg.go | 40 ++++++++++++++++++++++--- extensions/token_pg_test.go | 35 ++++++++++++++++++++-- pusher/apns.go | 2 +- pusher/gcm.go | 2 +- pusher/handlers.go | 10 +++---- pusher/handlers_test.go | 17 +++++++++-- 10 files changed, 132 insertions(+), 22 deletions(-) diff --git a/extensions/apns_message_handler_test.go b/extensions/apns_message_handler_test.go index ba76686..e9a329e 100644 --- a/extensions/apns_message_handler_test.go +++ b/extensions/apns_message_handler_test.go @@ -77,7 +77,7 @@ var _ = FDescribe("APNS Message Handler", func() { feedbackClients = []interfaces.FeedbackReporter{kc} db = mocks.NewPGMock(0, 1) - it, err := NewTokenPG(config, logger, db) + it, err := NewTokenPG(config, logger, statsClients, db) Expect(err).NotTo(HaveOccurred()) invalidTokenHandlers = []interfaces.InvalidTokenHandler{it} @@ -135,6 +135,10 @@ var _ = FDescribe("APNS Message Handler", func() { Expect(handler.failuresReceived).To(Equal(int64(1))) Eventually(func() []*logrus.Entry { return hook.Entries }). Should(ContainLogMessage("deleting token")) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) //Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError")) }) @@ -149,6 +153,10 @@ var _ = FDescribe("APNS Message Handler", func() { Expect(handler.failuresReceived).To(Equal(int64(1))) Eventually(func() []*logrus.Entry { return hook.Entries }). Should(ContainLogMessage("deleting token")) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) //Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError")) }) @@ -427,7 +435,7 @@ var _ = FDescribe("APNS Message Handler", func() { feedbackClients = []interfaces.FeedbackReporter{kc} db = mocks.NewPGMock(0, 1) - it, err := NewTokenPG(config, logger, db) + it, err := NewTokenPG(config, logger, statsClients, db) Expect(err).NotTo(HaveOccurred()) invalidTokenHandlers = []interfaces.InvalidTokenHandler{it} handler, err = NewAPNSMessageHandler( diff --git a/extensions/common.go b/extensions/common.go index d05c7ed..d35dfb9 100644 --- a/extensions/common.go +++ b/extensions/common.go @@ -120,3 +120,9 @@ func statsReporterHandleNotificationFailure(statsReporters []interfaces.StatsRep statsReporter.HandleNotificationFailure(game, platform, err) } } + +func statsReporterReportMetricIncrement(statsReporters []interfaces.StatsReporter, metric string, game string, platform string) { + for _, statsReporter := range statsReporters { + statsReporter.ReportMetricIncrement(metric, game, platform) + } +} diff --git a/extensions/common_test.go b/extensions/common_test.go index 66f0858..b063a37 100644 --- a/extensions/common_test.go +++ b/extensions/common_test.go @@ -45,6 +45,8 @@ var _ = Describe("Common", func() { var feedbackClients []interfaces.FeedbackReporter var invalidTokenHandlers []interfaces.InvalidTokenHandler var db *mocks.PGMock + var mockStatsDClient *mocks.StatsDClientMock + var statsClients []interfaces.StatsReporter configFile := "../config/test.yaml" logger, hook := test.NewNullLogger() @@ -58,10 +60,16 @@ var _ = Describe("Common", func() { mockKafkaProducerClient = mocks.NewKafkaProducerClientMock() kc, err := NewKafkaProducer(config, logger, mockKafkaProducerClient) Expect(err).NotTo(HaveOccurred()) + + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsClients = []interfaces.StatsReporter{c} feedbackClients = []interfaces.FeedbackReporter{kc} db = mocks.NewPGMock(0, 1) - it, err := NewTokenPG(config, logger, db) + it, err := NewTokenPG(config, logger, statsClients, db) Expect(err).NotTo(HaveOccurred()) invalidTokenHandlers = []interfaces.InvalidTokenHandler{it} @@ -80,6 +88,10 @@ var _ = Describe("Common", func() { Should(BeEquivalentTo(query)) Eventually(func() interface{} { return db.Execs[1][1] }). Should(BeEquivalentTo([]interface{}{token})) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) }) It("should fail silently", func() { @@ -94,6 +106,10 @@ var _ = Describe("Common", func() { Should(BeEquivalentTo(query)) Eventually(func() interface{} { return db.Execs[1][1] }). Should(BeEquivalentTo([]interface{}{token})) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeletionError] }). + Should(Equal(1)) }) Describe("should stop handler gracefully", func() { diff --git a/extensions/gcm_message_handler_test.go b/extensions/gcm_message_handler_test.go index 0e787ea..e5f3288 100644 --- a/extensions/gcm_message_handler_test.go +++ b/extensions/gcm_message_handler_test.go @@ -73,7 +73,7 @@ var _ = Describe("GCM Message Handler", func() { feedbackClients = []interfaces.FeedbackReporter{kc} mockDb = mocks.NewPGMock(0, 1) - it, err := NewTokenPG(config, logger, mockDb) + it, err := NewTokenPG(config, logger, statsClients, mockDb) Expect(err).NotTo(HaveOccurred()) invalidTokenHandlers = []interfaces.InvalidTokenHandler{it} @@ -135,6 +135,10 @@ var _ = Describe("GCM Message Handler", func() { Expect(handler.failuresReceived).To(Equal(int64(1))) Eventually(func() []*logrus.Entry { return hook.Entries }). Should(ContainLogMessage("deleting token")) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) }) It("if response has error BAD_REGISTRATION", func() { @@ -146,6 +150,10 @@ var _ = Describe("GCM Message Handler", func() { Expect(handler.failuresReceived).To(Equal(int64(1))) Eventually(func() []*logrus.Entry { return hook.Entries }). Should(ContainLogMessage("deleting token")) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) }) It("if response has error INVALID_JSON", func() { @@ -509,7 +517,7 @@ var _ = Describe("GCM Message Handler", func() { feedbackClients = []interfaces.FeedbackReporter{kc} mockClient = mocks.NewGCMClientMock() - it, err := NewTokenPG(config, logger, mockDb) + it, err := NewTokenPG(config, logger, statsClients, mockDb) Expect(err).NotTo(HaveOccurred()) invalidTokenHandlers = []interfaces.InvalidTokenHandler{it} diff --git a/extensions/token_pg.go b/extensions/token_pg.go index 567338f..b3353d7 100644 --- a/extensions/token_pg.go +++ b/extensions/token_pg.go @@ -34,6 +34,14 @@ import ( "github.com/topfreegames/pusher/util" ) +// Metrics name sent by TokenPG +const ( + MetricsTokensToDelete = "tokens_to_delete" + MetricsTokensDeleted = "tokens_deleted" + MetricsTokensDeletionError = "tokens_deletion_error" + MetricsTokensHandleAfterClosing = "tokens_handle_after_closing" +) + // TokenPG for deleting invalid tokens from the database type TokenPG struct { Client *PGClient @@ -44,6 +52,8 @@ type TokenPG struct { done chan struct{} closed bool wg sync.WaitGroup + + StatsReporters []interfaces.StatsReporter } // TokenMsg represents a token to be deleted in the db @@ -54,10 +64,15 @@ type TokenMsg struct { } // NewTokenPG for creating a new TokenPG instance -func NewTokenPG(config *viper.Viper, logger *logrus.Logger, dbOrNil ...interfaces.DB) (*TokenPG, error) { +func NewTokenPG(config *viper.Viper, + logger *logrus.Logger, + statsReporters []interfaces.StatsReporter, + dbOrNil ...interfaces.DB, +) (*TokenPG, error) { q := &TokenPG{ - Config: config, - Logger: logger, + Config: config, + Logger: logger, + StatsReporters: statsReporters, } var db interfaces.DB if len(dbOrNil) == 1 { @@ -104,6 +119,13 @@ func (t *TokenPG) tokenPGWorker() { err := t.deleteToken(tkMsg) if err != nil { l.WithError(err).Error("error deleting token") + statsReporterReportMetricIncrement(t.StatsReporters, + MetricsTokensDeletionError, tkMsg.Game, tkMsg.Platform, + ) + } else { + statsReporterReportMetricIncrement(t.StatsReporters, + MetricsTokensDeleted, tkMsg.Game, tkMsg.Platform, + ) } case <-t.done: @@ -113,7 +135,7 @@ func (t *TokenPG) tokenPGWorker() { } } -// HandleToken handles an invalid token. It sends it to a channel to be process by +// HandleToken handles an invalid token. It sends it to a channel to be processed by // the PGToken worker. If the TokenPG was stopped, an error is returned func (t *TokenPG) HandleToken(token string, game string, platform string) error { l := t.Logger.WithFields(logrus.Fields{ @@ -130,10 +152,16 @@ func (t *TokenPG) HandleToken(token string, game string, platform string) error if t.closed { e := errors.New("can't handle more tokens. The TokenPG has been stopped") l.Error(e) + statsReporterReportMetricIncrement(t.StatsReporters, + MetricsTokensHandleAfterClosing, tkMsg.Game, tkMsg.Platform, + ) return e } t.tokensToDelete <- tkMsg + statsReporterReportMetricIncrement(t.StatsReporters, + MetricsTokensToDelete, tkMsg.Game, tkMsg.Platform, + ) return nil } @@ -174,6 +202,10 @@ func (t *TokenPG) Stop() error { l.Info("waiting the worker to finish") t.wg.Wait() l.Info("tokenPG closed") + if ut := len(t.tokensToDelete); ut > 0 { + l.Warnf("%d tokens haven't been processed", ut) + } + t.closed = true return nil diff --git a/extensions/token_pg_test.go b/extensions/token_pg_test.go index 3bd812f..3572503 100644 --- a/extensions/token_pg_test.go +++ b/extensions/token_pg_test.go @@ -32,6 +32,7 @@ import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" + "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" testing "github.com/topfreegames/pusher/testing" "github.com/topfreegames/pusher/util" @@ -41,6 +42,8 @@ var _ = Describe("TokenPG Extension", func() { var config *viper.Viper var mockClient *mocks.PGMock var tokenPG *TokenPG + var mockStatsDClient *mocks.StatsDClientMock + var statsClients []interfaces.StatsReporter logger, hook := test.NewNullLogger() token := uuid.NewV4().String() @@ -48,8 +51,15 @@ var _ = Describe("TokenPG Extension", func() { var err error config, err = util.NewViperWithConfigFile("../config/test.yaml") Expect(err).NotTo(HaveOccurred()) + + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsClients = []interfaces.StatsReporter{c} + mockClient = mocks.NewPGMock(0, 1) - tokenPG, err = NewTokenPG(config, logger, mockClient) + tokenPG, err = NewTokenPG(config, logger, statsClients, mockClient) Expect(err).NotTo(HaveOccurred()) hook.Reset() }) @@ -57,7 +67,7 @@ var _ = Describe("TokenPG Extension", func() { Describe("[Unit]", func() { Describe("Creating new client", func() { It("should return connected client", func() { - t, err := NewTokenPG(config, logger, mockClient) + t, err := NewTokenPG(config, logger, statsClients, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(t).NotTo(BeNil()) Expect(t.Client).NotTo(BeNil()) @@ -69,7 +79,7 @@ var _ = Describe("TokenPG Extension", func() { It("should return an error if failed to connect to postgres", func() { mockClient.RowsReturned = 0 - t, err := NewTokenPG(config, logger, mockClient) + t, err := NewTokenPG(config, logger, statsClients, mockClient) Expect(t).NotTo(BeNil()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Timed out waiting for PostgreSQL to connect")) @@ -90,6 +100,10 @@ var _ = Describe("TokenPG Extension", func() { Should(BeEquivalentTo(query)) Eventually(func() interface{} { return mockClient.Execs[1][1] }). Should(BeEquivalentTo([]interface{}{token})) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) }) It("should not break if token does not exist in db", func() { @@ -105,6 +119,10 @@ var _ = Describe("TokenPG Extension", func() { Should(BeEquivalentTo(query)) Eventually(func() interface{} { return mockClient.Execs[1][1] }). Should(BeEquivalentTo([]interface{}{token})) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }). + Should(Equal(1)) }) It("should return an error if pg error occurred", func() { @@ -113,6 +131,10 @@ var _ = Describe("TokenPG Extension", func() { Eventually(func() []*logrus.Entry { return hook.Entries }). Should(testing.ContainLogMessage(deletionError)) + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1)) + Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeletionError] }). + Should(Equal(1)) }) }) @@ -139,11 +161,15 @@ var _ = Describe("TokenPG Extension", func() { To(testing.ContainLogMessage("waiting the worker to finish")) Expect(hook.Entries). To(testing.ContainLogMessage("tokenPG closed")) + Expect(hook.Entries). + To(testing.ContainLogMessage(fmt.Sprintf("%d tokens haven't been processed", len(tokenPG.tokensToDelete)))) err := tokenPG.HandleToken(token, "test", "apns") Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("can't handle more tokens. The TokenPG has been stopped")) Consistently(tokenPG.HasJob).Should(BeTrue()) + + Expect(mockStatsDClient.Count[MetricsTokensHandleAfterClosing]).To(Equal(1)) }) It("waiting finish jobs to stop", func() { @@ -170,6 +196,9 @@ var _ = Describe("TokenPG Extension", func() { Expect(mockClient.Execs[i+1][0]).To(BeIdenticalTo(query)) Expect(mockClient.Execs[i+1][1]).To(BeEquivalentTo([]interface{}{tokens[i]})) } + + Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(size)) + Expect(mockStatsDClient.Count[MetricsTokensDeleted]).To(Equal(size)) }) }) }) diff --git a/pusher/apns.go b/pusher/apns.go index 9bef273..8c19e84 100644 --- a/pusher/apns.go +++ b/pusher/apns.go @@ -160,7 +160,7 @@ func (a *APNSPusher) configureStatsReporters(clientOrNil interfaces.StatsDClient } func (a *APNSPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error { - invalidTokenHandlers, err := configureInvalidTokenHandlers(a.Config, a.Logger, dbOrNil) + invalidTokenHandlers, err := configureInvalidTokenHandlers(a.Config, a.Logger, a.StatsReporters, dbOrNil) if err != nil { return err } diff --git a/pusher/gcm.go b/pusher/gcm.go index 03687a9..08e1695 100644 --- a/pusher/gcm.go +++ b/pusher/gcm.go @@ -154,7 +154,7 @@ func (g *GCMPusher) configureFeedbackReporters() error { } func (g *GCMPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error { - invalidTokenHandlers, err := configureInvalidTokenHandlers(g.Config, g.Logger, dbOrNil) + invalidTokenHandlers, err := configureInvalidTokenHandlers(g.Config, g.Logger, g.StatsReporters, dbOrNil) if err != nil { return err } diff --git a/pusher/handlers.go b/pusher/handlers.go index 17174f7..d35d500 100644 --- a/pusher/handlers.go +++ b/pusher/handlers.go @@ -31,16 +31,16 @@ import ( "github.com/topfreegames/pusher/interfaces" ) -type invalidTokenHandlerInitializer func(*viper.Viper, *logrus.Logger, interfaces.DB) (interfaces.InvalidTokenHandler, error) +type invalidTokenHandlerInitializer func(*viper.Viper, *logrus.Logger, []interfaces.StatsReporter, interfaces.DB) (interfaces.InvalidTokenHandler, error) // AvailableInvalidTokenHandlers contains functions to initialize all invalid token handlers var AvailableInvalidTokenHandlers = map[string]invalidTokenHandlerInitializer{ - "pg": func(config *viper.Viper, logger *logrus.Logger, dbOrNil interfaces.DB) (interfaces.InvalidTokenHandler, error) { - return extensions.NewTokenPG(config, logger, dbOrNil) + "pg": func(config *viper.Viper, logger *logrus.Logger, statsReporter []interfaces.StatsReporter, dbOrNil interfaces.DB) (interfaces.InvalidTokenHandler, error) { + return extensions.NewTokenPG(config, logger, statsReporter, dbOrNil) }, } -func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, dbOrNil interfaces.DB) ([]interfaces.InvalidTokenHandler, error) { +func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, statsReporter []interfaces.StatsReporter, dbOrNil interfaces.DB) ([]interfaces.InvalidTokenHandler, error) { invalidTokenHandlers := []interfaces.InvalidTokenHandler{} invalidTokenHandlerNames := config.GetStringSlice("invalidToken.handlers") for _, invalidTokenHandlerName := range invalidTokenHandlerNames { @@ -49,7 +49,7 @@ func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, d return nil, fmt.Errorf("Failed to initialize %s. Invalid Token Handler not available.", invalidTokenHandlerName) } - r, err := invalidTokenHandlerFunc(config, logger, dbOrNil) + r, err := invalidTokenHandlerFunc(config, logger, statsReporter, dbOrNil) if err != nil { return nil, fmt.Errorf("Failed to initialize %s. %s", invalidTokenHandlerName, err.Error()) } diff --git a/pusher/handlers_test.go b/pusher/handlers_test.go index f9696de..0e49820 100644 --- a/pusher/handlers_test.go +++ b/pusher/handlers_test.go @@ -27,6 +27,8 @@ import ( . "github.com/onsi/gomega" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" + "github.com/topfreegames/pusher/extensions" + "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" ) @@ -34,6 +36,8 @@ import ( var _ = Describe("Handlers", func() { var config *viper.Viper var mockClient *mocks.PGMock + var mockStatsDClient *mocks.StatsDClientMock + var statsClients []interfaces.StatsReporter logger, hook := test.NewNullLogger() BeforeEach(func() { @@ -41,13 +45,20 @@ var _ = Describe("Handlers", func() { config, err = util.NewViperWithConfigFile("../config/test.yaml") Expect(err).NotTo(HaveOccurred()) mockClient = mocks.NewPGMock(0, 1) + + mockStatsDClient = mocks.NewStatsDClientMock() + c, err := extensions.NewStatsD(config, logger, mockStatsDClient) + Expect(err).NotTo(HaveOccurred()) + + statsClients = []interfaces.StatsReporter{c} + hook.Reset() }) Describe("[Unit]", func() { Describe("Configuring invalid token handlers", func() { It("should return invalid token handler list", func() { - handlers, err := configureInvalidTokenHandlers(config, logger, mockClient) + handlers, err := configureInvalidTokenHandlers(config, logger, statsClients, mockClient) Expect(err).NotTo(HaveOccurred()) Expect(handlers).To(HaveLen(1)) Expect(handlers[0]).NotTo(BeNil()) @@ -55,7 +66,7 @@ var _ = Describe("Handlers", func() { It("should return an error if token handler is not available", func() { config.Set("invalidToken.handlers", []string{"notAvailable"}) - handlers, err := configureInvalidTokenHandlers(config, logger, mockClient) + handlers, err := configureInvalidTokenHandlers(config, logger, statsClients, mockClient) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Failed to initialize notAvailable. Invalid Token Handler not available.")) Expect(handlers).To(BeNil()) @@ -63,7 +74,7 @@ var _ = Describe("Handlers", func() { It("should return an error if failed to create invalid token handler", func() { mockClient.RowsReturned = 0 - handlers, err := configureInvalidTokenHandlers(config, logger, mockClient) + handlers, err := configureInvalidTokenHandlers(config, logger, statsClients, mockClient) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("Failed to initialize pg. Timed out waiting for PostgreSQL to connect")) Expect(handlers).To(BeNil())