Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve some logging in cacher #34322

Merged
merged 1 commit into from
Oct 8, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 21 additions & 9 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
Expand Down Expand Up @@ -651,11 +652,14 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
default:
}

resultLen := len(c.result)
// OK, block sending, but only for up to 5 seconds.
// cacheWatcher.add is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
trace := util.NewTrace(
fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)",
reflect.TypeOf(event.Object).String(), len(c.result)))
defer trace.LogIfLong(5 * time.Millisecond)

const timeout = 5 * time.Second
t, ok := timerPool.Get().(*time.Timer)
if ok {
Expand All @@ -680,8 +684,6 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
c.forget(false)
c.stop()
}
glog.V(2).Infof("cacheWatcher add function blocked processing of %v for %v (initial result size %v)",
reflect.TypeOf(event.Object).String(), time.Since(startTime), resultLen)
}

func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
Expand Down Expand Up @@ -717,17 +719,27 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
// As long as these are not processed, we are not processing
// any incoming events, so if it takes long, we may actually
// block all watchers for some time.
// TODO: If it appears to be long in some cases, we may consider
// - longer result buffers if there are a lot of initEvents
// - try some parallelization
const initProcessThreshold = 5 * time.Millisecond
// TODO: From the logs it seems that there happens processing
// times even up to 1s which is very long. However, this doesn't
// depend that much on the number of initEvents. E.g. from the
// 2000-node Kubemark run we have logs like this, e.g.:
// ... processing 13862 initEvents took 66.808689ms
// ... processing 14040 initEvents took 993.532539ms
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or sth else) and potentially
// consider increase size of result buffer in those cases.
const initProcessThreshold = 50 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
glog.V(2).Infof("processing %d initEvents took %v", len(initEvents), processingTime)
objType := "<null>"
if len(initEvents) > 0 {
objType = reflect.TypeOf(initEvents[0].Object).String()
}
glog.V(2).Infof("processing %d initEvents of %stook %v", len(initEvents), objType, processingTime)
}

defer close(c.result)
Expand Down