Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protection for the pods member variable. #4494

Merged
merged 1 commit into from
Feb 25, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 52 additions & 18 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,14 @@ type Kubelet struct {
podInfraContainerImage string
podWorkers *podWorkers
resyncInterval time.Duration
pods []api.BoundPod
sourceReady SourceReadyFn

// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
// pods are immutable
podLock sync.RWMutex
pods []api.BoundPod

// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
dockerIDToRef map[dockertools.DockerID]*api.ObjectReference
Expand Down Expand Up @@ -1417,6 +1422,24 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
return filtered
}

// handleUpdate applies a single PodUpdate to the kubelet's desired pod set.
// The pods slice is replaced wholesale while podLock is held; only SET and
// UPDATE operations are accepted — anything else panics.
func (kl *Kubelet) handleUpdate(u PodUpdate) {
	kl.podLock.Lock()
	defer kl.podLock.Unlock()

	switch u.Op {
	case SET:
		glog.V(3).Infof("SET: Containers changed")
		// Replace the entire desired state with the incoming pod set.
		kl.pods = filterHostPortConflicts(u.Pods)
	case UPDATE:
		glog.V(3).Infof("Update: Containers changed")
		// Merge the incoming pods into the current desired state.
		kl.pods = filterHostPortConflicts(updateBoundPods(u.Pods, kl.pods))
	default:
		panic("syncLoop does not support incremental changes")
	}
}

// syncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
Expand Down Expand Up @@ -1444,8 +1467,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}

err := handler.SyncPods(kl.pods)
pods, err := kl.GetBoundPods()
if err != nil {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
Expand Down Expand Up @@ -1514,16 +1541,19 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri

// GetBoundPods returns all pods bound to the kubelet and their spec.
// It returns a fresh copy of the slice, built while holding the read lock,
// so callers never alias the kubelet's internal pods array.
func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
	kl.podLock.RLock()
	defer kl.podLock.RUnlock()
	// Copy the slice header and elements; per the struct comment, pods are
	// immutable once added, so a shallow element copy is sufficient.
	return append([]api.BoundPod{}, kl.pods...), nil
}

// GetPodFullName provides the first pod that matches namespace and name, or false
// if no such pod can be found.
// GetPodByName provides the first pod that matches namespace and name, as well as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should not be returning a pointer here, so the user will know that they're getting a copy of the pod.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, pointer is OK here. In either case it's a shallow copy, so not a "real" copy anyway. Returning non-pointer won't change that. See the comment I asked for in the struct definition.

kl.podLock.RLock()
defer kl.podLock.RUnlock()
for i := range kl.pods {
pod := &kl.pods[i]
pod := kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return pod, true
return &pod, true
}
}
return nil, false
Expand Down Expand Up @@ -1616,23 +1646,27 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
return ready
}

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
var spec api.PodSpec
var podStatus api.PodStatus
found := false
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency I believe it should be near GetPodByName, and return *?api.BoundPod.

kl.podLock.RLock()
defer kl.podLock.RUnlock()
for _, pod := range kl.pods {
if GetPodFullName(&pod) == podFullName {
spec = pod.Spec
found = true
break
return &pod.Spec, true
}
}
return nil, false
}

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
var podStatus api.PodStatus
spec, found := kl.GetPodByFullName(podFullName)

if !found {
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}

info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid)
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)

if err != nil {
// Error handling
Expand All @@ -1648,13 +1682,13 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
}

// Assume info is ready to process
podStatus.Phase = getPhase(&spec, info)
podStatus.Phase = getPhase(spec, info)
for _, c := range spec.Containers {
containerStatus := info[c.Name]
containerStatus.Ready = kl.readiness.IsReady(containerStatus)
info[c.Name] = containerStatus
}
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(&spec, info)...)
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, info)...)

netContainerInfo, found := info[dockertools.PodInfraContainerName]
if found {
Expand Down