Adding sync pod latency metric. #4818

Merged 1 commit on Feb 26, 2015.
pkg/kubelet/kubelet.go (52 changes: 44 additions, 8 deletions)
@@ -61,7 +61,10 @@ const podOomScoreAdj = -100

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]api.BoundPod) error
// Syncs current state to match the specified pods. SyncPodType specifies what
// type of sync is occurring for each pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}

type SourceReadyFn func(source string) bool
@@ -942,7 +945,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
start := time.Now()
defer func() {
metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
}()

if err := kl.dockerPuller.Pull(img); err != nil {
@@ -1270,7 +1273,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@@ -1296,7 +1299,9 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(*pod)
kl.podWorkers.UpdatePod(pod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
}

// Stop the workers for no-longer existing pods.
@@ -1416,19 +1421,21 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
@@ -1440,25 +1447,54 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods); err != nil {
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}

func (kl *Kubelet) updatePods(u PodUpdate) {
// Updates the Kubelet's internal pods with those provided by the update.
// Records the sync type for new and updated pods in podSyncTypes.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")

// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for i := range u.Pods {
if _, ok := existingPods[u.Pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
}
}

kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")

// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}

kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}

// Mark all remaining pods as SyncPodSync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[kl.pods[i].UID] = metrics.SyncPodSync
}
}
}

// Returns Docker version for this Kubelet.
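Taken together, the kubelet.go changes do two things: classify each pod update (create, update, or plain sync) before the workers run, and observe per-pod latency from the completion callback handed to UpdatePod. A minimal, self-contained Go sketch of that callback pattern follows; the names record and syncOne are illustrative stand-ins, not code from this PR:

package main

import (
	"fmt"
	"time"
)

// record stands in for the Prometheus observation made in the PR's
// completion callback; here it just prints the latency.
func record(syncType string, start time.Time) {
	fmt.Printf("sync_pod_latency{operation_type=%q}: %v\n", syncType, time.Since(start))
}

// syncOne stands in for a pod worker: do the work, then invoke the
// completion callback, mirroring podWorkers.UpdatePod.
func syncOne(onComplete func()) {
	time.Sleep(10 * time.Millisecond) // pretend to sync the pod
	onComplete()
}

func main() {
	start := time.Now() // corresponds to the start captured at the top of syncLoop
	podSyncTypes := map[string]string{"pod-a": "create", "pod-b": "update"}
	for _, syncType := range podSyncTypes {
		st := syncType // copy for the closure
		syncOne(func() { record(st, start) })
	}
}

Note that start is captured once per sync loop iteration, so the metric measures time from when the batch of updates began, not from when each worker picked up its pod.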
pkg/kubelet/kubelet_test.go (29 changes: 16 additions, 13 deletions)
@@ -34,6 +34,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -382,6 +383,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
return cr.list
}

var emptyPodUIDs map[types.UID]metrics.SyncPodType

func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
container := api.Container{Name: "bar"}
@@ -413,7 +416,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -444,7 +447,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -491,7 +494,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -542,7 +545,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -590,7 +593,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -645,7 +648,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -690,7 +693,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -728,15 +731,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -787,15 +790,15 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -833,7 +836,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.BoundPod{})
err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -2091,7 +2094,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
})
}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
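The tests thread the two new arguments through every SyncPods call, passing emptyPodUIDs, a declared but uninitialized (nil) map. That is safe because Go permits lookups on a nil map, returning the zero value, so SyncPods needs no nil guard. A tiny standalone illustration:

package main

import "fmt"

func main() {
	var m map[string]int // nil map: lookups are safe, writes would panic

	fmt.Println(m["absent"]) // 0, the zero value for int

	v, ok := m["absent"]
	fmt.Println(v, ok) // 0 false
}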
pkg/kubelet/metrics/metrics.go (38 changes: 37 additions, 1 deletion)
@@ -18,6 +18,7 @@ package metrics

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -35,8 +36,16 @@ var (
Help: "Image pull latency in microseconds.",
},
)
// TODO(vmarmol): Break down by number of containers in pod?
SyncPodLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pod_latency_microseconds",
Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
},
[]string{"operation_type"},
)
// TODO(vmarmol): Containers per pod
// TODO(vmarmol): Latency of pod startup
// TODO(vmarmol): Latency of SyncPods
)

@@ -47,10 +56,37 @@ func Register(containerCache dockertools.DockerCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
})
}

type SyncPodType int

const (
SyncPodCreate SyncPodType = iota
SyncPodUpdate
SyncPodSync
)

func (self SyncPodType) String() string {
switch self {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}

// Gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector {
return &podAndContainerCollector{
containerCache: containerCache,
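For readers unfamiliar with the client library, here is a hedged, self-contained sketch of the pattern metrics.go now uses: a SummaryVec keyed by operation_type, observed via SinceInMicroseconds. The metric and helper mirror the diff; main is illustrative only:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var syncPodLatency = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Subsystem: "kubelet",
		Name:      "sync_pod_latency_microseconds",
		Help:      "Latency in microseconds to sync a single pod.",
	},
	[]string{"operation_type"},
)

// SinceInMicroseconds mirrors the helper added in metrics.go.
func SinceInMicroseconds(start time.Time) float64 {
	return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

func main() {
	prometheus.MustRegister(syncPodLatency)

	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for syncing a pod
	syncPodLatency.WithLabelValues("create").Observe(SinceInMicroseconds(start))
	fmt.Println("observed:", SinceInMicroseconds(start), "microseconds")
}

A Summary is used rather than a plain counter because it tracks the observation count, sum, and configurable quantiles per label value, so latency distributions can be compared across create, update, and sync operations.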
pkg/kubelet/pod_workers.go (34 changes: 23 additions, 11 deletions)
@@ -35,7 +35,7 @@ type podWorkers struct {

// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan api.BoundPod
podUpdates map[types.UID]chan workUpdate
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache

@@ -45,16 +45,24 @@
syncPodFun syncPodFunType
}

type workUpdate struct {
// The pod state to reflect.
pod *api.BoundPod

// Function to call when the update is complete.
updateCompleteFun func()
}

func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan api.BoundPod{},
podUpdates: map[types.UID]chan workUpdate{},
dockerCache: dockerCache,
syncPodFun: syncPodFun,
}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) {
for newPod := range podUpdates {
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
for newWork := range podUpdates {
// Since we use docker cache, getting current state shouldn't cause
// performance overhead on Docker. Moreover, as long as we run syncPod
// no matter if it changes anything, having an old version of "containers"
@@ -64,29 +72,33 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) {
glog.Errorf("Error listing containers while syncing pod: %v", err)
continue
}
err = p.syncPodFun(&newPod, containers)
err = p.syncPodFun(newWork.pod, containers)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err)
record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err)
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
continue
}
}
}

func (p *podWorkers) UpdatePod(pod api.BoundPod) {
// Applies the new settings to the specified pod. updateComplete is called once the update is complete.
func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
uid := pod.UID
var podUpdates chan api.BoundPod
var podUpdates chan workUpdate
var exists bool

p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// TODO(wojtek-t): Adjust the size of the buffer in this channel
podUpdates = make(chan api.BoundPod, 5)
podUpdates = make(chan workUpdate, 5)
p.podUpdates[uid] = podUpdates
go p.managePodLoop(podUpdates)
}
podUpdates <- pod
podUpdates <- workUpdate{
pod: pod,
updateCompleteFun: updateComplete,
}
}

Review thread on the updateCompleteFun field:

Contributor: heh, Fun.

Contributor (author): lol, This updates the Complete Fun (TM). Can change to Func if you think it's better (I tend to agree). I was keeping the style of the file.

Contributor: We have Fn, Fun, and Func. I wouldn't say no to Fun. You can refactor in a separate PR if you want. Let's get this one in.

func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
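The core change in pod_workers.go generalizes each per-pod channel from bare BoundPods to work items that carry a completion callback, which is what lets SyncPods attach the latency observation. A minimal standalone sketch of this channel-of-work pattern (workItem and worker are illustrative names, not the PR's types):

package main

import (
	"fmt"
	"sync"
)

type workItem struct {
	payload        string
	updateComplete func() // invoked once the item has been processed
}

func worker(updates <-chan workItem, wg *sync.WaitGroup) {
	defer wg.Done()
	for w := range updates {
		fmt.Println("syncing", w.payload) // stands in for syncPodFun
		if w.updateComplete != nil {
			w.updateComplete()
		}
	}
}

func main() {
	updates := make(chan workItem, 5) // buffered, like the PR's podUpdates channel
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(updates, &wg)

	updates <- workItem{payload: "pod-a", updateComplete: func() {
		fmt.Println("pod-a update complete; observe latency here")
	}}
	close(updates)
	wg.Wait()
}

Because the callback runs inside the worker goroutine once the item is processed, the observed latency includes any time the update spent queued in the channel, which matches the end-to-end sync latency the PR sets out to measure.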