Fix IPVS proxier to update stale real server after restart #111635

Merged
merged 2 commits into from
Aug 27, 2022
28 changes: 28 additions & 0 deletions pkg/proxy/ipvs/proxier.go
@@ -223,6 +223,13 @@ type Proxier struct {
	serviceMap   proxy.ServiceMap
	endpointsMap proxy.EndpointsMap
	nodeLabels   map[string]string
	// initialSync is a bool indicating if the proxier is syncing for the first time.
	// It is set to true when a new proxier is initialized and then set to false on all
	// future syncs.
	// This lets us run specific logic that's required only during proxy startup.
	// For eg: it enables us to update weights of existing destinations only on startup
	// saving us the cost of querying and updating real servers during every sync.
	initialSync bool
	// endpointSlicesSynced, and servicesSynced are set to true when
	// corresponding objects are synced after startup. This is used to avoid updating
	// ipvs rules with some partial data after kube-proxy restart.
@@ -468,6 +475,7 @@ func NewProxier(ipt utiliptables.Interface,
		serviceChanges:   proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
		endpointsMap:     make(proxy.EndpointsMap),
		endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
		initialSync:      true,
		syncPeriod:       syncPeriod,
		minSyncPeriod:    minSyncPeriod,
		excludeCIDRs:     parsedExcludeCIDRs,
@@ -1010,6 +1018,12 @@ func (proxier *Proxier) syncProxyRules() {
		return
	}

	// it's safe to set initialSync to false as it acts as a flag for startup actions
	// and the mutex is held.
	defer func() {
		proxier.initialSync = false
	}()

	// Keep track of how long syncs take.
	start := time.Now()
	defer func() {
@@ -2007,6 +2021,19 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
		}

		if curEndpoints.Has(ep) {
			// if we are syncing for the first time, loop through all current destinations and
			// reset their weight.
			if proxier.initialSync {
				for _, dest := range curDests {
Contributor

Add proxier.updateWeights = false here. The mutex is held so it's safe

Contributor

@uablrek uablrek Aug 25, 2022

(btw, you may make a comment that the mutex is held and that it's a one-time event)

Member Author

we can't add this here, as this is inside a loop. adding this outside the loop instead.

Contributor

Outside the "for _, ep := range newEndpoints.List() {" loop you mean? Seems reasonable.

Member Author

on second thought, is it completely safe to have proxier.updateWeights = false inside syncProxyRules() instead of OnServicesSynced()/OnEndpointSynced()?
i'm asking because in OnServicesSynced() we do something like:

proxier.mu.Lock()
...
proxier.updateWeights = true
proxier.mu.Unlock()

proxier.syncProxyRules()

so even though proxier.syncProxyRules() acquires the mutex first thing, another goroutine that called syncProxyRules() could theoretically execute proxier.updateWeights = false in the window between OnServicesSynced() releasing the lock and its own proxier.syncProxyRules() call acquiring it.
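
The interleaving described above can be sketched in isolation. This is a minimal model, not the kube-proxy code: toyProxier, onSynced, syncRules, synced and startupRuns are hypothetical names, and the early return is assumed to stand for the "not yet synced" check at the top of syncProxyRules(). It follows the shape the merged diff ends up with: the flag starts as true at construction, and it is cleared by a deferred function that is registered only after the early return, while the mutex is held.

```go
package main

import (
	"fmt"
	"sync"
)

// toyProxier is a hypothetical stand-in for the real Proxier.
type toyProxier struct {
	mu          sync.Mutex
	synced      bool // assumption: stands in for the "services and endpoints synced" state
	initialSync bool // true until the first complete sync has run
	startupRuns int  // counts how often the startup-only logic executed
}

func newToyProxier() *toyProxier {
	return &toyProxier{initialSync: true}
}

// onSynced mirrors the lock / set / unlock / sync shape quoted in the comment above.
func (p *toyProxier) onSynced() {
	p.mu.Lock()
	p.synced = true
	p.mu.Unlock()
	// The lock is released here, so another goroutine may sync first.
	p.syncRules()
}

func (p *toyProxier) syncRules() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.synced {
		return // early return: the flag is left untouched
	}
	// Deferred functions run in LIFO order, so this runs before the unlock.
	defer func() { p.initialSync = false }()
	if p.initialSync {
		p.startupRuns++ // startup-only work, e.g. resetting weights
	}
}

func main() {
	p := newToyProxier()
	p.syncRules() // not synced yet: returns early, flag stays true

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.syncRules()
		}()
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		p.onSynced()
	}()
	wg.Wait()

	fmt.Println("startup logic ran", p.startupRuns, "time(s)")
}
```

In this model it does not matter which goroutine wins the race for the lock: whichever sync first runs with complete data performs the startup work and clears the flag, and a sync that returns early never consumes it.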

Contributor

@uablrek uablrek Aug 26, 2022

Silly mistake, now it works https://go.dev/play/p/CuVZg3B2itE

Member Author

yes, i noticed first needs to be initialized to true instead of false

Contributor

Seems go has support for "Once"; https://golangcode.com/run-code-once-with-sync/

You may see if you can use it.

Contributor

I am unsure if it's better since we already have the proxier.mu mutex.

Member Author

@aryan9600 aryan9600 Aug 26, 2022

I think it's better to avoid sync.Once because if in the future we want to do more things only on startup, the underlying function will get large. Having a flag gives us flexibility about where we want to run the one-time logic during sync.
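
The trade-off discussed here can be illustrated with a small single-goroutine sketch. The types onceStyle and flagStyle and the step names are hypothetical and exist only for this comparison. With sync.Once, every startup-only step has to live in the one function passed to Do; with a flag, the steps can sit wherever they belong in the sync flow.

```go
package main

import (
	"fmt"
	"sync"
)

// onceStyle groups all startup-only work into a single sync.Once closure.
type onceStyle struct {
	once sync.Once
	log  []string
}

func (o *onceStyle) sync() {
	o.once.Do(func() {
		// Every one-time step must be placed in this closure.
		o.log = append(o.log, "reset weights")
		o.log = append(o.log, "other startup step")
	})
	o.log = append(o.log, "regular sync")
}

// flagStyle uses a boolean guarded by the mutex that the sync already holds.
type flagStyle struct {
	mu          sync.Mutex
	initialSync bool
	log         []string
}

func (f *flagStyle) sync() {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Cleared while the lock is still held (deferred calls run LIFO).
	defer func() { f.initialSync = false }()

	if f.initialSync {
		f.log = append(f.log, "reset weights")
	}
	f.log = append(f.log, "regular sync")
	if f.initialSync {
		// A second startup-only step at a different point in the flow.
		f.log = append(f.log, "other startup step")
	}
}

func main() {
	o := &onceStyle{}
	o.sync()
	o.sync()
	fmt.Println(o.log)

	f := &flagStyle{initialSync: true}
	f.sync()
	f.sync()
	fmt.Println(f.log)
}
```

Both variants run the startup steps once across the two syncs. The difference is only in placement: the flag lets the second step run after the regular work of the first sync.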

					if dest.Weight != newDest.Weight {
						err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest)
						if err != nil {
							klog.ErrorS(err, "Failed to update destination", "newDest", newDest)
							continue
						}
					}
				}
			}
			// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
			uniqueRS := GetUniqueRSName(vs, newDest)
			if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
@@ -2025,6 +2052,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
				continue
			}
		}

	// Delete old endpoints
	for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
		// if curEndpoint is in gracefulDelete, skip
91 changes: 91 additions & 0 deletions pkg/proxy/ipvs/proxier_test.go
@@ -178,6 +178,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
	proxier.servicesSynced = true
}

func makeEndpointSliceMap(proxier *Proxier, allEpSlices ...*discovery.EndpointSlice) {
	for i := range allEpSlices {
		proxier.OnEndpointSliceAdd(allEpSlices[i])
	}
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	proxier.endpointSlicesSynced = true
}

func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
@@ -1379,6 +1388,88 @@ func TestNodePortIPv6(t *testing.T) {
	}
}

func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) {
	tcpProtocol := v1.ProtocolTCP

	ipt := iptablestest.NewFake()
	ipvs := ipvstest.NewFake()
	ipset := ipsettest.NewFake(testIPSetVersion)
	fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)

	svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
		svc.Spec.ClusterIP = "10.20.30.41"
		svc.Spec.Ports = []v1.ServicePort{{
			Name:     "p80",
			Port:     int32(80),
			Protocol: v1.ProtocolTCP,
		}}
	})
	epSlice1 := makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
		eps.AddressType = discovery.AddressTypeIPv4
		eps.Endpoints = []discovery.Endpoint{{
			Addresses: []string{"10.180.0.1"},
		}}
		eps.Ports = []discovery.EndpointPort{{
			Name:     pointer.StringPtr("p80"),
			Port:     pointer.Int32(80),
			Protocol: &tcpProtocol,
		}}
	})

	// sync proxy rules to get to the desired initial state
	makeServiceMap(fp, svc1)
	makeEndpointSliceMap(fp, epSlice1)
	fp.syncProxyRules()

	serv := &utilipvs.VirtualServer{
		Address:   netutils.ParseIPSloppy("10.20.30.41"),
		Port:      uint16(80),
		Protocol:  string(tcpProtocol),
		Scheduler: fp.ipvsScheduler,
	}

	vs, err := fp.ipvs.GetVirtualServer(serv)
	if err != nil {
		t.Errorf("failed to get virtual server, err: %v", err)
	}

	rss, err := fp.ipvs.GetRealServers(vs)
	if err != nil {
		t.Errorf("failed to get real servers, err: %v", err)
	}
	for _, rs := range rss {
		rs.Weight = 0
		if err = fp.ipvs.UpdateRealServer(vs, rs); err != nil {
			t.Errorf("failed to update real server: %v, err: %v", rs, err)
		}
	}

	// simulate a restart by enabling initial sync logic.
	fp.initialSync = true
	err = fp.syncEndpoint(proxy.ServicePortName{
		NamespacedName: types.NamespacedName{
			Name:      "svc1",
			Namespace: "ns1",
		},
		Port:     "80",
		Protocol: tcpProtocol,
	}, true, vs)
	if err != nil {
		t.Errorf("failed to sync endpoint, err: %v", err)
	}

	rss, err = fp.ipvs.GetRealServers(vs)
	if err != nil {
		t.Errorf("failed to get real server, err: %v", err)
	}
	for _, rs := range rss {
		if rs.Weight != 1 {
			t.Logf("unexpected realserver weight: %d, expected weight: 1", rs.Weight)
			t.Errorf("unexpected realserver state")
		}
	}
}

func TestIPv4Proxier(t *testing.T) {
	tcpProtocol := v1.ProtocolTCP
	tests := []struct {
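
The behaviour that Test_syncEndpoint_updateWeightsOnRestart checks can be modelled without any IPVS dependency. The sketch below is a simplified illustration, not the proxier implementation: realServer, toyIPVS, toySyncer and syncEndpoints are hypothetical names, the destination table is a plain map, and the desired weight is assumed to be 1, as in the newDest used by the test. It only aims to show the idea in the diff: an existing destination with a stale weight is repaired on the first sync and left alone on later syncs.

```go
package main

import "fmt"

// realServer is a hypothetical, simplified stand-in for an IPVS destination.
type realServer struct {
	addr   string
	weight int
}

// toyIPVS is a hypothetical in-memory destination table keyed by address.
type toyIPVS struct {
	dests   map[string]*realServer
	updates int // number of weight updates issued
}

// toySyncer carries the startup flag, like initialSync in the diff.
type toySyncer struct {
	ipvs        *toyIPVS
	initialSync bool
}

// syncEndpoints makes sure every wanted endpoint exists. Weights of existing
// destinations are only inspected and repaired during the initial sync.
func (s *toySyncer) syncEndpoints(want []string) {
	defer func() { s.initialSync = false }()
	for _, addr := range want {
		cur, ok := s.ipvs.dests[addr]
		if !ok {
			s.ipvs.dests[addr] = &realServer{addr: addr, weight: 1}
			continue
		}
		if s.initialSync && cur.weight != 1 {
			cur.weight = 1
			s.ipvs.updates++
		}
	}
}

func main() {
	const ep = "10.180.0.1:80"
	ipvs := &toyIPVS{dests: map[string]*realServer{
		ep: {addr: ep, weight: 0}, // stale destination found after a restart
	}}
	s := &toySyncer{ipvs: ipvs, initialSync: true}

	s.syncEndpoints([]string{ep})
	fmt.Println(ipvs.dests[ep].weight, ipvs.updates) // prints: 1 1

	// A later change is not touched, because the startup flag is now false.
	ipvs.dests[ep].weight = 0
	s.syncEndpoints([]string{ep})
	fmt.Println(ipvs.dests[ep].weight, ipvs.updates) // prints: 0 1
}
```

Restricting the weight check to the first sync is what avoids querying and updating real servers on every sync, which is the cost the initialSync field comment calls out.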