Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transform the podCache into a write-through cache. #3927

Merged
merged 1 commit into from
Jan 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func (m *Master) init(c *Config) {
m.podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)

// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
Expand Down
47 changes: 41 additions & 6 deletions pkg/master/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,38 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI

// GetPodStatus gets the stored pod status.
func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error) {
status := p.getPodStatusInternal(namespace, name)
if status != nil {
return status, nil
}
return p.updateCacheAndReturn(namespace, name)
}

func (p *PodCache) updateCacheAndReturn(namespace, name string) (*api.PodStatus, error) {
pod, err := p.pods.GetPod(api.WithNamespace(api.NewContext(), namespace), name)
if err != nil {
return nil, err
}
if err := p.updatePodStatus(pod); err != nil {
return nil, err
}
status := p.getPodStatusInternal(namespace, name)
if status == nil {
glog.Warningf("nil status after successful update. that's odd... (%s %s)", namespace, name)
return nil, client.ErrPodInfoNotAvailable
}
return status, nil
}

func (p *PodCache) getPodStatusInternal(namespace, name string) *api.PodStatus {
p.lock.Lock()
defer p.lock.Unlock()
value, ok := p.podStatus[objKey{namespace, name}]
if !ok {
return nil, client.ErrPodInfoNotAvailable
return nil
}
// Make a copy
return &value, nil
return &value
}

func (p *PodCache) ClearPodStatus(namespace, name string) {
Expand Down Expand Up @@ -178,19 +202,30 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
return newStatus, err
}

func (p *PodCache) resetNodeStatusCache() {
func (p *PodCache) GarbageCollectPodStatus() {
pods, err := p.pods.ListPods(api.NewContext(), labels.Everything())
if err != nil {
glog.Errorf("Error getting pod list: %v", err)
}
keys := map[objKey]bool{}
for _, pod := range pods.Items {
keys[objKey{pod.Namespace, pod.Name}] = true
}
p.lock.Lock()
defer p.lock.Unlock()
p.currentNodes = map[objKey]api.NodeStatus{}
for key := range p.podStatus {
if _, found := keys[key]; !found {
glog.Infof("Deleting orphaned cache entry: %v", key)
delete(p.podStatus, key)
}
}
}

// UpdateAllContainers updates information about all containers.
// Callers should let one call to UpdateAllContainers finish before
// calling again, or risk having new info getting clobbered by delayed
// old info.
func (p *PodCache) UpdateAllContainers() {
p.resetNodeStatusCache()

ctx := api.NewContext()
pods, err := p.pods.ListPods(ctx, labels.Everything())
if err != nil {
Expand Down
66 changes: 60 additions & 6 deletions pkg/master/pod_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ func TestPodCacheGet(t *testing.T) {
}

func TestPodCacheDelete(t *testing.T) {
cache := NewPodCache(nil, nil, nil, nil)
config := podCacheTestConfig{
err: client.ErrPodInfoNotAvailable,
}
cache := config.Construct()

expected := api.PodStatus{
Info: api.PodInfo{
Expand Down Expand Up @@ -156,14 +159,38 @@ func TestPodCacheDelete(t *testing.T) {
}

func TestPodCacheGetMissing(t *testing.T) {
cache := NewPodCache(nil, nil, nil, nil)
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
config := podCacheTestConfig{
ipFunc: func(host string) string {
if host == "machine" {
return "1.2.3.5"
}
return ""
},
kubeletContainerInfo: api.PodStatus{
Info: api.PodInfo{"bar": api.ContainerStatus{}}},
nodes: []api.Node{*makeHealthyNode("machine")},
pod: pod1,
}
cache := config.Construct()

status, err := cache.GetPodStatus(api.NamespaceDefault, "foo")
if err == nil {
t.Errorf("Unexpected non-error: %+v", err)
if err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if status != nil {
t.Errorf("Unexpected status: %+v", status)
if status == nil {
t.Errorf("Unexpected non-status.")
}
expected := &api.PodStatus{
Phase: "Pending",
Host: "machine",
HostIP: "1.2.3.5",
Info: api.PodInfo{
"bar": api.ContainerStatus{},
},
}
if !reflect.DeepEqual(status, expected) {
t.Errorf("expected:\n%#v\ngot:\n%#v\n", expected, status)
}
}

Expand All @@ -177,6 +204,8 @@ type podCacheTestConfig struct {
ipFunc func(string) string // Construct will set a default if nil
nodes []api.Node
pods []api.Pod
pod *api.Pod
err error
kubeletContainerInfo api.PodStatus

// Construct will fill in these fields
Expand All @@ -202,6 +231,8 @@ func (c *podCacheTestConfig) Construct() *PodCache {
},
}
c.fakePods = registrytest.NewPodRegistry(&api.PodList{Items: c.pods})
c.fakePods.Pod = c.pod
c.fakePods.Err = c.err
return NewPodCache(
fakeIPCache(c.ipFunc),
c.fakePodInfo,
Expand Down Expand Up @@ -829,3 +860,26 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
}
}
}

func TestGarbageCollection(t *testing.T) {
pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
config := podCacheTestConfig{
pods: []api.Pod{*pod1, *pod2},
}
cache := config.Construct()

expected := api.PodStatus{
Info: api.PodInfo{
"extra": api.ContainerStatus{},
},
}
cache.podStatus[objKey{api.NamespaceDefault, "extra"}] = expected

cache.GarbageCollectPodStatus()

status, found := cache.podStatus[objKey{api.NamespaceDefault, "extra"}]
if found {
t.Errorf("unexpectedly found: %v for key %v", status, objKey{api.NamespaceDefault, "extra"})
}
}