decouple cache/remotecache from registry
Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
AkihiroSuda committed Jul 4, 2018
1 parent bbcd8ad commit 80d2f82
Showing 11 changed files with 275 additions and 188 deletions.
97 changes: 41 additions & 56 deletions cache/remotecache/export.go
@@ -10,35 +10,54 @@ import (
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/images"
 	v1 "github.com/moby/buildkit/cache/remotecache/v1"
-	"github.com/moby/buildkit/session"
 	"github.com/moby/buildkit/solver"
 	"github.com/moby/buildkit/util/contentutil"
 	"github.com/moby/buildkit/util/progress"
-	"github.com/moby/buildkit/util/push"
 	digest "github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/pkg/errors"
 )
 
-type ExporterOpt struct {
-	SessionManager *session.Manager
+type ResolveCacheExporterFunc func(ctx context.Context, typ, target string) (Exporter, error)
+
+func oneOffProgress(ctx context.Context, id string) func(err error) error {
+	pw, _, _ := progress.FromContext(ctx)
+	now := time.Now()
+	st := progress.Status{
+		Started: &now,
+	}
+	pw.Write(id, st)
+	return func(err error) error {
+		now := time.Now()
+		st.Completed = &now
+		pw.Write(id, st)
+		pw.Close()
+		return err
+	}
 }
 
-func NewCacheExporter(opt ExporterOpt) *CacheExporter {
-	return &CacheExporter{opt: opt}
+type Exporter interface {
+	solver.CacheExporterTarget
+	Finalize(ctx context.Context) error
 }
 
-type CacheExporter struct {
-	opt ExporterOpt
+type contentCacheExporter struct {
+	solver.CacheExporterTarget
+	chains   *v1.CacheChains
+	ingester content.Ingester
 }
 
-func (ce *CacheExporter) ExporterForTarget(target string) *RegistryCacheExporter {
+func NewExporter(ingester content.Ingester) Exporter {
 	cc := v1.NewCacheChains()
-	return &RegistryCacheExporter{target: target, CacheExporterTarget: cc, chains: cc, exporter: ce}
+	return &contentCacheExporter{CacheExporterTarget: cc, chains: cc, ingester: ingester}
 }
 
-func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, target string) error {
+func (ce *contentCacheExporter) Finalize(ctx context.Context) error {
+	return export(ctx, ce.ingester, ce.chains)
+}
+
+func export(ctx context.Context, ingester content.Ingester, cc *v1.CacheChains) error {
 	config, descs, err := cc.Marshal()
 	if err != nil {
 		return err
@@ -58,19 +77,16 @@ func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, targe
 	mfst.SchemaVersion = 2
 	mfst.MediaType = images.MediaTypeDockerSchema2ManifestList
 
-	allBlobs := map[digest.Digest]struct{}{}
-	mp := contentutil.NewMultiProvider(nil)
 	for _, l := range config.Layers {
-		if _, ok := allBlobs[l.Blob]; ok {
-			continue
-		}
 		dgstPair, ok := descs[l.Blob]
 		if !ok {
 			return errors.Errorf("missing blob %s", l.Blob)
 		}
-		allBlobs[l.Blob] = struct{}{}
-		mp.Add(l.Blob, dgstPair.Provider)
-
+		layerDone := oneOffProgress(ctx, fmt.Sprintf("writing layer %s", l.Blob))
+		if err := contentutil.Copy(ctx, ingester, dgstPair.Provider, dgstPair.Descriptor); err != nil {
+			return layerDone(errors.Wrap(err, "error writing layer blob"))
+		}
+		layerDone(nil)
 		mfst.Manifests = append(mfst.Manifests, dgstPair.Descriptor)
 	}
 
@@ -85,13 +101,11 @@ func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, targe
 		MediaType: v1.CacheConfigMediaTypeV0,
 	}
 	configDone := oneOffProgress(ctx, fmt.Sprintf("writing config %s", dgst))
-	buf := contentutil.NewBuffer()
-	if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), desc); err != nil {
+	if err := content.WriteBlob(ctx, ingester, dgst.String(), bytes.NewReader(dt), desc); err != nil {
 		return configDone(errors.Wrap(err, "error writing config blob"))
 	}
 	configDone(nil)
 
-	mp.Add(dgst, buf)
 	mfst.Manifests = append(mfst.Manifests, desc)
 
 	dt, err = json.Marshal(mfst)
@@ -100,44 +114,15 @@ func (ce *CacheExporter) Finalize(ctx context.Context, cc *v1.CacheChains, targe
 	}
 	dgst = digest.FromBytes(dt)
 
-	buf = contentutil.NewBuffer()
 	desc = ocispec.Descriptor{
-		Digest: dgst,
-		Size:   int64(len(dt)),
+		Digest:    dgst,
+		Size:      int64(len(dt)),
+		MediaType: mfst.MediaType,
 	}
 	mfstDone := oneOffProgress(ctx, fmt.Sprintf("writing manifest %s", dgst))
-	if err := content.WriteBlob(ctx, buf, dgst.String(), bytes.NewReader(dt), desc); err != nil {
+	if err := content.WriteBlob(ctx, ingester, dgst.String(), bytes.NewReader(dt), desc); err != nil {
 		return mfstDone(errors.Wrap(err, "error writing manifest blob"))
 	}
 	mfstDone(nil)
-	mp.Add(dgst, buf)
-
-	return push.Push(ctx, ce.opt.SessionManager, mp, dgst, target, false)
-}
-
-type RegistryCacheExporter struct {
-	solver.CacheExporterTarget
-	chains   *v1.CacheChains
-	target   string
-	exporter *CacheExporter
-}
-
-func (ce *RegistryCacheExporter) Finalize(ctx context.Context) error {
-	return ce.exporter.Finalize(ctx, ce.chains, ce.target)
-}
-
-func oneOffProgress(ctx context.Context, id string) func(err error) error {
-	pw, _, _ := progress.FromContext(ctx)
-	now := time.Now()
-	st := progress.Status{
-		Started: &now,
-	}
-	pw.Write(id, st)
-	return func(err error) error {
-		now := time.Now()
-		st.Completed = &now
-		pw.Write(id, st)
-		pw.Close()
-		return err
-	}
+	return nil
 }
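
With this change the exporter only needs a content.Ingester, so it can write the cache config, layer blobs and manifest list into any content store instead of pushing straight to a registry. The following is a minimal sketch, not part of this commit, of driving the new Exporter against a local containerd content store; the store path and the elided solver wiring are illustrative assumptions.

package main

import (
	"context"
	"log"

	"github.com/containerd/containerd/content/local"
	"github.com/moby/buildkit/cache/remotecache"
)

func main() {
	ctx := context.Background()

	// local.NewStore returns a content.Store, which satisfies content.Ingester.
	store, err := local.NewStore("/tmp/buildkit-cache-content") // illustrative path
	if err != nil {
		log.Fatal(err)
	}

	// NewExporter needs only an Ingester; no registry, resolver or session wiring.
	exp := remotecache.NewExporter(store)

	// ... the solver would record cache records through the
	// solver.CacheExporterTarget side of exp here (elided) ...

	// Finalize marshals the cache chains and writes the config blob, the layer
	// blobs and the manifest list into the ingester.
	if err := exp.Finalize(ctx); err != nil {
		log.Fatal(err)
	}
}

In this commit the registry backend supplies the ingester via contentutil.FromPusher, but any other content store can back the same export path.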
102 changes: 38 additions & 64 deletions cache/remotecache/import.go
@@ -3,77 +3,34 @@ package remotecache
 import (
 	"context"
 	"encoding/json"
-	"net/http"
-	"time"
+	"io"
 
 	"github.com/containerd/containerd/content"
-	"github.com/containerd/containerd/remotes"
-	"github.com/containerd/containerd/remotes/docker"
 	v1 "github.com/moby/buildkit/cache/remotecache/v1"
-	"github.com/moby/buildkit/session"
-	"github.com/moby/buildkit/session/auth"
 	"github.com/moby/buildkit/solver"
-	"github.com/moby/buildkit/util/contentutil"
 	"github.com/moby/buildkit/worker"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/pkg/errors"
 )
 
-type ImportOpt struct {
-	SessionManager *session.Manager
-	Worker         worker.Worker // TODO: remove. This sets the worker where the cache is imported to. Should be passed on load instead.
-}
+// ResolveCacheImporterFunc returns importer and descriptor.
+// Currently typ needs to be an empty string.
+type ResolveCacheImporterFunc func(ctx context.Context, typ, ref string) (Importer, ocispec.Descriptor, error)
 
-func NewCacheImporter(opt ImportOpt) *CacheImporter {
-	return &CacheImporter{opt: opt}
+type Importer interface {
+	Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error)
 }
 
-type CacheImporter struct {
-	opt ImportOpt
+func NewImporter(provider content.Provider) Importer {
+	return &contentCacheImporter{provider: provider}
 }
 
-func (ci *CacheImporter) getCredentialsFromSession(ctx context.Context) func(string) (string, string, error) {
-	id := session.FromContext(ctx)
-	if id == "" {
-		return nil
-	}
-
-	return func(host string) (string, string, error) {
-		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		defer cancel()
-
-		caller, err := ci.opt.SessionManager.Get(timeoutCtx, id)
-		if err != nil {
-			return "", "", err
-		}
-
-		return auth.CredentialsFunc(context.TODO(), caller)(host)
-	}
+type contentCacheImporter struct {
+	provider content.Provider
 }
 
-func (ci *CacheImporter) Resolve(ctx context.Context, ref string) (solver.CacheManager, error) {
-	resolver := docker.NewResolver(docker.ResolverOptions{
-		Client:      http.DefaultClient,
-		Credentials: ci.getCredentialsFromSession(ctx),
-	})
-
-	ref, desc, err := resolver.Resolve(ctx, ref)
-	if err != nil {
-		return nil, err
-	}
-
-	fetcher, err := resolver.Fetcher(ctx, ref)
-	if err != nil {
-		return nil, err
-	}
-
-	b := contentutil.NewBuffer()
-
-	if _, err := remotes.FetchHandler(b, fetcher)(ctx, desc); err != nil {
-		return nil, err
-	}
-
-	dt, err := content.ReadBlob(ctx, b, desc)
+func (ci *contentCacheImporter) Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error) {
+	dt, err := readBlob(ctx, ci.provider, desc)
 	if err != nil {
 		return nil, err
 	}
@@ -94,19 +51,15 @@ func (ci *CacheImporter) Resolve(ctx context.Context, ref string) (solver.CacheM
 		}
 		allLayers[m.Digest] = v1.DescriptorProviderPair{
 			Descriptor: m,
-			Provider:   contentutil.FromFetcher(fetcher, m),
+			Provider:   ci.provider,
 		}
 	}
 
 	if configDesc.Digest == "" {
-		return nil, errors.Errorf("invalid build cache from %s", ref)
-	}
-
-	if _, err := remotes.FetchHandler(b, fetcher)(ctx, configDesc); err != nil {
-		return nil, err
+		return nil, errors.Errorf("invalid build cache from %+v", desc)
 	}
 
-	dt, err = content.ReadBlob(ctx, b, configDesc)
+	dt, err = readBlob(ctx, ci.provider, configDesc)
 	if err != nil {
 		return nil, err
 	}
@@ -116,9 +69,30 @@ func (ci *CacheImporter) Resolve(ctx context.Context, ref string) (solver.CacheM
 		return nil, err
 	}
 
-	keysStorage, resultStorage, err := v1.NewCacheKeyStorage(cc, ci.opt.Worker)
+	keysStorage, resultStorage, err := v1.NewCacheKeyStorage(cc, w)
 	if err != nil {
 		return nil, err
 	}
-	return solver.NewCacheManager(ref, keysStorage, resultStorage), nil
+	return solver.NewCacheManager(id, keysStorage, resultStorage), nil
 }
+
+func readBlob(ctx context.Context, provider content.Provider, desc ocispec.Descriptor) ([]byte, error) {
+	maxBlobSize := int64(1 << 20)
+	if desc.Size > maxBlobSize {
+		return nil, errors.Errorf("blob %s is too large (%d > %d)", desc.Digest, desc.Size, maxBlobSize)
+	}
+	dt, err := content.ReadBlob(ctx, provider, desc)
+	if err != nil {
+		// NOTE: even if err == EOF, we might have got expected dt here.
+		// For instance, http.Response.Body is known to return non-zero bytes with EOF.
+		if err == io.EOF {
+			if dtDigest := desc.Digest.Algorithm().FromBytes(dt); dtDigest != desc.Digest {
+				err = errors.Wrapf(err, "got EOF, expected %s (%d bytes), got %s (%d bytes)",
+					desc.Digest, desc.Size, dtDigest, len(dt))
+			} else {
+				err = nil
+			}
+		}
+	}
+	return dt, err
+}
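
Symmetrically, the importer now reads from a plain content.Provider and receives the descriptor, the cache-manager id and the worker from the caller. Below is a minimal sketch, not from this commit, of consuming the new Importer; the package name, helper, local store path and caller-supplied worker and descriptor are illustrative assumptions.

package cacheutil

import (
	"context"

	"github.com/containerd/containerd/content/local"
	"github.com/moby/buildkit/cache/remotecache"
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/worker"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// importLocalCache is a hypothetical helper: the caller already has a
// worker.Worker and the descriptor of a previously exported cache manifest list.
func importLocalCache(ctx context.Context, w worker.Worker, desc ocispec.Descriptor) (solver.CacheManager, error) {
	// A local content store also satisfies content.Provider.
	store, err := local.NewStore("/tmp/buildkit-cache-content")
	if err != nil {
		return nil, err
	}
	imp := remotecache.NewImporter(store)
	// "local-cache" becomes the id of the returned solver.CacheManager;
	// Resolve reads the manifest list and config blob through readBlob above.
	return imp.Resolve(ctx, desc, "local-cache", w)
}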
73 changes: 73 additions & 0 deletions cache/remotecache/registry/registry.go
@@ -0,0 +1,73 @@
+package registry
+
+import (
+	"context"
+	"time"
+
+	"github.com/containerd/containerd/remotes"
+	"github.com/containerd/containerd/remotes/docker"
+	"github.com/moby/buildkit/cache/remotecache"
+	"github.com/moby/buildkit/session"
+	"github.com/moby/buildkit/session/auth"
+	"github.com/moby/buildkit/util/contentutil"
+	"github.com/moby/buildkit/util/tracing"
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
+)
+
+func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExporterFunc {
+	return func(ctx context.Context, typ, ref string) (remotecache.Exporter, error) {
+		if typ != "" {
+			return nil, errors.Errorf("unsupported cache exporter type: %s", typ)
+		}
+		remote := newRemoteResolver(ctx, sm)
+		pusher, err := remote.Pusher(ctx, ref)
+		if err != nil {
+			return nil, err
+		}
+		return remotecache.NewExporter(contentutil.FromPusher(pusher)), nil
+	}
+}
+
+func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImporterFunc {
+	return func(ctx context.Context, typ, ref string) (remotecache.Importer, specs.Descriptor, error) {
+		if typ != "" {
+			return nil, specs.Descriptor{}, errors.Errorf("unsupported cache importer type: %s", typ)
+		}
+		remote := newRemoteResolver(ctx, sm)
+		xref, desc, err := remote.Resolve(ctx, ref)
+		if err != nil {
+			return nil, specs.Descriptor{}, err
+		}
+		fetcher, err := remote.Fetcher(ctx, xref)
+		if err != nil {
+			return nil, specs.Descriptor{}, err
+		}
+		return remotecache.NewImporter(contentutil.FromFetcher(fetcher)), desc, nil
+	}
+}
+
+func newRemoteResolver(ctx context.Context, sm *session.Manager) remotes.Resolver {
+	return docker.NewResolver(docker.ResolverOptions{
+		Client:      tracing.DefaultClient,
+		Credentials: getCredentialsFunc(ctx, sm),
+	})
+}
+
+func getCredentialsFunc(ctx context.Context, sm *session.Manager) func(string) (string, string, error) {
+	id := session.FromContext(ctx)
+	if id == "" {
+		return nil
+	}
+	return func(host string) (string, string, error) {
+		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		caller, err := sm.Get(timeoutCtx, id)
+		if err != nil {
+			return "", "", err
+		}
+
+		return auth.CredentialsFunc(context.TODO(), caller)(host)
+	}
+}
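
All registry and session handling now lives in this new package, which hands back the generic remotecache.Exporter and remotecache.Importer. A minimal sketch, assuming standalone use outside the solver and an illustrative cache ref, of calling the two resolver functions:

package main

import (
	"context"
	"log"

	"github.com/moby/buildkit/cache/remotecache/registry"
	"github.com/moby/buildkit/session"
)

func main() {
	ctx := context.Background()

	sm, err := session.NewManager()
	if err != nil {
		log.Fatal(err)
	}

	// typ must currently be empty; the ref is an illustrative cache image name.
	ref := "example.com/user/app:buildcache"

	// The exporter side only constructs a pusher, so it typically does not
	// contact the registry yet.
	exp, err := registry.ResolveCacheExporterFunc(sm)(ctx, "", ref)
	if err != nil {
		log.Fatal(err)
	}
	_ = exp // pass to the solver as its remote cache exporter

	// The importer side resolves the ref against the registry and returns the
	// manifest-list descriptor alongside the Importer.
	imp, desc, err := registry.ResolveCacheImporterFunc(sm)(ctx, "", ref)
	if err != nil {
		log.Fatal(err)
	}
	_, _ = imp, desc
}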