Skip to content

Commit

Permalink
produce progress events polling ctrd's content.Store
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
  • Loading branch information
ndeloof committed Jul 8, 2022
1 parent 04baf48 commit 7cfcde1
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 1 deletion.
150 changes: 150 additions & 0 deletions daemon/containerd/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package containerd

import (
"context"
"io"
"sync"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/remotes"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
)

func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, w io.Writer, stop chan struct{}) {
var (
out = streamformatter.NewJSONProgressOutput(w, false)
ticker = time.NewTicker(100 * time.Millisecond)
start = time.Now()
done bool
)
defer ticker.Stop()

outer:
for {
select {
case <-ticker.C:
if !ongoing.IsResolved() {
continue
}

pulling := map[string]content.Status{}
if !done {
actives, err := cs.ListStatuses(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("status check failed")
continue
}
// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
}
}

// update inactive jobs
for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
}

info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Error("failed to get content info")
continue outer
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
} else {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
}
}
if done {
return
}
case <-stop:
done = true // allow ui to update once more
case <-ctx.Done():
return
}
}
}

// jobs holds a list of layers being downloaded to pull reference set by name
type jobs struct {
name string
resolved bool // resolved is set to true once remote image metadata has been downloaded from registry
descs map[digest.Digest]v1.Descriptor
mu sync.Mutex
}

// newJobs creates a new instance of the job status tracker
func newJobs() *jobs {
return &jobs{
descs: map[digest.Digest]v1.Descriptor{},
}
}

// IsResolved checks whether a descriptor has been resolved
func (j *jobs) IsResolved() bool {
j.mu.Lock()
defer j.mu.Unlock()
return j.resolved
}

// Add adds a descriptor to be tracked
func (j *jobs) Add(desc v1.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

if _, ok := j.descs[desc.Digest]; ok {
return
}
j.descs[desc.Digest] = desc
j.resolved = true
}

// Remove removes a descriptor
func (j *jobs) Remove(desc v1.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

delete(j.descs, desc.Digest)
}

// Jobs returns a list of all tracked descriptors
func (j *jobs) Jobs() []v1.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()

descs := make([]v1.Descriptor, 0, len(j.descs))
for _, d := range j.descs {
descs = append(descs, d)
}
return descs
}
33 changes: 32 additions & 1 deletion daemon/containerd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,38 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
resolver := newResolverFromAuthConfig(authConfig)
opts = append(opts, containerd.WithResolver(resolver))

_, err = cs.client.Pull(ctx, ref.String(), opts...)
jobs := newJobs()
h := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest {
jobs.Add(desc)
}
return nil, nil
})
opts = append(opts, containerd.WithImageHandler(h))

stop := make(chan struct{})
go func() {
showProgress(ctx, jobs, cs.client.ContentStore(), outStream, stop)
stop <- struct{}{}
}()

img, err := cs.client.Pull(ctx, ref.String(), opts...)
if err != nil {
return err
}

unpacked, err := img.IsUnpacked(ctx, "overlayfs")
if err != nil {
return err
}

if !unpacked {
if err := img.Unpack(ctx, "overlayfs"); err != nil {
return err
}
}
stop <- struct{}{}
<-stop
return err
}

Expand Down

0 comments on commit 7cfcde1

Please sign in to comment.