Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push progress #16

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
195 changes: 129 additions & 66 deletions daemon/containerd/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,106 +7,169 @@ import (
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)

func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, w io.Writer, stop chan struct{}) {
type updateProgressFunc func(ctx context.Context, ongoing *jobs, output progress.Output, start time.Time) error

func showProgress(ctx context.Context, ongoing *jobs, w io.Writer, updateFunc updateProgressFunc) func() {
stop := make(chan struct{})
ctx, cancelProgress := context.WithCancel(ctx)

var (
out = streamformatter.NewJSONProgressOutput(w, false)
ticker = time.NewTicker(100 * time.Millisecond)
start = time.Now()
done bool
)
defer ticker.Stop()

outer:
for {
select {
case <-ticker.C:
if !ongoing.IsResolved() {
continue
}
for _, j := range ongoing.Jobs() {
id := stringid.TruncateID(j.Digest.Encoded())
progress.Update(out, id, "Preparing")
}

pulling := map[string]content.Status{}
if !done {
actives, err := cs.ListStatuses(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("status check failed")
go func() {
defer func() {
ticker.Stop()
stop <- struct{}{}
}()

for {
select {
case <-ticker.C:
if !ongoing.IsResolved() {
continue
}
// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
err := updateFunc(ctx, ongoing, out, start)
if err != nil {
logrus.WithError(err).Error("Updating progress failed")
return
}
}

// update inactive jobs
for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
if done {
return
}
case <-ctx.Done():
done = true
}
}
}()

info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
log.G(ctx).WithError(err).Error("failed to get content info")
continue outer
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
return func() {
cancelProgress()
<-stop
}
}

func pushProgress(tracker docker.StatusTracker) updateProgressFunc {
return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
id := stringid.TruncateID(j.Digest.Encoded())

status, err := tracker.GetStatus(key)
if err != nil {
if cerrdefs.IsNotFound(err) {
progress.Update(out, id, "Waiting")
continue
} else {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
return err
}

}

logrus.WithField("status", status).WithField("id", id).Debug("Status update")

if status.Committed && status.Offset >= status.Total {
progress.Update(out, id, "Pushed")
ongoing.Remove(j)
continue
}

out.WriteProgress(progress.Progress{
ID: id,
Action: "Pushing",
Current: status.Offset,
Total: status.Total,
})
}

return nil
}
}

func pullProgress(cs content.Store) updateProgressFunc {
return func(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
pulling := map[string]content.Status{}
actives, err := cs.ListStatuses(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("status check failed")
return nil
}
// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
}

for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
}
if done {
return

info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !cerrdefs.IsNotFound(err) {
return err
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
} else {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
}
case <-stop:
done = true // allow ui to update once more
case <-ctx.Done():
return
}
return nil
}
}

// jobs holds a list of layers being downloaded to pull reference set by name
type jobs struct {
name string
resolved bool // resolved is set to true once remote image metadata has been downloaded from registry
descs map[digest.Digest]v1.Descriptor
resolved bool // resolved is set to true once all jobs are added
descs map[digest.Digest]ocispec.Descriptor
mu sync.Mutex
}

// newJobs creates a new instance of the job status tracker
func newJobs() *jobs {
return &jobs{
descs: map[digest.Digest]v1.Descriptor{},
descs: map[digest.Digest]ocispec.Descriptor{},
}
}

Expand All @@ -118,7 +181,7 @@ func (j *jobs) IsResolved() bool {
}

// Add adds a descriptor to be tracked
func (j *jobs) Add(desc v1.Descriptor) {
func (j *jobs) Add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

Expand All @@ -130,19 +193,19 @@ func (j *jobs) Add(desc v1.Descriptor) {
}

// Remove removes a descriptor
func (j *jobs) Remove(desc v1.Descriptor) {
func (j *jobs) Remove(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

delete(j.descs, desc.Digest)
}

// Jobs returns a list of all tracked descriptors
func (j *jobs) Jobs() []v1.Descriptor {
func (j *jobs) Jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()

descs := make([]v1.Descriptor, 0, len(j.descs))
descs := make([]ocispec.Descriptor, 0, len(j.descs))
for _, d := range j.descs {
descs = append(descs, d)
}
Expand Down
40 changes: 28 additions & 12 deletions daemon/containerd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
}
}

resolver := newResolverFromAuthConfig(authConfig)
resolver, _ := newResolverFromAuthConfig(authConfig)
opts = append(opts, containerd.WithResolver(resolver))

jobs := newJobs()
Expand All @@ -86,11 +86,8 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
})
opts = append(opts, containerd.WithImageHandler(h))

stop := make(chan struct{})
go func() {
showProgress(ctx, jobs, cs.client.ContentStore(), outStream, stop)
stop <- struct{}{}
}()
finishProgress := showProgress(ctx, jobs, outStream, pullProgress(cs.client.ContentStore()))
defer finishProgress()

img, err := cs.client.Pull(ctx, ref.String(), opts...)
if err != nil {
Expand All @@ -107,8 +104,6 @@ func (cs *containerdStore) PullImage(ctx context.Context, image, tag string, pla
return err
}
}
stop <- struct{}{}
<-stop
return err
}

Expand Down Expand Up @@ -279,8 +274,9 @@ func (cs *containerdStore) setupFilters(ctx context.Context, opts types.ImageLis
return filters, nil
}

func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver {
func newResolverFromAuthConfig(authConfig *types.AuthConfig) (remotes.Resolver, docker.StatusTracker) {
opts := []docker.RegistryOpt{}

if authConfig != nil {
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(_ string) (string, string, error) {
if authConfig.IdentityToken != "" {
Expand All @@ -292,9 +288,12 @@ func newResolverFromAuthConfig(authConfig *types.AuthConfig) remotes.Resolver {
opts = append(opts, docker.WithAuthorizer(authorizer))
}

tracker := docker.NewInMemoryTracker()

return docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(opts...),
})
Hosts: docker.ConfigureDefaultRegistries(opts...),
Tracker: tracker,
}), tracker
}

func (cs *containerdStore) LogImageEvent(imageID, refName, action string) {
Expand Down Expand Up @@ -465,6 +464,7 @@ func (cs *containerdStore) PushImage(ctx context.Context, image, tag string, met
}

is := cs.client.ImageService()
store := cs.client.ContentStore()

img, err := is.Get(ctx, ref.String())
if err != nil {
Expand All @@ -486,13 +486,29 @@ func (cs *containerdStore) PushImage(ctx context.Context, image, tag string, met
defer cs.client.ImageService().Delete(ctx, platformImg.Name, containerdimages.SynchronousDelete())
}

jobs := newJobs()

imageHandler := containerdimages.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
logrus.WithField("desc", desc).Debug("Pushing")
if desc.MediaType != containerdimages.MediaTypeDockerSchema1Manifest {
children, err := containerdimages.Children(ctx, store, desc)

if err == nil {
for _, c := range children {
jobs.Add(c)
}
}

jobs.Add(desc)
}
return nil, nil
})
imageHandler = remotes.SkipNonDistributableBlobs(imageHandler)

resolver := newResolverFromAuthConfig(authConfig)
resolver, tracker := newResolverFromAuthConfig(authConfig)

finishProgress := showProgress(ctx, jobs, outStream, pushProgress(tracker))
defer finishProgress()

logrus.WithField("desc", target).WithField("ref", ref.String()).Info("Pushing desc to remote ref")
err = cs.client.Push(ctx, ref.String(), target,
Expand Down