Skip to content

Commit

Permalink
Remove duplicated code in pusher (#28)
Browse files Browse the repository at this point in the history
* Remove duplicated code in pusher

* Improve log menssage

* Change function description
  • Loading branch information
capella authored and cscatolini committed May 3, 2019
1 parent 6f26447 commit df79801
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 221 deletions.
1 change: 1 addition & 0 deletions interfaces/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type MessageHandler interface {
HandleMessages(msg KafkaMessage)
HandleResponses()
LogStats()
CleanMetadataCache()
}
117 changes: 7 additions & 110 deletions pusher/apns.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
package pusher

import (
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -38,17 +33,7 @@ import (

// APNSPusher struct for apns pusher
type APNSPusher struct {
CertificatePath string
Config *viper.Viper
feedbackReporters []interfaces.FeedbackReporter
GracefulShutdownTimeout int
IsProduction bool
Logger *logrus.Logger
MessageHandler map[string]interfaces.MessageHandler
Queue interfaces.Queue
run bool
StatsReporters []interfaces.StatsReporter
stopChannel chan struct{}
Pusher
}

// NewAPNSPusher for getting a new APNSPusher instance
Expand All @@ -61,10 +46,12 @@ func NewAPNSPusher(
queueOrNil ...interfaces.APNSPushQueue,
) (*APNSPusher, error) {
a := &APNSPusher{
Config: config,
IsProduction: isProduction,
Logger: logger,
stopChannel: make(chan struct{}),
Pusher: Pusher{
Config: config,
IsProduction: isProduction,
Logger: logger,
stopChannel: make(chan struct{}),
},
}
var queue interfaces.APNSPushQueue
if len(queueOrNil) > 0 {
Expand All @@ -77,10 +64,6 @@ func NewAPNSPusher(
return a, nil
}

func (a *APNSPusher) loadConfigurationDefaults() {
a.Config.SetDefault("gracefulShutdownTimeout", 10)
}

func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB, statsdClientOrNil interfaces.StatsDClient) error {
var err error
l := a.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -136,89 +119,3 @@ func (a *APNSPusher) configure(queue interfaces.APNSPushQueue, db interfaces.DB,
}
return nil
}

func (a *APNSPusher) configureFeedbackReporters() error {
reporters, err := configureFeedbackReporters(a.Config, a.Logger)
if err != nil {
return err
}
a.feedbackReporters = reporters
return nil
}

func (a *APNSPusher) configureStatsReporters(clientOrNil interfaces.StatsDClient) error {
reporters, err := configureStatsReporters(a.Config, a.Logger, clientOrNil)
if err != nil {
return err
}
a.StatsReporters = reporters
return nil
}

func (a *APNSPusher) routeMessages(msgChan *chan interfaces.KafkaMessage) {
for a.run == true {
select {
case message := <-*msgChan:
if handler, ok := a.MessageHandler[message.Game]; ok {
handler.HandleMessages(message)
} else {
a.Logger.WithFields(logrus.Fields{
"method": "routeMessages",
"game": message.Game,
}).Error("Game not found")
}
}
}
}

// Start starts pusher in apns mode
func (a *APNSPusher) Start() {
a.run = true
l := a.Logger.WithFields(logrus.Fields{
"method": "start",
"certificatePath": a.CertificatePath,
})
l.Info("starting pusher in apns mode...")
go a.routeMessages(a.Queue.MessagesChannel())
for _, v := range a.MessageHandler {
go v.HandleResponses()
go v.LogStats()
msgHandler, _ := v.(*extensions.APNSMessageHandler)
go msgHandler.CleanMetadataCache()
}
go a.Queue.ConsumeLoop()
go a.reportGoStats()

sigchan := make(chan os.Signal)
signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

for a.run == true {
select {
case sig := <-sigchan:
l.Warnf("caught signal %v: terminating\n", sig)
a.run = false
case <-a.stopChannel:
l.Warn("Stop channel closed\n")
a.run = false
}
}
a.Queue.StopConsuming()
GracefulShutdown(a.Queue.PendingMessagesWaitGroup(), time.Duration(a.GracefulShutdownTimeout)*time.Second)
}

func (a *APNSPusher) reportGoStats() {
for {
num := runtime.NumGoroutine()
m := &runtime.MemStats{}
runtime.ReadMemStats(m)
gcTime := m.PauseNs[(m.NumGC+255)%256]
for _, statsReporter := range a.StatsReporters {
statsReporter.ReportGoStats(
num,
m.Alloc, m.HeapObjects, m.NextGC,
gcTime,
)
}
time.Sleep(30 * time.Second)
}
}
118 changes: 7 additions & 111 deletions pusher/gcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
package pusher

import (
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -38,16 +33,7 @@ import (

// GCMPusher struct for GCM pusher
type GCMPusher struct {
Config *viper.Viper
feedbackReporters []interfaces.FeedbackReporter
GracefulShutdownTimeout int
IsProduction bool
Logger *logrus.Logger
MessageHandler map[string]interfaces.MessageHandler
Queue interfaces.Queue
run bool
StatsReporters []interfaces.StatsReporter
stopChannel chan struct{}
Pusher
}

// NewGCMPusher for getting a new GCMPusher instance
Expand All @@ -60,10 +46,12 @@ func NewGCMPusher(
clientOrNil ...interfaces.GCMClient,
) (*GCMPusher, error) {
g := &GCMPusher{
Config: config,
IsProduction: isProduction,
Logger: logger,
stopChannel: make(chan struct{}),
Pusher: Pusher{
Config: config,
IsProduction: isProduction,
Logger: logger,
stopChannel: make(chan struct{}),
},
}
var client interfaces.GCMClient
if len(clientOrNil) > 0 {
Expand All @@ -76,11 +64,6 @@ func NewGCMPusher(
return g, nil
}

func (g *GCMPusher) loadConfigurationDefaults() {
g.Config.SetDefault("gracefulShutdownTimeout", 10)
g.Config.SetDefault("stats.reporters", []string{})
}

func (g *GCMPusher) configure(client interfaces.GCMClient, db interfaces.DB, statsdClientOrNil interfaces.StatsDClient) error {
var err error
l := g.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -129,90 +112,3 @@ func (g *GCMPusher) configure(client interfaces.GCMClient, db interfaces.DB, sta
}
return nil
}

func (g *GCMPusher) configureStatsReporters(clientOrNil interfaces.StatsDClient) error {
reporters, err := configureStatsReporters(g.Config, g.Logger, clientOrNil)
if err != nil {
return err
}
g.StatsReporters = reporters
return nil
}

func (g *GCMPusher) configureFeedbackReporters() error {
reporters, err := configureFeedbackReporters(g.Config, g.Logger)
if err != nil {
return err
}
g.feedbackReporters = reporters
return nil
}

func (g *GCMPusher) routeMessages(msgChan *chan interfaces.KafkaMessage) {
for g.run == true {
select {
case message := <-*msgChan:
if handler, ok := g.MessageHandler[message.Game]; ok {
handler.HandleMessages(message)
} else {
g.Logger.WithFields(logrus.Fields{
"method": "routeMessages",
"game": message.Game,
}).Error("Game not found")
}
}
}
}

// Start starts pusher in apns mode
func (g *GCMPusher) Start() {
g.run = true
l := g.Logger.WithFields(logrus.Fields{
"method": "start",
})
l.Info("starting pusher in gcm mode...")
go g.routeMessages(g.Queue.MessagesChannel())
for _, v := range g.MessageHandler {
go v.HandleResponses()
go v.LogStats()
msgHandler, _ := v.(*extensions.GCMMessageHandler)
go msgHandler.CleanMetadataCache()
}

go g.Queue.ConsumeLoop()
go g.reportGoStats()

sigchan := make(chan os.Signal)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

for g.run == true {
select {
case sig := <-sigchan:
l.Warnf("caught signal %v: terminating\n", sig)
g.run = false
case <-g.stopChannel:
l.Warn("Stop channel closed\n")
g.run = false
}
}
g.Queue.StopConsuming()
GracefulShutdown(g.Queue.PendingMessagesWaitGroup(), time.Duration(g.GracefulShutdownTimeout)*time.Second)
}

func (g *GCMPusher) reportGoStats() {
for {
num := runtime.NumGoroutine()
m := &runtime.MemStats{}
runtime.ReadMemStats(m)
gcTime := m.PauseNs[(m.NumGC+255)%256]
for _, statsReporter := range g.StatsReporters {
statsReporter.ReportGoStats(
num,
m.Alloc, m.HeapObjects, m.NextGC,
gcTime,
)
}

time.Sleep(30 * time.Second)
}
}

0 comments on commit df79801

Please sign in to comment.