Skip to content

Commit

Permalink
clean(er) shutdown in Service: first blocking stop of Router, followe…
Browse files Browse the repository at this point in the history
…d by the rest of the modules in reverse-registration-order (with timeout)

minor refactoring in GCM and router
increasing the default timeout for stopping modules to 5 sec
  • Loading branch information
Cosmin Rentea committed Jun 3, 2016
1 parent f8d438d commit e6e5a95
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 54 deletions.
2 changes: 1 addition & 1 deletion gcm/gcm_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (conn *GCMConnector) Start() error {
// (even if startup-time is longer, the routes are guaranteed to be there right after Start() returns)
conn.loadSubscriptions()

conn.wg.Add(conn.nWorkers)
for id := 1; id <= conn.nWorkers; id++ {
go conn.loopSendOrBroadcastMessage(id)
}
Expand Down Expand Up @@ -109,7 +110,6 @@ func (conn *GCMConnector) Check() error {
// until the stop-channel is closed
func (conn *GCMConnector) loopSendOrBroadcastMessage(id int) {
defer conn.wg.Done()
conn.wg.Add(1)
protocol.Debug("gcm: starting worker %v", id)
for {
select {
Expand Down
30 changes: 11 additions & 19 deletions server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,14 @@ type subRequest struct {
}

type router struct {
// mapping the path to the route slice
routes map[protocol.Path][]*Route
routes map[protocol.Path][]*Route // mapping the path to the route slice
handleC chan *protocol.Message
subscribeC chan subRequest
unsubscribeC chan subRequest
stopC chan bool // Channel that signals stop of the router
stopping bool // Flag: the router is in stopping process and no incoming messages are accepted
wg sync.WaitGroup // Add any operation that we need to wait upon here

// Channel that signals stop of the router
stopC chan bool
// marks that the router is in stopping process
// no incoming messages are accepted
stopping bool

// Add any operation that we need to wait upon here
wg sync.WaitGroup

// external 'services'
accessManager auth.AccessManager
messageStore store.MessageStore
kvStore store.KVStore
Expand All @@ -52,12 +44,12 @@ type router struct {
// NewRouter returns a pointer to Router
func NewRouter(accessManager auth.AccessManager, messageStore store.MessageStore, kvStore store.KVStore) Router {
return &router{
routes: make(map[protocol.Path][]*Route),
routes: make(map[protocol.Path][]*Route),

handleC: make(chan *protocol.Message, 500),
subscribeC: make(chan subRequest, 10),
unsubscribeC: make(chan subRequest, 10),

stopC: make(chan bool, 1),
stopC: make(chan bool, 1),

accessManager: accessManager,
messageStore: messageStore,
Expand All @@ -84,7 +76,7 @@ func (router *router) Start() error {

select {
case message := <-router.handleC:
router.handleMessage(message)
router.routeMessage(message)
runtime.Gosched()
case subscriber := <-router.subscribeC:
router.subscribe(subscriber.route)
Expand Down Expand Up @@ -225,8 +217,8 @@ func (router *router) storeMessage(msg *protocol.Message) error {
return nil
}

func (router *router) handleMessage(message *protocol.Message) {
protocol.Debug("router: handleMessage: %v", message.Metadata())
func (router *router) routeMessage(message *protocol.Message) {
protocol.Debug("router: routeMessage: %v", message.Metadata())

for path, list := range router.routes {
if matchesTopic(message.Path, path) {
Expand All @@ -245,8 +237,8 @@ func (router *router) deliverMessage(route *Route, message *protocol.Message) {
// fine, we could send the message
default:
protocol.Warn("router: queue was full, closing delivery for route=%v to applicationID=%v", route.Path, route.ApplicationID)
route.Close()
router.unsubscribe(route)
route.Close()
}
}

Expand Down
80 changes: 46 additions & 34 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Package-level default values.
const (
// NOTE(review): presumably the URL path prefix of the health-check endpoint; its use is not visible here — confirm.
healthEndpointPrefix = "/health"
// NOTE(review): this diff listing shows the same constant twice, with 2 seconds and with 5 seconds.
// The commit message states that the default timeout for stopping modules is increased to 5 sec,
// so the 5-second line is presumably the one that remains — confirm against the actual file.
defaultStopGracePeriod = time.Second * 2
defaultStopGracePeriod = time.Second * 5
// 60 seconds. NOTE(review): presumably the interval between health checks — confirm.
defaultHealthCheckFrequency = time.Second * 60
// NOTE(review): presumably a health-check threshold count; its exact meaning is not visible here — confirm.
defaultHealthCheckThreshold = 1
)
Expand Down Expand Up @@ -107,37 +107,23 @@ func (s *Service) Stop() error {
stopables = append(stopables, stopable)
}
}
// stopOrder allows the customized stopping of the modules
// (not necessarily in the reverse order of their Registrations)
// stopOrder allows the customized stopping of the modules,
// and not necessarily in the exact reverse order of their Registrations.
// Now, router is first to stop, the rest of the modules are stopped in reverse-registration-order.
stopOrder := make([]int, len(stopables))
for i := 0; i < len(stopables); i++ {
stopOrder[i] = len(stopables) - i - 1
for i := 1; i < len(stopables); i++ {
stopOrder[i] = len(stopables) - i
}
protocol.Debug("service: stopping %d modules, in order: %v", len(stopOrder), stopOrder)

protocol.Debug("service: stopping %d modules with a %v timeout, in this order relative to registration: %v",
len(stopOrder), s.StopGracePeriod, stopOrder)
errors := make(map[string]error)
for _, i := range stopOrder {
name := reflect.TypeOf(stopables[i]).String()
stoppedC := make(chan bool)
errorC := make(chan error)
protocol.Info("service: stopping [%d] %v", i, name)
go func() {
err := stopables[i].Stop()
if err != nil {
errorC <- err
return
}
stoppedC <- true
}()
select {
case err := <-errorC:
protocol.Err("service: error while stopping %v: %v", name, err.Error)
for _, order := range stopOrder {
name := reflect.TypeOf(stopables[order]).String()
protocol.Info("service: stopping [%d] %v", order, name)
err := s.stopModule(stopables[order], name)
if err != nil {
errors[name] = err
case <-stoppedC:
protocol.Info("service: stopped %v", name)
case <-time.After(s.StopGracePeriod):
errors[name] = fmt.Errorf("service: error while stopping %v: did not stop after timeout %v", name, s.StopGracePeriod)
protocol.Err(errors[name].Error())
}
}
if len(errors) > 0 {
Expand All @@ -154,15 +140,41 @@ func (s *Service) WebServer() *webserver.WebServer {
return s.webserver
}

// stop module with a timeout
func stopAsyncTimeout(m Stopable, timeout int) chan error {
// stopModule stops a single module and returns the error produced while stopping it, if any.
//
// A module that implements Router is stopped with a direct, blocking call to Stop().
// Every other module is stopped through stopWithTimeout, bounded by s.StopGracePeriod.
func (s *Service) stopModule(stopable Stopable, name string) error {
	_, isRouter := stopable.(Router)
	if !isRouter {
		return stopWithTimeout(stopable, name, s.StopGracePeriod)
	}

	protocol.Debug("service: %v is a Router and requires a blocking stop", name)
	return stopable.Stop()
}

// stopWithTimeout stops the given Stopable in the background (via stopChannel) and
// waits for the outcome for at most timeout.
//
// It returns:
//   - the error reported by Stop(), if the module failed to stop (the error is logged);
//   - a timeout error, if the module did not stop within timeout (the error is logged);
//   - nil, if the module stopped correctly.
//
// name is only used in log and error messages.
func stopWithTimeout(stopable Stopable, name string, timeout time.Duration) error {
	// Use an explicit timer so that it can be released as soon as the module
	// has stopped, instead of lingering until the timeout elapses.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case err, opened := <-stopChannel(stopable):
		// A received value (channel still open) means Stop() returned an error;
		// a closed channel without a value means a clean stop.
		if opened {
			// Pass the error itself: formatting the method value err.Error would
			// print a function address instead of the error message.
			protocol.Err("service: error while stopping %v: %v", name, err)
			return err
		}
	case <-timer.C:
		errTimeout := fmt.Errorf("service: error while stopping %v: did not stop after timeout %v", name, timeout)
		protocol.Err(errTimeout.Error())
		return errTimeout
	}
	protocol.Info("service: stopped %v", name)
	return nil
}

// stopChannel calls stopable.Stop() in a new goroutine and returns a channel
// that reports the outcome:
//   - if Stop() returns an error, that error is delivered on the channel;
//   - in every case the channel is closed once Stop() has returned, so a receive
//     that yields no value (ok == false) means the module stopped without error.
//
// The channel has a buffer of one so that the goroutine can always deliver its
// result and terminate, even when the caller has already given up waiting
// (for example after a timeout). With an unbuffered channel the goroutine would
// block forever on the send in that situation.
func stopChannel(stopable Stopable) chan error {
	errorC := make(chan error, 1)
	go func() {
		defer close(errorC)
		if err := stopable.Stop(); err != nil {
			errorC <- err
		}
	}()
	return errorC
}

// wait is a placeholder: it performs no waiting and always returns nil.
// NOTE(review): the original comment described it as "wait for channel to respond
// or until time expired", but the body does neither — confirm whether it is still needed.
func wait() error {
return nil
}

0 comments on commit e6e5a95

Please sign in to comment.