Skip to content

Commit

Permalink
WIP: Support EndpointSlice in sdn
Browse files Browse the repository at this point in the history
  • Loading branch information
smarterclayton committed Mar 5, 2021
1 parent 733faac commit c918927
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 76 deletions.
68 changes: 39 additions & 29 deletions pkg/network/proxy/proxy.go
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -27,14 +28,21 @@ import (
"github.com/openshift/sdn/pkg/network/common"
)

// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface {
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new
// service comes up, or when containers come up or down for an existing service.
OnEndpointsUpdate(endpoints []*corev1.Endpoints)
// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
// implemented a full EndpointSliceHandler.
type NoopEndpointsHandler struct{}

func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {}

func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) {
}

func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {}

func (*NoopEndpointsHandler) OnEndpointsSynced() {}

var _ kubeproxyconfig.EndpointsHandler = &NoopEndpointsHandler{}

type firewallItem struct {
ruleType networkv1.EgressNetworkPolicyRuleType
net *net.IPNet
Expand All @@ -46,12 +54,12 @@ type proxyFirewallItem struct {
}

type proxyEndpoints struct {
endpoints *corev1.Endpoints
endpoints *discoveryv1beta1.EndpointSlice
blocked bool
}

type OsdnProxy struct {
kubeproxyconfig.NoopEndpointSliceHandler
NoopEndpointsHandler
sync.Mutex

kClient kubernetes.Interface
Expand Down Expand Up @@ -264,12 +272,12 @@ func (proxy *OsdnProxy) updateEgressNetworkPolicy(policy networkv1.EgressNetwork
}

wasBlocked := pep.blocked
pep.blocked = proxy.endpointsBlocked(pep.endpoints)
pep.blocked = proxy.endpointSliceBlocked(pep.endpoints)
switch {
case wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsAdd(pep.endpoints)
proxy.baseProxy.OnEndpointSliceAdd(pep.endpoints)
case !wasBlocked && pep.blocked:
proxy.baseProxy.OnEndpointsDelete(pep.endpoints)
proxy.baseProxy.OnEndpointSliceDelete(pep.endpoints)
}
}
}
Expand All @@ -290,13 +298,13 @@ func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
return false
}

func (proxy *OsdnProxy) endpointsBlocked(ep *corev1.Endpoints) bool {
for _, ss := range ep.Subsets {
func (proxy *OsdnProxy) endpointSliceBlocked(ep *discoveryv1beta1.EndpointSlice) bool {
for _, ss := range ep.Endpoints {
for _, addr := range ss.Addresses {
IP := net.ParseIP(addr.IP)
if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, IP); !contains && !proxy.networkInfo.ServiceNetwork.Contains(IP) {
if proxy.firewallBlocksIP(ep.Namespace, IP) {
klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr.IP)
ip := net.ParseIP(addr)
if _, contains := common.ClusterNetworkListContains(proxy.networkInfo.ClusterNetworks, ip); !contains && !proxy.networkInfo.ServiceNetwork.Contains(ip) {
if proxy.firewallBlocksIP(ep.Namespace, ip) {
klog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr)
return true
}
}
Expand All @@ -314,18 +322,18 @@ func (proxy *OsdnProxy) checkInitialized() {
}
}

func (proxy *OsdnProxy) OnEndpointsAdd(ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceAdd(ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

pep := &proxyEndpoints{ep, proxy.endpointsBlocked(ep)}
pep := &proxyEndpoints{ep, proxy.endpointSliceBlocked(ep)}
proxy.allEndpoints[ep.UID] = pep
if !pep.blocked {
proxy.baseProxy.OnEndpointsAdd(ep)
proxy.baseProxy.OnEndpointSliceAdd(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceUpdate(old, ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

Expand All @@ -337,19 +345,19 @@ func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *corev1.Endpoints) {
}
wasBlocked := pep.blocked
pep.endpoints = ep
pep.blocked = proxy.endpointsBlocked(ep)
pep.blocked = proxy.endpointSliceBlocked(ep)

switch {
case wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsAdd(ep)
proxy.baseProxy.OnEndpointSliceAdd(ep)
case !wasBlocked && !pep.blocked:
proxy.baseProxy.OnEndpointsUpdate(old, ep)
proxy.baseProxy.OnEndpointSliceUpdate(old, ep)
case !wasBlocked && pep.blocked:
proxy.baseProxy.OnEndpointsDelete(ep)
proxy.baseProxy.OnEndpointSliceDelete(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) {
func (proxy *OsdnProxy) OnEndpointSliceDelete(ep *discoveryv1beta1.EndpointSlice) {
proxy.Lock()
defer proxy.Unlock()

Expand All @@ -360,13 +368,15 @@ func (proxy *OsdnProxy) OnEndpointsDelete(ep *corev1.Endpoints) {
}
delete(proxy.allEndpoints, ep.UID)
if !pep.blocked {
proxy.baseProxy.OnEndpointsDelete(ep)
proxy.baseProxy.OnEndpointSliceDelete(ep)
}
}

func (proxy *OsdnProxy) OnEndpointsSynced() {
proxy.baseProxy.OnEndpointsSynced()
func (proxy *OsdnProxy) OnEndpointSlicesSynced() {
klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced")
proxy.baseProxy.OnEndpointSlicesSynced()
proxy.endpointsSynced = true
klog.Infof("DEBUG: OsdnProxy OnEndpointSlicesSynced true")
proxy.checkInitialized()
}

Expand Down
74 changes: 38 additions & 36 deletions pkg/network/proxyimpl/hybrid/proxy.go
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -27,11 +28,28 @@ type RunnableProxy interface {
SetSyncRunner(b *async.BoundedFrequencyRunner)
}

// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
// implemented a full EndpointSliceHandler.
type NoopEndpointsHandler struct{}

func (*NoopEndpointsHandler) OnEndpointsAdd(*corev1.Endpoints) {}

func (*NoopEndpointsHandler) OnEndpointsUpdate(old, new *corev1.Endpoints) {
}

func (*NoopEndpointsHandler) OnEndpointsDelete(*corev1.Endpoints) {}

func (*NoopEndpointsHandler) OnEndpointsSynced() {}

var _ proxyconfig.EndpointsHandler = &NoopEndpointsHandler{}

// HybridProxier runs an unidling proxy and a primary proxy at the same time,
// delegating idled services to the unidling proxy and other services to the
// primary proxy.
type HybridProxier struct {
proxyconfig.NoopEndpointSliceHandler
// TODO implement https://github.com/kubernetes/enhancements/pull/640
proxyconfig.NoopNodeHandler
NoopEndpointsHandler

mainProxy RunnableProxy
unidlingProxy RunnableProxy
Expand Down Expand Up @@ -89,22 +107,6 @@ func NewHybridProxier(
return p, nil
}

func (proxier *HybridProxier) OnNodeAdd(node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeUpdate(oldNode, node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeDelete(node *corev1.Node) {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (proxier *HybridProxier) OnNodeSynced() {
// TODO implement https://github.com/kubernetes/enhancements/pull/640
}

func (p *HybridProxier) OnServiceAdd(service *corev1.Service) {
svcName := types.NamespacedName{
Namespace: service.Namespace,
Expand Down Expand Up @@ -190,11 +192,11 @@ func (p *HybridProxier) OnServiceSynced() {
klog.V(6).Infof("hybrid proxy: services synced")
}

// shouldEndpointsUseUserspace checks to see if the given endpoints have the correct
// shouldEndpointSliceUseUserspace checks to see if the given endpoints have the correct
// annotations and size to use the unidling proxy.
func (p *HybridProxier) shouldEndpointsUseUserspace(endpoints *corev1.Endpoints) bool {
func (p *HybridProxier) shouldEndpointSliceUseUserspace(endpoints *discoveryv1beta1.EndpointSlice) bool {
hasEndpoints := false
for _, subset := range endpoints.Subsets {
for _, subset := range endpoints.Endpoints {
if len(subset.Addresses) > 0 {
hasEndpoints = true
break
Expand Down Expand Up @@ -244,11 +246,11 @@ func (p *HybridProxier) switchService(name types.NamespacedName) {
p.switchedToUserspace[name] = p.usingUserspace[name]
}

func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceAdd(endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can succesfully
// detect when a service become unidling
klog.V(6).Infof("hybrid proxy: (always) add ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsAdd(endpoints)
p.unidlingProxy.OnEndpointSliceAdd(endpoints)

p.usingUserspaceLock.Lock()
defer p.usingUserspaceLock.Unlock()
Expand All @@ -259,11 +261,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
}

wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName]
p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints)
p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints)

if !p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsAdd(endpoints)
p.mainProxy.OnEndpointSliceAdd(endpoints)
}

// a service could appear before endpoints, so we have to treat this as a potential
Expand All @@ -273,11 +275,11 @@ func (p *HybridProxier) OnEndpointsAdd(endpoints *corev1.Endpoints) {
}
}

func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceUpdate(oldEndpoints, endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can succesfully
// detect when a service become unidling
klog.V(6).Infof("hybrid proxy: (always) update ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsUpdate(oldEndpoints, endpoints)
p.unidlingProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints)

p.usingUserspaceLock.Lock()
defer p.usingUserspaceLock.Unlock()
Expand All @@ -288,7 +290,7 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi
}

wasUsingUserspace, knownEndpoints := p.usingUserspace[svcName]
p.usingUserspace[svcName] = p.shouldEndpointsUseUserspace(endpoints)
p.usingUserspace[svcName] = p.shouldEndpointSliceUseUserspace(endpoints)

if !knownEndpoints {
utilruntime.HandleError(fmt.Errorf("received update for unknown endpoints %s", svcName.String()))
Expand All @@ -299,26 +301,26 @@ func (p *HybridProxier) OnEndpointsUpdate(oldEndpoints, endpoints *corev1.Endpoi

if !isSwitch && !p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: update ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsUpdate(oldEndpoints, endpoints)
p.mainProxy.OnEndpointSliceUpdate(oldEndpoints, endpoints)
return
}

if p.usingUserspace[svcName] {
klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsDelete(oldEndpoints)
p.mainProxy.OnEndpointSliceDelete(oldEndpoints)
} else {
klog.V(6).Infof("hybrid proxy: add ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsAdd(endpoints)
p.mainProxy.OnEndpointSliceAdd(endpoints)
}

p.switchService(svcName)
}

func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) {
func (p *HybridProxier) OnEndpointSliceDelete(endpoints *discoveryv1beta1.EndpointSlice) {
// we track all endpoints in the unidling endpoints handler so that we can succesfully
// detect when a service become unidling
klog.V(6).Infof("hybrid proxy: (always) del ep %s/%s in unidling proxy", endpoints.Namespace, endpoints.Name)
p.unidlingProxy.OnEndpointsDelete(endpoints)
p.unidlingProxy.OnEndpointSliceDelete(endpoints)

// Careful - there is the potential for deadlocks here,
// except that we always get usingUserspaceLock first, then
Expand All @@ -340,15 +342,15 @@ func (p *HybridProxier) OnEndpointsDelete(endpoints *corev1.Endpoints) {

if !usingUserspace {
klog.V(6).Infof("hybrid proxy: del ep %s/%s in main proxy", endpoints.Namespace, endpoints.Name)
p.mainProxy.OnEndpointsDelete(endpoints)
p.mainProxy.OnEndpointSliceDelete(endpoints)
}

p.cleanupState(svcName)
}

func (p *HybridProxier) OnEndpointsSynced() {
p.unidlingProxy.OnEndpointsSynced()
p.mainProxy.OnEndpointsSynced()
func (p *HybridProxier) OnEndpointSlicesSynced() {
p.unidlingProxy.OnEndpointSlicesSynced()
p.mainProxy.OnEndpointSlicesSynced()
klog.V(6).Infof("hybrid proxy: endpoints synced")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/openshift-sdn/cmd.go
Expand Up @@ -55,6 +55,7 @@ func NewOpenShiftSDNCommand(basename string, errout io.Writer) *cobra.Command {
Short: "Start OpenShiftSDN",
Long: networkLong,
Run: func(c *cobra.Command, _ []string) {
c.Flags().Lookup("v").Value.Set("5")
ch := make(chan struct{})
interrupt.New(func(s os.Signal) {
fmt.Fprintf(errout, "interrupt: Gracefully shutting down ...\n")
Expand Down
13 changes: 2 additions & 11 deletions pkg/openshift-sdn/proxy.go
Expand Up @@ -16,14 +16,12 @@ import (
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/server/routes"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
kubeproxyoptions "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/features"
proxy "k8s.io/kubernetes/pkg/proxy"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
Expand Down Expand Up @@ -67,13 +65,6 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
return
}

if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) ||
utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
klog.Warningf("kube-proxy has unsupported EndpointSlice/EndpointSliceProxying gates enabled")
close(waitChan)
return
}

bindAddr := net.ParseIP(sdn.ProxyConfig.BindAddress)
nodeAddr := bindAddr

Expand Down Expand Up @@ -214,8 +205,8 @@ func (sdn *OpenShiftSDN) runProxy(waitChan chan<- bool) {
}
}

endpointsConfig := pconfig.NewEndpointsConfig(
sdn.informers.KubeInformers.Core().V1().Endpoints(),
endpointsConfig := pconfig.NewEndpointSliceConfig(
sdn.informers.KubeInformers.Discovery().V1beta1().EndpointSlices(),
sdn.ProxyConfig.IPTables.SyncPeriod.Duration,
)
// customized handling registration that inserts a filter if needed
Expand Down

0 comments on commit c918927

Please sign in to comment.