Skip to content

Commit

Permalink
Merge pull request #44756 from rumpl/containerd-image-pull
Browse files Browse the repository at this point in the history
containerd integration: image pull
  • Loading branch information
neersighted committed Jan 11, 2023
2 parents ffb2c1f + 9032e67 commit 6d212fa
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 25 deletions.
20 changes: 19 additions & 1 deletion daemon/containerd/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"io"

"github.com/containerd/containerd"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/streamformatter"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand Down Expand Up @@ -42,9 +44,25 @@ func (i *ImageService) PullImage(ctx context.Context, image, tagOrDigest string,
}
}

resolver := newResolverFromAuthConfig(authConfig)
resolver, _ := i.newResolverFromAuthConfig(authConfig)
opts = append(opts, containerd.WithResolver(resolver))

jobs := newJobs()
h := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
jobs.Add(desc)
}
return nil, nil
})
opts = append(opts, containerd.WithImageHandler(h))

out := streamformatter.NewJSONProgressOutput(outStream, false)
finishProgress := jobs.showProgress(ctx, out, pullProgress{Store: i.client.ContentStore(), ShowExists: true})
defer finishProgress()

opts = append(opts, containerd.WithPullUnpack)
opts = append(opts, containerd.WithPullSnapshotter(i.snapshotter))

_, err = i.client.Pull(ctx, ref.String(), opts...)
return err
}
Expand Down
150 changes: 150 additions & 0 deletions daemon/containerd/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package containerd

import (
"context"
"errors"
"sync"
"time"

"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/stringid"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)

type progressUpdater interface {
UpdateProgress(context.Context, *jobs, progress.Output, time.Time) error
}

type jobs struct {
descs map[digest.Digest]ocispec.Descriptor
mu sync.Mutex
}

// newJobs creates a new instance of the job status tracker
func newJobs() *jobs {
return &jobs{
descs: map[digest.Digest]ocispec.Descriptor{},
}
}

func (j *jobs) showProgress(ctx context.Context, out progress.Output, updater progressUpdater) func() {
ctx, cancelProgress := context.WithCancel(ctx)

start := time.Now()

go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := updater.UpdateProgress(ctx, j, out, start); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
logrus.WithError(err).Error("Updating progress failed")
}
return
}
case <-ctx.Done():
return
}
}
}()

return cancelProgress
}

// Add adds a descriptor to be tracked
func (j *jobs) Add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

if _, ok := j.descs[desc.Digest]; ok {
return
}
j.descs[desc.Digest] = desc
}

// Remove removes a descriptor
func (j *jobs) Remove(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()

delete(j.descs, desc.Digest)
}

// Jobs returns a list of all tracked descriptors
func (j *jobs) Jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()

descs := make([]ocispec.Descriptor, 0, len(j.descs))
for _, d := range j.descs {
descs = append(descs, d)
}
return descs
}

type pullProgress struct {
Store content.Store
ShowExists bool
}

func (p pullProgress) UpdateProgress(ctx context.Context, ongoing *jobs, out progress.Output, start time.Time) error {
actives, err := p.Store.ListStatuses(ctx, "")
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}
logrus.WithError(err).Error("status check failed")
return nil
}
pulling := make(map[string]content.Status, len(actives))

// update status of status entries!
for _, status := range actives {
pulling[status.Ref] = status
}

for _, j := range ongoing.Jobs() {
key := remotes.MakeRefKey(ctx, j)
if info, ok := pulling[key]; ok {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Downloading",
Current: info.Offset,
Total: info.Total,
})
continue
}

info, err := p.Store.Info(ctx, j.Digest)
if err != nil {
if !cerrdefs.IsNotFound(err) {
return err
}
} else if info.CreatedAt.After(start) {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Download complete",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
} else if p.ShowExists {
out.WriteProgress(progress.Progress{
ID: stringid.TruncateID(j.Digest.Encoded()),
Action: "Exists",
HideCounts: true,
LastUpdate: true,
})
ongoing.Remove(j)
}
}
return nil
}
85 changes: 67 additions & 18 deletions daemon/containerd/resolver.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,84 @@
package containerd

import (
"net/http"
"strings"

"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/registry"
"github.com/sirupsen/logrus"
)

func newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) remotes.Resolver {
opts := []docker.RegistryOpt{}
if authConfig != nil {
cfgHost := registry.ConvertToHostname(authConfig.ServerAddress)
if cfgHost == registry.IndexHostname {
cfgHost = registry.DefaultRegistryHost
func (i *ImageService) newResolverFromAuthConfig(authConfig *registrytypes.AuthConfig) (remotes.Resolver, docker.StatusTracker) {
tracker := docker.NewInMemoryTracker()
hostsFn := i.registryHosts.RegistryHosts()

hosts := hostsWrapper(hostsFn, authConfig, i.registryService)

return docker.NewResolver(docker.ResolverOptions{
Hosts: hosts,
Tracker: tracker,
}), tracker
}

func hostsWrapper(hostsFn docker.RegistryHosts, authConfig *registrytypes.AuthConfig, regService registry.Service) docker.RegistryHosts {
return func(n string) ([]docker.RegistryHost, error) {
hosts, err := hostsFn(n)
if err != nil {
return nil, err
}
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) {
if cfgHost != host {
logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match")
return "", "", nil
}
if authConfig.IdentityToken != "" {
return "", authConfig.IdentityToken, nil

for i := range hosts {
if hosts[i].Authorizer == nil {
var opts []docker.AuthorizerOpt
if authConfig != nil {
opts = append(opts, authorizationCredsFromAuthConfig(*authConfig))
}
hosts[i].Authorizer = docker.NewDockerAuthorizer(opts...)

isInsecure := regService.IsInsecureRegistry(hosts[i].Host)
if hosts[i].Client.Transport != nil && isInsecure {
hosts[i].Client.Transport = httpFallback{super: hosts[i].Client.Transport}
}
}
return authConfig.Username, authConfig.Password, nil
}))
}
return hosts, nil
}
}

opts = append(opts, docker.WithAuthorizer(authorizer))
func authorizationCredsFromAuthConfig(authConfig registrytypes.AuthConfig) docker.AuthorizerOpt {
cfgHost := registry.ConvertToHostname(authConfig.ServerAddress)
if cfgHost == registry.IndexHostname {
cfgHost = registry.DefaultRegistryHost
}

return docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(opts...),
return docker.WithAuthCreds(func(host string) (string, string, error) {
if cfgHost != host {
logrus.WithField("host", host).WithField("cfgHost", cfgHost).Warn("Host doesn't match")
return "", "", nil
}
if authConfig.IdentityToken != "" {
return "", authConfig.IdentityToken, nil
}
return authConfig.Username, authConfig.Password, nil
})
}

type httpFallback struct {
super http.RoundTripper
}

func (f httpFallback) RoundTrip(r *http.Request) (*http.Response, error) {
resp, err := f.super.RoundTrip(r)
if err != nil {
if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") {
plain := r.Clone(r.Context())
plain.URL.Scheme = "http"
return http.DefaultTransport.RoundTrip(plain)
}
}

return resp, err
}
20 changes: 15 additions & 5 deletions daemon/containerd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,36 @@ import (

"github.com/containerd/containerd"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/images"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
)

// ImageService implements daemon.ImageService
type ImageService struct {
client *containerd.Client
snapshotter string
client *containerd.Client
snapshotter string
registryHosts RegistryHostsProvider
registryService registry.Service
}

type RegistryHostsProvider interface {
RegistryHosts() docker.RegistryHosts
}

// NewService creates a new ImageService.
func NewService(c *containerd.Client, snapshotter string) *ImageService {
func NewService(c *containerd.Client, snapshotter string, hostsProvider RegistryHostsProvider, registry registry.Service) *ImageService {
return &ImageService{
client: c,
snapshotter: snapshotter,
client: c,
snapshotter: snapshotter,
registryHosts: hostsProvider,
registryService: registry,
}
}

Expand Down
10 changes: 9 additions & 1 deletion daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -177,6 +178,13 @@ func (daemon *Daemon) RegistryHosts() docker.RegistryHosts {

for _, v := range daemon.configStore.InsecureRegistries {
u, err := url.Parse(v)
if err != nil && !strings.HasPrefix(v, "http://") && !strings.HasPrefix(v, "https://") {
originalErr := err
u, err = url.Parse("http://" + v)
if err != nil {
err = originalErr
}
}
c := resolverconfig.RegistryConfig{}
if err == nil {
v = u.Host
Expand Down Expand Up @@ -994,7 +1002,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S
if err := configureKernelSecuritySupport(config, driverName); err != nil {
return nil, err
}
d.imageService = ctrd.NewService(d.containerdCli, driverName)
d.imageService = ctrd.NewService(d.containerdCli, driverName, d, d.registryService)
} else {
layerStore, err := layer.NewStoreFromOptions(layer.StoreOptions{
Root: config.Root,
Expand Down
7 changes: 7 additions & 0 deletions registry/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Service interface {
LoadAllowNondistributableArtifacts([]string) error
LoadMirrors([]string) error
LoadInsecureRegistries([]string) error
IsInsecureRegistry(string) bool
}

// defaultService is a registry service. It tracks configuration data such as a list
Expand Down Expand Up @@ -232,3 +233,9 @@ func (s *defaultService) LookupPushEndpoints(hostname string) (endpoints []APIEn
}
return endpoints, err
}

// IsInsecureRegistry returns true if the registry at given host is configured as
// insecure registry.
func (s *defaultService) IsInsecureRegistry(host string) bool {
return !s.config.isSecureIndex(host)
}

0 comments on commit 6d212fa

Please sign in to comment.