Skip to content

Commit

Permalink
Start graceful shutdown on SIGTERM
Browse files Browse the repository at this point in the history
When the main process receives SIGTERM or SIGINT, wait
ROUTER_GRACEFUL_SHUTDOWN_DELAY (default: 45) seconds and then signal
the reload script with ROUTER_SHUTDOWN=true that a graceful termination
is requested. The reload-haproxy script then invokes USR1 on the child
processes and waits ROUTER_MAX_SHUTDOWN_TIMEOUT or MAX_RELOAD_WAIT_TIME
(default 30) seconds before invoking TERM on the child processes.
If TERM is invoked the script exits with 1, indicating that not all
processes completed their work.

Clients with long running requests should set ROUTER_MAX_SHUTDOWN_TIMEOUT
as appropriate to ensure all connections exit cleanly.
  • Loading branch information
smarterclayton committed Mar 1, 2020
1 parent 43661b2 commit 7fc4d56
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 7 deletions.
20 changes: 20 additions & 0 deletions images/router/haproxy/reload-haproxy
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ function haproxyHealthCheck() {

old_pids=$(pidof haproxy)

# If signaled, stop accepting new connections and drain the current processes.
# ROUTER_SHUTDOWN is expected to be non-empty only when a graceful termination
# has been requested; in that case this section always exits the script.
if [ -n "${ROUTER_SHUTDOWN-}" ]; then
  echo " - Shutting down"
  # No haproxy process is running, so there is nothing to drain.
  if [ -z "$old_pids" ]; then
    exit 0
  fi
  # Plain assignment on purpose: bash rejects 'local' outside of a function
  # body, which would leave wait_time unset and break the loop below.
  wait_time=${ROUTER_MAX_SHUTDOWN_TIMEOUT:-${MAX_RELOAD_WAIT_TIME:-$max_wait_time}}
  # USR1 asks haproxy for a graceful (soft) stop.
  kill -USR1 $old_pids
  # Poll once per second, for at most wait_time seconds, until every haproxy
  # process has exited.
  for i in $( seq 1 $wait_time ); do
    old_pids=$(pidof haproxy)
    if [ -z "$old_pids" ]; then
      exit 0
    fi
    sleep 1
  done
  # Grace period elapsed: terminate whatever is left and report failure.
  kill -TERM $old_pids
  echo "error: Some processes did not exit within ${wait_time}s"
  exit 1
fi

reload_status=0
if [ -n "$old_pids" ]; then
/usr/sbin/haproxy -f $config_file -p $pid_file -x /var/lib/haproxy/run/haproxy.sock -sf $old_pids
Expand Down
20 changes: 16 additions & 4 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"time"

"github.com/MakeNowJust/heredoc"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

Expand All @@ -40,6 +39,7 @@ import (
"github.com/openshift/router/pkg/router/controller"
"github.com/openshift/router/pkg/router/metrics"
"github.com/openshift/router/pkg/router/metrics/haproxy"
"github.com/openshift/router/pkg/router/shutdown"
templateplugin "github.com/openshift/router/pkg/router/template"
haproxyconfigmanager "github.com/openshift/router/pkg/router/template/configmanager/haproxy"
"github.com/openshift/router/pkg/router/writerlease"
Expand Down Expand Up @@ -209,7 +209,7 @@ func NewCommandTemplateRouter(name string) *cobra.Command {
if err := options.Validate(); err != nil {
return err
}
return options.Run()
return options.Run(shutdown.SetupSignalHandler())
},
}

Expand Down Expand Up @@ -293,7 +293,7 @@ func (o *TemplateRouterOptions) Validate() error {
}

// Run launches a template router using the provided options. Once started, it blocks until
// stopCh is closed, then shuts the router down gracefully and returns.
func (o *TemplateRouterOptions) Run() error {
func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error {
log.V(0).Info("starting router", "version", version.String())
var ptrTemplatePlugin *templateplugin.TemplatePlugin

Expand Down Expand Up @@ -541,7 +541,19 @@ func (o *TemplateRouterOptions) Run() error {

proc.StartReaper()

select {}
// TODO: start returning not ready when this signal is closed.
select {
case <-stopCh:
// 45s is the default interval that almost all cloud load balancers require to take an unhealthy
// endpoint out of rotation.
delay := getIntervalFromEnv("ROUTER_GRACEFUL_SHUTDOWN_DELAY", 45)
log.Info("Shutdown requested, waiting %s for new connections to cease", delay.String())
time.Sleep(delay)
log.Info("Instructing the template router to terminate")
templatePlugin.Stop()
}
log.Info("Shutdown complete")
return nil
}

// blueprintRoutes returns all the routes in the blueprint namespace.
Expand Down
43 changes: 43 additions & 0 deletions pkg/router/shutdown/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package shutdown

import (
"os"
"os/signal"
)

var onlyOneSignalHandler = make(chan struct{})
var shutdownHandler chan os.Signal

// SetupSignalHandler registers a handler for the signals listed in shutdownSignals
// and returns a stop channel that is closed when the first of them arrives. A
// second signal terminates the program immediately with exit code 1.
//
// It may be called only once per process; a second call panics.
func SetupSignalHandler() <-chan struct{} {
	// Closing an already-closed channel panics, which enforces single use.
	close(onlyOneSignalHandler)

	// Buffer of two so that neither the first nor the second signal is dropped
	// while the goroutine below is busy.
	sigCh := make(chan os.Signal, 2)
	shutdownHandler = sigCh
	signal.Notify(sigCh, shutdownSignals...)

	stopCh := make(chan struct{})
	go func() {
		// First signal: ask the caller to shut down gracefully.
		<-sigCh
		close(stopCh)

		// Second signal: the caller is taking too long, exit directly.
		<-sigCh
		os.Exit(1)
	}()

	return stopCh
}

// RequestShutdown emulates the arrival of a shutdown signal by queueing the
// first entry of shutdownSignals on the installed handler's channel.
//
// It reports whether a handler was notified: false is returned when no handler
// has been set up yet, or when the handler's channel is full and the send
// would block.
func RequestShutdown() bool {
	if shutdownHandler == nil {
		return false
	}

	// Non-blocking send: never stall the caller if the buffer is already full.
	select {
	case shutdownHandler <- shutdownSignals[0]:
		return true
	default:
		return false
	}
}
10 changes: 10 additions & 0 deletions pkg/router/shutdown/signal_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// +build !windows

package shutdown

import (
"os"
"syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
7 changes: 7 additions & 0 deletions pkg/router/shutdown/signal_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package shutdown

import (
"os"
)

var shutdownSignals = []os.Signal{os.Interrupt}
27 changes: 26 additions & 1 deletion pkg/router/template/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type CoalescingSerializingRateLimiter struct {
// handlerRunning indicates whether the Handler is actively running.
handlerRunning bool

// stopped indicates no further commits should occur.
stopped bool

// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
lock sync.Mutex

Expand All @@ -56,13 +59,30 @@ func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc Han
return limiter
}

// Stop marks the limiter as stopped and then blocks until the handler is no
// longer running. Once it returns, no handler will be invoked in the future.
func (csrl *CoalescingSerializingRateLimiter) Stop() {
	// Flip the flag under the lock so it is published to other goroutines.
	func() {
		csrl.lock.Lock()
		defer csrl.lock.Unlock()
		csrl.stopped = true
	}()

	// Poll until any in-flight handler invocation has finished.
	const pollInterval = 50 * time.Millisecond
	for {
		if !csrl.isHandlerRunning() {
			return
		}
		time.Sleep(pollInterval)
	}
}

// isHandlerRunning reports, under the limiter's lock, whether the handler is
// currently marked as running.
func (csrl *CoalescingSerializingRateLimiter) isHandlerRunning() bool {
	csrl.lock.Lock()
	running := csrl.handlerRunning
	csrl.lock.Unlock()
	return running
}

// RegisterChange records that the rate limited function needs to be called.
// The call returns immediately and the function runs in the background, no
// later than the ReloadInterval allows. Calls are coalesced: registering
// several changes within one ReloadInterval of the last run results in a
// single invocation once the interval permits it.
func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
	log.V(8).Info("RegisterChange called")

	// This entry point always represents a change requested by the user.
	const userChanged = true
	csrl.changeWorker(userChanged)
}

Expand All @@ -72,6 +92,11 @@ func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {

log.V(8).Info("changeWorker called")

if csrl.stopped {
log.V(8).Info("limiter is stopped")
return
}

if userChanged && csrl.changeReqTime == nil {
// They just registered a change manually (and we aren't in the middle of a change)
now := time.Now()
Expand Down
8 changes: 8 additions & 0 deletions pkg/router/template/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp
return newDefaultTemplatePlugin(router, cfg.IncludeUDP, lookupSvc), err
}

// Stop shuts the router plugin down in two steps: it first stops the rate
// limited commit function, waiting until no further reloads will occur, and
// then invokes the reload script one final time in shutdown mode (with the
// ROUTER_SHUTDOWN environment variable set to true).
//
// The plugin's Router must be a *templateRouter; any other type panics. The
// error returned by the final reload is discarded.
func (p *TemplatePlugin) Stop() {
	tr := p.Router.(*templateRouter)
	tr.rateLimitedCommitFunction.Stop()
	tr.reloadRouter(true)
}

// HandleEndpoints processes watch events on the Endpoints resource.
func (p *TemplatePlugin) HandleEndpoints(eventType watch.EventType, endpoints *kapi.Endpoints) error {
key := endpointsKey(endpoints)
Expand Down
8 changes: 6 additions & 2 deletions pkg/router/template/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (r *templateRouter) commitAndReload() error {

log.V(4).Info("reloading the router")
reloadStart := time.Now()
err := r.reloadRouter()
err := r.reloadRouter(false)
r.metricReload.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second))
if err != nil {
if r.dynamicConfigManager != nil {
Expand Down Expand Up @@ -539,8 +539,12 @@ func (r *templateRouter) writeCertificates(cfg *ServiceAliasConfig) error {
}

// reloadRouter executes the router's reload script.
func (r *templateRouter) reloadRouter() error {
func (r *templateRouter) reloadRouter(shutdown bool) error {
cmd := exec.Command(r.reloadScriptPath)
if shutdown {
cmd.Env = append([]string{}, os.Environ()...)
cmd.Env = append(cmd.Env, "ROUTER_SHUTDOWN=true")
}
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("error reloading router: %v\n%s", err, string(out))
Expand Down

0 comments on commit 7fc4d56

Please sign in to comment.