Skip to content

Commit

Permalink
pkg/cvo/metrics: Graceful server shutdown
Browse files Browse the repository at this point in the history
Somewhat like the example in [1].  This pushes the server management
down into a new RunMetrics method, which we then run in its own
goroutine.  This is initial groundwork; I expect we will port more of
our child goroutines to this framework in follow-up work.

[1]: https://golang.org/pkg/net/http/#Server.Shutdown
  • Loading branch information
wking committed Jun 16, 2020
1 parent 07e5809 commit b30aa0e
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 51 deletions.
89 changes: 89 additions & 0 deletions pkg/cvo/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package cvo

import (
"context"
"crypto/tls"
"net"
"net/http"
"time"

"github.com/cockroachdb/cmux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
Expand Down Expand Up @@ -86,6 +93,88 @@ version for 'cluster', or empty for 'initial'.
}
}

// RunMetrics launches an server bound to listenAddress serving
// Prometheus metrics at /metrics over HTTP, and, if tlsConfig is
// non-nil, also over HTTPS. Continues serving until runContext.Done()
// and then attempts a clean shutdown limited by shutdownContext.Done().
// Assumes runContext.Done() occurs before or simultaneously with
// shutdownContext.Done().
func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string, tlsConfig *tls.Config) error {
handler := http.NewServeMux()
handler.Handle("/metrics", promhttp.Handler())
server := &http.Server{
Handler: handler,
}

tcpListener, err := net.Listen("tcp", listenAddress)
if err != nil {
return err
}

// if a TLS connection was requested, set up a connection mux that will send TLS requests to
// the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy
// HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
mux := cmux.New(tcpListener)

errorChannel := make(chan error, 1)
errorChannelCount := 1

go func() {
// match HTTP first
httpListener := mux.Match(cmux.HTTP1())
klog.Infof("Metrics port listening for HTTP on %v", listenAddress)
errorChannel <- server.Serve(httpListener)
}()

if tlsConfig != nil {
errorChannelCount++
go func() {
tlsListener := tls.NewListener(mux.Match(cmux.Any()), tlsConfig)
klog.Infof("Metrics port listening for HTTPS on %v", listenAddress)
errorChannel <- server.Serve(tlsListener)
}()
}

errorChannelCount++
go func() {
errorChannel <- mux.Serve()
}()

shutdown := false
var loopError error
for errorChannelCount > 0 {
if shutdown {
err := <-errorChannel
errorChannelCount--
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
if loopError == nil {
loopError = err
} else if err != nil { // log the error we are discarding
klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
}
}
} else {
select {
case <-runContext.Done(): // clean shutdown
case err := <-errorChannel: // crashed before a shutdown was requested
errorChannelCount--
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
loopError = err
}
}
shutdown = true
shutdownError := server.Shutdown(shutdownContext)
if loopError == nil {
loopError = shutdownError
} else if shutdownError != nil { // log the error we are discarding
klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError)
}
}
}

return loopError
}

type conditionKey struct {
Name string
Type string
Expand Down
92 changes: 41 additions & 51 deletions pkg/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/cockroachdb/cmux"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -188,56 +183,23 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
}

func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
// listen on metrics
runContext, runCancel := context.WithCancel(ctx)
shutdownContext, shutdownCancel := context.WithCancel(ctx)
errorChannel := make(chan error, 1)
errorChannelCount := 0
if o.ListenAddr != "" {
handler := http.NewServeMux()
handler.Handle("/metrics", promhttp.Handler())
tcpl, err := net.Listen("tcp", o.ListenAddr)
if err != nil {
klog.Fatalf("Listen error: %v", err)
}

// if a TLS connection was requested, set up a connection mux that will send TLS requests to
// the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy
// HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
m := cmux.New(tcpl)

// match HTTP first
httpl := m.Match(cmux.HTTP1())
go func() {
s := &http.Server{
Handler: handler,
}
if err := s.Serve(httpl); err != cmux.ErrListenerClosed {
klog.Fatalf("HTTP serve error: %v", err)
}
}()

var tlsConfig *tls.Config
if o.ServingCertFile != "" || o.ServingKeyFile != "" {
tlsConfig, err := o.makeTLSConfig()
var err error
tlsConfig, err = o.makeTLSConfig()
if err != nil {
klog.Fatalf("Failed to create TLS config: %v", err)
}

tlsListener := tls.NewListener(m.Match(cmux.Any()), tlsConfig)
klog.Infof("Metrics port listening for HTTP and HTTPS on %v", o.ListenAddr)
go func() {
s := &http.Server{
Handler: handler,
}
if err := s.Serve(tlsListener); err != cmux.ErrListenerClosed {
klog.Fatalf("HTTPS serve error: %v", err)
}
}()

go func() {
if err := m.Serve(); err != nil {
klog.Errorf("CMUX serve error: %v", err)
}
}()
} else {
klog.Infof("Metrics port listening for HTTP on %v", o.ListenAddr)
}
errorChannelCount++
go func() {
errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig)
}()
}

exit := make(chan struct{})
Expand All @@ -253,9 +215,9 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(localCtx context.Context) {
controllerCtx.Start(ctx)
controllerCtx.Start(runContext)
select {
case <-ctx.Done():
case <-runContext.Done():
// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
// and client-go ContextCancelable, which allows us to block new API requests before
// we step down. However, the CVO isn't that sensitive to races and can tolerate
Expand Down Expand Up @@ -284,6 +246,34 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
},
})

for errorChannelCount > 0 {
var shutdownTimer *time.Timer
if shutdownTimer == nil { // running
select {
case <-runContext.Done():
shutdownTimer = time.NewTimer(2 * time.Minute)
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
runCancel() // this will cause shutdownTimer initialization in the next loop
}
}
} else { // shutting down
select {
case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
shutdownCancel()
shutdownTimer.Stop()
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
runCancel()
}
}
}
}

<-exit
}

Expand Down

0 comments on commit b30aa0e

Please sign in to comment.