Skip to content

Commit

Permalink
Replacing statsd with datadog statsd
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Hahn committed Jan 17, 2018
1 parent ae66201 commit 1234c88
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 69 deletions.
130 changes: 114 additions & 16 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
[[constraint]]
name = "github.com/alexcesaro/statsd"
version = "2.0.0"

[[constraint]]
name = "github.com/confluentinc/confluent-kafka-go"
version = "0.11.0"
Expand Down Expand Up @@ -45,3 +41,7 @@
[[constraint]]
name = "gopkg.in/pg.v5"
version = "5.3.3"

[[constraint]]
name = "github.com/DataDog/datadog-go"
version = "1.1.0"
2 changes: 1 addition & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ stats:
statsd:
host: "localhost:8125"
prefix: "push"
flushIntervalMs: 5000
buflen: 1
invalidToken:
handlers:
- pg
Expand Down
8 changes: 4 additions & 4 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.sendMessage(kafkaMessage)
handler.sendMessage(kafkaMessage)

Expect(mockStatsDClient.Count["apns.game.sent"]).To(Equal(2))
Expect(mockStatsDClient.Count["sent"]).To(Equal(2))
})

It("should call HandleNotificationSuccess upon message response received", func() {
Expand All @@ -398,7 +398,7 @@ var _ = FDescribe("APNS Message Handler", func() {

handler.handleAPNSResponse(res)
handler.handleAPNSResponse(res)
Expect(mockStatsDClient.Count["apns.game.ack"]).To(Equal(2))
Expect(mockStatsDClient.Count["ack"]).To(Equal(2))
})

It("should call HandleNotificationFailure upon message response received", func() {
Expand All @@ -413,8 +413,8 @@ var _ = FDescribe("APNS Message Handler", func() {
handler.handleAPNSResponse(res)
handler.handleAPNSResponse(res)

Expect(mockStatsDClient.Count["apns.game.failed"]).To(Equal(2))
Expect(mockStatsDClient.Count["apns.game.missing-device-token"]).To(Equal(2))
Expect(mockStatsDClient.Count["failed"]).To(Equal(2))
Expect(mockStatsDClient.Count["missing-device-token"]).To(Equal(2))
})
})

Expand Down
39 changes: 19 additions & 20 deletions extensions/statsd.go → extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016 TFG Co <backend@tfgco.com>
* Copyright (c) 2018 TFG Co <backend@tfgco.com>
* Author: TFG Co <backend@tfgco.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
Expand All @@ -23,9 +23,9 @@
package extensions

import (
"time"
"fmt"

"github.com/alexcesaro/statsd"
"github.com/DataDog/datadog-go/statsd"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/errors"
Expand Down Expand Up @@ -56,26 +56,26 @@ func NewStatsD(config *viper.Viper, logger *logrus.Logger, clientOrNil ...interf
func (s *StatsD) loadConfigurationDefaults() {
s.Config.SetDefault("stats.statsd.host", "localhost:8125")
s.Config.SetDefault("stats.statsd.prefix", "test")
s.Config.SetDefault("stats.statsd.flushIntervalMs", 5000)
s.Config.SetDefault("stats.statsd.buflen", 1)
}

func (s *StatsD) configure(client interfaces.StatsDClient) error {
s.loadConfigurationDefaults()

host := s.Config.GetString("stats.statsd.host")
prefix := s.Config.GetString("stats.statsd.prefix")
flushIntervalMs := s.Config.GetInt("stats.statsd.flushIntervalMs")
flushPeriod := time.Duration(flushIntervalMs) * time.Millisecond
buflen := s.Config.GetInt("stats.statsd.buflen")

l := s.Logger.WithFields(logrus.Fields{
"host": host,
"prefix": prefix,
"flushIntervalMs": flushIntervalMs,
"host": host,
"prefix": prefix,
"buflen": buflen,
})

if client == nil {
var err error
client, err = statsd.New(statsd.Address(host), statsd.FlushPeriod(flushPeriod), statsd.Prefix(prefix))
ddClient, err := statsd.NewBuffered(host, buflen)
ddClient.Namespace = prefix
client = ddClient

if err != nil {
l.WithError(err).Error("Error configuring statsd client.")
Expand All @@ -90,30 +90,29 @@ func (s *StatsD) configure(client interfaces.StatsDClient) error {

//HandleNotificationSent stores notification count in StatsD
func (s *StatsD) HandleNotificationSent(game string, platform string) {
s.Client.Increment(platform + "." + game + "." + "sent")
s.Client.Incr("sent", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//HandleNotificationSuccess stores notifications success in StatsD
func (s *StatsD) HandleNotificationSuccess(game string, platform string) {
s.Client.Increment(platform + "." + game + "." + "ack")
s.Client.Incr("ack", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//HandleNotificationFailure stores each type of failure
func (s *StatsD) HandleNotificationFailure(game string, platform string, err *errors.PushError) {
s.Client.Increment(platform + "." + game + "." + "failed")
s.Client.Increment(platform + "." + game + "." + err.Key)
s.Client.Incr("failed", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
s.Client.Incr(err.Key, []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1)
}

//ReportGoStats reports go stats in statsd
func (s *StatsD) ReportGoStats(
numGoRoutines int,
allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64,
) {
s.Client.Gauge("num_goroutine", numGoRoutines)
s.Client.Gauge("allocated_not_freed", allocatedAndNotFreed)
s.Client.Gauge("heap_objects", heapObjects)
s.Client.Gauge("next_gc_bytes", nextGCBytes)
s.Client.Timing("gc_pause_duration_ms", pauseGCNano/1000000)
s.Client.Gauge("num_goroutine", float64(numGoRoutines), nil, 1)
s.Client.Gauge("allocated_not_freed", float64(allocatedAndNotFreed), nil, 1)
s.Client.Gauge("heap_objects", float64(heapObjects), nil, 1)
s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), nil, 1)
}

//Cleanup closes statsd connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("StatsD Extension", func() {

statsd.HandleNotificationSent("game", "apns")
statsd.HandleNotificationSent("game", "apns")
Expect(mockClient.Count["apns.game.sent"]).To(Equal(2))
Expect(mockClient.Count["sent"]).To(Equal(2))
})
})

Expand All @@ -65,7 +65,7 @@ var _ = Describe("StatsD Extension", func() {

statsd.HandleNotificationSuccess("game", "apns")
statsd.HandleNotificationSuccess("game", "apns")
Expect(mockClient.Count["apns.game.ack"]).To(Equal(2))
Expect(mockClient.Count["ack"]).To(Equal(2))
})
})

Expand All @@ -82,7 +82,6 @@ var _ = Describe("StatsD Extension", func() {
Expect(mockClient.Gauges["allocated_not_freed"]).To(BeEquivalentTo(3))
Expect(mockClient.Gauges["heap_objects"]).To(BeEquivalentTo(4))
Expect(mockClient.Gauges["next_gc_bytes"]).To(BeEquivalentTo(5))
Expect(mockClient.Timings["gc_pause_duration_ms"]).To(BeEquivalentTo(6))
})
})

Expand All @@ -97,8 +96,8 @@ var _ = Describe("StatsD Extension", func() {
statsd.HandleNotificationFailure("game", "apns", pErr)
statsd.HandleNotificationFailure("game", "apns", pErr)

Expect(mockClient.Count["apns.game.failed"]).To(Equal(2))
Expect(mockClient.Count["apns.game.some-key"]).To(Equal(2))
Expect(mockClient.Count["failed"]).To(Equal(2))
Expect(mockClient.Count["some-key"]).To(Equal(2))
})
})
})
Expand Down
Loading

0 comments on commit 1234c88

Please sign in to comment.