diff --git a/pkg/network/proxy/proxy.go b/pkg/network/proxy/proxy.go index 6f35b3c98e..65c68681b0 100644 --- a/pkg/network/proxy/proxy.go +++ b/pkg/network/proxy/proxy.go @@ -11,6 +11,7 @@ import ( "k8s.io/klog/v2" corev1 "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -27,14 +28,21 @@ import ( "github.com/openshift/sdn/pkg/network/common" ) -// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. -type EndpointsConfigHandler interface { - // OnEndpointsUpdate gets called when endpoints configuration is changed for a given - // service on any of the configuration sources. An example is when a new - // service comes up, or when containers come up or down for an existing service. - OnEndpointsUpdate(endpoints []*corev1.Endpoints) +// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet +// implemented a full EndpointSliceHandler. +type NoopEndpointsHandler struct{} + +func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {} + +func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) { } +func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {} + +func (*NoopEndpointsHandler) OnEndpointsSynced() {} + +var _ kubeproxyconfig.EndpointsHandler = &NoopEndpointsHandler{} + type firewallItem struct { ruleType networkv1.EgressNetworkPolicyRuleType net *net.IPNet @@ -46,12 +54,12 @@ type proxyFirewallItem struct { } type proxyEndpoints struct { - endpoints *corev1.Endpoints + endpoints *discoveryv1beta1.EndpointSlice blocked bool } type OsdnProxy struct { - kubeproxyconfig.NoopEndpointSliceHandler + NoopEndpointsHandler sync.Mutex kClient kubernetes.Interface @@ -264,12 +272,12 @@ func (proxy *OsdnProxy) updateEgressNetworkPolicy(policy networkv1.EgressNetwork } wasBlocked := pep.blocked - pep.blocked = proxy.endpointsBlocked(pep.endpoints) + pep.blocked = proxy.endpointSliceBlocked(pep.endpoints) switch { case wasBlocked && !pep.blocked: - proxy.baseProxy.OnEndpointsAdd(pep.endpoints) + proxy.baseProxy.OnEndpointSliceAdd(pep.endpoints) case !wasBlocked && pep.blocked: - proxy.baseProxy.OnEndpointsDelete(pep.endpoints) + proxy.baseProxy.OnEndpointSliceDelete(pep.endpoints) } } } @@ -290,13 +298,13 @@ func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool { return false } -func (proxy *OsdnProxy) endpointsBlocked(ep *corev1.Endpoints) bool { - for _, ss := range ep.Subsets { +func (proxy *OsdnProxy) endpointSliceBlocked(ep *discoveryv1beta1.EndpointSlice) bool { + for _, ss := range ep.Endpoints { for _, addr := range ss.Addresses { - IP := net.ParseIP(addr.IP) - if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, IP); !contains && !proxy.networkInfo.ServiceNetwork.Contains(IP) { - if proxy.firewallBlocksIP(ep.Namespace, IP) { - klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr.IP) + ip := net.ParseIP(addr) + if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, ip); !contains && !proxy.networkInfo.ServiceNetwork.Contains(ip) { + if proxy.firewallBlocksIP(ep.Namespace, ip) { + klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr) return true } } @@ -314,18 +322,18 @@ func (proxy *OsdnProxy) checkInitialized() { } } -func (proxy *OsdnProxy) OnEndpointsAdd(ep *corev1.Endpoints) { +func (proxy *OsdnProxy) OnEndpointSliceAdd(ep *discoveryv1beta1.EndpointSlice) { proxy.Lock() defer proxy.Unlock() - pep := &proxyEndpoints{ep, proxy.endpointsBlocked(ep)} + pep := &proxyEndpoints{ep, proxy.endpointSliceBlocked(ep)} proxy.allEndpoints[ep.UID] = pep if !pep.blocked { - proxy.baseProxy.OnEndpointsAdd(ep) + proxy.baseProxy.OnEndpointSliceAdd(ep) } } -func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) { +func (proxy *OsdnProxy) OnEndpointSliceUpdate(old, ep *discoveryv1beta1.EndpointSlice) { proxy.Lock() defer proxy.Unlock() @@ -337,19 +345,19 @@ func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) { } wasBlocked := pep.blocked pep.endpoints = ep - pep.blocked = proxy.endpointsBlocked(ep) + pep.blocked = proxy.endpointSliceBlocked(ep) switch { case wasBlocked && !pep.blocked: - proxy.baseProxy.OnEndpointsAdd(ep) + proxy.baseProxy.OnEndpointSliceAdd(ep) case !wasBlocked && !pep.blocked: - proxy.baseProxy.OnEndpointsUpdate(old, ep) + proxy.baseProxy.OnEndpointSliceUpdate(old, ep) case !wasBlocked && pep.blocked: - proxy.baseProxy.OnEndpointsDelete(ep) + proxy.baseProxy.OnEndpointSliceDelete(ep) } } -func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) { +func (proxy *OsdnProxy) OnEndpointSliceDelete(ep *discoveryv1beta1.EndpointSlice) { proxy.Lock() defer proxy.Unlock() @@ -360,13 +368,15 @@ func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) { } delete(proxy.allEndpoints, ep.UID) if !pep.blocked { - proxy.baseProxy.OnEndpointsDelete(ep) + proxy.baseProxy.OnEndpointSliceDelete(ep) } } -func (proxy *OsdnProxy) OnEndpointsSynced() { - proxy.baseProxy.OnEndpointsSynced() +func (proxy *OsdnProxy) OnEndpointSlicesSynced() { + klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced") + proxy.baseProxy.OnEndpointSlicesSynced() proxy.endpointsSynced = true + klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced true") proxy.checkInitialized() } diff --git a/pkg/network/proxyimpl/hybrid/proxy.go b/pkg/network/proxyimpl/hybrid/proxy.go index d4cc72af93..376b7bc02f 100644 --- a/pkg/network/proxyimpl/hybrid/proxy.go +++ b/pkg/network/proxyimpl/hybrid/proxy.go @@ -8,6 +8,7 @@ import ( "k8s.io/klog/v2" corev1 "k8s.io/api/core/v1" + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" corev1listers "k8s.io/client-go/listers/core/v1" @@ -27,11 +28,28 @@ type RunnableProxy interface { SetSyncRunner(b *async.BoundedFrequencyRunner) } +// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet +// implemented a full EndpointSliceHandler. +type NoopEndpointsHandler struct{} + +func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {} + +func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) { +} + +func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {} + +func (*NoopEndpointsHandler) OnEndpointsSynced() {} + +var _ proxyconfig.EndpointsHandler = &NoopEndpointsHandler{} + // HybridProxier runs an unidling proxy and a primary proxy at the same time, // delegating idled services to the unidling proxy and other services to the // primary proxy. type HybridProxier struct { - proxyconfig.NoopEndpointSliceHandler + // TODO implement https://github.com/kubernetes/enhancements/pull/640 + proxyconfig.NoopNodeHandler + NoopEndpointsHandler mainProxy RunnableProxy unidlingProxy RunnableProxy @@ -89,22 +107,6 @@ func NewHybridProxier( return p, nil } -func (proxier *HybridProxier) OnNodeAdd(node *corev1.Node) { - // TODO implement https://github.com/kubernetes/enhancements/pull/640 -} - -func (proxier *HybridProxier) OnNodeUpdate(oldNode, node *corev1.Node) { - // TODO implement https://github.com/kubernetes/enhancements/pull/640 -} - -func (proxier *HybridProxier) OnNodeDelete(node *corev1.Node) { - // TODO implement https://github.com/kubernetes/enhancements/pull/640 -} - -func (proxier *HybridProxier) OnNodeSynced() { - // TODO implement https://github.com/kubernetes/enhancements/pull/640 -} - func (p *HybridProxier) OnServiceAdd(service *corev1.Service) { svcName := types.NamespacedName{ Namespace: service.Namespace, @@ -190,11 +192,11 @@ func (p *HybridProxier) OnServiceSynced() { klog.V(6).Infof("hybrid proxy: services synced") } -// shouldEndpointsUseUserspace checks to see if the given endpoints have the correct +// shouldEndpointSliceUseUserspace checks to see if the given endpoints have the correct // annotations and size to use the unidling proxy. -func (p *HybridProxier) shouldEndpointsUseUserspace(endpoints *corev1.Endpoints) bool { +func (p *HybridProxier) shouldEndpointSliceUseUserspace(endpoints *discoveryv1beta1.EndpointSlice) bool { hasEndpoints := false - for _, subset := range endpoints.Subsets { + for _, subset := range endpoints.Endpoints { if len(subset.Addresses) > 0 { hasEndpoints = true break @@ -244,11 +246,11 @@ func (p *HybridProxier) switchService(name types.NamespacedName) { p.switchedToUserspace[name] = p.usingUserspace[name] } -func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) { +func (p *HybridProxier) OnEndpointSliceAdd(endpoints *discoveryv1beta1.EndpointSlice) { // we track all endpoints in the unidling endpoints handler so that we can succesfully // detect when a service become unidling klog.V(6).Infof("hybrid proxy: (always) add ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name) - p.unidlingProxy.OnEndpointsAdd(endpoints) + p.unidlingProxy.OnEndpointSliceAdd(endpoints) p.usingUserspaceLock.Lock() defer p.usingUserspaceLock.Unlock() @@ -259,11 +261,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) { } wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName] - p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints) + p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints) if !p.usingUserspace[svcName] { klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name) - p.mainProxy.OnEndpointsAdd(endpoints) + p.mainProxy.OnEndpointSliceAdd(endpoints) } // a service could appear before endpoints, so we have to treat this as a potential @@ -273,11 +275,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) { } } -func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoints) { +func (p *HybridProxier) OnEndpointSliceUpdate(oldEndpoints, endpoints *discoveryv1beta1.EndpointSlice) { // we track all endpoints in the unidling endpoints handler so that we can succesfully // detect when a service become unidling klog.V(6).Infof("hybrid proxy: (always) update ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name) - p.unidlingProxy.OnEndpointsUpdate(oldEndpoints, endpoints) + p.unidlingProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints) p.usingUserspaceLock.Lock() defer p.usingUserspaceLock.Unlock() @@ -288,7 +290,7 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi } wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName] - p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints) + p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints) if !knownEndpoints { utilruntime.HandleError(fmt.Errorf("received update for unknown endpoints %s", svcName.String())) @@ -299,26 +301,26 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi if !isSwitch && !p.usingUserspace[svcName] { klog.V(6).Infof("hybrid proxy: update ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name) - p.mainProxy.OnEndpointsUpdate(oldEndpoints, endpoints) + p.mainProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints) return } if p.usingUserspace[svcName] { klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name) - p.mainProxy.OnEndpointsDelete(oldEndpoints) + p.mainProxy.OnEndpointSliceDelete(oldEndpoints) } else { klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name) - p.mainProxy.OnEndpointsAdd(endpoints) + p.mainProxy.OnEndpointSliceAdd(endpoints) } p.switchService(svcName) } -func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) { +func (p *HybridProxier) OnEndpointSliceDelete(endpoints *discoveryv1beta1.EndpointSlice) { // we track all endpoints in the unidling endpoints handler so that we can succesfully // detect when a service become unidling klog.V(6).Infof("hybrid proxy: (always) del ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name) - p.unidlingProxy.OnEndpointsDelete(endpoints) + p.unidlingProxy.OnEndpointSliceDelete(endpoints) // Careful - there is the potential for deadlocks here, // except that we always get usingUserspaceLock first, then @@ -340,15 +342,15 @@ func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) { if !usingUserspace { klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name) - p.mainProxy.OnEndpointsDelete(endpoints) + p.mainProxy.OnEndpointSliceDelete(endpoints) } p.cleanupState(svcName) } -func (p *HybridProxier) OnEndpointsSynced() { - p.unidlingProxy.OnEndpointsSynced() - p.mainProxy.OnEndpointsSynced() +func (p *HybridProxier) OnEndpointSlicesSynced() { + p.unidlingProxy.OnEndpointSlicesSynced() + p.mainProxy.OnEndpointSlicesSynced() klog.V(6).Infof("hybrid proxy: endpoints synced") } diff --git a/pkg/openshift-sdn/cmd.go b/pkg/openshift-sdn/cmd.go index 74002588b9..0ebbc55d0b 100644 --- a/pkg/openshift-sdn/cmd.go +++ b/pkg/openshift-sdn/cmd.go @@ -55,6 +55,7 @@ func NewOpenShiftSDNCommand(basename string, errout io.Writer) *cobra.Command { Short: "Start OpenShiftSDN", Long: networkLong, Run: func(c *cobra.Command, _ []string) { + c.Flags().Lookup("v").Value.Set("5") ch := make(chan struct{}) interrupt.New(func(s os.Signal) { fmt.Fprintf(errout, "interrupt: Gracefully shutting down ...\n") diff --git a/pkg/openshift-sdn/proxy.go b/pkg/openshift-sdn/proxy.go index 16c12c6c02..aa9bb4fbcd 100644 --- a/pkg/openshift-sdn/proxy.go +++ b/pkg/openshift-sdn/proxy.go @@ -16,14 +16,12 @@ import ( utilwait "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/routes" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/scheme" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" kubeproxyoptions "k8s.io/kubernetes/cmd/kube-proxy/app" - "k8s.io/kubernetes/pkg/features" proxy "k8s.io/kubernetes/pkg/proxy" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" pconfig "k8s.io/kubernetes/pkg/proxy/config" @@ -67,13 +65,6 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) { return } - if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) || - utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { - klog.Warningf("kube-proxy has unsupported EndpointSlice/EndpointSliceProxying gates enabled") - close(waitChan) - return - } - bindAddr := net.ParseIP(sdn.ProxyConfig.BindAddress) nodeAddr := bindAddr @@ -214,8 +205,8 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) { } } - endpointsConfig := pconfig.NewEndpointsConfig( - sdn.informers.KubeInformers.Core().V1().Endpoints(), + endpointsConfig := pconfig.NewEndpointSliceConfig( + sdn.informers.KubeInformers.Discovery().V1beta1().EndpointSlices(), sdn.ProxyConfig.IPTables.SyncPeriod.Duration, ) // customized handling registration that inserts a filter if needed