Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pod cache needs to be namespace-aware #1696

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (

type fakePodInfoGetter struct{}

func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
func (fakePodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
// This is a horrible hack to get around the fact that we can't provide
// different port numbers per kubelet...
var c client.PodInfoGetter
Expand All @@ -76,9 +76,9 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
Port: 10251,
}
default:
glog.Fatalf("Can't get info for: '%v', '%v'", host, podID)
glog.Fatalf("Can't get info for: '%v', '%v'", host, podNamespace, podID)
}
return c.GetPodInfo("localhost", podID)
return c.GetPodInfo("localhost", podNamespace, podID)
}

type delegateHandler struct {
Expand Down Expand Up @@ -182,11 +182,11 @@ func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
podInfo := fakePodInfoGetter{}
return func() (bool, error) {
for i := range pods.Items {
host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID
host, id, namespace := pods.Items[i].CurrentState.Host, pods.Items[i].ID, pods.Items[i].Namespace
if len(host) == 0 {
return false, nil
}
if _, err := podInfo.GetPodInfo(host, id); err != nil {
if _, err := podInfo.GetPodInfo(host, namespace, id); err != nil {
return false, nil
}
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/client/podinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var ErrPodInfoNotAvailable = errors.New("no pod info available")
type PodInfoGetter interface {
// GetPodInfo returns information about all containers which are part
// Returns an api.PodInfo, or an error if one occurs.
GetPodInfo(host, podID string) (api.PodInfo, error)
GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error)
}

// HTTPPodInfoGetter is the default implementation of PodInfoGetter, accesses the kubelet over HTTP.
Expand All @@ -46,13 +46,14 @@ type HTTPPodInfoGetter struct {
}

// GetPodInfo gets information about the specified pod.
func (c *HTTPPodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
func (c *HTTPPodInfoGetter) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
request, err := http.NewRequest(
"GET",
fmt.Sprintf(
"http://%s/podInfo?podID=%s",
"http://%s/podInfo?podID=%s&podNamespace=%s",
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)),
podID),
podID,
podNamespace),
nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -85,6 +86,6 @@ type FakePodInfoGetter struct {
}

// GetPodInfo is a fake implementation of PodInfoGetter.GetPodInfo.
func (c *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
func (c *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string) (api.PodInfo, error) {
return c.data, c.err
}
4 changes: 2 additions & 2 deletions pkg/client/podinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestHTTPPodInfoGetter(t *testing.T) {
Client: http.DefaultClient,
Port: uint(port),
}
gotObj, err := podInfoGetter.GetPodInfo(parts[0], "foo")
gotObj, err := podInfoGetter.GetPodInfo(parts[0], api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestHTTPPodInfoGetterNotFound(t *testing.T) {
Client: http.DefaultClient,
Port: uint(port),
}
_, err = podInfoGetter.GetPodInfo(parts[0], "foo")
_, err = podInfoGetter.GetPodInfo(parts[0], api.NamespaceDefault, "foo")
if err != ErrPodInfoNotAvailable {
t.Errorf("Expected %#v, Got %#v", ErrPodInfoNotAvailable, err)
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/master/pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,38 @@ func NewPodCache(info client.PodInfoGetter, pods pod.Registry) *PodCache {
}
}

// makePodCacheKey constructs a key for use in a map to address a pod with specified namespace and id
func makePodCacheKey(podNamespace, podID string) string {
return podNamespace + "." + podID
}

// GetPodInfo implements the PodInfoGetter.GetPodInfo.
// The returned value should be treated as read-only.
// TODO: Remove the host from this call, it's totally unnecessary.
func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
func (p *PodCache) GetPodInfo(host, podNamespace, podID string) (api.PodInfo, error) {
p.podLock.Lock()
defer p.podLock.Unlock()
value, ok := p.podInfo[podID]
value, ok := p.podInfo[makePodCacheKey(podNamespace, podID)]
if !ok {
return nil, client.ErrPodInfoNotAvailable
}
return value, nil
}

func (p *PodCache) updatePodInfo(host, id string) error {
info, err := p.containerInfo.GetPodInfo(host, id)
func (p *PodCache) updatePodInfo(host, podNamespace, podID string) error {
info, err := p.containerInfo.GetPodInfo(host, podNamespace, podID)
if err != nil {
return err
}
p.podLock.Lock()
defer p.podLock.Unlock()
p.podInfo[id] = info
p.podInfo[makePodCacheKey(podNamespace, podID)] = info
return nil
}

// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
func (p *PodCache) UpdateAllContainers() {
var ctx api.Context
ctx := api.NewContext()
pods, err := p.pods.ListPods(ctx, labels.Everything())
if err != nil {
glog.Errorf("Error synchronizing container list: %v", err)
Expand All @@ -82,7 +87,7 @@ func (p *PodCache) UpdateAllContainers() {
if pod.CurrentState.Host == "" {
continue
}
err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
err := p.updatePodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
if err != nil && err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error synchronizing container: %v", err)
}
Expand Down
60 changes: 46 additions & 14 deletions pkg/master/pod_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,59 @@ import (
)

type FakePodInfoGetter struct {
host string
id string
data api.PodInfo
err error
host string
id string
namespace string
data api.PodInfo
err error
}

func (f *FakePodInfoGetter) GetPodInfo(host, id string) (api.PodInfo, error) {
func (f *FakePodInfoGetter) GetPodInfo(host, namespace, id string) (api.PodInfo, error) {
f.host = host
f.id = id
f.namespace = namespace
return f.data, f.err
}

func TestPodCacheGetDifferentNamespace(t *testing.T) {
cache := NewPodCache(nil, nil)

expectedDefault := api.PodInfo{
"foo": api.ContainerStatus{},
}
expectedOther := api.PodInfo{
"bar": api.ContainerStatus{},
}

cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expectedDefault
cache.podInfo[makePodCacheKey("other", "foo")] = expectedOther

info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(info, expectedDefault) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
}

info, err = cache.GetPodInfo("host", "other", "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(info, expectedOther) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expectedOther, info)
}
}

func TestPodCacheGet(t *testing.T) {
cache := NewPodCache(nil, nil)

expected := api.PodInfo{
"foo": api.ContainerStatus{},
}
cache.podInfo["foo"] = expected
cache.podInfo[makePodCacheKey(api.NamespaceDefault, "foo")] = expected

info, err := cache.GetPodInfo("host", "foo")
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
Expand All @@ -57,7 +89,7 @@ func TestPodCacheGet(t *testing.T) {
func TestPodCacheGetMissing(t *testing.T) {
cache := NewPodCache(nil, nil)

info, err := cache.GetPodInfo("host", "foo")
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
if err == nil {
t.Errorf("Unexpected non-error: %#v", err)
}
Expand All @@ -75,13 +107,13 @@ func TestPodGetPodInfoGetter(t *testing.T) {
}
cache := NewPodCache(&fake, nil)

cache.updatePodInfo("host", "foo")
cache.updatePodInfo("host", api.NamespaceDefault, "foo")

if fake.host != "host" || fake.id != "foo" {
if fake.host != "host" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
t.Errorf("Unexpected access: %#v", fake)
}

info, err := cache.GetPodInfo("host", "foo")
info, err := cache.GetPodInfo("host", api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
Expand All @@ -92,7 +124,7 @@ func TestPodGetPodInfoGetter(t *testing.T) {

func TestPodUpdateAllContainers(t *testing.T) {
pod := api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
TypeMeta: api.TypeMeta{ID: "foo", Namespace: api.NamespaceDefault},
CurrentState: api.PodState{
Host: "machine",
},
Expand All @@ -111,11 +143,11 @@ func TestPodUpdateAllContainers(t *testing.T) {

cache.UpdateAllContainers()

if fake.host != "machine" || fake.id != "foo" {
if fake.host != "machine" || fake.id != "foo" || fake.namespace != api.NamespaceDefault {
t.Errorf("Unexpected access: %#v", fake)
}

info, err := cache.GetPodInfo("machine", "foo")
info, err := cache.GetPodInfo("machine", api.NamespaceDefault, "foo")
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/pod/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ func (rs *REST) fillPodInfo(pod *api.Pod) {
// Get cached info for the list currently.
// TODO: Optionally use fresh info
if rs.podCache != nil {
info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID)
info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
if err != nil {
if err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting container info from cache: %#v", err)
}
if rs.podInfoGetter != nil {
info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID)
info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.Namespace, pod.ID)
}
if err != nil {
if err != client.ErrPodInfoNotAvailable {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/pod/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ type FakePodInfoGetter struct {
err error
}

func (f *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
func (f *FakePodInfoGetter) GetPodInfo(host, podNamespace string, podID string) (api.PodInfo, error) {
return f.info, f.err
}

Expand Down