Revert "Adding sync pod latency metric." and "Thread-per-pod model in Kubelet" #4867

Merged 1 commit on Feb 26, 2015
pkg/kubelet/kubelet.go (98 changes: 50 additions & 48 deletions)
@@ -61,10 +61,7 @@ const podOomScoreAdj = -100
 
 // SyncHandler is an interface implemented by Kubelet, for testability
 type SyncHandler interface {
-    // Syncs current state to match the specified pods. SyncPodType specified what
-    // type of sync is occuring per pod. StartTime specifies the time at which
-    // syncing began (for use in monitoring).
-    SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
+    SyncPods([]api.BoundPod) error
 }
 
 type SourceReadyFn func(source string) bool
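
Note (editor's sketch, not part of the diff): SyncHandler exists "for testability", and the revert narrows SyncPods back to a single argument, so a test double only needs to record the pod lists it is handed. A runnable miniature, with a local pod type standing in for api.BoundPod and fakeSyncHandler as a hypothetical name:

package main

import "fmt"

// pod stands in for api.BoundPod so the sketch runs outside the kubelet tree.
type pod struct{ Name string }

// SyncHandler mirrors the narrowed interface above.
type SyncHandler interface {
    SyncPods([]pod) error
}

// fakeSyncHandler records every desired-state snapshot it receives.
type fakeSyncHandler struct {
    synced [][]pod
}

func (f *fakeSyncHandler) SyncPods(pods []pod) error {
    f.synced = append(f.synced, pods)
    return nil
}

func main() {
    f := &fakeSyncHandler{}
    var h SyncHandler = f
    _ = h.SyncPods([]pod{{Name: "pod-a"}})
    fmt.Println(len(f.synced), f.synced[0][0].Name) // 1 pod-a
}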
@@ -114,6 +111,7 @@ func NewMainKubelet(
         rootDirectory: rootDirectory,
         resyncInterval: resyncInterval,
         podInfraContainerImage: podInfraContainerImage,
+        podWorkers: newPodWorkers(),
         dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
         runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
         httpClient: &http.Client{},
@@ -136,7 +134,6 @@
         return nil, err
     }
     klet.dockerCache = dockerCache
-    klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod)
 
     metrics.Register(dockerCache)
 
@@ -456,6 +453,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
     kl.syncLoop(updates, kl)
 }
 
+// Per-pod workers.
+type podWorkers struct {
+    lock sync.Mutex
+
+    // Set of pods with existing workers.
+    workers util.StringSet
+}
+
+func newPodWorkers() *podWorkers {
+    return &podWorkers{
+        workers: util.NewStringSet(),
+    }
+}
+
+// Runs a worker for "podFullName" asynchronously with the specified "action".
+// If the worker for the "podFullName" is already running, functions as a no-op.
+func (self *podWorkers) Run(podFullName string, action func()) {
+    self.lock.Lock()
+    defer self.lock.Unlock()
+
+    // This worker is already running, let it finish.
+    if self.workers.Has(podFullName) {
+        return
+    }
+    self.workers.Insert(podFullName)
+
+    // Run worker async.
+    go func() {
+        defer util.HandleCrash()
+        action()
+
+        self.lock.Lock()
+        defer self.lock.Unlock()
+        self.workers.Delete(podFullName)
+    }()
+}
+
 func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
     binds := []string{}
     for _, mount := range container.VolumeMounts {
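
Note (editor's sketch, not part of the diff): the restored podWorkers guarantees at most one worker goroutine per pod; a Run call that arrives while that pod's worker is still in flight is dropped, and the pod is simply picked up again on a later sync pass. Below is the same guard in standalone form, with map[string]bool standing in for util.StringSet and all names illustrative.

package main

import (
    "fmt"
    "sync"
    "time"
)

type workers struct {
    mu      sync.Mutex
    running map[string]bool // keys that currently have a live worker
}

// Run starts action for key in a goroutine unless one is already in flight;
// duplicate requests are silently dropped, as in podWorkers.Run above.
func (w *workers) Run(key string, action func()) {
    w.mu.Lock()
    defer w.mu.Unlock()
    if w.running[key] {
        return // worker already running, let it finish
    }
    w.running[key] = true
    go func() {
        defer func() { // release the key when the worker exits
            w.mu.Lock()
            defer w.mu.Unlock()
            delete(w.running, key)
        }()
        action()
    }()
}

func main() {
    w := &workers{running: map[string]bool{}}
    for i := 0; i < 3; i++ {
        w.Run("pod-a", func() { // only the first call starts a worker
            fmt.Println("syncing pod-a")
            time.Sleep(50 * time.Millisecond)
        })
    }
    time.Sleep(100 * time.Millisecond) // let the single worker finish
}

One deliberate difference: the sketch releases the key in a defer, so it is freed even if action panics, whereas the diff runs the delete after action and relies on util.HandleCrash to recover.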
@@ -945,7 +979,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
 func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
     start := time.Now()
     defer func() {
-        metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
+        metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
     }()
 
     if err := kl.dockerPuller.Pull(img); err != nil {
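
Note (editor's sketch, not part of the diff): the revert removes the metrics.SinceInMicroseconds helper, so its body is inlined on the added line; both sides of the hunk record elapsed wall-clock time in whole microseconds, widened to float64 for the Prometheus observer. The expression in isolation:

package main

import (
    "fmt"
    "time"
)

// sinceInMicroseconds reproduces the inlined expression from the hunk above:
// integer nanoseconds divided by time.Microsecond.Nanoseconds() (1000),
// truncated, then converted to float64.
func sinceInMicroseconds(start time.Time) float64 {
    return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

func main() {
    start := time.Now()
    time.Sleep(3 * time.Millisecond)
    fmt.Printf("%.0f µs elapsed\n", sinceInMicroseconds(start))
}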
@@ -1273,7 +1307,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
 }
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
-func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
+func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
     glog.V(4).Infof("Desired: %#v", pods)
     var err error
     desiredContainers := make(map[podContainer]empty)
@@ -1299,14 +1333,13 @@
         }
 
         // Run the sync in an async manifest worker.
-        kl.podWorkers.UpdatePod(pod, func() {
-            metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
+        kl.podWorkers.Run(podFullName, func() {
+            if err := kl.syncPod(pod, dockerContainers); err != nil {
+                glog.Errorf("Error syncing pod, skipping: %v", err)
+                record.Eventf(pod, "failedSync", "Error syncing pod, skipping: %v", err)
+            }
         })
     }
-
-    // Stop the workers for no-longer existing pods.
-    kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
-
     // Kill any containers we don't need.
     killed := []string{}
     for ix := range dockerContainers {
@@ -1421,21 +1454,19 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
 func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
     for {
         unsyncedPod := false
-        podSyncTypes := make(map[types.UID]metrics.SyncPodType)
         select {
         case u := <-updates:
-            kl.updatePods(u, podSyncTypes)
+            kl.updatePods(u)
             unsyncedPod = true
         case <-time.After(kl.resyncInterval):
             glog.V(4).Infof("Periodic sync")
         }
-        start := time.Now()
         // If we already caught some update, try to wait for some short time
         // to possibly batch it with other incoming updates.
         for unsyncedPod {
             select {
             case u := <-updates:
-                kl.updatePods(u, podSyncTypes)
+                kl.updatePods(u)
             case <-time.After(5 * time.Millisecond):
                 // Break the for loop.
                 unsyncedPod = false
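
Note (editor's sketch, not part of the diff): syncLoop debounces its input: once an update arrives, it keeps draining the channel until it stays quiet for 5 milliseconds, so a burst of updates produces one SyncPods call rather than many. The idiom in isolation, with illustrative channel contents:

package main

import (
    "fmt"
    "time"
)

func main() {
    updates := make(chan string, 16)
    go func() { // a bursty producer
        for _, u := range []string{"add pod-a", "add pod-b", "update pod-a"} {
            updates <- u
        }
    }()

    batch := []string{<-updates} // block until the first update
    for quiet := false; !quiet; {
        select {
        case u := <-updates:
            batch = append(batch, u) // another update within the window
        case <-time.After(5 * time.Millisecond):
            quiet = true // channel idle, stop batching
        }
    }
    fmt.Println("sync once for:", batch)
}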
@@ -1447,54 +1478,25 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
             glog.Errorf("Failed to get bound pods.")
             return
         }
-        if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
+        if err := handler.SyncPods(pods); err != nil {
             glog.Errorf("Couldn't sync containers: %v", err)
         }
     }
 }
 
-// Updated the Kubelet's internal pods with those provided by the update.
-// Records new and updated pods in newPods and updatedPods.
-func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
+func (kl *Kubelet) updatePods(u PodUpdate) {
     switch u.Op {
     case SET:
         glog.V(3).Infof("SET: Containers changed")
-
-        // Store the new pods. Don't worry about filtering host ports since those
-        // pods will never be looked up.
-        existingPods := make(map[types.UID]struct{})
-        for i := range kl.pods {
-            existingPods[kl.pods[i].UID] = struct{}{}
-        }
-        for i := range u.Pods {
-            if _, ok := existingPods[u.Pods[i].UID]; !ok {
-                podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
-            }
-        }
-
        kl.pods = u.Pods
        kl.pods = filterHostPortConflicts(kl.pods)
     case UPDATE:
         glog.V(3).Infof("Update: Containers changed")
-
-        // Store the updated pods. Don't worry about filtering host ports since those
-        // pods will never be looked up.
-        for i := range u.Pods {
-            podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
-        }
-
         kl.pods = updateBoundPods(u.Pods, kl.pods)
         kl.pods = filterHostPortConflicts(kl.pods)
     default:
         panic("syncLoop does not support incremental changes")
     }
-
-    // Mark all remaining pods as sync.
-    for i := range kl.pods {
-        if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
-            podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
-        }
-    }
 }
 
 // Returns Docker version for this Kubelet.
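
Note (editor's sketch, not part of the diff): stripped of the metrics bookkeeping, updatePods reduces to the two cases visible above: SET replaces the entire desired set, while UPDATE merges incoming pods into the current list (roughly what updateBoundPods does; its body is outside this diff, so the UID merge key here is an assumption). A toy version of that distinction:

package main

import "fmt"

type pod struct{ UID, Spec string }

// set replaces the whole desired list, as in the SET case above.
func set(_, incoming []pod) []pod { return incoming }

// update merges by UID: replace matching pods, append new ones.
func update(current, incoming []pod) []pod {
    byUID := map[string]int{}
    for i, p := range current {
        byUID[p.UID] = i
    }
    for _, p := range incoming {
        if i, ok := byUID[p.UID]; ok {
            current[i] = p
        } else {
            current = append(current, p)
        }
    }
    return current
}

func main() {
    current := []pod{{"1", "v1"}, {"2", "v1"}}
    fmt.Println(update(current, []pod{{"2", "v2"}, {"3", "v1"}})) // [{1 v1} {2 v2} {3 v1}]
    fmt.Println(set(current, []pod{{"9", "v1"}}))                 // [{9 v1}]
}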