From 4a8c4110e32bd7f317f202cb848376fe032200d3 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Fri, 8 Jul 2022 15:28:14 +0200 Subject: [PATCH 1/4] produce progress events polling ctrd's content.Store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nicolas De Loof Signed-off-by: Sebastiaan van Stijn containerd: Push progress Signed-off-by: Paweł Gronowski --- daemon/containerd/image_pull.go | 33 ++++++- daemon/containerd/progress.go | 150 ++++++++++++++++++++++++++++++++ daemon/containerd/resolver.go | 10 ++- 3 files changed, 188 insertions(+), 5 deletions(-) create mode 100644 daemon/containerd/progress.go diff --git a/daemon/containerd/image_pull.go b/daemon/containerd/image_pull.go index 5a445f96f2a95..0d1ed1cefbf18 100644 --- a/daemon/containerd/image_pull.go +++ b/daemon/containerd/image_pull.go @@ -6,11 +6,13 @@ import ( "io" "github.com/containerd/containerd" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" "github.com/docker/distribution" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types/registry" "github.com/docker/docker/errdefs" + "github.com/docker/docker/pkg/streamformatter" "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -42,10 +44,37 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string, } } - resolver := newResolverFromAuthConfig(authConfig) + resolver, _ := newResolverFromAuthConfig(authConfig) opts = append(opts, containerd.WithResolver(resolver)) - _, err = i.client.Pull(ctx, ref.String(), opts...) + jobs := newJobs() + h := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + if desc.MediaType != images.MediaTypeDockerSchema1Manifest { + jobs.Add(desc) + } + return nil, nil + }) + opts = append(opts, containerd.WithImageHandler(h)) + + out := streamformatter.NewJSONProgressOutput(outStream, false) + finishProgress := jobs.showProgress(ctx, out, pullProgress{Store: i.client.ContentStore(), ShowExists: true}) + defer finishProgress() + + img, err := i.client.Pull(ctx, ref.String(), opts...) + if err != nil { + return err + } + + unpacked, err := img.IsUnpacked(ctx, i.snapshotter) + if err != nil { + return err + } + + if !unpacked { + if err := img.Unpack(ctx, i.snapshotter); err != nil { + return err + } + } return err } diff --git a/daemon/containerd/progress.go b/daemon/containerd/progress.go new file mode 100644 index 0000000000000..0952f683c1d97 --- /dev/null +++ b/daemon/containerd/progress.go @@ -0,0 +1,150 @@ +package containerd + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/containerd/containerd/content" + cerrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/remotes" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/stringid" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/sirupsen/logrus" +) + +type progressUpdater interface { + UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error +} + +type jobs struct { + descs map[digest.Digest]ocispec.Descriptor + mu sync.Mutex +} + +// newJobs creates a new instance of the job status tracker +func newJobs() *jobs { + return &jobs{ + descs: map[digest.Digest]ocispec.Descriptor{}, + } +} + +func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() { + ctx, cancelProgress := context.WithCancel(ctx) + + start := time.Now() + + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := updater.UpdateProgress(ctx, j, out, start); err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logrus.WithError(err).Error("Updating progress failed") + } + return + } + case <-ctx.Done(): + return + } + } + }() + + return cancelProgress +} + +// Add adds a descriptor to be tracked +func (j *jobs) Add(desc ocispec.Descriptor) { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.descs[desc.Digest]; ok { + return + } + j.descs[desc.Digest] = desc +} + +// Remove removes a descriptor +func (j *jobs) Remove(desc ocispec.Descriptor) { + j.mu.Lock() + defer j.mu.Unlock() + + delete(j.descs, desc.Digest) +} + +// Jobs returns a list of all tracked descriptors +func (j *jobs) Jobs() []ocispec.Descriptor { + j.mu.Lock() + defer j.mu.Unlock() + + descs := make([]ocispec.Descriptor, 0, len(j.descs)) + for _, d := range j.descs { + descs = append(descs, d) + } + return descs +} + +type pullProgress struct { + Store content.Store + ShowExists bool +} + +func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error { + actives, err := p.Store.ListStatuses(ctx, "") + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return err + } + logrus.WithError(err).Error("status check failed") + return nil + } + pulling := make(map[string]content.Status, len(actives)) + + // update status of status entries! + for _, status := range actives { + pulling[status.Ref] = status + } + + for _, j := range ongoing.Jobs() { + key := remotes.MakeRefKey(ctx, j) + if info, ok := pulling[key]; ok { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Downloading", + Current: info.Offset, + Total: info.Total, + }) + continue + } + + info, err := p.Store.Info(ctx, j.Digest) + if err != nil { + if !cerrdefs.IsNotFound(err) { + return err + } + } else if info.CreatedAt.After(start) { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Download complete", + HideCounts: true, + LastUpdate: true, + }) + ongoing.Remove(j) + } else if p.ShowExists { + out.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(j.Digest.Encoded()), + Action: "Exists", + HideCounts: true, + LastUpdate: true, + }) + ongoing.Remove(j) + } + } + return nil +} diff --git a/daemon/containerd/resolver.go b/daemon/containerd/resolver.go index 29c8d75c051ed..3a16ff2615868 100644 --- a/daemon/containerd/resolver.go +++ b/daemon/containerd/resolver.go @@ -8,8 +8,9 @@ import ( "github.com/sirupsen/logrus" ) -func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Resolver { +func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) { opts := []docker.RegistryOpt{} + if authConfig != nil { cfgHost := registry.ConvertToHostname(authConfig.ServerAddress) if cfgHost == registry.IndexHostname { @@ -29,7 +30,10 @@ func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Res opts = append(opts, docker.WithAuthorizer(authorizer)) } + tracker := docker.NewInMemoryTracker() + return docker.NewResolver(docker.ResolverOptions{ - Hosts: docker.ConfigureDefaultRegistries(opts...), - }) + Hosts: docker.ConfigureDefaultRegistries(opts...), + Tracker: tracker, + }), tracker } From 3a3f98b32b47a942d4ef5e0627b0a94694e1e411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Gronowski?= Date: Wed, 17 Aug 2022 16:50:13 +0200 Subject: [PATCH 2/4] c8d/pull: Don't unpack manually MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We pass WithPullUnpack anyway Signed-off-by: Paweł Gronowski --- daemon/containerd/image_pull.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/daemon/containerd/image_pull.go b/daemon/containerd/image_pull.go index 0d1ed1cefbf18..d77bc11029f5c 100644 --- a/daemon/containerd/image_pull.go +++ b/daemon/containerd/image_pull.go @@ -60,21 +60,10 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string, finishProgress := jobs.showProgress(ctx, out, pullProgress{Store: i.client.ContentStore(), ShowExists: true}) defer finishProgress() - img, err := i.client.Pull(ctx, ref.String(), opts...) - if err != nil { - return err - } + opts = append(opts, containerd.WithPullUnpack) + opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter)) - unpacked, err := img.IsUnpacked(ctx, i.snapshotter) - if err != nil { - return err - } - - if !unpacked { - if err := img.Unpack(ctx, i.snapshotter); err != nil { - return err - } - } + _, err = i.client.Pull(ctx, ref.String(), opts...) return err } From c83fce86d408439618e188bf9c581e4d127f91a3 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Mon, 12 Sep 2022 10:40:45 +0200 Subject: [PATCH 3/4] c8d/resolver: Use hosts from daemon configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Gronowski Signed-off-by: Nicolas De Loof --- daemon/containerd/image_pull.go | 2 +- daemon/containerd/resolver.go | 64 +++++++++++++++++++++------------ daemon/containerd/service.go | 17 ++++++--- daemon/daemon.go | 2 +- 4 files changed, 56 insertions(+), 29 deletions(-) diff --git a/daemon/containerd/image_pull.go b/daemon/containerd/image_pull.go index d77bc11029f5c..99352457a9b50 100644 --- a/daemon/containerd/image_pull.go +++ b/daemon/containerd/image_pull.go @@ -44,7 +44,7 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string, } } - resolver, _ := newResolverFromAuthConfig(authConfig) + resolver, _ := i.newResolverFromAuthConfig(authConfig) opts = append(opts, containerd.WithResolver(resolver)) jobs := newJobs() diff --git a/daemon/containerd/resolver.go b/daemon/containerd/resolver.go index 3a16ff2615868..e01001437a439 100644 --- a/daemon/containerd/resolver.go +++ b/daemon/containerd/resolver.go @@ -8,32 +8,52 @@ import ( "github.com/sirupsen/logrus" ) -func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) { - opts := []docker.RegistryOpt{} - - if authConfig != nil { - cfgHost := registry.ConvertToHostname(authConfig.ServerAddress) - if cfgHost == registry.IndexHostname { - cfgHost = registry.DefaultRegistryHost - } - authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) { - if cfgHost != host { - logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match") - return "", "", nil - } - if authConfig.IdentityToken != "" { - return "", authConfig.IdentityToken, nil - } - return authConfig.Username, authConfig.Password, nil - })) - - opts = append(opts, docker.WithAuthorizer(authorizer)) - } +func (i *ImageService) newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) { + hostsFn := i.registryHosts.RegistryHosts() + hosts := hostsAuthorizerWrapper(hostsFn, authConfig) tracker := docker.NewInMemoryTracker() return docker.NewResolver(docker.ResolverOptions{ - Hosts: docker.ConfigureDefaultRegistries(opts...), + Hosts: hosts, Tracker: tracker, }), tracker } + +func hostsAuthorizerWrapper(hostsFn docker.RegistryHosts, authConfig *registrytypes.AuthConfig) docker.RegistryHosts { + return docker.RegistryHosts(func(n string) ([]docker.RegistryHost, error) { + hosts, err := hostsFn(n) + if err == nil { + for idx, host := range hosts { + if host.Authorizer == nil { + var opts []docker.AuthorizerOpt + if authConfig != nil { + opts = append(opts, authorizationCredsFromAuthConfig(*authConfig)) + } + host.Authorizer = docker.NewDockerAuthorizer(opts...) + hosts[idx] = host + } + } + } + + return hosts, err + }) +} + +func authorizationCredsFromAuthConfig(authConfig registrytypes.AuthConfig) docker.AuthorizerOpt { + cfgHost := registry.ConvertToHostname(authConfig.ServerAddress) + if cfgHost == registry.IndexHostname { + cfgHost = registry.DefaultRegistryHost + } + + return docker.WithAuthCreds(func(host string) (string, string, error) { + if cfgHost != host { + logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match") + return "", "", nil + } + if authConfig.IdentityToken != "" { + return "", authConfig.IdentityToken, nil + } + return authConfig.Username, authConfig.Password, nil + }) +} diff --git a/daemon/containerd/service.go b/daemon/containerd/service.go index ffd56569a6f8c..163bced2b13c9 100644 --- a/daemon/containerd/service.go +++ b/daemon/containerd/service.go @@ -5,6 +5,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/snapshots" "github.com/docker/docker/container" "github.com/docker/docker/daemon/images" @@ -16,15 +17,21 @@ import ( // ImageService implements daemon.ImageService type ImageService struct { - client *containerd.Client - snapshotter string + client *containerd.Client + snapshotter string + registryHosts RegistryHostsProvider +} + +type RegistryHostsProvider interface { + RegistryHosts() docker.RegistryHosts } // NewService creates a new ImageService. -func NewService(c *containerd.Client, snapshotter string) *ImageService { +func NewService(c *containerd.Client, snapshotter string, hostsProvider RegistryHostsProvider) *ImageService { return &ImageService{ - client: c, - snapshotter: snapshotter, + client: c, + snapshotter: snapshotter, + registryHosts: hostsProvider, } } diff --git a/daemon/daemon.go b/daemon/daemon.go index 729679bfac550..669c3a499502e 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -994,7 +994,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S if err := configureKernelSecuritySupport(config, driverName); err != nil { return nil, err } - d.imageService = ctrd.NewService(d.containerdCli, driverName) + d.imageService = ctrd.NewService(d.containerdCli, driverName, d) } else { layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{ Root: config.Root, From 9032e6779d9009a7d183670600a5f89e2ab6c295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Gronowski?= Date: Thu, 1 Sep 2022 17:03:10 +0200 Subject: [PATCH 4/4] c8d/resolver: Fallback to http for insecure registries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Gronowski --- daemon/containerd/resolver.go | 57 +++++++++++++++++++++++++---------- daemon/containerd/service.go | 17 ++++++----- daemon/daemon.go | 10 +++++- registry/service.go | 7 +++++ 4 files changed, 67 insertions(+), 24 deletions(-) diff --git a/daemon/containerd/resolver.go b/daemon/containerd/resolver.go index e01001437a439..82055f67377fc 100644 --- a/daemon/containerd/resolver.go +++ b/daemon/containerd/resolver.go @@ -1,6 +1,9 @@ package containerd import ( + "net/http" + "strings" + "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" registrytypes "github.com/docker/docker/api/types/registry" @@ -9,10 +12,10 @@ import ( ) func (i *ImageService) newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) { + tracker := docker.NewInMemoryTracker() hostsFn := i.registryHosts.RegistryHosts() - hosts := hostsAuthorizerWrapper(hostsFn, authConfig) - tracker := docker.NewInMemoryTracker() + hosts := hostsWrapper(hostsFn, authConfig, i.registryService) return docker.NewResolver(docker.ResolverOptions{ Hosts: hosts, @@ -20,24 +23,29 @@ func (i *ImageService) newResolverFromAuthConfig(authConfig *registrytypes.AuthC }), tracker } -func hostsAuthorizerWrapper(hostsFn docker.RegistryHosts, authConfig *registrytypes.AuthConfig) docker.RegistryHosts { - return docker.RegistryHosts(func(n string) ([]docker.RegistryHost, error) { +func hostsWrapper(hostsFn docker.RegistryHosts, authConfig *registrytypes.AuthConfig, regService registry.Service) docker.RegistryHosts { + return func(n string) ([]docker.RegistryHost, error) { hosts, err := hostsFn(n) - if err == nil { - for idx, host := range hosts { - if host.Authorizer == nil { - var opts []docker.AuthorizerOpt - if authConfig != nil { - opts = append(opts, authorizationCredsFromAuthConfig(*authConfig)) - } - host.Authorizer = docker.NewDockerAuthorizer(opts...) - hosts[idx] = host + if err != nil { + return nil, err + } + + for i := range hosts { + if hosts[i].Authorizer == nil { + var opts []docker.AuthorizerOpt + if authConfig != nil { + opts = append(opts, authorizationCredsFromAuthConfig(*authConfig)) + } + hosts[i].Authorizer = docker.NewDockerAuthorizer(opts...) + + isInsecure := regService.IsInsecureRegistry(hosts[i].Host) + if hosts[i].Client.Transport != nil && isInsecure { + hosts[i].Client.Transport = httpFallback{super: hosts[i].Client.Transport} } } } - - return hosts, err - }) + return hosts, nil + } } func authorizationCredsFromAuthConfig(authConfig registrytypes.AuthConfig) docker.AuthorizerOpt { @@ -57,3 +65,20 @@ func authorizationCredsFromAuthConfig(authConfig registrytypes.AuthConfig) docke return authConfig.Username, authConfig.Password, nil }) } + +type httpFallback struct { + super http.RoundTripper +} + +func (f httpFallback) RoundTrip(r *http.Request) (*http.Response, error) { + resp, err := f.super.RoundTrip(r) + if err != nil { + if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") { + plain := r.Clone(r.Context()) + plain.URL.Scheme = "http" + return http.DefaultTransport.RoundTrip(plain) + } + } + + return resp, err +} diff --git a/daemon/containerd/service.go b/daemon/containerd/service.go index 163bced2b13c9..8e72d8ce19321 100644 --- a/daemon/containerd/service.go +++ b/daemon/containerd/service.go @@ -12,14 +12,16 @@ import ( "github.com/docker/docker/errdefs" "github.com/docker/docker/image" "github.com/docker/docker/layer" + "github.com/docker/docker/registry" "github.com/pkg/errors" ) // ImageService implements daemon.ImageService type ImageService struct { - client *containerd.Client - snapshotter string - registryHosts RegistryHostsProvider + client *containerd.Client + snapshotter string + registryHosts RegistryHostsProvider + registryService registry.Service } type RegistryHostsProvider interface { @@ -27,11 +29,12 @@ type RegistryHostsProvider interface { } // NewService creates a new ImageService. -func NewService(c *containerd.Client, snapshotter string, hostsProvider RegistryHostsProvider) *ImageService { +func NewService(c *containerd.Client, snapshotter string, hostsProvider RegistryHostsProvider, registry registry.Service) *ImageService { return &ImageService{ - client: c, - snapshotter: snapshotter, - registryHosts: hostsProvider, + client: c, + snapshotter: snapshotter, + registryHosts: hostsProvider, + registryService: registry, } } diff --git a/daemon/daemon.go b/daemon/daemon.go index 669c3a499502e..e96c95316a92a 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -14,6 +14,7 @@ import ( "path" "path/filepath" "runtime" + "strings" "sync" "time" @@ -177,6 +178,13 @@ func (daemon *Daemon) RegistryHosts() docker.RegistryHosts { for _, v := range daemon.configStore.InsecureRegistries { u, err := url.Parse(v) + if err != nil && !strings.HasPrefix(v, "http://") && !strings.HasPrefix(v, "https://") { + originalErr := err + u, err = url.Parse("http://" + v) + if err != nil { + err = originalErr + } + } c := resolverconfig.RegistryConfig{} if err == nil { v = u.Host @@ -994,7 +1002,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S if err := configureKernelSecuritySupport(config, driverName); err != nil { return nil, err } - d.imageService = ctrd.NewService(d.containerdCli, driverName, d) + d.imageService = ctrd.NewService(d.containerdCli, driverName, d, d.registryService) } else { layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{ Root: config.Root, diff --git a/registry/service.go b/registry/service.go index a4453bb17ac7c..a505f00e7e0a0 100644 --- a/registry/service.go +++ b/registry/service.go @@ -26,6 +26,7 @@ type Service interface { LoadAllowNondistributableArtifacts([]string) error LoadMirrors([]string) error LoadInsecureRegistries([]string) error + IsInsecureRegistry(string) bool } // defaultService is a registry service. It tracks configuration data such as a list @@ -232,3 +233,9 @@ func (s *defaultService) LookupPushEndpoints(hostname string) (endpoints []APIEn } return endpoints, err } + +// IsInsecureRegistry returns true if the registry at given host is configured as +// insecure registry. +func (s *defaultService) IsInsecureRegistry(host string) bool { + return !s.config.isSecureIndex(host) +}