Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/smancke/guble into testin…
Browse files Browse the repository at this point in the history
…g/guble-#79-improve-code-coverage-for-store-package
  • Loading branch information
Marian Craciunescu committed May 31, 2016
2 parents e4f47b4 + 60d1c2d commit 2058912
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 102 deletions.
91 changes: 45 additions & 46 deletions gcm/gcm_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,59 @@ import (
"net/http"
"strings"
"sync"
//"runtime"
"runtime"
)

// GCM_REGISTRATIONS_SCHEMA is the default sqlite schema for gcm
const GCM_REGISTRATIONS_SCHEMA = "gcm_registration"
// REGISTRATIONS_SCHEMA is the default sqlite schema for GCM
const REGISTRATIONS_SCHEMA = "gcm_registration"
const MESSAGE_RETRIES = 5
const BROADCAST_RETRIES = 3

// GCMConnector is the structure for handling the communication with Google Cloud Messaging
type GCMConnector struct {
router server.Router
kvStore store.KVStore
prefix string
channelFromRouter chan server.MsgAndRoute
routerC chan server.MsgAndRoute
closeRouteByRouter chan server.Route
stopChan chan bool
stopC chan bool
sender *gcm.Sender
workersNumber int
nWorkers int
waitGroup sync.WaitGroup
}

// NewGCMConnector creates a new gcmConnector without starting it
func NewGCMConnector(router server.Router, prefix string, gcmAPIKey string) (*GCMConnector, error) {
// NewGCMConnector creates a new GCMConnector without starting it
func NewGCMConnector(router server.Router, prefix string, gcmAPIKey string, nWorkers int) (*GCMConnector, error) {

kvStore, err := router.KVStore()
if err != nil {
return nil, err
}

//TODO Cosmin: check with dev-team the number of GCM workers, below
//TODO Cosmin: check with dev-team the default number of GCM workers, below
gcm := &GCMConnector{
router: router,
kvStore: kvStore,
prefix: prefix,
channelFromRouter: make(chan server.MsgAndRoute, 1000),
stopChan: make(chan bool, 1),
sender: &gcm.Sender{ApiKey: gcmAPIKey},
workersNumber: runtime.GOMAXPROCS(0),
router: router,
kvStore: kvStore,
prefix: prefix,
routerC: make(chan server.MsgAndRoute, 1000),
stopC: make(chan bool, 1),
sender: &gcm.Sender{ApiKey: gcmAPIKey},
nWorkers: nWorkers,
}

return gcm, nil
}

// Start opens the connector, start more goroutines / workers to handle messages coming from the router
// Start opens the connector, creates more goroutines / workers to handle messages coming from the router
func (conn *GCMConnector) Start() error {
broadcastRoute := server.NewRoute(removeTrailingSlash(conn.prefix)+"/broadcast", conn.channelFromRouter, "gcm_connector", "gcm_connector")
broadcastRoute := server.NewRoute(removeTrailingSlash(conn.prefix)+"/broadcast", conn.routerC, "gcm_connector", "gcm_connector")
conn.router.Subscribe(broadcastRoute)
go func() {
//TODO Cosmin: should loadSubscriptions() be taken out of this goroutine, and executed before ?
// even if startup-time is longer, the routes are guaranteed to be there right after Start() returns
// (even if startup-time is longer, the routes are guaranteed to be there right after Start() returns)
conn.loadSubscriptions()

protocol.Debug("number of GCM workers: %v", conn.workersNumber)
for i := 1; i <= conn.workersNumber; i++ {
go conn.loopSendOrBroadcastMessage(i)
for id := 1; id <= conn.nWorkers; id++ {
go conn.loopSendOrBroadcastMessage(id)
}
}()
return nil
Expand All @@ -74,7 +73,7 @@ func (conn *GCMConnector) Start() error {
// Stop signals the closing of GCMConnector
func (conn *GCMConnector) Stop() error {
protocol.Debug("GCM Stop()")
close(conn.stopChan)
close(conn.stopC)
conn.waitGroup.Wait()
return nil
}
Expand All @@ -86,22 +85,22 @@ func (conn *GCMConnector) Check() error {

// loopSendOrBroadcastMessage awaits in a loop for messages from router to be forwarded to GCM,
// until the stop-channel is closed
func (conn *GCMConnector) loopSendOrBroadcastMessage(i int) {
func (conn *GCMConnector) loopSendOrBroadcastMessage(id int) {
defer conn.waitGroup.Done()
conn.waitGroup.Add(1)
protocol.Debug("starting GCM worker %v", i)
protocol.Debug("starting GCM worker %v", id)
for {
select {
case msg, opened := <-conn.channelFromRouter:
case msg, opened := <-conn.routerC:
if opened {
if string(msg.Message.Path) == removeTrailingSlash(conn.prefix) + "/broadcast" {
if string(msg.Message.Path) == removeTrailingSlash(conn.prefix)+"/broadcast" {
go conn.broadcastMessage(msg)
} else {
go conn.sendMessage(msg)
}
}
case <-conn.stopChan:
protocol.Debug("stopping GCM worker %v", i)
case <-conn.stopC:
protocol.Debug("stopping GCM worker %v", id)
return
}
}
Expand All @@ -114,7 +113,7 @@ func (conn *GCMConnector) sendMessage(msg server.MsgAndRoute) {

var messageToGcm = gcm.NewMessage(payload, gcmID)
protocol.Info("sending message to %v ...", gcmID)
result, err := conn.sender.Send(messageToGcm, 5)
result, err := conn.sender.Send(messageToGcm, MESSAGE_RETRIES)
if err != nil {
protocol.Err("error sending message to GCM gcmID=%v: %v", gcmID, err.Error())
return
Expand All @@ -141,33 +140,33 @@ func (conn *GCMConnector) parseMessageToMap(msg *protocol.Message) map[string]in
} else {
payload["message"] = msg.BodyAsString()
}
protocol.Debug("parsed message is: %v", payload)
protocol.Debug("gcm: parsed message is: %v", payload)
return payload
}

func (conn *GCMConnector) broadcastMessage(msg server.MsgAndRoute) {
topic := msg.Message.Path
payload := conn.parseMessageToMap(msg.Message)
protocol.Info("broadcasting message with topic %v ...", string(topic))
protocol.Info("gcm: broadcasting message with topic %v ...", string(topic))

subscriptions := conn.kvStore.Iterate(GCM_REGISTRATIONS_SCHEMA, "")
subscriptions := conn.kvStore.Iterate(REGISTRATIONS_SCHEMA, "")
count := 0
for {
select {
case entry, ok := <-subscriptions:
if !ok {
protocol.Info("send message to %v receivers", count)
protocol.Info("gcm: sent message to %v receivers", count)
return
}
gmcID := entry[0]
gcmID := entry[0]
//TODO collect 1000 gcmIds and send them in one request!
broadcastMessage := gcm.NewMessage(payload, gmcID)
broadcastMessage := gcm.NewMessage(payload, gcmID)
go func() {
//TODO error handling of response!
_, err := conn.sender.Send(broadcastMessage, 3)
protocol.Debug("sent broadcast message to gcmID=%v", gmcID)
_, err := conn.sender.Send(broadcastMessage, BROADCAST_RETRIES)
protocol.Debug("gcm: sent broadcast message to gcmID=%v", gcmID)
if err != nil {
protocol.Err("error sending broadcast message to gcmID=%v: %v", gmcID, err.Error())
protocol.Err("gcm: error sending broadcast message to gcmID=%v: %v", gcmID, err.Error())
}
}()
count++
Expand All @@ -180,7 +179,7 @@ func (conn *GCMConnector) replaceSubscriptionWithCanonicalID(route *server.Route
topic := string(route.Path)
userID := route.UserID

protocol.Info("replacing old gcmID %v with canonicalId %v", oldGcmID, newGcmID)
protocol.Info("gcm: replacing old gcmID %v with canonicalId %v", oldGcmID, newGcmID)

conn.removeSubscription(route, oldGcmID)
conn.subscribe(topic, userID, newGcmID)
Expand Down Expand Up @@ -250,23 +249,23 @@ func (conn *GCMConnector) parseParams(path string) (userID, gcmID, topic string,
func (conn *GCMConnector) subscribe(topic string, userID string, gcmID string) {
protocol.Info("GCM connector registration to userID=%q, gcmID=%q: %q", userID, gcmID, topic)

route := server.NewRoute(topic, conn.channelFromRouter, gcmID, userID)
route := server.NewRoute(topic, conn.routerC, gcmID, userID)

conn.router.Subscribe(route)
conn.saveSubscription(userID, topic, gcmID)
}

func (conn *GCMConnector) removeSubscription(route *server.Route, gcmID string) {
conn.router.Unsubscribe(route)
conn.kvStore.Delete(GCM_REGISTRATIONS_SCHEMA, gcmID)
conn.kvStore.Delete(REGISTRATIONS_SCHEMA, gcmID)
}

func (conn *GCMConnector) saveSubscription(userID, topic, gcmID string) {
conn.kvStore.Put(GCM_REGISTRATIONS_SCHEMA, gcmID, []byte(userID+":"+topic))
conn.kvStore.Put(REGISTRATIONS_SCHEMA, gcmID, []byte(userID+":"+topic))
}

func (conn *GCMConnector) loadSubscriptions() {
subscriptions := conn.kvStore.Iterate(GCM_REGISTRATIONS_SCHEMA, "")
subscriptions := conn.kvStore.Iterate(REGISTRATIONS_SCHEMA, "")
count := 0
for {
select {
Expand All @@ -281,7 +280,7 @@ func (conn *GCMConnector) loadSubscriptions() {
topic := splitValue[1]

protocol.Debug("renewing GCM subscription: userID=%v, topic=%v, gcmID=%v", userID, topic, gcmID)
route := server.NewRoute(topic, conn.channelFromRouter, gcmID, userID)
route := server.NewRoute(topic, conn.routerC, gcmID, userID)
conn.router.Subscribe(route)
count++
}
Expand Down
Loading

0 comments on commit 2058912

Please sign in to comment.