Skip to content

Commit

Permalink
Improving metrics and reducing scope of some locks
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Hahn committed Jan 24, 2018
1 parent 6fb303b commit 21012f6
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ feedback:
topics: "push-test_apns-feedbacks"
brokers: "localhost:9941"
cache:
requestTimeout: 86400000
requestTimeout: 1800000
cleaningInterval: 300000
stats:
reporters:
Expand Down
18 changes: 15 additions & 3 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error
payload, err := json.Marshal(n.Payload)
if err != nil {
l.WithError(err).Error("error marshaling message payload")
a.ignoredMessages++
if a.pendingMessagesWG != nil {
a.pendingMessagesWG.Done()
}
return err
}
if n.PushExpiry > 0 && n.PushExpiry < makeTimestamp() {
Expand All @@ -178,9 +182,7 @@ func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error
if n.Metadata == nil {
n.Metadata = map[string]interface{}{}
}
a.inflightMessagesMetadataLock.Lock()

n.Metadata["timestamp"] = time.Now().Unix()
n.Metadata["game"] = a.appName
n.Metadata["platform"] = "apns"
n.Metadata["deviceToken"] = n.DeviceToken
Expand All @@ -190,10 +192,13 @@ func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error
} else {
n.Metadata["hostname"] = hostname
}
n.Metadata["timestamp"] = time.Now().Unix()

a.inflightMessagesMetadataLock.Lock()
a.InflightMessagesMetadata[deviceIdentifier] = n.Metadata
a.requestsHeap.AddRequest(deviceIdentifier)

a.inflightMessagesMetadataLock.Unlock()

a.sentMessages++
return nil
}
Expand All @@ -212,6 +217,12 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
for {
a.inflightMessagesMetadataLock.Lock()
for deviceToken, hasIndeed = a.requestsHeap.HasExpiredRequest(); hasIndeed; {
if _, ok := a.InflightMessagesMetadata[deviceToken]; ok {
a.ignoredMessages++
if a.pendingMessagesWG != nil {
a.pendingMessagesWG.Done()
}
}
delete(a.InflightMessagesMetadata, deviceToken)
deviceToken, hasIndeed = a.requestsHeap.HasExpiredRequest()
}
Expand All @@ -234,6 +245,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(responseWithMetadata *structs.Re
}
}()

// TODO: Remove from timeout heap (will need a different heap implementation for this)
l := a.Logger.WithFields(log.Fields{
"method": "handleAPNSResponse",
"res": responseWithMetadata,
Expand Down
2 changes: 1 addition & 1 deletion extensions/apns_push_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (p *APNSPushQueue) pushWorker() {

for notification := range p.pushChannel {
client := <-p.clients
p.clients <- client
res, err := client.Push(notification)
p.clients <- client
if err != nil {
l.WithError(err).Error("push error")
}
Expand Down
11 changes: 7 additions & 4 deletions extensions/datadog_statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package extensions

import (
"fmt"
"os"

"github.com/DataDog/datadog-go/statsd"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -108,10 +109,12 @@ func (s *StatsD) ReportGoStats(
numGoRoutines int,
allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64,
) {
s.Client.Gauge("num_goroutine", float64(numGoRoutines), nil, 1)
s.Client.Gauge("allocated_not_freed", float64(allocatedAndNotFreed), nil, 1)
s.Client.Gauge("heap_objects", float64(heapObjects), nil, 1)
s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), nil, 1)
hostname, _ := os.Hostname()
tags := []string{fmt.Sprintf("hostname:%s", hostname)}
s.Client.Gauge("num_goroutine", float64(numGoRoutines), tags, 1)
s.Client.Gauge("allocated_not_freed", float64(allocatedAndNotFreed), tags, 1)
s.Client.Gauge("heap_objects", float64(heapObjects), tags, 1)
s.Client.Gauge("next_gc_bytes", float64(nextGCBytes), tags, 1)
}

//Cleanup closes statsd connection
Expand Down
7 changes: 2 additions & 5 deletions extensions/gcm_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,11 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error {
}

func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
g.pendingMessages <- true
l := g.Logger.WithField("method", "sendMessage")
//ttl := uint(0)
km := KafkaGCMMessage{}
err := json.Unmarshal(message.Value, &km)
if err != nil {
<-g.pendingMessages
l.WithError(err).Error("Error unmarshaling message.")
return err
}
Expand All @@ -311,6 +309,7 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
var messageID string
var bytes int

g.pendingMessages <- true
messageID, bytes, err = g.GCMClient.SendXMPP(km.XMPPMessage)

if err != nil {
Expand All @@ -324,8 +323,6 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
km.Metadata = map[string]interface{}{}
}

g.inflightMessagesMetadataLock.Lock()

km.Metadata["timestamp"] = time.Now().Unix()
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -337,9 +334,9 @@ func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
km.Metadata["game"] = message.Game
km.Metadata["platform"] = "gcm"

g.inflightMessagesMetadataLock.Lock()
g.InflightMessagesMetadata[messageID] = km.Metadata
g.requestsHeap.AddRequest(messageID)

g.inflightMessagesMetadataLock.Unlock()
}

Expand Down
8 changes: 2 additions & 6 deletions extensions/timeout_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ var mutex sync.Mutex
// TimeoutHeap is a array of timeoutNode, which has request ID and expiration time
type TimeoutHeap []*timeoutNode

func (th *TimeoutHeap) newTimeoutNode(
deviceToken string,
) *timeoutNode {
func (th *TimeoutHeap) newTimeoutNode(deviceToken string) *timeoutNode {
now := getNowInUnixMilliseconds()
node := &timeoutNode{
UnixTimeStamp: now + timeoutCte,
Expand Down Expand Up @@ -117,9 +115,7 @@ func (th *TimeoutHeap) completeHasExpiredRequest() (string, int64, bool) {
// For thread safe guarantee, use only the methods below from this api

// NewTimeoutHeap creates and returns a new TimeoutHeap
func NewTimeoutHeap(
config *viper.Viper,
) *TimeoutHeap {
func NewTimeoutHeap(config *viper.Viper) *TimeoutHeap {
th := make(TimeoutHeap, 0)
heap.Init(&th)
timeoutCte = int64(config.GetInt("feedback.cache.requestTimeout"))
Expand Down

0 comments on commit 21012f6

Please sign in to comment.