diff --git a/.travis.yml b/.travis.yml index 1e86f9c..6b1beda 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: -- 1.8 +- 1.9 addons: postgresql: '9.5' services: diff --git a/Dockerfile b/Dockerfile index 769d4af..9847704 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,37 +18,35 @@ # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # -FROM golang:1.8-alpine +FROM golang:1.9-alpine MAINTAINER TFG Co -RUN apk update -RUN apk add make git g++ bash python wget - ENV LIBRDKAFKA_VERSION 0.11.0 -RUN wget -O /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz && \ +ENV CPLUS_INCLUDE_PATH /usr/local/include +ENV LIBRARY_PATH /usr/local/lib +ENV LD_LIBRARY_PATH /usr/local/lib + +WORKDIR /go/src/github.com/topfreegames/pusher + +RUN apk add --no-cache make git g++ bash python wget && \ + wget -O /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz && \ tar -xzf /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz -C /root && \ cd /root/librdkafka-${LIBRDKAFKA_VERSION} && \ - ./configure && make && make install && make clean && ./configure --clean + ./configure && make && make install && make clean && ./configure --clean && \ + go get -u github.com/golang/dep/cmd/dep && \ + mkdir -p /go/src/github.com/topfreegames/pusher -RUN go get -u github.com/golang/dep/cmd/dep - -RUN mkdir -p /go/src/github.com/topfreegames/pusher -WORKDIR /go/src/github.com/topfreegames/pusher ADD . /go/src/github.com/topfreegames/pusher -RUN dep ensure - -ENV CPLUS_INCLUDE_PATH /usr/local/include -ENV LIBRARY_PATH /usr/local/lib -ENV LD_LIBRARY_PATH /usr/local/lib -RUN export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH && make build -RUN mkdir /app -RUN mv /go/src/github.com/topfreegames/pusher/bin/pusher /app/pusher -RUN mv /go/src/github.com/topfreegames/pusher/config /app/config -RUN mv /go/src/github.com/topfreegames/pusher/tls /app/tls -RUN rm -r /go/src/github.com/topfreegames/pusher +RUN dep ensure && \ + export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH && make build && \ + mkdir /app && \ + mv /go/src/github.com/topfreegames/pusher/bin/pusher /app/pusher && \ + mv /go/src/github.com/topfreegames/pusher/config /app/config && \ + mv /go/src/github.com/topfreegames/pusher/tls /app/tls && \ + rm -r /go/src/github.com/topfreegames/pusher WORKDIR /app diff --git a/README.md b/README.md index 33ecd65..74738af 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Pusher ====== ### Dependencies -* Go 1.7 +* Go 1.9 * Kafka >= 0.9.0 * [librdkafka](https://github.com/edenhill/librdkafka) diff --git a/cmd/apns.go b/cmd/apns.go index 1a856d2..cc70458 100644 --- a/cmd/apns.go +++ b/cmd/apns.go @@ -23,10 +23,8 @@ package cmd import ( - "fmt" - - "github.com/sirupsen/logrus" raven "github.com/getsentry/raven-go" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/topfreegames/pusher/interfaces" @@ -35,11 +33,9 @@ import ( ) var app string -var certificate string func startApns( debug, json, production bool, - certificate string, config *viper.Viper, statsdClientOrNil interfaces.StatsDClient, dbOrNil interfaces.DB, @@ -54,16 +50,7 @@ func startApns( } else { log.Level = logrus.InfoLevel } - l := log.WithFields(logrus.Fields{ - "method": "apnsCmd", - "debug": debug, - }) - if len(certificate) == 0 { - err := fmt.Errorf("pem certificate must be set") - l.Error(err) - return nil, err - } - return pusher.NewAPNSPusher(certificate, production, config, log, statsdClientOrNil, dbOrNil, queueOrNil) + return pusher.NewAPNSPusher(production, config, log, statsdClientOrNil, dbOrNil, queueOrNil) } // apnsCmd represents the apns command @@ -82,7 +69,7 @@ var apnsCmd = &cobra.Command{ raven.SetDSN(sentryURL) } - apnsPusher, err := startApns(debug, json, production, certificate, config, nil, nil, nil) + apnsPusher, err := startApns(debug, json, production, config, nil, nil, nil) if err != nil { raven.CaptureErrorAndWait(err, map[string]string{ "version": util.Version, @@ -95,6 +82,6 @@ var apnsCmd = &cobra.Command{ } func init() { - apnsCmd.Flags().StringVar(&certificate, "certificate", "", "pem certificate path") + apnsCmd.Flags() RootCmd.AddCommand(apnsCmd) } diff --git a/cmd/apns_test.go b/cmd/apns_test.go index 9a88a6c..a0a0505 100644 --- a/cmd/apns_test.go +++ b/cmd/apns_test.go @@ -25,9 +25,9 @@ package cmd import ( "fmt" - "github.com/sirupsen/logrus" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" @@ -35,7 +35,6 @@ import ( var _ = Describe("APNS", func() { cfg := "../config/test.yaml" - cert := "../tls/self_signed_cert.pem" var config *viper.Viper var mockPushQueue *mocks.APNSPushQueueMock @@ -53,10 +52,9 @@ var _ = Describe("APNS", func() { Describe("[Unit]", func() { It("Should return apnsPusher without errors", func() { - apnsPusher, err := startApns(false, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue) + apnsPusher, err := startApns(false, false, false, config, mockStatsDClient, mockDb, mockPushQueue) Expect(err).NotTo(HaveOccurred()) Expect(apnsPusher).NotTo(BeNil()) - Expect(apnsPusher.CertificatePath).To(Equal(cert)) Expect(apnsPusher.Config).NotTo(BeNil()) Expect(apnsPusher.IsProduction).To(BeFalse()) Expect(apnsPusher.Logger.Level).To(Equal(logrus.InfoLevel)) @@ -64,31 +62,24 @@ var _ = Describe("APNS", func() { }) It("Should set log to json format", func() { - apnsPusher, err := startApns(false, true, false, cert, config, mockStatsDClient, mockDb, mockPushQueue) + apnsPusher, err := startApns(false, true, false, config, mockStatsDClient, mockDb, mockPushQueue) Expect(err).NotTo(HaveOccurred()) Expect(apnsPusher).NotTo(BeNil()) Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.JSONFormatter{}))) }) It("Should set log to debug", func() { - apnsPusher, err := startApns(true, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue) + apnsPusher, err := startApns(true, false, false, config, mockStatsDClient, mockDb, mockPushQueue) Expect(err).NotTo(HaveOccurred()) Expect(apnsPusher).NotTo(BeNil()) Expect(apnsPusher.Logger.Level).To(Equal(logrus.DebugLevel)) }) It("Should set log to production", func() { - apnsPusher, err := startApns(false, false, true, cert, config, mockStatsDClient, mockDb, mockPushQueue) + apnsPusher, err := startApns(false, false, true, config, mockStatsDClient, mockDb, mockPushQueue) Expect(err).NotTo(HaveOccurred()) Expect(apnsPusher).NotTo(BeNil()) Expect(apnsPusher.IsProduction).To(BeTrue()) }) - - It("Should return error if certificate is not provided", func() { - apnsPusher, err := startApns(false, true, false, "", config, mockStatsDClient, mockDb, mockPushQueue) - Expect(apnsPusher).To(BeNil()) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(Equal("pem certificate must be set")) - }) }) }) diff --git a/config/default.yaml b/config/default.yaml index 32d927b..e077242 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 30 apns: concurrentWorkers: 100 logStatsInterval: 10000 + games: + game: /certs/cert.pem gcm: pingInterval: 30 pingTimeout: 10 maxPendingMessages: 100 logStatsInterval: 10000 + games: + game: + apiKey: game-api-key + senderID: "1233456789" queue: topics: - - "com.games.test" + - "^push-[^-_]+_(apns|gcm)[_-](single|massive)" brokers: "localhost:9941" group: testGroup sessionTimeout: 6000 diff --git a/config/test.yaml b/config/test.yaml index 0449f47..8f9dd9b 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 10 apns: concurrentWorkers: 100 logStatsInterval: 750 + games: + game: ../tls/self_signed_cert.pem gcm: pingInterval: 30 pingTimeout: 10 maxPendingMessages: 3 logStatsInterval: 750 + games: + game: + apiKey: game-api-key + senderID: "1233456789" queue: topics: - - "com.games.test" + - "^push-[^-_]+_(apns|gcm)[_-](single|massive)" brokers: "localhost:9941" group: testGroup sessionTimeout: 6000 @@ -36,7 +42,6 @@ invalidToken: handlers: - pg pg: - table: "test_apns" host: localhost port: 8585 user: pusher_user diff --git a/extensions/apns_message_handler.go b/extensions/apns_message_handler.go index d1bcbd8..ab3acb9 100644 --- a/extensions/apns_message_handler.go +++ b/extensions/apns_message_handler.go @@ -180,14 +180,16 @@ func (a *APNSMessageHandler) configureAPNSPushQueue() error { return nil } -func (a *APNSMessageHandler) sendMessage(message []byte) error { +func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error { + deviceIdentifier := uuid.NewV4().String() l := a.Logger.WithField("method", "sendMessage") l.WithField("message", message).Debug("sending message to apns") h := &push.Headers{ Topic: a.Topic, + ID: deviceIdentifier, } n := &Notification{} - json.Unmarshal(message, n) + json.Unmarshal(message.Value, n) payload, err := json.Marshal(n.Payload) if err != nil { l.WithError(err).Error("error marshaling message payload") @@ -201,7 +203,7 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error { } return nil } - statsReporterHandleNotificationSent(a.StatsReporters) + statsReporterHandleNotificationSent(a.StatsReporters, message.Game, "apns") a.PushQueue.Push(n.DeviceToken, h, payload) if n.Metadata == nil { n.Metadata = map[string]interface{}{} @@ -209,6 +211,8 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error { a.inflightMessagesMetadataLock.Lock() n.Metadata["timestamp"] = time.Now().Unix() + n.Metadata["game"] = message.Game + n.Metadata["platform"] = "apns" hostname, err := os.Hostname() if err != nil { l.WithError(err).Error("error retrieving hostname") @@ -216,8 +220,8 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error { n.Metadata["hostname"] = hostname } n.Metadata["msgid"] = uuid.NewV4().String() - a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata - a.requestsHeap.AddRequest(n.DeviceToken) + a.InflightMessagesMetadata[deviceIdentifier] = n.Metadata + a.requestsHeap.AddRequest(deviceIdentifier) a.inflightMessagesMetadataLock.Unlock() a.sentMessages++ @@ -252,15 +256,8 @@ func (a *APNSMessageHandler) CleanMetadataCache() { } // HandleMessages get messages from msgChan and send to APNS -func (a *APNSMessageHandler) HandleMessages(msgChan *chan []byte) { - a.run = true - - for a.run == true { - select { - case message := <-*msgChan: - a.sendMessage(message) - } - } +func (a *APNSMessageHandler) HandleMessages(message interfaces.KafkaMessage) { + a.sendMessage(message) } func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { @@ -278,22 +275,26 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { apnsResMutex.Lock() a.responsesReceived++ apnsResMutex.Unlock() + parsedTopic := ParsedTopic{} var err error responseWithMetadata := &ResponseWithMetadata{ Response: res, } a.inflightMessagesMetadataLock.Lock() - if val, ok := a.InflightMessagesMetadata[res.DeviceToken]; ok { + if val, ok := a.InflightMessagesMetadata[res.ID]; ok { responseWithMetadata.Metadata = val.(map[string]interface{}) responseWithMetadata.Timestamp = responseWithMetadata.Metadata["timestamp"].(int64) + parsedTopic.Game = responseWithMetadata.Metadata["game"].(string) + parsedTopic.Platform = responseWithMetadata.Metadata["platform"].(string) delete(responseWithMetadata.Metadata, "timestamp") - delete(a.InflightMessagesMetadata, res.DeviceToken) + delete(a.InflightMessagesMetadata, res.ID) } a.inflightMessagesMetadataLock.Unlock() if err != nil { l.WithError(err).Error("error sending feedback to reporter") } + if res.Err != nil { apnsResMutex.Lock() a.failuresReceived++ @@ -308,7 +309,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { } reason := pushError.Reason pErr := errors.NewPushError(a.mapErrorReason(reason), pushError.Error()) - statsReporterHandleNotificationFailure(a.StatsReporters, pErr) + statsReporterHandleNotificationFailure(a.StatsReporters, parsedTopic.Game, "apns", pErr) err = pErr switch reason { @@ -320,7 +321,10 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { if responseWithMetadata.Metadata != nil { responseWithMetadata.Metadata["deleteToken"] = true } - handleInvalidToken(a.InvalidTokenHandlers, res.DeviceToken) + handleInvalidToken( + a.InvalidTokenHandlers, res.DeviceToken, + parsedTopic.Game, parsedTopic.Platform, + ) case push.ErrBadCertificate, push.ErrBadCertificateEnvironment, push.ErrForbidden: l.WithFields(log.Fields{ "category": "CertificateError", @@ -343,13 +347,13 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { }).Debug("received an error") } responseWithMetadata.Err = pErr - sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata) + sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic) if sendFeedbackErr != nil { l.WithError(sendFeedbackErr).Error("error sending feedback to reporter") } return err } - sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata) + sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic) if sendFeedbackErr != nil { l.WithError(sendFeedbackErr).Error("error sending feedback to reporter") @@ -357,7 +361,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error { apnsResMutex.Lock() a.successesReceived++ apnsResMutex.Unlock() - statsReporterHandleNotificationSuccess(a.StatsReporters) + statsReporterHandleNotificationSuccess(a.StatsReporters, parsedTopic.Game, "apns") return nil } diff --git a/extensions/apns_message_handler_test.go b/extensions/apns_message_handler_test.go index 85b0843..3ff7f77 100644 --- a/extensions/apns_message_handler_test.go +++ b/extensions/apns_message_handler_test.go @@ -357,30 +357,36 @@ var _ = Describe("APNS Message Handler", func() { Describe("Send message", func() { It("should add message to push queue and increment sentMessages", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Expect(handler.sentMessages).To(Equal(int64(1))) }) }) - Describe("Handle Messages", func() { - It("should start without panicking and set run to true", func() { - stopChannel := make(chan struct{}) - queue, err := NewKafkaConsumer( - handler.Config, - logger, - &stopChannel, - mockKafkaConsumerClient, - ) - Expect(err).NotTo(HaveOccurred()) - Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic()) - time.Sleep(50 * time.Millisecond) - Expect(handler.run).To(BeTrue()) - }) - }) + // Describe("Handle Messages", func() { + // It("should start without panicking and set run to true", func() { + // stopChannel := make(chan struct{}) + // queue, err := NewKafkaConsumer( + // handler.Config, + // logger, + // &stopChannel, + // mockKafkaConsumerClient, + // ) + // Expect(err).NotTo(HaveOccurred()) + // Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic()) + // time.Sleep(50 * time.Millisecond) + // Expect(handler.run).To(BeTrue()) + // }) + // }) Describe("Clean Cache", func() { It("should remove from push queue after timeout", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Expect(func() { go handler.CleanMetadataCache() }).ShouldNot(Panic()) time.Sleep(500 * time.Millisecond) Expect(*handler.requestsHeap).To(BeEmpty()) @@ -388,7 +394,10 @@ var _ = Describe("APNS Message Handler", func() { }) It("should not panic if a request got a response", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Expect(func() { go handler.CleanMetadataCache() }).ShouldNot(Panic()) res := push.Response{ DeviceToken: uuid.NewV4().String(), @@ -405,7 +414,10 @@ var _ = Describe("APNS Message Handler", func() { var n int = 10 sendRequests := func() { for i := 0; i < n; i++ { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) } } @@ -450,11 +462,15 @@ var _ = Describe("APNS Message Handler", func() { It("should call HandleNotificationSent upon message sent to queue", func() { Expect(handler).NotTo(BeNil()) Expect(handler.StatsReporters).To(Equal(statsClients)) + kafkaMessage := interfaces.KafkaMessage{ + Game: "game", + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + } + handler.sendMessage(kafkaMessage) + handler.sendMessage(kafkaMessage) - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) - - Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) + Expect(mockStatsDClient.Count["apns.game.sent"]).To(Equal(2)) }) It("should call HandleNotificationSuccess upon message response received", func() { @@ -468,7 +484,7 @@ var _ = Describe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) + Expect(mockStatsDClient.Count["apns..ack"]).To(Equal(2)) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -486,8 +502,8 @@ var _ = Describe("APNS Message Handler", func() { handler.handleAPNSResponse(res) handler.handleAPNSResponse(res) - Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) - Expect(mockStatsDClient.Count["missing-device-token"]).To(Equal(2)) + Expect(mockStatsDClient.Count["apns..failed"]).To(Equal(2)) + Expect(mockStatsDClient.Count["apns..missing-device-token"]).To(Equal(2)) }) }) @@ -528,12 +544,15 @@ var _ = Describe("APNS Message Handler", func() { "timestamp": timestampNow, "hostname": hostname, "msgid": msgID, + "game": "game", + "platform": "apns", } - handler.InflightMessagesMetadata["testToken1"] = metadata + handler.InflightMessagesMetadata["idTest1"] = metadata res := push.Response{ DeviceToken: "testToken1", ID: "idTest1", } + go handler.handleAPNSResponse(res) fromKafka := &ResponseWithMetadata{} @@ -548,8 +567,10 @@ var _ = Describe("APNS Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "apns", } - handler.InflightMessagesMetadata["testToken1"] = metadata + handler.InflightMessagesMetadata["idTest1"] = metadata res := push.Response{ DeviceToken: "testToken1", ID: "idTest1", @@ -583,8 +604,10 @@ var _ = Describe("APNS Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "apns", } - handler.InflightMessagesMetadata["testToken1"] = metadata + handler.InflightMessagesMetadata["idTest1"] = metadata res := push.Response{ DeviceToken: "testToken1", ID: "idTest1", @@ -608,8 +631,10 @@ var _ = Describe("APNS Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "apns", } - handler.InflightMessagesMetadata["testToken1"] = metadata + handler.InflightMessagesMetadata["idTest1"] = metadata res := push.Response{ DeviceToken: "testToken1", ID: "idTest1", @@ -677,7 +702,10 @@ var _ = Describe("APNS Message Handler", func() { Describe("Send message", func() { It("should add message to push queue and increment sentMessages", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Eventually(handler.PushQueue.(*push.Queue).Responses, 5*time.Second).Should(Receive()) Expect(handler.sentMessages).To(Equal(int64(1))) }) @@ -685,13 +713,19 @@ var _ = Describe("APNS Message Handler", func() { Describe("PushExpiry", func() { It("should not send message if PushExpiry is in the past", func() { - handler.sendMessage([]byte(fmt.Sprintf(`{ "aps" : { "alert" : "Hello HTTP/2" }, "push_expiry": %d }`, time.Now().Unix()-int64(100)))) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(fmt.Sprintf(`{ "aps" : { "alert" : "Hello HTTP/2" }, "push_expiry": %d }`, time.Now().Unix()-int64(100))), + }) Eventually(handler.PushQueue.(*push.Queue).Responses, 5*time.Second).Should(Receive()) Expect(handler.sentMessages).To(Equal(int64(0))) Expect(handler.ignoredMessages).To(Equal(int64(1))) }) It("should send message if PushExpiry is in the future", func() { - handler.sendMessage([]byte(fmt.Sprintf(`{ "aps" : { "alert" : "Hello HTTP/2" }, "push_expiry": %d}`, time.Now().Unix()+int64(100)))) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_apns", + Value: []byte(fmt.Sprintf(`{ "aps" : { "alert" : "Hello HTTP/2" }, "push_expiry": %d}`, time.Now().Unix()+int64(100))), + }) Eventually(handler.PushQueue.(*push.Queue).Responses, 5*time.Second).Should(Receive()) Expect(handler.sentMessages).To(Equal(int64(1))) }) diff --git a/extensions/common.go b/extensions/common.go index 15b4fc7..dd657f8 100644 --- a/extensions/common.go +++ b/extensions/common.go @@ -24,44 +24,61 @@ package extensions import ( "encoding/json" + "regexp" "github.com/topfreegames/pusher/errors" "github.com/topfreegames/pusher/interfaces" ) -func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, token string) { +var topicRegex = regexp.MustCompile("push-([^-_]+)[-_]([^-_]+)") + +// ParsedTopic contains game and platform extracted from topic name +type ParsedTopic struct { + Platform string + Game string +} + +func handleInvalidToken(invalidTokenHandlers []interfaces.InvalidTokenHandler, token string, game string, platform string) { for _, invalidTokenHandler := range invalidTokenHandlers { - invalidTokenHandler.HandleToken(token) + invalidTokenHandler.HandleToken(token, game, platform) + } +} + +func getGameAndPlatformFromTopic(topic string) ParsedTopic { + res := topicRegex.FindStringSubmatch(topic) + return ParsedTopic{ + Platform: res[2], + Game: res[1], } } -func sendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}) error { +func sendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}, topic ParsedTopic) error { jres, err := json.Marshal(res) if err != nil { return err } if feedbackReporters != nil { for _, feedbackReporter := range feedbackReporters { - feedbackReporter.SendFeedback(jres) + feedbackReporter.SendFeedback(topic.Game, topic.Platform, jres) } } return nil } -func statsReporterHandleNotificationSent(statsReporters []interfaces.StatsReporter) { +func statsReporterHandleNotificationSent(statsReporters []interfaces.StatsReporter, game string, platform string) { for _, statsReporter := range statsReporters { - statsReporter.HandleNotificationSent() + statsReporter.HandleNotificationSent(game, platform) } } -func statsReporterHandleNotificationSuccess(statsReporters []interfaces.StatsReporter) { +func statsReporterHandleNotificationSuccess(statsReporters []interfaces.StatsReporter, game string, platform string) { for _, statsReporter := range statsReporters { - statsReporter.HandleNotificationSuccess() + statsReporter.HandleNotificationSuccess(game, platform) } } -func statsReporterHandleNotificationFailure(statsReporters []interfaces.StatsReporter, err *errors.PushError) { +func statsReporterHandleNotificationFailure(statsReporters []interfaces.StatsReporter, game string, platform string, err *errors.PushError) { for _, statsReporter := range statsReporters { - statsReporter.HandleNotificationFailure(err) + statsReporter.HandleNotificationFailure(game, platform, err) } } diff --git a/extensions/common_test.go b/extensions/common_test.go index a640340..f57a9df 100644 --- a/extensions/common_test.go +++ b/extensions/common_test.go @@ -69,7 +69,7 @@ var _ = Describe("Common", func() { Describe("Handle token error", func() { It("should be successful", func() { token := uuid.NewV4().String() - handleInvalidToken(invalidTokenHandlers, token) + handleInvalidToken(invalidTokenHandlers, token, "test", "apns") query := "DELETE FROM test_apns WHERE token = ?0;" Expect(db.Execs).To(HaveLen(2)) Expect(db.Execs[1][0]).To(BeEquivalentTo(query)) @@ -79,7 +79,7 @@ var _ = Describe("Common", func() { It("should fail silently", func() { token := uuid.NewV4().String() db.Error = fmt.Errorf("pg: error") - handleInvalidToken(invalidTokenHandlers, token) + handleInvalidToken(invalidTokenHandlers, token, "test", "apns") Expect(db.Execs).To(HaveLen(2)) query := "DELETE FROM test_apns WHERE token = ?0;" Expect(db.Execs[1][0]).To(BeEquivalentTo(query)) @@ -90,7 +90,7 @@ var _ = Describe("Common", func() { Describe("Send feedback to reporters", func() { It("should return an error if res cannot be marshaled", func() { badContent := make(chan int) - err := sendToFeedbackReporters(feedbackClients, badContent) + err := sendToFeedbackReporters(feedbackClients, badContent, ParsedTopic{}) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("json: unsupported type: chan int")) }) diff --git a/extensions/gcm_message_handler.go b/extensions/gcm_message_handler.go index 8a08ba4..e396362 100644 --- a/extensions/gcm_message_handler.go +++ b/extensions/gcm_message_handler.go @@ -209,10 +209,13 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error { ccsMessageWithMetadata := &CCSMessageWithMetadata{ CCSMessage: cm, } + parsedTopic := ParsedTopic{} g.inflightMessagesMetadataLock.Lock() if val, ok := g.InflightMessagesMetadata[cm.MessageID]; ok { ccsMessageWithMetadata.Metadata = val.(map[string]interface{}) ccsMessageWithMetadata.Timestamp = ccsMessageWithMetadata.Metadata["timestamp"].(int64) + parsedTopic.Game = ccsMessageWithMetadata.Metadata["game"].(string) + parsedTopic.Platform = ccsMessageWithMetadata.Metadata["platform"].(string) delete(ccsMessageWithMetadata.Metadata, "timestamp") delete(g.InflightMessagesMetadata, cm.MessageID) } @@ -223,7 +226,7 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error { g.failuresReceived++ gcmResMutex.Unlock() pErr := errors.NewPushError(strings.ToLower(cm.Error), cm.ErrorDescription) - statsReporterHandleNotificationFailure(g.StatsReporters, pErr) + statsReporterHandleNotificationFailure(g.StatsReporters, parsedTopic.Game, "gcm", pErr) err = pErr switch cm.Error { @@ -236,7 +239,10 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error { if ccsMessageWithMetadata.Metadata != nil { ccsMessageWithMetadata.Metadata["deleteToken"] = true } - handleInvalidToken(g.InvalidTokenHandlers, cm.From) + handleInvalidToken( + g.InvalidTokenHandlers, cm.From, + parsedTopic.Game, parsedTopic.Platform, + ) case "INVALID_JSON": l.WithFields(log.Fields{ "category": "JsonError", @@ -258,14 +264,14 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error { log.ErrorKey: cm.Error, }).Debug("received an error") } - sendFeedbackErr := sendToFeedbackReporters(g.feedbackReporters, ccsMessageWithMetadata) + sendFeedbackErr := sendToFeedbackReporters(g.feedbackReporters, ccsMessageWithMetadata, parsedTopic) if sendFeedbackErr != nil { l.WithError(sendFeedbackErr).Error("error sending feedback to reporter") } return err } - sendFeedbackErr := sendToFeedbackReporters(g.feedbackReporters, ccsMessageWithMetadata) + sendFeedbackErr := sendToFeedbackReporters(g.feedbackReporters, ccsMessageWithMetadata, parsedTopic) if sendFeedbackErr != nil { l.WithError(sendFeedbackErr).Error("error sending feedback to reporter") } @@ -273,17 +279,17 @@ func (g *GCMMessageHandler) handleGCMResponse(cm gcm.CCSMessage) error { gcmResMutex.Lock() g.successesReceived++ gcmResMutex.Unlock() - statsReporterHandleNotificationSuccess(g.StatsReporters) + statsReporterHandleNotificationSuccess(g.StatsReporters, parsedTopic.Game, "gcm") return nil } -func (g *GCMMessageHandler) sendMessage(message []byte) error { +func (g *GCMMessageHandler) sendMessage(message interfaces.KafkaMessage) error { g.pendingMessages <- true l := g.Logger.WithField("method", "sendMessage") //ttl := uint(0) km := KafkaGCMMessage{} - err := json.Unmarshal(message, &km) + err := json.Unmarshal(message.Value, &km) if err != nil { <-g.pendingMessages l.WithError(err).Error("Error unmarshaling message.") @@ -313,6 +319,7 @@ func (g *GCMMessageHandler) sendMessage(message []byte) error { if km.Metadata == nil { km.Metadata = map[string]interface{}{} } + g.inflightMessagesMetadataLock.Lock() km.Metadata["timestamp"] = time.Now().Unix() @@ -322,14 +329,18 @@ func (g *GCMMessageHandler) sendMessage(message []byte) error { } else { km.Metadata["hostname"] = hostname } + km.Metadata["msgid"] = uuid.NewV4().String() + km.Metadata["game"] = message.Game + km.Metadata["platform"] = "gcm" + g.InflightMessagesMetadata[messageID] = km.Metadata g.requestsHeap.AddRequest(messageID) g.inflightMessagesMetadataLock.Unlock() } - statsReporterHandleNotificationSent(g.StatsReporters) + statsReporterHandleNotificationSent(g.StatsReporters, message.Game, "gcm") g.sentMessages++ l.WithFields(log.Fields{ "messageID": messageID, @@ -360,15 +371,8 @@ func (g *GCMMessageHandler) CleanMetadataCache() { } // HandleMessages get messages from msgChan and send to GCM -func (g *GCMMessageHandler) HandleMessages(msgChan *chan []byte) { - g.run = true - - for g.run == true { - select { - case message := <-*msgChan: - g.sendMessage(message) - } - } +func (g *GCMMessageHandler) HandleMessages(msg interfaces.KafkaMessage) { + g.sendMessage(msg) } // LogStats from time to time diff --git a/extensions/gcm_message_handler_test.go b/extensions/gcm_message_handler_test.go index ee82775..d9b38ac 100644 --- a/extensions/gcm_message_handler_test.go +++ b/extensions/gcm_message_handler_test.go @@ -209,6 +209,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } msg := &KafkaGCMMessage{ gcm.XMPPMessage{ @@ -224,7 +226,10 @@ var _ = Describe("GCM Message Handler", func() { msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Expect(err).NotTo(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(1))) Expect(handler.ignoredMessages).To(Equal(int64(0))) @@ -235,6 +240,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } msg := &KafkaGCMMessage{ gcm.XMPPMessage{ @@ -250,7 +257,10 @@ var _ = Describe("GCM Message Handler", func() { msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Expect(err).NotTo(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(0))) Expect(handler.ignoredMessages).To(Equal(int64(1))) @@ -261,7 +271,10 @@ var _ = Describe("GCM Message Handler", func() { Describe("Send message", func() { It("should send xmpp message and not increment sentMessages if an error occurs", func() { - err := handler.sendMessage([]byte("gogogo")) + err := handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: []byte("gogogo"), + }) Expect(err).To(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(0))) Expect(hook.Entries).To(ContainLogMessage("Error unmarshaling message.")) @@ -281,7 +294,10 @@ var _ = Describe("GCM Message Handler", func() { msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Expect(err).NotTo(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(1))) Expect(hook.LastEntry().Message).To(Equal("sent message")) @@ -294,6 +310,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } msg := &KafkaGCMMessage{ gcm.XMPPMessage{ @@ -309,7 +327,10 @@ var _ = Describe("GCM Message Handler", func() { msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Expect(err).NotTo(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(1))) Expect(hook.LastEntry().Message).To(Equal("sent message")) @@ -330,13 +351,19 @@ var _ = Describe("GCM Message Handler", func() { Expect(err).NotTo(HaveOccurred()) for i := 1; i <= 3; i++ { - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Expect(err).NotTo(HaveOccurred()) Expect(handler.sentMessages).To(Equal(int64(i))) Expect(len(handler.pendingMessages)).To(Equal(i)) } - go handler.sendMessage(msgBytes) + go handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: msgBytes, + }) Consistently(handler.sentMessages).Should(Equal(int64(3))) Consistently(len(handler.pendingMessages)).Should(Equal(3)) @@ -346,23 +373,26 @@ var _ = Describe("GCM Message Handler", func() { }) }) - Describe("Handle Messages", func() { - It("should start without panicking and set run to true", func() { - stopChannel := make(chan struct{}) - queue, err := NewKafkaConsumer( - handler.Config, logger, - &stopChannel, mockKafkaConsumerClient, - ) - Expect(err).NotTo(HaveOccurred()) - Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic()) - time.Sleep(time.Millisecond) - Expect(handler.run).To(BeTrue()) - }) - }) + // Describe("Handle Messages", func() { + // It("should start without panicking and set run to true", func() { + // stopChannel := make(chan struct{}) + // queue, err := NewKafkaConsumer( + // handler.Config, logger, + // &stopChannel, mockKafkaConsumerClient, + // ) + // Expect(err).NotTo(HaveOccurred()) + // Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic()) + // time.Sleep(time.Millisecond) + // Expect(handler.run).To(BeTrue()) + // }) + // }) Describe("Clean Cache", func() { It("should remove from push queue after timeout", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Expect(func() { go handler.CleanMetadataCache() }).ShouldNot(Panic()) time.Sleep(500 * time.Millisecond) Expect(*handler.requestsHeap).To(BeEmpty()) @@ -370,7 +400,10 @@ var _ = Describe("GCM Message Handler", func() { }) It("should not panic if a request got a response", func() { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) Expect(func() { go handler.CleanMetadataCache() }).ShouldNot(Panic()) res := gcm.CCSMessage{ From: "testToken1", @@ -389,7 +422,10 @@ var _ = Describe("GCM Message Handler", func() { var n int = 10 sendRequests := func() { for i := 0; i < n; i++ { - handler.sendMessage([]byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`)) + handler.sendMessage(interfaces.KafkaMessage{ + Topic: "push-game_gcm", + Value: []byte(`{ "aps" : { "alert" : "Hello HTTP/2" } }`), + }) } } @@ -446,22 +482,24 @@ var _ = Describe("GCM Message Handler", func() { } msgBytes, err := json.Marshal(msg) Expect(err).NotTo(HaveOccurred()) - - err = handler.sendMessage(msgBytes) + kafkaMessage := interfaces.KafkaMessage{ + Game: "game", + Topic: "push-game_gcm", + Value: msgBytes, + } + err = handler.sendMessage(kafkaMessage) Expect(err).NotTo(HaveOccurred()) - err = handler.sendMessage(msgBytes) + err = handler.sendMessage(kafkaMessage) Expect(err).NotTo(HaveOccurred()) - - Expect(mockStatsDClient.Count["sent"]).To(Equal(2)) + Expect(mockStatsDClient.Count["gcm.game.sent"]).To(Equal(2)) }) It("should call HandleNotificationSuccess upon message response received", func() { res := gcm.CCSMessage{} handler.handleGCMResponse(res) handler.handleGCMResponse(res) - - Expect(mockStatsDClient.Count["ack"]).To(Equal(2)) + Expect(mockStatsDClient.Count["gcm..ack"]).To(Equal(2)) }) It("should call HandleNotificationFailure upon message response received", func() { @@ -471,8 +509,8 @@ var _ = Describe("GCM Message Handler", func() { handler.handleGCMResponse(res) handler.handleGCMResponse(res) - Expect(mockStatsDClient.Count["failed"]).To(Equal(2)) - Expect(mockStatsDClient.Count["device_unregistered"]).To(Equal(2)) + Expect(mockStatsDClient.Count["gcm..failed"]).To(Equal(2)) + Expect(mockStatsDClient.Count["gcm..device_unregistered"]).To(Equal(2)) }) }) @@ -516,6 +554,8 @@ var _ = Describe("GCM Message Handler", func() { "timestamp": timestampNow, "hostname": hostname, "msgid": msgID, + "game": "game", + "platform": "gcm", } handler.InflightMessagesMetadata["idTest1"] = metadata res := gcm.CCSMessage{ @@ -538,6 +578,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } handler.InflightMessagesMetadata["idTest1"] = metadata res := gcm.CCSMessage{ @@ -581,6 +623,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } handler.InflightMessagesMetadata["idTest1"] = metadata res := gcm.CCSMessage{ @@ -608,6 +652,8 @@ var _ = Describe("GCM Message Handler", func() { metadata := map[string]interface{}{ "some": "metadata", "timestamp": time.Now().Unix(), + "game": "game", + "platform": "gcm", } handler.InflightMessagesMetadata["idTest1"] = metadata res := gcm.CCSMessage{ diff --git a/extensions/kafka_consumer.go b/extensions/kafka_consumer.go index 5c3301c..b74affa 100644 --- a/extensions/kafka_consumer.go +++ b/extensions/kafka_consumer.go @@ -43,7 +43,7 @@ type KafkaConsumer struct { ChannelSize int Logger *logrus.Logger messagesReceived int64 - msgChan chan []byte + msgChan chan interfaces.KafkaMessage OffsetResetStrategy string run bool SessionTimeout int @@ -97,7 +97,7 @@ func (q *KafkaConsumer) configure(client interfaces.KafkaConsumerClient) error { q.ChannelSize = q.Config.GetInt("queue.channelSize") q.HandleAllMessagesBeforeExiting = q.Config.GetBool("queue.handleAllMessagesBeforeExiting") - q.msgChan = make(chan []byte, q.ChannelSize) + q.msgChan = make(chan interfaces.KafkaMessage, q.ChannelSize) if q.HandleAllMessagesBeforeExiting { var wg sync.WaitGroup @@ -164,7 +164,7 @@ func (q *KafkaConsumer) StopConsuming() { } // MessagesChannel returns the channel that will receive all messages got from kafka -func (q *KafkaConsumer) MessagesChannel() *chan []byte { +func (q *KafkaConsumer) MessagesChannel() *chan interfaces.KafkaMessage { return &q.msgChan } @@ -264,7 +264,14 @@ func (q *KafkaConsumer) receiveMessage(topicPartition kafka.TopicPartition, valu if q.pendingMessagesWG != nil { q.pendingMessagesWG.Add(1) } - q.msgChan <- value + + message := interfaces.KafkaMessage{ + Game: getGameAndPlatformFromTopic(*topicPartition.Topic).Game, + Topic: *topicPartition.Topic, + Value: value, + } + + q.msgChan <- message l.Debug("Received message processed.") } diff --git a/extensions/kafka_consumer_test.go b/extensions/kafka_consumer_test.go index 5b174da..3fd0480 100644 --- a/extensions/kafka_consumer_test.go +++ b/extensions/kafka_consumer_test.go @@ -32,6 +32,7 @@ import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" + "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/mocks" . "github.com/topfreegames/pusher/testing" "github.com/topfreegames/pusher/util" @@ -186,7 +187,7 @@ var _ = Describe("Kafka Extension", func() { }) It("should receive message", func() { - topic := consumer.Config.GetStringSlice("queue.topics")[0] + topic := "push-games_apns-single" startConsuming() defer consumer.StopConsuming() part := kafka.TopicPartition{ @@ -198,7 +199,10 @@ var _ = Describe("Kafka Extension", func() { consumer.messagesReceived = 999 publishEvent(event) - Eventually(consumer.msgChan, 5).Should(Receive(&val)) + Eventually(consumer.msgChan, 5).Should(Receive(&interfaces.KafkaMessage{ + Topic: topic, + Value: val, + })) Expect(consumer.messagesReceived).To(BeEquivalentTo(1000)) }) diff --git a/extensions/kafka_producer.go b/extensions/kafka_producer.go index 0e0673d..ff57e32 100644 --- a/extensions/kafka_producer.go +++ b/extensions/kafka_producer.go @@ -23,9 +23,9 @@ package extensions import ( - log "github.com/sirupsen/logrus" "github.com/confluentinc/confluent-kafka-go/kafka" raven "github.com/getsentry/raven-go" + log "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/util" @@ -116,10 +116,11 @@ func (q *KafkaProducer) listenForKafkaResponses() { } // SendFeedback sends the feedback to the kafka Queue -func (q *KafkaProducer) SendFeedback(feedback []byte) { +func (q *KafkaProducer) SendFeedback(game string, platform string, feedback []byte) { + topic := "push-" + game + "-" + platform + "-feedbacks" m := &kafka.Message{ TopicPartition: kafka.TopicPartition{ - Topic: &q.Topic, + Topic: &topic, Partition: kafka.PartitionAny, }, Value: feedback, diff --git a/extensions/kafka_producer_test.go b/extensions/kafka_producer_test.go index 31a98e6..88219f1 100644 --- a/extensions/kafka_producer_test.go +++ b/extensions/kafka_producer_test.go @@ -23,11 +23,11 @@ package extensions import ( - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" "github.com/confluentinc/confluent-kafka-go/kafka" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" @@ -53,7 +53,7 @@ var _ = Describe("KafkaProducer Extension", func() { It("should send message", func() { KafkaProducer, err := NewKafkaProducer(config, logger, mockProducer) Expect(err).NotTo(HaveOccurred()) - KafkaProducer.SendFeedback([]byte("test message")) + KafkaProducer.SendFeedback("testgame", "apns", []byte("test message")) Eventually(func() int { return KafkaProducer.Producer.(*mocks.KafkaProducerClientMock).SentMessages }).Should(Equal(1)) diff --git a/extensions/pg_test.go b/extensions/pg_test.go index db842e9..9f2f807 100644 --- a/extensions/pg_test.go +++ b/extensions/pg_test.go @@ -144,7 +144,7 @@ var _ = Describe("PG Extension", func() { }) It("should return error if error when closing connection", func() { - pErr := fmt.Errorf("Failed to close connection.") + pErr := fmt.Errorf("failed to close connection") mockDb = mocks.NewPGMock(0, 1) client, err := NewPGClient("push.db", config, mockDb) Expect(err).NotTo(HaveOccurred()) @@ -153,7 +153,7 @@ var _ = Describe("PG Extension", func() { err = client.Cleanup() Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("Failed to close connection.")) + Expect(err.Error()).To(ContainSubstring("failed to close connection")) }) }) }) diff --git a/extensions/statsd.go b/extensions/statsd.go index 8906b19..2843afe 100644 --- a/extensions/statsd.go +++ b/extensions/statsd.go @@ -25,8 +25,8 @@ package extensions import ( "time" - "github.com/sirupsen/logrus" "github.com/alexcesaro/statsd" + "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/errors" "github.com/topfreegames/pusher/interfaces" @@ -89,19 +89,19 @@ func (s *StatsD) configure(client interfaces.StatsDClient) error { } //HandleNotificationSent stores notification count in StatsD -func (s *StatsD) HandleNotificationSent() { - s.Client.Increment("sent") +func (s *StatsD) HandleNotificationSent(game string, platform string) { + s.Client.Increment(platform + "." + game + "." + "sent") } //HandleNotificationSuccess stores notifications success in StatsD -func (s *StatsD) HandleNotificationSuccess() { - s.Client.Increment("ack") +func (s *StatsD) HandleNotificationSuccess(game string, platform string) { + s.Client.Increment(platform + "." + game + "." + "ack") } //HandleNotificationFailure stores each type of failure -func (s *StatsD) HandleNotificationFailure(err *errors.PushError) { - s.Client.Increment("failed") - s.Client.Increment(err.Key) +func (s *StatsD) HandleNotificationFailure(game string, platform string, err *errors.PushError) { + s.Client.Increment(platform + "." + game + "." + "failed") + s.Client.Increment(platform + "." + game + "." + err.Key) } //ReportGoStats reports go stats in statsd diff --git a/extensions/statsd_test.go b/extensions/statsd_test.go index 3c1901f..084e1fb 100644 --- a/extensions/statsd_test.go +++ b/extensions/statsd_test.go @@ -23,9 +23,9 @@ package extensions import ( - "github.com/sirupsen/logrus/hooks/test" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" "github.com/topfreegames/pusher/errors" "github.com/topfreegames/pusher/mocks" @@ -51,10 +51,9 @@ var _ = Describe("StatsD Extension", func() { Expect(err).NotTo(HaveOccurred()) defer statsd.Cleanup() - statsd.HandleNotificationSent() - statsd.HandleNotificationSent() - - Expect(mockClient.Count["sent"]).To(Equal(2)) + statsd.HandleNotificationSent("game", "apns") + statsd.HandleNotificationSent("game", "apns") + Expect(mockClient.Count["apns.game.sent"]).To(Equal(2)) }) }) @@ -64,10 +63,9 @@ var _ = Describe("StatsD Extension", func() { Expect(err).NotTo(HaveOccurred()) defer statsd.Cleanup() - statsd.HandleNotificationSuccess() - statsd.HandleNotificationSuccess() - - Expect(mockClient.Count["ack"]).To(Equal(2)) + statsd.HandleNotificationSuccess("game", "apns") + statsd.HandleNotificationSuccess("game", "apns") + Expect(mockClient.Count["apns.game.ack"]).To(Equal(2)) }) }) @@ -96,11 +94,11 @@ var _ = Describe("StatsD Extension", func() { pErr := errors.NewPushError("some-key", "some description") - statsd.HandleNotificationFailure(pErr) - statsd.HandleNotificationFailure(pErr) + statsd.HandleNotificationFailure("game", "apns", pErr) + statsd.HandleNotificationFailure("game", "apns", pErr) - Expect(mockClient.Count["failed"]).To(Equal(2)) - Expect(mockClient.Count["some-key"]).To(Equal(2)) + Expect(mockClient.Count["apns.game.failed"]).To(Equal(2)) + Expect(mockClient.Count["apns.game.some-key"]).To(Equal(2)) }) }) }) diff --git a/extensions/token_pg.go b/extensions/token_pg.go index 5a9083a..0618095 100644 --- a/extensions/token_pg.go +++ b/extensions/token_pg.go @@ -25,8 +25,8 @@ package extensions import ( "fmt" - "github.com/sirupsen/logrus" raven "github.com/getsentry/raven-go" + "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/topfreegames/pusher/interfaces" "github.com/topfreegames/pusher/util" @@ -34,10 +34,9 @@ import ( // TokenPG for sending metrics type TokenPG struct { - Client *PGClient - Config *viper.Viper - Logger *logrus.Logger - tableName string + Client *PGClient + Config *viper.Viper + Logger *logrus.Logger } // NewTokenPG for creating a new TokenPG instance @@ -59,7 +58,6 @@ func (t *TokenPG) loadConfigurationDefaults() {} func (t *TokenPG) configure(db interfaces.DB) error { l := t.Logger.WithField("method", "configure") t.loadConfigurationDefaults() - t.tableName = t.Config.GetString("invalidToken.pg.table") var err error t.Client, err = NewPGClient("invalidToken.pg", t.Config, db) @@ -73,13 +71,13 @@ func (t *TokenPG) configure(db interfaces.DB) error { } // HandleToken handles an invalid token -func (t *TokenPG) HandleToken(token string) error { +func (t *TokenPG) HandleToken(token string, game string, platform string) error { l := t.Logger.WithFields(logrus.Fields{ "method": "HandleToken", "token": token, }) l.Debug("deleting token") - query := fmt.Sprintf("DELETE FROM %s WHERE token = ?0;", t.tableName) + query := fmt.Sprintf("DELETE FROM %s WHERE token = ?0;", game+"_"+platform) _, err := t.Client.DB.Exec(query, token) if err != nil && err.Error() != "pg: no rows in result set" { raven.CaptureError(err, map[string]string{ diff --git a/extensions/token_pg_test.go b/extensions/token_pg_test.go index 3f376b8..56223e7 100644 --- a/extensions/token_pg_test.go +++ b/extensions/token_pg_test.go @@ -25,10 +25,10 @@ package extensions import ( "fmt" - "github.com/sirupsen/logrus/hooks/test" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" uuid "github.com/satori/go.uuid" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" @@ -62,7 +62,6 @@ var _ = Describe("TokenPG Extension", func() { Expect(t.Config).NotTo(BeNil()) Expect(t.Logger).NotTo(BeNil()) - Expect(t.tableName).To(Equal("test_apns")) }) It("should return an error if failed to connect to postgres", func() { @@ -76,7 +75,7 @@ var _ = Describe("TokenPG Extension", func() { Describe("Handle invalid token", func() { It("should delete apns token", func() { - err := tokenPG.HandleToken(token) + err := tokenPG.HandleToken(token, "test", "apns") Expect(err).NotTo(HaveOccurred()) query := "DELETE FROM test_apns WHERE token = ?0;" @@ -87,7 +86,7 @@ var _ = Describe("TokenPG Extension", func() { It("should not break if token does not exist in db", func() { mockClient.Error = fmt.Errorf("pg: no rows in result set") - err := tokenPG.HandleToken(token) + err := tokenPG.HandleToken(token, "test", "apns") Expect(err).NotTo(HaveOccurred()) query := "DELETE FROM test_apns WHERE token = ?0;" @@ -98,7 +97,7 @@ var _ = Describe("TokenPG Extension", func() { It("should return an error if pg error occurred", func() { mockClient.Error = fmt.Errorf("pg: error") - err := tokenPG.HandleToken(token) + err := tokenPG.HandleToken(token, "test", "apns") Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("pg: error")) }) diff --git a/interfaces/feedback_reporter.go b/interfaces/feedback_reporter.go index 35ad95e..3ec9c2c 100644 --- a/interfaces/feedback_reporter.go +++ b/interfaces/feedback_reporter.go @@ -24,5 +24,5 @@ package interfaces // FeedbackReporter interface for making new feedback reporters pluggable easily type FeedbackReporter interface { - SendFeedback(feedback []byte) + SendFeedback(game string, platform string, feedback []byte) } diff --git a/interfaces/invalid_token_handler.go b/interfaces/invalid_token_handler.go index 372d39e..43e75c7 100644 --- a/interfaces/invalid_token_handler.go +++ b/interfaces/invalid_token_handler.go @@ -24,5 +24,5 @@ package interfaces // InvalidTokenHandler interface for defining functions that handle invalid tokens easily pluggable type InvalidTokenHandler interface { - HandleToken(token string) error + HandleToken(token string, game string, platform string) error } diff --git a/interfaces/message_handler.go b/interfaces/message_handler.go index d2459fd..e823a12 100644 --- a/interfaces/message_handler.go +++ b/interfaces/message_handler.go @@ -24,7 +24,7 @@ package interfaces // MessageHandler interface for making message handlers pluggable easily type MessageHandler interface { - HandleMessages(msgChan *chan []byte) + HandleMessages(msg KafkaMessage) HandleResponses() LogStats() } diff --git a/interfaces/queue.go b/interfaces/queue.go index a812c9e..6086c21 100644 --- a/interfaces/queue.go +++ b/interfaces/queue.go @@ -24,9 +24,16 @@ package interfaces import "sync" +// KafkaMessage sent through the Channel +type KafkaMessage struct { + Game string + Topic string + Value []byte +} + // Queue interface for making new queues pluggable easily type Queue interface { - MessagesChannel() *chan []byte + MessagesChannel() *chan KafkaMessage ConsumeLoop() error StopConsuming() PendingMessagesWaitGroup() *sync.WaitGroup diff --git a/interfaces/stats_reporter.go b/interfaces/stats_reporter.go index e934d5e..210f76a 100644 --- a/interfaces/stats_reporter.go +++ b/interfaces/stats_reporter.go @@ -26,8 +26,8 @@ import "github.com/topfreegames/pusher/errors" // StatsReporter interface for making stats reporters pluggable easily type StatsReporter interface { - HandleNotificationSent() - HandleNotificationSuccess() - HandleNotificationFailure(*errors.PushError) + HandleNotificationSent(game string, platform string) + HandleNotificationSuccess(game string, platform string) + HandleNotificationFailure(game string, platform string, err *errors.PushError) ReportGoStats(numGoRoutines int, allocatedAndNotFreed, heapObjects, nextGCBytes, pauseGCNano uint64) } diff --git a/pusher/apns.go b/pusher/apns.go index 62be6be..9270666 100644 --- a/pusher/apns.go +++ b/pusher/apns.go @@ -44,7 +44,7 @@ type APNSPusher struct { InvalidTokenHandlers []interfaces.InvalidTokenHandler IsProduction bool Logger *logrus.Logger - MessageHandler interfaces.MessageHandler + MessageHandler map[string]interfaces.MessageHandler Queue interfaces.Queue run bool StatsReporters []interfaces.StatsReporter @@ -53,7 +53,6 @@ type APNSPusher struct { // NewAPNSPusher for getting a new APNSPusher instance func NewAPNSPusher( - certificatePath string, isProduction bool, config *viper.Viper, logger *logrus.Logger, @@ -62,11 +61,10 @@ func NewAPNSPusher( queueOrNil ...interfaces.APNSPushQueue, ) (*APNSPusher, error) { a := &APNSPusher{ - CertificatePath: certificatePath, - Config: config, - IsProduction: isProduction, - Logger: logger, - stopChannel: make(chan struct{}), + Config: config, + IsProduction: isProduction, + Logger: logger, + stopChannel: make(chan struct{}), } var queue interfaces.APNSPushQueue if len(queueOrNil) > 0 { @@ -85,6 +83,9 @@ func (a *APNSPusher) loadConfigurationDefaults() { func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB, statsdClientOrNil interfaces.StatsDClient) error { var err error + l := a.Logger.WithFields(logrus.Fields{ + "method": "configure", + }) a.loadConfigurationDefaults() a.GracefulShutdownTimeout = a.Config.GetInt("gracefulShutdownTimeout") if err = a.configureStatsReporters(statsdClientOrNil); err != nil { @@ -104,22 +105,30 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB, if err != nil { return err } + a.MessageHandler = make(map[string]interfaces.MessageHandler) a.Queue = q - handler, err := extensions.NewAPNSMessageHandler( - a.CertificatePath, - a.IsProduction, - a.Config, - a.Logger, - a.Queue.PendingMessagesWaitGroup(), - a.StatsReporters, - a.feedbackReporters, - a.InvalidTokenHandlers, - queue, - ) - if err != nil { - return err + + for k, v := range a.Config.GetStringMap("apns.games") { + l.Infof( + "Configuring messageHandler for game %s with certificate: %s", + k, v, + ) + handler, err := extensions.NewAPNSMessageHandler( + v.(string), + a.IsProduction, + a.Config, + a.Logger, + a.Queue.PendingMessagesWaitGroup(), + a.StatsReporters, + a.feedbackReporters, + a.InvalidTokenHandlers, + queue, + ) + if err != nil { + return err + } + a.MessageHandler[k] = handler } - a.MessageHandler = handler return nil } @@ -150,6 +159,24 @@ func (a *APNSPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error return nil } +func (a *APNSPusher) routeMessages(msgChan *chan interfaces.KafkaMessage) { + a.run = true + + for a.run == true { + select { + case message := <-*msgChan: + if handler, ok := a.MessageHandler[message.Game]; ok { + handler.HandleMessages(message) + } else { + a.Logger.WithFields(logrus.Fields{ + "method": "routeMessages", + "game": message.Game, + }).Error("Game not found") + } + } + } +} + // Start starts pusher in apns mode func (a *APNSPusher) Start() { a.run = true @@ -158,14 +185,15 @@ func (a *APNSPusher) Start() { "certificatePath": a.CertificatePath, }) l.Info("starting pusher in apns mode...") - go a.MessageHandler.HandleMessages(a.Queue.MessagesChannel()) - go a.MessageHandler.HandleResponses() + go a.routeMessages(a.Queue.MessagesChannel()) + for _, v := range a.MessageHandler { + go v.HandleResponses() + go v.LogStats() + msgHandler, _ := v.(*extensions.APNSMessageHandler) + go msgHandler.CleanMetadataCache() + } go a.Queue.ConsumeLoop() go a.reportGoStats() - go a.MessageHandler.LogStats() - - msgHandler, _ := a.MessageHandler.(*extensions.APNSMessageHandler) - go msgHandler.CleanMetadataCache() sigchan := make(chan os.Signal) signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) diff --git a/pusher/apns_test.go b/pusher/apns_test.go index 577ab0d..2a21eea 100644 --- a/pusher/apns_test.go +++ b/pusher/apns_test.go @@ -25,18 +25,16 @@ package pusher import ( "time" - "github.com/sirupsen/logrus/hooks/test" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" - "github.com/topfreegames/pusher/extensions" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" ) var _ = Describe("APNS Pusher", func() { var config *viper.Viper - certificatePath := "../tls/self_signed_cert.pem" configFile := "../config/test.yaml" isProduction := false logger, hook := test.NewNullLogger() @@ -63,7 +61,6 @@ var _ = Describe("APNS Pusher", func() { Describe("Creating new apns pusher", func() { It("should return configured pusher", func() { pusher, err := NewAPNSPusher( - certificatePath, isProduction, config, logger, @@ -73,7 +70,6 @@ var _ = Describe("APNS Pusher", func() { ) Expect(err).NotTo(HaveOccurred()) Expect(pusher).NotTo(BeNil()) - Expect(pusher.CertificatePath).To(Equal(certificatePath)) Expect(pusher.IsProduction).To(Equal(isProduction)) Expect(pusher.run).To(BeFalse()) Expect(pusher.Queue).NotTo(BeNil()) @@ -81,14 +77,13 @@ var _ = Describe("APNS Pusher", func() { Expect(pusher.MessageHandler).NotTo(BeNil()) Expect(pusher.StatsReporters).To(HaveLen(1)) - Expect(pusher.MessageHandler.(*extensions.APNSMessageHandler).StatsReporters).To(HaveLen(1)) + Expect(pusher.MessageHandler).To(HaveLen(1)) }) }) Describe("Start apns pusher", func() { It("should launch go routines and run forever", func() { pusher, err := NewAPNSPusher( - certificatePath, isProduction, config, logger, diff --git a/pusher/gcm.go b/pusher/gcm.go index 344d14d..1a19915 100644 --- a/pusher/gcm.go +++ b/pusher/gcm.go @@ -44,7 +44,7 @@ type GCMPusher struct { InvalidTokenHandlers []interfaces.InvalidTokenHandler IsProduction bool Logger *logrus.Logger - MessageHandler interfaces.MessageHandler + MessageHandler map[string]interfaces.MessageHandler Queue interfaces.Queue run bool senderID string @@ -89,6 +89,9 @@ func (g *GCMPusher) loadConfigurationDefaults() { func (g *GCMPusher) configure(client interfaces.GCMClient, db interfaces.DB, statsdClientOrNil interfaces.StatsDClient) error { var err error + l := g.Logger.WithFields(logrus.Fields{ + "method": "configure", + }) g.loadConfigurationDefaults() g.GracefulShutdownTimeout = g.Config.GetInt("gracefulShutdownTimeout") if err = g.configureStatsReporters(statsdClientOrNil); err != nil { @@ -109,22 +112,34 @@ func (g *GCMPusher) configure(client interfaces.GCMClient, db interfaces.DB, sta return err } g.Queue = q - handler, err := extensions.NewGCMMessageHandler( - g.senderID, - g.apiKey, - g.IsProduction, - g.Config, - g.Logger, - g.Queue.PendingMessagesWaitGroup(), - g.StatsReporters, - g.feedbackReporters, - g.InvalidTokenHandlers, - client, - ) + g.MessageHandler = make(map[string]interfaces.MessageHandler) + for k := range g.Config.GetStringMap("gcm.games") { + senderID := g.Config.GetString("gcm.certs." + k + ".senderID") + apiKey := g.Config.GetString("gcm.certs." + k + ".apiKey") + l.Infof( + "Configuring messageHandler for game %s with senderID %s and apiKey %s", + k, senderID, apiKey, + ) + handler, herr := extensions.NewGCMMessageHandler( + senderID, + apiKey, + g.IsProduction, + g.Config, + g.Logger, + g.Queue.PendingMessagesWaitGroup(), + g.StatsReporters, + g.feedbackReporters, + g.InvalidTokenHandlers, + client, + ) + if herr != nil { + return herr + } + g.MessageHandler[k] = handler + } if err != nil { return err } - g.MessageHandler = handler return nil } @@ -155,6 +170,24 @@ func (g *GCMPusher) configureInvalidTokenHandlers(dbOrNil interfaces.DB) error { return nil } +func (g *GCMPusher) routeMessages(msgChan *chan interfaces.KafkaMessage) { + g.run = true + + for g.run == true { + select { + case message := <-*msgChan: + if handler, ok := g.MessageHandler[message.Game]; ok { + handler.HandleMessages(message) + } else { + g.Logger.WithFields(logrus.Fields{ + "method": "routeMessages", + "game": message.Game, + }).Error("Game not found") + } + } + } +} + // Start starts pusher in apns mode func (g *GCMPusher) Start() { g.run = true @@ -163,14 +196,15 @@ func (g *GCMPusher) Start() { "senderID": g.senderID, }) l.Info("starting pusher in gcm mode...") - go g.MessageHandler.HandleMessages(g.Queue.MessagesChannel()) - go g.MessageHandler.HandleResponses() + for _, v := range g.MessageHandler { + go v.HandleResponses() + go v.LogStats() + msgHandler, _ := v.(*extensions.GCMMessageHandler) + go msgHandler.CleanMetadataCache() + } + go g.Queue.ConsumeLoop() go g.reportGoStats() - go g.MessageHandler.LogStats() - - msgHandler, _ := g.MessageHandler.(*extensions.GCMMessageHandler) - go msgHandler.CleanMetadataCache() sigchan := make(chan os.Signal) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) diff --git a/pusher/gcm_test.go b/pusher/gcm_test.go index b0b758c..81dccd2 100644 --- a/pusher/gcm_test.go +++ b/pusher/gcm_test.go @@ -25,11 +25,10 @@ package pusher import ( "time" - "github.com/sirupsen/logrus/hooks/test" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" - "github.com/topfreegames/pusher/extensions" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" ) @@ -82,7 +81,7 @@ var _ = Describe("GCM Pusher", func() { Expect(pusher.run).To(BeFalse()) Expect(pusher.senderID).To(Equal(senderID)) Expect(pusher.StatsReporters).To(HaveLen(1)) - Expect(pusher.MessageHandler.(*extensions.GCMMessageHandler).StatsReporters).To(HaveLen(1)) + Expect(pusher.MessageHandler).To(HaveLen(1)) }) }) diff --git a/pusher/handlers_test.go b/pusher/handlers_test.go index c9aa4cb..76a7f2b 100644 --- a/pusher/handlers_test.go +++ b/pusher/handlers_test.go @@ -23,9 +23,9 @@ package pusher import ( - "github.com/sirupsen/logrus/hooks/test" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus/hooks/test" "github.com/spf13/viper" "github.com/topfreegames/pusher/mocks" "github.com/topfreegames/pusher/util" diff --git a/util/version.go b/util/version.go index 2299603..f723b54 100644 --- a/util/version.go +++ b/util/version.go @@ -23,4 +23,4 @@ package util //Version is the current version of pusher -var Version = "2.2.0" +var Version = "3.0.0-rc1"