containerd: Push progress
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
vvoland committed Jul 18, 2022
1 parent 17794da commit bae9267
Showing 2 changed files with 157 additions and 78 deletions.
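
A minimal sketch of how the refactored progress API is wired up, assuming the signatures introduced in the diffs below (showProgress, pullProgress, pushProgress, newResolverFromAuthConfig, newJobs). The helper name, its parameters, and the import paths are illustrative only, not part of the commit:

package containerd // hypothetical sketch inside daemon/containerd, not part of the commit

import (
	"context"
	"io"

	"github.com/containerd/containerd"
	"github.com/docker/docker/api/types"
)

// progressWiringSketch shows the new call pattern: showProgress starts its own
// goroutine and returns a cleanup function instead of taking a stop channel,
// and the per-tick update logic is supplied as an updateProgressFunc.
func progressWiringSketch(ctx context.Context, client *containerd.Client, authConfig *types.AuthConfig, outStream io.Writer) {
	jobs := newJobs()

	// Pull side: per-tick updates come from polling the containerd content store.
	finishPull := showProgress(ctx, jobs, outStream, pullProgress(client.ContentStore()))
	defer finishPull()

	// Push side: the resolver now also returns the docker.StatusTracker it was
	// created with, so pushProgress can read upload status back from it.
	_, tracker := newResolverFromAuthConfig(authConfig)
	finishPush := showProgress(ctx, jobs, outStream, pushProgress(tracker))
	defer finishPush()
}

Per the diff, the returned cleanup function cancels the progress context and waits for the goroutine to flush its final update before returning.
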
195 changes: 129 additions & 66 deletions daemon/containerd/progress.go
@@ -7,106 +7,169 @@ import (
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)

func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, w io.Writer, stop chan struct{}) {
type updateProgressFunc func(ctx context.Context, ongoing *jobs, output progress.Output, start time.Time) error

func showProgress(ctx context.Context, ongoing *jobs, w io.Writer, updateFunc updateProgressFunc) func() {
stop := make(chan struct{})
ctx, cancelProgress := context.WithCancel(ctx)

var (
out = streamformatter.NewJSONProgressOutput(w, false)
ticker = time.NewTicker(100 * time.Millisecond)
start = time.Now()
done bool
)
defer ticker.Stop()

outer:
for {
select {
case <-ticker.C:
if !ongoing.IsResolved() {
continue
}
for _, j := range ongoing.Jobs() {
id := stringid.TruncateID(j.Digest.Encoded())
progress.Update(out, id, "Preparing")
}

pulling := map[string]content.Status{}
if !done {
actives, err := cs.ListStatuses(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("status check failed")
go func() {
defer func() {
ticker.Stop()
stop <- struct{}{}
}()

for {
select {
case <-ticker.C:
if !ongoing.IsResolved() {
continue
}
// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
err := updateFunc(ctx, ongoing, out, start)
if err != nil {
logrus.WithError(err).Error("Updating progress failed")
return
}
}

// update inactive jobs
for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
if done {
return
}
case <-ctx.Done():
done = true
}
}
}()

info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Error("failed to get content info")
continue outer
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
return func() {
cancelProgress()
<-stop
}
}

func pushProgress(tracker docker.StatusTracker) updateProgressFunc {
return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
id := stringid.TruncateID(j.Digest.Encoded())

status, err := tracker.GetStatus(key)
if err != nil {
if cerrdefs.IsNotFound(err) {
progress.Update(out, id, "Waiting")
continue
} else {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
return err
}

}

logrus.WithField("status", status).WithField("id", id).Debug("Status update")

if status.Committed && status.Offset >= status.Total {
progress.Update(out, id, "Pushed")
ongoing.Remove(j)
continue
}

out.WriteProgress(progress.Progress{
ID: id,
Action: "Pushing",
Current: status.Offset,
Total: status.Total,
})
}

return nil
}
}

func pullProgress(cs content.Store) updateProgressFunc {
return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
pulling := map[string]content.Status{}
actives, err := cs.ListStatuses(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("status check failed")
return nil
}
// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
}

for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
}
if done {
return

info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !cerrdefs.IsNotFound(err) {
return err
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
} else {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
}
case <-stop:
done = true // allow ui to update once more
case <-ctx.Done():
return
}
return nil
}
}

// jobs holds a list of layers being downloaded to pull reference set by name
type jobs struct {
name string
resolved bool // resolved is set to true once remote image metadata has been downloaded from registry
descs map[digest.Digest]v1.Descriptor
resolved bool // resolved is set to true once all jobs are added
descs map[digest.Digest]ocispec.Descriptor
mu sync.Mutex
}

// newJobs creates a new instance of the job status tracker
func newJobs() *jobs {
return &jobs{
descs: map[digest.Digest]v1.Descriptor{},
descs: map[digest.Digest]ocispec.Descriptor{},
}
}

@@ -118,7 +181,7 @@ func (j *jobs) IsResolved() bool {
}

// Add adds a descriptor to be tracked
func (j *jobs) Add(desc v1.Descriptor) {
func (j *jobs) Add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

@@ -130,19 +193,19 @@ func (j *jobs) Add(desc v1.Descriptor) {
}

// Remove removes a descriptor
func (j *jobs) Remove(desc v1.Descriptor) {
func (j *jobs) Remove(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

delete(j.descs, desc.Digest)
}

// Jobs returns a list of all tracked descriptors
func (j *jobs) Jobs() []v1.Descriptor {
func (j *jobs) Jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()

descs := make([]v1.Descriptor, 0, len(j.descs))
descs := make([]ocispec.Descriptor, 0, len(j.descs))
for _, d := range j.descs {
descs = append(descs, d)
}
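
The progress.go changes above factor the per-tick rendering into the updateProgressFunc type, so pull and push share a single display loop. For illustration only, a hypothetical third implementation could plug in the same way; it reuses the imports already present in progress.go, and its name and behaviour are assumptions, not part of the commit:

// waitingProgress is a hypothetical updateProgressFunc that simply marks every
// tracked descriptor as "Waiting" on each tick, using the same jobs and
// progress.Output plumbing as pullProgress and pushProgress above.
func waitingProgress() updateProgressFunc {
	return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
		for _, j := range ongoing.Jobs() {
			id := stringid.TruncateID(j.Digest.Encoded())
			progress.Update(out, id, "Waiting")
		}
		return nil
	}
}
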
40 changes: 28 additions & 12 deletions daemon/containerd/service.go
@@ -74,7 +74,7 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
}
}

resolver := newResolverFromAuthConfig(authConfig)
resolver, _ := newResolverFromAuthConfig(authConfig)
opts = append(opts, containerd.WithResolver(resolver))

jobs := newJobs()
@@ -86,11 +86,8 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
})
opts = append(opts, containerd.WithImageHandler(h))

stop := make(chan struct{})
go func() {
showProgress(ctx, jobs, cs.client.ContentStore(), outStream, stop)
stop <- struct{}{}
}()
finishProgress := showProgress(ctx, jobs, outStream, pullProgress(cs.client.ContentStore()))
defer finishProgress()

img, err := cs.client.Pull(ctx, ref.String(), opts...)
if err != nil {
@@ -107,8 +104,6 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
return err
}
}
stop <- struct{}{}
<-stop
return err
}

@@ -279,8 +274,9 @@ func (cs *containerdStore) setupFilters(ctx context.Context, opts types.ImageLis
return filters, nil
}

func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver {
func newResolverFromAuthConfig(authConfig *types.AuthConfig) (remotes.Resolver, docker.StatusTracker) {
opts := []docker.RegistryOpt{}

if authConfig != nil {
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(_ string) (string, string, error) {
if authConfig.IdentityToken != "" {
@@ -292,9 +288,12 @@ func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver {
opts = append(opts, docker.WithAuthorizer(authorizer))
}

tracker := docker.NewInMemoryTracker()

return docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(opts...),
})
Hosts: docker.ConfigureDefaultRegistries(opts...),
Tracker: tracker,
}), tracker
}

func (cs *containerdStore) LogImageEvent(imageID, refName, action string) {
@@ -465,6 +464,7 @@ func (cs *containerdStore) PushImage(ctx context.Context, image, tag string, met
}

is := cs.client.ImageService()
store := cs.client.ContentStore()

img, err := is.Get(ctx, ref.String())
if err != nil {
@@ -486,13 +486,29 @@
defer cs.client.ImageService().Delete(ctx, platformImg.Name, containerdimages.SynchronousDelete())
}

jobs := newJobs()

imageHandler := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
logrus.WithField("desc", desc).Debug("Pushing")
if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest {
children, err := containerdimages.Children(ctx, store, desc)

if err == nil {
for _, c := range children {
jobs.Add(c)
}
}

jobs.Add(desc)
}
return nil, nil
})
imageHandler = remotes.SkipNonDistributableBlobs(imageHandler)

resolver := newResolverFromAuthConfig(authConfig)
resolver, tracker := newResolverFromAuthConfig(authConfig)

finishProgress := showProgress(ctx, jobs, outStream, pushProgress(tracker))
defer finishProgress()

logrus.WithField("desc", target).WithField("ref", ref.String()).Info("Pushing desc to remote ref")
err = cs.client.Push(ctx, ref.String(), target,
