Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "Kubelet doesn't kill old pods when BoundPods is empty" issue #3355

Merged
merged 1 commit into from
Jan 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/kubelet/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
}

func (s *sourceEtcd) run() {
watching := s.helper.Watch(s.key, 0)
boundPods := api.BoundPods{}
err := s.helper.ExtractToList(s.key, &boundPods)
if err != nil {
glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err)
return
}
// Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource}
index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods)
watching := s.helper.Watch(s.key, index)
for {
select {
case event, ok := <-watching.ResultChan():
Expand Down Expand Up @@ -87,6 +96,9 @@ func (s *sourceEtcd) run() {
// It returns a list of containers, or an error if one occurs.
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
pods := []api.BoundPod{}
if ev.Object == nil {
return pods, nil
}
boundPods, ok := ev.Object.(*api.BoundPods)
if !ok {
return pods, errors.New("unable to parse response as BoundPods")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/config/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestEventToPods(t *testing.T) {
{
input: watch.Event{Object: nil},
pods: []api.BoundPod{},
fail: true,
fail: false,
},
{
input: watch.Event{Object: &api.BoundPods{}},
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/config/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (s *sourceFile) extractFromPath() error {
if !os.IsNotExist(err) {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.FileSource}
return fmt.Errorf("path does not exist, ignoring")
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/kubelet/config/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,14 @@ func TestUpdateOnNonExistentFile(t *testing.T) {
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}

case <-time.After(2 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/config/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (s *sourceURL) extractFromURL() error {
return fmt.Errorf("%v: %v", s.url, resp.Status)
}
if len(data) == 0 {
// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.HTTPSource}
return fmt.Errorf("zero-length data received from %v", s.url)
}
// Short circuit if the manifest has not changed since the last time it was read.
Expand Down
3 changes: 0 additions & 3 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
if kl.pods == nil {
continue
}
}

err := handler.SyncPods(kl.pods)
Expand Down