Bug 1809665: Start graceful shutdown on SIGTERM #94

Merged
39 changes: 31 additions & 8 deletions images/router/haproxy/reload-haproxy
@@ -4,23 +4,27 @@ set -o nounset
 
 config_file=/var/lib/haproxy/conf/haproxy.config
 pid_file=/var/lib/haproxy/run/haproxy.pid
-readonly max_wait_time=30
 readonly timeout_opts="-m 1 --connect-timeout 1"
 
+readonly max_wait_time=30
+readonly numeric_re='^[0-9]+$'
+wait_time=${MAX_RELOAD_WAIT_TIME:-$max_wait_time}
+if ! [[ $wait_time =~ $numeric_re ]]; then
+  echo " - Invalid max reload wait time, using default $max_wait_time ..."
+  wait_time=$max_wait_time
+fi
+shutdown_wait_time=${ROUTER_MAX_SHUTDOWN_TIMEOUT:-${wait_time}}
+if ! [[ $shutdown_wait_time =~ $numeric_re ]]; then
+  echo " - Invalid max shutdown wait time, using $wait_time ..."
+  shutdown_wait_time=$wait_time
+fi
+
 function haproxyHealthCheck() {
-  local wait_time=${MAX_RELOAD_WAIT_TIME:-$max_wait_time}
   local port=${ROUTER_SERVICE_HTTP_PORT:-"80"}
   local url="http://localhost:${port}"
   local retries=0
   local start_ts=$(date +"%s")
   local proxy_proto="${ROUTER_USE_PROXY_PROTOCOL-}"
 
-  if ! [[ $wait_time =~ $numeric_re ]]; then
-    echo " - Invalid max reload wait time, using default $max_wait_time ..."
-    wait_time=$max_wait_time
-  fi
-
   local end_ts=$((start_ts + wait_time))
 
   # test with proxy protocol on
@@ -67,6 +71,25 @@ function haproxyHealthCheck() {
 
 old_pids=$(pidof haproxy)
 
+# If signaled, stop accepting new connections and drain the current processes
+if [ -n "${ROUTER_SHUTDOWN-}" ]; then
+  echo " - Shutting down"
+  if [ -z "$old_pids" ]; then
+    exit 0
+  fi
+  kill -USR1 $old_pids
+  for i in $( seq 1 $shutdown_wait_time ); do
+    old_pids=$(pidof haproxy)
+    if [ -z "$old_pids" ]; then
+      exit 0
+    fi
+    sleep 1
+  done
+  kill -TERM $old_pids
+  echo "error: Some processes did not exit within ${shutdown_wait_time}s"
+  exit 1
+fi
+
 reload_status=0
 if [ -n "$old_pids" ]; then
   /usr/sbin/haproxy -f $config_file -p $pid_file -x /var/lib/haproxy/run/haproxy.sock -sf $old_pids
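The added block is a signal-then-escalate drain: SIGUSR1 is HAProxy's soft-stop (finish existing connections, accept no new ones), the loop gives the old processes up to shutdown_wait_time seconds to exit, and SIGTERM then forces any stragglers. A dependency-free Go sketch of the same pattern, with the pid handling and timeout as illustrative assumptions rather than code from this PR:

```go
package drain

import (
	"fmt"
	"os"
	"syscall"
	"time"
)

// Drain mirrors the script's shutdown path: send the soft-stop signal
// (SIGUSR1 for HAProxy), poll until the process exits, then escalate to
// SIGTERM once the deadline passes.
func Drain(pid int, wait time.Duration) error {
	proc, err := os.FindProcess(pid) // on Unix this always succeeds
	if err != nil {
		return err
	}
	if err := proc.Signal(syscall.SIGUSR1); err != nil {
		return nil // process already gone; nothing to drain
	}
	deadline := time.Now().Add(wait)
	for time.Now().Before(deadline) {
		// Signal 0 probes liveness without delivering a signal.
		if err := proc.Signal(syscall.Signal(0)); err != nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	proc.Signal(syscall.SIGTERM)
	return fmt.Errorf("process %d did not exit within %s", pid, wait)
}
```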
26 changes: 21 additions & 5 deletions pkg/cmd/infra/router/template.go
@@ -15,7 +15,6 @@ import (
 	"time"
 
 	"github.com/MakeNowJust/heredoc"
-
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
 
@@ -40,6 +39,7 @@ import (
 	"github.com/openshift/router/pkg/router/controller"
 	"github.com/openshift/router/pkg/router/metrics"
 	"github.com/openshift/router/pkg/router/metrics/haproxy"
+	"github.com/openshift/router/pkg/router/shutdown"
 	templateplugin "github.com/openshift/router/pkg/router/template"
 	haproxyconfigmanager "github.com/openshift/router/pkg/router/template/configmanager/haproxy"
 	"github.com/openshift/router/pkg/router/writerlease"
@@ -209,7 +209,7 @@ func NewCommandTemplateRouter(name string) *cobra.Command {
 			if err := options.Validate(); err != nil {
 				return err
 			}
-			return options.Run()
+			return options.Run(shutdown.SetupSignalHandler())
 		},
 	}

@@ -293,7 +293,7 @@ func (o *TemplateRouterOptions) Validate() error {
 }
 
 // Run launches a template router using the provided options. It never exits.
-func (o *TemplateRouterOptions) Run() error {
+func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error {
 	log.V(0).Info("starting router", "version", version.String())
 	var ptrTemplatePlugin *templateplugin.TemplatePlugin
 
@@ -419,7 +419,7 @@ func (o *TemplateRouterOptions) Run() error {
 			Name: o.RouterName,
 		},
 		LiveChecks:  liveChecks,
-		ReadyChecks: []healthz.HealthChecker{checkBackend, checkSync},
+		ReadyChecks: []healthz.HealthChecker{checkBackend, checkSync, metrics.ProcessRunning(stopCh)},
 	}
 
 	if tlsConfig, err := makeTLSConfig(30 * time.Second); err != nil {
@@ -541,7 +541,23 @@ func (o *TemplateRouterOptions) Run() error {
 
 	proc.StartReaper()
 
-	select {}
+	select {
+	case <-stopCh:
+		// 45s is the default interval that almost all cloud load balancers require to take an unhealthy
+		// endpoint out of rotation.
+		delay := getIntervalFromEnv("ROUTER_GRACEFUL_SHUTDOWN_DELAY", 45)
+		log.Info(fmt.Sprintf("Shutdown requested, waiting %s for new connections to cease", delay))
+		time.Sleep(delay)

Contributor:

The thing that sends the signal that triggers the graceful shutdown is the kubelet, when the pod is marked for deletion, right? As soon as a pod is marked for deletion, the pod is removed from endpoints, so it should not be receiving new connections. So once the endpoints controller updates the endpoints in response to the pod's deletion and the service proxy updates in response to the endpoints update, we're really just waiting for already established connections to drain, right? Where does the 45-second delay come from?

Contributor Author:

No, you're waiting for distributed load balancers to take you out of rotation.

  1. Pod marked for deletion in etcd
  2. Delete notification propagates to all consumers
  3. Consumers stop directing new traffic to the endpoint

1 is fast. 2 may take up to 5-10s depending on load. 3 takes as long as whatever global load balancer sits in front of the service needs to detect a not-ready backend, which is (unhealthy checks + 1) * check interval, or 32s for GCP. See https://docs.google.com/document/d/1BUmtdTth49V02UZ5EjRvJ92A5vjF8wMJMSdPb1Wz3wQ/edit# for an explanation (that will become part of openshift/enhancements)

log.Info("Instructing the template router to terminate")
if err := templatePlugin.Stop(); err != nil {
log.Error(err, "Router did not shut down cleanly")
} else {
log.Info("Shutdown complete, exiting")
}
// wait one second to let any remaining actions settle
time.Sleep(time.Second)
}
return nil
}

// blueprintRoutes returns all the routes in the blueprint namespace.
Expand Down
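The getIntervalFromEnv helper is called above but defined outside this hunk. Because its result feeds both the %s log verb and time.Sleep, it must return a time.Duration; a hypothetical sketch of its shape, assuming it parses the variable as whole seconds and falls back to the default:

```go
package router

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// getIntervalFromEnv is a sketch of the helper referenced in the hunk above;
// the real definition is not shown in this PR. Assumed behavior: parse the
// environment variable as a number of seconds, defaulting when unset or invalid.
func getIntervalFromEnv(name string, defaultSeconds int) time.Duration {
	if v := os.Getenv(name); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n >= 0 {
			return time.Duration(n) * time.Second
		}
		fmt.Printf("invalid %s=%q, using default %ds\n", name, v, defaultSeconds)
	}
	return time.Duration(defaultSeconds) * time.Second
}
```

On the 45s default: the thread above derives GCP's 32s from (unhealthy checks + 1) * check interval; for example, 3 failed checks at an 8-second interval gives (3 + 1) * 8 = 32s, so 45s leaves headroom for slower propagation.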
19 changes: 18 additions & 1 deletion pkg/router/metrics/health.go
@@ -16,7 +16,24 @@ import (
 	templateplugin "github.com/openshift/router/pkg/router/template"
 )
 
-var errBackend = fmt.Errorf("backend reported failure")
+var (
+	errBackend      = fmt.Errorf("backend reported failure")
+	errShuttingDown = fmt.Errorf("process is terminating")
+)
+
+// ProcessRunning returns a healthz check that succeeds as long as the provided
+// stopCh is not closed.
+func ProcessRunning(stopCh <-chan struct{}) healthz.HealthChecker {
+	return healthz.NamedCheck("process-running", func(r *http.Request) error {
+		select {
+		case <-stopCh:
+			return errShuttingDown
+		default:
+			return nil
+		}
+	})
+}
 
 // HTTPBackendAvailable returns a healthz check that verifies a backend responds to a GET to
 // the provided URL with 2xx or 3xx response.
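ProcessRunning is what ties the stop channel to readiness: once stopCh closes, the ready check fails while the process itself keeps serving, so health checkers pull the endpoint out of rotation during the drain window. A dependency-free sketch of the same pattern with plain net/http (illustration only; the PR's actual mechanism is the healthz checker above):

```go
package main

import (
	"fmt"
	"net/http"
)

// readyz reports 200 until stopCh is closed, then 503 so load balancers
// and kubelets take the endpoint out of rotation while connections drain.
func readyz(stopCh <-chan struct{}) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-stopCh:
			http.Error(w, "process is terminating", http.StatusServiceUnavailable)
		default:
			fmt.Fprintln(w, "ok")
		}
	}
}

func main() {
	stop := make(chan struct{})
	http.HandleFunc("/readyz", readyz(stop))
	// close(stop) from a signal handler to flip /readyz to 503.
	http.ListenAndServe(":8080", nil)
}
```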
29 changes: 29 additions & 0 deletions pkg/router/shutdown/shutdown.go
@@ -0,0 +1,29 @@
+package shutdown
+
+import (
+	"os"
+	"os/signal"
+)
+
+var onlyOneSignalHandler = make(chan struct{})
+var shutdownHandler chan os.Signal
+
+// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned
+// which is closed on one of these signals. If a second signal is caught, the program
+// is terminated with exit code 1.
+func SetupSignalHandler() <-chan struct{} {
+	close(onlyOneSignalHandler) // panics when called twice

Contributor:

What could call SetupSignalHandler twice? Is the purpose of this line to guard against future coding errors?

Contributor Author:

This is the same code as genericapiserver. It's not appropriate for router to take a dependency on it, and this code is straightforward.

Contributor Author:

Yes, the panic guards against coding errors.


+
+	shutdownHandler = make(chan os.Signal, 2)
+
+	stop := make(chan struct{})
+	signal.Notify(shutdownHandler, shutdownSignals...)
+	go func() {
+		<-shutdownHandler
+		close(stop)
+		<-shutdownHandler
+		os.Exit(1) // second signal. Exit directly.
+	}()
+
+	return stop
+}
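A minimal caller, sketching the intended lifecycle of the returned channel (this mirrors how template.go passes SetupSignalHandler() into options.Run):

```go
package main

import (
	"fmt"

	"github.com/openshift/router/pkg/router/shutdown"
)

func main() {
	stopCh := shutdown.SetupSignalHandler()
	fmt.Println("running; SIGTERM or SIGINT begins a graceful shutdown")
	<-stopCh // closed on the first signal
	fmt.Println("draining; a second signal exits immediately with code 1")
	// ... graceful cleanup runs here ...
}
```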
10 changes: 10 additions & 0 deletions pkg/router/shutdown/signal_posix.go
@@ -0,0 +1,10 @@
+// +build !windows
+
+package shutdown
+
+import (
+	"os"
+	"syscall"
+)
+
+var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
7 changes: 7 additions & 0 deletions pkg/router/shutdown/signal_windows.go
@@ -0,0 +1,7 @@
+package shutdown
+
+import (
+	"os"
+)
+
+var shutdownSignals = []os.Signal{os.Interrupt}
27 changes: 26 additions & 1 deletion pkg/router/template/limiter/limiter.go
@@ -37,6 +37,9 @@ type CoalescingSerializingRateLimiter struct {
 	// handlerRunning indicates whether the Handler is actively running.
 	handlerRunning bool
 
+	// stopped indicates no further commits should occur.
+	stopped bool
+
 	// lock protects the CoalescingSerializingRateLimiter structure from multiple threads manipulating it at once.
 	lock sync.Mutex
 
@@ -56,13 +59,30 @@ func NewCoalescingSerializingRateLimiter(interval time.Duration, handlerFunc HandlerFunc) *CoalescingSerializingRateLimiter {
 	return limiter
 }
 
+// Stop signals shutdown and waits until no handler is running. After this method returns
+// no handler will be invoked in the future.
+func (csrl *CoalescingSerializingRateLimiter) Stop() {
+	csrl.lock.Lock()
+	csrl.stopped = true
+	csrl.lock.Unlock()
+
+	for csrl.isHandlerRunning() {
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
+func (csrl *CoalescingSerializingRateLimiter) isHandlerRunning() bool {
+	csrl.lock.Lock()
+	defer csrl.lock.Unlock()
+	return csrl.handlerRunning
+}
+
 // RegisterChange() indicates that the rate limited function should be called. It may not immediately run it, but it will cause it to run within
 // the ReloadInterval. It will always immediately return, the function will be run in the background. Not every call to RegisterChange() will
 // result in the function getting called. If it is called repeatedly while it is still within the ReloadInterval since the last run, it will
 // only run once when the time allows it.
 func (csrl *CoalescingSerializingRateLimiter) RegisterChange() {
 	log.V(8).Info("RegisterChange called")
 
 	csrl.changeWorker(true)
 }

@@ -72,6 +92,11 @@ func (csrl *CoalescingSerializingRateLimiter) changeWorker(userChanged bool) {
 
 	log.V(8).Info("changeWorker called")
 
+	if csrl.stopped {
+		log.V(8).Info("limiter is stopped")
+		return
+	}
+
 	if userChanged && csrl.changeReqTime == nil {
 		// They just registered a change manually (and we aren't in the middle of a change)
 		now := time.Now()
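Stop drains by polling handlerRunning every 50ms, which is simple and cheap next to the cost of an haproxy reload. A condition-variable variant would avoid the sleep loop; a sketch of that alternative design (not what this PR does):

```go
package limiter

import "sync"

// drainer sketches a sync.Cond alternative to the poll loop: Stop blocks
// until the in-flight handler clears the running flag and broadcasts.
type drainer struct {
	mu      sync.Mutex
	cond    *sync.Cond
	stopped bool
	running bool
}

func newDrainer() *drainer {
	d := &drainer{}
	d.cond = sync.NewCond(&d.mu)
	return d
}

// Stop marks the drainer stopped and waits for any running handler.
func (d *drainer) Stop() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.stopped = true
	for d.running {
		d.cond.Wait()
	}
}

// handlerDone is called by the worker goroutine when a run finishes.
func (d *drainer) handlerDone() {
	d.mu.Lock()
	d.running = false
	d.mu.Unlock()
	d.cond.Broadcast()
}
```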
8 changes: 8 additions & 0 deletions pkg/router/template/plugin.go
@@ -153,6 +153,14 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*TemplatePlugin, error) {
 	return newDefaultTemplatePlugin(router, cfg.IncludeUDP, lookupSvc), err
 }
 
+// Stop instructs the router plugin to stop invoking the reload method, and waits until no further
+// reloads will occur. It then invokes the reload script one final time with the ROUTER_SHUTDOWN
+// environment variable set to true.
+func (p *TemplatePlugin) Stop() error {
+	p.Router.(*templateRouter).rateLimitedCommitFunction.Stop()
+	return p.Router.(*templateRouter).reloadRouter(true)
+}
+
 // HandleEndpoints processes watch events on the Endpoints resource.
 func (p *TemplatePlugin) HandleEndpoints(eventType watch.EventType, endpoints *kapi.Endpoints) error {
 	key := endpointsKey(endpoints)
7 changes: 5 additions & 2 deletions pkg/router/template/router.go
@@ -440,7 +440,7 @@ func (r *templateRouter) commitAndReload() error {
 
 	log.V(4).Info("reloading the router")
 	reloadStart := time.Now()
-	err := r.reloadRouter()
+	err := r.reloadRouter(false)
 	r.metricReload.Observe(float64(time.Now().Sub(reloadStart)) / float64(time.Second))
 	if err != nil {
 		if r.dynamicConfigManager != nil {
@@ -539,8 +539,11 @@ func (r *templateRouter) writeCertificates(cfg *ServiceAliasConfig) error {
 }
 
 // reloadRouter executes the router's reload script.
-func (r *templateRouter) reloadRouter() error {
+func (r *templateRouter) reloadRouter(shutdown bool) error {
 	cmd := exec.Command(r.reloadScriptPath)
+	if shutdown {
+		cmd.Env = append(os.Environ(), "ROUTER_SHUTDOWN=true")
+	}
 	out, err := cmd.CombinedOutput()
 	if err != nil {
 		return fmt.Errorf("error reloading router: %v\n%s", err, string(out))