Skip to content

Commit

Permalink
Merge ea09576 into f958ea4
Browse files Browse the repository at this point in the history
  • Loading branch information
mbotarro committed Feb 6, 2019
2 parents f958ea4 + ea09576 commit d580c1d
Show file tree
Hide file tree
Showing 18 changed files with 466 additions and 93 deletions.
27 changes: 0 additions & 27 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
chanSize: 1000
1 change: 1 addition & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
chanSize: 1000
1 change: 1 addition & 0 deletions docs/hosting.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ If Pusher needs to connect to a PostgreSQL database in order to delete invalid t
* `PUSHER_INVALIDTOKEN_PG_POOLSIZE` - PostgreSQL connection pool size;
* `PUSHER_INVALIDTOKEN_PG_MAXRETRIES` - PostgreSQL connection max retries;
* `PUSHER_INVALIDTOKEN_PG_CONNECTIONTIMEOUT` - Timeout for trying to establish connection;
* `PUSHER_INVALIDTOKEN_PG_CHANSIZE` - Size of the channel to buffer the tokens to be deleted;

Other than that, there are a couple more configurations you can pass using environment variables:

Expand Down
18 changes: 14 additions & 4 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand Down Expand Up @@ -133,7 +133,12 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand All @@ -146,7 +151,12 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(ContainLogMessage("deleting token"))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})

Expand Down Expand Up @@ -425,7 +435,7 @@ var _ = FDescribe("APNS Message Handler", func() {
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}
handler, err = NewAPNSMessageHandler(
Expand Down
44 changes: 44 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package extensions
import (
"encoding/json"
"regexp"
"time"

log "github.com/sirupsen/logrus"
"github.com/topfreegames/pusher/errors"
"github.com/topfreegames/pusher/interfaces"
)
Expand All @@ -44,6 +46,42 @@ func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, t
}
}

// StopInvalidTokenHandlers stops the invalid token handlers
// For each handler, it waits its buffered queue to be completely empty and forces
// the termination after the given timeout
func StopInvalidTokenHandlers(
logger *log.Logger,
invalidTokenHandlers []interfaces.InvalidTokenHandler,
timeout time.Duration,
) {
l := logger.WithField(
"method", "stopInvalidTokenHandlers",
)

for _, invalidTokenHandler := range invalidTokenHandlers {
done := make(chan struct{})
stopCheck := false
go func() {
defer close(done)
for invalidTokenHandler.HasJob() && !stopCheck {
time.Sleep(10 * time.Millisecond)
}
}()

select {
case <-done:
invalidTokenHandler.Stop()
return

case <-time.After(timeout):
invalidTokenHandler.Stop()
stopCheck = true
l.Error("timeout reached - invalidTokenHandler was closed with jobs in queue")
return
}
}
}

func getGameAndPlatformFromTopic(topic string) ParsedTopic {
res := topicRegex.FindStringSubmatch(topic)
return ParsedTopic{
Expand Down Expand Up @@ -82,3 +120,9 @@ func statsReporterHandleNotificationFailure(statsReporters []interfaces.StatsRep
statsReporter.HandleNotificationFailure(game, platform, err)
}
}

func statsReporterReportMetricIncrement(statsReporters []interfaces.StatsReporter, metric string, game string, platform string) {
for _, statsReporter := range statsReporters {
statsReporter.ReportMetricIncrement(metric, game, platform)
}
}
73 changes: 66 additions & 7 deletions extensions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package extensions

import (
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/mocks"
testing "github.com/topfreegames/pusher/testing"
"github.com/topfreegames/pusher/util"
)

Expand All @@ -43,6 +45,8 @@ var _ = Describe("Common", func() {
var feedbackClients []interfaces.FeedbackReporter
var invalidTokenHandlers []interfaces.InvalidTokenHandler
var db *mocks.PGMock
var mockStatsDClient *mocks.StatsDClientMock
var statsClients []interfaces.StatsReporter

configFile := "../config/test.yaml"
logger, hook := test.NewNullLogger()
Expand All @@ -56,10 +60,16 @@ var _ = Describe("Common", func() {
mockKafkaProducerClient = mocks.NewKafkaProducerClientMock()
kc, err := NewKafkaProducer(config, logger, mockKafkaProducerClient)
Expect(err).NotTo(HaveOccurred())

mockStatsDClient = mocks.NewStatsDClientMock()
c, err := NewStatsD(config, logger, mockStatsDClient)
Expect(err).NotTo(HaveOccurred())

statsClients = []interfaces.StatsReporter{c}
feedbackClients = []interfaces.FeedbackReporter{kc}

db = mocks.NewPGMock(0, 1)
it, err := NewTokenPG(config, logger, db)
it, err := NewTokenPG(config, logger, statsClients, db)
Expect(err).NotTo(HaveOccurred())
invalidTokenHandlers = []interfaces.InvalidTokenHandler{it}

Expand All @@ -71,19 +81,68 @@ var _ = Describe("Common", func() {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs).To(HaveLen(2))
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token}))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeleted] }).
Should(Equal(1))
})

It("should fail silently", func() {
token := uuid.NewV4().String()
db.Error = fmt.Errorf("pg: error")
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
Expect(db.Execs).To(HaveLen(2))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expect(db.Execs[1][1]).To(BeEquivalentTo([]interface{}{token}))
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))

Expect(mockStatsDClient.Count[MetricsTokensToDelete]).To(Equal(1))
Eventually(func() int { return mockStatsDClient.Count[MetricsTokensDeletionError] }).
Should(Equal(1))
})

Describe("should stop handler gracefully", func() {
It("if there's no more job to do", func() {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")

timeout := 1 * time.Second
StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout)
Consistently(func() []*logrus.Entry { return hook.Entries }, 2*timeout).
ShouldNot(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue"))

Eventually(func() [][]interface{} { return db.Execs }).
Should(HaveLen(2))
query := "DELETE FROM test_apns WHERE token = ?0;"
Eventually(func() interface{} { return db.Execs[1][0] }).
Should(BeEquivalentTo(query))
Eventually(func() interface{} { return db.Execs[1][1] }).
Should(BeEquivalentTo([]interface{}{token}))
})

It("timeout reached", func() {
size := 1000
for i := 0; i < size; i++ {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
}

timeout := 1 * time.Nanosecond
StopInvalidTokenHandlers(logger, invalidTokenHandlers, timeout)
Eventually(func() []*logrus.Entry { return hook.Entries }).
Should(testing.ContainLogMessage("timeout reached - invalidTokenHandler was closed with jobs in queue"))
})
})
})

Expand Down
14 changes: 14 additions & 0 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ func (s *StatsD) ReportGoStats(
s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), tags, 1)
}

// ReportMetricIncrement reports a custom metric increment in statsd
func (s *StatsD) ReportMetricIncrement(
metric string,
game, platform string,
) {
hostname, _ := os.Hostname()
tags := []string{
fmt.Sprintf("hostname:%s", hostname),
fmt.Sprintf("game:%s", game),
fmt.Sprintf("platform:%s", platform),
}
s.Client.Incr(metric, tags, 1)
}

//Cleanup closes statsd connection
func (s *StatsD) Cleanup() error {
s.Client.Close()
Expand Down
21 changes: 21 additions & 0 deletions extensions/datadog_statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ var _ = Describe("StatsD Extension", func() {
Expect(mockClient.Count["failed"]).To(Equal(2))
})
})

Describe("Reporting metric increment", func() {
It("should report metric increment in statsd", func() {
statsd, err := NewStatsD(config, logger, mockClient)
Expect(err).NotTo(HaveOccurred())
defer statsd.Cleanup()

statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns")
statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns")
statsd.ReportMetricIncrement("tokens_to_delete", "game", "apns")

statsd.ReportMetricIncrement("tokens_deleted", "game", "apns")
statsd.ReportMetricIncrement("tokens_deleted", "game", "apns")

statsd.ReportMetricIncrement("tokens_deletion_failed", "game", "apns")

Expect(mockClient.Count["tokens_to_delete"]).To(Equal(3))
Expect(mockClient.Count["tokens_deleted"]).To(Equal(2))
Expect(mockClient.Count["tokens_deletion_failed"]).To(Equal(1))
})
})
})

Describe("[Integration]", func() {
Expand Down

0 comments on commit d580c1d

Please sign in to comment.