Skip to content

Commit

Permalink
mocks.statsd is now thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Henrique Rodrigues committed Feb 7, 2017
1 parent 419a859 commit d411a07
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 86 deletions.
81 changes: 42 additions & 39 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
)

var apnsResMutex sync.Mutex
var inflightMessagesMetadataLock sync.Mutex

// Notification is the notification base struct
type Notification struct {
Expand All @@ -56,26 +55,27 @@ type ResponseWithMetadata struct {

// APNSMessageHandler implements the messagehandler interface
type APNSMessageHandler struct {
certificate tls.Certificate
CertificatePath string
Config *viper.Viper
failuresReceived int64
feedbackReporters []interfaces.FeedbackReporter
InflightMessagesMetadata map[string]interface{}
InvalidTokenHandlers []interfaces.InvalidTokenHandler
IsProduction bool
Logger *log.Logger
LogStatsInterval time.Duration
pendingMessagesWG *sync.WaitGroup
PushQueue interfaces.APNSPushQueue
responsesReceived int64
run bool
sentMessages int64
StatsReporters []interfaces.StatsReporter
successesReceived int64
Topic string
requestsHeap *TimeoutHeap
CacheCleaningInterval int
certificate tls.Certificate
CertificatePath string
Config *viper.Viper
failuresReceived int64
feedbackReporters []interfaces.FeedbackReporter
InflightMessagesMetadata map[string]interface{}
InvalidTokenHandlers []interfaces.InvalidTokenHandler
IsProduction bool
Logger *log.Logger
LogStatsInterval time.Duration
pendingMessagesWG *sync.WaitGroup
inflightMessagesMetadataLock *sync.Mutex
PushQueue interfaces.APNSPushQueue
responsesReceived int64
run bool
sentMessages int64
StatsReporters []interfaces.StatsReporter
successesReceived int64
Topic string
requestsHeap *TimeoutHeap
CacheCleaningInterval int
}

// NewAPNSMessageHandler returns a new instance of a APNSMessageHandler
Expand All @@ -91,20 +91,21 @@ func NewAPNSMessageHandler(
queue interfaces.APNSPushQueue,
) (*APNSMessageHandler, error) {
a := &APNSMessageHandler{
CertificatePath: certificatePath,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
InflightMessagesMetadata: map[string]interface{}{},
InvalidTokenHandlers: invalidTokenHandlers,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
responsesReceived: 0,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
CertificatePath: certificatePath,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
InflightMessagesMetadata: map[string]interface{}{},
InvalidTokenHandlers: invalidTokenHandlers,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
inflightMessagesMetadataLock: &sync.Mutex{},
responsesReceived: 0,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
}
if err := a.configure(queue); err != nil {
return nil, err
Expand Down Expand Up @@ -189,12 +190,12 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error {
}
statsReporterHandleNotificationSent(a.StatsReporters)
a.PushQueue.Push(n.DeviceToken, h, payload)
inflightMessagesMetadataLock.Lock()
a.inflightMessagesMetadataLock.Lock()

a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata
a.requestsHeap.AddRequest(n.DeviceToken)

inflightMessagesMetadataLock.Unlock()
a.inflightMessagesMetadataLock.Unlock()
a.sentMessages++
return nil
}
Expand All @@ -214,10 +215,12 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
var deviceToken string
var hasIndeed bool
for {
a.inflightMessagesMetadataLock.Lock()
for deviceToken, hasIndeed = a.requestsHeap.HasExpiredRequest(); hasIndeed; {
delete(a.InflightMessagesMetadata, deviceToken)
deviceToken, hasIndeed = a.requestsHeap.HasExpiredRequest()
}
a.inflightMessagesMetadataLock.Unlock()

duration := time.Duration(a.CacheCleaningInterval)
time.Sleep(duration * time.Millisecond)
Expand Down Expand Up @@ -255,12 +258,12 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
responseWithMetadata := &ResponseWithMetadata{
Response: res,
}
inflightMessagesMetadataLock.Lock()
a.inflightMessagesMetadataLock.Lock()
if val, ok := a.InflightMessagesMetadata[res.DeviceToken]; ok {
responseWithMetadata.Metadata = val.(map[string]interface{})
delete(a.InflightMessagesMetadata, res.DeviceToken)
}
inflightMessagesMetadataLock.Unlock()
a.inflightMessagesMetadataLock.Unlock()

if err != nil {
l.WithError(err).Error("error sending feedback to reporter")
Expand Down
88 changes: 46 additions & 42 deletions extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,29 @@ type CCSMessageWithMetadata struct {

// GCMMessageHandler implements the messagehandler interface
type GCMMessageHandler struct {
apiKey string
Config *viper.Viper
failuresReceived int64
feedbackReporters []interfaces.FeedbackReporter
GCMClient interfaces.GCMClient
InflightMessagesMetadata map[string]interface{}
InvalidTokenHandlers []interfaces.InvalidTokenHandler
IsProduction bool
Logger *log.Logger
LogStatsInterval time.Duration
pendingMessages chan bool
pendingMessagesWG *sync.WaitGroup
PingInterval int
PingTimeout int
responsesReceived int64
run bool
senderID string
sentMessages int64
StatsReporters []interfaces.StatsReporter
successesReceived int64
requestsHeap *TimeoutHeap
CacheCleaningInterval int
apiKey string
Config *viper.Viper
failuresReceived int64
feedbackReporters []interfaces.FeedbackReporter
GCMClient interfaces.GCMClient
InflightMessagesMetadata map[string]interface{}
InvalidTokenHandlers []interfaces.InvalidTokenHandler
IsProduction bool
Logger *log.Logger
LogStatsInterval time.Duration
pendingMessages chan bool
pendingMessagesWG *sync.WaitGroup
inflightMessagesMetadataLock *sync.Mutex
PingInterval int
PingTimeout int
responsesReceived int64
run bool
senderID string
sentMessages int64
StatsReporters []interfaces.StatsReporter
successesReceived int64
requestsHeap *TimeoutHeap
CacheCleaningInterval int
}

// NewGCMMessageHandler returns a new instance of a GCMMessageHandler
Expand All @@ -96,21 +97,22 @@ func NewGCMMessageHandler(
})

g := &GCMMessageHandler{
apiKey: apiKey,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
InflightMessagesMetadata: map[string]interface{}{},
InvalidTokenHandlers: invalidTokenHandlers,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
responsesReceived: 0,
senderID: senderID,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
apiKey: apiKey,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
InflightMessagesMetadata: map[string]interface{}{},
InvalidTokenHandlers: invalidTokenHandlers,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
inflightMessagesMetadataLock: &sync.Mutex{},
responsesReceived: 0,
senderID: senderID,
sentMessages: 0,
StatsReporters: statsReporters,
successesReceived: 0,
requestsHeap: NewTimeoutHeap(config),
}
err := g.configure(client)
if err != nil {
Expand Down Expand Up @@ -201,12 +203,12 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error {
ccsMessageWithMetadata := &CCSMessageWithMetadata{
CCSMessage: cm,
}
inflightMessagesMetadataLock.Lock()
g.inflightMessagesMetadataLock.Lock()
if val, ok := g.InflightMessagesMetadata[cm.MessageID]; ok {
ccsMessageWithMetadata.Metadata = val.(map[string]interface{})
delete(g.InflightMessagesMetadata, cm.MessageID)
}
inflightMessagesMetadataLock.Unlock()
g.inflightMessagesMetadataLock.Unlock()

if cm.Error != "" {
gcmResMutex.Lock()
Expand Down Expand Up @@ -293,12 +295,12 @@ func (g *GCMMessageHandler) sendMessage(message []byte) error {

if messageID != "" {
if km.Metadata != nil && len(km.Metadata) > 0 {
inflightMessagesMetadataLock.Lock()
g.inflightMessagesMetadataLock.Lock()

g.InflightMessagesMetadata[messageID] = km.Metadata
g.requestsHeap.AddRequest(messageID)

inflightMessagesMetadataLock.Unlock()
g.inflightMessagesMetadataLock.Unlock()
}
}

Expand All @@ -320,12 +322,14 @@ func (g *GCMMessageHandler) CleanMetadataCache() {
var deviceToken string
var hasIndeed bool
for {
g.inflightMessagesMetadataLock.Lock()
for deviceToken, hasIndeed = g.requestsHeap.HasExpiredRequest(); hasIndeed; {
delete(g.InflightMessagesMetadata, deviceToken)
deviceToken, hasIndeed = g.requestsHeap.HasExpiredRequest()
}
g.inflightMessagesMetadataLock.Unlock()

duration := time.Duration(g.Config.GetInt("feedback.cache.tick"))
duration := time.Duration(g.CacheCleaningInterval)
time.Sleep(duration * time.Millisecond)
}
}
Expand Down
5 changes: 0 additions & 5 deletions extensions/timeout_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@ import (
"time"
)

// A timeoutNode contains device token and the time when the request expires
type timeoutNode struct {
UnixTimeStamp int64
DeviceToken string
index int
}

// TODO: remove this constant and get it from config file
var timeoutCte int64

// Mutex for secure concurrency
var mutex sync.Mutex

// TimeoutHeap is a array of timeoutNode, which has request ID and expiration time
Expand Down Expand Up @@ -100,7 +96,6 @@ func (th *TimeoutHeap) empty() bool {
return th.Len() == 0
}

// Returns all information about the poped node
func (th *TimeoutHeap) completeHasExpiredRequest() (string, int64, bool) {
mutex.Lock()
defer mutex.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions mocks/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package mocks

import "sync"

//StatsDClientMock should be used for tests that need to send xmpp messages to StatsD
type StatsDClientMock struct {
Count map[string]int
Expand All @@ -40,19 +42,27 @@ func NewStatsDClientMock() *StatsDClientMock {
}
}

var mutexCount, mutexGauges, mutexTimings sync.Mutex

//Increment stores the new count in a map
func (m *StatsDClientMock) Increment(bucket string) {
mutexCount.Lock()
m.Count[bucket]++
mutexCount.Unlock()
}

//Gauge stores the count in a map
func (m *StatsDClientMock) Gauge(bucket string, value interface{}) {
mutexGauges.Lock()
m.Gauges[bucket] = value
mutexGauges.Unlock()
}

//Timing stores the count in a map
func (m *StatsDClientMock) Timing(bucket string, value interface{}) {
mutexTimings.Lock()
m.Timings[bucket] = value
mutexTimings.Unlock()
}

//Close records that it is closed
Expand Down

0 comments on commit d411a07

Please sign in to comment.