Skip to content

Commit

Permalink
add statsReporter to tokenPG
Browse files Browse the repository at this point in the history
  • Loading branch information
Moises Silva committed Feb 6, 2019
1 parent b7af02f commit ea09576
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 22 deletions.
12 changes: 10 additions & 2 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand Down Expand Up @@ -135,6 +135,10 @@ var _ = FDescribe("APNS Message Handler", func() {
Expect(handler.failuresReceived).To(Equal(int64(1)))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand All @@ -149,6 +153,10 @@ var _ = FDescribe("APNS Message Handler", func() {
Expect(handler.failuresReceived).To(Equal(int64(1)))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand Down Expand Up @@ -427,7 +435,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}
handler, err = NewAPNSMessageHandler(
Expand Down
6 changes: 6 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,9 @@ func statsReporterHandleNotificationFailure(statsReporters []interfaces.StatsRep
statsReporter.HandleNotificationFailure(game, platform, err)
}
}

func statsReporterReportMetricIncrement(statsReporters []interfaces.StatsReporter, metric string, game string, platform string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportMetricIncrement(metric, game, platform)
}
}
18 changes: 17 additions & 1 deletion extensions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var _ = Describe("Common", func() {
var feedbackClients []interfaces.FeedbackReporter
var invalidTokenHandlers []interfaces.InvalidTokenHandler
var db *mocks.PGMock
var mockStatsDClient *mocks.StatsDClientMock
var statsClients []interfaces.StatsReporter

configFile := "../config/test.yaml"
logger, hook := test.NewNullLogger()
Expand All @@ -58,10 +60,16 @@ var _ = Describe("Common", func() {
mockKafkaProducerClient = mocks.NewKafkaProducerClientMock()
kc, err := NewKafkaProducer(config, logger, mockKafkaProducerClient)
Expect(err).NotTo(HaveOccurred())

mockStatsDClient = mocks.NewStatsDClientMock()
c, err := NewStatsD(config, logger, mockStatsDClient)
Expect(err).NotTo(HaveOccurred())

statsClients = []interfaces.StatsReporter{c}
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand All @@ -80,6 +88,10 @@ var _ = Describe("Common", func() {
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("should fail silently", func() {
Expand All @@ -94,6 +106,10 @@ var _ = Describe("Common", func() {
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeletionError] }).
Should(Equal(1))
})

Describe("should stop handler gracefully", func() {
Expand Down
12 changes: 10 additions & 2 deletions extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var _ = Describe("GCM Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

mockDb = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, mockDb)
it, err := NewTokenPG(config, logger, statsClients, mockDb)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand Down Expand Up @@ -135,6 +135,10 @@ var _ = Describe("GCM Message Handler", func() {
Expect(handler.failuresReceived).To(Equal(int64(1)))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("if response has error BAD_REGISTRATION", func() {
Expand All @@ -146,6 +150,10 @@ var _ = Describe("GCM Message Handler", func() {
Expect(handler.failuresReceived).To(Equal(int64(1)))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("if response has error INVALID_JSON", func() {
Expand Down Expand Up @@ -509,7 +517,7 @@ var _ = Describe("GCM Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

mockClient = mocks.NewGCMClientMock()
it, err := NewTokenPG(config, logger, mockDb)
it, err := NewTokenPG(config, logger, statsClients, mockDb)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand Down
40 changes: 36 additions & 4 deletions extensions/token_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ import (
"github.com/topfreegames/pusher/util"
)

// Metrics name sent by TokenPG
const (
MetricsTokensToDelete = "tokens_to_delete"
MetricsTokensDeleted = "tokens_deleted"
MetricsTokensDeletionError = "tokens_deletion_error"
MetricsTokensHandleAfterClosing = "tokens_handle_after_closing"
)

// TokenPG for deleting invalid tokens from the database
type TokenPG struct {
Client *PGClient
Expand All @@ -44,6 +52,8 @@ type TokenPG struct {
done chan struct{}
closed bool
wg sync.WaitGroup

StatsReporters []interfaces.StatsReporter
}

// TokenMsg represents a token to be deleted in the db
Expand All @@ -54,10 +64,15 @@ type TokenMsg struct {
}

// NewTokenPG for creating a new TokenPG instance
func NewTokenPG(config *viper.Viper, logger *logrus.Logger, dbOrNil ...interfaces.DB) (*TokenPG, error) {
func NewTokenPG(config *viper.Viper,
logger *logrus.Logger,
statsReporters []interfaces.StatsReporter,
dbOrNil ...interfaces.DB,
) (*TokenPG, error) {
q := &TokenPG{
Config: config,
Logger: logger,
Config: config,
Logger: logger,
StatsReporters: statsReporters,
}
var db interfaces.DB
if len(dbOrNil) == 1 {
Expand Down Expand Up @@ -104,6 +119,13 @@ func (t *TokenPG) tokenPGWorker() {
err := t.deleteToken(tkMsg)
if err != nil {
l.WithError(err).Error("error deleting token")
statsReporterReportMetricIncrement(t.StatsReporters,
MetricsTokensDeletionError, tkMsg.Game, tkMsg.Platform,
)
} else {
statsReporterReportMetricIncrement(t.StatsReporters,
MetricsTokensDeleted, tkMsg.Game, tkMsg.Platform,
)
}

case <-t.done:
Expand All @@ -113,7 +135,7 @@ func (t *TokenPG) tokenPGWorker() {
}
}

// HandleToken handles an invalid token. It sends it to a channel to be process by
// HandleToken handles an invalid token. It sends it to a channel to be processed by
// the PGToken worker. If the TokenPG was stopped, an error is returned
func (t *TokenPG) HandleToken(token string, game string, platform string) error {
l := t.Logger.WithFields(logrus.Fields{
Expand All @@ -130,10 +152,16 @@ func (t *TokenPG) HandleToken(token string, game string, platform string) error
if t.closed {
e := errors.New("can't handle more tokens. The TokenPG has been stopped")
l.Error(e)
statsReporterReportMetricIncrement(t.StatsReporters,
MetricsTokensHandleAfterClosing, tkMsg.Game, tkMsg.Platform,
)
return e
}

t.tokensToDelete <- tkMsg
statsReporterReportMetricIncrement(t.StatsReporters,
MetricsTokensToDelete, tkMsg.Game, tkMsg.Platform,
)
return nil
}

Expand Down Expand Up @@ -174,6 +202,10 @@ func (t *TokenPG) Stop() error {
l.Info("waiting the worker to finish")
t.wg.Wait()
l.Info("tokenPG closed")
if ut := len(t.tokensToDelete); ut > 0 {
l.Warnf("%d tokens haven't been processed", ut)
}

t.closed = true

return nil
Expand Down
35 changes: 32 additions & 3 deletions extensions/token_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/mocks"
testing "github.com/topfreegames/pusher/testing"
"github.com/topfreegames/pusher/util"
Expand All @@ -41,23 +42,32 @@ var _ = Describe("TokenPG Extension", func() {
var config *viper.Viper
var mockClient *mocks.PGMock
var tokenPG *TokenPG
var mockStatsDClient *mocks.StatsDClientMock
var statsClients []interfaces.StatsReporter
logger, hook := test.NewNullLogger()
token := uuid.NewV4().String()

BeforeEach(func() {
var err error
config, err = util.NewViperWithConfigFile("../config/test.yaml")
Expect(err).NotTo(HaveOccurred())

mockStatsDClient = mocks.NewStatsDClientMock()
c, err := NewStatsD(config, logger, mockStatsDClient)
Expect(err).NotTo(HaveOccurred())

statsClients = []interfaces.StatsReporter{c}

mockClient = mocks.NewPGMock(0, 1)
tokenPG, err = NewTokenPG(config, logger, mockClient)
tokenPG, err = NewTokenPG(config, logger, statsClients, mockClient)
Expect(err).NotTo(HaveOccurred())
hook.Reset()
})

Describe("[Unit]", func() {
Describe("Creating new client", func() {
It("should return connected client", func() {
t, err := NewTokenPG(config, logger, mockClient)
t, err := NewTokenPG(config, logger, statsClients, mockClient)
Expect(err).NotTo(HaveOccurred())
Expect(t).NotTo(BeNil())
Expect(t.Client).NotTo(BeNil())
Expand All @@ -69,7 +79,7 @@ var _ = Describe("TokenPG Extension", func() {

It("should return an error if failed to connect to postgres", func() {
mockClient.RowsReturned = 0
t, err := NewTokenPG(config, logger, mockClient)
t, err := NewTokenPG(config, logger, statsClients, mockClient)
Expect(t).NotTo(BeNil())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("Timed out waiting for PostgreSQL to connect"))
Expand All @@ -90,6 +100,10 @@ var _ = Describe("TokenPG Extension", func() {
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return mockClient.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("should not break if token does not exist in db", func() {
Expand All @@ -105,6 +119,10 @@ var _ = Describe("TokenPG Extension", func() {
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return mockClient.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("should return an error if pg error occurred", func() {
Expand All @@ -113,6 +131,10 @@ var _ = Describe("TokenPG Extension", func() {

Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(testing.ContainLogMessage(deletionError))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeletionError] }).
Should(Equal(1))
})
})

Expand All @@ -139,11 +161,15 @@ var _ = Describe("TokenPG Extension", func() {
To(testing.ContainLogMessage("waiting the worker to finish"))
Expect(hook.Entries).
To(testing.ContainLogMessage("tokenPG closed"))
Expect(hook.Entries).
To(testing.ContainLogMessage(fmt.Sprintf("%d tokens haven't been processed", len(tokenPG.tokensToDelete))))

err := tokenPG.HandleToken(token, "test", "apns")
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("can't handle more tokens. The TokenPG has been stopped"))
Consistently(tokenPG.HasJob).Should(BeTrue())

Expect(mockStatsDClient.Count[MetricsTokensHandleAfterClosing]).To(Equal(1))
})

It("waiting finish jobs to stop", func() {
Expand All @@ -170,6 +196,9 @@ var _ = Describe("TokenPG Extension", func() {
Expect(mockClient.Execs[i+1][0]).To(BeIdenticalTo(query))
Expect(mockClient.Execs[i+1][1]).To(BeEquivalentTo([]interface{}{tokens[i]}))
}

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(size))
Expect(mockStatsDClient.Count[MetricsTokensDeleted]).To(Equal(size))
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (a *APNSPusher) configureStatsReporters(clientOrNil interfaces.StatsDClient
}

func (a *APNSPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error {
invalidTokenHandlers, err := configureInvalidTokenHandlers(a.Config, a.Logger, dbOrNil)
invalidTokenHandlers, err := configureInvalidTokenHandlers(a.Config, a.Logger, a.StatsReporters, dbOrNil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (g *GCMPusher) configureFeedbackReporters() error {
}

func (g *GCMPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error {
invalidTokenHandlers, err := configureInvalidTokenHandlers(g.Config, g.Logger, dbOrNil)
invalidTokenHandlers, err := configureInvalidTokenHandlers(g.Config, g.Logger, g.StatsReporters, dbOrNil)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pusher/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ import (
"github.com/topfreegames/pusher/interfaces"
)

type invalidTokenHandlerInitializer func(*viper.Viper, *logrus.Logger, interfaces.DB) (interfaces.InvalidTokenHandler, error)
type invalidTokenHandlerInitializer func(*viper.Viper, *logrus.Logger, []interfaces.StatsReporter, interfaces.DB) (interfaces.InvalidTokenHandler, error)

// AvailableInvalidTokenHandlers contains functions to initialize all invalid token handlers
var AvailableInvalidTokenHandlers = map[string]invalidTokenHandlerInitializer{
"pg": func(config *viper.Viper, logger *logrus.Logger, dbOrNil interfaces.DB) (interfaces.InvalidTokenHandler, error) {
return extensions.NewTokenPG(config, logger, dbOrNil)
"pg": func(config *viper.Viper, logger *logrus.Logger, statsReporter []interfaces.StatsReporter, dbOrNil interfaces.DB) (interfaces.InvalidTokenHandler, error) {
return extensions.NewTokenPG(config, logger, statsReporter, dbOrNil)
},
}

func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, dbOrNil interfaces.DB) ([]interfaces.InvalidTokenHandler, error) {
func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, statsReporter []interfaces.StatsReporter, dbOrNil interfaces.DB) ([]interfaces.InvalidTokenHandler, error) {
invalidTokenHandlers := []interfaces.InvalidTokenHandler{}
invalidTokenHandlerNames := config.GetStringSlice("invalidToken.handlers")
for _, invalidTokenHandlerName := range invalidTokenHandlerNames {
Expand All @@ -49,7 +49,7 @@ func configureInvalidTokenHandlers(config *viper.Viper, logger *logrus.Logger, d
return nil, fmt.Errorf("Failed to initialize %s. Invalid Token Handler not available.", invalidTokenHandlerName)
}

r, err := invalidTokenHandlerFunc(config, logger, dbOrNil)
r, err := invalidTokenHandlerFunc(config, logger, statsReporter, dbOrNil)
if err != nil {
return nil, fmt.Errorf("Failed to initialize %s. %s", invalidTokenHandlerName, err.Error())
}
Expand Down
Loading

0 comments on commit ea09576

Please sign in to comment.