Skip to content

Commit

Permalink
Feedback Listener Service (#22)
Browse files Browse the repository at this point in the history
* creating feedback-listener

* implement bsm consumer group

* template for Broker and Handler

* create and test broker

* invalidToken creation and test

* token delete logic and test

* invalid token handler comments and tests

* change configuration parameters name

* add librdk kafka consumer

* add integration tests with gcm feedback messages

* add apns integration tests

* add listener command

* code improvements

* update gopkg file

* restore python bench script

* update config file

* code improvements

* update invalid token unit tests

* create QueueMessage interface to hide kafka message internals

* remove unnecessary pkg/errors package and increase integration test eventually time

* no more channel pointers; stop ticker to avoid cpu leak

* turn the broker and the handler into synchronous components

* better handle handler ticker

* better defer function

* make kafka producers async in integration tests; send nil kafka message to create topics only once; use different consumer groups per test
  • Loading branch information
mbotarro authored and cscatolini committed Feb 19, 2019
1 parent f5f382c commit 495fb5f
Show file tree
Hide file tree
Showing 16 changed files with 2,683 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ test-integration integration func: deps test-db-drop test-db-create
@echo "= Running integration tests... ="
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo
@env MY_IP=${MY_IP} ginkgo -r --randomizeAllSpecs --randomizeSuites --skip="\[Integration\].*" .
@env MY_IP=${MY_IP} ginkgo -r -tags=integration --randomizeAllSpecs --randomizeSuites --skip="\[Integration\].*" .
@echo
@echo "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-="
@echo "= Integration tests finished. ="
Expand Down
83 changes: 83 additions & 0 deletions cmd/feedback_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2019 TFG Co <backend@tfgco.com>
* Author: TFG Co <backend@tfgco.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package cmd

import (
raven "github.com/getsentry/raven-go"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/feedback"
"github.com/topfreegames/pusher/util"
)

func newFeedbackListener(
debug, json bool,
config *viper.Viper,
) (*feedback.Listener, error) {
log := logrus.New()
if json {
log.Formatter = new(logrus.JSONFormatter)
}
if debug {
log.Level = logrus.DebugLevel
} else {
log.Level = logrus.InfoLevel
}

return feedback.NewListener(config, log)
}

// starFeedbackListenerCmd represents the start-feedback-listener command
var startFeedbackListenerCmd = &cobra.Command{
Use: "start-feedback-listener",
Short: "starts the feedback listener",
Long: `starts the feedback listener that will read from kafka topics,
process the messages and route them to a convenient handler`,
Run: func(cmd *cobra.Command, args []string) {
config, err := util.NewViperWithConfigFile(cfgFile)
if err != nil {
panic(err)
}

sentryURL := config.GetString("sentry.url")
if sentryURL != "" {
raven.SetDSN(sentryURL)
}

listener, err := newFeedbackListener(debug, json, config)
if err != nil {
raven.CaptureErrorAndWait(err, map[string]string{
"version": util.Version,
"cmd": "apns",
})
panic(err)
}

listener.Start()
},
}

func init() {
RootCmd.AddCommand(startFeedbackListenerCmd)
}
29 changes: 29 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,32 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
feedbackListeners:
queue:
topics:
- "^push-[^-_]+-(apns|gcm)-feedbacks"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
fetch.min.bytes: 1
fetch.wait.max.ms: 100
offsetResetStrategy: latest
handleAllMessagesBeforeExiting: true
broker:
invalidTokenChan:
size: 999
invalidToken:
flush:
time:
ms: 2000
buffer:
size: 99
pg:
host: localhost
port: 8585
user: pusher_user
pass: ""
poolSize: 20
maxRetries: 3
database: push
connectionTimeout: 100
30 changes: 30 additions & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,33 @@ invalidToken:
maxRetries: 3
database: push
connectionTimeout: 100
feedbackListeners:
queue:
topics:
- "^push-[^-_]+-(apns|gcm)-feedbacks"
brokers: "localhost:9941"
group: testGroup
sessionTimeout: 6000
fetch.min.bytes: 1
fetch.wait.max.ms: 100
offsetResetStrategy: latest
handleAllMessagesBeforeExiting: true
broker:
invalidTokenChan:
size: 999
invalidToken:
flush:
time:
ms: 2000
buffer:
size: 99
pg:
host: localhost
port: 8585
user: pusher_user
pass: ""
poolSize: 20
maxRetries: 3
database: push
connectionTimeout: 100

5 changes: 5 additions & 0 deletions extensions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func getGameAndPlatformFromTopic(topic string) ParsedTopic {
}
}

// GetGameAndPlatformFromTopic returns the game and plaform specified in the Kafka topic
func GetGameAndPlatformFromTopic(topic string) ParsedTopic {
return getGameAndPlatformFromTopic(topic)
}

func sendToFeedbackReporters(feedbackReporters []interfaces.FeedbackReporter, res interface{}, topic ParsedTopic) error {
jres, err := json.Marshal(res)
if err != nil {
Expand Down
180 changes: 180 additions & 0 deletions feedback/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2019 TFG Co <backend@tfgco.com>
* Author: TFG Co <backend@tfgco.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package feedback

import (
"encoding/json"
"sync"

gcm "github.com/topfreegames/go-gcm"
"github.com/topfreegames/pusher/structs"

"github.com/sideshow/apns2"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

// Message is a struct that will decode an apns or gcm feedback message
type Message struct {
From string `json:"from"`
MessageID string `json:"message_id"`
MessageType string `json:"message_type"`
Error string `json:"error"`
ErrorDescription string `json:"error_description"`
DeviceToken string `json:"DeviceToken"`
ID string `json:"id"`
Err map[string]interface{} `json:"Err"`
Metadata map[string]interface{} `json:"metadata"`
Reason string `json:"reason"`
}

// Broker receives kafka messages in its InChan, unmarshal them according to the
// platform and routes them to the correct out channel after examining their content
type Broker struct {
Logger *log.Logger
Config *viper.Viper
InChan chan QueueMessage
pendingMessagesWG *sync.WaitGroup
InvalidTokenOutChan chan *InvalidToken

run bool
stopChannel chan struct{}
}

// NewBroker creates a new Broker instance
func NewBroker(
logger *log.Logger, cfg *viper.Viper,
inChan chan QueueMessage,
pendingMessagesWG *sync.WaitGroup,
) (*Broker, error) {
b := &Broker{
Logger: logger,
Config: cfg,
InChan: inChan,
pendingMessagesWG: pendingMessagesWG,
stopChannel: make(chan struct{}),
}

b.configure()

return b, nil
}

func (b *Broker) loadConfigurationDefaults() {
b.Config.SetDefault("feedbackListeners.broker.invalidTokenChan.size", 1000)
}

func (b *Broker) configure() {
b.loadConfigurationDefaults()

b.InvalidTokenOutChan = make(chan *InvalidToken, b.Config.GetInt("feedbackListeners.broker.invalidTokenChan.size"))
}

// Start starts a routine to process the Broker in channel
func (b *Broker) Start() {
l := b.Logger.WithField(
"method", "start",
)
l.Info("starting broker")

b.run = true
go b.processMessages()
}

// Stop stops all routines from processing the in channel and closes all output channels
func (b *Broker) Stop() {
b.run = false
close(b.stopChannel)
close(b.InvalidTokenOutChan)
}

func (b *Broker) processMessages() {
l := b.Logger.WithField(
"method`", "processMessages",
)

for b.run == true {
select {
case msg, ok := <-b.InChan:
if ok {
switch msg.GetPlatform() {
case APNSPlatform:
var res structs.ResponseWithMetadata
err := json.Unmarshal(msg.GetValue(), &res)
if err != nil {
l.WithError(err).Error(ErrAPNSUnmarshal.Error())
}
b.routeAPNSMessage(&res, msg.GetGame())

case GCMPlatform:
var res gcm.CCSMessage
err := json.Unmarshal(msg.GetValue(), &res)
if err != nil {
l.WithError(err).Error(ErrGCMUnmarshal.Error())
}
b.routeGCMMessage(&res, msg.GetGame())
}

b.confirmMessage()
}

case <-b.stopChannel:
break
}

}

l.Info("stop processing Broker's in channel")
}

func (b *Broker) routeAPNSMessage(msg *structs.ResponseWithMetadata, game string) {
switch msg.Reason {
case apns2.ReasonBadDeviceToken, apns2.ReasonUnregistered, apns2.ReasonTopicDisallowed, apns2.ReasonDeviceTokenNotForTopic:
tk := &InvalidToken{
Token: msg.DeviceToken,
Game: game,
Platform: APNSPlatform,
}

b.InvalidTokenOutChan <- tk
}
}

func (b *Broker) routeGCMMessage(msg *gcm.CCSMessage, game string) {
switch msg.Error {
case "DEVICE_UNREGISTERED", "BAD_REGISTRATION":
tk := &InvalidToken{
Token: msg.From,
Game: game,
Platform: GCMPlatform,
}

b.InvalidTokenOutChan <- tk
}
}

func (b *Broker) confirmMessage() {
if b.pendingMessagesWG != nil {
b.pendingMessagesWG.Done()
}
}

0 comments on commit 495fb5f

Please sign in to comment.