Turn on kubecontainer.Cache in kubelet #19850

Merged
merged 1 commit on Jan 30, 2016
179 changes: 74 additions & 105 deletions pkg/kubelet/kubelet.go
@@ -115,8 +115,14 @@ const (
plegChannelCapacity = 1000

// Generic PLEG relies on relisting for discovering container events.
// The period directly affects the response time of kubelet.
plegRelistPeriod = time.Second * 3
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (i.e., more container runtime operations),
// leading to higher CPU usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1

// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
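The tradeoff described in the plegRelistPeriod comment above can be seen in a stripped-down polling loop. This is not part of the PR; the function and variable names below are invented, and the real generic PLEG additionally diffs consecutive container lists to turn polling into lifecycle events:

```go
// Not part of this PR: a toy relist loop. listContainers is a stand-in for a
// container runtime call; the real PLEG compares consecutive lists and emits
// pod lifecycle events for the differences.
package main

import (
	"fmt"
	"time"
)

func main() {
	const relistPeriod = 1 * time.Second // the knob tuned by this change (3s -> 1s)

	listContainers := func() []string {
		// A runtime query; with a shorter period this runs more often,
		// which is where the extra CPU cost comes from.
		return []string{"kube-proxy", "nginx"}
	}

	ticker := time.NewTicker(relistPeriod)
	defer ticker.Stop()

	var previous []string
	for i := 0; i < 3; i++ { // a real loop would run until shutdown
		<-ticker.C
		current := listContainers()
		// Differences between previous and current are what become events;
		// a longer period therefore delays how quickly changes are noticed.
		fmt.Printf("relist %d: previous=%v current=%v\n", i, previous, current)
		previous = current
	}
}
```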
@@ -340,6 +346,8 @@ func NewMainKubelet(

klet.livenessManager = proberesults.NewManager()

klet.podCache = kubecontainer.NewCache()

// Initialize the runtime.
switch containerRuntime {
case "docker":
@@ -365,8 +373,6 @@
imageBackOff,
serializeImagePulls,
)

klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
case "rkt":
conf := &rkt.Config{
Path: rktPath,
@@ -387,14 +393,13 @@
return nil, err
}
klet.containerRuntime = rktRuntime
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)

// No Docker daemon to put in a container.
dockerDaemonContainer = ""
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}

klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)

// setup containerGC
@@ -441,7 +446,7 @@ func NewMainKubelet(
}
klet.runtimeCache = runtimeCache
klet.workQueue = queue.NewBasicWorkQueue()
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@@ -578,6 +583,9 @@ type Kubelet struct {
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator

// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache

// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
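As a rough picture of what the podCache field added above holds, here is a simplified, illustrative stand-in. It is not the real kubecontainer.Cache (which keys entries by types.UID, stores *kubecontainer.PodStatus along with the error from the last runtime query, and also offers a blocking GetNewerThan); it only shows the basic shape of a thread-safe status store:

```go
// Not part of this diff: a simplified stand-in for the pod status cache,
// using strings to keep the shape visible.
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// podStatus stands in for kubecontainer.PodStatus.
type podStatus struct {
	Name  string
	State string
}

type cacheEntry struct {
	status   *podStatus
	err      error // error from the runtime query that produced this entry
	modified time.Time
}

// statusCache is a thread-safe map from pod UID to its latest known status.
type statusCache struct {
	mu   sync.RWMutex
	pods map[string]cacheEntry
}

func newStatusCache() *statusCache {
	return &statusCache{pods: map[string]cacheEntry{}}
}

// Set records the status observed for a pod at time ts (e.g. by a relist).
func (c *statusCache) Set(uid string, s *podStatus, err error, ts time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pods[uid] = cacheEntry{status: s, err: err, modified: ts}
}

// Get returns the last stored status, or an error if the pod is unknown.
func (c *statusCache) Get(uid string) (*podStatus, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.pods[uid]
	if !ok {
		return nil, errors.New("no cached status for pod " + uid)
	}
	return e.status, e.err
}

func main() {
	c := newStatusCache()
	c.Set("pod-1", &podStatus{Name: "nginx", State: "running"}, nil, time.Now())
	s, err := c.Get("pod-1")
	fmt.Printf("%+v, err=%v\n", s, err)
}
```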
@@ -1564,31 +1572,42 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}

func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
start := kl.clock.Now()
// TODO: Remove runningPod from the arguments.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
} else {
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}

// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod %q with error(%v)", format.Pod(pod), err)
// Propagate the error upstream.
syncErr = err
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
} else {
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, status)
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
}
}()
}

// Query the container runtime (or cache) to retrieve the pod status, and
// update it in the status manager.
podStatus, statusErr := kl.getRuntimePodStatus(pod)
apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
if err != nil {
return err
}
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
// TODO: The logic seems wrong since the pod phase can become pending when
// the container runtime is temporarily not available.
if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
if statusErr != nil {
return statusErr
}

// Kill pods we can't run.
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
@@ -1639,51 +1658,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}
kl.volumeManager.SetVolumes(pod.UID, podVolumes)

// The kubelet is the source of truth for pod status. It ignores the status sent from
// the apiserver and regenerates status for every pod update, incrementally updating
// the status it received at pod creation time.
//
// The container runtime needs 2 pieces of information from the status to sync a pod:
// The terminated state of containers (to restart them) and the podIp (for liveness probes).
// New pods don't have either, so we skip the expensive status generation step.
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create would cancel this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.

var apiPodStatus api.PodStatus
var podStatus *kubecontainer.PodStatus

// Always generate the kubecontainer.PodStatus to know whether there are
// running containers associated with the pod.
podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
if err != nil {
glog.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
return err
}
apiPodStatus = *apiPodStatusPtr
podStatus = podStatusPtr

if updateType == kubetypes.SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
if !firstSeenTime.IsZero() {
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
// kubelet may have just been restarted. Re-use the last known
// apiPodStatus.
apiPodStatus = pod.Status
apiPodStatus.StartTime = &unversioned.Time{Time: start}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
glog.V(3).Infof("Reusing api pod status for new pod %q", format.Pod(pod))
}

pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil {
glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
@@ -2285,10 +2259,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.Errorf("Kubelet does not support snapshot update")
}
case e := <-plegCh:
// Filter out started events since we don't use them now.
if e.Type == pleg.ContainerStarted {
break
}
pod, ok := kl.podManager.GetPodByUID(e.ID)
if !ok {
// If the pod no longer exists, ignore the event.
@@ -3084,19 +3054,21 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
}
}

// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
// TODO(random-liu): api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
// after refactoring, modify them later.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {

// Get the internal PodStatus from the cache if the cache exists;
// otherwise, query the runtime directly.
func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, error) {
start := kl.clock.Now()
defer func() {
metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
}()
if kl.podCache != nil {
return kl.podCache.Get(pod.UID)
}
return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
}

func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))

// TODO: Consider include the container information.
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
@@ -3107,44 +3079,41 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
Message: "Pod was active on the node longer than specified deadline"}, nil
}

spec := &pod.Spec
podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)

if err != nil {
// Error handling
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do
return api.PodStatus{}, err
}

pendingStatus := api.PodStatus{
if statusErr != nil {
// TODO: Re-evaluate whether we should set the status to "Pending".
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
return api.PodStatus{
Phase: api.PodPending,
Reason: "GeneralError",
Message: fmt.Sprintf("Query container info failed with error (%v)", err),
}
return pendingStatus, nil
Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
}, nil
}
// Ask the runtime to convert the internal PodStatus to api.PodStatus.
s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
if err != nil {
glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
return api.PodStatus{}, err
}

// Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)

podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))
spec := &pod.Spec
s.Phase = GetPhase(spec, s.ContainerStatuses)
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))

if !kl.standaloneMode {
hostIP, err := kl.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {
podStatus.HostIP = hostIP.String()
if podUsesHostNetwork(pod) && podStatus.PodIP == "" {
podStatus.PodIP = hostIP.String()
s.HostIP = hostIP.String()
if podUsesHostNetwork(pod) && s.PodIP == "" {
s.PodIP = hostIP.String()
}
}
}

return *podStatus, nil
return *s, nil
}

// Returns logs of current machine.
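getRuntimePodStatus above only consults the cache when one was configured, which keeps the old query-the-runtime-directly behavior for callers that pass a nil cache (as the pod worker tests below do). A generic sketch of that fallback pattern, with made-up types standing in for kubecontainer.Cache and the container runtime:

```go
// Not part of this PR: a generic cache-or-runtime fallback, mirroring the
// nil-check in getRuntimePodStatus. The interfaces are invented stand-ins.
package main

import (
	"errors"
	"fmt"
)

type statusCache interface {
	Get(uid string) (string, error)
}

type containerRuntime interface {
	GetPodStatus(uid string) (string, error)
}

// mapCache is a trivial cache backed by a map.
type mapCache map[string]string

func (m mapCache) Get(uid string) (string, error) {
	s, ok := m[uid]
	if !ok {
		return "", errors.New("no cached status for " + uid)
	}
	return s, nil
}

// fakeRuntime pretends to query the container runtime directly.
type fakeRuntime struct{}

func (fakeRuntime) GetPodStatus(uid string) (string, error) {
	return "running (queried from runtime)", nil
}

// getPodStatus prefers the cache when one is configured and otherwise falls
// back to the more expensive runtime query.
func getPodStatus(cache statusCache, rt containerRuntime, uid string) (string, error) {
	if cache != nil {
		return cache.Get(uid)
	}
	return rt.GetPodStatus(uid)
}

func main() {
	withCache := mapCache{"pod-1": "running (from cache)"}
	fmt.Println(getPodStatus(withCache, fakeRuntime{}, "pod-1"))
	fmt.Println(getPodStatus(nil, fakeRuntime{}, "pod-1"))
}
```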
28 changes: 24 additions & 4 deletions pkg/kubelet/pod_workers.go
@@ -71,6 +71,9 @@ type podWorkers struct {

// resyncInterval is the duration to wait until the next sync.
resyncInterval time.Duration

// podCache stores kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
}

type workUpdate struct {
@@ -88,7 +91,7 @@
}

func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
@@ -99,13 +102,27 @@
workQueue: workQueue,
resyncInterval: resyncInterval,
backOffPeriod: backOffPeriod,
podCache: podCache,
}
}

func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minRuntimeCacheTime time.Time

for newWork := range podUpdates {
err := func() (err error) {
podID := newWork.pod.UID
if p.podCache != nil {
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
// TODO: We don't consume the return PodStatus yet, but we
// should pass it to syncPod() eventually.
p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
}
// TODO: Deprecate the runtime cache.
// We would like to have the state of the containers from at least
// the moment when we finished the previous processing of that pod.
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
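The blocking behavior described in the GetNewerThan comment above can be sketched on its own. This is not the kubelet's implementation; it is a minimal cache, assuming a sync.Cond-style wait, whose GetNewerThan returns only once an entry newer than the given timestamp has been stored (roughly what lets the worker wait for a relist that postdates its previous sync):

```go
// Not part of this PR: a minimal cache whose GetNewerThan blocks until an
// entry newer than the given timestamp is stored. Types are simplified; the
// real cache stores *kubecontainer.PodStatus and is keyed by types.UID.
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	status   string // stand-in for *kubecontainer.PodStatus
	modified time.Time
}

type blockingCache struct {
	mu   sync.Mutex
	cond *sync.Cond
	pods map[string]entry
}

func newBlockingCache() *blockingCache {
	c := &blockingCache{pods: map[string]entry{}}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// Set stores a freshly observed status (e.g. after a relist) and wakes up any
// workers waiting in GetNewerThan.
func (c *blockingCache) Set(id, status string, ts time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pods[id] = entry{status: status, modified: ts}
	c.cond.Broadcast()
}

// GetNewerThan returns the status for id once its timestamp is after minTime,
// blocking the caller until such an entry is written.
func (c *blockingCache) GetNewerThan(id string, minTime time.Time) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	for {
		if e, ok := c.pods[id]; ok && e.modified.After(minTime) {
			return e.status
		}
		c.cond.Wait() // releases the lock while waiting, reacquires on wakeup
	}
}

func main() {
	c := newBlockingCache()
	lastSyncFinished := time.Now() // plays the role of minRuntimeCacheTime
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate the next PLEG relist
		c.Set("pod-1", "running", time.Now())
	}()
	// Blocks until the "relist" above publishes a status newer than the
	// previous sync's finish time.
	fmt.Println(c.GetNewerThan("pod-1", lastSyncFinished))
}
```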
@@ -206,10 +223,13 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty

func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
// Requeue the last update if the last sync returned error.
if syncErr != nil {
p.workQueue.Enqueue(uid, p.backOffPeriod)
} else {
switch {
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(uid, p.resyncInterval)
default:
// Error occurred during the sync; back off and then retry.
p.workQueue.Enqueue(uid, p.backOffPeriod)
}
p.checkForUpdates(uid)
}
3 changes: 2 additions & 1 deletion pkg/kubelet/pod_workers_test.go
@@ -60,6 +60,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
queue.NewBasicWorkQueue(),
time.Second,
time.Second,
nil,
)
return podWorkers, processed
}
@@ -190,7 +191,7 @@ func TestFakePodWorkers(t *testing.T) {
kubeletForRealWorkers := &simpleFakeKubelet{}
kubeletForFakeWorkers := &simpleFakeKubelet{}

realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}

tests := []struct {