containerd integration: produce progress events polling ctrd's content.Store #43819

Closed · wants to merge 1 commit
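The core idea of the change, in isolation: containerd's content.Store reports the status of every in-flight ingest, so pull progress can be produced by polling those statuses on a ticker and translating offsets into progress events. Below is a minimal standalone sketch of that polling primitive. ContentStore and ListStatuses are real containerd client APIs; the helper name and the wiring around them are illustrative, not code from this PR.

// pollIngests is a hypothetical helper: every 100ms it lists the active
// ingests in containerd's content store and prints how far along each is.
func pollIngests(ctx context.Context, client *containerd.Client) {
    cs := client.ContentStore()
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // ListStatuses reports all in-flight ingests; "" applies no filter.
            statuses, err := cs.ListStatuses(ctx, "")
            if err != nil {
                continue // transient error: try again on the next tick
            }
            for _, st := range statuses {
                fmt.Printf("%s: %d/%d bytes\n", st.Ref, st.Offset, st.Total)
            }
        case <-ctx.Done():
            return
        }
    }
}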
34 changes: 33 additions & 1 deletion daemon/containerd/image_pull.go
@@ -5,6 +5,7 @@ import (
     "io"

     "github.com/containerd/containerd"
+    "github.com/containerd/containerd/images"
     "github.com/containerd/containerd/platforms"
     "github.com/docker/distribution"
     "github.com/docker/distribution/reference"
@@ -41,7 +42,38 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
         }
     }

-    _, err = i.client.Pull(ctx, ref.String(), opts...)
+    jobs := newJobs()
+    h := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
+        if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
+            jobs.Add(desc)
+        }
+        return nil, nil
+    })
+    opts = append(opts, containerd.WithImageHandler(h))
+
+    stop := make(chan struct{})
+    go func() {
+        showProgress(ctx, jobs, i.client.ContentStore(), outStream, stop)
+        stop <- struct{}{}
+    }()
+
+    img, err := i.client.Pull(ctx, ref.String(), opts...)
+    if err != nil {
+        return err
+    }
+
+    unpacked, err := img.IsUnpacked(ctx, containerd.DefaultSnapshotter)
+    if err != nil {
+        return err
+    }
+
+    if !unpacked {
+        if err := img.Unpack(ctx, containerd.DefaultSnapshotter); err != nil {
+            return err
+        }
+    }
+    stop <- struct{}{}
Member: What's the purpose of this? Seems like this races with your goroutine above.

Member: Seems like we can close the channel (from the goroutine) rather than sending?

Contributor: closing channel would be a valid alternative, indeed
+    <-stop
     return err
 }
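Following the review thread above, here is a minimal sketch of the close-based alternative. The identifiers come from the diff; the done channel and the defer-based shutdown are assumptions, not code from this PR.

stop := make(chan struct{}) // closed by the caller to request one last update
done := make(chan struct{}) // closed by the goroutine once showProgress returns

go func() {
    defer close(done)
    showProgress(ctx, jobs, i.client.ContentStore(), outStream, stop)
}()
defer func() {
    close(stop) // runs on every return path, including the early error returns
    <-done      // wait for the final progress frame before returning
}()

One caveat: once stop is closed, the `case <-stop:` branch in showProgress is permanently ready, so the function would also want to nil the channel after the first receive (`stop = nil` inside that case) to keep the select from spinning until the final tick.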

149 changes: 149 additions & 0 deletions daemon/containerd/progress.go
@@ -0,0 +1,149 @@
package containerd

import (
    "context"
    "io"
    "sync"
    "time"

    "github.com/containerd/containerd/content"
    "github.com/containerd/containerd/errdefs"
    "github.com/containerd/containerd/log"
    "github.com/containerd/containerd/remotes"
    "github.com/docker/docker/pkg/progress"
    "github.com/docker/docker/pkg/streamformatter"
    "github.com/docker/docker/pkg/stringid"
    "github.com/opencontainers/go-digest"
    v1 "github.com/opencontainers/image-spec/specs-go/v1"
)

func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, w io.Writer, stop chan struct{}) {
    var (
        out    = streamformatter.NewJSONProgressOutput(w, false)
        ticker = time.NewTicker(100 * time.Millisecond)
        start  = time.Now()
        done   bool
    )
    defer ticker.Stop()

outer:
    for {
        select {
        case <-ticker.C:
            if !ongoing.IsResolved() {
                continue
            }

            pulling := map[string]content.Status{}
            if !done {
                actives, err := cs.ListStatuses(ctx, "")
                if err != nil {
                    log.G(ctx).WithError(err).Error("status check failed")
                    continue
                }
                // index active ingest statuses by ref so jobs can be matched below
                for _, status := range actives {
                    pulling[status.Ref] = status
                }
            }

            // walk all tracked jobs and emit their current state
            for _, j := range ongoing.Jobs() {
                key := remotes.MakeRefKey(ctx, j)
                if info, ok := pulling[key]; ok {
                    out.WriteProgress(progress.Progress{
                        ID:      stringid.TruncateID(j.Digest.Encoded()),
                        Action:  "Downloading",
                        Current: info.Offset,
                        Total:   info.Total,
                    })
                    continue
                }

                info, err := cs.Info(ctx, j.Digest)
                if err != nil {
                    if !errdefs.IsNotFound(err) {
                        log.G(ctx).WithError(err).Error("failed to get content info")
                        continue outer
                    }
                } else if info.CreatedAt.After(start) {
                    out.WriteProgress(progress.Progress{
                        ID:         stringid.TruncateID(j.Digest.Encoded()),
                        Action:     "Download complete",
                        HideCounts: true,
                        LastUpdate: true,
                    })
                    ongoing.Remove(j)
                } else {
                    out.WriteProgress(progress.Progress{
                        ID:         stringid.TruncateID(j.Digest.Encoded()),
                        Action:     "Exists",
                        HideCounts: true,
                        LastUpdate: true,
                    })
                    ongoing.Remove(j)
                }
            }
            if done {
                return
            }
        case <-stop:
            done = true // allow the UI to update once more
        case <-ctx.Done():
            return
        }
    }
}

// jobs tracks the set of layer descriptors being downloaded for a pull, keyed by digest.
type jobs struct {
    resolved bool // resolved is set to true once remote image metadata has been downloaded from registry
    descs    map[digest.Digest]v1.Descriptor
    mu       sync.Mutex
}

// newJobs creates a new instance of the job status tracker
func newJobs() *jobs {
    return &jobs{
        descs: map[digest.Digest]v1.Descriptor{},
    }
}

// IsResolved reports whether the remote image metadata has been resolved,
// i.e. whether at least one descriptor has been added.
func (j *jobs) IsResolved() bool {
    j.mu.Lock()
    defer j.mu.Unlock()
    return j.resolved
}

// Add adds a descriptor to be tracked
func (j *jobs) Add(desc v1.Descriptor) {
    j.mu.Lock()
    defer j.mu.Unlock()

    if _, ok := j.descs[desc.Digest]; ok {
        return
    }
    j.descs[desc.Digest] = desc
    j.resolved = true
}

// Remove removes a descriptor from tracking
func (j *jobs) Remove(desc v1.Descriptor) {
    j.mu.Lock()
    defer j.mu.Unlock()

    delete(j.descs, desc.Digest)
}

// Jobs returns a list of all tracked descriptors
func (j *jobs) Jobs() []v1.Descriptor {
    j.mu.Lock()
    defer j.mu.Unlock()

    descs := make([]v1.Descriptor, 0, len(j.descs))
    for _, d := range j.descs {
        descs = append(descs, d)
    }
    return descs
}