Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanups in pkg/proxy/endpoints.go #76165

Merged
merged 3 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,18 @@ type UpdateEndpointMapResult struct {
}

// UpdateEndpointsMap updates endpointsMap base on the given changes.
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.StaleEndpoints = make([]ServiceEndpoint, 0)
result.StaleServiceNames = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make([]time.Time, 0)

endpointsMap.apply(
em.apply(
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)

// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
result.HCEndpointsLocalIPSize = make(map[types.NamespacedName]int)
localIPs := GetLocalEndpointIPs(endpointsMap)
localIPs := em.getLocalEndpointIPs()
for nsn, ips := range localIPs {
result.HCEndpointsLocalIPSize[nsn] = len(ips)
}
Expand Down Expand Up @@ -290,8 +290,8 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
em.Unmerge(change.previous)
em.Merge(change.current)
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
Expand All @@ -302,23 +302,23 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
}

// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
func (em EndpointsMap) Merge(other EndpointsMap) {
func (em EndpointsMap) merge(other EndpointsMap) {
for svcPortName := range other {
em[svcPortName] = other[svcPortName]
}
}

// Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
func (em EndpointsMap) Unmerge(other EndpointsMap) {
func (em EndpointsMap) unmerge(other EndpointsMap) {
for svcPortName := range other {
delete(em, svcPortName)
}
}

// GetLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
func GetLocalEndpointIPs(endpointsMap EndpointsMap) map[types.NamespacedName]sets.String {
func (em EndpointsMap) getLocalEndpointIPs() map[types.NamespacedName]sets.String {
localIPs := make(map[types.NamespacedName]sets.String)
for svcPortName, epList := range endpointsMap {
for svcPortName, epList := range em {
for _, ep := range epList {
if ep.GetIsLocal() {
nsn := svcPortName.NamespacedName
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestGetLocalEndpointIPs(t *testing.T) {

for tci, tc := range testCases {
// outputs
localIPs := GetLocalEndpointIPs(tc.endpointsMap)
localIPs := tc.endpointsMap.getLocalEndpointIPs()

if !reflect.DeepEqual(localIPs, tc.expected) {
t.Errorf("[%d] expected %#v, got %#v", tci, tc.expected, localIPs)
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.addEndpoints(tc.previousEndpoints[i])
}
}
UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
fp.endpointsMap.Update(fp.endpointsChanges)
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)

// Now let's call appropriate handlers to get to state we want to be.
Expand All @@ -1233,7 +1233,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.updateEndpoints(prev, curr)
}
}
result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
Expand Down Expand Up @@ -1373,7 +1373,7 @@ func TestLastChangeTriggerTime(t *testing.T) {

tc.scenario(fp)

result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
result := fp.endpointsMap.Update(fp.endpointsChanges)
got := result.LastChangeTriggerTimes
sortTimeSlice(got)
sortTimeSlice(tc.expected)
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (proxier *Proxier) syncProxyRules() {
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/iptables/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,7 +2185,7 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsAdd(tc.previousEndpoints[i])
}
}
proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
fp.endpointsMap.Update(fp.endpointsChanges)
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)

// Now let's call appropriate handlers to get to state we want to be.
Expand All @@ -2205,7 +2205,7 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsUpdate(prev, curr)
}
}
result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (proxier *Proxier) syncProxyRules() {
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/ipvs/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,7 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsAdd(tc.previousEndpoints[i])
}
}
proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
fp.endpointsMap.Update(fp.endpointsChanges)
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)

// Now let's call appropriate handlers to get to state we want to be.
Expand All @@ -2506,7 +2506,7 @@ func Test_updateEndpointsMap(t *testing.T) {
fp.OnEndpointsUpdate(prev, curr)
}
}
result := proxy.UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
Expand Down