Skip to content

Commit

Permalink
Handling multiple games
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilherme Souza committed Sep 25, 2017
1 parent e88f50d commit d0157a9
Show file tree
Hide file tree
Showing 24 changed files with 228 additions and 179 deletions.
21 changes: 4 additions & 17 deletions cmd/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
raven "github.com/getsentry/raven-go"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
Expand All @@ -35,11 +33,9 @@ import (
)

var app string
var certificate string

func startApns(
debug, json, production bool,
certificate string,
config *viper.Viper,
statsdClientOrNil interfaces.StatsDClient,
dbOrNil interfaces.DB,
Expand All @@ -54,16 +50,7 @@ func startApns(
} else {
log.Level = logrus.InfoLevel
}
l := log.WithFields(logrus.Fields{
"method": "apnsCmd",
"debug": debug,
})
if len(certificate) == 0 {
err := fmt.Errorf("pem certificate must be set")
l.Error(err)
return nil, err
}
return pusher.NewAPNSPusher(certificate, production, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
return pusher.NewAPNSPusher(production, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
}

// apnsCmd represents the apns command
Expand All @@ -82,7 +69,7 @@ var apnsCmd = &cobra.Command{
raven.SetDSN(sentryURL)
}

apnsPusher, err := startApns(debug, json, production, certificate, config, nil, nil, nil)
apnsPusher, err := startApns(debug, json, production, config, nil, nil, nil)
if err != nil {
raven.CaptureErrorAndWait(err, map[string]string{
"version": util.Version,
Expand All @@ -95,6 +82,6 @@ var apnsCmd = &cobra.Command{
}

func init() {
apnsCmd.Flags().StringVar(&certificate, "certificate", "", "pem certificate path")
apnsCmd.Flags()
RootCmd.AddCommand(apnsCmd)
}
17 changes: 4 additions & 13 deletions cmd/apns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (

var _ = Describe("APNS", func() {
cfg := "../config/test.yaml"
cert := "../tls/self_signed_cert.pem"

var config *viper.Viper
var mockPushQueue *mocks.APNSPushQueueMock
Expand All @@ -53,42 +52,34 @@ var _ = Describe("APNS", func() {

Describe("[Unit]", func() {
It("Should return apnsPusher without errors", func() {
apnsPusher, err := startApns(false, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, false, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.CertificatePath).To(Equal(cert))
Expect(apnsPusher.Config).NotTo(BeNil())
Expect(apnsPusher.IsProduction).To(BeFalse())
Expect(apnsPusher.Logger.Level).To(Equal(logrus.InfoLevel))
Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.TextFormatter{})))
})

It("Should set log to json format", func() {
apnsPusher, err := startApns(false, true, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, true, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.JSONFormatter{})))
})

It("Should set log to debug", func() {
apnsPusher, err := startApns(true, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(true, false, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.Logger.Level).To(Equal(logrus.DebugLevel))
})

It("Should set log to production", func() {
apnsPusher, err := startApns(false, false, true, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, false, true, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.IsProduction).To(BeTrue())
})

It("Should return error if certificate is not provided", func() {
apnsPusher, err := startApns(false, true, false, "", config, mockStatsDClient, mockDb, mockPushQueue)
Expect(apnsPusher).To(BeNil())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("pem certificate must be set"))
})
})
})
8 changes: 7 additions & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 30
apns:
concurrentWorkers: 100
logStatsInterval: 10000
games:
game: /certs/cert.pem
gcm:
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 100
logStatsInterval: 10000
games:
game:
apiKey: game-api-key
senderID: "1233456789"
queue:
topics:
- "com.games.test"
- "^push-[^-_]+_(apns|gcm)[_-](single|massive)"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
Expand Down
9 changes: 7 additions & 2 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 10
apns:
concurrentWorkers: 100
logStatsInterval: 750
games:
game: ../tls/self_signed_cert.pem
gcm:
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
logStatsInterval: 750
games:
game:
apiKey: game-api-key
senderID: "1233456789"
queue:
topics:
- "com.games.test"
- "^push-[^-_]+_(apns|gcm)[_-](single|massive)"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
Expand All @@ -36,7 +42,6 @@ invalidToken:
handlers:
- pg
pg:
table: "test_apns"
host: localhost
port: 8585
user: pusher_user
Expand Down
31 changes: 15 additions & 16 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,14 @@ func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error
}
statsReporterHandleNotificationSent(a.StatsReporters)
a.PushQueue.Push(n.DeviceToken, h, payload)
parsedTopic := getGameAndPlatformFromTopic(message.Topic)
if n.Metadata == nil {
n.Metadata = map[string]interface{}{}
}
a.inflightMessagesMetadataLock.Lock()

n.Metadata["timestamp"] = time.Now().Unix()
n.Metadata["game"] = parsedTopic.Game
n.Metadata["platform"] = parsedTopic.Platform
n.Metadata["game"] = message.Game
n.Metadata["platform"] = "apns"
hostname, err := os.Hostname()
if err != nil {
l.WithError(err).Error("error retrieving hostname")
Expand Down Expand Up @@ -257,15 +256,8 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
}

// HandleMessages get messages from msgChan and send to APNS
func (a *APNSMessageHandler) HandleMessages(msgChan *chan interfaces.KafkaMessage) {
a.run = true

for a.run == true {
select {
case message := <-*msgChan:
a.sendMessage(message)
}
}
func (a *APNSMessageHandler) HandleMessages(message interfaces.KafkaMessage) {
a.sendMessage(message)
}

func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
Expand All @@ -283,6 +275,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
apnsResMutex.Lock()
a.responsesReceived++
apnsResMutex.Unlock()
parsedTopic := ParsedTopic{}
var err error
responseWithMetadata := &ResponseWithMetadata{
Response: res,
Expand All @@ -291,14 +284,17 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
if val, ok := a.InflightMessagesMetadata[res.ID]; ok {
responseWithMetadata.Metadata = val.(map[string]interface{})
responseWithMetadata.Timestamp = responseWithMetadata.Metadata["timestamp"].(int64)
parsedTopic.Game = responseWithMetadata.Metadata["game"].(string)
parsedTopic.Platform = responseWithMetadata.Metadata["platform"].(string)
delete(responseWithMetadata.Metadata, "timestamp")
delete(a.InflightMessagesMetadata, res.DeviceToken)
delete(a.InflightMessagesMetadata, res.ID)
}
a.inflightMessagesMetadataLock.Unlock()

if err != nil {
l.WithError(err).Error("error sending feedback to reporter")
}

if res.Err != nil {
apnsResMutex.Lock()
a.failuresReceived++
Expand All @@ -325,7 +321,10 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
if responseWithMetadata.Metadata != nil {
responseWithMetadata.Metadata["deleteToken"] = true
}
handleInvalidToken(a.InvalidTokenHandlers, res.DeviceToken)
handleInvalidToken(
a.InvalidTokenHandlers, res.DeviceToken,
parsedTopic.Game, parsedTopic.Platform,
)
case push.ErrBadCertificate, push.ErrBadCertificateEnvironment, push.ErrForbidden:
l.WithFields(log.Fields{
"category": "CertificateError",
Expand All @@ -348,13 +347,13 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
}).Debug("received an error")
}
responseWithMetadata.Err = pErr
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata)
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
return err
}
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata)
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)

if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
Expand Down
39 changes: 24 additions & 15 deletions extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,21 +365,21 @@ var _ = Describe("APNS Message Handler", func() {
})
})

Describe("Handle Messages", func() {
It("should start without panicking and set run to true", func() {
stopChannel := make(chan struct{})
queue, err := NewKafkaConsumer(
handler.Config,
logger,
&stopChannel,
mockKafkaConsumerClient,
)
Expect(err).NotTo(HaveOccurred())
Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic())
time.Sleep(50 * time.Millisecond)
Expect(handler.run).To(BeTrue())
})
})
// Describe("Handle Messages", func() {
// It("should start without panicking and set run to true", func() {
// stopChannel := make(chan struct{})
// queue, err := NewKafkaConsumer(
// handler.Config,
// logger,
// &stopChannel,
// mockKafkaConsumerClient,
// )
// Expect(err).NotTo(HaveOccurred())
// Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic())
// time.Sleep(50 * time.Millisecond)
// Expect(handler.run).To(BeTrue())
// })
// })

Describe("Clean Cache", func() {
It("should remove from push queue after timeout", func() {
Expand Down Expand Up @@ -546,12 +546,15 @@ var _ = Describe("APNS Message Handler", func() {
"timestamp": timestampNow,
"hostname": hostname,
"msgid": msgID,
"game": "game",
"platform": "apns",
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := push.Response{
DeviceToken: "testToken1",
ID: "idTest1",
}

go handler.handleAPNSResponse(res)

fromKafka := &ResponseWithMetadata{}
Expand All @@ -566,6 +569,8 @@ var _ = Describe("APNS Message Handler", func() {
metadata := map[string]interface{}{
"some": "metadata",
"timestamp": time.Now().Unix(),
"game": "game",
"platform": "apns",
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := push.Response{
Expand Down Expand Up @@ -601,6 +606,8 @@ var _ = Describe("APNS Message Handler", func() {
metadata := map[string]interface{}{
"some": "metadata",
"timestamp": time.Now().Unix(),
"game": "game",
"platform": "apns",
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := push.Response{
Expand All @@ -626,6 +633,8 @@ var _ = Describe("APNS Message Handler", func() {
metadata := map[string]interface{}{
"some": "metadata",
"timestamp": time.Now().Unix(),
"game": "game",
"platform": "apns",
}
handler.InflightMessagesMetadata["idTest1"] = metadata
res := push.Response{
Expand Down
8 changes: 4 additions & 4 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type ParsedTopic struct {
Game string
}

func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, token string) {
func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, token string, game string, platform string) {
for _, invalidTokenHandler := range invalidTokenHandlers {
invalidTokenHandler.HandleToken(token)
invalidTokenHandler.HandleToken(token, game, platform)
}
}

Expand All @@ -51,14 +51,14 @@ func getGameAndPlatformFromTopic(topic string) ParsedTopic {
}
}

func sendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}) error {
func sendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}, topic ParsedTopic) error {
jres, err := json.Marshal(res)
if err != nil {
return err
}
if feedbackReporters != nil {
for _, feedbackReporter := range feedbackReporters {
feedbackReporter.SendFeedback(jres)
feedbackReporter.SendFeedback(topic.Game, topic.Platform, jres)
}
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions extensions/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = Describe("Common", func() {
Describe("Handle token error", func() {
It("should be successful", func() {
token := uuid.NewV4().String()
handleInvalidToken(invalidTokenHandlers, token)
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs).To(HaveLen(2))
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expand All @@ -79,7 +79,7 @@ var _ = Describe("Common", func() {
It("should fail silently", func() {
token := uuid.NewV4().String()
db.Error = fmt.Errorf("pg: error")
handleInvalidToken(invalidTokenHandlers, token)
handleInvalidToken(invalidTokenHandlers, token, "test", "apns")
Expect(db.Execs).To(HaveLen(2))
query := "DELETE FROM test_apns WHERE token = ?0;"
Expect(db.Execs[1][0]).To(BeEquivalentTo(query))
Expand All @@ -90,7 +90,7 @@ var _ = Describe("Common", func() {
Describe("Send feedback to reporters", func() {
It("should return an error if res cannot be marshaled", func() {
badContent := make(chan int)
err := sendToFeedbackReporters(feedbackClients, badContent)
err := sendToFeedbackReporters(feedbackClients, badContent, ParsedTopic{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("json: unsupported type: chan int"))
})
Expand Down
Loading

0 comments on commit d0157a9

Please sign in to comment.