Skip to content

Commit

Permalink
Log stats from time to time. Closes #8
Browse files Browse the repository at this point in the history
  • Loading branch information
cscatolini committed Jan 23, 2017
1 parent db492a2 commit 02354a4
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 20 deletions.
2 changes: 2 additions & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
gracefulShutdownTimeout: 10
apns:
concurrentWorkers: 100
logStatsInterval: 750
gcm:
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
logStatsInterval: 750
queue:
topics:
- "com.games.test"
Expand Down
51 changes: 41 additions & 10 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/tls"
"encoding/json"
"sync"
"time"

cert "github.com/RobotsAndPencils/buford/certificate"
"github.com/RobotsAndPencils/buford/push"
Expand Down Expand Up @@ -58,17 +59,20 @@ type APNSMessageHandler struct {
certificate tls.Certificate
CertificatePath string
Config *viper.Viper
failuresReceived int64
feedbackReporters []interfaces.FeedbackReporter
InflightMessagesMetadata map[string]interface{}
InvalidTokenHandlers []interfaces.InvalidTokenHandler
IsProduction bool
Logger *log.Logger
LogStatsInterval time.Duration
pendingMessagesWG *sync.WaitGroup
PushQueue interfaces.APNSPushQueue
responsesReceived int64
run bool
sentMessages int64
StatsReporters []interfaces.StatsReporter
successesReceived int64
Topic string
}

Expand All @@ -85,17 +89,19 @@ func NewAPNSMessageHandler(
queue interfaces.APNSPushQueue,
) (*APNSMessageHandler, error) {
a := &APNSMessageHandler{
CertificatePath: certificatePath,
Config: config,
CertificatePath: certificatePath,
Config: config,
failuresReceived: 0,
feedbackReporters: feedbackReporters,
InflightMessagesMetadata: map[string]interface{}{},
InvalidTokenHandlers: invalidTokenHandlers,
IsProduction: isProduction,
Logger: logger,
pendingMessagesWG: pendingMessagesWG,
responsesReceived: 0,
sentMessages: 0,
pendingMessagesWG: pendingMessagesWG,
StatsReporters: statsReporters,
feedbackReporters: feedbackReporters,
successesReceived: 0,
}
if err := a.configure(queue); err != nil {
return nil, err
Expand All @@ -105,6 +111,8 @@ func NewAPNSMessageHandler(

func (a *APNSMessageHandler) configure(queue interfaces.APNSPushQueue) error {
a.loadConfigurationDefaults()
interval := a.Config.GetInt("apns.logStatsInterval")
a.LogStatsInterval = time.Duration(interval) * time.Millisecond
err := a.configureCertificate()
if err != nil {
return err
Expand All @@ -123,6 +131,7 @@ func (a *APNSMessageHandler) configure(queue interfaces.APNSPushQueue) error {

func (a *APNSMessageHandler) loadConfigurationDefaults() {
a.Config.SetDefault("apns.concurrentWorkers", 10)
a.Config.SetDefault("apns.logStatsInterval", 5000)
}

func (a *APNSMessageHandler) configureCertificate() error {
Expand Down Expand Up @@ -179,9 +188,6 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error {
a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata
inflightMessagesMetadataLock.Unlock()
a.sentMessages++
if a.sentMessages%1000 == 0 {
l.Infof("sent messages: %d", a.sentMessages)
}
return nil
}

Expand Down Expand Up @@ -221,9 +227,6 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
l.Debug("got response from apns")
apnsResMutex.Lock()
a.responsesReceived++
if a.responsesReceived%1000 == 0 {
l.Infof("received responses: %d", a.responsesReceived)
}
apnsResMutex.Unlock()
var err error
responseWithMetadata := &ResponseWithMetadata{
Expand All @@ -240,6 +243,9 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
l.WithError(err).Error("error sending feedback to reporter")
}
if res.Err != nil {
apnsResMutex.Lock()
a.failuresReceived++
apnsResMutex.Unlock()
pushError, ok := res.Err.(*push.Error)
if !ok {
l.WithFields(log.Fields{
Expand Down Expand Up @@ -293,10 +299,35 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
apnsResMutex.Lock()
a.successesReceived++
apnsResMutex.Unlock()
statsReporterHandleNotificationSuccess(a.StatsReporters)
return nil
}

// LogStats from time to time
func (a *APNSMessageHandler) LogStats() {
l := a.Logger.WithFields(log.Fields{
"method": "logStats",
"interval": a.LogStatsInterval,
})

ticker := time.NewTicker(a.LogStatsInterval)
for range ticker.C {
apnsResMutex.Lock()
l.WithField("count", a.sentMessages).Info("Sent messages")
l.WithField("count", a.responsesReceived).Info("Responses received")
l.WithField("count", a.successesReceived).Info("Successes received")
l.WithField("count", a.failuresReceived).Info("Failures received")
a.sentMessages = 0
a.responsesReceived = 0
a.successesReceived = 0
a.failuresReceived = 0
apnsResMutex.Unlock()
}
}

func (a *APNSMessageHandler) mapErrorReason(reason error) string {
switch reason {
case push.ErrPayloadEmpty:
Expand Down
32 changes: 32 additions & 0 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.successesReceived).To(Equal(int64(1)))
})

It("if reponse has error push.ErrMissingDeviceToken", func() {
Expand All @@ -166,6 +167,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})
Expand All @@ -181,6 +183,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
Expect(hook.Entries).To(ContainLogMessage("deleting token"))
//Expect(hook.Entries[len(hook.Entries)-2].Data["category"]).To(Equal("TokenError"))
})
Expand All @@ -196,6 +199,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("CertificateError"))
})

Expand All @@ -210,6 +214,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("CertificateError"))
})

Expand All @@ -224,6 +229,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("CertificateError"))
})

Expand All @@ -238,6 +244,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("TopicError"))
})

Expand All @@ -252,6 +259,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("TopicError"))
})

Expand All @@ -266,6 +274,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("TopicError"))
})

Expand All @@ -280,6 +289,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("AppleError"))
})

Expand All @@ -294,6 +304,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("AppleError"))
})

Expand All @@ -308,6 +319,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("AppleError"))
})

Expand All @@ -322,6 +334,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("AppleError"))
})

Expand All @@ -336,6 +349,7 @@ var _ = Describe("APNS Message Handler", func() {
}
handler.handleAPNSResponse(res)
Expect(handler.responsesReceived).To(Equal(int64(1)))
Expect(handler.failuresReceived).To(Equal(int64(1)))
//Expect(hook.LastEntry().Data["category"]).To(Equal("DefaultError"))
})
})
Expand All @@ -357,6 +371,24 @@ var _ = Describe("APNS Message Handler", func() {
})
})

Describe("Log Stats", func() {
It("should log and zero stats", func() {
handler.sentMessages = 100
handler.responsesReceived = 90
handler.successesReceived = 60
handler.failuresReceived = 30
Expect(func() { go handler.LogStats() }).ShouldNot(Panic())
Eventually(func() int64 { return handler.sentMessages }).Should(Equal(int64(0)))
Eventually(func() []*logrus.Entry { return hook.Entries }).Should(ContainLogMessage("Sent messages"))
Eventually(func() int64 { return handler.responsesReceived }).Should(Equal(int64(0)))
Eventually(func() []*logrus.Entry { return hook.Entries }).Should(ContainLogMessage("Responses received"))
Eventually(func() int64 { return handler.successesReceived }).Should(Equal(int64(0)))
Eventually(func() []*logrus.Entry { return hook.Entries }).Should(ContainLogMessage("Successes received"))
Eventually(func() int64 { return handler.failuresReceived }).Should(Equal(int64(0)))
Eventually(func() []*logrus.Entry { return hook.Entries }).Should(ContainLogMessage("Failures received"))
})
})

Describe("Stats Reporter sent message", func() {
It("should call HandleNotificationSent upon message sent to queue", func() {
Expect(handler).NotTo(BeNil())
Expand Down

0 comments on commit 02354a4

Please sign in to comment.