Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge duplicated linux/windows kube-proxy setup code #118017

Merged
merged 2 commits into from May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 59 additions & 58 deletions cmd/kube-proxy/app/server.go
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
Expand All @@ -57,11 +58,13 @@ import (
"k8s.io/component-base/configz"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics"
metricsfeatures "k8s.io/component-base/metrics/features"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/prometheus/slis"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"
"k8s.io/kube-proxy/config/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
Expand Down Expand Up @@ -319,7 +322,7 @@ func (o *Options) Run() error {
return cleanupAndExit()
}

proxyServer, err := NewProxyServer(o)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope nobody is importing this

proxyServer, err := newProxyServer(o.config, o.master)
if err != nil {
return err
}
Expand Down Expand Up @@ -514,13 +517,63 @@ type ProxyServer struct {
Client clientset.Interface
Broadcaster events.EventBroadcaster
Recorder events.EventRecorder
Conntracker Conntracker // if nil, ignored
NodeRef *v1.ObjectReference
HealthzServer healthcheck.ProxierHealthUpdater
Hostname string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be internal variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean hostname rather than Hostname? yeah, the whole struct is wrong that way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep yep

NodeIP net.IP

Proxier proxy.Provider
}

// newProxyServer creates a ProxyServer based on the given config
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string) (*ProxyServer, error) {
s := &ProxyServer{Config: config}

cz, err := configz.New(kubeproxyconfig.GroupName)
if err != nil {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
cz.Set(config)

if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}

s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}

s.Client, err = createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}

s.NodeIP = detectNodeIP(s.Client, s.Hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", s.NodeIP.String())

s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")

s.NodeRef = &v1.ObjectReference{
Kind: "Node",
Name: s.Hostname,
UID: types.UID(s.Hostname),
Namespace: "",
}

if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
}

s.Proxier, err = s.createProxier(config)
if err != nil {
return nil, err
}

return s, nil
}

// createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
Expand Down Expand Up @@ -652,45 +705,10 @@ func (s *ProxyServer) Run() error {
// Start up a metrics server if requested
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, errCh)

// Tune conntrack, if requested
// Conntracker is always nil for windows
if s.Conntracker != nil {
max, err := getConntrackMax(s.Config.Conntrack)
if err != nil {
return err
}
if max > 0 {
err := s.Conntracker.SetMax(max)
if err != nil {
if err != errReadOnlySysFS {
return err
}
// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
// the only remediation we know is to restart the docker daemon.
// Here we'll send an node event with specific reason and message, the
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker. Occurs in other container runtimes
// as well.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "CRI error: /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
}
}

if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}

if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}
// Do platform-specific setup
err := s.platformSetup()
if err != nil {
return err
}

noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
Expand Down Expand Up @@ -758,23 +776,6 @@ func (s *ProxyServer) birthCry() {
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
}

func getConntrackMax(config kubeproxyconfig.KubeProxyConntrackConfiguration) (int, error) {
if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
floor := 0
if config.Min != nil {
floor = int(*config.Min)
}
scaled := int(*config.MaxPerCore) * detectNumCPU()
if scaled > floor {
klog.V(3).InfoS("GetConntrackMax: using scaled conntrack-max-per-core")
return scaled, nil
}
klog.V(3).InfoS("GetConntrackMax: using conntrack-min")
return floor, nil
}
return 0, nil
}

// detectNodeIP returns the nodeIP used by the proxier
// The order of precedence is:
// 1. config.bindAddress if bindAddress is not 0.0.0.0 or ::
Expand Down