Change PodWorkers to have desired cache. #5239

Merged (1 commit, Mar 11, 2015)
pkg/kubelet/dockertools/docker_cache.go (18 additions, 0 deletions)
@@ -23,6 +23,7 @@ import (

type DockerCache interface {
RunningContainers() (DockerContainers, error)
ForceUpdateIfOlder(time.Time) error
}

func NewDockerCache(client DockerInterface) (DockerCache, error) {

Contributor:

One thing I'd recommend is this in the cache and the cache_test:

var _ DockerCache = new(dockerCache)

this ensures that dockerCache abides by the DockerCache interface. If not, you'll get a compile error. Not needed for this PR though.

Member Author:

Done.
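The suggested compile-time assertion works like this minimal, self-contained sketch (hypothetical Iface and impl names, not the kubelet types): assigning to the blank identifier forces the compiler to check that the type satisfies the interface, and the line has no runtime cost.

package main

// If *impl ever stops implementing Iface, this assignment fails to compile,
// which is exactly the guarantee the reviewer asks for.
type Iface interface {
	Run() error
}

type impl struct{}

func (impl) Run() error { return nil }

var _ Iface = new(impl) // compile-time check only; mirrors "var _ DockerCache = new(dockerCache)"

func main() {}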

@@ -49,6 +50,9 @@ type dockerCache struct {
updatingThreadStopTime time.Time
}

// Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache)

func (d *dockerCache) RunningContainers() (DockerContainers, error) {
d.lock.Lock()
defer d.lock.Unlock()
@@ -69,6 +73,20 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
return d.containers, nil
}

func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) {
containers, err := GetKubeletDockerContainers(d.client, false)
if err != nil {
return err
}
d.containers = containers
d.cacheTime = time.Now()
}
return nil
}

func (d *dockerCache) startUpdatingCache() {
run := true
for run {
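The caller-facing contract of the new method, as used later in this PR, is: after ForceUpdateIfOlder(t) returns nil, RunningContainers() reflects Docker state captured at time t or later. A runnable sketch of that pattern with stand-in types (DockerContainers and toyCache below are simplified placeholders, not the real kubelet implementations):

package main

import (
	"fmt"
	"time"
)

// Stand-ins that only mirror the method shapes from the diff.
type DockerContainers map[string]string

type DockerCache interface {
	RunningContainers() (DockerContainers, error)
	ForceUpdateIfOlder(time.Time) error
}

type toyCache struct {
	containers DockerContainers
	cacheTime  time.Time
}

func (c *toyCache) RunningContainers() (DockerContainers, error) { return c.containers, nil }

func (c *toyCache) ForceUpdateIfOlder(min time.Time) error {
	if c.cacheTime.Before(min) {
		// The real cache re-lists containers from Docker here; the sketch
		// only refreshes the timestamp.
		c.cacheTime = time.Now()
	}
	return nil
}

func main() {
	var cache DockerCache = &toyCache{containers: DockerContainers{"abc123": "pod-foo"}}

	// Ask for state no older than the moment the previous sync finished,
	// then read the listing, which is now guaranteed fresh enough.
	lastSyncFinished := time.Now()
	if err := cache.ForceUpdateIfOlder(lastSyncFinished); err != nil {
		fmt.Println("cache update failed:", err)
		return
	}
	containers, _ := cache.RunningContainers()
	fmt.Println("running containers:", containers)
}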
pkg/kubelet/dockertools/fake_docker_client.go (5 additions, 0 deletions)
@@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
@@ -246,3 +247,7 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) {
return GetKubeletDockerContainers(f.client, false)
}

func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
return nil
}
pkg/kubelet/pod_workers.go (42 additions, 27 deletions)
@@ -18,6 +18,7 @@ package kubelet

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@@ -41,6 +42,9 @@ type podWorkers struct {
// Currently all update requests for a given pod that come in while another
// update of this pod is being processed are ignored.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache

@@ -63,22 +67,26 @@ type workUpdate struct {

func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
dockerCache: dockerCache,
syncPodFn: syncPodFn,
recorder: recorder,
}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minDockerCacheTime time.Time
for newWork := range podUpdates {
// Since we use the docker cache, getting the current state shouldn't cause
// performance overhead on Docker. Moreover, since we run syncPod whether or
// not anything changed, having an old version of "containers"
// can cause starting unneeded containers.
func() {
defer p.setIsWorking(newWork.pod.UID, false)
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
// We would like to have the state of Docker from at least the moment
// when we finished the previous processing of that pod.
if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
glog.Errorf("Error updating docker cache: %v", err)
return
}
containers, err := p.dockerCache.RunningContainers()
if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err)
@@ -91,6 +99,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return
}
minDockerCacheTime = time.Now()

newWork.updateCompleteFn()
}()
@@ -106,33 +115,28 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// Currently all update requests for a given pod coming in while another
// update of this pod is being processed are ignored.
// We need to have a buffer here, because the checkForUpdates() method that

Contributor:

I'm not a huge fan of this since it seems brittle to future changes. I don't have a better option right now though so I'm just complaining :) Don't block the PR on this.

Member Author:

I'm also not a huge fan, but I think it's not harmful.

// puts an update into the channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such a case
// the channel is empty, so a buffer of size 1 is enough.
podUpdates = make(chan workUpdate, 1)
p.podUpdates[uid] = podUpdates
go func() {
defer util.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// TODO(wojtek-t): Consider changing to the following model:
// - add a cache of "desired" pod state
// - whenever an update of a pod comes, we update the "desired" cache
// - if the per-pod goroutine is currently idle, we send the update immediately
// to the per-pod goroutine and clear the cache;
// - when the per-pod goroutine finishes processing an update, it checks the
// desired cache for the next update to process
// - the crucial thing in this approach is that we don't accumulate multiple
// updates for a given pod (at any point in time there will be at most
// one update queued for a given pod, plus potentially one currently being
// processed) and additionally don't rely on the fact that an update will
// be resent (because we don't drop it)
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- workUpdate{
pod: pod,
updateCompleteFn: updateComplete,
}
} else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod,
updateCompleteFn: updateComplete,
}
}
}
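The buffer-of-size-1 reasoning in the comment and review thread above can be seen in isolation in the following runnable sketch (toy types, not kubelet code). The consumer goroutine itself re-queues a pending item into the channel it reads from; on an unbuffered channel that send would deadlock, but with capacity 1 it always succeeds because the channel is empty while the consumer is running.

package main

import "fmt"

func main() {
	// Capacity 1, like podUpdates; an unbuffered channel would deadlock below.
	updates := make(chan string, 1)
	pending := "update-that-arrived-while-busy" // stands in for lastUndeliveredWorkUpdate
	done := make(chan struct{})

	go func() {
		defer close(done)
		for i := 0; i < 2; i++ {
			u := <-updates
			fmt.Println("processing:", u)
			if pending != "" {
				// Send from the same goroutine that consumes: safe only because
				// the channel is empty at this point and the buffer has room for one item.
				updates <- pending
				pending = ""
			}
		}
	}()

	updates <- "first-update"
	<-done
}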

@@ -143,12 +147,23 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
// If there is an undelivered work update for this pod we need to remove it,
// since the per-pod goroutine won't be able to put it into the already closed
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
delete(p.lastUndeliveredWorkUpdate, key)
}
}
}
}

func (p *podWorkers) setIsWorking(uid types.UID, isWorking bool) {
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
p.podLock.Lock()
p.isWorking[uid] = isWorking
p.podLock.Unlock()
defer p.podLock.Unlock()
if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
p.podUpdates[uid] <- workUpdate
delete(p.lastUndeliveredWorkUpdate, uid)
} else {
p.isWorking[uid] = false
}
}
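The cleanup in ForgetNonExistingPodWorkers above guards against a concrete failure mode: in Go, sending on a closed channel panics, so a worker that later tried to deliver a stale pending update into its closed podUpdates channel would crash. A minimal, runnable illustration of that rule (a generic channel, no kubelet types):

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch)

	// Recover only so the sketch can print the panic instead of crashing.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // "send on closed channel"
		}
	}()
	ch <- 1 // panics: send on closed channel
}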