Skip to content

Commit

Permalink
Merge pull request #17 from topfreegames/listenin-multiple-queues
Browse files Browse the repository at this point in the history
Listening to multiple queues
  • Loading branch information
guilhermef committed Sep 26, 2017
2 parents 189aa5e + 36bcdcf commit 7f7d7b6
Show file tree
Hide file tree
Showing 33 changed files with 417 additions and 289 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go:
- 1.8
- 1.9
addons:
postgresql: '9.5'
services:
Expand Down
40 changes: 19 additions & 21 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,35 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#

FROM golang:1.8-alpine
FROM golang:1.9-alpine

MAINTAINER TFG Co <backend@tfgco.com>

RUN apk update
RUN apk add make git g++ bash python wget

ENV LIBRDKAFKA_VERSION 0.11.0
RUN wget -O /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz && \
ENV CPLUS_INCLUDE_PATH /usr/local/include
ENV LIBRARY_PATH /usr/local/lib
ENV LD_LIBRARY_PATH /usr/local/lib

WORKDIR /go/src/github.com/topfreegames/pusher

RUN apk add --no-cache make git g++ bash python wget && \
wget -O /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz && \
tar -xzf /root/librdkafka-${LIBRDKAFKA_VERSION}.tar.gz -C /root && \
cd /root/librdkafka-${LIBRDKAFKA_VERSION} && \
./configure && make && make install && make clean && ./configure --clean
./configure && make && make install && make clean && ./configure --clean && \
go get -u github.com/golang/dep/cmd/dep && \
mkdir -p /go/src/github.com/topfreegames/pusher

RUN go get -u github.com/golang/dep/cmd/dep

RUN mkdir -p /go/src/github.com/topfreegames/pusher
WORKDIR /go/src/github.com/topfreegames/pusher

ADD . /go/src/github.com/topfreegames/pusher
RUN dep ensure

ENV CPLUS_INCLUDE_PATH /usr/local/include
ENV LIBRARY_PATH /usr/local/lib
ENV LD_LIBRARY_PATH /usr/local/lib
RUN export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH && make build

RUN mkdir /app
RUN mv /go/src/github.com/topfreegames/pusher/bin/pusher /app/pusher
RUN mv /go/src/github.com/topfreegames/pusher/config /app/config
RUN mv /go/src/github.com/topfreegames/pusher/tls /app/tls
RUN rm -r /go/src/github.com/topfreegames/pusher
RUN dep ensure && \
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH && make build && \
mkdir /app && \
mv /go/src/github.com/topfreegames/pusher/bin/pusher /app/pusher && \
mv /go/src/github.com/topfreegames/pusher/config /app/config && \
mv /go/src/github.com/topfreegames/pusher/tls /app/tls && \
rm -r /go/src/github.com/topfreegames/pusher

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Pusher
======

### Dependencies
* Go 1.7
* Go 1.9
* Kafka >= 0.9.0
* [librdkafka](https://github.com/edenhill/librdkafka)

Expand Down
21 changes: 4 additions & 17 deletions cmd/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
raven "github.com/getsentry/raven-go"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
Expand All @@ -35,11 +33,9 @@ import (
)

var app string
var certificate string

func startApns(
debug, json, production bool,
certificate string,
config *viper.Viper,
statsdClientOrNil interfaces.StatsDClient,
dbOrNil interfaces.DB,
Expand All @@ -54,16 +50,7 @@ func startApns(
} else {
log.Level = logrus.InfoLevel
}
l := log.WithFields(logrus.Fields{
"method": "apnsCmd",
"debug": debug,
})
if len(certificate) == 0 {
err := fmt.Errorf("pem certificate must be set")
l.Error(err)
return nil, err
}
return pusher.NewAPNSPusher(certificate, production, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
return pusher.NewAPNSPusher(production, config, log, statsdClientOrNil, dbOrNil, queueOrNil)
}

// apnsCmd represents the apns command
Expand All @@ -82,7 +69,7 @@ var apnsCmd = &cobra.Command{
raven.SetDSN(sentryURL)
}

apnsPusher, err := startApns(debug, json, production, certificate, config, nil, nil, nil)
apnsPusher, err := startApns(debug, json, production, config, nil, nil, nil)
if err != nil {
raven.CaptureErrorAndWait(err, map[string]string{
"version": util.Version,
Expand All @@ -95,6 +82,6 @@ var apnsCmd = &cobra.Command{
}

func init() {
apnsCmd.Flags().StringVar(&certificate, "certificate", "", "pem certificate path")
apnsCmd.Flags()
RootCmd.AddCommand(apnsCmd)
}
19 changes: 5 additions & 14 deletions cmd/apns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ package cmd
import (
"fmt"

"github.com/sirupsen/logrus"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/mocks"
"github.com/topfreegames/pusher/util"
)

var _ = Describe("APNS", func() {
cfg := "../config/test.yaml"
cert := "../tls/self_signed_cert.pem"

var config *viper.Viper
var mockPushQueue *mocks.APNSPushQueueMock
Expand All @@ -53,42 +52,34 @@ var _ = Describe("APNS", func() {

Describe("[Unit]", func() {
It("Should return apnsPusher without errors", func() {
apnsPusher, err := startApns(false, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, false, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.CertificatePath).To(Equal(cert))
Expect(apnsPusher.Config).NotTo(BeNil())
Expect(apnsPusher.IsProduction).To(BeFalse())
Expect(apnsPusher.Logger.Level).To(Equal(logrus.InfoLevel))
Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.TextFormatter{})))
})

It("Should set log to json format", func() {
apnsPusher, err := startApns(false, true, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, true, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(fmt.Sprintf("%T", apnsPusher.Logger.Formatter)).To(Equal(fmt.Sprintf("%T", &logrus.JSONFormatter{})))
})

It("Should set log to debug", func() {
apnsPusher, err := startApns(true, false, false, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(true, false, false, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.Logger.Level).To(Equal(logrus.DebugLevel))
})

It("Should set log to production", func() {
apnsPusher, err := startApns(false, false, true, cert, config, mockStatsDClient, mockDb, mockPushQueue)
apnsPusher, err := startApns(false, false, true, config, mockStatsDClient, mockDb, mockPushQueue)
Expect(err).NotTo(HaveOccurred())
Expect(apnsPusher).NotTo(BeNil())
Expect(apnsPusher.IsProduction).To(BeTrue())
})

It("Should return error if certificate is not provided", func() {
apnsPusher, err := startApns(false, true, false, "", config, mockStatsDClient, mockDb, mockPushQueue)
Expect(apnsPusher).To(BeNil())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("pem certificate must be set"))
})
})
})
8 changes: 7 additions & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 30
apns:
concurrentWorkers: 100
logStatsInterval: 10000
games:
game: /certs/cert.pem
gcm:
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 100
logStatsInterval: 10000
games:
game:
apiKey: game-api-key
senderID: "1233456789"
queue:
topics:
- "com.games.test"
- "^push-[^-_]+_(apns|gcm)[_-](single|massive)"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
Expand Down
9 changes: 7 additions & 2 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ gracefulShutdownTimeout: 10
apns:
concurrentWorkers: 100
logStatsInterval: 750
games:
game: ../tls/self_signed_cert.pem
gcm:
pingInterval: 30
pingTimeout: 10
maxPendingMessages: 3
logStatsInterval: 750
games:
game:
apiKey: game-api-key
senderID: "1233456789"
queue:
topics:
- "com.games.test"
- "^push-[^-_]+_(apns|gcm)[_-](single|massive)"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
Expand All @@ -36,7 +42,6 @@ invalidToken:
handlers:
- pg
pg:
table: "test_apns"
host: localhost
port: 8585
user: pusher_user
Expand Down
46 changes: 25 additions & 21 deletions extensions/apns_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,16 @@ func (a *APNSMessageHandler) configureAPNSPushQueue() error {
return nil
}

func (a *APNSMessageHandler) sendMessage(message []byte) error {
func (a *APNSMessageHandler) sendMessage(message interfaces.KafkaMessage) error {
deviceIdentifier := uuid.NewV4().String()
l := a.Logger.WithField("method", "sendMessage")
l.WithField("message", message).Debug("sending message to apns")
h := &push.Headers{
Topic: a.Topic,
ID: deviceIdentifier,
}
n := &Notification{}
json.Unmarshal(message, n)
json.Unmarshal(message.Value, n)
payload, err := json.Marshal(n.Payload)
if err != nil {
l.WithError(err).Error("error marshaling message payload")
Expand All @@ -201,23 +203,25 @@ func (a *APNSMessageHandler) sendMessage(message []byte) error {
}
return nil
}
statsReporterHandleNotificationSent(a.StatsReporters)
statsReporterHandleNotificationSent(a.StatsReporters, message.Game, "apns")
a.PushQueue.Push(n.DeviceToken, h, payload)
if n.Metadata == nil {
n.Metadata = map[string]interface{}{}
}
a.inflightMessagesMetadataLock.Lock()

n.Metadata["timestamp"] = time.Now().Unix()
n.Metadata["game"] = message.Game
n.Metadata["platform"] = "apns"
hostname, err := os.Hostname()
if err != nil {
l.WithError(err).Error("error retrieving hostname")
} else {
n.Metadata["hostname"] = hostname
}
n.Metadata["msgid"] = uuid.NewV4().String()
a.InflightMessagesMetadata[n.DeviceToken] = n.Metadata
a.requestsHeap.AddRequest(n.DeviceToken)
a.InflightMessagesMetadata[deviceIdentifier] = n.Metadata
a.requestsHeap.AddRequest(deviceIdentifier)

a.inflightMessagesMetadataLock.Unlock()
a.sentMessages++
Expand Down Expand Up @@ -252,15 +256,8 @@ func (a *APNSMessageHandler) CleanMetadataCache() {
}

// HandleMessages get messages from msgChan and send to APNS
func (a *APNSMessageHandler) HandleMessages(msgChan *chan []byte) {
a.run = true

for a.run == true {
select {
case message := <-*msgChan:
a.sendMessage(message)
}
}
func (a *APNSMessageHandler) HandleMessages(message interfaces.KafkaMessage) {
a.sendMessage(message)
}

func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
Expand All @@ -278,22 +275,26 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
apnsResMutex.Lock()
a.responsesReceived++
apnsResMutex.Unlock()
parsedTopic := ParsedTopic{}
var err error
responseWithMetadata := &ResponseWithMetadata{
Response: res,
}
a.inflightMessagesMetadataLock.Lock()
if val, ok := a.InflightMessagesMetadata[res.DeviceToken]; ok {
if val, ok := a.InflightMessagesMetadata[res.ID]; ok {
responseWithMetadata.Metadata = val.(map[string]interface{})
responseWithMetadata.Timestamp = responseWithMetadata.Metadata["timestamp"].(int64)
parsedTopic.Game = responseWithMetadata.Metadata["game"].(string)
parsedTopic.Platform = responseWithMetadata.Metadata["platform"].(string)
delete(responseWithMetadata.Metadata, "timestamp")
delete(a.InflightMessagesMetadata, res.DeviceToken)
delete(a.InflightMessagesMetadata, res.ID)
}
a.inflightMessagesMetadataLock.Unlock()

if err != nil {
l.WithError(err).Error("error sending feedback to reporter")
}

if res.Err != nil {
apnsResMutex.Lock()
a.failuresReceived++
Expand All @@ -308,7 +309,7 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
}
reason := pushError.Reason
pErr := errors.NewPushError(a.mapErrorReason(reason), pushError.Error())
statsReporterHandleNotificationFailure(a.StatsReporters, pErr)
statsReporterHandleNotificationFailure(a.StatsReporters, parsedTopic.Game, "apns", pErr)

err = pErr
switch reason {
Expand All @@ -320,7 +321,10 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
if responseWithMetadata.Metadata != nil {
responseWithMetadata.Metadata["deleteToken"] = true
}
handleInvalidToken(a.InvalidTokenHandlers, res.DeviceToken)
handleInvalidToken(
a.InvalidTokenHandlers, res.DeviceToken,
parsedTopic.Game, parsedTopic.Platform,
)
case push.ErrBadCertificate, push.ErrBadCertificateEnvironment, push.ErrForbidden:
l.WithFields(log.Fields{
"category": "CertificateError",
Expand All @@ -343,21 +347,21 @@ func (a *APNSMessageHandler) handleAPNSResponse(res push.Response) error {
}).Debug("received an error")
}
responseWithMetadata.Err = pErr
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata)
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)
if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
return err
}
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata)
sendFeedbackErr := sendToFeedbackReporters(a.feedbackReporters, responseWithMetadata, parsedTopic)

if sendFeedbackErr != nil {
l.WithError(sendFeedbackErr).Error("error sending feedback to reporter")
}
apnsResMutex.Lock()
a.successesReceived++
apnsResMutex.Unlock()
statsReporterHandleNotificationSuccess(a.StatsReporters)
statsReporterHandleNotificationSuccess(a.StatsReporters, parsedTopic.Game, "apns")
return nil
}

Expand Down

0 comments on commit 7f7d7b6

Please sign in to comment.