Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minimize iptables-restore input #110268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -567,6 +567,13 @@ const (
// Enable MinDomains in Pod Topology Spread.
MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread"

// owner: @danwinship
// kep: http://kep.k8s.io/3453
// alpha: v1.26
//
// Enables new performance-improving code in kube-proxy iptables mode
MinimizeIPTablesRestore featuregate.Feature = "MinimizeIPTablesRestore"

// owner: @janosi @bridgetkromhout
// kep: http://kep.k8s.io/1435
// alpha: v1.20
Expand Down Expand Up @@ -1030,6 +1037,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Beta},

MinimizeIPTablesRestore: {Default: false, PreRelease: featuregate.Alpha},

MixedProtocolLBService: {Default: true, PreRelease: featuregate.Beta},

MultiCIDRRangeAllocator: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
18 changes: 18 additions & 0 deletions pkg/proxy/endpoints.go
Expand Up @@ -302,6 +302,24 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
return changeNeeded
}

// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
danwinship marked this conversation as resolved.
Show resolved Hide resolved
if ect.endpointSliceCache != nil {
danwinship marked this conversation as resolved.
Show resolved Hide resolved
return ect.endpointSliceCache.pendingChanges()
}

ect.lock.Lock()
defer ect.lock.Unlock()

changes := sets.NewString()
for name := range ect.items {
changes.Insert(name.String())
}
Comment on lines +316 to +319
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side comment, I don't know if is for testing but I think this tracker doesn't work if ect.endpointSliceCache == nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not?

Copy link
Member

@aojea aojea Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EndpointSliceUpdate() doesn't check is nil and uses it
The contructor initializes it, maybe is for testing 🙃

return changes
}

// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
Expand Down
95 changes: 68 additions & 27 deletions pkg/proxy/endpoints_test.go
Expand Up @@ -825,13 +825,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints []ServiceEndpoint
expectedStaleServiceNames map[ServicePortName]bool
expectedHealthchecks map[types.NamespacedName]int
expectedChangedEndpoints sets.String
}{{
name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, unnamed port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -853,6 +855,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, named port, local",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -876,6 +879,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -903,6 +907,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple subsets, multiple ports, local",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -938,6 +943,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "no change, multiple endpoints, subsets, IPs, and ports",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1006,6 +1012,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
},
expectedChangedEndpoints: sets.NewString(),
}, {
name: "add an Endpoints",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1027,6 +1034,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an Endpoints",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1047,6 +1055,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add an IP and port",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1077,6 +1086,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove an IP and port",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1112,6 +1122,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "add a subset",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1140,6 +1151,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "remove a subset",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1167,6 +1179,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "rename a port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1192,7 +1205,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "renumber a port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1217,6 +1231,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedStaleServiceNames: map[ServicePortName]bool{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, {
name: "complex add and remove",
previousEndpoints: []*v1.Endpoints{
Expand Down Expand Up @@ -1292,6 +1307,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedHealthchecks: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
expectedChangedEndpoints: sets.NewString("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
}, {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: []*v1.Endpoints{
Expand All @@ -1310,7 +1326,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedStaleServiceNames: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedHealthchecks: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
},
}

Expand Down Expand Up @@ -1346,6 +1363,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.updateEndpoints(prev, curr)
}
}

pendingChanges := fp.endpointsChanges.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.List(), pendingChanges.List())
}

result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
Expand Down Expand Up @@ -1520,13 +1543,14 @@ func TestEndpointSliceUpdate(t *testing.T) {
fqdnSlice.AddressType = discovery.AddressTypeFQDN

testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
expectedChangedEndpoints sets.String
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
Expand All @@ -1548,30 +1572,33 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// ensure that only valide address types are processed
"add an FQDN slice (invalid address type)": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// test additions to existing state
"add a slice that overlaps with existing state": {
Expand Down Expand Up @@ -1604,6 +1631,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
Expand Down Expand Up @@ -1634,6 +1662,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
Expand All @@ -1656,19 +1685,21 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{pointer.Int32(80), pointer.Int32(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.NewString(),
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
Expand All @@ -1692,6 +1723,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
Expand All @@ -1713,6 +1745,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
Expand Down Expand Up @@ -1741,6 +1774,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: false},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
// start with some endpoints ready, transition to some terminating
"transition some endpoints to terminating state": {
Expand Down Expand Up @@ -1769,6 +1803,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true, Ready: false, Serving: false, Terminating: true},
},
},
expectedChangedEndpoints: sets.NewString("ns1/svc1"),
},
}

Expand All @@ -1783,6 +1818,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
if tc.endpointChangeTracker.items == nil {
t.Errorf("Expected ect.items to not be nil")
}

pendingChanges := tc.endpointChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.List(), pendingChanges.List())
}

changes := tc.endpointChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {
Expand Down
15 changes: 15 additions & 0 deletions pkg/proxy/endpointslicecache.go
Expand Up @@ -188,6 +188,21 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return changed
}

// pendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time checkoutChanges was called
func (cache *EndpointSliceCache) pendingChanges() sets.String {
danwinship marked this conversation as resolved.
Show resolved Hide resolved
cache.lock.Lock()
defer cache.lock.Unlock()

changes := sets.NewString()
for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) > 0 {
changes.Insert(serviceNN.String())
}
}
return changes
}

// checkoutChanges returns a list of all endpointsChanges that are
// pending and then marks them as applied.
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
Expand Down