Skip to content

Commit

Permalink
Propagate compression options to the inline cache export
Browse files Browse the repository at this point in the history
Co-authored-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock and tonistiigi committed Oct 13, 2021
1 parent b2ff444 commit 8fab465
Show file tree
Hide file tree
Showing 21 changed files with 342 additions and 56 deletions.
13 changes: 10 additions & 3 deletions cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
Expand Down Expand Up @@ -1125,7 +1126,7 @@ func TestConversion(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestGetRemote(t *testing.T) {
func TestGetRemotes(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
Expand Down Expand Up @@ -1251,15 +1252,21 @@ func TestGetRemote(t *testing.T) {

checkNumBlobs(ctx, t, co.cs, 1)

// Call GetRemote on all the refs
// Call GetRemotes on all the refs
eg, egctx := errgroup.WithContext(ctx)
for _, ir := range refs {
ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: true,
}
eg.Go(func() error {
remote, err := ir.GetRemote(egctx, true, compressionType, true, nil)
remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
refChain := ir.parentRefChain()
for i, desc := range remote.Descriptors {
switch compressionType {
Expand Down
20 changes: 19 additions & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ImmutableRef interface {
Clone() ImmutableRef

Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error)
}

type MutableRef interface {
Expand Down Expand Up @@ -376,6 +376,24 @@ func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String()
}

func (sr *immutableRef) getCompressions(ctx context.Context) (res []compression.Type, _ error) {
cs := sr.cm.ContentStore
info, err := cs.Info(ctx, sr.getBlob())
if errors.Is(err, errdefs.ErrNotFound) {
return nil, nil
} else if err != nil {
return nil, err
}
for k := range info.Labels {
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression {
res = append(res, t)
}
}
}
return
}

func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
cs := sr.cm.ContentStore
info, err := cs.Info(ctx, sr.getBlob())
Expand Down
81 changes: 77 additions & 4 deletions cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,89 @@ type Unlazier interface {
Unlazy(ctx context.Context) error
}

// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation.
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) {
// GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation.
func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer done(ctx)

err = sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
remote, err := sr.getRemote(ctx, createIfNeeded, compressionopt, s)
if err != nil {
return nil, err
}
if !all || !compressionopt.Force || sr.parent == nil || len(remote.Descriptors) == 0 {
return []*solver.Remote{remote}, nil // early return if compression variants aren't required
}

// Create all available remotes with all combination of copmressions
desc := remote.Descriptors[len(remote.Descriptors)-1]
remotes, err := sr.parent.getAvailableBlobs(ctx, s)
if err != nil {
return nil, err
}
for _, r := range remotes {
r.Descriptors = append(r.Descriptors, desc)
p := contentutil.NewMultiProvider(r.Provider)
p.Add(desc.Digest, sr.cm.ContentStore)
r.Provider = p
}
return append([]*solver.Remote{remote}, remotes...), nil
}

func (sr *immutableRef) getAvailableBlobs(ctx context.Context, s session.Group) ([]*solver.Remote, error) {
compressions, err := sr.getCompressions(ctx)
if err != nil {
return nil, err
}
if len(compressions) == 0 {
// no available compression blobs for this blob (maybe a lazy ref).
// return a single remote with a default compression.
remote, err := sr.getRemote(ctx, false, solver.CompressionOpt{Type: compression.Default}, s)
if err != nil {
return nil, err
}
return []*solver.Remote{remote}, nil
}
var parents []*solver.Remote
if sr.parent != nil {
parents, err = sr.parent.getAvailableBlobs(ctx, s)
if err != nil {
return nil, err
}
}
var res []*solver.Remote
for _, c := range compressions {
desc, err := sr.getCompressionBlob(ctx, c)
if err != nil {
return nil, err
}
if len(parents) == 0 { // bottommost ref
res = append(res, &solver.Remote{
Descriptors: []ocispecs.Descriptor{desc},
Provider: sr.cm.ContentStore,
})
continue
}
for _, pRemote := range parents {
p := contentutil.NewMultiProvider(pRemote.Provider)
p.Add(desc.Digest, sr.cm.ContentStore)
res = append(res, &solver.Remote{
Descriptors: append(pRemote.Descriptors, desc),
Provider: p,
})
}
}
return res, nil
}

func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
compressionType := compressionopt.Type
forceCompression := compressionopt.Force

err := sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
if err != nil {
return nil, err
}
Expand Down
20 changes: 18 additions & 2 deletions cache/remotecache/v1/cachestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,25 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
return worker.NewWorkerRefResult(ref, cs.w), nil
}

func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group) (*solver.Remote, error) {
func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts *solver.CompressionOpt, _ session.Group) ([]*solver.Remote, error) {
if r := cs.byResultID(res.ID); r != nil && r.result != nil {
return r.result, nil
if compressionopts == nil {
return []*solver.Remote{r.result}, nil
}
// Any of blobs in the remote must meet the specified compression option.
match := false
for _, desc := range r.result.Descriptors {
m := compressionopts.Type.IsMediaType(desc.MediaType)
match = match || m
if compressionopts.Force && !m {
match = false
break
}
}
if match {
return []*solver.Remote{r.result}, nil
}
return nil, nil // return nil as it's best effort.
}
return nil, errors.WithStack(solver.ErrNotFound)
}
Expand Down
50 changes: 50 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2231,6 +2231,12 @@ func testBuildExportZstd(t *testing.T, sb integration.Sandbox) {
},
},
},
// compression option should work even with inline cache exports
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
}, nil)
require.NoError(t, err)

Expand Down Expand Up @@ -3034,6 +3040,50 @@ func testBasicInlineCacheImportExport(t *testing.T, sb integration.Sandbox) {

checkAllRemoved(t, c, sb)

// Export the cache again with compression
resp, err = c.Solve(sb.Context(), def, SolveOpt{
// specifying inline cache exporter is needed for reproducing containerimage.digest
// (not needed for reproducing rootfs/unique)
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target,
"push": "true",
"compression": "uncompressed", // inline cache should work with compression
"force-compression": "true",
},
},
},
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
CacheImports: []CacheOptionsEntry{
{
Type: "registry",
Attrs: map[string]string{
"ref": target,
},
},
},
}, nil)
require.NoError(t, err)

dgst2uncompress, ok := resp.ExporterResponse[exptypes.ExporterImageDigestKey]
require.Equal(t, ok, true)

// dgst2uncompress != dgst, because the compression type is different
unique2uncompress, err := readFileInImage(sb.Context(), c, target+"@"+dgst2uncompress, "/unique")
require.NoError(t, err)
require.EqualValues(t, unique, unique2uncompress)

err = c.Prune(sb.Context(), nil, PruneAll)
require.NoError(t, err)

checkAllRemoved(t, c, sb)

resp, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{
{
Expand Down
27 changes: 24 additions & 3 deletions exporter/containerimage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/moby/buildkit/exporter/containerimage/exptypes"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/buildinfo"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
Expand Down Expand Up @@ -207,6 +208,15 @@ func (e *imageExporterInstance) Name() string {
return "exporting to image"
}

func (e *imageExporterInstance) Config() exporter.Config {
return exporter.Config{
Compression: solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
},
}
}

func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, error) {
if src.Metadata == nil {
src.Metadata = make(map[string][]byte)
Expand Down Expand Up @@ -287,22 +297,28 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
if e.push {
annotations := map[digest.Digest]map[string]string{}
mprovider := contentutil.NewMultiProvider(e.opt.ImageWriter.ContentStore())
compressionopt := solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
}
if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID))
remotes, err := src.Ref.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc)
}
}
if len(src.Refs) > 0 {
for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, e.layerCompression, e.forceCompression, session.NewGroup(sessionID))
remotes, err := r.GetRemotes(ctx, false, compressionopt, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc)
Expand Down Expand Up @@ -352,10 +368,15 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag
}
}

remote, err := topLayerRef.GetRemote(ctx, true, e.layerCompression, e.forceCompression, s)
compressionopt := solver.CompressionOpt{
Type: e.layerCompression,
Force: e.forceCompression,
}
remotes, err := topLayerRef.GetRemotes(ctx, true, compressionopt, false, s)
if err != nil {
return err
}
remote := remotes[0]

// ensure the content for each layer exists locally in case any are lazy
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
Expand Down
7 changes: 6 additions & 1 deletion exporter/containerimage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,22 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compres
layersDone := oneOffProgress(ctx, "exporting layers")

out := make([]solver.Remote, len(refs))
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: forceCompression,
}

for i, ref := range refs {
func(i int, ref cache.ImmutableRef) {
if ref == nil {
return
}
eg.Go(func() error {
remote, err := ref.GetRemote(ctx, true, compressionType, forceCompression, s)
remotes, err := ref.GetRemotes(ctx, true, compressionopt, false, s)
if err != nil {
return err
}
remote := remotes[0]
out[i] = *remote
return nil
})
Expand Down
6 changes: 6 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/solver"
)

type Exporter interface {
Expand All @@ -12,6 +13,7 @@ type Exporter interface {

type ExporterInstance interface {
Name() string
Config() Config
Export(ctx context.Context, src Source, sessionID string) (map[string]string, error)
}

Expand All @@ -20,3 +22,7 @@ type Source struct {
Refs map[string]cache.ImmutableRef
Metadata map[string][]byte
}

type Config struct {
Compression solver.CompressionOpt
}
4 changes: 4 additions & 0 deletions exporter/local/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (e *localExporterInstance) Name() string {
return "exporting to client"
}

func (e *localExporter) Config() exporter.Config {
return exporter.Config{}
}

func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand Down
Loading

0 comments on commit 8fab465

Please sign in to comment.