Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the TriggerTimeTracker util in the Endpoints Controller. #73314

Merged
merged 1 commit into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/controller/endpoint/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
srcs = [
"doc.go",
"endpoints_controller.go",
"trigger_time_tracker.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/endpoint",
deps = [
Expand Down Expand Up @@ -39,7 +40,10 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["endpoints_controller_test.go"],
srcs = [
"endpoints_controller_test.go",
"trigger_time_tracker_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

e.triggerTimeTracker = NewTriggerTimeTracker()

return e
}

Expand Down Expand Up @@ -138,6 +140,10 @@ type EndpointController struct {

// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
workerLoopPeriod time.Duration

// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
// annotation.
triggerTimeTracker *TriggerTimeTracker
}

// Run will not return until stopCh is closed. workers determines how many
Expand Down Expand Up @@ -399,6 +405,7 @@ func (e *EndpointController) syncService(key string) error {
if err != nil && !errors.IsNotFound(err) {
return err
}
e.triggerTimeTracker.DeleteEndpoints(namespace, name)
return nil
}

Expand Down Expand Up @@ -427,6 +434,12 @@ func (e *EndpointController) syncService(key string) error {
}
}

// We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger
// time tracker gets updated even if the sync turns out to be no-op and we don't update the
// endpoints object.
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods)

subsets := []v1.EndpointSubset{}
var totalReadyEps int
var totalNotReadyEps int
Expand Down Expand Up @@ -506,6 +519,11 @@ func (e *EndpointController) syncService(key string) error {
newEndpoints.Annotations = make(map[string]string)
}

if !endpointsLastChangeTriggerTime.IsZero() {
newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
}

klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
if createEndpoints {
// No previous endpoints, create them
Expand Down
94 changes: 94 additions & 0 deletions pkg/controller/endpoint/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ import (
var alwaysReady = func() bool { return true }
var neverReady = func() bool { return false }
var emptyNodeName string
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)

func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
for i := 0; i < nPods+nNotReady; i++ {
Expand Down Expand Up @@ -1175,3 +1178,94 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
}
}
}

func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
},
})
endpoints.syncService(ns + "/foo")

endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}

func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
Spec: v1.ServiceSpec{
Selector: map[string]string{},
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
},
})
endpoints.syncService(ns + "/foo")

endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
163 changes: 163 additions & 0 deletions pkg/controller/endpoint/trigger_time_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
"sync"
"time"

"k8s.io/api/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// TriggerTimeTracker is a util used to compute the EndpointsLastChangeTriggerTime annotation which
// is exported in the endpoints controller's sync function.
// See the documentation of the EndpointsLastChangeTriggerTime annotation for more details.
//
// Please note that this util may compute a wrong EndpointsLastChangeTriggerTime if a same object
// changes multiple times between two consecutive syncs. We're aware of this limitation but we
// decided to accept it, as fixing it would require a major rewrite of the endpoints controller and
// Informer framework. Such situations, i.e. frequent updates of the same object in a single sync
// period, should be relatively rare and therefore this util should provide a good approximation of
// the EndpointsLastChangeTriggerTime.
wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
// TODO(mm4tt): Implement a more robust mechanism that is not subject to the above limitations.
type TriggerTimeTracker struct {
// endpointsStates is a map, indexed by Endpoints object key, storing the last known Endpoints
// object state observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime
// function.
endpointsStates map[endpointsKey]endpointsState

// mutex guarding the endpointsStates map.
mutex sync.Mutex
}

// NewTriggerTimeTracker creates a new instance of the TriggerTimeTracker.
func NewTriggerTimeTracker() *TriggerTimeTracker {
return &TriggerTimeTracker{
endpointsStates: make(map[endpointsKey]endpointsState),
}
}

// endpointsKey is a key uniquely identifying an Endpoints object.
type endpointsKey struct {
// namespace, name composing a namespaced name - an unique identifier of every Endpoints object.
namespace, name string
}

// endpointsState represents a state of an Endpoints object that is known to this util.
type endpointsState struct {
// lastServiceTriggerTime is a service trigger time observed most recently.
lastServiceTriggerTime time.Time
// lastPodTriggerTimes is a map (Pod name -> time) storing the pod trigger times that were
// observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime function.
lastPodTriggerTimes map[string]time.Time
}

// ComputeEndpointsLastChangeTriggerTime updates the state of the Endpoints object being synced
// and returns the time that should be exported as the EndpointsLastChangeTriggerTime annotation.
//
// If the method returns a 'zero' time the EndpointsLastChangeTriggerTime annotation shouldn't be
// exported.
//
// Please note that this function may compute a wrong EndpointsLastChangeTriggerTime value if the
// same object (pod/service) changes multiple times between two consecutive syncs.
//
// Important: This method is go-routing safe but only when called for different keys. The method
// shouldn't be called concurrently for the same key! This contract is fulfilled in the current
// implementation of the endpoints controller.
func (t *TriggerTimeTracker) ComputeEndpointsLastChangeTriggerTime(
namespace, name string, service *v1.Service, pods []*v1.Pod) time.Time {

key := endpointsKey{namespace: namespace, name: name}
// As there won't be any concurrent calls for the same key, we need to guard access only to the
// endpointsStates map.
t.mutex.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using read lock here ?

state, wasKnown := t.endpointsStates[key]
t.mutex.Unlock()

// Update the state before returning.
defer func() {
t.mutex.Lock()
t.endpointsStates[key] = state
t.mutex.Unlock()
}()

// minChangedTriggerTime is the min trigger time of all trigger times that have changed since the
// last sync.
var minChangedTriggerTime time.Time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably add a piece of comment here to explain why using the minChangedTriggerTime to do the measurement may not yeild the intended result due to multiple update getting merged into one.

Basically, sync1 -> pod1, pod2, pod3 changed -> sync2
minChangedTriggerTime in this case is time delta between pod3 update and sync2, but the actual time difference may be delta between pod1 update and sync2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, assuming that by pod1, pod2, pod3 you're referring to multiple updates of the same pod.
In such case here we'd see and export the pod3 change time whereas we should've used the pod1 change time.
This was already documented in the TriggerTimeTracker struct documentation, but added it also in the doc of this function.

And just to make one thing clear, we're not operating on time deltas here. Here we only export the "trigger time" which is the time of the oldest change that triggered endpoints object change. Time delta will be computed in kube-proxy to export the actual latency.

// TODO(mm4tt): If memory allocation / GC performance impact of recreating map in every call
// turns out to be too expensive, we should consider rewriting this to reuse the existing map.
podTriggerTimes := make(map[string]time.Time)
wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
for _, pod := range pods {
if podTriggerTime := getPodTriggerTime(pod); !podTriggerTime.IsZero() {
podTriggerTimes[pod.Name] = podTriggerTime
if podTriggerTime.After(state.lastPodTriggerTimes[pod.Name]) {
// Pod trigger time has changed since the last sync, update minChangedTriggerTime.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you please split this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, it doesn't require splitting. It's below the 100 character limit.

minChangedTriggerTime = min(minChangedTriggerTime, podTriggerTime)
}
}
}
serviceTriggerTime := getServiceTriggerTime(service)
if serviceTriggerTime.After(state.lastServiceTriggerTime) {
// Service trigger time has changed since the last sync, update minChangedTriggerTime.
minChangedTriggerTime = min(minChangedTriggerTime, serviceTriggerTime)
}

state.lastPodTriggerTimes = podTriggerTimes
state.lastServiceTriggerTime = serviceTriggerTime

if !wasKnown {
// New Endpoints object / new Service, use Service creationTimestamp.
return service.CreationTimestamp.Time
} else {
// Regular update of the Endpoints object, return min of changed trigger times.
return minChangedTriggerTime
}
}

// DeleteEndpoints deletes endpoints state stored in this util.
func (t *TriggerTimeTracker) DeleteEndpoints(namespace, name string) {
key := endpointsKey{namespace: namespace, name: name}
t.mutex.Lock()
defer t.mutex.Unlock()
delete(t.endpointsStates, key)
}

// getPodTriggerTime returns the time of the pod change (trigger) that resulted or will result in
// the endpoints object change.
func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) {
if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil {
triggerTime = readyCondition.LastTransitionTime.Time
}
// TODO(mm4tt): Implement missing cases: deletionTime set, pod label change
return triggerTime
}

// getServiceTriggerTime returns the time of the service change (trigger) that resulted or will
// result in the endpoints object change.
func getServiceTriggerTime(service *v1.Service) (triggerTime time.Time) {
// TODO(mm4tt): Ideally we should look at service.LastUpdateTime, but such thing doesn't exist.
Copy link
Contributor

@freehan freehan Feb 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring service update may result in discrepancy right?
For example:
Service ports got updated and no change on pods. This would result in update in endpoints object. The minChangedTriggerTime will still return one of the time delta after the latest pod update, right?

In this case, unless we have the something similar to service.LastUpdateTime, I think we can ignore it and skip recording. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what we do. The minChangedTriggerTime definition is: the min trigger time of all trigger times that have changed since the last sync.

Assuming that only service has changed, the minChangeTriggerTime would be a 'zero' time, as nothing has changed (since we don't have the service.LastUpdateTime and service.CreationTime stays the same). If the ComputeEndpointsLastChangeTriggerTime returns a 'zero' time the result should be ignored and the EndpointsLastChangeTriggerTime shouldn't be exported.

If in future we manage to get the service.LastUpdateTime somehow it will be enough to just change this place and everything will work correctly. That's why I left a TODO here.

I updated the docs and comments to make it easier to understand. Also added a test for that case.

return service.CreationTimestamp.Time
}

// min returns minimum of the currentMin and newValue or newValue if the currentMin is not set.
func min(currentMin, newValue time.Time) time.Time {
if currentMin.IsZero() || newValue.Before(currentMin) {
return newValue
}
return currentMin
}