Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use uid in config.go instead of pod full name. #43414

Merged
merged 1 commit into from
Mar 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 18 additions & 33 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -117,8 +118,8 @@ func (c *PodConfig) Sync() {
// available, then this object should be considered authoritative.
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*v1.Pod
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode

// ensures that updates are delivered in strict order
Expand All @@ -139,7 +140,7 @@ type podStorage struct {
// TODO: allow initialization of the current state of the store with snapshotted version.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{
pods: make(map[string]map[string]*v1.Pod),
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.String{},
Expand Down Expand Up @@ -221,23 +222,22 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

pods := s.pods[source]
if pods == nil {
pods = make(map[string]*v1.Pod)
pods = make(map[types.UID]*v1.Pod)
}

// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[string]*v1.Pod) {
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[name]; found {
pods[name] = existing
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
Expand All @@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
continue
}
recordFirstSeenTime(ref)
pods[name] = ref
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
Expand Down Expand Up @@ -280,10 +280,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
case kubetypes.REMOVE:
glog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
for _, value := range update.Pods {
name := kubecontainer.GetPodFullName(value)
if existing, found := pods[name]; found {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, name)
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
Expand All @@ -295,10 +294,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[string]*v1.Pod)
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for name, existing := range oldPods {
if _, found := pods[name]; !found {
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
Expand Down Expand Up @@ -339,9 +338,8 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
// TODO: remove the conversion when validation is performed on versioned objects.
internalPod := &api.Pod{}
if err := v1.Convert_v1_Pod_To_api_Pod(pod, internalPod, nil); err != nil {
name := kubecontainer.GetPodFullName(pod)
glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, name, source, err)
recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", name, source, err)
glog.Warningf("Pod[%d] (%s) from %s failed to convert to v1, ignoring: %v", i+1, format.Pod(pod), source, err)
recorder.Eventf(pod, v1.EventTypeWarning, "FailedConversion", "Error converting pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
continue
}
if errs := validation.ValidatePod(internalPod); len(errs) != 0 {
Expand All @@ -359,10 +357,9 @@ func filterInvalidPods(pods []*v1.Pod, source string, recorder record.EventRecor
}
}
if len(errlist) > 0 {
name := bestPodIdentString(pod)
err := errlist.ToAggregate()
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, name, source, err)
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", name, source, err)
glog.Warningf("Pod[%d] (%s) from %s failed validation, ignoring: %v", i+1, format.Pod(pod), source, err)
recorder.Eventf(pod, v1.EventTypeWarning, events.FailedValidation, "Error validating pod %s from %s, ignoring: %v", format.Pod(pod), source, err)
continue
}
filtered = append(filtered, pod)
Expand Down Expand Up @@ -515,18 +512,6 @@ func (s *podStorage) MergedState() interface{} {
return pods
}

func bestPodIdentString(pod *v1.Pod) string {
namespace := pod.Namespace
if namespace == "" {
namespace = "<empty-namespace>"
}
name := pod.Name
if name == "" {
name = "<empty-name>"
}
return fmt.Sprintf("%s.%s", name, namespace)
}

func copyPods(sourcePods []*v1.Pod) []*v1.Pod {
pods := []*v1.Pod{}
for _, source := range sourcePods {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s sortedPods) Less(i, j int) bool {
func CreateValidPod(name, namespace string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(name), // for the purpose of testing, this is unique enough
UID: types.UID(name + namespace), // for the purpose of testing, this is unique enough
Name: name,
Namespace: namespace,
},
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))

podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "new"}})
podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
}
Expand Down