From 77b08a845e451b695cfa25b79ebe277d85064345 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Wed, 6 Sep 2023 20:50:43 -0400 Subject: [PATCH] export manager.Server to provide an official HTTP server manager runnable Signed-off-by: Joe Lanford --- pkg/manager/internal.go | 10 ++-- pkg/manager/runnable_group.go | 5 +- pkg/manager/runnable_group_test.go | 3 +- pkg/manager/server.go | 80 +++++++++++++++++++++++++----- 4 files changed, 79 insertions(+), 19 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a16f354a1b..ce44b85e52 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -284,7 +284,7 @@ func (cm *controllerManager) addHealthProbeServer() error { mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) } - return cm.add(&server{ + return cm.add(&Server{ Kind: "health probe", Log: cm.logger, Server: srv, @@ -302,7 +302,7 @@ func (cm *controllerManager) addPprofServer() error { mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - return cm.add(&server{ + return cm.add(&Server{ Kind: "pprof", Log: cm.logger, Server: srv, @@ -384,10 +384,10 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } } - // First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled. + // First start any HTTP servers, which includes health probes and profiling, if enabled. // - // WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block - // conversion webhooks to be ready for serving which make the cache never get ready. + // WARNING: HTTPServers includes the health probes, which MUST start before any cache is populated, otherwise + // it would block conversion webhooks to be ready for serving which make the cache never get ready. if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil { if err != nil { return fmt.Errorf("failed to start HTTP servers: %w", err) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index 96566f5df1..1f350fbdc1 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -54,7 +54,10 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { // The runnables added after Start are started directly. func (r *runnables) Add(fn Runnable) error { switch runnable := fn.(type) { - case *server: + case *Server: + if runnable.NeedLeaderElection() { + return r.LeaderElection.Add(fn, nil) + } return r.HTTPServers.Add(fn, nil) case hasCache: return r.Caches.Add(fn, func(ctx context.Context) bool { diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 251ce46fb3..f2a9e85b94 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -22,7 +23,7 @@ var _ = Describe("runnables", func() { }) It("should add HTTP servers to the appropriate group", func() { - server := &server{} + server := &Server{} r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(server)).To(Succeed()) Expect(r.HTTPServers.startQueue).To(HaveLen(1)) diff --git a/pkg/manager/server.go b/pkg/manager/server.go index b6509f48f2..c8f9d1784b 100644 --- a/pkg/manager/server.go +++ b/pkg/manager/server.go @@ -21,34 +21,75 @@ import ( "errors" "net" "net/http" + "time" "github.com/go-logr/logr" + + crlog "sigs.k8s.io/controller-runtime/pkg/log" +) + +var ( + _ Runnable = (*Server)(nil) + _ LeaderElectionRunnable = (*Server)(nil) ) -// server is a general purpose HTTP server Runnable for a manager -// to serve some internal handlers such as health probes, metrics and profiling. -type server struct { - Kind string - Log logr.Logger - Server *http.Server +// Server is a general purpose HTTP server Runnable for a manager. +// It is used to serve some internal handlers for health probes and profiling, +// but it can also be used to run custom servers. +type Server struct { + // Kind is an optional string that describes the purpose of the server. It is used in logs to distinguish + // among multiple servers. + Kind string + + // Log is the logger used by the server. If not set, a logger will be derived from the context passed to Start. + Log logr.Logger + + // Server is the HTTP server to run. It is required. + Server *http.Server + + // Listener is an optional listener to use. If not set, the server start a listener using the server.Addr. + // Using a listener is useful when the port reservation needs to happen in advance of this runnable starting. Listener net.Listener + + // OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader. + OnlyServeWhenLeader bool + + // ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set, + // the server will wait indefinitely for all connections to close. + ShutdownTimeout *time.Duration } -func (s *server) Start(ctx context.Context) error { - log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr()) +// Start starts the server. It will block until the server is stopped or an error occurs. +func (s *Server) Start(ctx context.Context) error { + log := s.Log + if log.GetSink() == nil { + log = crlog.FromContext(ctx) + } + if s.Kind != "" { + log = log.WithValues("kind", s.Kind) + } + log = log.WithValues("addr", s.addr()) serverShutdown := make(chan struct{}) go func() { <-ctx.Done() log.Info("shutting down server") - if err := s.Server.Shutdown(context.Background()); err != nil { + + shutdownCtx := context.Background() + if s.ShutdownTimeout != nil { + var shutdownCancel context.CancelFunc + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout) + defer shutdownCancel() + } + + if err := s.Server.Shutdown(shutdownCtx); err != nil { log.Error(err, "error shutting down server") } close(serverShutdown) }() log.Info("starting server") - if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) { return err } @@ -56,6 +97,21 @@ func (s *server) Start(ctx context.Context) error { return nil } -func (s *server) NeedLeaderElection() bool { - return false +// NeedLeaderElection returns true if the server should only be started when the manager is the leader. +func (s *Server) NeedLeaderElection() bool { + return s.OnlyServeWhenLeader +} + +func (s *Server) addr() string { + if s.Listener != nil { + return s.Listener.Addr().String() + } + return s.Server.Addr +} + +func (s *Server) serve() error { + if s.Listener != nil { + return s.Server.Serve(s.Listener) + } + return s.Server.ListenAndServe() }