Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubelet: Periodically reporting image pulling progress in log #26145

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 75 additions & 1 deletion pkg/kubelet/dockertools/kube_docker_client.go
Expand Up @@ -23,8 +23,11 @@ import (
"fmt"
"io"
"io/ioutil"
"sync"
"time"

"github.com/golang/glog"

dockermessage "github.com/docker/docker/pkg/jsonmessage"
dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
dockerapi "github.com/docker/engine-api/client"
Expand Down Expand Up @@ -58,6 +61,9 @@ const (

// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
defaultShmSize = int64(1024 * 1024 * 64)

// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
defaultImagePullingProgressReportInterval = 10 * time.Second
)

// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
Expand Down Expand Up @@ -192,6 +198,71 @@ func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) {
return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
}

// progress is a wrapper of dockermessage.JSONMessage with a lock protecting it.
type progress struct {
sync.RWMutex
// message stores the latest docker json message.
message *dockermessage.JSONMessage
}

func (p *progress) set(msg *dockermessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.message = msg
}

func (p *progress) get() string {
p.RLock()
defer p.RUnlock()
if p.message == nil {
return "No progress"
}
var prefix string
if p.message.ID != "" {
prefix = fmt.Sprintf("%s: ", p.message.ID)
}
if p.message.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.message.Status)
}
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String())
}

// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
type progressReporter struct {
progress
image string
interval time.Duration
stopCh chan struct{}
}

// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
func newProgressReporter(image string, interval time.Duration) *progressReporter {
return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})}
}

// start starts the progressReporter
func (p *progressReporter) start() {
go func() {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
// TODO(random-liu): Report as events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: #19077 tracks exposing image pull progress. Events might not be the best mechanism.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vishh Yeah. Event is a easily doable approach better than log, but as what you said, it's not the best solution.

select {
case <-ticker.C:
glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get())
case <-p.stopCh:
glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get())
return
}
}
}()
}

// stop stops the progressReporter
func (p *progressReporter) stop() {
close(p.stopCh)
}

func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
// RegistryAuth is the base64 encoded credentials for the registry
base64Auth, err := base64EncodeAuth(auth)
Expand All @@ -209,7 +280,9 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err
}
defer resp.Close()
// TODO(random-liu): Use the image pulling progress information.
reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval)
reporter.start()
defer reporter.stop()
decoder := json.NewDecoder(resp)
for {
var msg dockermessage.JSONMessage
Expand All @@ -223,6 +296,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
if msg.Error != nil {
return msg.Error
}
reporter.set(&msg)
}
return nil
}
Expand Down