Skip to content

Commit

Permalink
Fix/graceful shutdown (#988)
Browse files Browse the repository at this point in the history
* add graceful shutdown

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>

* exposing WaitForHealthcheckInterval to fade out from LB pool

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>

* fix staticcheck findings

Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
  • Loading branch information
szuecs committed Mar 13, 2019
1 parent 7b3f470 commit e44dca6
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cmd/skipper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
defaultApplicationLogLevel = "INFO"

// connections, timeouts:
defaultWaitForHealthcheckInterval = (10 + 5) * 3 * time.Second // kube-ingress-aws-controller default
defaultReadTimeoutServer = 5 * time.Minute
defaultReadHeaderTimeoutServer = 60 * time.Second
defaultWriteTimeoutServer = 60 * time.Second
Expand Down Expand Up @@ -173,6 +174,7 @@ const (
apiUsageMonitoringRealmsTrackingPatternUsage = "regular expression used for matching monitored realms (defaults is 'services')"

// connections, timeouts:
waitForHealthcheckIntervalUsage = "period waiting to become unhealthy in the loadbalancer pool in front of this instance, before shutdown triggered by SIGINT or SIGTERM"
idleConnsPerHostUsage = "maximum idle connections per backend host"
closeIdleConnsPeriodUsage = "period of closing all idle connections in seconds or as a duration string. Not closing when less than 0"
backendFlushIntervalUsage = "flush interval for upgraded proxy connections"
Expand Down Expand Up @@ -315,6 +317,7 @@ var (
apiUsageMonitoringRealmsTrackingPattern string

// connections, timeouts:
waitForHealthcheckInterval time.Duration
idleConnsPerHost int
closeIdleConnsPeriod string
backendFlushInterval time.Duration
Expand Down Expand Up @@ -455,6 +458,8 @@ func init() {
flag.StringVar(&apiUsageMonitoringRealmsTrackingPattern, "api-usage-monitoring-realms-tracking-pattern", defaultApiUsageMonitoringRealmsTrackingPattern, apiUsageMonitoringRealmsTrackingPatternUsage)

// connections, timeouts:
flag.DurationVar(&waitForHealthcheckInterval, "wait-for-healthcheck-interval", defaultWaitForHealthcheckInterval, waitForHealthcheckIntervalUsage)

flag.IntVar(&idleConnsPerHost, "idle-conns-num", proxy.DefaultIdleConnsPerHost, idleConnsPerHostUsage)
flag.StringVar(&closeIdleConnsPeriod, "close-idle-conns-period", strconv.Itoa(int(proxy.DefaultCloseIdleConnsPeriod/time.Second)), closeIdleConnsPeriodUsage)
flag.DurationVar(&backendFlushInterval, "backend-flush-interval", defaultBackendFlushInterval, backendFlushIntervalUsage)
Expand Down Expand Up @@ -670,6 +675,7 @@ func main() {
OIDCSecretsFile: oidcSecretsFile,

// connections, timeouts:
WaitForHealthcheckInterval: waitForHealthcheckInterval,
IdleConnectionsPerHost: idleConnsPerHost,
CloseIdleConnsPeriod: time.Duration(clsic) * time.Second,
BackendFlushInterval: backendFlushInterval,
Expand Down
35 changes: 34 additions & 1 deletion skipper.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package skipper

import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -48,6 +51,11 @@ const DefaultPluginDir = "./plugins"

// Options to start skipper.
type Options struct {
// WaitForHealthcheckInterval sets the time that skipper waits
// for the loadbalancer in front to become unhealthy. Defaults
// to 0.
WaitForHealthcheckInterval time.Duration

// WhitelistedHealthcheckCIDR appends the whitelisted IP Range to the inernalIPS range for healthcheck purposes
WhitelistedHealthCheckCIDR []string

Expand Down Expand Up @@ -702,7 +710,32 @@ func listenAndServe(proxy http.Handler, o *Options) error {
return srv.ListenAndServeTLS(o.CertPathTLS, o.KeyPathTLS)
}
log.Infof("TLS settings not found, defaulting to HTTP")
return srv.ListenAndServe()

idleConnsCH := make(chan struct{})
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

<-sigs

log.Infof("Got shutdown signal, wait %v for health check", o.WaitForHealthcheckInterval)
time.Sleep(o.WaitForHealthcheckInterval)

log.Info("Start shutdown")
if err := srv.Shutdown(context.Background()); err != nil {
log.Errorf("Failed to graceful shutdown: %v", err)
}
close(idleConnsCH)
}()

if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("Failed to start to ListenAndServe: %v", err)
return err
}

<-idleConnsCH
log.Infof("done.")
return nil
}

// Run skipper.
Expand Down
93 changes: 93 additions & 0 deletions skipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"testing"
"time"

"github.com/zalando/skipper/dataclients/routestring"
"github.com/zalando/skipper/filters/builtin"
"github.com/zalando/skipper/proxy"
"github.com/zalando/skipper/routing"
Expand Down Expand Up @@ -208,3 +212,92 @@ func TestHTTPServer(t *testing.T) {
t.Fatalf("Failed to stream response body: %v", err)
}
}

func TestHTTPServerShutdown(t *testing.T) {
d := 1 * time.Second

o := Options{
Address: ":19999",
WaitForHealthcheckInterval: d,
}

// simulate a backend that got a request and should be handled correctly
dc, err := routestring.New(`r0: * -> latency("3s") -> inlineContent("OK") -> status(200) -> <shunt>`)
if err != nil {
t.Errorf("Failed to create dataclient: %v", err)
}

rt := routing.New(routing.Options{
FilterRegistry: builtin.MakeRegistry(),
DataClients: []routing.DataClient{
dc,
},
})
defer rt.Close()

proxy := proxy.New(rt, proxy.OptionsNone)
defer proxy.Close()
go func() {
if errLas := listenAndServe(proxy, &o); errLas != nil {
t.Logf("Failed to liste and serve: %v", errLas)
}
}()

pid := syscall.Getpid()
p, err := os.FindProcess(pid)
if err != nil {
t.Errorf("Failed to find current process: %v", err)
}

var wg sync.WaitGroup
installSigHandler := make(chan struct{}, 1)
wg.Add(1)
go func() {
defer wg.Done()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

installSigHandler <- struct{}{}

<-sigs

// ongoing requests passing in before shutdown
time.Sleep(d / 2)
r, err2 := waitConnGet("http://" + o.Address)
if r != nil {
defer r.Body.Close()
}
if err2 != nil {
t.Errorf("Cannot connect to the local server for testing: %v ", err2)
}
if r.StatusCode != 200 {
t.Errorf("Status code should be 200, instead got: %d\n", r.StatusCode)
}
body, err2 := ioutil.ReadAll(r.Body)
if err2 != nil {
t.Errorf("Failed to stream response body: %v", err2)
}
if s := string(body); s != "OK" {
t.Errorf("Failed to get the right content: %s", s)
}

// requests on closed listener should fail
time.Sleep(d / 2)
r2, err2 := waitConnGet("http://" + o.Address)
if r2 != nil {
defer r2.Body.Close()
}
if err2 == nil {
t.Error("Can connect to a closed server for testing")
}
}()

<-installSigHandler
time.Sleep(d / 2)

if err = p.Signal(syscall.SIGTERM); err != nil {
t.Errorf("Failed to signal process: %v", err)
}
wg.Wait()
time.Sleep(d)
}

0 comments on commit e44dca6

Please sign in to comment.