Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate healthz server from metrics server in kube-proxy #44968

Merged
merged 4 commits into from
May 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/kube-proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
],
Expand Down
1 change: 1 addition & 0 deletions cmd/kube-proxy/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/kubelet/qos:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/iptables:go_default_library",
"//pkg/proxy/userspace:go_default_library",
"//pkg/proxy/winuserspace:go_default_library",
Expand Down
28 changes: 21 additions & 7 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/userspace"
"k8s.io/kubernetes/pkg/proxy/winuserspace"
Expand Down Expand Up @@ -249,7 +250,7 @@ func applyDefaults(in *componentconfig.KubeProxyConfiguration) (*componentconfig
func NewProxyCommand() *cobra.Command {
opts := Options{
config: new(componentconfig.KubeProxyConfiguration),
healthzPort: 10249,
healthzPort: 10256,
}

cmd := &cobra.Command{
Expand Down Expand Up @@ -296,7 +297,7 @@ type ProxyServer struct {
ProxyMode string
NodeRef *clientv1.ObjectReference
CleanupAndExit bool
HealthzBindAddress string
MetricsBindAddress string
OOMScoreAdj *int32
ResourceContainer string
ConfigSyncPeriod time.Duration
Expand All @@ -305,6 +306,7 @@ type ProxyServer struct {
// get rid of this one.
ServiceHandler proxyconfig.ServiceConfigHandler
EndpointsEventHandler proxyconfig.EndpointsHandler
HealthzServer *healthcheck.HealthzServer
}

// createClients creates a kube client and an event client from the given config and masterOverride.
Expand Down Expand Up @@ -388,6 +390,11 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})

var healthzServer *healthcheck.HealthzServer
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
}

var proxier proxy.ProxyProvider
var serviceEventHandler proxyconfig.ServiceHandler
// TODO: Migrate all handlers to ServiceHandler types and
Expand Down Expand Up @@ -416,6 +423,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
hostname,
getNodeIP(client, hostname),
recorder,
healthzServer,
)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
Expand Down Expand Up @@ -504,13 +512,14 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
Conntracker: &realConntracker{},
ProxyMode: proxyMode,
NodeRef: nodeRef,
HealthzBindAddress: config.HealthzBindAddress,
MetricsBindAddress: config.MetricsBindAddress,
OOMScoreAdj: config.OOMScoreAdj,
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
ServiceEventHandler: serviceEventHandler,
ServiceHandler: serviceHandler,
EndpointsEventHandler: endpointsEventHandler,
HealthzServer: healthzServer,
}, nil
}

Expand Down Expand Up @@ -546,17 +555,22 @@ func (s *ProxyServer) Run() error {

s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})

// Start up a webserver if requested
if len(s.HealthzBindAddress) > 0 {
// Start up a healthz server if requested
if s.HealthzServer != nil {
s.HealthzServer.Run()
}

// Start up a metrics server if requested
if len(s.MetricsBindAddress) > 0 {
http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", s.ProxyMode)
})
http.Handle("/metrics", prometheus.Handler())
configz.InstallHandler(http.DefaultServeMux)
go wait.Until(func() {
err := http.ListenAndServe(s.HealthzBindAddress, nil)
err := http.ListenAndServe(s.MetricsBindAddress, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("starting health server failed: %v", err))
utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
}
}, 5*time.Second, wait.NeverStop)
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/kube-proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/pflag"

"k8s.io/apiserver/pkg/server/healthz"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/cmd/kube-proxy/app"
Expand All @@ -32,8 +31,6 @@ import (
)

func main() {
healthz.DefaultHealthz()

command := app.NewProxyCommand()

// TODO: once we switch everything over to Cobra commands, we can go back to calling
Expand Down
1 change: 1 addition & 0 deletions hack/.linted_packages
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ pkg/kubelet/volumemanager/cache
pkg/kubelet/volumemanager/populator
pkg/kubelet/volumemanager/reconciler
pkg/labels
pkg/master/ports
pkg/printers
pkg/proxy/config
pkg/proxy/healthcheck
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/componentconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ type KubeProxyConfiguration struct {
// for all interfaces)
BindAddress string
// healthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
// defaulting to 0.0.0.0:10256
HealthzBindAddress string
// metricsBindAddress is the IP address and port for the metrics server to serve on,
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
MetricsBindAddress string
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
// bridge traffic coming from outside of the cluster. If not provided,
// no off-cluster bridging will be performed.
Expand Down
12 changes: 9 additions & 3 deletions pkg/apis/componentconfig/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"path/filepath"
"runtime"
"strings"
Expand Down Expand Up @@ -63,10 +64,15 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if len(obj.BindAddress) == 0 {
obj.BindAddress = "0.0.0.0"
}
if len(obj.HealthzBindAddress) == 0 {
obj.HealthzBindAddress = "127.0.0.1:10249"
if obj.HealthzBindAddress == "" {
obj.HealthzBindAddress = fmt.Sprintf("0.0.0.0:%v", ports.ProxyHealthzPort)
} else if !strings.Contains(obj.HealthzBindAddress, ":") {
obj.HealthzBindAddress = ":10249"
obj.HealthzBindAddress += fmt.Sprintf(":%v", ports.ProxyHealthzPort)
}
if obj.MetricsBindAddress == "" {
obj.MetricsBindAddress = fmt.Sprintf("127.0.0.1:%v", ports.ProxyStatusPort)
} else if !strings.Contains(obj.MetricsBindAddress, ":") {
obj.MetricsBindAddress += fmt.Sprintf(":%v", ports.ProxyStatusPort)
}
if obj.OOMScoreAdj == nil {
temp := int32(qos.KubeProxyOOMScoreAdj)
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/componentconfig/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ type KubeProxyConfiguration struct {
// for all interfaces)
BindAddress string `json:"bindAddress"`
// healthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
// defaulting to 0.0.0.0:10256
HealthzBindAddress string `json:"healthzBindAddress"`
// metricsBindAddress is the IP address and port for the metrics server to serve on,
// defaulting to 127.0.0.1:10249 (set to 0.0.0.0 for all interfaces)
MetricsBindAddress string `json:"metricsBindAddress"`
// clusterCIDR is the CIDR range of the pods in the cluster. It is used to
// bridge traffic coming from outside of the cluster. If not provided,
// no off-cluster bridging will be performed.
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/componentconfig/v1alpha1/zz_generated.conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
out.FeatureGates = in.FeatureGates
out.BindAddress = in.BindAddress
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
out.ClusterCIDR = in.ClusterCIDR
out.HostnameOverride = in.HostnameOverride
if err := Convert_v1alpha1_ClientConnectionConfiguration_To_componentconfig_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
Expand Down Expand Up @@ -128,6 +129,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
out.FeatureGates = in.FeatureGates
out.BindAddress = in.BindAddress
out.HealthzBindAddress = in.HealthzBindAddress
out.MetricsBindAddress = in.MetricsBindAddress
out.ClusterCIDR = in.ClusterCIDR
out.HostnameOverride = in.HostnameOverride
if err := Convert_componentconfig_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(&in.ClientConnection, &out.ClientConnection, s); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/master/ports/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package ports

const (
// ProxyPort is the default port for the proxy healthz server.
// ProxyStatusPort is the default port for the proxy metrics server.
// May be overridden by a flag at startup.
ProxyStatusPort = 10249
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we use these constants to generate the strings in the defaults above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// KubeletPort is the default port for the kubelet server on each host machine.
Expand All @@ -38,4 +38,7 @@ const (
// until heapster can transition to using the SSL endpoint.
// TODO(roberthbailey): Remove this once we have a better solution for heapster.
KubeletReadOnlyPort = 10255
// ProxyHealthzPort is the default port for the proxy healthz server.
// May be overridden by a flag at startup.
ProxyHealthzPort = 10256
)
2 changes: 2 additions & 0 deletions pkg/proxy/healthcheck/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
],
)

Expand All @@ -34,6 +35,7 @@ go_test(
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
],
)

Expand Down
92 changes: 92 additions & 0 deletions pkg/proxy/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
"github.com/renstrom/dedent"

"k8s.io/apimachinery/pkg/types"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/clock"
"k8s.io/kubernetes/pkg/api"
)

Expand Down Expand Up @@ -233,3 +236,92 @@ func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) erro
}
return nil
}

// HealthzUpdater is the narrow interface for callers that only need to record
// a healthz timestamp update. *HealthzServer implements it through its
// UpdateTimestamp method.
type HealthzUpdater interface {
	// UpdateTimestamp records a new last-updated timestamp.
	UpdateTimestamp()
}

// HealthzServer returns 200 "OK" by default. Once the timestamp has been
// updated at least once, it returns 503 if more than healthTimeout has
// elapsed since the last update.
type HealthzServer struct {
listener Listener
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need locking?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is to use sync/atomic.Value for concurrent read/write. I think it should be sufficient?

httpFactory HTTPServerFactory
clock clock.Clock

addr string
port int32
healthTimeout time.Duration

lastUpdated atomic.Value
}

// NewDefaultHealthzServer returns a healthz HTTP server for addr that uses the
// default listener, HTTP server factory and clock: it passes nil for all three
// to newHealthzServer, which substitutes stdNetListener, stdHTTPServerFactory
// and clock.RealClock. healthTimeout is stored as the server's health timeout.
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer {
	return newHealthzServer(nil, nil, nil, addr, healthTimeout)
}

// newHealthzServer builds a HealthzServer for addr with the given health
// timeout. Any of listener, httpServerFactory or c may be nil, in which case
// stdNetListener, stdHTTPServerFactory or clock.RealClock is used respectively.
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer {
	hs := &HealthzServer{
		listener:      listener,
		httpFactory:   httpServerFactory,
		clock:         c,
		addr:          addr,
		healthTimeout: healthTimeout,
	}

	// Fill in the production default for every dependency left unset.
	if hs.listener == nil {
		hs.listener = stdNetListener{}
	}
	if hs.httpFactory == nil {
		hs.httpFactory = stdHTTPServerFactory{}
	}
	if hs.clock == nil {
		hs.clock = clock.RealClock{}
	}
	return hs
}

// UpdateTimestamp records the current time, as reported by the server's
// clock, as the moment of the most recent update.
func (hs *HealthzServer) UpdateTimestamp() {
	now := hs.clock.Now()
	hs.lastUpdated.Store(now)
}

// Run starts the healthz HTTP server and returns without blocking.
//
// It registers the /healthz handler on a fresh ServeMux, builds the server via
// the configured HTTP server factory and opens the listener on hs.addr. If
// listening fails, the error is logged and Run returns without serving or
// retrying. Otherwise the server is served on a background goroutine, which
// logs when serving stops.
func (hs *HealthzServer) Run() {
	serveMux := http.NewServeMux()
	serveMux.Handle("/healthz", healthzHandler{hs: hs})
	server := hs.httpFactory.New(hs.addr, serveMux)

	listener, err := hs.listener.Listen(hs.addr)
	if err != nil {
		glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err)
		return
	}

	go func() {
		glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)
		if err := server.Serve(listener); err != nil {
			glog.Errorf("Healthz closed: %v", err)
			return
		}
		// Serve returned without an error; still report it, since the
		// healthz endpoint is no longer being served.
		glog.Errorf("Unexpected healthz closed.")
	}()
}

// healthzHandler is the http.Handler registered for /healthz. It reads the
// last-updated timestamp, clock and health timeout from the HealthzServer it
// points to.
type healthzHandler struct {
	hs *HealthzServer
}

// ServeHTTP reports health as a small JSON document containing the
// last-updated timestamp and the current time.
//
// It responds 200 when no timestamp has been recorded yet, or when the current
// time is not after the last update plus healthTimeout. It responds 503 once
// more than healthTimeout has passed since the last update.
func (h healthzHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	// A zero lastUpdated means UpdateTimestamp has never been called.
	lastUpdated := time.Time{}
	if val := h.hs.lastUpdated.Load(); val != nil {
		lastUpdated = val.(time.Time)
	}
	currentTime := h.hs.clock.Now()

	resp.Header().Set("Content-Type", "application/json")
	if !lastUpdated.IsZero() && currentTime.After(lastUpdated.Add(h.hs.healthTimeout)) {
		resp.WriteHeader(http.StatusServiceUnavailable)
	} else {
		resp.WriteHeader(http.StatusOK)
	}
	// Format directly into the response with a constant format string.
	// Feeding pre-formatted text back in as the format would re-interpret
	// any '%' it happened to contain.
	fmt.Fprintf(resp, `{"lastUpdated": %q,"currentTime": %q}`, lastUpdated, currentTime)
}