Skip to content

Commit

Permalink
Add metric support to Feedback Listener (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbotarro authored and cscatolini committed Feb 19, 2019
1 parent 495fb5f commit a54d346
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 81 deletions.
2 changes: 1 addition & 1 deletion cmd/feedback_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newFeedbackListener(
log.Level = logrus.InfoLevel
}

return feedback.NewListener(config, log)
return feedback.NewListener(config, log, nil)
}

// starFeedbackListenerCmd represents the start-feedback-listener command
Expand Down
2 changes: 2 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ feedback:
stats:
reporters:
- statsd
flush:
s: 5
statsd:
host: "localhost:8125"
prefix: "push"
Expand Down
2 changes: 2 additions & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ feedback:
stats:
reporters:
- statsd
flush:
s: 5
statsd:
host: "localhost:40001"
prefix: "push"
Expand Down
6 changes: 3 additions & 3 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.sendMessage(kafkaMessage)
handler.sendMessage(kafkaMessage)

Expect(mockStatsDClient.Count["sent"]).To(Equal(2))
Expect(mockStatsDClient.Counts["sent"]).To(Equal(int64(2)))
})

It("should call HandleNotificationSuccess upon message response received", func() {
Expand All @@ -396,7 +396,7 @@ var _ = FDescribe("APNS Message Handler", func() {

handler.handleAPNSResponse(res)
handler.handleAPNSResponse(res)
Expect(mockStatsDClient.Count["ack"]).To(Equal(2))
Expect(mockStatsDClient.Counts["ack"]).To(Equal(int64(2)))
})

It("should call HandleNotificationFailure upon message response received", func() {
Expand All @@ -411,7 +411,7 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
handler.handleAPNSResponse(res)

Expect(mockStatsDClient.Count["failed"]).To(Equal(2))
Expect(mockStatsDClient.Counts["failed"]).To(Equal(int64(2)))
})
})

Expand Down
44 changes: 44 additions & 0 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,47 @@ func (s *StatsD) Cleanup() error {
s.Client.Close()
return nil
}

// ReportMetricGauge reports a metric as a Gauge with hostname, game and platform
// as tags
func (s *StatsD) ReportMetricGauge(
metric string, value float64,
game, platform string,
) {
hostname, _ := os.Hostname()
tags := []string{
fmt.Sprintf("hostname:%s", hostname),
}

if game != "" {
tags = append(tags, game)
}

if platform != "" {
tags = append(tags, platform)
}

s.Client.Gauge(metric, value, tags, 1)
}

// ReportMetricCount reports a metric as a Count with hostname, game and platform
// as tags
func (s *StatsD) ReportMetricCount(
metric string, value int64,
game, platform string,
) {
hostname, _ := os.Hostname()
tags := []string{
fmt.Sprintf("hostname:%s", hostname),
}

if game != "" {
tags = append(tags, game)
}

if platform != "" {
tags = append(tags, platform)
}

s.Client.Count(metric, value, tags, 1)
}
40 changes: 37 additions & 3 deletions extensions/datadog_statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("StatsD Extension", func() {

statsd.HandleNotificationSent("game", "apns")
statsd.HandleNotificationSent("game", "apns")
Expect(mockClient.Count["sent"]).To(Equal(2))
Expect(mockClient.Counts["sent"]).To(Equal(int64(2)))
})
})

Expand All @@ -65,7 +65,7 @@ var _ = Describe("StatsD Extension", func() {

statsd.HandleNotificationSuccess("game", "apns")
statsd.HandleNotificationSuccess("game", "apns")
Expect(mockClient.Count["ack"]).To(Equal(2))
Expect(mockClient.Counts["ack"]).To(Equal(int64(2)))
})
})

Expand Down Expand Up @@ -96,7 +96,41 @@ var _ = Describe("StatsD Extension", func() {
statsd.HandleNotificationFailure("game", "apns", pErr)
statsd.HandleNotificationFailure("game", "apns", pErr)

Expect(mockClient.Count["failed"]).To(Equal(2))
Expect(mockClient.Counts["failed"]).To(Equal(int64(2)))
})
})

Describe("Reporting metric count", func() {
It("should report metric increment in statsd", func() {
statsd, err := NewStatsD(config, logger, mockClient)
Expect(err).NotTo(HaveOccurred())
defer statsd.Cleanup()

statsd.ReportMetricCount("tokens_delete_success", 2, "game", "apns")
statsd.ReportMetricCount("tokens_delete_error", 3, "game", "apns")

statsd.ReportMetricCount("tokens_delete_success", 3, "game", "apns")
statsd.ReportMetricCount("tokens_delete_error", 0, "game", "apns")

Expect(mockClient.Counts["tokens_delete_success"]).To(Equal(int64(5)))
Expect(mockClient.Counts["tokens_delete_error"]).To(Equal(int64(3)))
})
})

Describe("Reporting metric gauge", func() {
It("should report metric gauge in statsd", func() {
statsd, err := NewStatsD(config, logger, mockClient)
Expect(err).NotTo(HaveOccurred())
defer statsd.Cleanup()

statsd.ReportMetricGauge("in_chan_size", 10, "game", "apns")
Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(10)))

statsd.ReportMetricGauge("in_chan_size", 0, "game", "apns")
Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(0)))

statsd.ReportMetricGauge("in_chan_size", 3, "game", "apns")
Expect(mockClient.Gauges["in_chan_size"]).To(Equal(float64(3)))
})
})
})
Expand Down
44 changes: 22 additions & 22 deletions extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/topfreegames/go-gcm"
gcm "github.com/topfreegames/go-gcm"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/mocks"
. "github.com/topfreegames/pusher/testing"
Expand Down Expand Up @@ -214,9 +214,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() + int64(1000000),
Expand Down Expand Up @@ -245,9 +245,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() - int64(100),
Expand Down Expand Up @@ -285,9 +285,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -315,9 +315,9 @@ var _ = Describe("GCM Message Handler", func() {
gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
},
metadata,
makeTimestamp() + int64(1000000),
Expand All @@ -341,9 +341,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -460,9 +460,9 @@ var _ = Describe("GCM Message Handler", func() {
msg := &gcm.XMPPMessage{
TimeToLive: &ttl,
DeliveryReceiptRequested: false,
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
DryRun: true,
To: uuid.NewV4().String(),
Data: map[string]interface{}{},
}
msgBytes, err := json.Marshal(msg)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -476,14 +476,14 @@ var _ = Describe("GCM Message Handler", func() {

err = handler.sendMessage(kafkaMessage)
Expect(err).NotTo(HaveOccurred())
Expect(mockStatsDClient.Count["sent"]).To(Equal(2))
Expect(mockStatsDClient.Counts["sent"]).To(Equal(int64(2)))
})

It("should call HandleNotificationSuccess upon message response received", func() {
res := gcm.CCSMessage{}
handler.handleGCMResponse(res)
handler.handleGCMResponse(res)
Expect(mockStatsDClient.Count["ack"]).To(Equal(2))
Expect(mockStatsDClient.Counts["ack"]).To(Equal(int64(2)))
})

It("should call HandleNotificationFailure upon message response received", func() {
Expand All @@ -493,7 +493,7 @@ var _ = Describe("GCM Message Handler", func() {
handler.handleGCMResponse(res)
handler.handleGCMResponse(res)

Expect(mockStatsDClient.Count["failed"]).To(Equal(2))
Expect(mockStatsDClient.Counts["failed"]).To(Equal(int64(2)))
})
})

Expand Down
5 changes: 4 additions & 1 deletion feedback/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"

gcm "github.com/topfreegames/go-gcm"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/structs"

"github.com/sideshow/apns2"
Expand All @@ -53,6 +54,7 @@ type Message struct {
type Broker struct {
Logger *log.Logger
Config *viper.Viper
StatsReporters []interfaces.StatsReporter
InChan chan QueueMessage
pendingMessagesWG *sync.WaitGroup
InvalidTokenOutChan chan *InvalidToken
Expand All @@ -63,13 +65,14 @@ type Broker struct {

// NewBroker creates a new Broker instance
func NewBroker(
logger *log.Logger, cfg *viper.Viper,
logger *log.Logger, cfg *viper.Viper, statsReporters []interfaces.StatsReporter,
inChan chan QueueMessage,
pendingMessagesWG *sync.WaitGroup,
) (*Broker, error) {
b := &Broker{
Logger: logger,
Config: cfg,
StatsReporters: statsReporters,
InChan: inChan,
pendingMessagesWG: pendingMessagesWG,
stopChannel: make(chan struct{}),
Expand Down
6 changes: 3 additions & 3 deletions feedback/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var _ = Describe("Broker", func() {

Describe("[Unit]", func() {
It("Should start and stop correctly", func() {
broker, err := NewBroker(logger, config, inChan, nil)
broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())

broker.Start()
Expand Down Expand Up @@ -95,7 +95,7 @@ var _ = Describe("Broker", func() {
})

It("Should route an invalid token feedback", func() {
broker, err := NewBroker(logger, config, inChan, nil)
broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())

broker.Start()
Expand Down Expand Up @@ -140,7 +140,7 @@ var _ = Describe("Broker", func() {
})

It("Should route an invalid token feedback from GCM", func() {
broker, err := NewBroker(logger, config, inChan, nil)
broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())

broker.Start()
Expand Down

0 comments on commit a54d346

Please sign in to comment.