Skip to content

Commit

Permalink
Kubeproxy uses V1 EndpointSlice
Browse files Browse the repository at this point in the history
  • Loading branch information
swetharepakula committed Jul 1, 2021
1 parent 92726bf commit 03b7a69
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 160 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (s *ProxyServer) Run() error {
go serviceConfig.Run(wait.NeverStop)

if s.UseEndpointSlices {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
Expand Down
40 changes: 28 additions & 12 deletions pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
Expand All @@ -47,8 +47,7 @@ var supportedEndpointSliceAddressTypes = sets.NewString(
type BaseEndpointInfo struct {
Endpoint string // TODO: should be an endpointString type
// IsLocal indicates whether the endpoint is running in same host as kube-proxy.
IsLocal bool
Topology map[string]string
IsLocal bool

// ZoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
Expand All @@ -69,6 +68,11 @@ type BaseEndpointInfo struct {
// This is only set when watching EndpointSlices. If using Endpoints, this is always
// false since terminating endpoints are always excluded from Endpoints.
Terminating bool

// NodeName is the name of the node this endpoint belongs to
NodeName string
// Zone is the name of the zone this endpoint belongs to
Zone string
}

var _ Endpoint = &BaseEndpointInfo{}
Expand Down Expand Up @@ -100,11 +104,6 @@ func (info *BaseEndpointInfo) IsTerminating() bool {
return info.Terminating
}

// GetTopology returns the topology information of the endpoint.
func (info *BaseEndpointInfo) GetTopology() map[string]string {
return info.Topology
}

// GetZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) GetZoneHints() sets.String {
return info.ZoneHints
Expand All @@ -125,16 +124,27 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
}

func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string,
// GetNodeName returns the NodeName for this endpoint.
func (info *BaseEndpointInfo) GetNodeName() string {
return info.NodeName
}

// GetZone returns the Zone for this endpoint.
func (info *BaseEndpointInfo) GetZone() string {
return info.Zone
}

func newBaseEndpointInfo(IP, nodeName, zone string, port int, isLocal bool,
ready, serving, terminating bool, zoneHints sets.String) *BaseEndpointInfo {
return &BaseEndpointInfo{
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
IsLocal: isLocal,
Topology: topology,
Ready: ready,
Serving: serving,
Terminating: terminating,
ZoneHints: zoneHints,
NodeName: nodeName,
Zone: zone,
}
}

Expand Down Expand Up @@ -441,11 +451,17 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
isReady := true
isServing := true
isTerminating := false
isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
isLocal := false
nodeName := ""
if addr.NodeName != nil {
isLocal = *addr.NodeName == ect.hostname
nodeName = *addr.NodeName
}
// Only supported with EndpointSlice API
zoneHints := sets.String{}

baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil, isReady, isServing, isTerminating, zoneHints)
// Zone information is only supported with EndpointSlice API
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
if ect.makeEndpointInfo != nil {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
} else {
Expand Down
6 changes: 2 additions & 4 deletions pkg/proxy/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/davecgh/go-spew/spew"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -1952,9 +1952,7 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
func newTestEp(ep, host string, ready, serving, terminating bool) *BaseEndpointInfo {
endpointInfo := &BaseEndpointInfo{Endpoint: ep, Ready: ready, Serving: serving, Terminating: terminating}
if host != "" {
endpointInfo.Topology = map[string]string{
"kubernetes.io/hostname": host,
}
endpointInfo.NodeName = host
}
return endpointInfo
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/proxy/endpointslicecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -76,11 +76,11 @@ type endpointSliceInfo struct {

// endpointInfo contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses and Topology are copied from EndpointSlice Endpoints.
// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints.
type endpointInfo struct {
Addresses []string
NodeName *string
Topology map[string]string
Zone *string
ZoneHints sets.String

Ready bool
Expand Down Expand Up @@ -130,16 +130,15 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *
for _, endpoint := range endpointSlice.Endpoints {
epInfo := &endpointInfo{
Addresses: endpoint.Addresses,
Topology: endpoint.Topology,
Zone: endpoint.Zone,
NodeName: endpoint.NodeName,

// conditions
Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready,
Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving,
Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating,
}

epInfo.NodeName = endpoint.NodeName

if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
epInfo.ZoneHints = sets.String{}
Expand Down Expand Up @@ -281,13 +280,18 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
}

isLocal := false
nodeName := ""
if endpoint.NodeName != nil {
isLocal = cache.isLocal(*endpoint.NodeName)
} else {
isLocal = cache.isLocal(endpoint.Topology[v1.LabelHostname])
nodeName = *endpoint.NodeName
}

zone := ""
if endpoint.Zone != nil {
zone = *endpoint.Zone
}

endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology,
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], nodeName, zone, portNum, isLocal,
endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints)

// This logic ensures we're deduplicating potential overlapping endpoints
Expand Down
23 changes: 11 additions & 12 deletions pkg/proxy/endpointslicecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"reflect"
"testing"

"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -235,23 +235,23 @@ func TestEndpointInfoByServicePort(t *testing.T) {
"10.0.1.1:80": &BaseEndpointInfo{
Endpoint: "10.0.1.1:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
NodeName: "host2",
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.2:80": &BaseEndpointInfo{
Endpoint: "10.0.1.2:80",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
NodeName: "host1",
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.3:80": &BaseEndpointInfo{
Endpoint: "10.0.1.3:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
NodeName: "host2",
Ready: true,
Serving: true,
Terminating: false,
Expand All @@ -271,31 +271,31 @@ func TestEndpointInfoByServicePort(t *testing.T) {
"10.0.1.1:80": &BaseEndpointInfo{
Endpoint: "10.0.1.1:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
NodeName: "host2",
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.2:80": &BaseEndpointInfo{
Endpoint: "10.0.1.2:80",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
NodeName: "host1",
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.1:8080": &BaseEndpointInfo{
Endpoint: "10.0.1.1:8080",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
NodeName: "host2",
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.2:8080": &BaseEndpointInfo{
Endpoint: "10.0.1.2:8080",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
NodeName: "host1",
Ready: true,
Serving: true,
Terminating: false,
Expand Down Expand Up @@ -500,9 +500,8 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of
}

if len(hosts) > 0 {
endpoint.Topology = map[string]string{
"kubernetes.io/hostname": hosts[i%len(hosts)],
}
hostname := hosts[i%len(hosts)]
endpoint.NodeName = &hostname
}

endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down
Loading

0 comments on commit 03b7a69

Please sign in to comment.