Skip to content
Permalink
Browse files

kube-proxy: do not export network programming latency for deleted enp…

…oints.

At the moment the annotation used for calculations is not set when Endpoint is
deleted, causing skewed results.

Also improve logging: log name of the endpoint with its latency.
  • Loading branch information...
oxddr committed Aug 13, 2019
1 parent 4ce69dd commit b23f12cf5e0cd6d6dfd6e06182ff26513b18e681
@@ -143,7 +143,7 @@ func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) {
if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil {
triggerTime = readyCondition.LastTransitionTime.Time
}
// TODO(mm4tt): Implement missing cases: deletionTime set, pod label change
// TODO(#81360): Implement missing cases: deletionTime set, pod label change
return triggerTime
}

@@ -25,7 +25,7 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
@@ -140,10 +140,11 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
change.previous = ect.endpointsToEndpointsMap(previous)
ect.items[namespacedName] = change
}
if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
if t := getLastChangeTriggerTime(current); !t.IsZero() {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}

change.current = ect.endpointsToEndpointsMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
@@ -165,6 +166,10 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
// or was set incorrectly.
func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
// TODO(#81360): ignore case when Endpoint is deleted.
if endpoints == nil {
return time.Time{}
}
if _, ok := endpoints.Annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
// It's possible that the Endpoints object won't have the EndpointsLastChangeTriggerTime
// annotation set. In that case return the 'zero value', which is ignored in the upstream code.
@@ -197,14 +202,14 @@ type UpdateEndpointMapResult struct {
StaleServiceNames []ServicePortName
// List of the trigger times for all endpoints objects that changed. It's used to export the
// network programming latency.
LastChangeTriggerTimes []time.Time
LastChangeTriggerTimes map[types.NamespacedName][]time.Time
}

// UpdateEndpointsMap updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.StaleEndpoints = make([]ServiceEndpoint, 0)
result.StaleServiceNames = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make([]time.Time, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)

em.apply(
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
@@ -287,7 +292,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
// that were changed and will result in syncing the proxy rules.
func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if changes == nil {
return
}
@@ -300,8 +305,8 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
metrics.EndpointChangesPending.Set(0)
for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
*lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
for k, v := range changes.lastChangeTriggerTimes {
(*lastChangeTriggerTimes)[k] = v
}
changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
}
@@ -18,13 +18,12 @@ package proxy

import (
"reflect"
"sort"
"testing"
"time"

"github.com/davecgh/go-spew/spew"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -1288,29 +1287,29 @@ func TestLastChangeTriggerTime(t *testing.T) {
return e
}

createName := func(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}

modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints {
e := endpoints.DeepCopy()
e.Subsets[0].Ports[0].Port++
e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
return e
}

sortTimeSlice := func(data []time.Time) {
sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) })
}

testCases := []struct {
name string
scenario func(fp *FakeProxier)
expected []time.Time
expected map[types.NamespacedName][]time.Time
}{
{
name: "Single addEndpoints",
scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t0)
fp.addEndpoints(e)
},
expected: []time.Time{t0},
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}},
},
{
name: "addEndpoints then updatedEndpoints",
@@ -1321,7 +1320,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
e1 := modifyEndpoints(e, t1)
fp.updateEndpoints(e, e1)
},
expected: []time.Time{t0, t1},
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}},
},
{
name: "Add two endpoints then modify one",
@@ -1335,7 +1334,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
e11 := modifyEndpoints(e1, t3)
fp.updateEndpoints(e1, e11)
},
expected: []time.Time{t1, t2, t3},
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}},
},
{
name: "Endpoints without annotation set",
@@ -1344,7 +1343,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
fp.addEndpoints(e)
},
expected: []time.Time{},
expected: map[types.NamespacedName][]time.Time{},
},
{
name: "addEndpoints then deleteEndpoints",
@@ -1353,7 +1352,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
fp.addEndpoints(e)
fp.deleteEndpoints(e)
},
expected: []time.Time{},
expected: map[types.NamespacedName][]time.Time{},
},
{
name: "add then delete then add again",
@@ -1364,7 +1363,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
e = modifyEndpoints(e, t2)
fp.addEndpoints(e)
},
expected: []time.Time{t2},
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}},
},
}

@@ -1375,8 +1374,6 @@ func TestLastChangeTriggerTime(t *testing.T) {

result := fp.endpointsMap.Update(fp.endpointsChanges)
got := result.LastChangeTriggerTimes
sortTimeSlice(got)
sortTimeSlice(tc.expected)

if !reflect.DeepEqual(got, tc.expected) {
t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v",
@@ -1406,10 +1406,12 @@ func (proxier *Proxier) syncProxyRules() {
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming took %f seconds", latency)
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
}
}

// Close old local ports and save new ones.
@@ -1314,10 +1314,12 @@ func (proxier *Proxier) syncProxyRules() {
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming took %f seconds", latency)
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
}
}

// Close old local ports and save new ones.

0 comments on commit b23f12c

Please sign in to comment.
You can’t perform that action at this time.