Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for image pulling #26677

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 44 additions & 17 deletions pkg/kubelet/dockertools/kube_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ const (

// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
defaultImagePullingProgressReportInterval = 10 * time.Second

// defaultImagePullingStuckTimeout is the default timeout for image pulling stuck. If no progress
// is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
// between progress updates.
defaultImagePullingStuckTimeout = 1 * time.Minute
)

// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
Expand Down Expand Up @@ -203,55 +209,76 @@ type progress struct {
sync.RWMutex
// message stores the latest docker json message.
message *dockermessage.JSONMessage
// timestamp of the latest update.
timestamp time.Time
}

func newProgress() *progress {
return &progress{timestamp: time.Now()}
}

func (p *progress) set(msg *dockermessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.message = msg
p.timestamp = time.Now()
}

func (p *progress) get() string {
func (p *progress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.message == nil {
return "No progress"
return "No progress", p.timestamp
}
// The following code is based on JSONMessage.Display
var prefix string
if p.message.ID != "" {
prefix = fmt.Sprintf("%s: ", p.message.ID)
}
if p.message.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.message.Status)
return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
}
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String())
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
}

// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
type progressReporter struct {
progress
image string
interval time.Duration
stopCh chan struct{}
*progress
image string
cancel context.CancelFunc
stopCh chan struct{}
}

// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
func newProgressReporter(image string, interval time.Duration) *progressReporter {
return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})}
func newProgressReporter(image string, cancel context.CancelFunc) *progressReporter {
return &progressReporter{
progress: newProgress(),
image: image,
cancel: cancel,
stopCh: make(chan struct{}),
}
}

// start starts the progressReporter
func (p *progressReporter) start() {
go func() {
ticker := time.NewTicker(p.interval)
ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
defer ticker.Stop()
for {
// TODO(random-liu): Report as events.
select {
case <-ticker.C:
glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get())
progress, timestamp := p.progress.get()
// If there is no progress for defaultImagePullingStuckTimeout, cancel the operation.
if time.Now().Sub(timestamp) > defaultImagePullingStuckTimeout {
glog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, defaultImagePullingStuckTimeout, progress)
p.cancel()
return
}
glog.V(2).Infof("Pulling image %q: %q", p.image, progress)
case <-p.stopCh:
glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get())
progress, _ := p.progress.get()
glog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
return
}
}
Expand All @@ -270,14 +297,14 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err
}
opts.RegistryAuth = base64Auth
// Don't set timeout for the context because image pulling can be
// take an arbitrarily long time.
resp, err := d.client.ImagePull(context.Background(), image, opts)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: calling cancel multiple times should not have any negative impact, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.

resp, err := d.client.ImagePull(ctx, image, opts)
if err != nil {
return err
}
defer resp.Close()
reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval)
reporter := newProgressReporter(image, cancel)
reporter.start()
defer reporter.stop()
decoder := json.NewDecoder(resp)
Expand Down