Skip to content

Commit

Permalink
Merge pull request #122050 from danwinship/proxy-change-tracker-clean…
Browse files Browse the repository at this point in the history
…up-1

proxy service/endpoints change tracker cleanups
  • Loading branch information
k8s-ci-robot committed Dec 22, 2023
2 parents 5104e65 + 1c089af commit 784c7ce
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 727 deletions.
138 changes: 138 additions & 0 deletions pkg/proxy/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
"net"
"strconv"

"k8s.io/apimachinery/pkg/util/sets"
)

// Endpoint in an interface which abstracts information about an endpoint.
type Endpoint interface {
// String returns endpoint string. An example format can be: `IP:Port`.
// We take the returned value as ServiceEndpoint.Endpoint.
String() string
// IP returns IP part of the endpoint.
IP() string
// Port returns the Port part of the endpoint.
Port() int

// IsLocal returns true if the endpoint is running on the same host as kube-proxy.
IsLocal() bool
// IsReady returns true if an endpoint is ready and not terminating, or
// if PublishNotReadyAddresses is set on the service.
IsReady() bool
// IsServing returns true if an endpoint is ready. It does not account
// for terminating state.
IsServing() bool
// IsTerminating returns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
IsTerminating() bool

// ZoneHints returns the zone hint for the endpoint. This is based on
// endpoint.hints.forZones[0].name in the EndpointSlice API.
ZoneHints() sets.Set[string]
}

// BaseEndpointInfo contains base information that defines an endpoint.
// This could be used directly by proxier while processing endpoints,
// or can be used for constructing a more specific EndpointInfo struct
// defined by the proxier if needed.
type BaseEndpointInfo struct {
// Cache this values to improve performance
ip string
port int
// endpoint is the same as net.JoinHostPort(ip,port)
endpoint string

// isLocal indicates whether the endpoint is running on same host as kube-proxy.
isLocal bool

// ready indicates whether this endpoint is ready and NOT terminating, unless
// PublishNotReadyAddresses is set on the service, in which case it will just
// always be true.
ready bool
// serving indicates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
serving bool
// terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
terminating bool

// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
}

var _ Endpoint = &BaseEndpointInfo{}

// String is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) String() string {
return info.endpoint
}

// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return info.ip
}

// Port returns just the Port part of the endpoint.
func (info *BaseEndpointInfo) Port() int {
return info.port
}

// IsLocal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IsLocal() bool {
return info.isLocal
}

// IsReady returns true if an endpoint is ready and not terminating.
func (info *BaseEndpointInfo) IsReady() bool {
return info.ready
}

// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *BaseEndpointInfo) IsServing() bool {
return info.serving
}

// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *BaseEndpointInfo) IsTerminating() bool {
return info.terminating
}

// ZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}

func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
ip: ip,
port: port,
endpoint: net.JoinHostPort(ip, strconv.Itoa(port)),
isLocal: isLocal,
ready: ready,
serving: serving,
terminating: terminating,
zoneHints: zoneHints,
}
}
135 changes: 24 additions & 111 deletions pkg/proxy/endpoints.go → pkg/proxy/endpointschangetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,138 +17,50 @@ limitations under the License.
package proxy

import (
"net"
"strconv"
"sync"
"time"

"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
)

var supportedEndpointSliceAddressTypes = sets.New[string](
string(discovery.AddressTypeIPv4),
string(discovery.AddressTypeIPv6),
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
discovery.AddressTypeIPv4,
discovery.AddressTypeIPv6,
)

// BaseEndpointInfo contains base information that defines an endpoint.
// This could be used directly by proxier while processing endpoints,
// or can be used for constructing a more specific EndpointInfo struct
// defined by the proxier if needed.
type BaseEndpointInfo struct {
// Cache this values to improve performance
ip string
port int
// endpoint is the same as net.JoinHostPort(ip,port)
endpoint string

// isLocal indicates whether the endpoint is running on same host as kube-proxy.
isLocal bool

// ready indicates whether this endpoint is ready and NOT terminating, unless
// PublishNotReadyAddresses is set on the service, in which case it will just
// always be true.
ready bool
// serving indicates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
serving bool
// terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
terminating bool

// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
}

var _ Endpoint = &BaseEndpointInfo{}

// String is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) String() string {
return info.endpoint
}

// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return info.ip
}

// Port returns just the Port part of the endpoint.
func (info *BaseEndpointInfo) Port() int {
return info.port
}

// IsLocal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IsLocal() bool {
return info.isLocal
}

// IsReady returns true if an endpoint is ready and not terminating.
func (info *BaseEndpointInfo) IsReady() bool {
return info.ready
}

// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *BaseEndpointInfo) IsServing() bool {
return info.serving
}

// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *BaseEndpointInfo) IsTerminating() bool {
return info.terminating
}

// ZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}

func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
ip: ip,
port: port,
endpoint: net.JoinHostPort(ip, strconv.Itoa(port)),
isLocal: isLocal,
ready: ready,
serving: serving,
terminating: terminating,
zoneHints: zoneHints,
}
}

type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint

// This handler is invoked by the apply function on every change. This function should not modify the
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)

// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointsChangeTracker struct {
// lock protects lastChangeTriggerTimes
lock sync.Mutex

// processEndpointsMapChange is invoked by the apply function on every change.
// This function should not modify the EndpointsMaps, but just use the changes for
// any Proxier-specific cleanup.
processEndpointsMapChange processEndpointsMapChangeFunc

// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.

// lastChangeTriggerTimes maps from the Service's NamespacedName to the times of
// the triggers that caused its EndpointSlice objects to change. Used to calculate
// the network-programming-latency metric.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
// record the time when the endpointsChangeTracker was created so we can ignore the endpoints
// that were generated before, because we can't estimate the network-programming-latency on those.
// This is specially problematic on restarts, because we process all the endpoints that may have been
// created hours or days before.
// trackerStartTime is the time when the EndpointsChangeTracker was created, so
// we can avoid generating network-programming-latency metrics for changes that
// occurred before that.
trackerStartTime time.Time
}

type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)

// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
return &EndpointsChangeTracker{
Expand All @@ -159,11 +71,12 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun
}
}

// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
// EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing
// endpointSlice (depending on removeSlice). It returns true if this update contained a
// change that needs to be synced; note that this is different from the return value of
// ServiceChangeTracker.Update().
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,6 @@ func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*d

for _, tracker := range endpointSliceCache.trackerByServiceMap {
tracker.applied = tracker.pending
tracker.pending = endpointSliceInfoByName{}
tracker.pending = endpointSliceDataByName{}
}
}
Loading

0 comments on commit 784c7ce

Please sign in to comment.