diff --git a/Gopkg.lock b/Gopkg.lock index 904ccd6..82a5ec3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,10 +2,10 @@ [[projects]] - name = "github.com/alexcesaro/statsd" - packages = ["."] - revision = "7fea3f0d2fab1ad973e641e51dba45443a311a90" - version = "v2.0.0" + name = "github.com/DataDog/datadog-go" + packages = ["statsd"] + revision = "0ddda6bee21174ef6c4873647cb0d6ec9cba996f" + version = "1.1.0" [[projects]] name = "github.com/certifi/gocertifi" @@ -46,7 +46,17 @@ [[projects]] branch = "master" name = "github.com/hashicorp/hcl" - packages = [".","hcl/ast","hcl/parser","hcl/scanner","hcl/strconv","hcl/token","json/parser","json/scanner","json/token"] + packages = [ + ".", + "hcl/ast", + "hcl/parser", + "hcl/scanner", + "hcl/strconv", + "hcl/token", + "json/parser", + "json/scanner", + "json/token" + ] revision = "392dba7d905ed5d04a5794ba89f558b27e2ba1ca" [[projects]] @@ -87,13 +97,45 @@ [[projects]] name = "github.com/onsi/ginkgo" - packages = [".","config","internal/codelocation","internal/containernode","internal/failer","internal/leafnodes","internal/remote","internal/spec","internal/spec_iterator","internal/specrunner","internal/suite","internal/testingtproxy","internal/writer","reporters","reporters/stenographer","reporters/stenographer/support/go-colorable","reporters/stenographer/support/go-isatty","types"] + packages = [ + ".", + "config", + "internal/codelocation", + "internal/containernode", + "internal/failer", + "internal/leafnodes", + "internal/remote", + "internal/spec", + "internal/spec_iterator", + "internal/specrunner", + "internal/suite", + "internal/testingtproxy", + "internal/writer", + "reporters", + "reporters/stenographer", + "reporters/stenographer/support/go-colorable", + "reporters/stenographer/support/go-isatty", + "types" + ] revision = "9eda700730cba42af70d53180f9dcce9266bc2bc" version = "v1.4.0" [[projects]] name = "github.com/onsi/gomega" - packages = [".","format","internal/assertion","internal/asyncassertion","internal/oraclematcher","internal/testingtsupport","matchers","matchers/support/goraph/bipartitegraph","matchers/support/goraph/edge","matchers/support/goraph/node","matchers/support/goraph/util","types"] + packages = [ + ".", + "format", + "internal/assertion", + "internal/asyncassertion", + "internal/oraclematcher", + "internal/testingtsupport", + "matchers", + "matchers/support/goraph/bipartitegraph", + "matchers/support/goraph/edge", + "matchers/support/goraph/node", + "matchers/support/goraph/util", + "types" + ] revision = "c893efa28eb45626cdaa76c9f653b62488858837" version = "v1.2.0" @@ -129,20 +171,29 @@ [[projects]] name = "github.com/sideshow/apns2" - packages = [".","token"] + packages = [ + ".", + "token" + ] revision = "a3ce9c6f95f63dab4ead29da86534dd7af95271a" version = "v0.13" [[projects]] name = "github.com/sirupsen/logrus" - packages = [".","hooks/test"] + packages = [ + ".", + "hooks/test" + ] revision = "a3f95b5c423586578a4e099b11a46c2479628cac" version = "1.0.2" [[projects]] branch = "master" name = "github.com/spf13/afero" - packages = [".","mem"] + packages = [ + ".", + "mem" + ] revision = "9be650865eab0c12963d8753212f4f9c66cdcf12" [[projects]] @@ -183,7 +234,10 @@ [[projects]] name = "github.com/stretchr/testify" - packages = ["assert","mock"] + packages = [ + "assert", + "mock" + ] revision = "69483b4bd14f5845b5a1e55bca19e954e827f1d0" version = "v1.1.4" @@ -196,13 +250,26 @@ [[projects]] branch = "master" name = "golang.org/x/crypto" - packages = ["pkcs12","pkcs12/internal/rc2"] + packages = [ + "pkcs12", + "pkcs12/internal/rc2" + ] revision = "6914964337150723782436d56b3f21610a74ce7b" [[projects]] branch = "master" name = "golang.org/x/net" - packages = ["context","context/ctxhttp","html","html/atom","html/charset","http2","http2/hpack","idna","lex/httplex"] + packages = [ + "context", + "context/ctxhttp", + "html", + "html/atom", + "html/charset", + "http2", + "http2/hpack", + "idna", + "lex/httplex" + ] revision = "ab5485076ff3407ad2d02db054635913f017b0ed" [[projects]] @@ -214,12 +281,43 @@ [[projects]] branch = "master" name = "golang.org/x/text" - packages = ["encoding","encoding/charmap","encoding/htmlindex","encoding/internal","encoding/internal/identifier","encoding/japanese","encoding/korean","encoding/simplifiedchinese","encoding/traditionalchinese","encoding/unicode","internal/gen","internal/tag","internal/triegen","internal/ucd","internal/utf8internal","language","runes","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable"] + packages = [ + "encoding", + "encoding/charmap", + "encoding/htmlindex", + "encoding/internal", + "encoding/internal/identifier", + "encoding/japanese", + "encoding/korean", + "encoding/simplifiedchinese", + "encoding/traditionalchinese", + "encoding/unicode", + "internal/gen", + "internal/tag", + "internal/triegen", + "internal/ucd", + "internal/utf8internal", + "language", + "runes", + "secure/bidirule", + "transform", + "unicode/bidi", + "unicode/cldr", + "unicode/norm", + "unicode/rangetable" + ] revision = "836efe42bb4aa16aaa17b9c155d8813d336ed720" [[projects]] name = "gopkg.in/pg.v5" - packages = [".","internal","internal/parser","internal/pool","orm","types"] + packages = [ + ".", + "internal", + "internal/parser", + "internal/pool", + "orm", + "types" + ] revision = "2246060a2a43f8282ad53295d56d780dbc930b7f" version = "v5.3.3" @@ -232,6 +330,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "558da59624e3539016af2336e57a20d4af7ebe93c3bbb6247e76585c8de5fcd2" + inputs-digest = "c0fcac949df8a913c9a6a13b6f9b5d87ec5f480f86b1d5c8311150499694df9e" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 7529a6e..c59055c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,7 +1,3 @@ -[[constraint]] - name = "github.com/alexcesaro/statsd" - version = "2.0.0" - [[constraint]] name = "github.com/confluentinc/confluent-kafka-go" version = "0.11.0" @@ -45,3 +41,7 @@ [[constraint]] name = "gopkg.in/pg.v5" version = "5.3.3" + +[[constraint]] + name = "github.com/DataDog/datadog-go" + version = "1.1.0" diff --git a/config/default.yaml b/config/default.yaml index 9c37dc9..9689881 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -45,7 +45,7 @@ stats: statsd: host: "localhost:8125" prefix: "push" - flushIntervalMs: 5000 + buflen: 1 invalidToken: handlers: - pg diff --git a/extensions/apns_message_handler_test.go b/extensions/apns_message_handler_test.go index e06ee90..c4fb94a 100644 --- a/extensions/apns_message_handler_test.go +++ b/extensions/apns_message_handler_test.go @@ -384,7 +384,7 @@ var _ = FDescribe("APNS Message Handler", func() { handler.sendMessage(kafkaMessage) handler.sendMessage(kafkaMessage) - Expect(mockStatsDClient.Count["apns.game.sent"]).To(Equal(2)) + Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) }) It("should call HandleNotificationSuccess upon message response received", func() { @@ -398,7 +398,7 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["apns.game.ack"]).To(Equal(2)) + Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -413,8 +413,8 @@ var _ = FDescribe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["apns.game.failed"]).To(Equal(2)) - Expect(mockStatsDClient.Count["apns.game.missing-device-token"]).To(Equal(2)) + Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) + Expect(mockStatsDClient.Count["missing-device-token"]).To(Equal(2)) }) }) diff --git a/extensions/statsd.go b/extensions/datadog_statsd.go similarity index 74% rename from extensions/statsd.go rename to extensions/datadog_statsd.go index 2843afe..29f8f49 100644 --- a/extensions/statsd.go +++ b/extensions/datadog_statsd.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 TFG Co + * Copyright (c) 2018 TFG Co * Author: TFG Co * * Permission is hereby granted, free of charge, to any person obtaining a copy of @@ -23,9 +23,9 @@ package extensions import ( - "time" + "fmt" - "github.com/alexcesaro/statsd" + "github.com/DataDog/datadog-go/statsd" "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/errors" @@ -56,7 +56,7 @@ func NewStatsD(config *viper.Viper, logger *logrus.Logger, clientOrNil ...interf func (s *StatsD) loadConfigurationDefaults() { s.Config.SetDefault("stats.statsd.host", "localhost:8125") s.Config.SetDefault("stats.statsd.prefix", "test") - s.Config.SetDefault("stats.statsd.flushIntervalMs", 5000) + s.Config.SetDefault("stats.statsd.buflen", 1) } func (s *StatsD) configure(client interfaces.StatsDClient) error { @@ -64,18 +64,18 @@ func (s *StatsD) configure(client interfaces.StatsDClient) error { host := s.Config.GetString("stats.statsd.host") prefix := s.Config.GetString("stats.statsd.prefix") - flushIntervalMs := s.Config.GetInt("stats.statsd.flushIntervalMs") - flushPeriod := time.Duration(flushIntervalMs) * time.Millisecond + buflen := s.Config.GetInt("stats.statsd.buflen") l := s.Logger.WithFields(logrus.Fields{ - "host": host, - "prefix": prefix, - "flushIntervalMs": flushIntervalMs, + "host": host, + "prefix": prefix, + "buflen": buflen, }) if client == nil { - var err error - client, err = statsd.New(statsd.Address(host), statsd.FlushPeriod(flushPeriod), statsd.Prefix(prefix)) + ddClient, err := statsd.NewBuffered(host, buflen) + ddClient.Namespace = prefix + client = ddClient if err != nil { l.WithError(err).Error("Error configuring statsd client.") @@ -90,18 +90,18 @@ func (s *StatsD) configure(client interfaces.StatsDClient) error { //HandleNotificationSent stores notification count in StatsD func (s *StatsD) HandleNotificationSent(game string, platform string) { - s.Client.Increment(platform + "." + game + "." + "sent") + s.Client.Incr("sent", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1) } //HandleNotificationSuccess stores notifications success in StatsD func (s *StatsD) HandleNotificationSuccess(game string, platform string) { - s.Client.Increment(platform + "." + game + "." + "ack") + s.Client.Incr("ack", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1) } //HandleNotificationFailure stores each type of failure func (s *StatsD) HandleNotificationFailure(game string, platform string, err *errors.PushError) { - s.Client.Increment(platform + "." + game + "." + "failed") - s.Client.Increment(platform + "." + game + "." + err.Key) + s.Client.Incr("failed", []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1) + s.Client.Incr(err.Key, []string{fmt.Sprintf("platform:%s", platform), fmt.Sprintf("game:%s", game)}, 1) } //ReportGoStats reports go stats in statsd @@ -109,11 +109,10 @@ func (s *StatsD) ReportGoStats( numGoRoutines int, allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64, ) { - s.Client.Gauge("num_goroutine", numGoRoutines) - s.Client.Gauge("allocated_not_freed", allocatedAndNotFreed) - s.Client.Gauge("heap_objects", heapObjects) - s.Client.Gauge("next_gc_bytes", nextGCBytes) - s.Client.Timing("gc_pause_duration_ms", pauseGCNano/1000000) + s.Client.Gauge("num_goroutine", float64(numGoRoutines), nil, 1) + s.Client.Gauge("allocated_not_freed", float64(allocatedAndNotFreed), nil, 1) + s.Client.Gauge("heap_objects", float64(heapObjects), nil, 1) + s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), nil, 1) } //Cleanup closes statsd connection diff --git a/extensions/statsd_test.go b/extensions/datadog_statsd_test.go similarity index 92% rename from extensions/statsd_test.go rename to extensions/datadog_statsd_test.go index 084e1fb..559ef8f 100644 --- a/extensions/statsd_test.go +++ b/extensions/datadog_statsd_test.go @@ -53,7 +53,7 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationSent("game", "apns") statsd.HandleNotificationSent("game", "apns") - Expect(mockClient.Count["apns.game.sent"]).To(Equal(2)) + Expect(mockClient.Count["sent"]).To(Equal(2)) }) }) @@ -65,7 +65,7 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationSuccess("game", "apns") statsd.HandleNotificationSuccess("game", "apns") - Expect(mockClient.Count["apns.game.ack"]).To(Equal(2)) + Expect(mockClient.Count["ack"]).To(Equal(2)) }) }) @@ -82,7 +82,6 @@ var _ = Describe("StatsD Extension", func() { Expect(mockClient.Gauges["allocated_not_freed"]).To(BeEquivalentTo(3)) Expect(mockClient.Gauges["heap_objects"]).To(BeEquivalentTo(4)) Expect(mockClient.Gauges["next_gc_bytes"]).To(BeEquivalentTo(5)) - Expect(mockClient.Timings["gc_pause_duration_ms"]).To(BeEquivalentTo(6)) }) }) @@ -97,8 +96,8 @@ var _ = Describe("StatsD Extension", func() { statsd.HandleNotificationFailure("game", "apns", pErr) statsd.HandleNotificationFailure("game", "apns", pErr) - Expect(mockClient.Count["apns.game.failed"]).To(Equal(2)) - Expect(mockClient.Count["apns.game.some-key"]).To(Equal(2)) + Expect(mockClient.Count["failed"]).To(Equal(2)) + Expect(mockClient.Count["some-key"]).To(Equal(2)) }) }) }) diff --git a/extensions/gcm_message_handler_test.go b/extensions/gcm_message_handler_test.go index 79c22a7..75a3e9b 100644 --- a/extensions/gcm_message_handler_test.go +++ b/extensions/gcm_message_handler_test.go @@ -478,14 +478,14 @@ var _ = Describe("GCM Message Handler", func() { err = handler.sendMessage(kafkaMessage) Expect(err).NotTo(HaveOccurred()) - Expect(mockStatsDClient.Count["gcm.game.sent"]).To(Equal(2)) + Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) }) It("should call HandleNotificationSuccess upon message response received", func() { res := gcm.CCSMessage{} handler.handleGCMResponse(res) handler.handleGCMResponse(res) - Expect(mockStatsDClient.Count["gcm..ack"]).To(Equal(2)) + Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -495,8 +495,8 @@ var _ = Describe("GCM Message Handler", func() { handler.handleGCMResponse(res) handler.handleGCMResponse(res) - Expect(mockStatsDClient.Count["gcm..failed"]).To(Equal(2)) - Expect(mockStatsDClient.Count["gcm..device_unregistered"]).To(Equal(2)) + Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) + Expect(mockStatsDClient.Count["device_unregistered"]).To(Equal(2)) }) }) diff --git a/extensions/pg.go b/extensions/pg.go index 5be1115..1bfb889 100644 --- a/extensions/pg.go +++ b/extensions/pg.go @@ -123,7 +123,7 @@ func (c *PGClient) WaitForConnection(timeout int) error { } if ellapsed() > t { - return fmt.Errorf("Timed out waiting for PostgreSQL to connect.") + return fmt.Errorf("Timed out waiting for PostgreSQL to connect") } return nil diff --git a/extensions/token_pg_test.go b/extensions/token_pg_test.go index 56223e7..d1137b6 100644 --- a/extensions/token_pg_test.go +++ b/extensions/token_pg_test.go @@ -69,7 +69,7 @@ var _ = Describe("TokenPG Extension", func() { t, err := NewTokenPG(config, logger, mockClient) Expect(t).NotTo(BeNil()) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("Timed out waiting for PostgreSQL to connect.")) + Expect(err.Error()).To(Equal("Timed out waiting for PostgreSQL to connect")) }) }) diff --git a/interfaces/statsd.go b/interfaces/statsd.go index 29aa669..a98541d 100644 --- a/interfaces/statsd.go +++ b/interfaces/statsd.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 TFG Co + * Copyright (c) 2018 TFG Co * Author: TFG Co * * Permission is hereby granted, free of charge, to any person obtaining a copy of @@ -22,10 +22,14 @@ package interfaces +import ( + "time" +) + // StatsDClient interface type StatsDClient interface { - Increment(string) - Gauge(string, interface{}) - Timing(string, interface{}) - Close() + Incr(string, []string, float64) error + Gauge(string, float64, []string, float64) error + Timing(string, time.Duration, []string, float64) error + Close() error } diff --git a/mocks/statsd.go b/mocks/statsd.go index a8f880f..4b002a4 100644 --- a/mocks/statsd.go +++ b/mocks/statsd.go @@ -22,7 +22,10 @@ package mocks -import "sync" +import ( + "sync" + "time" +) //StatsDClientMock should be used for tests that need to send xmpp messages to StatsD type StatsDClientMock struct { @@ -44,28 +47,32 @@ func NewStatsDClientMock() *StatsDClientMock { var mutexCount, mutexGauges, mutexTimings sync.Mutex -//Increment stores the new count in a map -func (m *StatsDClientMock) Increment(bucket string) { +//Incr stores the new count in a map +func (m *StatsDClientMock) Incr(bucket string, tags []string, rate float64) error { mutexCount.Lock() m.Count[bucket]++ mutexCount.Unlock() + return nil } //Gauge stores the count in a map -func (m *StatsDClientMock) Gauge(bucket string, value interface{}) { +func (m *StatsDClientMock) Gauge(bucket string, value float64, tags []string, rate float64) error { mutexGauges.Lock() m.Gauges[bucket] = value mutexGauges.Unlock() + return nil } //Timing stores the count in a map -func (m *StatsDClientMock) Timing(bucket string, value interface{}) { +func (m *StatsDClientMock) Timing(bucket string, value time.Duration, tags []string, rate float64) error { mutexTimings.Lock() m.Timings[bucket] = value mutexTimings.Unlock() + return nil } //Close records that it is closed -func (m *StatsDClientMock) Close() { +func (m *StatsDClientMock) Close() error { m.Closed = true + return nil } diff --git a/pusher/handlers_test.go b/pusher/handlers_test.go index 76a7f2b..f9696de 100644 --- a/pusher/handlers_test.go +++ b/pusher/handlers_test.go @@ -65,7 +65,7 @@ var _ = Describe("Handlers", func() { mockClient.RowsReturned = 0 handlers, err := configureInvalidTokenHandlers(config, logger, mockClient) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("Failed to initialize pg. Timed out waiting for PostgreSQL to connect.")) + Expect(err.Error()).To(Equal("Failed to initialize pg. Timed out waiting for PostgreSQL to connect")) Expect(handlers).To(BeNil()) }) }) diff --git a/util/version.go b/util/version.go index 16ba7d6..5ce7405 100644 --- a/util/version.go +++ b/util/version.go @@ -23,4 +23,4 @@ package util //Version is the current version of pusher -var Version = "3.1.0" +var Version = "3.2.0"