Skip to content

Commit

Permalink
add env var to disable routing to invalid token channel; send metric …
Browse files Browse the repository at this point in the history
…every time the listener is restarted (#25)
  • Loading branch information
mbotarro authored and cscatolini committed Feb 20, 2019
1 parent 3d02045 commit de5d642
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 13 deletions.
31 changes: 19 additions & 12 deletions feedback/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Broker struct {
StatsReporters []interfaces.StatsReporter
InChan chan QueueMessage
pendingMessagesWG *sync.WaitGroup
InvalidTokenEnabled bool
InvalidTokenOutChan chan *InvalidToken

run bool
Expand Down Expand Up @@ -85,11 +86,13 @@ func NewBroker(

func (b *Broker) loadConfigurationDefaults() {
b.Config.SetDefault("feedbackListeners.broker.invalidTokenChan.size", 1000)
b.Config.SetDefault("feedbackListeners.broker.invalidTokenEnabled", true)
}

func (b *Broker) configure() {
b.loadConfigurationDefaults()

b.InvalidTokenEnabled = b.Config.GetBool("feedbackListeners.broker.invalidTokenEnabled")
b.InvalidTokenOutChan = make(chan *InvalidToken, b.Config.GetInt("feedbackListeners.broker.invalidTokenChan.size"))
}

Expand Down Expand Up @@ -153,26 +156,30 @@ func (b *Broker) processMessages() {
func (b *Broker) routeAPNSMessage(msg *structs.ResponseWithMetadata, game string) {
switch msg.Reason {
case apns2.ReasonBadDeviceToken, apns2.ReasonUnregistered, apns2.ReasonTopicDisallowed, apns2.ReasonDeviceTokenNotForTopic:
tk := &InvalidToken{
Token: msg.DeviceToken,
Game: game,
Platform: APNSPlatform,
}
if b.InvalidTokenEnabled {
tk := &InvalidToken{
Token: msg.DeviceToken,
Game: game,
Platform: APNSPlatform,
}

b.InvalidTokenOutChan <- tk
b.InvalidTokenOutChan <- tk
}
}
}

func (b *Broker) routeGCMMessage(msg *gcm.CCSMessage, game string) {
switch msg.Error {
case "DEVICE_UNREGISTERED", "BAD_REGISTRATION":
tk := &InvalidToken{
Token: msg.From,
Game: game,
Platform: GCMPlatform,
}
if b.InvalidTokenEnabled {
tk := &InvalidToken{
Token: msg.From,
Game: game,
Platform: GCMPlatform,
}

b.InvalidTokenOutChan <- tk
b.InvalidTokenOutChan <- tk
}
}
}

Expand Down
42 changes: 41 additions & 1 deletion feedback/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var _ = Describe("Broker", func() {
inChan = make(chan QueueMessage, 100)
})

Describe("[Unit]", func() {
Describe("[Unit]Broker", func() {
It("Should start and stop correctly", func() {
broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -114,6 +114,26 @@ var _ = Describe("Broker", func() {
Expect(len(broker.InChan)).To(Equal(0))
Expect(len(broker.InvalidTokenOutChan)).To(Equal(0))
})

It("Should not route if invalid token is disabled", func() {
config.Set("feedbackListeners.broker.invalidTokenEnabled", false)

broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())

broker.Start()
inChan <- kafkaMsg

Eventually(func() int {
return len(broker.InChan)
}).Should(Equal(0))

Consistently(func() int {
return len(broker.InvalidTokenOutChan)
}).Should(Equal(0))

broker.Stop()
})
})
})

Expand Down Expand Up @@ -159,6 +179,26 @@ var _ = Describe("Broker", func() {
Expect(len(broker.InChan)).To(Equal(0))
Expect(len(broker.InvalidTokenOutChan)).To(Equal(0))
})

It("Should not route if invalid token is disabled", func() {
config.Set("feedbackListeners.broker.invalidTokenEnabled", false)

broker, err := NewBroker(logger, config, nil, inChan, nil)
Expect(err).NotTo(HaveOccurred())

broker.Start()
inChan <- kafkaMsg

Eventually(func() int {
return len(broker.InChan)
}).Should(Equal(0))

Consistently(func() int {
return len(broker.InvalidTokenOutChan)
}).Should(Equal(0))

broker.Stop()
})
})
})
})
Expand Down
3 changes: 3 additions & 0 deletions feedback/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (l *Listener) Start() {
l.Broker.Start()
l.InvalidTokenHandler.Start()

statsReporterReportMetricCount(l.StatsReporters,
"feedback_listener_restart", 1, "", "")

sigchan := make(chan os.Signal)
signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

Expand Down

0 comments on commit de5d642

Please sign in to comment.