Skip to content

Commit

Permalink
Starting graceful shutdown on StopConsuming
Browse files Browse the repository at this point in the history
  • Loading branch information
Guilherme Souza committed Aug 24, 2017
1 parent ff3e718 commit 189aa5e
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 12 deletions.
8 changes: 7 additions & 1 deletion extensions/apns_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,13 @@ var _ = Describe("APNS Message Handler", func() {

Describe("Handle Messages", func() {
It("should start without panicking and set run to true", func() {
queue, err := NewKafkaConsumer(handler.Config, logger, mockKafkaConsumerClient)
stopChannel := make(chan struct{})
queue, err := NewKafkaConsumer(
handler.Config,
logger,
&stopChannel,
mockKafkaConsumerClient,
)
Expect(err).NotTo(HaveOccurred())
Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic())
time.Sleep(50 * time.Millisecond)
Expand Down
6 changes: 5 additions & 1 deletion extensions/gcm_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,11 @@ var _ = Describe("GCM Message Handler", func() {

Describe("Handle Messages", func() {
It("should start without panicking and set run to true", func() {
queue, err := NewKafkaConsumer(handler.Config, logger, mockKafkaConsumerClient)
stopChannel := make(chan struct{})
queue, err := NewKafkaConsumer(
handler.Config, logger,
&stopChannel, mockKafkaConsumerClient,
)
Expect(err).NotTo(HaveOccurred())
Expect(func() { go handler.HandleMessages(queue.MessagesChannel()) }).ShouldNot(Panic())
time.Sleep(time.Millisecond)
Expand Down
6 changes: 5 additions & 1 deletion extensions/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"fmt"
"sync"

"github.com/sirupsen/logrus"
"github.com/confluentinc/confluent-kafka-go/kafka"
raven "github.com/getsentry/raven-go"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/interfaces"
"github.com/topfreegames/pusher/util"
Expand All @@ -50,19 +50,22 @@ type KafkaConsumer struct {
Topics []string
pendingMessagesWG *sync.WaitGroup
HandleAllMessagesBeforeExiting bool
stopChannel chan struct{}
}

// NewKafkaConsumer for creating a new KafkaConsumer instance
func NewKafkaConsumer(
config *viper.Viper,
logger *logrus.Logger,
stopChannel *chan struct{},
clientOrNil ...interfaces.KafkaConsumerClient,
) (*KafkaConsumer, error) {
q := &KafkaConsumer{
Config: config,
Logger: logger,
messagesReceived: 0,
pendingMessagesWG: nil,
stopChannel: *stopChannel,
}
var client interfaces.KafkaConsumerClient
if len(clientOrNil) == 1 {
Expand Down Expand Up @@ -204,6 +207,7 @@ func (q *KafkaConsumer) ConsumeLoop() error {
case kafka.Error:
q.handleError(ev)
q.StopConsuming()
close(q.stopChannel)
return e
default:
q.handleUnrecognized(e)
Expand Down
21 changes: 15 additions & 6 deletions extensions/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/confluentinc/confluent-kafka-go/kafka"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/spf13/viper"
"github.com/topfreegames/pusher/mocks"
. "github.com/topfreegames/pusher/testing"
Expand Down Expand Up @@ -75,7 +75,11 @@ var _ = Describe("Kafka Extension", func() {
config.Set("queue.handleAllMessagesBeforeExiting", true)

var err error
consumer, err = NewKafkaConsumer(config, logger, kafkaConsumerClientMock)
stopChannel := make(chan struct{})
consumer, err = NewKafkaConsumer(
config, logger,
&stopChannel, kafkaConsumerClientMock,
)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -226,6 +230,8 @@ var _ = Describe("Kafka Extension", func() {
publishEvent(event)

Eventually(consumer.run, 5).Should(BeFalse())
_, ok := (<-consumer.stopChannel)
Expect(ok).Should(BeFalse())
Expect(hook.Entries).To(ContainLogMessage("Error in Kafka connection."))
})

Expand All @@ -243,7 +249,8 @@ var _ = Describe("Kafka Extension", func() {
Describe("Configuration Defaults", func() {
It("should configure defaults", func() {
cnf := viper.New()
cons, err := NewKafkaConsumer(cnf, logger, kafkaConsumerClientMock)
stopChannel := make(chan struct{})
cons, err := NewKafkaConsumer(cnf, logger, &stopChannel, kafkaConsumerClientMock)
Expect(err).NotTo(HaveOccurred())
cons.loadConfigurationDefaults()

Expand Down Expand Up @@ -297,7 +304,8 @@ var _ = Describe("Kafka Extension", func() {

Describe("Creating new client", func() {
It("should return connected client", func() {
client, err := NewKafkaConsumer(config, logger)
stopChannel := make(chan struct{})
client, err := NewKafkaConsumer(config, logger, &stopChannel)
Expect(err).NotTo(HaveOccurred())

Expect(client.Brokers).NotTo(HaveLen(0))
Expand All @@ -309,7 +317,8 @@ var _ = Describe("Kafka Extension", func() {

Describe("ConsumeLoop", func() {
It("should consume message and add it to msgChan", func() {
client, err := NewKafkaConsumer(config, logger)
stopChannel := make(chan struct{})
client, err := NewKafkaConsumer(config, logger, &stopChannel)
Expect(err).NotTo(HaveOccurred())
Expect(client).NotTo(BeNil())
defer client.StopConsuming()
Expand Down
2 changes: 1 addition & 1 deletion make/setup.mk
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ setup:
@/bin/bash -c '[ "`uname -s`" == "Darwin" ] && [ "`which brew`" != "" ] && [ ! -d "/usr/local/Cellar/librdkafka" ] && echo "librdkafka was not found. Installing with brew..." && brew install librdkafka; exit 0'
# Ensuring librdkafka is installed in Debian and Ubuntu
@/bin/bash -c '[ "`uname -s`" == "Linux" ] && [ "`which apt-get`" != "" ] && echo "Ensuring librdkafka is installed..." && ./debian-install-librdkafka.sh; exit 0'
@go get -u github.com/Masterminds/dep/cmd/dep
@go get -u github.com/golang/dep/cmd/dep
@go get -u github.com/onsi/ginkgo/ginkgo
@go get github.com/gordonklaus/ineffassign
@dep ensure
11 changes: 10 additions & 1 deletion pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type APNSPusher struct {
Queue interfaces.Queue
run bool
StatsReporters []interfaces.StatsReporter
stopChannel chan struct{}
}

// NewAPNSPusher for getting a new APNSPusher instance
Expand All @@ -65,6 +66,7 @@ func NewAPNSPusher(
Config: config,
IsProduction: isProduction,
Logger: logger,
stopChannel: make(chan struct{}),
}
var queue interfaces.APNSPushQueue
if len(queueOrNil) > 0 {
Expand Down Expand Up @@ -94,7 +96,11 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
if err = a.configureInvalidTokenHandlers(db); err != nil {
return err
}
q, err := extensions.NewKafkaConsumer(a.Config, a.Logger)
q, err := extensions.NewKafkaConsumer(
a.Config,
a.Logger,
&a.stopChannel,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,6 +175,9 @@ func (a *APNSPusher) Start() {
case sig := <-sigchan:
l.Warnf("caught signal %v: terminating\n", sig)
a.run = false
case <-a.stopChannel:
l.Warn("Stop channel closed\n")
a.run = false
}
}
a.Queue.StopConsuming()
Expand Down
11 changes: 10 additions & 1 deletion pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type GCMPusher struct {
run bool
senderID string
StatsReporters []interfaces.StatsReporter
stopChannel chan struct{}
}

// NewGCMPusher for getting a new GCMPusher instance
Expand All @@ -68,6 +69,7 @@ func NewGCMPusher(
IsProduction: isProduction,
Logger: logger,
senderID: senderID,
stopChannel: make(chan struct{}),
}
var client interfaces.GCMClient
if len(clientOrNil) > 0 {
Expand Down Expand Up @@ -98,7 +100,11 @@ func (g *GCMPusher) configure(client interfaces.GCMClient, db interfaces.DB, sta
if err = g.configureInvalidTokenHandlers(db); err != nil {
return err
}
q, err := extensions.NewKafkaConsumer(g.Config, g.Logger)
q, err := extensions.NewKafkaConsumer(
g.Config,
g.Logger,
&g.stopChannel,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -174,6 +180,9 @@ func (g *GCMPusher) Start() {
case sig := <-sigchan:
l.Warnf("caught signal %v: terminating\n", sig)
g.run = false
case <-g.stopChannel:
l.Warn("Stop channel closed\n")
g.run = false
}
}
g.Queue.StopConsuming()
Expand Down

0 comments on commit 189aa5e

Please sign in to comment.