Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge duplicated linux/windows kube-proxy setup code #118017

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 55 additions & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
Expand All @@ -57,11 +58,13 @@ import (
"k8s.io/component-base/configz"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics"
metricsfeatures "k8s.io/component-base/metrics/features"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/prometheus/slis"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"
"k8s.io/kube-proxy/config/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
Expand Down Expand Up @@ -319,7 +322,7 @@ func (o *Options) Run() error {
return cleanupAndExit()
}

proxyServer, err := NewProxyServer(o)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope nobody is importing this

proxyServer, err := newProxyServer(o.config, o.master)
if err != nil {
return err
}
Expand Down Expand Up @@ -516,10 +519,61 @@ type ProxyServer struct {
Recorder events.EventRecorder
NodeRef *v1.ObjectReference
HealthzServer healthcheck.ProxierHealthUpdater
Hostname string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be internal variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean hostname rather than Hostname? yeah, the whole struct is wrong that way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep yep

NodeIP net.IP

Proxier proxy.Provider
}

// newProxyServer creates a ProxyServer based on the given config
func newProxyServer(config *kubeproxyconfig.KubeProxyConfiguration, master string) (*ProxyServer, error) {
s := &ProxyServer{Config: config}

cz, err := configz.New(kubeproxyconfig.GroupName)
if err != nil {
return nil, fmt.Errorf("unable to register configz: %s", err)
}
cz.Set(config)

if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}

s.Hostname, err = nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}

s.Client, err = createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}

s.NodeIP = detectNodeIP(s.Client, s.Hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", s.NodeIP.String())

s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")

s.NodeRef = &v1.ObjectReference{
Kind: "Node",
Name: s.Hostname,
UID: types.UID(s.Hostname),
Namespace: "",
}

if len(config.HealthzBindAddress) > 0 {
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, s.Recorder, s.NodeRef)
}

s.Proxier, err = s.createProxier(config)
if err != nil {
return nil, err
}

return s, nil
}

// createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClient(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
Expand Down
109 changes: 23 additions & 86 deletions cmd/kube-proxy/app/server_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,16 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"

"k8s.io/apimachinery/pkg/fields"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
toolswatch "k8s.io/client-go/tools/watch"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
nodeutil "k8s.io/component-helpers/node/util"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/ipvs"
proxymetrics "k8s.io/kubernetes/pkg/proxy/metrics"
Expand Down Expand Up @@ -83,63 +76,15 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
klog.V(2).InfoS("DetectLocalMode", "localMode", string(config.DetectLocalMode))
}

// NewProxyServer returns a new ProxyServer.
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.master)
}

func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
master string) (*ProxyServer, error) {

if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}

var ipvsInterface utilipvs.Interface
var ipsetInterface utilipset.Interface

if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}

hostname, err := nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}

client, err := createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}

nodeIP := detectNodeIP(client, hostname, config.BindAddress)
klog.InfoS("Detected node IP", "address", nodeIP.String())

// Create event recorder
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

var healthzServer healthcheck.ProxierHealthUpdater
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
}

// createProxier creates the proxy.Provider
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
var proxier proxy.Provider
var err error

var nodeInfo *v1.Node
if config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", hostname)
nodeInfo, err = waitForPodCIDR(client, hostname)
klog.InfoS("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
nodeInfo, err = waitForPodCIDR(s.Client, s.Hostname)
if err != nil {
return nil, err
}
Expand All @@ -148,7 +93,7 @@ func newProxyServer(

primaryFamily := v1.IPv4Protocol
primaryProtocol := utiliptables.ProtocolIPv4
if netutils.IsIPv6(nodeIP) {
if netutils.IsIPv6(s.NodeIP) {
primaryFamily = v1.IPv6Protocol
primaryProtocol = utiliptables.ProtocolIPv6
}
Expand Down Expand Up @@ -210,10 +155,10 @@ func newProxyServer(
*config.IPTables.LocalhostNodePorts,
int(*config.IPTables.MasqueradeBit),
localDetectors,
hostname,
s.Hostname,
nodeIPTuple(config.BindAddress),
recorder,
healthzServer,
s.Recorder,
s.HealthzServer,
nodePortAddresses,
)
} else {
Expand All @@ -236,10 +181,10 @@ func newProxyServer(
*config.IPTables.LocalhostNodePorts,
int(*config.IPTables.MasqueradeBit),
localDetector,
hostname,
nodeIP,
recorder,
healthzServer,
s.Hostname,
s.NodeIP,
s.Recorder,
s.HealthzServer,
nodePortAddresses,
)
}
Expand All @@ -249,8 +194,8 @@ func newProxyServer(
}
} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
kernelHandler := ipvs.NewLinuxKernelHandler()
ipsetInterface = utilipset.New(execer)
ipvsInterface = utilipvs.New()
ipsetInterface := utilipset.New(execer)
ipvsInterface := utilipvs.New()
if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
}
Expand Down Expand Up @@ -284,10 +229,10 @@ func newProxyServer(
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetectors,
hostname,
s.Hostname,
nodeIPs,
recorder,
healthzServer,
s.Recorder,
s.HealthzServer,
config.IPVS.Scheduler,
nodePortAddresses,
kernelHandler,
Expand Down Expand Up @@ -316,10 +261,10 @@ func newProxyServer(
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
localDetector,
hostname,
nodeIP,
recorder,
healthzServer,
s.Hostname,
s.NodeIP,
s.Recorder,
s.HealthzServer,
config.IPVS.Scheduler,
nodePortAddresses,
kernelHandler,
Expand All @@ -330,15 +275,7 @@ func newProxyServer(
}
}

return &ProxyServer{
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
return proxier, nil
}

func (s *ProxyServer) platformSetup() error {
Expand Down
75 changes: 10 additions & 65 deletions cmd/kube-proxy/app/server_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,9 @@ import (
// Enable pprof HTTP handlers.
_ "net/http/pprof"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/events"
"k8s.io/component-base/configz"
"k8s.io/component-base/metrics"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/winkernel"
)

Expand All @@ -50,49 +42,10 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
}
}

// NewProxyServer returns a new ProxyServer.
func NewProxyServer(o *Options) (*ProxyServer, error) {
return newProxyServer(o.config, o.master)
}

func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string) (*ProxyServer, error) {
if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
c.Set(config)
} else {
return nil, fmt.Errorf("unable to register configz: %s", err)
}

if len(config.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}

client, err := createClient(config.ClientConnection, master)
if err != nil {
return nil, err
}

// Create event recorder
hostname, err := nodeutil.GetHostname(config.HostnameOverride)
if err != nil {
return nil, err
}
nodeIP := detectNodeIP(client, hostname, config.BindAddress)
klog.InfoS("Detected node IP", "IP", nodeIP.String())

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

var healthzServer healthcheck.ProxierHealthUpdater
// createProxier creates the proxy.Provider
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration) (proxy.Provider, error) {
var healthzPort int
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
_, port, _ := net.SplitHostPort(config.HealthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}
Expand All @@ -113,10 +66,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR,
hostname,
s.Hostname,
nodeIPTuple(config.BindAddress),
recorder,
healthzServer,
s.Recorder,
s.HealthzServer,
config.Winkernel,
healthzPort,
)
Expand All @@ -125,10 +78,10 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.ClusterCIDR,
hostname,
nodeIP,
recorder,
healthzServer,
s.Hostname,
s.NodeIP,
s.Recorder,
s.HealthzServer,
config.Winkernel,
healthzPort,
)
Expand All @@ -137,15 +90,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, master string
return nil, fmt.Errorf("unable to create proxier: %v", err)
}

return &ProxyServer{
Config: config,
Client: client,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
NodeRef: nodeRef,
HealthzServer: healthzServer,
}, nil
return proxier, nil
}

func (s *ProxyServer) platformSetup() error {
Expand Down