Skip to content

Commit

Permalink
Stats Reporters - Now pusher allows several stats reporters to be added
Browse files Browse the repository at this point in the history
to it. Use the configuration stats.reporters. As of this commit only
statsd reporter is available.
  • Loading branch information
cscatolini authored and heynemann committed Jan 11, 2017
1 parent 5a75754 commit f344a0a
Show file tree
Hide file tree
Showing 32 changed files with 917 additions and 78 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -89,3 +89,4 @@ cat test.txt | kafka-console-producer --topic com.games.teste --broker-list loca
- [ ] README with dev and deployment instructions
- [ ] Apple JWT tokens instead of certificates https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/APNSOverview.html#//apple_ref/doc/uid/TP40008194-CH8-SW1
- [ ] Fix TODOs
- [ ] Verify string concats specially when building queries (SQL injection susceptible)
5 changes: 4 additions & 1 deletion cmd/apns.go
Expand Up @@ -56,7 +56,10 @@ var apnsCmd = &cobra.Command{
if len(app) == 0 {
l.Panic("app must be set")
}
apnsPusher := pusher.NewAPNSPusher(cfgFile, certificate, app, production, log)
apnsPusher, err := pusher.NewAPNSPusher(cfgFile, certificate, app, production, log)
if err != nil {
l.WithError(err).Panic("app could not start")
}
apnsPusher.Start()
},
}
Expand Down
4 changes: 4 additions & 0 deletions config/default.yaml
Expand Up @@ -21,3 +21,7 @@ push:
poolSize: 20
maxRetries: 3
database: push
statsreporter:
host: "localhost:8125"
prefix: "push"
flushintervalms: 5000
4 changes: 4 additions & 0 deletions config/test.yaml
Expand Up @@ -21,3 +21,7 @@ push:
poolSize: 20
maxRetries: 3
database: push
statsreporter:
host: "localhost:8125"
prefix: "push"
flushintervalms: 5000
5 changes: 5 additions & 0 deletions docker-compose.yml
Expand Up @@ -19,3 +19,8 @@ services:
image: postgres:9.5
ports:
- "8585:5432"
statsd:
image: hopsoft/graphite-statsd
ports:
- "8125:8125"
- "8126:8126"
44 changes: 44 additions & 0 deletions errors/push.go
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2016 TFG Co <backend@tfgco.com>
* Author: TFG Co <backend@tfgco.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package errors

import "fmt"

//PushError reports an error sending a Push Message
type PushError struct {
Key string
Description string
}

//NewPushError creates a new instance
func NewPushError(key, description string) *PushError {
return &PushError{
Key: key,
Description: description,
}
}

//Error returns a string
func (e *PushError) Error() string {
return fmt.Sprintf("Sending push notification failed with error %s (%s).", e.Key, e.Description)
}
115 changes: 113 additions & 2 deletions extensions/apns_message_handler.go
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/Sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/certificate"
"github.com/topfreegames/pusher/errors"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/util"
)

Expand All @@ -54,6 +56,7 @@ type APNSMessageHandler struct {
sentMessages int64
Topic string
pendingMessagesWG *sync.WaitGroup
StatsReporters []interfaces.StatsReporter
}

// Notification is the notification base struct
Expand All @@ -63,7 +66,13 @@ type Notification struct {
}

// NewAPNSMessageHandler returns a new instance of a APNSMessageHandler
func NewAPNSMessageHandler(configFile, certificatePath, appName string, isProduction bool, logger *logrus.Logger, pendingMessagesWG *sync.WaitGroup) *APNSMessageHandler {
func NewAPNSMessageHandler(
configFile, certificatePath, appName string,
isProduction bool,
logger *logrus.Logger,
pendingMessagesWG *sync.WaitGroup,
statsReporters []interfaces.StatsReporter,
) *APNSMessageHandler {
a := &APNSMessageHandler{
appName: appName,
CertificatePath: certificatePath,
Expand All @@ -73,6 +82,7 @@ func NewAPNSMessageHandler(configFile, certificatePath, appName string, isProduc
responsesReceived: 0,
sentMessages: 0,
pendingMessagesWG: pendingMessagesWG,
StatsReporters: statsReporters,
}
a.configure()
return a
Expand Down Expand Up @@ -105,8 +115,22 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
l.Infof("received responses: %d", a.responsesReceived)
}
apnsResMutex.Unlock()
var err error
if res.Err != nil {
switch res.Err.(*push.Error).Reason {
pushError, ok := res.Err.(*push.Error)
if !ok {
l.WithFields(logrus.Fields{
"category": "UnexpectedError",
logrus.ErrorKey: res.Err,
}).Error("received an error")
return res.Err
}
reason := pushError.Reason
pErr := errors.NewPushError(a.mapErrorReason(reason), pushError.Error())
a.statsReporterHandleNotificationFailure(pErr)

err = pErr
switch reason {
case push.ErrMissingDeviceToken, push.ErrBadDeviceToken:
l.WithFields(logrus.Fields{
"category": "TokenError",
Expand Down Expand Up @@ -134,7 +158,10 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
logrus.ErrorKey: res.Err,
}).Error("received an error")
}
return err
}

a.statsReporterHandleNotificationSuccess()
return nil
}

Expand Down Expand Up @@ -202,6 +229,7 @@ func (a *APNSMessageHandler) sendMessage(message []byte) {
if err != nil {
l.WithError(err).Error("error marshaling message payload")
}
a.statsReporterHandleNotificationSent()
a.PushQueue.Push(n.DeviceToken, h, payload)
a.sentMessages++
if a.sentMessages%1000 == 0 {
Expand Down Expand Up @@ -230,3 +258,86 @@ func (a *APNSMessageHandler) HandleMessages(msgChan *chan []byte) {
}
}
}

func (a *APNSMessageHandler) mapErrorReason(reason error) string {
switch reason {
case push.ErrPayloadEmpty:
return "payload-empty"
case push.ErrPayloadTooLarge:
return "payload-too-large"
case push.ErrMissingDeviceToken:
return "missing-device-token"
case push.ErrBadDeviceToken:
return "bad-device-token"
case push.ErrTooManyRequests:
return "too-many-requests"
case push.ErrBadMessageID:
return "bad-message-id"
case push.ErrBadExpirationDate:
return "bad-expiration-date"
case push.ErrBadPriority:
return "bad-priority"
case push.ErrBadTopic:
return "bad-topic"
case push.ErrBadCertificate:
return "bad-certificate"
case push.ErrBadCertificateEnvironment:
return "bad-certificate-environment"
case push.ErrForbidden:
return "forbidden"
case push.ErrMissingTopic:
return "missing-topic"
case push.ErrTopicDisallowed:
return "topic-disallowed"
case push.ErrUnregistered:
return "unregistered"
case push.ErrDeviceTokenNotForTopic:
return "device-token-not-for-topic"
case push.ErrDuplicateHeaders:
return "duplicate-headers"
case push.ErrBadPath:
return "bad-path"
case push.ErrMethodNotAllowed:
return "method-not-allowed"
case push.ErrIdleTimeout:
return "idle-timeout"
case push.ErrShutdown:
return "shutdown"
case push.ErrInternalServerError:
return "internal-server-error"
case push.ErrServiceUnavailable:
return "service-unavailable"
default:
return "unexpected"
}
}

//Cleanup closes connections to APNS
func (a *APNSMessageHandler) Cleanup() error {
err := a.PushDB.Close()
if err != nil {
return err
}

a.PushQueue.Close()

return nil
}

func (a *APNSMessageHandler) statsReporterHandleNotificationSent() {
for _, statsReporter := range a.StatsReporters {
statsReporter.HandleNotificationSent()
}
}

func (a *APNSMessageHandler) statsReporterHandleNotificationSuccess() {
for _, statsReporter := range a.StatsReporters {
statsReporter.HandleNotificationSuccess()
}
}

func (a *APNSMessageHandler) statsReporterHandleNotificationFailure(err *errors.PushError) {
for _, statsReporter := range a.StatsReporters {
statsReporter.HandleNotificationFailure(err)
}
}

0 comments on commit f344a0a

Please sign in to comment.