fix pod-cache with node semantic change #3795

Merged: 1 commit, merged on Jan 27, 2015
pkg/master/pod_cache.go (56 changes: 33 additions & 23 deletions)

@@ -20,7 +20,6 @@ import (
 	"sync"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -47,7 +46,7 @@ type PodCache struct
 	podStatus map[objKey]api.PodStatus
 	// nodes that we know exist. Cleared at the beginning of each
 	// UpdateAllPods call.
-	currentNodes map[objKey]bool
+	currentNodes map[objKey]api.NodeStatus
 }
 
 type objKey struct {
@@ -63,7 +62,7 @@ func NewPodCache(ipCache IPGetter, info client.PodInfoGetter, nodes client.NodeI
 		containerInfo: info,
 		pods:          pods,
 		nodes:         nodes,
-		currentNodes:  map[objKey]bool{},
+		currentNodes:  map[objKey]api.NodeStatus{},
 		podStatus:     map[objKey]api.PodStatus{},
 	}
 }
@@ -80,37 +79,34 @@ func (p *PodCache) GetPodStatus(namespace, name string) (*api.PodStatus, error)
 	return &value, nil
 }
 
-func (p *PodCache) nodeExistsInCache(name string) (exists, cacheHit bool) {
+func (p *PodCache) getNodeStatusInCache(name string) (*api.NodeStatus, bool) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	exists, cacheHit = p.currentNodes[objKey{"", name}]
-	return exists, cacheHit
+	nodeStatus, cacheHit := p.currentNodes[objKey{"", name}]
+	return &nodeStatus, cacheHit
 }
 
 // lock must *not* be held
-func (p *PodCache) nodeExists(name string) bool {
-	exists, cacheHit := p.nodeExistsInCache(name)
+func (p *PodCache) getNodeStatus(name string) (*api.NodeStatus, error) {
+	nodeStatus, cacheHit := p.getNodeStatusInCache(name)
 	if cacheHit {
-		return exists
+		return nodeStatus, nil
 	}
 	// TODO: suppose there's N concurrent requests for node "foo"; in that case
 	// it might be useful to block all of them and only look up "foo" once.
 	// (This code will make up to N lookups.) One way of doing that would be to
 	// have a pool of M mutexes and require that before looking up "foo" you must
 	// lock mutex hash("foo") % M.
-	_, err := p.nodes.Get(name)
-	exists = true
+	node, err := p.nodes.Get(name)
 	if err != nil {
-		exists = false
-		if !errors.IsNotFound(err) {
-			glog.Errorf("Unexpected error type verifying minion existence: %+v", err)
-		}
+		glog.Errorf("Unexpected error verifying node existence: %+v", err)
+		return nil, err
 	}
 
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.currentNodes[objKey{"", name}] = exists
-	return exists
+	p.currentNodes[objKey{"", name}] = node.Status
+	return &node.Status, nil
 }
 
 // TODO: once Host gets moved to spec, this can take a podSpec + metadata instead of an
@@ -138,12 +134,26 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 		return newStatus, nil
 	}
 
-	if !p.nodeExists(pod.Status.Host) {
-		// Assigned to non-existing node.
-		newStatus.Phase = api.PodFailed
+	nodeStatus, err := p.getNodeStatus(pod.Status.Host)
+
+	// Assigned to non-existing node.
+	if err != nil || len(nodeStatus.Conditions) == 0 {
+		newStatus.Phase = api.PodUnknown
 		return newStatus, nil
 	}
 
+	// Assigned to an unhealthy node.
+	for _, condition := range nodeStatus.Conditions {
+		if condition.Kind == api.NodeReady && condition.Status == api.ConditionNone {
+			newStatus.Phase = api.PodUnknown
+			return newStatus, nil
+		}
+		if condition.Kind == api.NodeReachable && condition.Status == api.ConditionNone {
+			newStatus.Phase = api.PodUnknown
+			return newStatus, nil
+		}
+	}
+
 	result, err := p.containerInfo.GetPodStatus(pod.Status.Host, pod.Namespace, pod.Name)
 	newStatus.HostIP = p.ipCache.GetInstanceIP(pod.Status.Host)
 
@@ -161,18 +171,18 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	return newStatus, err
 }
 
-func (p *PodCache) resetNodeExistenceCache() {
+func (p *PodCache) resetNodeStatusCache() {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	p.currentNodes = map[objKey]bool{}
+	p.currentNodes = map[objKey]api.NodeStatus{}
}
 
 // UpdateAllContainers updates information about all containers.
 // Callers should let one call to UpdateAllContainers finish before
 // calling again, or risk having new info getting clobbered by delayed
 // old info.
 func (p *PodCache) UpdateAllContainers() {
-	p.resetNodeExistenceCache()
+	p.resetNodeStatusCache()
 
 	ctx := api.NewContext()
 	pods, err := p.pods.ListPods(ctx, labels.Everything())
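The TODO in getNodeStatus describes how N concurrent lookups of the same node could be collapsed: keep a pool of M mutexes, and before looking up "foo" lock mutex hash("foo") % M. A minimal sketch of that idea, not part of this PR (the names lockPool and forKey are hypothetical):

	// lockPool holds M mutexes. Lookups for the same key contend on the
	// same mutex, so N concurrent fetches of node "foo" serialize, while
	// fetches of unrelated nodes proceed in parallel.
	package main

	import (
		"hash/fnv"
		"sync"
	)

	type lockPool struct {
		mutexes []sync.Mutex
	}

	func newLockPool(m int) *lockPool {
		return &lockPool{mutexes: make([]sync.Mutex, m)}
	}

	// forKey returns the mutex guarding key: mutex hash(key) % M.
	func (p *lockPool) forKey(key string) *sync.Mutex {
		h := fnv.New32a()
		h.Write([]byte(key))
		return &p.mutexes[h.Sum32()%uint32(len(p.mutexes))]
	}

A caller would lock pool.forKey(name), re-check the status cache, and only then fall through to p.nodes.Get(name), so at most one fetch per node is in flight at a time.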
pkg/master/pod_cache_test.go (76 changes: 60 additions & 16 deletions)

@@ -186,21 +186,31 @@ func makePod(namespace, name, host string, containers ...string) *api.Pod {
 		Status: api.PodStatus{Host: host},
 	}
 	for _, c := range containers {
-		pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
-			Name: c,
-		})
+		pod.Spec.Containers = append(pod.Spec.Containers, api.Container{Name: c})
 	}
 	return pod
 }
 
-func makeNode(name string) *api.Node {
+func makeHealthyNode(name string) *api.Node {
 	return &api.Node{
 		ObjectMeta: api.ObjectMeta{Name: name},
+		Status: api.NodeStatus{Conditions: []api.NodeCondition{
+			{Kind: api.NodeReady, Status: api.ConditionFull},
+		}},
+	}
+}
+
+func makeUnhealthyNode(name string) *api.Node {
+	return &api.Node{
+		ObjectMeta: api.ObjectMeta{Name: name},
+		Status: api.NodeStatus{Conditions: []api.NodeCondition{
+			{Kind: api.NodeReady, Status: api.ConditionNone},
+		}},
 	}
 }
 
 func TestPodUpdateAllContainers(t *testing.T) {
-	pod := makePod(api.NamespaceDefault, "foo", "machine", "bar")
+	pod1 := makePod(api.NamespaceDefault, "foo", "machine", "bar")
 	pod2 := makePod(api.NamespaceDefault, "baz", "machine", "qux")
 	config := podCacheTestConfig{
 		ipFunc: func(host string) string {
@@ -211,8 +221,8 @@ func TestPodUpdateAllContainers(t *testing.T) {
 		},
 		kubeletContainerInfo: api.PodStatus{
 			Info: api.PodInfo{"bar": api.ContainerStatus{}}},
-		nodes: []api.Node{*makeNode("machine")},
-		pods:  []api.Pod{*pod, *pod2},
+		nodes: []api.Node{*makeHealthyNode("machine")},
+		pods:  []api.Pod{*pod1, *pod2},
 	}
 	cache := config.Construct()
 
@@ -254,7 +264,7 @@ func TestFillPodStatusNoHost(t *testing.T) {
 	pod := makePod(api.NamespaceDefault, "foo", "", "bar")
 	config := podCacheTestConfig{
 		kubeletContainerInfo: api.PodStatus{},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -283,7 +293,7 @@ func TestFillPodStatusMissingMachine(t *testing.T) {
 	}
 
 	status, err := cache.GetPodStatus(pod.Namespace, pod.Name)
-	if e, a := api.PodFailed, status.Phase; e != a {
+	if e, a := api.PodUnknown, status.Phase; e != a {
 		t.Errorf("Expected: %+v, Got %+v", e, a)
 	}
 }
@@ -310,7 +320,7 @@ func TestFillPodStatus(t *testing.T) {
 				},
 			},
 		},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -337,7 +347,7 @@ func TestFillPodInfoNoData(t *testing.T) {
 				"net": {},
 			},
 		},
-		nodes: []api.Node{*makeNode("machine")},
+		nodes: []api.Node{*makeHealthyNode("machine")},
 		pods:  []api.Pod{*pod},
 	}
 	cache := config.Construct()
@@ -376,17 +386,19 @@ func TestPodPhaseWithBadNode(t *testing.T) {
 
 	tests := []struct {
 		pod    *api.Pod
+		nodes  []api.Node
 		status api.PodPhase
 		test   string
 	}{
 		{
 			&api.Pod{
 				Spec: desiredState,
 				Status: api.PodStatus{
-					Host: "machine-2",
+					Host: "machine-two",
 				},
 			},
-			api.PodFailed,
+			[]api.Node{},
+			api.PodUnknown,
 			"no info, but bad machine",
 		},
 		{
@@ -400,7 +412,8 @@
 					Host: "machine-two",
 				},
 			},
-			api.PodFailed,
+			[]api.Node{},
+			api.PodUnknown,
 			"all running but minion is missing",
 		},
 		{
@@ -414,14 +427,45 @@
 					Host: "machine-two",
 				},
 			},
-			api.PodFailed,
+			[]api.Node{},
+			api.PodUnknown,
 			"all stopped but minion missing",
 		},
+		{
+			&api.Pod{
+				Spec: desiredState,
+				Status: api.PodStatus{
+					Info: map[string]api.ContainerStatus{
+						"containerA": runningState,
+						"containerB": runningState,
+					},
+					Host: "machine-two",
+				},
+			},
+			[]api.Node{*makeUnhealthyNode("machine-two")},
+			api.PodUnknown,
+			"all running but minion is unhealthy",
+		},
+		{
+			&api.Pod{
+				Spec: desiredState,
+				Status: api.PodStatus{
+					Info: map[string]api.ContainerStatus{
+						"containerA": stoppedState,
+						"containerB": stoppedState,
+					},
+					Host: "machine-two",
+				},
+			},
+			[]api.Node{*makeUnhealthyNode("machine-two")},
+			api.PodUnknown,
+			"all stopped but minion is unhealthy",
+		},
 	}
 	for _, test := range tests {
 		config := podCacheTestConfig{
 			kubeletContainerInfo: test.pod.Status,
-			nodes: []api.Node{},
+			nodes: test.nodes,
 			pods:  []api.Pod{*test.pod},
 		}
 		cache := config.Construct()
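Net effect of the change, which these tests exercise: a pod assigned to a missing node, to a node reporting no conditions, or to a node whose NodeReady or NodeReachable condition is ConditionNone now resolves to PodUnknown rather than PodFailed. Condensed as a standalone helper for readers skimming the diff (the name nodeLooksUnhealthy is hypothetical, assuming the 2015-era api types used above; the real logic lives in computePodStatus):

	// nodeLooksUnhealthy mirrors the condition loop added to
	// computePodStatus: any NodeReady or NodeReachable condition
	// reported as ConditionNone marks the node unhealthy.
	func nodeLooksUnhealthy(status api.NodeStatus) bool {
		for _, condition := range status.Conditions {
			if condition.Kind == api.NodeReady && condition.Status == api.ConditionNone {
				return true
			}
			if condition.Kind == api.NodeReachable && condition.Status == api.ConditionNone {
				return true
			}
		}
		return false
	}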